]>
Commit | Line | Data |
---|---|---|
31f18b77 | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
9f95a23c | 2 | // vim: ts=8 sw=2 smarttab ft=cpp |
31f18b77 | 3 | |
f64942e4 | 4 | #include <limits> |
11fdf7f2 | 5 | #include <sstream> |
f64942e4 | 6 | |
31f18b77 | 7 | #include "rgw_rados.h" |
11fdf7f2 | 8 | #include "rgw_zone.h" |
31f18b77 FG |
9 | #include "rgw_bucket.h" |
10 | #include "rgw_reshard.h" | |
9f95a23c | 11 | #include "rgw_sal.h" |
31f18b77 FG |
12 | #include "cls/rgw/cls_rgw_client.h" |
13 | #include "cls/lock/cls_lock_client.h" | |
14 | #include "common/errno.h" | |
15 | #include "common/ceph_json.h" | |
16 | ||
17 | #include "common/dout.h" | |
18 | ||
11fdf7f2 TL |
19 | #include "services/svc_zone.h" |
20 | #include "services/svc_sys_obj.h" | |
9f95a23c | 21 | #include "services/svc_tier_rados.h" |
11fdf7f2 | 22 | |
31f18b77 FG |
23 | #define dout_context g_ceph_context |
24 | #define dout_subsys ceph_subsys_rgw | |
25 | ||
// Object-name prefix for reshard log entries (presumably combined with a
// logshard index elsewhere to form the queue object names -- TODO confirm).
const string reshard_oid_prefix = "reshard.";
// cls_lock name used by RGWBucketReshardLock's internal lock.
const string reshard_lock_name = "reshard_process";
// cls_lock name used by RGWReshard's per-bucket-instance lock.
const string bucket_instance_lock_name = "bucket_instance_lock";
29 | ||
9f95a23c TL |
/* All primes up to 2000 used to attempt to make dynamic sharding use
 * a prime numbers of shards. Note: this list also includes 1 for when
 * 1 shard is the most appropriate, even though 1 is not prime.
 */
const std::initializer_list<uint16_t> RGWBucketReshard::reshard_primes = {
  1, 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61,
  67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137,
  139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199, 211,
  223, 227, 229, 233, 239, 241, 251, 257, 263, 269, 271, 277, 281, 283,
  293, 307, 311, 313, 317, 331, 337, 347, 349, 353, 359, 367, 373, 379,
  383, 389, 397, 401, 409, 419, 421, 431, 433, 439, 443, 449, 457, 461,
  463, 467, 479, 487, 491, 499, 503, 509, 521, 523, 541, 547, 557, 563,
  569, 571, 577, 587, 593, 599, 601, 607, 613, 617, 619, 631, 641, 643,
  647, 653, 659, 661, 673, 677, 683, 691, 701, 709, 719, 727, 733, 739,
  743, 751, 757, 761, 769, 773, 787, 797, 809, 811, 821, 823, 827, 829,
  839, 853, 857, 859, 863, 877, 881, 883, 887, 907, 911, 919, 929, 937,
  941, 947, 953, 967, 971, 977, 983, 991, 997, 1009, 1013, 1019, 1021,
  1031, 1033, 1039, 1049, 1051, 1061, 1063, 1069, 1087, 1091, 1093,
  1097, 1103, 1109, 1117, 1123, 1129, 1151, 1153, 1163, 1171, 1181,
  1187, 1193, 1201, 1213, 1217, 1223, 1229, 1231, 1237, 1249, 1259,
  1277, 1279, 1283, 1289, 1291, 1297, 1301, 1303, 1307, 1319, 1321,
  1327, 1361, 1367, 1373, 1381, 1399, 1409, 1423, 1427, 1429, 1433,
  1439, 1447, 1451, 1453, 1459, 1471, 1481, 1483, 1487, 1489, 1493,
  1499, 1511, 1523, 1531, 1543, 1549, 1553, 1559, 1567, 1571, 1579,
  1583, 1597, 1601, 1607, 1609, 1613, 1619, 1621, 1627, 1637, 1657,
  1663, 1667, 1669, 1693, 1697, 1699, 1709, 1721, 1723, 1733, 1741,
  1747, 1753, 1759, 1777, 1783, 1787, 1789, 1801, 1811, 1823, 1831,
  1847, 1861, 1867, 1871, 1873, 1877, 1879, 1889, 1901, 1907, 1913,
  1931, 1933, 1949, 1951, 1973, 1979, 1987, 1993, 1997, 1999
};
f64942e4 | 60 | |
31f18b77 | 61 | class BucketReshardShard { |
9f95a23c | 62 | rgw::sal::RGWRadosStore *store; |
31f18b77 FG |
63 | const RGWBucketInfo& bucket_info; |
64 | int num_shard; | |
65 | RGWRados::BucketShard bs; | |
66 | vector<rgw_cls_bi_entry> entries; | |
11fdf7f2 | 67 | map<RGWObjCategory, rgw_bucket_category_stats> stats; |
31f18b77 | 68 | deque<librados::AioCompletion *>& aio_completions; |
11fdf7f2 TL |
69 | uint64_t max_aio_completions; |
70 | uint64_t reshard_shard_batch_size; | |
31f18b77 FG |
71 | |
72 | int wait_next_completion() { | |
73 | librados::AioCompletion *c = aio_completions.front(); | |
74 | aio_completions.pop_front(); | |
75 | ||
9f95a23c | 76 | c->wait_for_complete(); |
31f18b77 FG |
77 | |
78 | int ret = c->get_return_value(); | |
79 | c->release(); | |
80 | ||
81 | if (ret < 0) { | |
82 | derr << "ERROR: reshard rados operation failed: " << cpp_strerror(-ret) << dendl; | |
83 | return ret; | |
84 | } | |
85 | ||
86 | return 0; | |
87 | } | |
88 | ||
89 | int get_completion(librados::AioCompletion **c) { | |
11fdf7f2 | 90 | if (aio_completions.size() >= max_aio_completions) { |
31f18b77 FG |
91 | int ret = wait_next_completion(); |
92 | if (ret < 0) { | |
93 | return ret; | |
94 | } | |
95 | } | |
96 | ||
9f95a23c | 97 | *c = librados::Rados::aio_create_completion(nullptr, nullptr); |
31f18b77 FG |
98 | aio_completions.push_back(*c); |
99 | ||
100 | return 0; | |
101 | } | |
102 | ||
103 | public: | |
9f95a23c | 104 | BucketReshardShard(rgw::sal::RGWRadosStore *_store, const RGWBucketInfo& _bucket_info, |
31f18b77 | 105 | int _num_shard, |
f64942e4 | 106 | deque<librados::AioCompletion *>& _completions) : |
9f95a23c | 107 | store(_store), bucket_info(_bucket_info), bs(store->getRados()), |
f64942e4 AA |
108 | aio_completions(_completions) |
109 | { | |
31f18b77 | 110 | num_shard = (bucket_info.num_shards > 0 ? _num_shard : -1); |
f64942e4 | 111 | bs.init(bucket_info.bucket, num_shard, nullptr /* no RGWBucketInfo */); |
11fdf7f2 TL |
112 | |
113 | max_aio_completions = | |
114 | store->ctx()->_conf.get_val<uint64_t>("rgw_reshard_max_aio"); | |
115 | reshard_shard_batch_size = | |
116 | store->ctx()->_conf.get_val<uint64_t>("rgw_reshard_batch_size"); | |
31f18b77 FG |
117 | } |
118 | ||
119 | int get_num_shard() { | |
120 | return num_shard; | |
121 | } | |
122 | ||
11fdf7f2 | 123 | int add_entry(rgw_cls_bi_entry& entry, bool account, RGWObjCategory category, |
31f18b77 FG |
124 | const rgw_bucket_category_stats& entry_stats) { |
125 | entries.push_back(entry); | |
126 | if (account) { | |
127 | rgw_bucket_category_stats& target = stats[category]; | |
128 | target.num_entries += entry_stats.num_entries; | |
129 | target.total_size += entry_stats.total_size; | |
130 | target.total_size_rounded += entry_stats.total_size_rounded; | |
91327a77 | 131 | target.actual_size += entry_stats.actual_size; |
31f18b77 | 132 | } |
11fdf7f2 | 133 | if (entries.size() >= reshard_shard_batch_size) { |
31f18b77 FG |
134 | int ret = flush(); |
135 | if (ret < 0) { | |
136 | return ret; | |
137 | } | |
138 | } | |
f64942e4 | 139 | |
31f18b77 FG |
140 | return 0; |
141 | } | |
f64942e4 | 142 | |
31f18b77 FG |
143 | int flush() { |
144 | if (entries.size() == 0) { | |
145 | return 0; | |
146 | } | |
147 | ||
148 | librados::ObjectWriteOperation op; | |
149 | for (auto& entry : entries) { | |
9f95a23c | 150 | store->getRados()->bi_put(op, bs, entry); |
31f18b77 FG |
151 | } |
152 | cls_rgw_bucket_update_stats(op, false, stats); | |
153 | ||
154 | librados::AioCompletion *c; | |
155 | int ret = get_completion(&c); | |
156 | if (ret < 0) { | |
157 | return ret; | |
158 | } | |
9f95a23c | 159 | ret = bs.bucket_obj.aio_operate(c, &op); |
31f18b77 FG |
160 | if (ret < 0) { |
161 | derr << "ERROR: failed to store entries in target bucket shard (bs=" << bs.bucket << "/" << bs.shard_id << ") error=" << cpp_strerror(-ret) << dendl; | |
162 | return ret; | |
163 | } | |
164 | entries.clear(); | |
165 | stats.clear(); | |
166 | return 0; | |
167 | } | |
168 | ||
169 | int wait_all_aio() { | |
170 | int ret = 0; | |
171 | while (!aio_completions.empty()) { | |
172 | int r = wait_next_completion(); | |
173 | if (r < 0) { | |
174 | ret = r; | |
175 | } | |
176 | } | |
177 | return ret; | |
178 | } | |
f64942e4 AA |
179 | }; // class BucketReshardShard |
180 | ||
31f18b77 FG |
181 | |
182 | class BucketReshardManager { | |
9f95a23c | 183 | rgw::sal::RGWRadosStore *store; |
31f18b77 FG |
184 | const RGWBucketInfo& target_bucket_info; |
185 | deque<librados::AioCompletion *> completions; | |
186 | int num_target_shards; | |
187 | vector<BucketReshardShard *> target_shards; | |
188 | ||
189 | public: | |
9f95a23c | 190 | BucketReshardManager(rgw::sal::RGWRadosStore *_store, |
f64942e4 AA |
191 | const RGWBucketInfo& _target_bucket_info, |
192 | int _num_target_shards) : | |
193 | store(_store), target_bucket_info(_target_bucket_info), | |
194 | num_target_shards(_num_target_shards) | |
195 | { | |
31f18b77 FG |
196 | target_shards.resize(num_target_shards); |
197 | for (int i = 0; i < num_target_shards; ++i) { | |
198 | target_shards[i] = new BucketReshardShard(store, target_bucket_info, i, completions); | |
199 | } | |
200 | } | |
201 | ||
202 | ~BucketReshardManager() { | |
203 | for (auto& shard : target_shards) { | |
204 | int ret = shard->wait_all_aio(); | |
205 | if (ret < 0) { | |
f64942e4 AA |
206 | ldout(store->ctx(), 20) << __func__ << |
207 | ": shard->wait_all_aio() returned ret=" << ret << dendl; | |
31f18b77 FG |
208 | } |
209 | } | |
210 | } | |
211 | ||
212 | int add_entry(int shard_index, | |
11fdf7f2 | 213 | rgw_cls_bi_entry& entry, bool account, RGWObjCategory category, |
31f18b77 | 214 | const rgw_bucket_category_stats& entry_stats) { |
f64942e4 AA |
215 | int ret = target_shards[shard_index]->add_entry(entry, account, category, |
216 | entry_stats); | |
31f18b77 | 217 | if (ret < 0) { |
f64942e4 AA |
218 | derr << "ERROR: target_shards.add_entry(" << entry.idx << |
219 | ") returned error: " << cpp_strerror(-ret) << dendl; | |
31f18b77 FG |
220 | return ret; |
221 | } | |
f64942e4 | 222 | |
31f18b77 FG |
223 | return 0; |
224 | } | |
225 | ||
226 | int finish() { | |
227 | int ret = 0; | |
228 | for (auto& shard : target_shards) { | |
229 | int r = shard->flush(); | |
230 | if (r < 0) { | |
231 | derr << "ERROR: target_shards[" << shard->get_num_shard() << "].flush() returned error: " << cpp_strerror(-r) << dendl; | |
232 | ret = r; | |
233 | } | |
234 | } | |
235 | for (auto& shard : target_shards) { | |
236 | int r = shard->wait_all_aio(); | |
237 | if (r < 0) { | |
238 | derr << "ERROR: target_shards[" << shard->get_num_shard() << "].wait_all_aio() returned error: " << cpp_strerror(-r) << dendl; | |
239 | ret = r; | |
240 | } | |
241 | delete shard; | |
242 | } | |
243 | target_shards.clear(); | |
244 | return ret; | |
245 | } | |
f64942e4 AA |
246 | }; // class BucketReshardManager |
247 | ||
9f95a23c | 248 | RGWBucketReshard::RGWBucketReshard(rgw::sal::RGWRadosStore *_store, |
f64942e4 AA |
249 | const RGWBucketInfo& _bucket_info, |
250 | const map<string, bufferlist>& _bucket_attrs, | |
251 | RGWBucketReshardLock* _outer_reshard_lock) : | |
252 | store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs), | |
253 | reshard_lock(store, bucket_info, true), | |
254 | outer_reshard_lock(_outer_reshard_lock) | |
255 | { } | |
256 | ||
9f95a23c | 257 | int RGWBucketReshard::set_resharding_status(rgw::sal::RGWRadosStore* store, |
f64942e4 AA |
258 | const RGWBucketInfo& bucket_info, |
259 | const string& new_instance_id, | |
260 | int32_t num_shards, | |
261 | cls_rgw_reshard_status status) | |
31f18b77 FG |
262 | { |
263 | if (new_instance_id.empty()) { | |
264 | ldout(store->ctx(), 0) << __func__ << " missing new bucket instance id" << dendl; | |
265 | return -EINVAL; | |
266 | } | |
267 | ||
268 | cls_rgw_bucket_instance_entry instance_entry; | |
269 | instance_entry.set_status(new_instance_id, num_shards, status); | |
270 | ||
9f95a23c | 271 | int ret = store->getRados()->bucket_set_reshard(bucket_info, instance_entry); |
31f18b77 FG |
272 | if (ret < 0) { |
273 | ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: error setting bucket resharding flag on bucket index: " | |
274 | << cpp_strerror(-ret) << dendl; | |
275 | return ret; | |
276 | } | |
277 | return 0; | |
278 | } | |
279 | ||
f64942e4 | 280 | // reshard lock assumes lock is held |
9f95a23c | 281 | int RGWBucketReshard::clear_resharding(rgw::sal::RGWRadosStore* store, |
f64942e4 | 282 | const RGWBucketInfo& bucket_info) |
31f18b77 | 283 | { |
f64942e4 AA |
284 | int ret = clear_index_shard_reshard_status(store, bucket_info); |
285 | if (ret < 0) { | |
286 | ldout(store->ctx(), 0) << "RGWBucketReshard::" << __func__ << | |
287 | " ERROR: error clearing reshard status from index shard " << | |
288 | cpp_strerror(-ret) << dendl; | |
289 | return ret; | |
290 | } | |
31f18b77 | 291 | |
f64942e4 | 292 | cls_rgw_bucket_instance_entry instance_entry; |
9f95a23c | 293 | ret = store->getRados()->bucket_set_reshard(bucket_info, instance_entry); |
31f18b77 | 294 | if (ret < 0) { |
f64942e4 AA |
295 | ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << |
296 | " ERROR: error setting bucket resharding flag on bucket index: " << | |
297 | cpp_strerror(-ret) << dendl; | |
31f18b77 FG |
298 | return ret; |
299 | } | |
f64942e4 AA |
300 | |
301 | return 0; | |
302 | } | |
303 | ||
9f95a23c | 304 | int RGWBucketReshard::clear_index_shard_reshard_status(rgw::sal::RGWRadosStore* store, |
f64942e4 AA |
305 | const RGWBucketInfo& bucket_info) |
306 | { | |
307 | uint32_t num_shards = bucket_info.num_shards; | |
308 | ||
309 | if (num_shards < std::numeric_limits<uint32_t>::max()) { | |
310 | int ret = set_resharding_status(store, bucket_info, | |
311 | bucket_info.bucket.bucket_id, | |
312 | (num_shards < 1 ? 1 : num_shards), | |
9f95a23c | 313 | cls_rgw_reshard_status::NOT_RESHARDING); |
f64942e4 AA |
314 | if (ret < 0) { |
315 | ldout(store->ctx(), 0) << "RGWBucketReshard::" << __func__ << | |
316 | " ERROR: error clearing reshard status from index shard " << | |
317 | cpp_strerror(-ret) << dendl; | |
318 | return ret; | |
319 | } | |
320 | } | |
321 | ||
31f18b77 FG |
322 | return 0; |
323 | } | |
324 | ||
9f95a23c | 325 | static int create_new_bucket_instance(rgw::sal::RGWRadosStore *store, |
31f18b77 FG |
326 | int new_num_shards, |
327 | const RGWBucketInfo& bucket_info, | |
328 | map<string, bufferlist>& attrs, | |
329 | RGWBucketInfo& new_bucket_info) | |
330 | { | |
331 | new_bucket_info = bucket_info; | |
332 | ||
9f95a23c | 333 | store->getRados()->create_bucket_id(&new_bucket_info.bucket.bucket_id); |
31f18b77 FG |
334 | |
335 | new_bucket_info.num_shards = new_num_shards; | |
336 | new_bucket_info.objv_tracker.clear(); | |
337 | ||
338 | new_bucket_info.new_bucket_instance_id.clear(); | |
9f95a23c | 339 | new_bucket_info.reshard_status = cls_rgw_reshard_status::NOT_RESHARDING; |
31f18b77 | 340 | |
9f95a23c | 341 | int ret = store->svc()->bi->init_index(new_bucket_info); |
31f18b77 FG |
342 | if (ret < 0) { |
343 | cerr << "ERROR: failed to init new bucket indexes: " << cpp_strerror(-ret) << std::endl; | |
f64942e4 | 344 | return ret; |
31f18b77 FG |
345 | } |
346 | ||
9f95a23c | 347 | ret = store->getRados()->put_bucket_instance_info(new_bucket_info, true, real_time(), &attrs); |
31f18b77 FG |
348 | if (ret < 0) { |
349 | cerr << "ERROR: failed to store new bucket instance info: " << cpp_strerror(-ret) << std::endl; | |
f64942e4 | 350 | return ret; |
31f18b77 FG |
351 | } |
352 | ||
353 | return 0; | |
354 | } | |
355 | ||
// Member-function convenience wrapper: delegates to the file-local
// ::create_new_bucket_instance using this resharder's store, source
// bucket info, and attrs.
int RGWBucketReshard::create_new_bucket_instance(int new_num_shards,
                                                 RGWBucketInfo& new_bucket_info)
{
  return ::create_new_bucket_instance(store, new_num_shards,
				      bucket_info, bucket_attrs, new_bucket_info);
}
362 | ||
94b18763 FG |
363 | int RGWBucketReshard::cancel() |
364 | { | |
f64942e4 | 365 | int ret = reshard_lock.lock(); |
94b18763 FG |
366 | if (ret < 0) { |
367 | return ret; | |
368 | } | |
369 | ||
370 | ret = clear_resharding(); | |
371 | ||
f64942e4 AA |
372 | reshard_lock.unlock(); |
373 | return ret; | |
94b18763 FG |
374 | } |
375 | ||
31f18b77 FG |
// RAII guard around a reshard attempt's bucket-info state. start()
// marks the bucket IN_PROGRESS; complete() marks it DONE. If the guard
// is destroyed while still in progress (i.e. the reshard failed partway),
// the destructor rolls the bucket and its index shards back to
// NOT_RESHARDING on a best-effort basis.
class BucketInfoReshardUpdate
{
  rgw::sal::RGWRadosStore *store;
  RGWBucketInfo& bucket_info;            // shared with caller; mutated in place
  std::map<string, bufferlist> bucket_attrs; // copy written back with each status change

  bool in_progress{false};

  // Persist the given reshard status into the bucket instance info.
  int set_status(cls_rgw_reshard_status s) {
    bucket_info.reshard_status = s;
    int ret = store->getRados()->put_bucket_instance_info(bucket_info, false, real_time(), &bucket_attrs);
    if (ret < 0) {
      ldout(store->ctx(), 0) << "ERROR: failed to write bucket info, ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }

public:
  BucketInfoReshardUpdate(rgw::sal::RGWRadosStore *_store,
			  RGWBucketInfo& _bucket_info,
			  map<string, bufferlist>& _bucket_attrs,
			  const string& new_bucket_id) :
    store(_store),
    bucket_info(_bucket_info),
    bucket_attrs(_bucket_attrs)
  {
    // record the target instance id; persisted on the first set_status()
    bucket_info.new_bucket_instance_id = new_bucket_id;
  }

  ~BucketInfoReshardUpdate() {
    if (in_progress) {
      // resharding must not have ended correctly, clean up
      int ret =
	RGWBucketReshard::clear_index_shard_reshard_status(store, bucket_info);
      if (ret < 0) {
	lderr(store->ctx()) << "Error: " << __func__ <<
	  " clear_index_shard_status returned " << ret << dendl;
      }
      bucket_info.new_bucket_instance_id.clear();

      // clears new_bucket_instance as well
      set_status(cls_rgw_reshard_status::NOT_RESHARDING);
    }
  }

  // Mark the reshard as started; on success the destructor becomes
  // responsible for rollback until complete() is called.
  int start() {
    int ret = set_status(cls_rgw_reshard_status::IN_PROGRESS);
    if (ret < 0) {
      return ret;
    }
    in_progress = true;
    return 0;
  }

  // Mark the reshard as finished, disarming the destructor's rollback.
  int complete() {
    int ret = set_status(cls_rgw_reshard_status::DONE);
    if (ret < 0) {
      return ret;
    }
    in_progress = false;
    return 0;
  }
};
440 | ||
f64942e4 | 441 | |
9f95a23c | 442 | RGWBucketReshardLock::RGWBucketReshardLock(rgw::sal::RGWRadosStore* _store, |
f64942e4 AA |
443 | const std::string& reshard_lock_oid, |
444 | bool _ephemeral) : | |
445 | store(_store), | |
446 | lock_oid(reshard_lock_oid), | |
447 | ephemeral(_ephemeral), | |
448 | internal_lock(reshard_lock_name) | |
449 | { | |
11fdf7f2 TL |
450 | const int lock_dur_secs = store->ctx()->_conf.get_val<uint64_t>( |
451 | "rgw_reshard_bucket_lock_duration"); | |
f64942e4 AA |
452 | duration = std::chrono::seconds(lock_dur_secs); |
453 | ||
454 | #define COOKIE_LEN 16 | |
455 | char cookie_buf[COOKIE_LEN + 1]; | |
456 | gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1); | |
457 | cookie_buf[COOKIE_LEN] = '\0'; | |
458 | ||
459 | internal_lock.set_cookie(cookie_buf); | |
460 | internal_lock.set_duration(duration); | |
461 | } | |
462 | ||
463 | int RGWBucketReshardLock::lock() { | |
464 | internal_lock.set_must_renew(false); | |
465 | int ret; | |
466 | if (ephemeral) { | |
9f95a23c | 467 | ret = internal_lock.lock_exclusive_ephemeral(&store->getRados()->reshard_pool_ctx, |
f64942e4 AA |
468 | lock_oid); |
469 | } else { | |
9f95a23c | 470 | ret = internal_lock.lock_exclusive(&store->getRados()->reshard_pool_ctx, lock_oid); |
f64942e4 AA |
471 | } |
472 | if (ret < 0) { | |
473 | ldout(store->ctx(), 0) << "RGWReshardLock::" << __func__ << | |
474 | " failed to acquire lock on " << lock_oid << " ret=" << ret << dendl; | |
475 | return ret; | |
476 | } | |
477 | reset_time(Clock::now()); | |
478 | ||
479 | return 0; | |
480 | } | |
481 | ||
482 | void RGWBucketReshardLock::unlock() { | |
9f95a23c | 483 | int ret = internal_lock.unlock(&store->getRados()->reshard_pool_ctx, lock_oid); |
f64942e4 AA |
484 | if (ret < 0) { |
485 | ldout(store->ctx(), 0) << "WARNING: RGWBucketReshardLock::" << __func__ << | |
486 | " failed to drop lock on " << lock_oid << " ret=" << ret << dendl; | |
487 | } | |
488 | } | |
489 | ||
// Renew a held lock before it expires. must_renew(true) makes the
// re-lock fail unless we still hold it; -ENOENT means the lock expired
// (or was never taken) and another processor may have stolen it.
int RGWBucketReshardLock::renew(const Clock::time_point& now) {
  internal_lock.set_must_renew(true);
  int ret;
  if (ephemeral) {
    ret = internal_lock.lock_exclusive_ephemeral(&store->getRados()->reshard_pool_ctx,
						 lock_oid);
  } else {
    ret = internal_lock.lock_exclusive(&store->getRados()->reshard_pool_ctx, lock_oid);
  }
  if (ret < 0) { /* expired or already locked by another processor */
    std::stringstream error_s;
    if (-ENOENT == ret) {
      error_s << "ENOENT (lock expired or never initially locked)";
    } else {
      error_s << ret << " (" << cpp_strerror(-ret) << ")";
    }
    ldout(store->ctx(), 5) << __func__ << "(): failed to renew lock on " <<
      lock_oid << " with error " << error_s.str() << dendl;
    return ret;
  }
  // restore normal (non-renewal) semantics for future lock() calls
  internal_lock.set_must_renew(false);

  reset_time(now);
  ldout(store->ctx(), 20) << __func__ << "(): successfully renewed lock on " <<
    lock_oid << dendl;

  return 0;
}
518 | ||
519 | ||
// Core reshard loop: streams every bucket-index entry from the source
// shards (via bi_list, 'max_entries' at a time), routes each to its
// target shard in new_bucket_info, then links the new instance and
// marks the reshard complete. Renews the reshard lock(s) periodically
// so long runs don't lose the lock. Progress goes to 'out' (JSON when
// verbose && formatter, plain counts otherwise).
int RGWBucketReshard::do_reshard(int num_shards,
				 RGWBucketInfo& new_bucket_info,
				 int max_entries,
				 bool verbose,
				 ostream *out,
				 Formatter *formatter)
{
  rgw_bucket& bucket = bucket_info.bucket;

  int ret = 0;

  if (out) {
    (*out) << "tenant: " << bucket_info.bucket.tenant << std::endl;
    (*out) << "bucket name: " << bucket_info.bucket.name << std::endl;
    (*out) << "old bucket instance id: " << bucket_info.bucket.bucket_id <<
      std::endl;
    (*out) << "new bucket instance id: " << new_bucket_info.bucket.bucket_id <<
      std::endl;
  }

  /* update bucket info -- in progress*/
  list<rgw_cls_bi_entry> entries;

  if (max_entries < 0) {
    ldout(store->ctx(), 0) << __func__ <<
      ": can't reshard, negative max_entries" << dendl;
    return -EINVAL;
  }

  // NB: destructor cleans up sharding state if reshard does not
  // complete successfully
  BucketInfoReshardUpdate bucket_info_updater(store, bucket_info, bucket_attrs, new_bucket_info.bucket.bucket_id);

  ret = bucket_info_updater.start();
  if (ret < 0) {
    ldout(store->ctx(), 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl;
    return ret;
  }

  int num_target_shards = (new_bucket_info.num_shards > 0 ? new_bucket_info.num_shards : 1);

  BucketReshardManager target_shards_mgr(store, new_bucket_info, num_target_shards);

  bool verbose_json_out = verbose && (formatter != nullptr) && (out != nullptr);

  if (verbose_json_out) {
    formatter->open_array_section("entries");
  }

  uint64_t total_entries = 0;

  if (!verbose_json_out && out) {
    (*out) << "total entries:";
  }

  const int num_source_shards =
    (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
  string marker;
  for (int i = 0; i < num_source_shards; ++i) {
    bool is_truncated = true;
    marker.clear();
    while (is_truncated) {
      entries.clear();
      // -ENOENT is tolerated: an empty/missing shard just yields no entries
      ret = store->getRados()->bi_list(bucket, i, string(), marker, max_entries, &entries, &is_truncated);
      if (ret < 0 && ret != -ENOENT) {
	derr << "ERROR: bi_list(): " << cpp_strerror(-ret) << dendl;
	return ret;
      }

      for (auto iter = entries.begin(); iter != entries.end(); ++iter) {
	rgw_cls_bi_entry& entry = *iter;
	if (verbose_json_out) {
	  formatter->open_object_section("entry");

	  encode_json("shard_id", i, formatter);
	  encode_json("num_entry", total_entries, formatter);
	  encode_json("entry", entry, formatter);
	}
	total_entries++;

	marker = entry.idx;

	int target_shard_id;
	cls_rgw_obj_key cls_key;
	RGWObjCategory category;
	rgw_bucket_category_stats stats;
	bool account = entry.get_info(&cls_key, &category, &stats);
	rgw_obj_key key(cls_key);
	rgw_obj obj(new_bucket_info.bucket, key);
	RGWMPObj mp;
	if (key.ns == RGW_OBJ_NS_MULTIPART && mp.from_meta(key.name)) {
	  // place the multipart .meta object on the same shard as its head object
	  obj.index_hash_source = mp.get_key();
	}
	int ret = store->getRados()->get_target_shard_id(new_bucket_info, obj.get_hash_object(), &target_shard_id);
	if (ret < 0) {
	  lderr(store->ctx()) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl;
	  return ret;
	}

	// a negative id means the target index is unsharded -> slot 0
	int shard_index = (target_shard_id > 0 ? target_shard_id : 0);

	ret = target_shards_mgr.add_entry(shard_index, entry, account,
					  category, stats);
	if (ret < 0) {
	  return ret;
	}

	Clock::time_point now = Clock::now();
	if (reshard_lock.should_renew(now)) {
	  // assume outer locks have timespans at least the size of ours, so
	  // can call inside conditional
	  if (outer_reshard_lock) {
	    ret = outer_reshard_lock->renew(now);
	    if (ret < 0) {
	      return ret;
	    }
	  }
	  ret = reshard_lock.renew(now);
	  if (ret < 0) {
	    lderr(store->ctx()) << "Error renewing bucket lock: " << ret << dendl;
	    return ret;
	  }
	}
	if (verbose_json_out) {
	  formatter->close_section();
	  formatter->flush(*out);
	} else if (out && !(total_entries % 1000)) {
	  (*out) << " " << total_entries;
	}
      } // entries loop
    }
  }

  if (verbose_json_out) {
    formatter->close_section();
    formatter->flush(*out);
  } else if (out) {
    (*out) << " " << total_entries << std::endl;
  }

  ret = target_shards_mgr.finish();
  if (ret < 0) {
    lderr(store->ctx()) << "ERROR: failed to reshard" << dendl;
    return -EIO;
  }

  ret = store->ctl()->bucket->link_bucket(new_bucket_info.owner, new_bucket_info.bucket, bucket_info.creation_time, null_yield);
  if (ret < 0) {
    lderr(store->ctx()) << "failed to link new bucket instance (bucket_id=" << new_bucket_info.bucket.bucket_id << ": " << cpp_strerror(-ret) << ")" << dendl;
    return ret;
  }

  ret = bucket_info_updater.complete();
  if (ret < 0) {
    ldout(store->ctx(), 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl;
    /* don't error out, reshard process succeeded */
  }

  return 0;
  // NB: some error clean-up is done by ~BucketInfoReshardUpdate
} // RGWBucketReshard::do_reshard
31f18b77 FG |
682 | |
// Fetch the per-shard reshard status entries for this bucket from the
// bucket-index rados service.
int RGWBucketReshard::get_status(list<cls_rgw_bucket_instance_entry> *status)
{
  return store->svc()->bi_rados->get_reshard_status(bucket_info, status);
}
687 | ||
31f18b77 | 688 | |
f64942e4 AA |
// Top-level reshard driver: lock -> create new bucket instance ->
// (optionally) record in the reshard log -> flag source shards
// IN_PROGRESS -> copy all index entries via do_reshard -> best-effort
// removal of the old index shards and old instance info. On any error
// before completion, jumps to error_out to remove the partially-built
// NEW instance instead.
int RGWBucketReshard::execute(int num_shards, int max_op_entries,
			      bool verbose, ostream *out, Formatter *formatter,
			      RGWReshard* reshard_log)
{
  int ret = reshard_lock.lock();
  if (ret < 0) {
    return ret;
  }

  RGWBucketInfo new_bucket_info;
  ret = create_new_bucket_instance(num_shards, new_bucket_info);
  if (ret < 0) {
    // shard state is uncertain, but this will attempt to remove them anyway
    goto error_out;
  }

  if (reshard_log) {
    ret = reshard_log->update(bucket_info, new_bucket_info);
    if (ret < 0) {
      goto error_out;
    }
  }

  // set resharding status of current bucket_info & shards with
  // information about planned resharding
  ret = set_resharding_status(new_bucket_info.bucket.bucket_id,
			      num_shards, cls_rgw_reshard_status::IN_PROGRESS);
  if (ret < 0) {
    goto error_out;
  }

  ret = do_reshard(num_shards,
		   new_bucket_info,
		   max_op_entries,
		   verbose, out, formatter);
  if (ret < 0) {
    goto error_out;
  }

  // at this point we've done the main work; we'll make a best-effort
  // to clean-up but will not indicate any errors encountered

  reshard_lock.unlock();

  // resharding successful, so remove old bucket index shards; use
  // best effort and don't report out an error; the lock isn't needed
  // at this point since all we're using a best effor to to remove old
  // shard objects
  ret = store->svc()->bi->clean_index(bucket_info);
  if (ret < 0) {
    lderr(store->ctx()) << "Error: " << __func__ <<
      " failed to clean up old shards; " <<
      "RGWRados::clean_bucket_index returned " << ret << dendl;
  }

  ret = store->ctl()->bucket->remove_bucket_instance_info(bucket_info.bucket,
                                                          bucket_info, null_yield);
  if (ret < 0) {
    lderr(store->ctx()) << "Error: " << __func__ <<
      " failed to clean old bucket info object \"" <<
      bucket_info.bucket.get_key() <<
      "\"created after successful resharding with error " << ret << dendl;
  }

  ldout(store->ctx(), 1) << __func__ <<
    " INFO: reshard of bucket \"" << bucket_info.bucket.name << "\" from \"" <<
    bucket_info.bucket.get_key() << "\" to \"" <<
    new_bucket_info.bucket.get_key() << "\" completed successfully" << dendl;

  return 0;

error_out:

  reshard_lock.unlock();

  // since the real problem is the issue that led to this error code
  // path, we won't touch ret and instead use another variable to
  // temporarily error codes
  int ret2 = store->svc()->bi->clean_index(new_bucket_info);
  if (ret2 < 0) {
    lderr(store->ctx()) << "Error: " << __func__ <<
      " failed to clean up shards from failed incomplete resharding; " <<
      "RGWRados::clean_bucket_index returned " << ret2 << dendl;
  }

  ret2 = store->ctl()->bucket->remove_bucket_instance_info(new_bucket_info.bucket,
							   new_bucket_info,
							   null_yield);
  if (ret2 < 0) {
    lderr(store->ctx()) << "Error: " << __func__ <<
      " failed to clean bucket info object \"" <<
      new_bucket_info.bucket.get_key() <<
      "\"created during incomplete resharding with error " << ret2 << dendl;
  }

  return ret;
} // execute
31f18b77 FG |
786 | |
787 | ||
// Construct the reshard-queue manager. The number of reshard-log shard
// objects is read once from configuration ("rgw_reshard_num_logs") and
// fixed for the lifetime of this object.
RGWReshard::RGWReshard(rgw::sal::RGWRadosStore* _store, bool _verbose, ostream *_out,
                       Formatter *_formatter) :
  store(_store), instance_lock(bucket_instance_lock_name),
  verbose(_verbose), out(_out), formatter(_formatter)
{
  num_logshards = store->ctx()->_conf.get_val<uint64_t>("rgw_reshard_num_logs");
}
795 | ||
f64942e4 AA |
796 | string RGWReshard::get_logshard_key(const string& tenant, |
797 | const string& bucket_name) | |
31f18b77 FG |
798 | { |
799 | return tenant + ":" + bucket_name; | |
800 | } | |
801 | ||
802 | #define MAX_RESHARD_LOGSHARDS_PRIME 7877 | |
803 | ||
804 | void RGWReshard::get_bucket_logshard_oid(const string& tenant, const string& bucket_name, string *oid) | |
805 | { | |
806 | string key = get_logshard_key(tenant, bucket_name); | |
807 | ||
808 | uint32_t sid = ceph_str_hash_linux(key.c_str(), key.size()); | |
809 | uint32_t sid2 = sid ^ ((sid & 0xFF) << 24); | |
810 | sid = sid2 % MAX_RESHARD_LOGSHARDS_PRIME % num_logshards; | |
31f18b77 | 811 | |
1adf2230 | 812 | get_logshard_oid(int(sid), oid); |
31f18b77 FG |
813 | } |
814 | ||
815 | int RGWReshard::add(cls_rgw_reshard_entry& entry) | |
816 | { | |
9f95a23c | 817 | if (!store->svc()->zone->can_reshard()) { |
3efd9988 FG |
818 | ldout(store->ctx(), 20) << __func__ << " Resharding is disabled" << dendl; |
819 | return 0; | |
820 | } | |
821 | ||
31f18b77 FG |
822 | string logshard_oid; |
823 | ||
824 | get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid); | |
825 | ||
826 | librados::ObjectWriteOperation op; | |
827 | cls_rgw_reshard_add(op, entry); | |
828 | ||
9f95a23c | 829 | int ret = rgw_rados_operate(store->getRados()->reshard_pool_ctx, logshard_oid, &op, null_yield); |
31f18b77 FG |
830 | if (ret < 0) { |
831 | lderr(store->ctx()) << "ERROR: failed to add entry to reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl; | |
832 | return ret; | |
833 | } | |
834 | return 0; | |
835 | } | |
836 | ||
837 | int RGWReshard::update(const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info) | |
838 | { | |
839 | cls_rgw_reshard_entry entry; | |
840 | entry.bucket_name = bucket_info.bucket.name; | |
841 | entry.bucket_id = bucket_info.bucket.bucket_id; | |
b32b8144 | 842 | entry.tenant = bucket_info.owner.tenant; |
31f18b77 FG |
843 | |
844 | int ret = get(entry); | |
845 | if (ret < 0) { | |
846 | return ret; | |
847 | } | |
848 | ||
849 | entry.new_instance_id = new_bucket_info.bucket.name + ":" + new_bucket_info.bucket.bucket_id; | |
850 | ||
851 | ret = add(entry); | |
852 | if (ret < 0) { | |
853 | ldout(store->ctx(), 0) << __func__ << ":Error in updating entry bucket " << entry.bucket_name << ": " << | |
854 | cpp_strerror(-ret) << dendl; | |
855 | } | |
856 | ||
857 | return ret; | |
858 | } | |
859 | ||
860 | ||
861 | int RGWReshard::list(int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated) | |
862 | { | |
863 | string logshard_oid; | |
864 | ||
865 | get_logshard_oid(logshard_num, &logshard_oid); | |
866 | ||
9f95a23c | 867 | int ret = cls_rgw_reshard_list(store->getRados()->reshard_pool_ctx, logshard_oid, marker, max, entries, is_truncated); |
c07f9fc5 FG |
868 | |
869 | if (ret < 0) { | |
870 | if (ret == -ENOENT) { | |
871 | *is_truncated = false; | |
872 | ret = 0; | |
9f95a23c TL |
873 | } else { |
874 | lderr(store->ctx()) << "ERROR: failed to list reshard log entries, oid=" << logshard_oid << dendl; | |
875 | if (ret == -EACCES) { | |
876 | lderr(store->ctx()) << "access denied to pool " << store->svc()->zone->get_zone_params().reshard_pool | |
c07f9fc5 | 877 | << ". Fix the pool access permissions of your client" << dendl; |
9f95a23c TL |
878 | } |
879 | } | |
31f18b77 | 880 | } |
c07f9fc5 FG |
881 | |
882 | return ret; | |
31f18b77 FG |
883 | } |
884 | ||
885 | int RGWReshard::get(cls_rgw_reshard_entry& entry) | |
886 | { | |
887 | string logshard_oid; | |
888 | ||
889 | get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid); | |
890 | ||
9f95a23c | 891 | int ret = cls_rgw_reshard_get(store->getRados()->reshard_pool_ctx, logshard_oid, entry); |
31f18b77 | 892 | if (ret < 0) { |
94b18763 FG |
893 | if (ret != -ENOENT) { |
894 | lderr(store->ctx()) << "ERROR: failed to get entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << | |
895 | " bucket=" << entry.bucket_name << dendl; | |
896 | } | |
31f18b77 FG |
897 | return ret; |
898 | } | |
899 | ||
900 | return 0; | |
901 | } | |
902 | ||
903 | int RGWReshard::remove(cls_rgw_reshard_entry& entry) | |
904 | { | |
905 | string logshard_oid; | |
906 | ||
907 | get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid); | |
908 | ||
909 | librados::ObjectWriteOperation op; | |
910 | cls_rgw_reshard_remove(op, entry); | |
911 | ||
9f95a23c | 912 | int ret = rgw_rados_operate(store->getRados()->reshard_pool_ctx, logshard_oid, &op, null_yield); |
31f18b77 FG |
913 | if (ret < 0) { |
914 | lderr(store->ctx()) << "ERROR: failed to remove entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl; | |
915 | return ret; | |
916 | } | |
917 | ||
918 | return ret; | |
919 | } | |
920 | ||
921 | int RGWReshard::clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry) | |
922 | { | |
9f95a23c | 923 | int ret = cls_rgw_clear_bucket_resharding(store->getRados()->reshard_pool_ctx, bucket_instance_oid); |
31f18b77 FG |
924 | if (ret < 0) { |
925 | lderr(store->ctx()) << "ERROR: failed to clear bucket resharding, bucket_instance_oid=" << bucket_instance_oid << dendl; | |
926 | return ret; | |
927 | } | |
928 | ||
929 | return 0; | |
930 | } | |
931 | ||
// Block the caller for up to `duration` while a bucket is being
// resharded, so the request can be retried afterwards.
//
// Returns 0 after the wait elapses, or -ECANCELED if stop() was called
// before or during the wait. In coroutine context (optional_yield set,
// HAVE_BOOST_CONTEXT builds) the wait is an asio timer that suspends
// only this coroutine; otherwise the calling thread sleeps on the
// condition variable.
int RGWReshardWait::wait(optional_yield y)
{
  std::unique_lock lock(mutex);

  if (going_down) {
    return -ECANCELED;
  }

#ifdef HAVE_BOOST_CONTEXT
  if (y) {
    auto& context = y.get_io_context();
    auto& yield = y.get_yield_context();

    // register the waiter while still holding the mutex so stop() can
    // find it and cancel its timer
    Waiter waiter(context);
    waiters.push_back(waiter);
    lock.unlock();

    waiter.timer.expires_after(duration);

    boost::system::error_code ec;
    waiter.timer.async_wait(yield[ec]);

    // reacquire before unlinking; `waiter` lives on this stack frame, so
    // it must be removed from the intrusive list before returning
    lock.lock();
    waiters.erase(waiters.iterator_to(waiter));
    // ec is operation_aborted when stop() cancelled the timer; 0 on a
    // normal timeout. Negated to follow the -errno convention.
    return -ec.value();
  }
#endif

  // synchronous path: sleep on the condvar; stop() wakes us early via
  // notify_all()
  cond.wait_for(lock, duration);

  if (going_down) {
    return -ECANCELED;
  }

  return 0;
}
968 | ||
11fdf7f2 | 969 | void RGWReshardWait::stop() |
31f18b77 | 970 | { |
11fdf7f2 TL |
971 | std::scoped_lock lock(mutex); |
972 | going_down = true; | |
973 | cond.notify_all(); | |
974 | for (auto& waiter : waiters) { | |
975 | // unblock any waiters with ECANCELED | |
976 | waiter.timer.cancel(); | |
31f18b77 | 977 | } |
31f18b77 FG |
978 | } |
979 | ||
// Process every queued entry in one reshard log shard: for each entry
// that has not yet been claimed (new_instance_id empty), reshard the
// bucket and then remove the entry from the queue. Holds an exclusive
// lock on the shard object for the duration, renewing it between
// entries. Returns 0 when the shard has been drained, or a negative
// errno on the first unrecoverable failure (which leaves unprocessed
// entries for a later pass).
int RGWReshard::process_single_logshard(int logshard_num)
{
  string marker;
  bool truncated = true;

  CephContext *cct = store->ctx();
  constexpr uint32_t max_entries = 1000;

  string logshard_oid;
  get_logshard_oid(logshard_num, &logshard_oid);

  RGWBucketReshardLock logshard_lock(store, logshard_oid, false);

  int ret = logshard_lock.lock();
  if (ret < 0) { 
    // another worker holds this shard; it will be retried on a later pass
    ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " <<
      logshard_oid << ", ret = " << ret <<dendl;
    return ret;
  }

  do {
    std::list<cls_rgw_reshard_entry> entries;
    ret = list(logshard_num, marker, max_entries, entries, &truncated);
    if (ret < 0) {
      // NOTE(review): `continue` re-tests `truncated`, which is still
      // true and `marker` unchanged, so a persistently failing list()
      // retries in a tight loop here -- confirm intended
      ldout(cct, 10) << "cannot list all reshards in logshard oid=" <<
	logshard_oid << dendl;
      continue;
    }

    for(auto& entry: entries) { // logshard entries
      // an empty new_instance_id means no one has started this reshard yet
      if(entry.new_instance_id.empty()) {

	ldout(store->ctx(), 20) << __func__ << " resharding " <<
	  entry.bucket_name << dendl;

	rgw_bucket bucket;
	RGWBucketInfo bucket_info;
	map<string, bufferlist> attrs;

	ret = store->getRados()->get_bucket_info(store->svc(),
						 entry.tenant, entry.bucket_name,
						 bucket_info, nullptr,
						 null_yield, &attrs);
	// a lookup failure or a bucket_id mismatch (bucket already
	// resharded or deleted) both mean this entry is stale
	if (ret < 0 || bucket_info.bucket.bucket_id != entry.bucket_id) {
	  if (ret < 0) {
	    ldout(cct, 0) << __func__ <<
	      ": Error in get_bucket_info for bucket " << entry.bucket_name <<
	      ": " << cpp_strerror(-ret) << dendl;
	    if (ret != -ENOENT) {
	      // any error other than ENOENT will abort
	      return ret;
	    }
	  } else {
	    ldout(cct,0) << __func__ <<
	      ": Bucket: " << entry.bucket_name <<
	      " already resharded by someone, skipping " << dendl;
	  }

	  // we've encountered a reshard queue entry for an apparently
	  // non-existent bucket; let's try to recover by cleaning up
	  ldout(cct, 0) << __func__ <<
	    ": removing reshard queue entry for a resharded or non-existent bucket" <<
	    entry.bucket_name << dendl;

	  ret = remove(entry);
	  if (ret < 0) {
	    ldout(cct, 0) << __func__ <<
	      ": Error removing non-existent bucket " <<
	      entry.bucket_name << " from resharding queue: " <<
	      cpp_strerror(-ret) << dendl;
	    return ret;
	  }

	  // we cleaned up, move on to the next entry
	  goto finished_entry;
	}

	// run the actual reshard to the requested shard count
	RGWBucketReshard br(store, bucket_info, attrs, nullptr);
	ret = br.execute(entry.new_num_shards, max_entries, false, nullptr,
			 nullptr, this);
	if (ret < 0) {
	  ldout(store->ctx(), 0) << __func__ <<
	    ": Error during resharding bucket " << entry.bucket_name << ":" <<
	    cpp_strerror(-ret)<< dendl;
	  return ret;
	}

	ldout(store->ctx(), 20) << __func__ <<
	  " removing reshard queue entry for bucket " << entry.bucket_name <<
	  dendl;

	ret = remove(entry);
	if (ret < 0) {
	  ldout(cct, 0) << __func__ << ": Error removing bucket " <<
	    entry.bucket_name << " from resharding queue: " <<
	    cpp_strerror(-ret) << dendl;
	  return ret;
	}
      } // if new instance id is empty

    finished_entry:

      // renew the shard lock between entries so a long queue does not
      // let the lock lapse mid-processing
      Clock::time_point now = Clock::now();
      if (logshard_lock.should_renew(now)) {
	ret = logshard_lock.renew(now);
	if (ret < 0) {
	  return ret;
	}
      }

      // advance the listing marker past this entry
      entry.get_key(&marker);
    } // entry for loop
  } while (truncated);

  logshard_lock.unlock();
  return 0;
}
1097 | ||
1098 | ||
1099 | void RGWReshard::get_logshard_oid(int shard_num, string *logshard) | |
1100 | { | |
1101 | char buf[32]; | |
1102 | snprintf(buf, sizeof(buf), "%010u", (unsigned)shard_num); | |
1103 | ||
1104 | string objname(reshard_oid_prefix); | |
1105 | *logshard = objname + buf; | |
1106 | } | |
1107 | ||
1108 | int RGWReshard::process_all_logshards() | |
1109 | { | |
9f95a23c | 1110 | if (!store->svc()->zone->can_reshard()) { |
3efd9988 FG |
1111 | ldout(store->ctx(), 20) << __func__ << " Resharding is disabled" << dendl; |
1112 | return 0; | |
1113 | } | |
31f18b77 FG |
1114 | int ret = 0; |
1115 | ||
1116 | for (int i = 0; i < num_logshards; i++) { | |
1117 | string logshard; | |
1118 | get_logshard_oid(i, &logshard); | |
1119 | ||
81eedcae | 1120 | ldout(store->ctx(), 20) << "processing logshard = " << logshard << dendl; |
31f18b77 FG |
1121 | |
1122 | ret = process_single_logshard(i); | |
9f95a23c TL |
1123 | |
1124 | ldout(store->ctx(), 20) << "finish processing logshard = " << logshard << " , ret = " << ret << dendl; | |
31f18b77 FG |
1125 | } |
1126 | ||
1127 | return 0; | |
1128 | } | |
1129 | ||
// Returns true once stop_processor() has set the shutdown flag; polled
// by the worker thread to exit its loop.
bool RGWReshard::going_down()
{
  return down_flag;
}
1134 | ||
// Spawn the background resharding worker thread ("rgw_reshard").
// NOTE(review): assumes at most one call before stop_processor(); a
// second call would overwrite and leak the previous worker -- confirm
// against callers.
void RGWReshard::start_processor()
{
  worker = new ReshardWorker(store->ctx(), this);
  worker->create("rgw_reshard");
}
1140 | ||
1141 | void RGWReshard::stop_processor() | |
1142 | { | |
1143 | down_flag = true; | |
1144 | if (worker) { | |
1145 | worker->stop(); | |
1146 | worker->join(); | |
1147 | } | |
1148 | delete worker; | |
224ce89b | 1149 | worker = nullptr; |
31f18b77 FG |
1150 | } |
1151 | ||
1152 | void *RGWReshard::ReshardWorker::entry() { | |
31f18b77 FG |
1153 | do { |
1154 | utime_t start = ceph_clock_now(); | |
9f95a23c | 1155 | reshard->process_all_logshards(); |
31f18b77 FG |
1156 | |
1157 | if (reshard->going_down()) | |
1158 | break; | |
1159 | ||
1160 | utime_t end = ceph_clock_now(); | |
1161 | end -= start; | |
11fdf7f2 | 1162 | int secs = cct->_conf.get_val<uint64_t>("rgw_reshard_thread_interval"); |
31f18b77 FG |
1163 | |
1164 | if (secs <= end.sec()) | |
1165 | continue; // next round | |
1166 | ||
1167 | secs -= end.sec(); | |
1168 | ||
9f95a23c TL |
1169 | std::unique_lock locker{lock}; |
1170 | cond.wait_for(locker, std::chrono::seconds(secs)); | |
31f18b77 FG |
1171 | } while (!reshard->going_down()); |
1172 | ||
1173 | return NULL; | |
1174 | } | |
1175 | ||
// Wake the worker out of its inter-round sleep so it re-checks
// going_down() and exits promptly.
void RGWReshard::ReshardWorker::stop()
{
  std::lock_guard l{lock};
  cond.notify_all();
}