]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_reshard.cc
import ceph 12.2.12
[ceph.git] / ceph / src / rgw / rgw_reshard.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <limits>
5
6 #include "rgw_rados.h"
7 #include "rgw_bucket.h"
8 #include "rgw_reshard.h"
9 #include "cls/rgw/cls_rgw_client.h"
10 #include "cls/lock/cls_lock_client.h"
11 #include "common/errno.h"
12 #include "common/ceph_json.h"
13
14 #include "common/dout.h"
15
16 #define dout_context g_ceph_context
17 #define dout_subsys ceph_subsys_rgw
18
// Prefix of the reshard log shard object names built by get_logshard_oid().
const string reshard_oid_prefix = "reshard.";
// Name given to the internal lock object of RGWBucketReshardLock.
const string reshard_lock_name = "reshard_process";
// Name given to RGWReshard::instance_lock.
const string bucket_instance_lock_name = "bucket_instance_lock";

using namespace std;

// Number of buffered entries at which a BucketReshardShard flushes itself.
#define RESHARD_SHARD_WINDOW 64
// Number of queued AIO completions at which a new submission first waits
// for the oldest one.
#define RESHARD_MAX_AIO 128
27
28
29 class BucketReshardShard {
30 RGWRados *store;
31 const RGWBucketInfo& bucket_info;
32 int num_shard;
33 RGWRados::BucketShard bs;
34 vector<rgw_cls_bi_entry> entries;
35 map<uint8_t, rgw_bucket_category_stats> stats;
36 deque<librados::AioCompletion *>& aio_completions;
37
38 int wait_next_completion() {
39 librados::AioCompletion *c = aio_completions.front();
40 aio_completions.pop_front();
41
42 c->wait_for_safe();
43
44 int ret = c->get_return_value();
45 c->release();
46
47 if (ret < 0) {
48 derr << "ERROR: reshard rados operation failed: " << cpp_strerror(-ret) << dendl;
49 return ret;
50 }
51
52 return 0;
53 }
54
55 int get_completion(librados::AioCompletion **c) {
56 if (aio_completions.size() >= RESHARD_MAX_AIO) {
57 int ret = wait_next_completion();
58 if (ret < 0) {
59 return ret;
60 }
61 }
62
63 *c = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
64 aio_completions.push_back(*c);
65
66 return 0;
67 }
68
69 public:
70 BucketReshardShard(RGWRados *_store, const RGWBucketInfo& _bucket_info,
71 int _num_shard,
72 deque<librados::AioCompletion *>& _completions) :
73 store(_store), bucket_info(_bucket_info), bs(store),
74 aio_completions(_completions)
75 {
76 num_shard = (bucket_info.num_shards > 0 ? _num_shard : -1);
77 bs.init(bucket_info.bucket, num_shard, nullptr /* no RGWBucketInfo */);
78 }
79
80 int get_num_shard() {
81 return num_shard;
82 }
83
84 int add_entry(rgw_cls_bi_entry& entry, bool account, uint8_t category,
85 const rgw_bucket_category_stats& entry_stats) {
86 entries.push_back(entry);
87 if (account) {
88 rgw_bucket_category_stats& target = stats[category];
89 target.num_entries += entry_stats.num_entries;
90 target.total_size += entry_stats.total_size;
91 target.total_size_rounded += entry_stats.total_size_rounded;
92 target.actual_size += entry_stats.actual_size;
93 }
94 if (entries.size() >= RESHARD_SHARD_WINDOW) {
95 int ret = flush();
96 if (ret < 0) {
97 return ret;
98 }
99 }
100
101 return 0;
102 }
103
104 int flush() {
105 if (entries.size() == 0) {
106 return 0;
107 }
108
109 librados::ObjectWriteOperation op;
110 for (auto& entry : entries) {
111 store->bi_put(op, bs, entry);
112 }
113 cls_rgw_bucket_update_stats(op, false, stats);
114
115 librados::AioCompletion *c;
116 int ret = get_completion(&c);
117 if (ret < 0) {
118 return ret;
119 }
120 ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &op);
121 if (ret < 0) {
122 derr << "ERROR: failed to store entries in target bucket shard (bs=" << bs.bucket << "/" << bs.shard_id << ") error=" << cpp_strerror(-ret) << dendl;
123 return ret;
124 }
125 entries.clear();
126 stats.clear();
127 return 0;
128 }
129
130 int wait_all_aio() {
131 int ret = 0;
132 while (!aio_completions.empty()) {
133 int r = wait_next_completion();
134 if (r < 0) {
135 ret = r;
136 }
137 }
138 return ret;
139 }
140 }; // class BucketReshardShard
141
142
143 class BucketReshardManager {
144 RGWRados *store;
145 const RGWBucketInfo& target_bucket_info;
146 deque<librados::AioCompletion *> completions;
147 int num_target_shards;
148 vector<BucketReshardShard *> target_shards;
149
150 public:
151 BucketReshardManager(RGWRados *_store,
152 const RGWBucketInfo& _target_bucket_info,
153 int _num_target_shards) :
154 store(_store), target_bucket_info(_target_bucket_info),
155 num_target_shards(_num_target_shards)
156 {
157 target_shards.resize(num_target_shards);
158 for (int i = 0; i < num_target_shards; ++i) {
159 target_shards[i] = new BucketReshardShard(store, target_bucket_info, i, completions);
160 }
161 }
162
163 ~BucketReshardManager() {
164 for (auto& shard : target_shards) {
165 int ret = shard->wait_all_aio();
166 if (ret < 0) {
167 ldout(store->ctx(), 20) << __func__ <<
168 ": shard->wait_all_aio() returned ret=" << ret << dendl;
169 }
170 }
171 }
172
173 int add_entry(int shard_index,
174 rgw_cls_bi_entry& entry, bool account, uint8_t category,
175 const rgw_bucket_category_stats& entry_stats) {
176 int ret = target_shards[shard_index]->add_entry(entry, account, category,
177 entry_stats);
178 if (ret < 0) {
179 derr << "ERROR: target_shards.add_entry(" << entry.idx <<
180 ") returned error: " << cpp_strerror(-ret) << dendl;
181 return ret;
182 }
183
184 return 0;
185 }
186
187 int finish() {
188 int ret = 0;
189 for (auto& shard : target_shards) {
190 int r = shard->flush();
191 if (r < 0) {
192 derr << "ERROR: target_shards[" << shard->get_num_shard() << "].flush() returned error: " << cpp_strerror(-r) << dendl;
193 ret = r;
194 }
195 }
196 for (auto& shard : target_shards) {
197 int r = shard->wait_all_aio();
198 if (r < 0) {
199 derr << "ERROR: target_shards[" << shard->get_num_shard() << "].wait_all_aio() returned error: " << cpp_strerror(-r) << dendl;
200 ret = r;
201 }
202 delete shard;
203 }
204 target_shards.clear();
205 return ret;
206 }
207 }; // class BucketReshardManager
208
209 RGWBucketReshard::RGWBucketReshard(RGWRados *_store,
210 const RGWBucketInfo& _bucket_info,
211 const map<string, bufferlist>& _bucket_attrs,
212 RGWBucketReshardLock* _outer_reshard_lock) :
213 store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs),
214 reshard_lock(store, bucket_info, true),
215 outer_reshard_lock(_outer_reshard_lock)
216 { }
217
218 int RGWBucketReshard::set_resharding_status(RGWRados* store,
219 const RGWBucketInfo& bucket_info,
220 const string& new_instance_id,
221 int32_t num_shards,
222 cls_rgw_reshard_status status)
223 {
224 if (new_instance_id.empty()) {
225 ldout(store->ctx(), 0) << __func__ << " missing new bucket instance id" << dendl;
226 return -EINVAL;
227 }
228
229 cls_rgw_bucket_instance_entry instance_entry;
230 instance_entry.set_status(new_instance_id, num_shards, status);
231
232 int ret = store->bucket_set_reshard(bucket_info, instance_entry);
233 if (ret < 0) {
234 ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: error setting bucket resharding flag on bucket index: "
235 << cpp_strerror(-ret) << dendl;
236 return ret;
237 }
238 return 0;
239 }
240
241 // reshard lock assumes lock is held
242 int RGWBucketReshard::clear_resharding(RGWRados* store,
243 const RGWBucketInfo& bucket_info)
244 {
245 int ret = clear_index_shard_reshard_status(store, bucket_info);
246 if (ret < 0) {
247 ldout(store->ctx(), 0) << "RGWBucketReshard::" << __func__ <<
248 " ERROR: error clearing reshard status from index shard " <<
249 cpp_strerror(-ret) << dendl;
250 return ret;
251 }
252
253 cls_rgw_bucket_instance_entry instance_entry;
254 ret = store->bucket_set_reshard(bucket_info, instance_entry);
255 if (ret < 0) {
256 ldout(store->ctx(), 0) << "RGWReshard::" << __func__ <<
257 " ERROR: error setting bucket resharding flag on bucket index: " <<
258 cpp_strerror(-ret) << dendl;
259 return ret;
260 }
261
262 return 0;
263 }
264
265 int RGWBucketReshard::clear_index_shard_reshard_status(RGWRados* store,
266 const RGWBucketInfo& bucket_info)
267 {
268 uint32_t num_shards = bucket_info.num_shards;
269
270 if (num_shards < std::numeric_limits<uint32_t>::max()) {
271 int ret = set_resharding_status(store, bucket_info,
272 bucket_info.bucket.bucket_id,
273 (num_shards < 1 ? 1 : num_shards),
274 CLS_RGW_RESHARD_NONE);
275 if (ret < 0) {
276 ldout(store->ctx(), 0) << "RGWBucketReshard::" << __func__ <<
277 " ERROR: error clearing reshard status from index shard " <<
278 cpp_strerror(-ret) << dendl;
279 return ret;
280 }
281 }
282
283 return 0;
284 }
285
286 static int create_new_bucket_instance(RGWRados *store,
287 int new_num_shards,
288 const RGWBucketInfo& bucket_info,
289 map<string, bufferlist>& attrs,
290 RGWBucketInfo& new_bucket_info)
291 {
292 new_bucket_info = bucket_info;
293
294 store->create_bucket_id(&new_bucket_info.bucket.bucket_id);
295 new_bucket_info.bucket.oid.clear();
296
297 new_bucket_info.num_shards = new_num_shards;
298 new_bucket_info.objv_tracker.clear();
299
300 new_bucket_info.new_bucket_instance_id.clear();
301 new_bucket_info.reshard_status = 0;
302
303 int ret = store->init_bucket_index(new_bucket_info, new_bucket_info.num_shards);
304 if (ret < 0) {
305 cerr << "ERROR: failed to init new bucket indexes: " << cpp_strerror(-ret) << std::endl;
306 return ret;
307 }
308
309 ret = store->put_bucket_instance_info(new_bucket_info, true, real_time(), &attrs);
310 if (ret < 0) {
311 cerr << "ERROR: failed to store new bucket instance info: " << cpp_strerror(-ret) << std::endl;
312 return ret;
313 }
314
315 return 0;
316 }
317
318 int RGWBucketReshard::create_new_bucket_instance(int new_num_shards,
319 RGWBucketInfo& new_bucket_info)
320 {
321 return ::create_new_bucket_instance(store, new_num_shards,
322 bucket_info, bucket_attrs, new_bucket_info);
323 }
324
325 int RGWBucketReshard::cancel()
326 {
327 int ret = reshard_lock.lock();
328 if (ret < 0) {
329 return ret;
330 }
331
332 ret = clear_resharding();
333
334 reshard_lock.unlock();
335 return ret;
336 }
337
338 class BucketInfoReshardUpdate
339 {
340 RGWRados *store;
341 RGWBucketInfo bucket_info;
342 std::map<string, bufferlist> bucket_attrs;
343
344 bool in_progress{false};
345
346 int set_status(cls_rgw_reshard_status s) {
347 bucket_info.reshard_status = s;
348 int ret = store->put_bucket_instance_info(bucket_info, false, real_time(), &bucket_attrs);
349 if (ret < 0) {
350 ldout(store->ctx(), 0) << "ERROR: failed to write bucket info, ret=" << ret << dendl;
351 return ret;
352 }
353 return 0;
354 }
355
356 public:
357 BucketInfoReshardUpdate(RGWRados *_store,
358 RGWBucketInfo& _bucket_info,
359 map<string, bufferlist>& _bucket_attrs,
360 const string& new_bucket_id) :
361 store(_store),
362 bucket_info(_bucket_info),
363 bucket_attrs(_bucket_attrs)
364 {
365 bucket_info.new_bucket_instance_id = new_bucket_id;
366 }
367
368 ~BucketInfoReshardUpdate() {
369 if (in_progress) {
370 // resharding must not have ended correctly, clean up
371 int ret =
372 RGWBucketReshard::clear_index_shard_reshard_status(store, bucket_info);
373 if (ret < 0) {
374 lderr(store->ctx()) << "Error: " << __func__ <<
375 " clear_index_shard_status returned " << ret << dendl;
376 }
377 bucket_info.new_bucket_instance_id.clear();
378 set_status(CLS_RGW_RESHARD_NONE); // clears new_bucket_instance as well
379 }
380 }
381
382 int start() {
383 int ret = set_status(CLS_RGW_RESHARD_IN_PROGRESS);
384 if (ret < 0) {
385 return ret;
386 }
387 in_progress = true;
388 return 0;
389 }
390
391 int complete() {
392 int ret = set_status(CLS_RGW_RESHARD_DONE);
393 if (ret < 0) {
394 return ret;
395 }
396 in_progress = false;
397 return 0;
398 }
399 };
400
401
402 RGWBucketReshardLock::RGWBucketReshardLock(RGWRados* _store,
403 const std::string& reshard_lock_oid,
404 bool _ephemeral) :
405 store(_store),
406 lock_oid(reshard_lock_oid),
407 ephemeral(_ephemeral),
408 internal_lock(reshard_lock_name)
409 {
410 const int lock_dur_secs = store->ctx()->_conf->rgw_reshard_bucket_lock_duration;
411 duration = std::chrono::seconds(lock_dur_secs);
412
413 #define COOKIE_LEN 16
414 char cookie_buf[COOKIE_LEN + 1];
415 gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
416 cookie_buf[COOKIE_LEN] = '\0';
417
418 internal_lock.set_cookie(cookie_buf);
419 internal_lock.set_duration(duration);
420 }
421
422 int RGWBucketReshardLock::lock() {
423 internal_lock.set_must_renew(false);
424 int ret;
425 if (ephemeral) {
426 ret = internal_lock.lock_exclusive_ephemeral(&store->reshard_pool_ctx,
427 lock_oid);
428 } else {
429 ret = internal_lock.lock_exclusive(&store->reshard_pool_ctx, lock_oid);
430 }
431 if (ret < 0) {
432 ldout(store->ctx(), 0) << "RGWReshardLock::" << __func__ <<
433 " failed to acquire lock on " << lock_oid << " ret=" << ret << dendl;
434 return ret;
435 }
436 reset_time(Clock::now());
437
438 return 0;
439 }
440
441 void RGWBucketReshardLock::unlock() {
442 int ret = internal_lock.unlock(&store->reshard_pool_ctx, lock_oid);
443 if (ret < 0) {
444 ldout(store->ctx(), 0) << "WARNING: RGWBucketReshardLock::" << __func__ <<
445 " failed to drop lock on " << lock_oid << " ret=" << ret << dendl;
446 }
447 }
448
449 int RGWBucketReshardLock::renew(const Clock::time_point& now) {
450 internal_lock.set_must_renew(true);
451 int ret;
452 if (ephemeral) {
453 ret = internal_lock.lock_exclusive_ephemeral(&store->reshard_pool_ctx,
454 lock_oid);
455 } else {
456 ret = internal_lock.lock_exclusive(&store->reshard_pool_ctx, lock_oid);
457 }
458 if (ret < 0) { /* expired or already locked by another processor */
459 ldout(store->ctx(), 5) << __func__ << "(): failed to renew lock on " <<
460 lock_oid << " with " << cpp_strerror(-ret) << dendl;
461 return ret;
462 }
463 internal_lock.set_must_renew(false);
464
465 reset_time(now);
466 ldout(store->ctx(), 20) << __func__ << "(): successfully renewed lock on " <<
467 lock_oid << dendl;
468
469 return 0;
470 }
471
472
473 int RGWBucketReshard::do_reshard(int num_shards,
474 RGWBucketInfo& new_bucket_info,
475 int max_entries,
476 bool verbose,
477 ostream *out,
478 Formatter *formatter)
479 {
480 rgw_bucket& bucket = bucket_info.bucket;
481
482 int ret = 0;
483
484 if (out) {
485 (*out) << "tenant: " << bucket_info.bucket.tenant << std::endl;
486 (*out) << "bucket name: " << bucket_info.bucket.name << std::endl;
487 (*out) << "old bucket instance id: " << bucket_info.bucket.bucket_id <<
488 std::endl;
489 (*out) << "new bucket instance id: " << new_bucket_info.bucket.bucket_id <<
490 std::endl;
491 }
492
493 /* update bucket info -- in progress*/
494 list<rgw_cls_bi_entry> entries;
495
496 if (max_entries < 0) {
497 ldout(store->ctx(), 0) << __func__ <<
498 ": can't reshard, negative max_entries" << dendl;
499 return -EINVAL;
500 }
501
502 // NB: destructor cleans up sharding state if reshard does not
503 // complete successfully
504 BucketInfoReshardUpdate bucket_info_updater(store, bucket_info, bucket_attrs, new_bucket_info.bucket.bucket_id);
505
506 ret = bucket_info_updater.start();
507 if (ret < 0) {
508 ldout(store->ctx(), 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl;
509 return ret;
510 }
511
512 int num_target_shards = (new_bucket_info.num_shards > 0 ? new_bucket_info.num_shards : 1);
513
514 BucketReshardManager target_shards_mgr(store, new_bucket_info, num_target_shards);
515
516 verbose = verbose && (formatter != nullptr);
517
518 if (verbose) {
519 formatter->open_array_section("entries");
520 }
521
522 uint64_t total_entries = 0;
523
524 if (!verbose) {
525 cout << "total entries:";
526 }
527
528 const int num_source_shards =
529 (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
530 string marker;
531 for (int i = 0; i < num_source_shards; ++i) {
532 bool is_truncated = true;
533 marker.clear();
534 while (is_truncated) {
535 entries.clear();
536 ret = store->bi_list(bucket, i, string(), marker, max_entries, &entries, &is_truncated);
537 if (ret < 0 && ret != -ENOENT) {
538 derr << "ERROR: bi_list(): " << cpp_strerror(-ret) << dendl;
539 return ret;
540 }
541
542 for (auto iter = entries.begin(); iter != entries.end(); ++iter) {
543 rgw_cls_bi_entry& entry = *iter;
544 if (verbose) {
545 formatter->open_object_section("entry");
546
547 encode_json("shard_id", i, formatter);
548 encode_json("num_entry", total_entries, formatter);
549 encode_json("entry", entry, formatter);
550 }
551 total_entries++;
552
553 marker = entry.idx;
554
555 int target_shard_id;
556 cls_rgw_obj_key cls_key;
557 uint8_t category;
558 rgw_bucket_category_stats stats;
559 bool account = entry.get_info(&cls_key, &category, &stats);
560 rgw_obj_key key(cls_key);
561 rgw_obj obj(new_bucket_info.bucket, key);
562 int ret = store->get_target_shard_id(new_bucket_info, obj.get_hash_object(), &target_shard_id);
563 if (ret < 0) {
564 lderr(store->ctx()) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl;
565 return ret;
566 }
567
568 int shard_index = (target_shard_id > 0 ? target_shard_id : 0);
569
570 ret = target_shards_mgr.add_entry(shard_index, entry, account,
571 category, stats);
572 if (ret < 0) {
573 return ret;
574 }
575
576 Clock::time_point now = Clock::now();
577 if (reshard_lock.should_renew(now)) {
578 // assume outer locks have timespans at least the size of ours, so
579 // can call inside conditional
580 if (outer_reshard_lock) {
581 ret = outer_reshard_lock->renew(now);
582 if (ret < 0) {
583 return ret;
584 }
585 }
586 ret = reshard_lock.renew(now);
587 if (ret < 0) {
588 lderr(store->ctx()) << "Error renewing bucket lock: " << ret << dendl;
589 return ret;
590 }
591 }
592
593 if (verbose) {
594 formatter->close_section();
595 if (out) {
596 formatter->flush(*out);
597 }
598 } else if (out && !(total_entries % 1000)) {
599 (*out) << " " << total_entries;
600 }
601 } // entries loop
602 }
603 }
604
605 if (verbose) {
606 formatter->close_section();
607 if (out) {
608 formatter->flush(*out);
609 }
610 } else if (out) {
611 (*out) << " " << total_entries << std::endl;
612 }
613
614 ret = target_shards_mgr.finish();
615 if (ret < 0) {
616 lderr(store->ctx()) << "ERROR: failed to reshard" << dendl;
617 return -EIO;
618 }
619
620 ret = rgw_link_bucket(store, new_bucket_info.owner, new_bucket_info.bucket, bucket_info.creation_time);
621 if (ret < 0) {
622 lderr(store->ctx()) << "failed to link new bucket instance (bucket_id=" << new_bucket_info.bucket.bucket_id << ": " << cpp_strerror(-ret) << ")" << dendl;
623 return ret;
624 }
625
626 ret = bucket_info_updater.complete();
627 if (ret < 0) {
628 ldout(store->ctx(), 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl;
629 /* don't error out, reshard process succeeded */
630 }
631
632 return 0;
633 // NB: some error clean-up is done by ~BucketInfoReshardUpdate
634 } // RGWBucketReshard::do_reshard
635
636 int RGWBucketReshard::get_status(list<cls_rgw_bucket_instance_entry> *status)
637 {
638 librados::IoCtx index_ctx;
639 map<int, string> bucket_objs;
640
641 int r = store->open_bucket_index(bucket_info, index_ctx, bucket_objs);
642 if (r < 0) {
643 return r;
644 }
645
646 for (auto i : bucket_objs) {
647 cls_rgw_bucket_instance_entry entry;
648
649 int ret = cls_rgw_get_bucket_resharding(index_ctx, i.second, &entry);
650 if (ret < 0 && ret != -ENOENT) {
651 lderr(store->ctx()) << "ERROR: " << __func__ << ": cls_rgw_get_bucket_resharding() returned ret=" << ret << dendl;
652 return ret;
653 }
654
655 status->push_back(entry);
656 }
657
658 return 0;
659 }
660
661
662 int RGWBucketReshard::execute(int num_shards, int max_op_entries,
663 bool verbose, ostream *out, Formatter *formatter,
664 RGWReshard* reshard_log)
665 {
666 Clock::time_point now;
667
668 int ret = reshard_lock.lock();
669 if (ret < 0) {
670 return ret;
671 }
672
673 RGWBucketInfo new_bucket_info;
674 ret = create_new_bucket_instance(num_shards, new_bucket_info);
675 if (ret < 0) {
676 // shard state is uncertain, but this will attempt to remove them anyway
677 goto error_out;
678 }
679
680 if (reshard_log) {
681 ret = reshard_log->update(bucket_info, new_bucket_info);
682 if (ret < 0) {
683 goto error_out;
684 }
685 }
686
687 // set resharding status of current bucket_info & shards with
688 // information about planned resharding
689 ret = set_resharding_status(new_bucket_info.bucket.bucket_id,
690 num_shards, CLS_RGW_RESHARD_IN_PROGRESS);
691 if (ret < 0) {
692 reshard_lock.unlock();
693 return ret;
694 }
695
696 ret = do_reshard(num_shards,
697 new_bucket_info,
698 max_op_entries,
699 verbose, out, formatter);
700 if (ret < 0) {
701 goto error_out;
702 }
703
704 // at this point we've done the main work; we'll make a best-effort
705 // to clean-up but will not indicate any errors encountered
706
707 reshard_lock.unlock();
708
709 // resharding successful, so remove old bucket index shards; use
710 // best effort and don't report out an error; the lock isn't needed
711 // at this point since all we're using a best effor to to remove old
712 // shard objects
713 ret = store->clean_bucket_index(bucket_info, bucket_info.num_shards);
714 if (ret < 0) {
715 lderr(store->ctx()) << "Error: " << __func__ <<
716 " failed to clean up old shards; " <<
717 "RGWRados::clean_bucket_index returned " << ret << dendl;
718 }
719
720 ret = rgw_bucket_instance_remove_entry(store,
721 bucket_info.bucket.get_key(),
722 nullptr);
723 if (ret < 0) {
724 lderr(store->ctx()) << "Error: " << __func__ <<
725 " failed to clean old bucket info object \"" <<
726 bucket_info.bucket.get_key() <<
727 "\"created after successful resharding with error " << ret << dendl;
728 }
729
730 ldout(store->ctx(), 1) << __func__ <<
731 " INFO: reshard of bucket \"" << bucket_info.bucket.name << "\" from \"" <<
732 bucket_info.bucket.get_key() << "\" to \"" <<
733 new_bucket_info.bucket.get_key() << "\" completed successfully" << dendl;
734
735 return 0;
736
737 error_out:
738
739 reshard_lock.unlock();
740
741 // since the real problem is the issue that led to this error code
742 // path, we won't touch ret and instead use another variable to
743 // temporarily error codes
744 int ret2 = store->clean_bucket_index(new_bucket_info,
745 new_bucket_info.num_shards);
746 if (ret2 < 0) {
747 lderr(store->ctx()) << "Error: " << __func__ <<
748 " failed to clean up shards from failed incomplete resharding; " <<
749 "RGWRados::clean_bucket_index returned " << ret2 << dendl;
750 }
751
752 ret2 = rgw_bucket_instance_remove_entry(store,
753 new_bucket_info.bucket.get_key(),
754 nullptr);
755 if (ret2 < 0) {
756 lderr(store->ctx()) << "Error: " << __func__ <<
757 " failed to clean bucket info object \"" <<
758 new_bucket_info.bucket.get_key() <<
759 "\"created during incomplete resharding with error " << ret2 << dendl;
760 }
761
762 return ret;
763 } // execute
764
765
766 RGWReshard::RGWReshard(RGWRados* _store, bool _verbose, ostream *_out,
767 Formatter *_formatter) :
768 store(_store), instance_lock(bucket_instance_lock_name),
769 verbose(_verbose), out(_out), formatter(_formatter)
770 {
771 num_logshards = store->ctx()->_conf->rgw_reshard_num_logs;
772 }
773
774 string RGWReshard::get_logshard_key(const string& tenant,
775 const string& bucket_name)
776 {
777 return tenant + ":" + bucket_name;
778 }
779
780 #define MAX_RESHARD_LOGSHARDS_PRIME 7877
781
782 void RGWReshard::get_bucket_logshard_oid(const string& tenant, const string& bucket_name, string *oid)
783 {
784 string key = get_logshard_key(tenant, bucket_name);
785
786 uint32_t sid = ceph_str_hash_linux(key.c_str(), key.size());
787 uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
788 sid = sid2 % MAX_RESHARD_LOGSHARDS_PRIME % num_logshards;
789
790 get_logshard_oid(int(sid), oid);
791 }
792
793 int RGWReshard::add(cls_rgw_reshard_entry& entry)
794 {
795 if (!store->can_reshard()) {
796 ldout(store->ctx(), 20) << __func__ << " Resharding is disabled" << dendl;
797 return 0;
798 }
799
800 string logshard_oid;
801
802 get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
803
804 librados::ObjectWriteOperation op;
805 cls_rgw_reshard_add(op, entry);
806
807 int ret = store->reshard_pool_ctx.operate(logshard_oid, &op);
808 if (ret < 0) {
809 lderr(store->ctx()) << "ERROR: failed to add entry to reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
810 return ret;
811 }
812 return 0;
813 }
814
815 int RGWReshard::update(const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info)
816 {
817 cls_rgw_reshard_entry entry;
818 entry.bucket_name = bucket_info.bucket.name;
819 entry.bucket_id = bucket_info.bucket.bucket_id;
820 entry.tenant = bucket_info.owner.tenant;
821
822 int ret = get(entry);
823 if (ret < 0) {
824 return ret;
825 }
826
827 entry.new_instance_id = new_bucket_info.bucket.name + ":" + new_bucket_info.bucket.bucket_id;
828
829 ret = add(entry);
830 if (ret < 0) {
831 ldout(store->ctx(), 0) << __func__ << ":Error in updating entry bucket " << entry.bucket_name << ": " <<
832 cpp_strerror(-ret) << dendl;
833 }
834
835 return ret;
836 }
837
838
839 int RGWReshard::list(int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated)
840 {
841 string logshard_oid;
842
843 get_logshard_oid(logshard_num, &logshard_oid);
844
845 int ret = cls_rgw_reshard_list(store->reshard_pool_ctx, logshard_oid, marker, max, entries, is_truncated);
846
847 if (ret < 0) {
848 if (ret == -ENOENT) {
849 *is_truncated = false;
850 ret = 0;
851 }
852 lderr(store->ctx()) << "ERROR: failed to list reshard log entries, oid=" << logshard_oid << dendl;
853 if (ret == -EACCES) {
854 lderr(store->ctx()) << "access denied to pool " << store->get_zone_params().reshard_pool
855 << ". Fix the pool access permissions of your client" << dendl;
856 }
857 }
858
859 return ret;
860 }
861
862 int RGWReshard::get(cls_rgw_reshard_entry& entry)
863 {
864 string logshard_oid;
865
866 get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
867
868 int ret = cls_rgw_reshard_get(store->reshard_pool_ctx, logshard_oid, entry);
869 if (ret < 0) {
870 if (ret != -ENOENT) {
871 lderr(store->ctx()) << "ERROR: failed to get entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant <<
872 " bucket=" << entry.bucket_name << dendl;
873 }
874 return ret;
875 }
876
877 return 0;
878 }
879
880 int RGWReshard::remove(cls_rgw_reshard_entry& entry)
881 {
882 string logshard_oid;
883
884 get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
885
886 librados::ObjectWriteOperation op;
887 cls_rgw_reshard_remove(op, entry);
888
889 int ret = store->reshard_pool_ctx.operate(logshard_oid, &op);
890 if (ret < 0) {
891 lderr(store->ctx()) << "ERROR: failed to remove entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
892 return ret;
893 }
894
895 return ret;
896 }
897
898 int RGWReshard::clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry)
899 {
900 int ret = cls_rgw_clear_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid);
901 if (ret < 0) {
902 lderr(store->ctx()) << "ERROR: failed to clear bucket resharding, bucket_instance_oid=" << bucket_instance_oid << dendl;
903 return ret;
904 }
905
906 return 0;
907 }
908
909 const int num_retries = 10;
910 const int default_reshard_sleep_duration = 5;
911
912 int RGWReshardWait::do_wait()
913 {
914 Mutex::Locker l(lock);
915
916 cond.WaitInterval(lock, utime_t(default_reshard_sleep_duration, 0));
917
918 if (going_down) {
919 return -ECANCELED;
920 }
921
922 return 0;
923 }
924
925 int RGWReshardWait::block_while_resharding(RGWRados::BucketShard *bs,
926 string *new_bucket_id,
927 const RGWBucketInfo& bucket_info)
928 {
929 int ret = 0;
930 cls_rgw_bucket_instance_entry entry;
931
932 for (int i=0; i < num_retries; i++) {
933 ret = cls_rgw_get_bucket_resharding(bs->index_ctx, bs->bucket_obj, &entry);
934 if (ret < 0) {
935 ldout(store->ctx(), 0) << __func__ << " ERROR: failed to get bucket resharding :" <<
936 cpp_strerror(-ret)<< dendl;
937 return ret;
938 }
939 if (!entry.resharding_in_progress()) {
940 *new_bucket_id = entry.new_bucket_instance_id;
941 return 0;
942 }
943 ldout(store->ctx(), 20) << "NOTICE: reshard still in progress; " << (i < num_retries - 1 ? "retrying" : "too many retries") << dendl;
944
945 if (i == num_retries - 1) {
946 break;
947 }
948
949 // If bucket is erroneously marked as resharding (e.g., crash or
950 // other error) then fix it. If we can take the bucket reshard
951 // lock then it means no other resharding should be taking place,
952 // and we're free to clear the flags.
953 {
954 // since we expect to do this rarely, we'll do our work in a
955 // block and erase our work after each try
956
957 RGWObjectCtx obj_ctx(bs->store);
958 const rgw_bucket& b = bs->bucket;
959 std::string bucket_id = b.get_key();
960 RGWBucketReshardLock reshard_lock(bs->store, bucket_info, true);
961 ret = reshard_lock.lock();
962 if (ret < 0) {
963 ldout(store->ctx(), 20) << __func__ <<
964 " INFO: failed to take reshard lock for bucket " <<
965 bucket_id << "; expected if resharding underway" << dendl;
966 } else {
967 ldout(store->ctx(), 10) << __func__ <<
968 " INFO: was able to take reshard lock for bucket " <<
969 bucket_id << dendl;
970 ret = RGWBucketReshard::clear_resharding(bs->store, bucket_info);
971 if (ret < 0) {
972 reshard_lock.unlock();
973 ldout(store->ctx(), 0) << __func__ <<
974 " ERROR: failed to clear resharding flags for bucket " <<
975 bucket_id << dendl;
976 } else {
977 reshard_lock.unlock();
978 ldout(store->ctx(), 5) << __func__ <<
979 " INFO: apparently successfully cleared resharding flags for "
980 "bucket " << bucket_id << dendl;
981 continue; // if we apparently succeed immediately test again
982 } // if clear resharding succeeded
983 } // if taking of lock succeeded
984 } // block to encapsulate recovery from incomplete reshard
985
986 ret = do_wait();
987 if (ret < 0) {
988 ldout(store->ctx(), 0) << __func__ << " ERROR: bucket is still resharding, please retry" << dendl;
989 return ret;
990 }
991 }
992 ldout(store->ctx(), 0) << __func__ << " ERROR: bucket is still resharding, please retry" << dendl;
993 return -ERR_BUSY_RESHARDING;
994 }
995
996 int RGWReshard::process_single_logshard(int logshard_num)
997 {
998 string marker;
999 bool truncated = true;
1000
1001 CephContext *cct = store->ctx();
1002 constexpr uint32_t max_entries = 1000;
1003
1004 string logshard_oid;
1005 get_logshard_oid(logshard_num, &logshard_oid);
1006
1007 RGWBucketReshardLock logshard_lock(store, logshard_oid, false);
1008
1009 int ret = logshard_lock.lock();
1010 if (ret == -EBUSY) { /* already locked by another processor */
1011 ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " <<
1012 logshard_oid << dendl;
1013 return ret;
1014 }
1015
1016 do {
1017 std::list<cls_rgw_reshard_entry> entries;
1018 ret = list(logshard_num, marker, max_entries, entries, &truncated);
1019 if (ret < 0) {
1020 ldout(cct, 10) << "cannot list all reshards in logshard oid=" <<
1021 logshard_oid << dendl;
1022 continue;
1023 }
1024
1025 for(auto& entry: entries) { // logshard entries
1026 if(entry.new_instance_id.empty()) {
1027
1028 ldout(store->ctx(), 20) << __func__ << " resharding " <<
1029 entry.bucket_name << dendl;
1030
1031 RGWObjectCtx obj_ctx(store);
1032 rgw_bucket bucket;
1033 RGWBucketInfo bucket_info;
1034 map<string, bufferlist> attrs;
1035
1036 ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name,
1037 bucket_info, nullptr, &attrs);
1038 if (ret < 0) {
1039 ldout(cct, 0) << __func__ << ": Error in get_bucket_info: " <<
1040 cpp_strerror(-ret) << dendl;
1041 return -ret;
1042 }
1043
1044 RGWBucketReshard br(store, bucket_info, attrs, nullptr);
1045
1046 Formatter* formatter = new JSONFormatter(false);
1047 auto formatter_ptr = std::unique_ptr<Formatter>(formatter);
1048 ret = br.execute(entry.new_num_shards, max_entries, true, nullptr,
1049 formatter, this);
1050 if (ret < 0) {
1051 ldout (store->ctx(), 0) << __func__ <<
1052 "ERROR in reshard_bucket " << entry.bucket_name << ":" <<
1053 cpp_strerror(-ret)<< dendl;
1054 return ret;
1055 }
1056
1057 ldout (store->ctx(), 20) << " removing entry" << entry.bucket_name <<
1058 dendl;
1059
1060 ret = remove(entry);
1061 if (ret < 0) {
1062 ldout(cct, 0)<< __func__ << ":Error removing bucket " <<
1063 entry.bucket_name << " for resharding queue: " <<
1064 cpp_strerror(-ret) << dendl;
1065 return ret;
1066 }
1067 }
1068
1069 Clock::time_point now = Clock::now();
1070 if (logshard_lock.should_renew(now)) {
1071 ret = logshard_lock.renew(now);
1072 if (ret < 0) {
1073 return ret;
1074 }
1075 }
1076
1077 entry.get_key(&marker);
1078 }
1079 } while (truncated);
1080
1081 logshard_lock.unlock();
1082 return 0;
1083 }
1084
1085
1086 void RGWReshard::get_logshard_oid(int shard_num, string *logshard)
1087 {
1088 char buf[32];
1089 snprintf(buf, sizeof(buf), "%010u", (unsigned)shard_num);
1090
1091 string objname(reshard_oid_prefix);
1092 *logshard = objname + buf;
1093 }
1094
1095 int RGWReshard::process_all_logshards()
1096 {
1097 if (!store->can_reshard()) {
1098 ldout(store->ctx(), 20) << __func__ << " Resharding is disabled" << dendl;
1099 return 0;
1100 }
1101 int ret = 0;
1102
1103 for (int i = 0; i < num_logshards; i++) {
1104 string logshard;
1105 get_logshard_oid(i, &logshard);
1106
1107 ldout(store->ctx(), 20) << "proceeding logshard = " << logshard << dendl;
1108
1109 ret = process_single_logshard(i);
1110 if (ret <0) {
1111 return ret;
1112 }
1113 }
1114
1115 return 0;
1116 }
1117
// Returns the value of down_flag, which stop_processor() sets to true.
bool RGWReshard::going_down()
{
  return down_flag;
}
1122
// Allocates the ReshardWorker and starts it via create() under the name
// "rgw_reshard". The worker is released again by stop_processor().
void RGWReshard::start_processor()
{
  worker = new ReshardWorker(store->ctx(), this);
  worker->create("rgw_reshard");
}
1128
1129 void RGWReshard::stop_processor()
1130 {
1131 down_flag = true;
1132 if (worker) {
1133 worker->stop();
1134 worker->join();
1135 }
1136 delete worker;
1137 worker = nullptr;
1138 }
1139
1140 void *RGWReshard::ReshardWorker::entry() {
1141 utime_t last_run;
1142 do {
1143 utime_t start = ceph_clock_now();
1144 if (reshard->process_all_logshards()) {
1145 /* All shards have been processed properly. Next time we can start
1146 * from this moment. */
1147 last_run = start;
1148 }
1149
1150 if (reshard->going_down())
1151 break;
1152
1153 utime_t end = ceph_clock_now();
1154 end -= start;
1155 int secs = cct->_conf->rgw_reshard_thread_interval;
1156
1157 if (secs <= end.sec())
1158 continue; // next round
1159
1160 secs -= end.sec();
1161
1162 lock.Lock();
1163 cond.WaitInterval(lock, utime_t(secs, 0));
1164 lock.Unlock();
1165 } while (!reshard->going_down());
1166
1167 return NULL;
1168 }
1169
// Signals the worker's condition variable while holding its mutex, waking
// the interval wait in entry().
void RGWReshard::ReshardWorker::stop()
{
  Mutex::Locker l(lock);
  cond.Signal();
}