]>
Commit | Line | Data |
---|---|---|
31f18b77 FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
f64942e4 | 4 | #include <limits> |
11fdf7f2 | 5 | #include <sstream> |
f64942e4 | 6 | |
31f18b77 | 7 | #include "rgw_rados.h" |
11fdf7f2 | 8 | #include "rgw_zone.h" |
31f18b77 FG |
9 | #include "rgw_bucket.h" |
10 | #include "rgw_reshard.h" | |
11 | #include "cls/rgw/cls_rgw_client.h" | |
12 | #include "cls/lock/cls_lock_client.h" | |
13 | #include "common/errno.h" | |
14 | #include "common/ceph_json.h" | |
15 | ||
16 | #include "common/dout.h" | |
17 | ||
11fdf7f2 TL |
18 | #include "services/svc_zone.h" |
19 | #include "services/svc_sys_obj.h" | |
20 | ||
31f18b77 FG |
21 | #define dout_context g_ceph_context |
22 | #define dout_subsys ceph_subsys_rgw | |
23 | ||
24 | const string reshard_oid_prefix = "reshard."; | |
25 | const string reshard_lock_name = "reshard_process"; | |
26 | const string bucket_instance_lock_name = "bucket_instance_lock"; | |
27 | ||
f64942e4 | 28 | |
31f18b77 FG |
// Accumulates bucket-index entries destined for one target shard and
// writes them out in batches via asynchronous rados operations.  The
// aio completion queue is shared with the writers for the other shards
// (it is owned by BucketReshardManager), so the in-flight operation
// limit is enforced across the whole reshard, not per shard.
class BucketReshardShard {
  RGWRados *store;
  const RGWBucketInfo& bucket_info;
  int num_shard;                       // target shard id, or -1 when the target index is unsharded
  RGWRados::BucketShard bs;
  vector<rgw_cls_bi_entry> entries;    // current batch, not yet flushed
  map<RGWObjCategory, rgw_bucket_category_stats> stats;  // per-category stats accumulated for the batch
  deque<librados::AioCompletion *>& aio_completions;     // shared queue of in-flight aios
  uint64_t max_aio_completions;        // cap on in-flight aios (rgw_reshard_max_aio)
  uint64_t reshard_shard_batch_size;   // entries per flush (rgw_reshard_batch_size)

  // Pop the oldest in-flight aio, block until it completes, and return
  // its result (0 on success, negative errno on failure).
  int wait_next_completion() {
    librados::AioCompletion *c = aio_completions.front();
    aio_completions.pop_front();

    c->wait_for_safe();

    int ret = c->get_return_value();
    c->release();

    if (ret < 0) {
      derr << "ERROR: reshard rados operation failed: " << cpp_strerror(-ret) << dendl;
      return ret;
    }

    return 0;
  }

  // Allocate a fresh aio completion and queue it; if the in-flight cap
  // has been reached, first drain (and check) the oldest one.
  int get_completion(librados::AioCompletion **c) {
    if (aio_completions.size() >= max_aio_completions) {
      int ret = wait_next_completion();
      if (ret < 0) {
        return ret;
      }
    }

    *c = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
    aio_completions.push_back(*c);

    return 0;
  }

public:
  BucketReshardShard(RGWRados *_store, const RGWBucketInfo& _bucket_info,
                     int _num_shard,
                     deque<librados::AioCompletion *>& _completions) :
    store(_store), bucket_info(_bucket_info), bs(store),
    aio_completions(_completions)
  {
    // an unsharded index is addressed with shard id -1
    num_shard = (bucket_info.num_shards > 0 ? _num_shard : -1);
    bs.init(bucket_info.bucket, num_shard, nullptr /* no RGWBucketInfo */);

    max_aio_completions =
      store->ctx()->_conf.get_val<uint64_t>("rgw_reshard_max_aio");
    reshard_shard_batch_size =
      store->ctx()->_conf.get_val<uint64_t>("rgw_reshard_batch_size");
  }

  int get_num_shard() {
    return num_shard;
  }

  // Queue one bucket-index entry for this shard.  When account is true
  // the entry's stats are folded into the per-category totals so the
  // target shard's header is updated at flush time.  Triggers a flush
  // once the batch size is reached.
  int add_entry(rgw_cls_bi_entry& entry, bool account, RGWObjCategory category,
                const rgw_bucket_category_stats& entry_stats) {
    entries.push_back(entry);
    if (account) {
      rgw_bucket_category_stats& target = stats[category];
      target.num_entries += entry_stats.num_entries;
      target.total_size += entry_stats.total_size;
      target.total_size_rounded += entry_stats.total_size_rounded;
      target.actual_size += entry_stats.actual_size;
    }
    if (entries.size() >= reshard_shard_batch_size) {
      int ret = flush();
      if (ret < 0) {
        return ret;
      }
    }

    return 0;
  }

  // Write the accumulated batch (all queued entries plus the stats
  // delta) to the target shard object as a single asynchronous write
  // op, then reset the batch.
  int flush() {
    if (entries.size() == 0) {
      return 0;
    }

    librados::ObjectWriteOperation op;
    for (auto& entry : entries) {
      store->bi_put(op, bs, entry);
    }
    cls_rgw_bucket_update_stats(op, false, stats);

    librados::AioCompletion *c;
    int ret = get_completion(&c);
    if (ret < 0) {
      return ret;
    }
    ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &op);
    if (ret < 0) {
      derr << "ERROR: failed to store entries in target bucket shard (bs=" << bs.bucket << "/" << bs.shard_id << ") error=" << cpp_strerror(-ret) << dendl;
      return ret;
    }
    entries.clear();
    stats.clear();
    return 0;
  }

  // Drain every outstanding aio.  NOTE: the completion queue is shared,
  // so this waits on operations issued by all shard writers, not just
  // this one.  Returns the last error seen (0 if none).
  int wait_all_aio() {
    int ret = 0;
    while (!aio_completions.empty()) {
      int r = wait_next_completion();
      if (r < 0) {
        ret = r;
      }
    }
    return ret;
  }
}; // class BucketReshardShard
148 | ||
31f18b77 FG |
149 | |
150 | class BucketReshardManager { | |
151 | RGWRados *store; | |
152 | const RGWBucketInfo& target_bucket_info; | |
153 | deque<librados::AioCompletion *> completions; | |
154 | int num_target_shards; | |
155 | vector<BucketReshardShard *> target_shards; | |
156 | ||
157 | public: | |
f64942e4 AA |
158 | BucketReshardManager(RGWRados *_store, |
159 | const RGWBucketInfo& _target_bucket_info, | |
160 | int _num_target_shards) : | |
161 | store(_store), target_bucket_info(_target_bucket_info), | |
162 | num_target_shards(_num_target_shards) | |
163 | { | |
31f18b77 FG |
164 | target_shards.resize(num_target_shards); |
165 | for (int i = 0; i < num_target_shards; ++i) { | |
166 | target_shards[i] = new BucketReshardShard(store, target_bucket_info, i, completions); | |
167 | } | |
168 | } | |
169 | ||
170 | ~BucketReshardManager() { | |
171 | for (auto& shard : target_shards) { | |
172 | int ret = shard->wait_all_aio(); | |
173 | if (ret < 0) { | |
f64942e4 AA |
174 | ldout(store->ctx(), 20) << __func__ << |
175 | ": shard->wait_all_aio() returned ret=" << ret << dendl; | |
31f18b77 FG |
176 | } |
177 | } | |
178 | } | |
179 | ||
180 | int add_entry(int shard_index, | |
11fdf7f2 | 181 | rgw_cls_bi_entry& entry, bool account, RGWObjCategory category, |
31f18b77 | 182 | const rgw_bucket_category_stats& entry_stats) { |
f64942e4 AA |
183 | int ret = target_shards[shard_index]->add_entry(entry, account, category, |
184 | entry_stats); | |
31f18b77 | 185 | if (ret < 0) { |
f64942e4 AA |
186 | derr << "ERROR: target_shards.add_entry(" << entry.idx << |
187 | ") returned error: " << cpp_strerror(-ret) << dendl; | |
31f18b77 FG |
188 | return ret; |
189 | } | |
f64942e4 | 190 | |
31f18b77 FG |
191 | return 0; |
192 | } | |
193 | ||
194 | int finish() { | |
195 | int ret = 0; | |
196 | for (auto& shard : target_shards) { | |
197 | int r = shard->flush(); | |
198 | if (r < 0) { | |
199 | derr << "ERROR: target_shards[" << shard->get_num_shard() << "].flush() returned error: " << cpp_strerror(-r) << dendl; | |
200 | ret = r; | |
201 | } | |
202 | } | |
203 | for (auto& shard : target_shards) { | |
204 | int r = shard->wait_all_aio(); | |
205 | if (r < 0) { | |
206 | derr << "ERROR: target_shards[" << shard->get_num_shard() << "].wait_all_aio() returned error: " << cpp_strerror(-r) << dendl; | |
207 | ret = r; | |
208 | } | |
209 | delete shard; | |
210 | } | |
211 | target_shards.clear(); | |
212 | return ret; | |
213 | } | |
f64942e4 AA |
214 | }; // class BucketReshardManager |
215 | ||
// Construct a resharder for one bucket.  The inner reshard_lock's
// third ctor argument is `true` — presumably the "ephemeral" flag of
// an RGWBucketReshardLock overload declared in the header (the visible
// string-oid ctor's third parameter is _ephemeral); TODO confirm.
// outer_reshard_lock optionally names a caller-held lock that must be
// renewed alongside the inner one during long reshard loops.
RGWBucketReshard::RGWBucketReshard(RGWRados *_store,
				   const RGWBucketInfo& _bucket_info,
				   const map<string, bufferlist>& _bucket_attrs,
				   RGWBucketReshardLock* _outer_reshard_lock) :
  store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs),
  reshard_lock(store, bucket_info, true),
  outer_reshard_lock(_outer_reshard_lock)
{ }
224 | ||
225 | int RGWBucketReshard::set_resharding_status(RGWRados* store, | |
226 | const RGWBucketInfo& bucket_info, | |
227 | const string& new_instance_id, | |
228 | int32_t num_shards, | |
229 | cls_rgw_reshard_status status) | |
31f18b77 FG |
230 | { |
231 | if (new_instance_id.empty()) { | |
232 | ldout(store->ctx(), 0) << __func__ << " missing new bucket instance id" << dendl; | |
233 | return -EINVAL; | |
234 | } | |
235 | ||
236 | cls_rgw_bucket_instance_entry instance_entry; | |
237 | instance_entry.set_status(new_instance_id, num_shards, status); | |
238 | ||
239 | int ret = store->bucket_set_reshard(bucket_info, instance_entry); | |
240 | if (ret < 0) { | |
241 | ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: error setting bucket resharding flag on bucket index: " | |
242 | << cpp_strerror(-ret) << dendl; | |
243 | return ret; | |
244 | } | |
245 | return 0; | |
246 | } | |
247 | ||
f64942e4 AA |
// reshard lock assumes lock is held
// Clear all resharding state for the bucket in two steps: first reset
// the per-index-shard reshard status, then reset the bucket-level
// resharding flag (via a default-constructed, i.e. empty, instance
// entry — presumably interpreted by bucket_set_reshard as "not
// resharding"; TODO confirm against cls_rgw).
int RGWBucketReshard::clear_resharding(RGWRados* store,
				       const RGWBucketInfo& bucket_info)
{
  int ret = clear_index_shard_reshard_status(store, bucket_info);
  if (ret < 0) {
    ldout(store->ctx(), 0) << "RGWBucketReshard::" << __func__ <<
      " ERROR: error clearing reshard status from index shard " <<
      cpp_strerror(-ret) << dendl;
    return ret;
  }

  // empty entry clears the bucket-level flag
  cls_rgw_bucket_instance_entry instance_entry;
  ret = store->bucket_set_reshard(bucket_info, instance_entry);
  if (ret < 0) {
    ldout(store->ctx(), 0) << "RGWReshard::" << __func__ <<
      " ERROR: error setting bucket resharding flag on bucket index: " <<
      cpp_strerror(-ret) << dendl;
    return ret;
  }

  return 0;
}
271 | ||
272 | int RGWBucketReshard::clear_index_shard_reshard_status(RGWRados* store, | |
273 | const RGWBucketInfo& bucket_info) | |
274 | { | |
275 | uint32_t num_shards = bucket_info.num_shards; | |
276 | ||
277 | if (num_shards < std::numeric_limits<uint32_t>::max()) { | |
278 | int ret = set_resharding_status(store, bucket_info, | |
279 | bucket_info.bucket.bucket_id, | |
280 | (num_shards < 1 ? 1 : num_shards), | |
494da23a | 281 | CLS_RGW_RESHARD_NOT_RESHARDING); |
f64942e4 AA |
282 | if (ret < 0) { |
283 | ldout(store->ctx(), 0) << "RGWBucketReshard::" << __func__ << | |
284 | " ERROR: error clearing reshard status from index shard " << | |
285 | cpp_strerror(-ret) << dendl; | |
286 | return ret; | |
287 | } | |
288 | } | |
289 | ||
31f18b77 FG |
290 | return 0; |
291 | } | |
292 | ||
// Build and persist a new bucket-instance record to serve as the
// reshard target: same bucket identity, fresh bucket_id, the requested
// number of index shards, and freshly initialized index objects.
static int create_new_bucket_instance(RGWRados *store,
				      int new_num_shards,
				      const RGWBucketInfo& bucket_info,
				      map<string, bufferlist>& attrs,
				      RGWBucketInfo& new_bucket_info)
{
  // start from a copy of the source bucket's info
  new_bucket_info = bucket_info;

  // fresh instance id; drop fields that belong to the old instance
  store->create_bucket_id(&new_bucket_info.bucket.bucket_id);
  new_bucket_info.bucket.oid.clear();

  new_bucket_info.num_shards = new_num_shards;
  new_bucket_info.objv_tracker.clear();

  // the new instance starts with no resharding state of its own
  new_bucket_info.new_bucket_instance_id.clear();
  new_bucket_info.reshard_status = 0;

  int ret = store->init_bucket_index(new_bucket_info, new_bucket_info.num_shards);
  if (ret < 0) {
    cerr << "ERROR: failed to init new bucket indexes: " << cpp_strerror(-ret) << std::endl;
    return ret;
  }

  ret = store->put_bucket_instance_info(new_bucket_info, true, real_time(), &attrs);
  if (ret < 0) {
    cerr << "ERROR: failed to store new bucket instance info: " << cpp_strerror(-ret) << std::endl;
    return ret;
  }

  return 0;
}
324 | ||
// Thin member wrapper over the file-local create_new_bucket_instance(),
// supplying this resharder's store, source bucket info and attrs.
int RGWBucketReshard::create_new_bucket_instance(int new_num_shards,
						 RGWBucketInfo& new_bucket_info)
{
  return ::create_new_bucket_instance(store, new_num_shards,
				      bucket_info, bucket_attrs, new_bucket_info);
}
331 | ||
94b18763 FG |
332 | int RGWBucketReshard::cancel() |
333 | { | |
f64942e4 | 334 | int ret = reshard_lock.lock(); |
94b18763 FG |
335 | if (ret < 0) { |
336 | return ret; | |
337 | } | |
338 | ||
339 | ret = clear_resharding(); | |
340 | ||
f64942e4 AA |
341 | reshard_lock.unlock(); |
342 | return ret; | |
94b18763 FG |
343 | } |
344 | ||
31f18b77 FG |
// RAII-style helper that tracks the reshard state recorded in the
// source bucket's instance info.  start() marks the bucket
// IN_PROGRESS, complete() marks it DONE; if the object is destroyed
// while still in progress (i.e. the reshard failed partway), the
// destructor rolls the state back to NOT_RESHARDING as best-effort
// cleanup.
class BucketInfoReshardUpdate
{
  RGWRados *store;
  RGWBucketInfo& bucket_info;
  std::map<string, bufferlist> bucket_attrs;

  // true between a successful start() and a successful complete()
  bool in_progress{false};

  // Persist the given reshard status into the bucket instance info.
  int set_status(cls_rgw_reshard_status s) {
    bucket_info.reshard_status = s;
    int ret = store->put_bucket_instance_info(bucket_info, false, real_time(), &bucket_attrs);
    if (ret < 0) {
      ldout(store->ctx(), 0) << "ERROR: failed to write bucket info, ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }

public:
  BucketInfoReshardUpdate(RGWRados *_store,
                          RGWBucketInfo& _bucket_info,
                          map<string, bufferlist>& _bucket_attrs,
                          const string& new_bucket_id) :
    store(_store),
    bucket_info(_bucket_info),
    bucket_attrs(_bucket_attrs)
  {
    // record the planned target instance on the source bucket info
    bucket_info.new_bucket_instance_id = new_bucket_id;
  }

  ~BucketInfoReshardUpdate() {
    if (in_progress) {
      // resharding must not have ended correctly, clean up
      int ret =
	RGWBucketReshard::clear_index_shard_reshard_status(store, bucket_info);
      if (ret < 0) {
	lderr(store->ctx()) << "Error: " << __func__ <<
	  " clear_index_shard_status returned " << ret << dendl;
      }
      bucket_info.new_bucket_instance_id.clear();
      // best-effort; return value intentionally ignored in a destructor
      set_status(CLS_RGW_RESHARD_NOT_RESHARDING); // clears new_bucket_instance as well
    }
  }

  // Mark the bucket as being resharded; must succeed before copying begins.
  int start() {
    int ret = set_status(CLS_RGW_RESHARD_IN_PROGRESS);
    if (ret < 0) {
      return ret;
    }
    in_progress = true;
    return 0;
  }

  // Mark the reshard finished; disables the destructor's rollback.
  int complete() {
    int ret = set_status(CLS_RGW_RESHARD_DONE);
    if (ret < 0) {
      return ret;
    }
    in_progress = false;
    return 0;
  }
};
407 | ||
f64942e4 AA |
408 | |
409 | RGWBucketReshardLock::RGWBucketReshardLock(RGWRados* _store, | |
410 | const std::string& reshard_lock_oid, | |
411 | bool _ephemeral) : | |
412 | store(_store), | |
413 | lock_oid(reshard_lock_oid), | |
414 | ephemeral(_ephemeral), | |
415 | internal_lock(reshard_lock_name) | |
416 | { | |
11fdf7f2 TL |
417 | const int lock_dur_secs = store->ctx()->_conf.get_val<uint64_t>( |
418 | "rgw_reshard_bucket_lock_duration"); | |
f64942e4 AA |
419 | duration = std::chrono::seconds(lock_dur_secs); |
420 | ||
421 | #define COOKIE_LEN 16 | |
422 | char cookie_buf[COOKIE_LEN + 1]; | |
423 | gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1); | |
424 | cookie_buf[COOKIE_LEN] = '\0'; | |
425 | ||
426 | internal_lock.set_cookie(cookie_buf); | |
427 | internal_lock.set_duration(duration); | |
428 | } | |
429 | ||
// Acquire the lock exclusively (using the ephemeral variant when
// configured — presumably the lock object is removed on release; see
// cls_lock).  On success the renewal timer is reset to "now".
int RGWBucketReshardLock::lock() {
  // a plain acquire, not a renewal of an existing lock
  internal_lock.set_must_renew(false);
  int ret;
  if (ephemeral) {
    ret = internal_lock.lock_exclusive_ephemeral(&store->reshard_pool_ctx,
						 lock_oid);
  } else {
    ret = internal_lock.lock_exclusive(&store->reshard_pool_ctx, lock_oid);
  }
  if (ret < 0) {
    ldout(store->ctx(), 0) << "RGWReshardLock::" << __func__ <<
      " failed to acquire lock on " << lock_oid << " ret=" << ret << dendl;
    return ret;
  }
  reset_time(Clock::now());

  return 0;
}
448 | ||
449 | void RGWBucketReshardLock::unlock() { | |
450 | int ret = internal_lock.unlock(&store->reshard_pool_ctx, lock_oid); | |
451 | if (ret < 0) { | |
452 | ldout(store->ctx(), 0) << "WARNING: RGWBucketReshardLock::" << __func__ << | |
453 | " failed to drop lock on " << lock_oid << " ret=" << ret << dendl; | |
454 | } | |
455 | } | |
456 | ||
// Renew the held lock before it expires.  Failure means the lock
// expired or was taken by another process; the caller must abort its
// work.  On success the renewal timer is reset to the supplied `now`.
int RGWBucketReshardLock::renew(const Clock::time_point& now) {
  // must_renew makes the acquire succeed only if we already hold the lock
  internal_lock.set_must_renew(true);
  int ret;
  if (ephemeral) {
    ret = internal_lock.lock_exclusive_ephemeral(&store->reshard_pool_ctx,
						 lock_oid);
  } else {
    ret = internal_lock.lock_exclusive(&store->reshard_pool_ctx, lock_oid);
  }
  if (ret < 0) { /* expired or already locked by another processor */
    std::stringstream error_s;
    if (-ENOENT == ret) {
      error_s << "ENOENT (lock expired or never initially locked)";
    } else {
      error_s << ret << " (" << cpp_strerror(-ret) << ")";
    }
    ldout(store->ctx(), 5) << __func__ << "(): failed to renew lock on " <<
      lock_oid << " with error " << error_s.str() << dendl;
    return ret;
  }
  // restore normal (non-renewal) acquire semantics for future lock() calls
  internal_lock.set_must_renew(false);

  reset_time(now);
  ldout(store->ctx(), 20) << __func__ << "(): successfully renewed lock on " <<
    lock_oid << dendl;

  return 0;
}
485 | ||
486 | ||
// Core of the reshard: copy every bucket-index entry from the source
// bucket's shards into the new bucket instance's shards, redistributing
// by the new shard count, while periodically renewing the reshard
// lock(s).  On success, links the new instance and marks the source
// bucket DONE.  Error paths rely on ~BucketInfoReshardUpdate for
// partial rollback.
//
// num_shards      target shard count (informational; new_bucket_info
//                 already carries the count actually used)
// new_bucket_info the target instance created beforehand
// max_entries     bi_list() batch size; must be non-negative
// verbose/out/formatter  progress reporting; JSON per-entry output only
//                 when all three are usable
int RGWBucketReshard::do_reshard(int num_shards,
				 RGWBucketInfo& new_bucket_info,
				 int max_entries,
				 bool verbose,
				 ostream *out,
				 Formatter *formatter)
{
  rgw_bucket& bucket = bucket_info.bucket;

  int ret = 0;

  if (out) {
    (*out) << "tenant: " << bucket_info.bucket.tenant << std::endl;
    (*out) << "bucket name: " << bucket_info.bucket.name << std::endl;
    (*out) << "old bucket instance id: " << bucket_info.bucket.bucket_id <<
      std::endl;
    (*out) << "new bucket instance id: " << new_bucket_info.bucket.bucket_id <<
      std::endl;
  }

  /* update bucket info -- in progress*/
  list<rgw_cls_bi_entry> entries;

  if (max_entries < 0) {
    ldout(store->ctx(), 0) << __func__ <<
      ": can't reshard, negative max_entries" << dendl;
    return -EINVAL;
  }

  // NB: destructor cleans up sharding state if reshard does not
  // complete successfully
  BucketInfoReshardUpdate bucket_info_updater(store, bucket_info, bucket_attrs, new_bucket_info.bucket.bucket_id);

  ret = bucket_info_updater.start();
  if (ret < 0) {
    ldout(store->ctx(), 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl;
    return ret;
  }

  // an unsharded target is written as a single shard writer
  int num_target_shards = (new_bucket_info.num_shards > 0 ? new_bucket_info.num_shards : 1);

  BucketReshardManager target_shards_mgr(store, new_bucket_info, num_target_shards);

  bool verbose_json_out = verbose && (formatter != nullptr) && (out != nullptr);

  if (verbose_json_out) {
    formatter->open_array_section("entries");
  }

  uint64_t total_entries = 0;

  if (!verbose_json_out && out) {
    (*out) << "total entries:";
  }

  const int num_source_shards =
    (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
  string marker;
  // walk every source shard, paging through its index with bi_list()
  for (int i = 0; i < num_source_shards; ++i) {
    bool is_truncated = true;
    marker.clear();
    while (is_truncated) {
      entries.clear();
      ret = store->bi_list(bucket, i, string(), marker, max_entries, &entries, &is_truncated);
      if (ret < 0 && ret != -ENOENT) {
	derr << "ERROR: bi_list(): " << cpp_strerror(-ret) << dendl;
	return ret;
      }

      for (auto iter = entries.begin(); iter != entries.end(); ++iter) {
	rgw_cls_bi_entry& entry = *iter;
	if (verbose_json_out) {
	  formatter->open_object_section("entry");

	  encode_json("shard_id", i, formatter);
	  encode_json("num_entry", total_entries, formatter);
	  encode_json("entry", entry, formatter);
	}
	total_entries++;

	marker = entry.idx;

	int target_shard_id;
	cls_rgw_obj_key cls_key;
	RGWObjCategory category;
	rgw_bucket_category_stats stats;
	bool account = entry.get_info(&cls_key, &category, &stats);
	rgw_obj_key key(cls_key);
	rgw_obj obj(new_bucket_info.bucket, key);
	RGWMPObj mp;
	if (key.ns == RGW_OBJ_NS_MULTIPART && mp.from_meta(key.name)) {
	  // place the multipart .meta object on the same shard as its head object
	  obj.index_hash_source = mp.get_key();
	}
	// shadows the outer `ret` deliberately for this entry's calls
	int ret = store->get_target_shard_id(new_bucket_info, obj.get_hash_object(), &target_shard_id);
	if (ret < 0) {
	  lderr(store->ctx()) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl;
	  return ret;
	}

	int shard_index = (target_shard_id > 0 ? target_shard_id : 0);

	ret = target_shards_mgr.add_entry(shard_index, entry, account,
					  category, stats);
	if (ret < 0) {
	  return ret;
	}

	// keep the reshard lock(s) alive during long copies
	Clock::time_point now = Clock::now();
	if (reshard_lock.should_renew(now)) {
	  // assume outer locks have timespans at least the size of ours, so
	  // can call inside conditional
	  if (outer_reshard_lock) {
	    ret = outer_reshard_lock->renew(now);
	    if (ret < 0) {
	      return ret;
	    }
	  }
	  ret = reshard_lock.renew(now);
	  if (ret < 0) {
	    lderr(store->ctx()) << "Error renewing bucket lock: " << ret << dendl;
	    return ret;
	  }
	}

	if (verbose_json_out) {
	  formatter->close_section();
	  formatter->flush(*out);
	} else if (out && !(total_entries % 1000)) {
	  // plain-text progress: print a running count every 1000 entries
	  (*out) << " " << total_entries;
	}
      } // entries loop
    }
  }

  if (verbose_json_out) {
    formatter->close_section();
    formatter->flush(*out);
  } else if (out) {
    (*out) << " " << total_entries << std::endl;
  }

  // flush remaining batches and wait for all writes to land
  ret = target_shards_mgr.finish();
  if (ret < 0) {
    lderr(store->ctx()) << "ERROR: failed to reshard" << dendl;
    return -EIO;
  }

  // point the bucket entrypoint at the new instance
  ret = rgw_link_bucket(store, new_bucket_info.owner, new_bucket_info.bucket, bucket_info.creation_time);
  if (ret < 0) {
    lderr(store->ctx()) << "failed to link new bucket instance (bucket_id=" << new_bucket_info.bucket.bucket_id << ": " << cpp_strerror(-ret) << ")" << dendl;
    return ret;
  }

  ret = bucket_info_updater.complete();
  if (ret < 0) {
    ldout(store->ctx(), 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl;
    /* don't error out, reshard process succeeded */
  }

  return 0;
  // NB: some error clean-up is done by ~BucketInfoReshardUpdate
} // RGWBucketReshard::do_reshard
31f18b77 FG |
650 | |
651 | int RGWBucketReshard::get_status(list<cls_rgw_bucket_instance_entry> *status) | |
652 | { | |
653 | librados::IoCtx index_ctx; | |
654 | map<int, string> bucket_objs; | |
655 | ||
656 | int r = store->open_bucket_index(bucket_info, index_ctx, bucket_objs); | |
657 | if (r < 0) { | |
658 | return r; | |
659 | } | |
660 | ||
661 | for (auto i : bucket_objs) { | |
662 | cls_rgw_bucket_instance_entry entry; | |
663 | ||
664 | int ret = cls_rgw_get_bucket_resharding(index_ctx, i.second, &entry); | |
665 | if (ret < 0 && ret != -ENOENT) { | |
666 | lderr(store->ctx()) << "ERROR: " << __func__ << ": cls_rgw_get_bucket_resharding() returned ret=" << ret << dendl; | |
667 | return ret; | |
668 | } | |
669 | ||
670 | status->push_back(entry); | |
671 | } | |
672 | ||
673 | return 0; | |
674 | } | |
675 | ||
31f18b77 | 676 | |
f64942e4 AA |
677 | int RGWBucketReshard::execute(int num_shards, int max_op_entries, |
678 | bool verbose, ostream *out, Formatter *formatter, | |
679 | RGWReshard* reshard_log) | |
31f18b77 | 680 | { |
f64942e4 AA |
681 | Clock::time_point now; |
682 | ||
683 | int ret = reshard_lock.lock(); | |
31f18b77 FG |
684 | if (ret < 0) { |
685 | return ret; | |
686 | } | |
687 | ||
688 | RGWBucketInfo new_bucket_info; | |
689 | ret = create_new_bucket_instance(num_shards, new_bucket_info); | |
690 | if (ret < 0) { | |
f64942e4 AA |
691 | // shard state is uncertain, but this will attempt to remove them anyway |
692 | goto error_out; | |
31f18b77 FG |
693 | } |
694 | ||
695 | if (reshard_log) { | |
696 | ret = reshard_log->update(bucket_info, new_bucket_info); | |
697 | if (ret < 0) { | |
f64942e4 | 698 | goto error_out; |
31f18b77 FG |
699 | } |
700 | } | |
701 | ||
f64942e4 AA |
702 | // set resharding status of current bucket_info & shards with |
703 | // information about planned resharding | |
704 | ret = set_resharding_status(new_bucket_info.bucket.bucket_id, | |
705 | num_shards, CLS_RGW_RESHARD_IN_PROGRESS); | |
31f18b77 | 706 | if (ret < 0) { |
f64942e4 | 707 | reshard_lock.unlock(); |
31f18b77 FG |
708 | return ret; |
709 | } | |
710 | ||
711 | ret = do_reshard(num_shards, | |
712 | new_bucket_info, | |
713 | max_op_entries, | |
714 | verbose, out, formatter); | |
31f18b77 | 715 | if (ret < 0) { |
f64942e4 | 716 | goto error_out; |
31f18b77 FG |
717 | } |
718 | ||
f64942e4 AA |
719 | // at this point we've done the main work; we'll make a best-effort |
720 | // to clean-up but will not indicate any errors encountered | |
721 | ||
722 | reshard_lock.unlock(); | |
723 | ||
724 | // resharding successful, so remove old bucket index shards; use | |
725 | // best effort and don't report out an error; the lock isn't needed | |
726 | // at this point since all we're using a best effor to to remove old | |
727 | // shard objects | |
728 | ret = store->clean_bucket_index(bucket_info, bucket_info.num_shards); | |
31f18b77 | 729 | if (ret < 0) { |
f64942e4 AA |
730 | lderr(store->ctx()) << "Error: " << __func__ << |
731 | " failed to clean up old shards; " << | |
732 | "RGWRados::clean_bucket_index returned " << ret << dendl; | |
31f18b77 FG |
733 | } |
734 | ||
f64942e4 AA |
735 | ret = rgw_bucket_instance_remove_entry(store, |
736 | bucket_info.bucket.get_key(), | |
737 | nullptr); | |
738 | if (ret < 0) { | |
739 | lderr(store->ctx()) << "Error: " << __func__ << | |
740 | " failed to clean old bucket info object \"" << | |
741 | bucket_info.bucket.get_key() << | |
742 | "\"created after successful resharding with error " << ret << dendl; | |
743 | } | |
31f18b77 | 744 | |
a8e16298 TL |
745 | ldout(store->ctx(), 1) << __func__ << |
746 | " INFO: reshard of bucket \"" << bucket_info.bucket.name << "\" from \"" << | |
747 | bucket_info.bucket.get_key() << "\" to \"" << | |
748 | new_bucket_info.bucket.get_key() << "\" completed successfully" << dendl; | |
749 | ||
31f18b77 | 750 | return 0; |
f64942e4 AA |
751 | |
752 | error_out: | |
753 | ||
754 | reshard_lock.unlock(); | |
755 | ||
756 | // since the real problem is the issue that led to this error code | |
757 | // path, we won't touch ret and instead use another variable to | |
758 | // temporarily error codes | |
759 | int ret2 = store->clean_bucket_index(new_bucket_info, | |
760 | new_bucket_info.num_shards); | |
761 | if (ret2 < 0) { | |
762 | lderr(store->ctx()) << "Error: " << __func__ << | |
763 | " failed to clean up shards from failed incomplete resharding; " << | |
764 | "RGWRados::clean_bucket_index returned " << ret2 << dendl; | |
765 | } | |
766 | ||
767 | ret2 = rgw_bucket_instance_remove_entry(store, | |
768 | new_bucket_info.bucket.get_key(), | |
769 | nullptr); | |
770 | if (ret2 < 0) { | |
771 | lderr(store->ctx()) << "Error: " << __func__ << | |
772 | " failed to clean bucket info object \"" << | |
773 | new_bucket_info.bucket.get_key() << | |
774 | "\"created during incomplete resharding with error " << ret2 << dendl; | |
775 | } | |
776 | ||
777 | return ret; | |
778 | } // execute | |
31f18b77 FG |
779 | |
780 | ||
// Construct the reshard-log manager.  The log is split across
// num_logshards rados objects (rgw_reshard_num_logs) so concurrent
// writers spread their load.
RGWReshard::RGWReshard(RGWRados* _store, bool _verbose, ostream *_out,
		       Formatter *_formatter) :
  store(_store), instance_lock(bucket_instance_lock_name),
  verbose(_verbose), out(_out), formatter(_formatter)
{
  num_logshards = store->ctx()->_conf.get_val<uint64_t>("rgw_reshard_num_logs");
}
788 | ||
f64942e4 AA |
789 | string RGWReshard::get_logshard_key(const string& tenant, |
790 | const string& bucket_name) | |
31f18b77 FG |
791 | { |
792 | return tenant + ":" + bucket_name; | |
793 | } | |
794 | ||
795 | #define MAX_RESHARD_LOGSHARDS_PRIME 7877 | |
796 | ||
// Map a bucket (tenant + name) onto the oid of the reshard log shard
// responsible for it, via a stable hash of the combined key.
void RGWReshard::get_bucket_logshard_oid(const string& tenant, const string& bucket_name, string *oid)
{
  string key = get_logshard_key(tenant, bucket_name);

  uint32_t sid = ceph_str_hash_linux(key.c_str(), key.size());
  // fold the low byte into the top byte to perturb the hash before the
  // double modulo below
  uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
  sid = sid2 % MAX_RESHARD_LOGSHARDS_PRIME % num_logshards;

  get_logshard_oid(int(sid), oid);
}
807 | ||
// Queue a bucket for resharding by recording an entry in the reshard
// log shard that owns it.  A silent no-op (returns success) when the
// zone does not support resharding.
int RGWReshard::add(cls_rgw_reshard_entry& entry)
{
  if (!store->svc.zone->can_reshard()) {
    ldout(store->ctx(), 20) << __func__ << " Resharding is disabled" << dendl;
    return 0;
  }

  string logshard_oid;

  get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);

  librados::ObjectWriteOperation op;
  cls_rgw_reshard_add(op, entry);

  int ret = store->reshard_pool_ctx.operate(logshard_oid, &op);
  if (ret < 0) {
    lderr(store->ctx()) << "ERROR: failed to add entry to reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
    return ret;
  }
  return 0;
}
829 | ||
// Record the chosen target instance ("name:bucket_id") on the bucket's
// existing reshard-log entry.  Fails if the bucket has no entry yet.
int RGWReshard::update(const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info)
{
  cls_rgw_reshard_entry entry;
  entry.bucket_name = bucket_info.bucket.name;
  entry.bucket_id = bucket_info.bucket.bucket_id;
  entry.tenant = bucket_info.owner.tenant;

  // fetch the bucket's current log entry (fills in remaining fields)
  int ret = get(entry);
  if (ret < 0) {
    return ret;
  }

  entry.new_instance_id = new_bucket_info.bucket.name + ":"  + new_bucket_info.bucket.bucket_id;

  // re-add with the target recorded — presumably cls_rgw_reshard_add
  // overwrites the existing entry in place; TODO confirm
  ret = add(entry);
  if (ret < 0) {
    ldout(store->ctx(), 0) << __func__ << ":Error in updating entry bucket " << entry.bucket_name << ": " <<
      cpp_strerror(-ret) << dendl;
  }

  return ret;
}
852 | ||
853 | ||
854 | int RGWReshard::list(int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated) | |
855 | { | |
856 | string logshard_oid; | |
857 | ||
858 | get_logshard_oid(logshard_num, &logshard_oid); | |
859 | ||
860 | int ret = cls_rgw_reshard_list(store->reshard_pool_ctx, logshard_oid, marker, max, entries, is_truncated); | |
c07f9fc5 FG |
861 | |
862 | if (ret < 0) { | |
863 | if (ret == -ENOENT) { | |
864 | *is_truncated = false; | |
865 | ret = 0; | |
866 | } | |
31f18b77 | 867 | lderr(store->ctx()) << "ERROR: failed to list reshard log entries, oid=" << logshard_oid << dendl; |
c07f9fc5 | 868 | if (ret == -EACCES) { |
11fdf7f2 | 869 | lderr(store->ctx()) << "access denied to pool " << store->svc.zone->get_zone_params().reshard_pool |
c07f9fc5 FG |
870 | << ". Fix the pool access permissions of your client" << dendl; |
871 | } | |
31f18b77 | 872 | } |
c07f9fc5 FG |
873 | |
874 | return ret; | |
31f18b77 FG |
875 | } |
876 | ||
877 | int RGWReshard::get(cls_rgw_reshard_entry& entry) | |
878 | { | |
879 | string logshard_oid; | |
880 | ||
881 | get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid); | |
882 | ||
883 | int ret = cls_rgw_reshard_get(store->reshard_pool_ctx, logshard_oid, entry); | |
884 | if (ret < 0) { | |
94b18763 FG |
885 | if (ret != -ENOENT) { |
886 | lderr(store->ctx()) << "ERROR: failed to get entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << | |
887 | " bucket=" << entry.bucket_name << dendl; | |
888 | } | |
31f18b77 FG |
889 | return ret; |
890 | } | |
891 | ||
892 | return 0; | |
893 | } | |
894 | ||
895 | int RGWReshard::remove(cls_rgw_reshard_entry& entry) | |
896 | { | |
897 | string logshard_oid; | |
898 | ||
899 | get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid); | |
900 | ||
901 | librados::ObjectWriteOperation op; | |
902 | cls_rgw_reshard_remove(op, entry); | |
903 | ||
904 | int ret = store->reshard_pool_ctx.operate(logshard_oid, &op); | |
905 | if (ret < 0) { | |
906 | lderr(store->ctx()) << "ERROR: failed to remove entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl; | |
907 | return ret; | |
908 | } | |
909 | ||
910 | return ret; | |
911 | } | |
912 | ||
913 | int RGWReshard::clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry) | |
914 | { | |
915 | int ret = cls_rgw_clear_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid); | |
916 | if (ret < 0) { | |
917 | lderr(store->ctx()) << "ERROR: failed to clear bucket resharding, bucket_instance_oid=" << bucket_instance_oid << dendl; | |
918 | return ret; | |
919 | } | |
920 | ||
921 | return 0; | |
922 | } | |
923 | ||
11fdf7f2 | 924 | int RGWReshardWait::wait(optional_yield y) |
31f18b77 | 925 | { |
11fdf7f2 TL |
926 | std::unique_lock lock(mutex); |
927 | ||
928 | if (going_down) { | |
929 | return -ECANCELED; | |
930 | } | |
931 | ||
932 | #ifdef HAVE_BOOST_CONTEXT | |
933 | if (y) { | |
934 | auto& context = y.get_io_context(); | |
935 | auto& yield = y.get_yield_context(); | |
936 | ||
937 | Waiter waiter(context); | |
938 | waiters.push_back(waiter); | |
939 | lock.unlock(); | |
940 | ||
941 | waiter.timer.expires_after(duration); | |
942 | ||
943 | boost::system::error_code ec; | |
944 | waiter.timer.async_wait(yield[ec]); | |
945 | ||
946 | lock.lock(); | |
947 | waiters.erase(waiters.iterator_to(waiter)); | |
948 | return -ec.value(); | |
949 | } | |
950 | #endif | |
31f18b77 | 951 | |
11fdf7f2 | 952 | cond.wait_for(lock, duration); |
31f18b77 FG |
953 | |
954 | if (going_down) { | |
955 | return -ECANCELED; | |
956 | } | |
957 | ||
958 | return 0; | |
959 | } | |
960 | ||
11fdf7f2 | 961 | void RGWReshardWait::stop() |
31f18b77 | 962 | { |
11fdf7f2 TL |
963 | std::scoped_lock lock(mutex); |
964 | going_down = true; | |
965 | cond.notify_all(); | |
966 | for (auto& waiter : waiters) { | |
967 | // unblock any waiters with ECANCELED | |
968 | waiter.timer.cancel(); | |
31f18b77 | 969 | } |
31f18b77 FG |
970 | } |
971 | ||
972 | int RGWReshard::process_single_logshard(int logshard_num) | |
973 | { | |
974 | string marker; | |
975 | bool truncated = true; | |
976 | ||
977 | CephContext *cct = store->ctx(); | |
f64942e4 | 978 | constexpr uint32_t max_entries = 1000; |
31f18b77 FG |
979 | |
980 | string logshard_oid; | |
981 | get_logshard_oid(logshard_num, &logshard_oid); | |
982 | ||
f64942e4 AA |
983 | RGWBucketReshardLock logshard_lock(store, logshard_oid, false); |
984 | ||
985 | int ret = logshard_lock.lock(); | |
92f5a8d4 | 986 | if (ret < 0) { |
f64942e4 | 987 | ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << |
92f5a8d4 | 988 | logshard_oid << ", ret = " << ret <<dendl; |
31f18b77 FG |
989 | return ret; |
990 | } | |
92f5a8d4 | 991 | |
31f18b77 FG |
992 | do { |
993 | std::list<cls_rgw_reshard_entry> entries; | |
994 | ret = list(logshard_num, marker, max_entries, entries, &truncated); | |
995 | if (ret < 0) { | |
f64942e4 AA |
996 | ldout(cct, 10) << "cannot list all reshards in logshard oid=" << |
997 | logshard_oid << dendl; | |
31f18b77 FG |
998 | continue; |
999 | } | |
1000 | ||
f64942e4 | 1001 | for(auto& entry: entries) { // logshard entries |
31f18b77 FG |
1002 | if(entry.new_instance_id.empty()) { |
1003 | ||
f64942e4 AA |
1004 | ldout(store->ctx(), 20) << __func__ << " resharding " << |
1005 | entry.bucket_name << dendl; | |
31f18b77 | 1006 | |
11fdf7f2 | 1007 | auto obj_ctx = store->svc.sysobj->init_obj_ctx(); |
31f18b77 FG |
1008 | rgw_bucket bucket; |
1009 | RGWBucketInfo bucket_info; | |
1010 | map<string, bufferlist> attrs; | |
1011 | ||
f64942e4 AA |
1012 | ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name, |
1013 | bucket_info, nullptr, &attrs); | |
31f18b77 | 1014 | if (ret < 0) { |
92f5a8d4 TL |
1015 | ldout(cct, 0) << __func__ << |
1016 | ": Error in get_bucket_info for bucket " << entry.bucket_name << | |
1017 | ": " << cpp_strerror(-ret) << dendl; | |
1018 | if (ret != -ENOENT) { | |
1019 | // any error other than ENOENT will abort | |
1020 | return ret; | |
1021 | } | |
1022 | ||
1023 | // we've encountered a reshard queue entry for an apparently | |
1024 | // non-existent bucket; let's try to recover by cleaning up | |
1025 | ldout(cct, 0) << __func__ << | |
1026 | ": removing reshard queue entry for non-existent bucket " << | |
1027 | entry.bucket_name << dendl; | |
1028 | ||
1029 | ret = remove(entry); | |
1030 | if (ret < 0) { | |
1031 | ldout(cct, 0) << __func__ << | |
1032 | ": Error removing non-existent bucket " << | |
1033 | entry.bucket_name << " from resharding queue: " << | |
1034 | cpp_strerror(-ret) << dendl; | |
1035 | return ret; | |
1036 | } | |
1037 | ||
1038 | // we cleaned up, move on to the next entry | |
1039 | goto finished_entry; | |
31f18b77 FG |
1040 | } |
1041 | ||
f64942e4 | 1042 | RGWBucketReshard br(store, bucket_info, attrs, nullptr); |
92f5a8d4 TL |
1043 | ret = br.execute(entry.new_num_shards, max_entries, false, nullptr, |
1044 | nullptr, this); | |
31f18b77 | 1045 | if (ret < 0) { |
92f5a8d4 TL |
1046 | ldout(store->ctx(), 0) << __func__ << |
1047 | ": Error during resharding bucket " << entry.bucket_name << ":" << | |
31f18b77 FG |
1048 | cpp_strerror(-ret)<< dendl; |
1049 | return ret; | |
1050 | } | |
1051 | ||
92f5a8d4 TL |
1052 | ldout(store->ctx(), 20) << __func__ << |
1053 | " removing reshard queue entry for bucket " << entry.bucket_name << | |
f64942e4 | 1054 | dendl; |
31f18b77 FG |
1055 | |
1056 | ret = remove(entry); | |
1057 | if (ret < 0) { | |
92f5a8d4 TL |
1058 | ldout(cct, 0) << __func__ << ": Error removing bucket " << |
1059 | entry.bucket_name << " from resharding queue: " << | |
f64942e4 | 1060 | cpp_strerror(-ret) << dendl; |
31f18b77 FG |
1061 | return ret; |
1062 | } | |
92f5a8d4 TL |
1063 | } // if new instance id is empty |
1064 | ||
1065 | finished_entry: | |
f64942e4 AA |
1066 | |
1067 | Clock::time_point now = Clock::now(); | |
1068 | if (logshard_lock.should_renew(now)) { | |
1069 | ret = logshard_lock.renew(now); | |
1070 | if (ret < 0) { | |
1071 | return ret; | |
1072 | } | |
31f18b77 | 1073 | } |
f64942e4 | 1074 | |
31f18b77 | 1075 | entry.get_key(&marker); |
92f5a8d4 | 1076 | } // entry for loop |
31f18b77 FG |
1077 | } while (truncated); |
1078 | ||
f64942e4 | 1079 | logshard_lock.unlock(); |
31f18b77 FG |
1080 | return 0; |
1081 | } | |
1082 | ||
1083 | ||
1084 | void RGWReshard::get_logshard_oid(int shard_num, string *logshard) | |
1085 | { | |
1086 | char buf[32]; | |
1087 | snprintf(buf, sizeof(buf), "%010u", (unsigned)shard_num); | |
1088 | ||
1089 | string objname(reshard_oid_prefix); | |
1090 | *logshard = objname + buf; | |
1091 | } | |
1092 | ||
1093 | int RGWReshard::process_all_logshards() | |
1094 | { | |
11fdf7f2 | 1095 | if (!store->svc.zone->can_reshard()) { |
3efd9988 FG |
1096 | ldout(store->ctx(), 20) << __func__ << " Resharding is disabled" << dendl; |
1097 | return 0; | |
1098 | } | |
31f18b77 FG |
1099 | int ret = 0; |
1100 | ||
1101 | for (int i = 0; i < num_logshards; i++) { | |
1102 | string logshard; | |
1103 | get_logshard_oid(i, &logshard); | |
1104 | ||
81eedcae | 1105 | ldout(store->ctx(), 20) << "processing logshard = " << logshard << dendl; |
31f18b77 FG |
1106 | |
1107 | ret = process_single_logshard(i); | |
1108 | if (ret <0) { | |
1109 | return ret; | |
1110 | } | |
1111 | } | |
1112 | ||
1113 | return 0; | |
1114 | } | |
1115 | ||
// True once stop_processor() has requested shutdown; polled by the
// worker thread between shards.
bool RGWReshard::going_down()
{
  return down_flag;
}
1120 | ||
// Spawn the background worker thread that periodically processes the
// reshard queue. NOTE(review): assumes it is called at most once per
// start/stop cycle — a prior worker pointer would be leaked.
void RGWReshard::start_processor()
{
  worker = new ReshardWorker(store->ctx(), this);
  worker->create("rgw_reshard");
}
1126 | ||
1127 | void RGWReshard::stop_processor() | |
1128 | { | |
1129 | down_flag = true; | |
1130 | if (worker) { | |
1131 | worker->stop(); | |
1132 | worker->join(); | |
1133 | } | |
1134 | delete worker; | |
224ce89b | 1135 | worker = nullptr; |
31f18b77 FG |
1136 | } |
1137 | ||
1138 | void *RGWReshard::ReshardWorker::entry() { | |
1139 | utime_t last_run; | |
1140 | do { | |
1141 | utime_t start = ceph_clock_now(); | |
31f18b77 FG |
1142 | if (reshard->process_all_logshards()) { |
1143 | /* All shards have been processed properly. Next time we can start | |
1144 | * from this moment. */ | |
1145 | last_run = start; | |
1146 | } | |
31f18b77 FG |
1147 | |
1148 | if (reshard->going_down()) | |
1149 | break; | |
1150 | ||
1151 | utime_t end = ceph_clock_now(); | |
1152 | end -= start; | |
11fdf7f2 | 1153 | int secs = cct->_conf.get_val<uint64_t>("rgw_reshard_thread_interval"); |
31f18b77 FG |
1154 | |
1155 | if (secs <= end.sec()) | |
1156 | continue; // next round | |
1157 | ||
1158 | secs -= end.sec(); | |
1159 | ||
1160 | lock.Lock(); | |
1161 | cond.WaitInterval(lock, utime_t(secs, 0)); | |
1162 | lock.Unlock(); | |
1163 | } while (!reshard->going_down()); | |
1164 | ||
1165 | return NULL; | |
1166 | } | |
1167 | ||
// Wake the worker out of its interval sleep so it can observe
// going_down and exit.
void RGWReshard::ReshardWorker::stop()
{
  Mutex::Locker l(lock);
  cond.Signal();
}