git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_reshard.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rgw / rgw_reshard.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <limits>
5 #include <sstream>
6
7 #include "rgw_rados.h"
8 #include "rgw_zone.h"
9 #include "rgw_bucket.h"
10 #include "rgw_reshard.h"
11 #include "cls/rgw/cls_rgw_client.h"
12 #include "cls/lock/cls_lock_client.h"
13 #include "common/errno.h"
14 #include "common/ceph_json.h"
15
16 #include "common/dout.h"
17
18 #include "services/svc_zone.h"
19 #include "services/svc_sys_obj.h"
20
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw

// Prefix of the reshard log shard object names (see get_logshard_oid()).
const std::string reshard_oid_prefix{"reshard."};
// cls_lock name taken by RGWBucketReshardLock on logshard and bucket objects.
const std::string reshard_lock_name{"reshard_process"};
// cls_lock name used for RGWReshard::instance_lock.
const std::string bucket_instance_lock_name{"bucket_instance_lock"};
27
28
29 class BucketReshardShard {
30 RGWRados *store;
31 const RGWBucketInfo& bucket_info;
32 int num_shard;
33 RGWRados::BucketShard bs;
34 vector<rgw_cls_bi_entry> entries;
35 map<RGWObjCategory, rgw_bucket_category_stats> stats;
36 deque<librados::AioCompletion *>& aio_completions;
37 uint64_t max_aio_completions;
38 uint64_t reshard_shard_batch_size;
39
40 int wait_next_completion() {
41 librados::AioCompletion *c = aio_completions.front();
42 aio_completions.pop_front();
43
44 c->wait_for_safe();
45
46 int ret = c->get_return_value();
47 c->release();
48
49 if (ret < 0) {
50 derr << "ERROR: reshard rados operation failed: " << cpp_strerror(-ret) << dendl;
51 return ret;
52 }
53
54 return 0;
55 }
56
57 int get_completion(librados::AioCompletion **c) {
58 if (aio_completions.size() >= max_aio_completions) {
59 int ret = wait_next_completion();
60 if (ret < 0) {
61 return ret;
62 }
63 }
64
65 *c = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
66 aio_completions.push_back(*c);
67
68 return 0;
69 }
70
71 public:
72 BucketReshardShard(RGWRados *_store, const RGWBucketInfo& _bucket_info,
73 int _num_shard,
74 deque<librados::AioCompletion *>& _completions) :
75 store(_store), bucket_info(_bucket_info), bs(store),
76 aio_completions(_completions)
77 {
78 num_shard = (bucket_info.num_shards > 0 ? _num_shard : -1);
79 bs.init(bucket_info.bucket, num_shard, nullptr /* no RGWBucketInfo */);
80
81 max_aio_completions =
82 store->ctx()->_conf.get_val<uint64_t>("rgw_reshard_max_aio");
83 reshard_shard_batch_size =
84 store->ctx()->_conf.get_val<uint64_t>("rgw_reshard_batch_size");
85 }
86
87 int get_num_shard() {
88 return num_shard;
89 }
90
91 int add_entry(rgw_cls_bi_entry& entry, bool account, RGWObjCategory category,
92 const rgw_bucket_category_stats& entry_stats) {
93 entries.push_back(entry);
94 if (account) {
95 rgw_bucket_category_stats& target = stats[category];
96 target.num_entries += entry_stats.num_entries;
97 target.total_size += entry_stats.total_size;
98 target.total_size_rounded += entry_stats.total_size_rounded;
99 target.actual_size += entry_stats.actual_size;
100 }
101 if (entries.size() >= reshard_shard_batch_size) {
102 int ret = flush();
103 if (ret < 0) {
104 return ret;
105 }
106 }
107
108 return 0;
109 }
110
111 int flush() {
112 if (entries.size() == 0) {
113 return 0;
114 }
115
116 librados::ObjectWriteOperation op;
117 for (auto& entry : entries) {
118 store->bi_put(op, bs, entry);
119 }
120 cls_rgw_bucket_update_stats(op, false, stats);
121
122 librados::AioCompletion *c;
123 int ret = get_completion(&c);
124 if (ret < 0) {
125 return ret;
126 }
127 ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &op);
128 if (ret < 0) {
129 derr << "ERROR: failed to store entries in target bucket shard (bs=" << bs.bucket << "/" << bs.shard_id << ") error=" << cpp_strerror(-ret) << dendl;
130 return ret;
131 }
132 entries.clear();
133 stats.clear();
134 return 0;
135 }
136
137 int wait_all_aio() {
138 int ret = 0;
139 while (!aio_completions.empty()) {
140 int r = wait_next_completion();
141 if (r < 0) {
142 ret = r;
143 }
144 }
145 return ret;
146 }
147 }; // class BucketReshardShard
148
149
150 class BucketReshardManager {
151 RGWRados *store;
152 const RGWBucketInfo& target_bucket_info;
153 deque<librados::AioCompletion *> completions;
154 int num_target_shards;
155 vector<BucketReshardShard *> target_shards;
156
157 public:
158 BucketReshardManager(RGWRados *_store,
159 const RGWBucketInfo& _target_bucket_info,
160 int _num_target_shards) :
161 store(_store), target_bucket_info(_target_bucket_info),
162 num_target_shards(_num_target_shards)
163 {
164 target_shards.resize(num_target_shards);
165 for (int i = 0; i < num_target_shards; ++i) {
166 target_shards[i] = new BucketReshardShard(store, target_bucket_info, i, completions);
167 }
168 }
169
170 ~BucketReshardManager() {
171 for (auto& shard : target_shards) {
172 int ret = shard->wait_all_aio();
173 if (ret < 0) {
174 ldout(store->ctx(), 20) << __func__ <<
175 ": shard->wait_all_aio() returned ret=" << ret << dendl;
176 }
177 }
178 }
179
180 int add_entry(int shard_index,
181 rgw_cls_bi_entry& entry, bool account, RGWObjCategory category,
182 const rgw_bucket_category_stats& entry_stats) {
183 int ret = target_shards[shard_index]->add_entry(entry, account, category,
184 entry_stats);
185 if (ret < 0) {
186 derr << "ERROR: target_shards.add_entry(" << entry.idx <<
187 ") returned error: " << cpp_strerror(-ret) << dendl;
188 return ret;
189 }
190
191 return 0;
192 }
193
194 int finish() {
195 int ret = 0;
196 for (auto& shard : target_shards) {
197 int r = shard->flush();
198 if (r < 0) {
199 derr << "ERROR: target_shards[" << shard->get_num_shard() << "].flush() returned error: " << cpp_strerror(-r) << dendl;
200 ret = r;
201 }
202 }
203 for (auto& shard : target_shards) {
204 int r = shard->wait_all_aio();
205 if (r < 0) {
206 derr << "ERROR: target_shards[" << shard->get_num_shard() << "].wait_all_aio() returned error: " << cpp_strerror(-r) << dendl;
207 ret = r;
208 }
209 delete shard;
210 }
211 target_shards.clear();
212 return ret;
213 }
214 }; // class BucketReshardManager
215
216 RGWBucketReshard::RGWBucketReshard(RGWRados *_store,
217 const RGWBucketInfo& _bucket_info,
218 const map<string, bufferlist>& _bucket_attrs,
219 RGWBucketReshardLock* _outer_reshard_lock) :
220 store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs),
221 reshard_lock(store, bucket_info, true),
222 outer_reshard_lock(_outer_reshard_lock)
223 { }
224
225 int RGWBucketReshard::set_resharding_status(RGWRados* store,
226 const RGWBucketInfo& bucket_info,
227 const string& new_instance_id,
228 int32_t num_shards,
229 cls_rgw_reshard_status status)
230 {
231 if (new_instance_id.empty()) {
232 ldout(store->ctx(), 0) << __func__ << " missing new bucket instance id" << dendl;
233 return -EINVAL;
234 }
235
236 cls_rgw_bucket_instance_entry instance_entry;
237 instance_entry.set_status(new_instance_id, num_shards, status);
238
239 int ret = store->bucket_set_reshard(bucket_info, instance_entry);
240 if (ret < 0) {
241 ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: error setting bucket resharding flag on bucket index: "
242 << cpp_strerror(-ret) << dendl;
243 return ret;
244 }
245 return 0;
246 }
247
248 // reshard lock assumes lock is held
249 int RGWBucketReshard::clear_resharding(RGWRados* store,
250 const RGWBucketInfo& bucket_info)
251 {
252 int ret = clear_index_shard_reshard_status(store, bucket_info);
253 if (ret < 0) {
254 ldout(store->ctx(), 0) << "RGWBucketReshard::" << __func__ <<
255 " ERROR: error clearing reshard status from index shard " <<
256 cpp_strerror(-ret) << dendl;
257 return ret;
258 }
259
260 cls_rgw_bucket_instance_entry instance_entry;
261 ret = store->bucket_set_reshard(bucket_info, instance_entry);
262 if (ret < 0) {
263 ldout(store->ctx(), 0) << "RGWReshard::" << __func__ <<
264 " ERROR: error setting bucket resharding flag on bucket index: " <<
265 cpp_strerror(-ret) << dendl;
266 return ret;
267 }
268
269 return 0;
270 }
271
272 int RGWBucketReshard::clear_index_shard_reshard_status(RGWRados* store,
273 const RGWBucketInfo& bucket_info)
274 {
275 uint32_t num_shards = bucket_info.num_shards;
276
277 if (num_shards < std::numeric_limits<uint32_t>::max()) {
278 int ret = set_resharding_status(store, bucket_info,
279 bucket_info.bucket.bucket_id,
280 (num_shards < 1 ? 1 : num_shards),
281 CLS_RGW_RESHARD_NOT_RESHARDING);
282 if (ret < 0) {
283 ldout(store->ctx(), 0) << "RGWBucketReshard::" << __func__ <<
284 " ERROR: error clearing reshard status from index shard " <<
285 cpp_strerror(-ret) << dendl;
286 return ret;
287 }
288 }
289
290 return 0;
291 }
292
293 static int create_new_bucket_instance(RGWRados *store,
294 int new_num_shards,
295 const RGWBucketInfo& bucket_info,
296 map<string, bufferlist>& attrs,
297 RGWBucketInfo& new_bucket_info)
298 {
299 new_bucket_info = bucket_info;
300
301 store->create_bucket_id(&new_bucket_info.bucket.bucket_id);
302 new_bucket_info.bucket.oid.clear();
303
304 new_bucket_info.num_shards = new_num_shards;
305 new_bucket_info.objv_tracker.clear();
306
307 new_bucket_info.new_bucket_instance_id.clear();
308 new_bucket_info.reshard_status = 0;
309
310 int ret = store->init_bucket_index(new_bucket_info, new_bucket_info.num_shards);
311 if (ret < 0) {
312 cerr << "ERROR: failed to init new bucket indexes: " << cpp_strerror(-ret) << std::endl;
313 return ret;
314 }
315
316 ret = store->put_bucket_instance_info(new_bucket_info, true, real_time(), &attrs);
317 if (ret < 0) {
318 cerr << "ERROR: failed to store new bucket instance info: " << cpp_strerror(-ret) << std::endl;
319 return ret;
320 }
321
322 return 0;
323 }
324
325 int RGWBucketReshard::create_new_bucket_instance(int new_num_shards,
326 RGWBucketInfo& new_bucket_info)
327 {
328 return ::create_new_bucket_instance(store, new_num_shards,
329 bucket_info, bucket_attrs, new_bucket_info);
330 }
331
332 int RGWBucketReshard::cancel()
333 {
334 int ret = reshard_lock.lock();
335 if (ret < 0) {
336 return ret;
337 }
338
339 ret = clear_resharding();
340
341 reshard_lock.unlock();
342 return ret;
343 }
344
345 class BucketInfoReshardUpdate
346 {
347 RGWRados *store;
348 RGWBucketInfo bucket_info;
349 std::map<string, bufferlist> bucket_attrs;
350
351 bool in_progress{false};
352
353 int set_status(cls_rgw_reshard_status s) {
354 bucket_info.reshard_status = s;
355 int ret = store->put_bucket_instance_info(bucket_info, false, real_time(), &bucket_attrs);
356 if (ret < 0) {
357 ldout(store->ctx(), 0) << "ERROR: failed to write bucket info, ret=" << ret << dendl;
358 return ret;
359 }
360 return 0;
361 }
362
363 public:
364 BucketInfoReshardUpdate(RGWRados *_store,
365 RGWBucketInfo& _bucket_info,
366 map<string, bufferlist>& _bucket_attrs,
367 const string& new_bucket_id) :
368 store(_store),
369 bucket_info(_bucket_info),
370 bucket_attrs(_bucket_attrs)
371 {
372 bucket_info.new_bucket_instance_id = new_bucket_id;
373 }
374
375 ~BucketInfoReshardUpdate() {
376 if (in_progress) {
377 // resharding must not have ended correctly, clean up
378 int ret =
379 RGWBucketReshard::clear_index_shard_reshard_status(store, bucket_info);
380 if (ret < 0) {
381 lderr(store->ctx()) << "Error: " << __func__ <<
382 " clear_index_shard_status returned " << ret << dendl;
383 }
384 bucket_info.new_bucket_instance_id.clear();
385 set_status(CLS_RGW_RESHARD_NOT_RESHARDING); // clears new_bucket_instance as well
386 }
387 }
388
389 int start() {
390 int ret = set_status(CLS_RGW_RESHARD_IN_PROGRESS);
391 if (ret < 0) {
392 return ret;
393 }
394 in_progress = true;
395 return 0;
396 }
397
398 int complete() {
399 int ret = set_status(CLS_RGW_RESHARD_DONE);
400 if (ret < 0) {
401 return ret;
402 }
403 in_progress = false;
404 return 0;
405 }
406 };
407
408
409 RGWBucketReshardLock::RGWBucketReshardLock(RGWRados* _store,
410 const std::string& reshard_lock_oid,
411 bool _ephemeral) :
412 store(_store),
413 lock_oid(reshard_lock_oid),
414 ephemeral(_ephemeral),
415 internal_lock(reshard_lock_name)
416 {
417 const int lock_dur_secs = store->ctx()->_conf.get_val<uint64_t>(
418 "rgw_reshard_bucket_lock_duration");
419 duration = std::chrono::seconds(lock_dur_secs);
420
421 #define COOKIE_LEN 16
422 char cookie_buf[COOKIE_LEN + 1];
423 gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
424 cookie_buf[COOKIE_LEN] = '\0';
425
426 internal_lock.set_cookie(cookie_buf);
427 internal_lock.set_duration(duration);
428 }
429
430 int RGWBucketReshardLock::lock() {
431 internal_lock.set_must_renew(false);
432 int ret;
433 if (ephemeral) {
434 ret = internal_lock.lock_exclusive_ephemeral(&store->reshard_pool_ctx,
435 lock_oid);
436 } else {
437 ret = internal_lock.lock_exclusive(&store->reshard_pool_ctx, lock_oid);
438 }
439 if (ret < 0) {
440 ldout(store->ctx(), 0) << "RGWReshardLock::" << __func__ <<
441 " failed to acquire lock on " << lock_oid << " ret=" << ret << dendl;
442 return ret;
443 }
444 reset_time(Clock::now());
445
446 return 0;
447 }
448
449 void RGWBucketReshardLock::unlock() {
450 int ret = internal_lock.unlock(&store->reshard_pool_ctx, lock_oid);
451 if (ret < 0) {
452 ldout(store->ctx(), 0) << "WARNING: RGWBucketReshardLock::" << __func__ <<
453 " failed to drop lock on " << lock_oid << " ret=" << ret << dendl;
454 }
455 }
456
457 int RGWBucketReshardLock::renew(const Clock::time_point& now) {
458 internal_lock.set_must_renew(true);
459 int ret;
460 if (ephemeral) {
461 ret = internal_lock.lock_exclusive_ephemeral(&store->reshard_pool_ctx,
462 lock_oid);
463 } else {
464 ret = internal_lock.lock_exclusive(&store->reshard_pool_ctx, lock_oid);
465 }
466 if (ret < 0) { /* expired or already locked by another processor */
467 std::stringstream error_s;
468 if (-ENOENT == ret) {
469 error_s << "ENOENT (lock expired or never initially locked)";
470 } else {
471 error_s << ret << " (" << cpp_strerror(-ret) << ")";
472 }
473 ldout(store->ctx(), 5) << __func__ << "(): failed to renew lock on " <<
474 lock_oid << " with error " << error_s.str() << dendl;
475 return ret;
476 }
477 internal_lock.set_must_renew(false);
478
479 reset_time(now);
480 ldout(store->ctx(), 20) << __func__ << "(): successfully renewed lock on " <<
481 lock_oid << dendl;
482
483 return 0;
484 }
485
486
487 int RGWBucketReshard::do_reshard(int num_shards,
488 RGWBucketInfo& new_bucket_info,
489 int max_entries,
490 bool verbose,
491 ostream *out,
492 Formatter *formatter)
493 {
494 rgw_bucket& bucket = bucket_info.bucket;
495
496 int ret = 0;
497
498 if (out) {
499 (*out) << "tenant: " << bucket_info.bucket.tenant << std::endl;
500 (*out) << "bucket name: " << bucket_info.bucket.name << std::endl;
501 (*out) << "old bucket instance id: " << bucket_info.bucket.bucket_id <<
502 std::endl;
503 (*out) << "new bucket instance id: " << new_bucket_info.bucket.bucket_id <<
504 std::endl;
505 }
506
507 /* update bucket info -- in progress*/
508 list<rgw_cls_bi_entry> entries;
509
510 if (max_entries < 0) {
511 ldout(store->ctx(), 0) << __func__ <<
512 ": can't reshard, negative max_entries" << dendl;
513 return -EINVAL;
514 }
515
516 // NB: destructor cleans up sharding state if reshard does not
517 // complete successfully
518 BucketInfoReshardUpdate bucket_info_updater(store, bucket_info, bucket_attrs, new_bucket_info.bucket.bucket_id);
519
520 ret = bucket_info_updater.start();
521 if (ret < 0) {
522 ldout(store->ctx(), 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl;
523 return ret;
524 }
525
526 int num_target_shards = (new_bucket_info.num_shards > 0 ? new_bucket_info.num_shards : 1);
527
528 BucketReshardManager target_shards_mgr(store, new_bucket_info, num_target_shards);
529
530 verbose = verbose && (formatter != nullptr);
531
532 if (verbose) {
533 formatter->open_array_section("entries");
534 }
535
536 uint64_t total_entries = 0;
537
538 if (!verbose) {
539 cout << "total entries:";
540 }
541
542 const int num_source_shards =
543 (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
544 string marker;
545 for (int i = 0; i < num_source_shards; ++i) {
546 bool is_truncated = true;
547 marker.clear();
548 while (is_truncated) {
549 entries.clear();
550 ret = store->bi_list(bucket, i, string(), marker, max_entries, &entries, &is_truncated);
551 if (ret < 0 && ret != -ENOENT) {
552 derr << "ERROR: bi_list(): " << cpp_strerror(-ret) << dendl;
553 return ret;
554 }
555
556 for (auto iter = entries.begin(); iter != entries.end(); ++iter) {
557 rgw_cls_bi_entry& entry = *iter;
558 if (verbose) {
559 formatter->open_object_section("entry");
560
561 encode_json("shard_id", i, formatter);
562 encode_json("num_entry", total_entries, formatter);
563 encode_json("entry", entry, formatter);
564 }
565 total_entries++;
566
567 marker = entry.idx;
568
569 int target_shard_id;
570 cls_rgw_obj_key cls_key;
571 RGWObjCategory category;
572 rgw_bucket_category_stats stats;
573 bool account = entry.get_info(&cls_key, &category, &stats);
574 rgw_obj_key key(cls_key);
575 rgw_obj obj(new_bucket_info.bucket, key);
576 int ret = store->get_target_shard_id(new_bucket_info, obj.get_hash_object(), &target_shard_id);
577 if (ret < 0) {
578 lderr(store->ctx()) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl;
579 return ret;
580 }
581
582 int shard_index = (target_shard_id > 0 ? target_shard_id : 0);
583
584 ret = target_shards_mgr.add_entry(shard_index, entry, account,
585 category, stats);
586 if (ret < 0) {
587 return ret;
588 }
589
590 Clock::time_point now = Clock::now();
591 if (reshard_lock.should_renew(now)) {
592 // assume outer locks have timespans at least the size of ours, so
593 // can call inside conditional
594 if (outer_reshard_lock) {
595 ret = outer_reshard_lock->renew(now);
596 if (ret < 0) {
597 return ret;
598 }
599 }
600 ret = reshard_lock.renew(now);
601 if (ret < 0) {
602 lderr(store->ctx()) << "Error renewing bucket lock: " << ret << dendl;
603 return ret;
604 }
605 }
606
607 if (verbose) {
608 formatter->close_section();
609 if (out) {
610 formatter->flush(*out);
611 }
612 } else if (out && !(total_entries % 1000)) {
613 (*out) << " " << total_entries;
614 }
615 } // entries loop
616 }
617 }
618
619 if (verbose) {
620 formatter->close_section();
621 if (out) {
622 formatter->flush(*out);
623 }
624 } else if (out) {
625 (*out) << " " << total_entries << std::endl;
626 }
627
628 ret = target_shards_mgr.finish();
629 if (ret < 0) {
630 lderr(store->ctx()) << "ERROR: failed to reshard" << dendl;
631 return -EIO;
632 }
633
634 ret = rgw_link_bucket(store, new_bucket_info.owner, new_bucket_info.bucket, bucket_info.creation_time);
635 if (ret < 0) {
636 lderr(store->ctx()) << "failed to link new bucket instance (bucket_id=" << new_bucket_info.bucket.bucket_id << ": " << cpp_strerror(-ret) << ")" << dendl;
637 return ret;
638 }
639
640 ret = bucket_info_updater.complete();
641 if (ret < 0) {
642 ldout(store->ctx(), 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl;
643 /* don't error out, reshard process succeeded */
644 }
645
646 return 0;
647 // NB: some error clean-up is done by ~BucketInfoReshardUpdate
648 } // RGWBucketReshard::do_reshard
649
650 int RGWBucketReshard::get_status(list<cls_rgw_bucket_instance_entry> *status)
651 {
652 librados::IoCtx index_ctx;
653 map<int, string> bucket_objs;
654
655 int r = store->open_bucket_index(bucket_info, index_ctx, bucket_objs);
656 if (r < 0) {
657 return r;
658 }
659
660 for (auto i : bucket_objs) {
661 cls_rgw_bucket_instance_entry entry;
662
663 int ret = cls_rgw_get_bucket_resharding(index_ctx, i.second, &entry);
664 if (ret < 0 && ret != -ENOENT) {
665 lderr(store->ctx()) << "ERROR: " << __func__ << ": cls_rgw_get_bucket_resharding() returned ret=" << ret << dendl;
666 return ret;
667 }
668
669 status->push_back(entry);
670 }
671
672 return 0;
673 }
674
675
676 int RGWBucketReshard::execute(int num_shards, int max_op_entries,
677 bool verbose, ostream *out, Formatter *formatter,
678 RGWReshard* reshard_log)
679 {
680 Clock::time_point now;
681
682 int ret = reshard_lock.lock();
683 if (ret < 0) {
684 return ret;
685 }
686
687 RGWBucketInfo new_bucket_info;
688 ret = create_new_bucket_instance(num_shards, new_bucket_info);
689 if (ret < 0) {
690 // shard state is uncertain, but this will attempt to remove them anyway
691 goto error_out;
692 }
693
694 if (reshard_log) {
695 ret = reshard_log->update(bucket_info, new_bucket_info);
696 if (ret < 0) {
697 goto error_out;
698 }
699 }
700
701 // set resharding status of current bucket_info & shards with
702 // information about planned resharding
703 ret = set_resharding_status(new_bucket_info.bucket.bucket_id,
704 num_shards, CLS_RGW_RESHARD_IN_PROGRESS);
705 if (ret < 0) {
706 reshard_lock.unlock();
707 return ret;
708 }
709
710 ret = do_reshard(num_shards,
711 new_bucket_info,
712 max_op_entries,
713 verbose, out, formatter);
714 if (ret < 0) {
715 goto error_out;
716 }
717
718 // at this point we've done the main work; we'll make a best-effort
719 // to clean-up but will not indicate any errors encountered
720
721 reshard_lock.unlock();
722
723 // resharding successful, so remove old bucket index shards; use
724 // best effort and don't report out an error; the lock isn't needed
725 // at this point since all we're using a best effor to to remove old
726 // shard objects
727 ret = store->clean_bucket_index(bucket_info, bucket_info.num_shards);
728 if (ret < 0) {
729 lderr(store->ctx()) << "Error: " << __func__ <<
730 " failed to clean up old shards; " <<
731 "RGWRados::clean_bucket_index returned " << ret << dendl;
732 }
733
734 ret = rgw_bucket_instance_remove_entry(store,
735 bucket_info.bucket.get_key(),
736 nullptr);
737 if (ret < 0) {
738 lderr(store->ctx()) << "Error: " << __func__ <<
739 " failed to clean old bucket info object \"" <<
740 bucket_info.bucket.get_key() <<
741 "\"created after successful resharding with error " << ret << dendl;
742 }
743
744 ldout(store->ctx(), 1) << __func__ <<
745 " INFO: reshard of bucket \"" << bucket_info.bucket.name << "\" from \"" <<
746 bucket_info.bucket.get_key() << "\" to \"" <<
747 new_bucket_info.bucket.get_key() << "\" completed successfully" << dendl;
748
749 return 0;
750
751 error_out:
752
753 reshard_lock.unlock();
754
755 // since the real problem is the issue that led to this error code
756 // path, we won't touch ret and instead use another variable to
757 // temporarily error codes
758 int ret2 = store->clean_bucket_index(new_bucket_info,
759 new_bucket_info.num_shards);
760 if (ret2 < 0) {
761 lderr(store->ctx()) << "Error: " << __func__ <<
762 " failed to clean up shards from failed incomplete resharding; " <<
763 "RGWRados::clean_bucket_index returned " << ret2 << dendl;
764 }
765
766 ret2 = rgw_bucket_instance_remove_entry(store,
767 new_bucket_info.bucket.get_key(),
768 nullptr);
769 if (ret2 < 0) {
770 lderr(store->ctx()) << "Error: " << __func__ <<
771 " failed to clean bucket info object \"" <<
772 new_bucket_info.bucket.get_key() <<
773 "\"created during incomplete resharding with error " << ret2 << dendl;
774 }
775
776 return ret;
777 } // execute
778
779
780 RGWReshard::RGWReshard(RGWRados* _store, bool _verbose, ostream *_out,
781 Formatter *_formatter) :
782 store(_store), instance_lock(bucket_instance_lock_name),
783 verbose(_verbose), out(_out), formatter(_formatter)
784 {
785 num_logshards = store->ctx()->_conf.get_val<uint64_t>("rgw_reshard_num_logs");
786 }
787
788 string RGWReshard::get_logshard_key(const string& tenant,
789 const string& bucket_name)
790 {
791 return tenant + ":" + bucket_name;
792 }
793
794 #define MAX_RESHARD_LOGSHARDS_PRIME 7877
795
796 void RGWReshard::get_bucket_logshard_oid(const string& tenant, const string& bucket_name, string *oid)
797 {
798 string key = get_logshard_key(tenant, bucket_name);
799
800 uint32_t sid = ceph_str_hash_linux(key.c_str(), key.size());
801 uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
802 sid = sid2 % MAX_RESHARD_LOGSHARDS_PRIME % num_logshards;
803
804 get_logshard_oid(int(sid), oid);
805 }
806
807 int RGWReshard::add(cls_rgw_reshard_entry& entry)
808 {
809 if (!store->svc.zone->can_reshard()) {
810 ldout(store->ctx(), 20) << __func__ << " Resharding is disabled" << dendl;
811 return 0;
812 }
813
814 string logshard_oid;
815
816 get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
817
818 librados::ObjectWriteOperation op;
819 cls_rgw_reshard_add(op, entry);
820
821 int ret = store->reshard_pool_ctx.operate(logshard_oid, &op);
822 if (ret < 0) {
823 lderr(store->ctx()) << "ERROR: failed to add entry to reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
824 return ret;
825 }
826 return 0;
827 }
828
829 int RGWReshard::update(const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info)
830 {
831 cls_rgw_reshard_entry entry;
832 entry.bucket_name = bucket_info.bucket.name;
833 entry.bucket_id = bucket_info.bucket.bucket_id;
834 entry.tenant = bucket_info.owner.tenant;
835
836 int ret = get(entry);
837 if (ret < 0) {
838 return ret;
839 }
840
841 entry.new_instance_id = new_bucket_info.bucket.name + ":" + new_bucket_info.bucket.bucket_id;
842
843 ret = add(entry);
844 if (ret < 0) {
845 ldout(store->ctx(), 0) << __func__ << ":Error in updating entry bucket " << entry.bucket_name << ": " <<
846 cpp_strerror(-ret) << dendl;
847 }
848
849 return ret;
850 }
851
852
853 int RGWReshard::list(int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated)
854 {
855 string logshard_oid;
856
857 get_logshard_oid(logshard_num, &logshard_oid);
858
859 int ret = cls_rgw_reshard_list(store->reshard_pool_ctx, logshard_oid, marker, max, entries, is_truncated);
860
861 if (ret < 0) {
862 if (ret == -ENOENT) {
863 *is_truncated = false;
864 ret = 0;
865 }
866 lderr(store->ctx()) << "ERROR: failed to list reshard log entries, oid=" << logshard_oid << dendl;
867 if (ret == -EACCES) {
868 lderr(store->ctx()) << "access denied to pool " << store->svc.zone->get_zone_params().reshard_pool
869 << ". Fix the pool access permissions of your client" << dendl;
870 }
871 }
872
873 return ret;
874 }
875
876 int RGWReshard::get(cls_rgw_reshard_entry& entry)
877 {
878 string logshard_oid;
879
880 get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
881
882 int ret = cls_rgw_reshard_get(store->reshard_pool_ctx, logshard_oid, entry);
883 if (ret < 0) {
884 if (ret != -ENOENT) {
885 lderr(store->ctx()) << "ERROR: failed to get entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant <<
886 " bucket=" << entry.bucket_name << dendl;
887 }
888 return ret;
889 }
890
891 return 0;
892 }
893
894 int RGWReshard::remove(cls_rgw_reshard_entry& entry)
895 {
896 string logshard_oid;
897
898 get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
899
900 librados::ObjectWriteOperation op;
901 cls_rgw_reshard_remove(op, entry);
902
903 int ret = store->reshard_pool_ctx.operate(logshard_oid, &op);
904 if (ret < 0) {
905 lderr(store->ctx()) << "ERROR: failed to remove entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
906 return ret;
907 }
908
909 return ret;
910 }
911
912 int RGWReshard::clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry)
913 {
914 int ret = cls_rgw_clear_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid);
915 if (ret < 0) {
916 lderr(store->ctx()) << "ERROR: failed to clear bucket resharding, bucket_instance_oid=" << bucket_instance_oid << dendl;
917 return ret;
918 }
919
920 return 0;
921 }
922
923 int RGWReshardWait::wait(optional_yield y)
924 {
925 std::unique_lock lock(mutex);
926
927 if (going_down) {
928 return -ECANCELED;
929 }
930
931 #ifdef HAVE_BOOST_CONTEXT
932 if (y) {
933 auto& context = y.get_io_context();
934 auto& yield = y.get_yield_context();
935
936 Waiter waiter(context);
937 waiters.push_back(waiter);
938 lock.unlock();
939
940 waiter.timer.expires_after(duration);
941
942 boost::system::error_code ec;
943 waiter.timer.async_wait(yield[ec]);
944
945 lock.lock();
946 waiters.erase(waiters.iterator_to(waiter));
947 return -ec.value();
948 }
949 #endif
950
951 cond.wait_for(lock, duration);
952
953 if (going_down) {
954 return -ECANCELED;
955 }
956
957 return 0;
958 }
959
960 void RGWReshardWait::stop()
961 {
962 std::scoped_lock lock(mutex);
963 going_down = true;
964 cond.notify_all();
965 for (auto& waiter : waiters) {
966 // unblock any waiters with ECANCELED
967 waiter.timer.cancel();
968 }
969 }
970
971 int RGWReshard::process_single_logshard(int logshard_num)
972 {
973 string marker;
974 bool truncated = true;
975
976 CephContext *cct = store->ctx();
977 constexpr uint32_t max_entries = 1000;
978
979 string logshard_oid;
980 get_logshard_oid(logshard_num, &logshard_oid);
981
982 RGWBucketReshardLock logshard_lock(store, logshard_oid, false);
983
984 int ret = logshard_lock.lock();
985 if (ret == -EBUSY) { /* already locked by another processor */
986 ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " <<
987 logshard_oid << dendl;
988 return ret;
989 }
990
991 do {
992 std::list<cls_rgw_reshard_entry> entries;
993 ret = list(logshard_num, marker, max_entries, entries, &truncated);
994 if (ret < 0) {
995 ldout(cct, 10) << "cannot list all reshards in logshard oid=" <<
996 logshard_oid << dendl;
997 continue;
998 }
999
1000 for(auto& entry: entries) { // logshard entries
1001 if(entry.new_instance_id.empty()) {
1002
1003 ldout(store->ctx(), 20) << __func__ << " resharding " <<
1004 entry.bucket_name << dendl;
1005
1006 auto obj_ctx = store->svc.sysobj->init_obj_ctx();
1007 rgw_bucket bucket;
1008 RGWBucketInfo bucket_info;
1009 map<string, bufferlist> attrs;
1010
1011 ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name,
1012 bucket_info, nullptr, &attrs);
1013 if (ret < 0) {
1014 ldout(cct, 0) << __func__ << ": Error in get_bucket_info: " <<
1015 cpp_strerror(-ret) << dendl;
1016 return -ret;
1017 }
1018
1019 RGWBucketReshard br(store, bucket_info, attrs, nullptr);
1020
1021 Formatter* formatter = new JSONFormatter(false);
1022 auto formatter_ptr = std::unique_ptr<Formatter>(formatter);
1023 ret = br.execute(entry.new_num_shards, max_entries, true, nullptr,
1024 formatter, this);
1025 if (ret < 0) {
1026 ldout (store->ctx(), 0) << __func__ <<
1027 "ERROR in reshard_bucket " << entry.bucket_name << ":" <<
1028 cpp_strerror(-ret)<< dendl;
1029 return ret;
1030 }
1031
1032 ldout (store->ctx(), 20) << " removing entry" << entry.bucket_name <<
1033 dendl;
1034
1035 ret = remove(entry);
1036 if (ret < 0) {
1037 ldout(cct, 0)<< __func__ << ":Error removing bucket " <<
1038 entry.bucket_name << " for resharding queue: " <<
1039 cpp_strerror(-ret) << dendl;
1040 return ret;
1041 }
1042 }
1043
1044 Clock::time_point now = Clock::now();
1045 if (logshard_lock.should_renew(now)) {
1046 ret = logshard_lock.renew(now);
1047 if (ret < 0) {
1048 return ret;
1049 }
1050 }
1051
1052 entry.get_key(&marker);
1053 }
1054 } while (truncated);
1055
1056 logshard_lock.unlock();
1057 return 0;
1058 }
1059
1060
1061 void RGWReshard::get_logshard_oid(int shard_num, string *logshard)
1062 {
1063 char buf[32];
1064 snprintf(buf, sizeof(buf), "%010u", (unsigned)shard_num);
1065
1066 string objname(reshard_oid_prefix);
1067 *logshard = objname + buf;
1068 }
1069
1070 int RGWReshard::process_all_logshards()
1071 {
1072 if (!store->svc.zone->can_reshard()) {
1073 ldout(store->ctx(), 20) << __func__ << " Resharding is disabled" << dendl;
1074 return 0;
1075 }
1076 int ret = 0;
1077
1078 for (int i = 0; i < num_logshards; i++) {
1079 string logshard;
1080 get_logshard_oid(i, &logshard);
1081
1082 ldout(store->ctx(), 20) << "processing logshard = " << logshard << dendl;
1083
1084 ret = process_single_logshard(i);
1085 if (ret <0) {
1086 return ret;
1087 }
1088 }
1089
1090 return 0;
1091 }
1092
1093 bool RGWReshard::going_down()
1094 {
1095 return down_flag;
1096 }
1097
1098 void RGWReshard::start_processor()
1099 {
1100 worker = new ReshardWorker(store->ctx(), this);
1101 worker->create("rgw_reshard");
1102 }
1103
1104 void RGWReshard::stop_processor()
1105 {
1106 down_flag = true;
1107 if (worker) {
1108 worker->stop();
1109 worker->join();
1110 }
1111 delete worker;
1112 worker = nullptr;
1113 }
1114
1115 void *RGWReshard::ReshardWorker::entry() {
1116 utime_t last_run;
1117 do {
1118 utime_t start = ceph_clock_now();
1119 if (reshard->process_all_logshards()) {
1120 /* All shards have been processed properly. Next time we can start
1121 * from this moment. */
1122 last_run = start;
1123 }
1124
1125 if (reshard->going_down())
1126 break;
1127
1128 utime_t end = ceph_clock_now();
1129 end -= start;
1130 int secs = cct->_conf.get_val<uint64_t>("rgw_reshard_thread_interval");
1131
1132 if (secs <= end.sec())
1133 continue; // next round
1134
1135 secs -= end.sec();
1136
1137 lock.Lock();
1138 cond.WaitInterval(lock, utime_t(secs, 0));
1139 lock.Unlock();
1140 } while (!reshard->going_down());
1141
1142 return NULL;
1143 }
1144
1145 void RGWReshard::ReshardWorker::stop()
1146 {
1147 Mutex::Locker l(lock);
1148 cond.Signal();
1149 }