1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
9 #include "rgw_bucket.h"
10 #include "rgw_reshard.h"
11 #include "cls/rgw/cls_rgw_client.h"
12 #include "cls/lock/cls_lock_client.h"
13 #include "common/errno.h"
14 #include "common/ceph_json.h"
16 #include "common/dout.h"
18 #include "services/svc_zone.h"
19 #include "services/svc_sys_obj.h"
21 #define dout_context g_ceph_context
22 #define dout_subsys ceph_subsys_rgw
// Prefix of the reshard log shard object names; get_logshard_oid()
// appends a zero-padded shard number to it.
24 const string reshard_oid_prefix
= "reshard.";
// Name of the cls lock used by RGWBucketReshardLock (its internal_lock).
25 const string reshard_lock_name
= "reshard_process";
// Name of the cls lock held in RGWReshard::instance_lock.
26 const string bucket_instance_lock_name
= "bucket_instance_lock";
// BucketReshardShard buffers bucket-index entries (and per-category stats)
// destined for one shard of the new bucket instance and writes them to
// that shard's index object with asynchronous rados operations.
// The completion deque is held by reference, so it is shared with the
// creator (BucketReshardManager passes the same deque to every shard);
// the number of operations in flight is bounded by max_aio_completions.
29 class BucketReshardShard
{
31 const RGWBucketInfo
& bucket_info
;
// index object of the target shard (initialised in the constructor)
33 RGWRados::BucketShard bs
;
// entries queued but not yet written by flush()
34 vector
<rgw_cls_bi_entry
> entries
;
// stats accumulated for the queued entries, keyed by category
35 map
<RGWObjCategory
, rgw_bucket_category_stats
> stats
;
// shared queue of outstanding completions, oldest at the front
36 deque
<librados::AioCompletion
*>& aio_completions
;
37 uint64_t max_aio_completions
;
38 uint64_t reshard_shard_batch_size
;
// Removes the oldest completion from the shared queue and reads its
// return value; a negative result is logged as a failed rados operation.
40 int wait_next_completion() {
41 librados::AioCompletion
*c
= aio_completions
.front();
42 aio_completions
.pop_front();
46 int ret
= c
->get_return_value();
50 derr
<< "ERROR: reshard rados operation failed: " << cpp_strerror(-ret
) << dendl
;
// Creates a new completion and appends it to the shared queue. When the
// queue already holds max_aio_completions entries, the oldest one is
// waited on first, which bounds the number of operations in flight.
57 int get_completion(librados::AioCompletion
**c
) {
58 if (aio_completions
.size() >= max_aio_completions
) {
59 int ret
= wait_next_completion();
65 *c
= librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
66 aio_completions
.push_back(*c
);
// num_shard becomes -1 when bucket_info.num_shards is not positive, and
// bs is initialised for that shard id. The AIO bound (presumably
// max_aio_completions) and the batch size are read from the
// rgw_reshard_max_aio and rgw_reshard_batch_size options.
72 BucketReshardShard(RGWRados
*_store
, const RGWBucketInfo
& _bucket_info
,
74 deque
<librados::AioCompletion
*>& _completions
) :
75 store(_store
), bucket_info(_bucket_info
), bs(store
),
76 aio_completions(_completions
)
78 num_shard
= (bucket_info
.num_shards
> 0 ? _num_shard
: -1);
79 bs
.init(bucket_info
.bucket
, num_shard
, nullptr /* no RGWBucketInfo */);
82 store
->ctx()->_conf
.get_val
<uint64_t>("rgw_reshard_max_aio");
83 reshard_shard_batch_size
=
84 store
->ctx()->_conf
.get_val
<uint64_t>("rgw_reshard_batch_size");
// Queues an entry and folds entry_stats into stats[category]
// (NOTE(review): presumably only for accounted entries, i.e. when
// `account` is true — confirm), then checks whether the queue has
// reached reshard_shard_batch_size.
91 int add_entry(rgw_cls_bi_entry
& entry
, bool account
, RGWObjCategory category
,
92 const rgw_bucket_category_stats
& entry_stats
) {
93 entries
.push_back(entry
);
95 rgw_bucket_category_stats
& target
= stats
[category
];
96 target
.num_entries
+= entry_stats
.num_entries
;
97 target
.total_size
+= entry_stats
.total_size
;
98 target
.total_size_rounded
+= entry_stats
.total_size_rounded
;
99 target
.actual_size
+= entry_stats
.actual_size
;
101 if (entries
.size() >= reshard_shard_batch_size
) {
// flush(): writes every queued entry plus the accumulated category stats
// to the target shard's index object as one ObjectWriteOperation that is
// submitted asynchronously; an empty queue is checked for first.
112 if (entries
.size() == 0) {
116 librados::ObjectWriteOperation op
;
117 for (auto& entry
: entries
) {
118 store
->bi_put(op
, bs
, entry
);
// NOTE(review): `false` is presumably the "absolute" flag, i.e. the stats
// are applied as increments — confirm against cls_rgw_bucket_update_stats.
120 cls_rgw_bucket_update_stats(op
, false, stats
);
122 librados::AioCompletion
*c
;
123 int ret
= get_completion(&c
);
127 ret
= bs
.index_ctx
.aio_operate(bs
.bucket_obj
, c
, &op
);
129 derr
<< "ERROR: failed to store entries in target bucket shard (bs=" << bs
.bucket
<< "/" << bs
.shard_id
<< ") error=" << cpp_strerror(-ret
) << dendl
;
// wait_all_aio(): drains the whole completion queue. Because the deque is
// shared, this may also wait on completions issued by other shards.
139 while (!aio_completions
.empty()) {
140 int r
= wait_next_completion();
147 }; // class BucketReshardShard
// BucketReshardManager creates one BucketReshardShard per target shard of
// the new bucket instance, all sharing the `completions` deque.
// add_entry() routes an entry to the shard at shard_index, and finish()
// flushes every shard and then waits for all outstanding AIO.
150 class BucketReshardManager
{
152 const RGWBucketInfo
& target_bucket_info
;
// completion queue shared by every BucketReshardShard below
153 deque
<librados::AioCompletion
*> completions
;
154 int num_target_shards
;
// NOTE(review): the shards are heap-allocated with new; confirm each one
// is deleted once finish() is done with it.
155 vector
<BucketReshardShard
*> target_shards
;
158 BucketReshardManager(RGWRados
*_store
,
159 const RGWBucketInfo
& _target_bucket_info
,
160 int _num_target_shards
) :
161 store(_store
), target_bucket_info(_target_bucket_info
),
162 num_target_shards(_num_target_shards
)
164 target_shards
.resize(num_target_shards
);
165 for (int i
= 0; i
< num_target_shards
; ++i
) {
166 target_shards
[i
] = new BucketReshardShard(store
, target_bucket_info
, i
, completions
);
// Best-effort drain of any AIO still outstanding. Results are only
// logged (at level 20) and are never propagated.
170 ~BucketReshardManager() {
171 for (auto& shard
: target_shards
) {
172 int ret
= shard
->wait_all_aio();
174 ldout(store
->ctx(), 20) << __func__
<<
175 ": shard->wait_all_aio() returned ret=" << ret
<< dendl
;
// Forwards the entry to target_shards[shard_index] and logs any error.
180 int add_entry(int shard_index
,
181 rgw_cls_bi_entry
& entry
, bool account
, RGWObjCategory category
,
182 const rgw_bucket_category_stats
& entry_stats
) {
183 int ret
= target_shards
[shard_index
]->add_entry(entry
, account
, category
,
186 derr
<< "ERROR: target_shards.add_entry(" << entry
.idx
<<
187 ") returned error: " << cpp_strerror(-ret
) << dendl
;
// finish(): flush every shard, then wait for all of their AIO, logging
// any error from either step; finally the shard list is cleared.
196 for (auto& shard
: target_shards
) {
197 int r
= shard
->flush();
199 derr
<< "ERROR: target_shards[" << shard
->get_num_shard() << "].flush() returned error: " << cpp_strerror(-r
) << dendl
;
203 for (auto& shard
: target_shards
) {
204 int r
= shard
->wait_all_aio();
206 derr
<< "ERROR: target_shards[" << shard
->get_num_shard() << "].wait_all_aio() returned error: " << cpp_strerror(-r
) << dendl
;
211 target_shards
.clear();
214 }; // class BucketReshardManager
// Constructs a resharder for _bucket_info and _bucket_attrs. reshard_lock
// is built from bucket_info with a third argument of true (NOTE(review):
// presumably the ephemeral flag — confirm). _outer_reshard_lock may be
// nullptr; when set, do_reshard() renews it together with reshard_lock.
216 RGWBucketReshard::RGWBucketReshard(RGWRados
*_store
,
217 const RGWBucketInfo
& _bucket_info
,
218 const map
<string
, bufferlist
>& _bucket_attrs
,
219 RGWBucketReshardLock
* _outer_reshard_lock
) :
220 store(_store
), bucket_info(_bucket_info
), bucket_attrs(_bucket_attrs
),
221 reshard_lock(store
, bucket_info
, true),
222 outer_reshard_lock(_outer_reshard_lock
)
// Writes a reshard status entry (new_instance_id, num_shards, status) to
// the bucket index of bucket_info via bucket_set_reshard(). An empty
// new_instance_id is rejected with a log message, and a failing
// bucket_set_reshard() is logged as an error.
225 int RGWBucketReshard::set_resharding_status(RGWRados
* store
,
226 const RGWBucketInfo
& bucket_info
,
227 const string
& new_instance_id
,
229 cls_rgw_reshard_status status
)
231 if (new_instance_id
.empty()) {
232 ldout(store
->ctx(), 0) << __func__
<< " missing new bucket instance id" << dendl
;
236 cls_rgw_bucket_instance_entry instance_entry
;
237 instance_entry
.set_status(new_instance_id
, num_shards
, status
);
239 int ret
= store
->bucket_set_reshard(bucket_info
, instance_entry
);
241 ldout(store
->ctx(), 0) << "RGWReshard::" << __func__
<< " ERROR: error setting bucket resharding flag on bucket index: "
242 << cpp_strerror(-ret
) << dendl
;
248 // assumes the caller already holds the reshard lock
// Clears resharding state in two steps. First each index shard's reshard
// status is reset (clear_index_shard_reshard_status()). Then a
// default-constructed cls_rgw_bucket_instance_entry is written via
// bucket_set_reshard(). A failure in either step is logged.
249 int RGWBucketReshard::clear_resharding(RGWRados
* store
,
250 const RGWBucketInfo
& bucket_info
)
252 int ret
= clear_index_shard_reshard_status(store
, bucket_info
);
254 ldout(store
->ctx(), 0) << "RGWBucketReshard::" << __func__
<<
255 " ERROR: error clearing reshard status from index shard " <<
256 cpp_strerror(-ret
) << dendl
;
// an empty instance entry resets the bucket-level reshard state
260 cls_rgw_bucket_instance_entry instance_entry
;
261 ret
= store
->bucket_set_reshard(bucket_info
, instance_entry
);
263 ldout(store
->ctx(), 0) << "RGWReshard::" << __func__
<<
264 " ERROR: error setting bucket resharding flag on bucket index: " <<
265 cpp_strerror(-ret
) << dendl
;
// Resets every index shard of bucket_info to
// CLS_RGW_RESHARD_NOT_RESHARDING, keeping the bucket's current bucket_id
// as the instance id. A bucket whose num_shards equals UINT32_MAX is
// skipped (NOTE(review): presumably a sentinel for buckets without a
// sharded index — confirm). A shard count of 0 is passed as 1.
272 int RGWBucketReshard::clear_index_shard_reshard_status(RGWRados
* store
,
273 const RGWBucketInfo
& bucket_info
)
275 uint32_t num_shards
= bucket_info
.num_shards
;
277 if (num_shards
< std::numeric_limits
<uint32_t>::max()) {
278 int ret
= set_resharding_status(store
, bucket_info
,
279 bucket_info
.bucket
.bucket_id
,
280 (num_shards
< 1 ? 1 : num_shards
),
281 CLS_RGW_RESHARD_NOT_RESHARDING
);
283 ldout(store
->ctx(), 0) << "RGWBucketReshard::" << __func__
<<
284 " ERROR: error clearing reshard status from index shard " <<
285 cpp_strerror(-ret
) << dendl
;
// Fills new_bucket_info with a copy of bucket_info that has a freshly
// generated bucket_id, new_num_shards shards, and cleared oid,
// objv_tracker, new_bucket_instance_id and reshard_status. It then
// initializes the new index objects and stores the instance info together
// with `attrs`. Errors are reported on stderr.
293 static int create_new_bucket_instance(RGWRados
*store
,
295 const RGWBucketInfo
& bucket_info
,
296 map
<string
, bufferlist
>& attrs
,
297 RGWBucketInfo
& new_bucket_info
)
299 new_bucket_info
= bucket_info
;
301 store
->create_bucket_id(&new_bucket_info
.bucket
.bucket_id
);
302 new_bucket_info
.bucket
.oid
.clear();
304 new_bucket_info
.num_shards
= new_num_shards
;
305 new_bucket_info
.objv_tracker
.clear();
// the new instance starts with no reshard state of its own
307 new_bucket_info
.new_bucket_instance_id
.clear();
308 new_bucket_info
.reshard_status
= 0;
310 int ret
= store
->init_bucket_index(new_bucket_info
, new_bucket_info
.num_shards
);
312 cerr
<< "ERROR: failed to init new bucket indexes: " << cpp_strerror(-ret
) << std::endl
;
316 ret
= store
->put_bucket_instance_info(new_bucket_info
, true, real_time(), &attrs
);
318 cerr
<< "ERROR: failed to store new bucket instance info: " << cpp_strerror(-ret
) << std::endl
;
// Member wrapper around the file-static create_new_bucket_instance(); it
// supplies this object's store, bucket_info and bucket_attrs.
325 int RGWBucketReshard::create_new_bucket_instance(int new_num_shards
,
326 RGWBucketInfo
& new_bucket_info
)
328 return ::create_new_bucket_instance(store
, new_num_shards
,
329 bucket_info
, bucket_attrs
, new_bucket_info
);
// Cancels resharding of this bucket. It takes reshard_lock, clears the
// resharding state with clear_resharding(), and then releases the lock.
332 int RGWBucketReshard::cancel()
334 int ret
= reshard_lock
.lock();
339 ret
= clear_resharding();
341 reshard_lock
.unlock();
// Scoped helper for the reshard fields of the caller's bucket_info.
// - The constructor records new_bucket_id in new_bucket_instance_id.
// - start() persists CLS_RGW_RESHARD_IN_PROGRESS.
// - complete() persists CLS_RGW_RESHARD_DONE.
// If resharding did not finish (presumably tracked by in_progress), the
// destructor clears the index shards' reshard status and persists
// CLS_RGW_RESHARD_NOT_RESHARDING.
// bucket_info is a reference to the caller's object; bucket_attrs is a
// copy.
345 class BucketInfoReshardUpdate
348 RGWBucketInfo
& bucket_info
;
349 std::map
<string
, bufferlist
> bucket_attrs
;
351 bool in_progress
{false};
// Stores status `s` in bucket_info and persists the instance info with the
// saved attrs; a write failure is logged.
353 int set_status(cls_rgw_reshard_status s
) {
354 bucket_info
.reshard_status
= s
;
355 int ret
= store
->put_bucket_instance_info(bucket_info
, false, real_time(), &bucket_attrs
);
357 ldout(store
->ctx(), 0) << "ERROR: failed to write bucket info, ret=" << ret
<< dendl
;
364 BucketInfoReshardUpdate(RGWRados
*_store
,
365 RGWBucketInfo
& _bucket_info
,
366 map
<string
, bufferlist
>& _bucket_attrs
,
367 const string
& new_bucket_id
) :
369 bucket_info(_bucket_info
),
370 bucket_attrs(_bucket_attrs
)
372 bucket_info
.new_bucket_instance_id
= new_bucket_id
;
375 ~BucketInfoReshardUpdate() {
377 // resharding did not complete successfully; clean up
379 RGWBucketReshard::clear_index_shard_reshard_status(store
, bucket_info
);
381 lderr(store
->ctx()) << "Error: " << __func__
<<
382 " clear_index_shard_status returned " << ret
<< dendl
;
384 bucket_info
.new_bucket_instance_id
.clear();
385 set_status(CLS_RGW_RESHARD_NOT_RESHARDING
); // also persists the cleared new_bucket_instance_id
// start(): mark the bucket as resharding
390 int ret
= set_status(CLS_RGW_RESHARD_IN_PROGRESS
);
// complete(): mark resharding as done
399 int ret
= set_status(CLS_RGW_RESHARD_DONE
);
// Lock object for lock_oid in the reshard pool, using the cls lock named
// reshard_lock_name. The lock duration comes from
// rgw_reshard_bucket_lock_duration. The cookie is a random
// COOKIE_LEN-character alphanumeric string generated per instance.
409 RGWBucketReshardLock::RGWBucketReshardLock(RGWRados
* _store
,
410 const std::string
& reshard_lock_oid
,
413 lock_oid(reshard_lock_oid
),
414 ephemeral(_ephemeral
),
415 internal_lock(reshard_lock_name
)
// NOTE(review): the uint64_t option value is narrowed to int here.
417 const int lock_dur_secs
= store
->ctx()->_conf
.get_val
<uint64_t>(
418 "rgw_reshard_bucket_lock_duration");
419 duration
= std::chrono::seconds(lock_dur_secs
);
421 #define COOKIE_LEN 16
// generate COOKIE_LEN random characters, then NUL-terminate explicitly
422 char cookie_buf
[COOKIE_LEN
+ 1];
423 gen_rand_alphanumeric(store
->ctx(), cookie_buf
, sizeof(cookie_buf
) - 1);
424 cookie_buf
[COOKIE_LEN
] = '\0';
426 internal_lock
.set_cookie(cookie_buf
);
427 internal_lock
.set_duration(duration
);
// Acquires the exclusive cls lock on lock_oid in the reshard pool. It uses
// lock_exclusive_ephemeral() or lock_exclusive() (NOTE(review):
// presumably chosen by `ephemeral`).
// must_renew is cleared first; renew() is the path that sets it.
// A failure is logged. On success, reset_time() records the acquisition
// time used by should_renew().
430 int RGWBucketReshardLock::lock() {
431 internal_lock
.set_must_renew(false);
434 ret
= internal_lock
.lock_exclusive_ephemeral(&store
->reshard_pool_ctx
,
437 ret
= internal_lock
.lock_exclusive(&store
->reshard_pool_ctx
, lock_oid
);
440 ldout(store
->ctx(), 0) << "RGWReshardLock::" << __func__
<<
441 " failed to acquire lock on " << lock_oid
<< " ret=" << ret
<< dendl
;
444 reset_time(Clock::now());
// Releases the cls lock on lock_oid in the reshard pool. This is best
// effort: a failure is only logged as a WARNING.
449 void RGWBucketReshardLock::unlock() {
450 int ret
= internal_lock
.unlock(&store
->reshard_pool_ctx
, lock_oid
);
452 ldout(store
->ctx(), 0) << "WARNING: RGWBucketReshardLock::" << __func__
<<
453 " failed to drop lock on " << lock_oid
<< " ret=" << ret
<< dendl
;
// Extends the lock this object already holds. must_renew is set so the
// lock call only refreshes an existing lock; ENOENT means the lock
// expired or was never taken.
// A failure is logged at level 5 with a readable error. must_renew is
// later reset to false, and a successful renewal is logged at level 20.
457 int RGWBucketReshardLock::renew(const Clock::time_point
& now
) {
458 internal_lock
.set_must_renew(true);
461 ret
= internal_lock
.lock_exclusive_ephemeral(&store
->reshard_pool_ctx
,
464 ret
= internal_lock
.lock_exclusive(&store
->reshard_pool_ctx
, lock_oid
);
466 if (ret
< 0) { /* expired or already locked by another processor */
467 std::stringstream error_s
;
468 if (-ENOENT
== ret
) {
469 error_s
<< "ENOENT (lock expired or never initially locked)";
471 error_s
<< ret
<< " (" << cpp_strerror(-ret
) << ")";
473 ldout(store
->ctx(), 5) << __func__
<< "(): failed to renew lock on " <<
474 lock_oid
<< " with error " << error_s
.str() << dendl
;
477 internal_lock
.set_must_renew(false);
480 ldout(store
->ctx(), 20) << __func__
<< "(): successfully renewed lock on " <<
// Copies every bucket-index entry of the current instance into the shards
// of new_bucket_info:
// - all source shards are listed with bi_list(), max_entries at a time;
// - each entry is routed to its target shard through
//   BucketReshardManager.
// Afterwards it calls rgw_link_bucket() for the new instance and marks
// resharding DONE.
// Progress is written to `out`, as JSON through `formatter` when
// `verbose` is set and both are non-null. The reshard lock (and
// outer_reshard_lock, if any) is renewed while entries are copied.
487 int RGWBucketReshard::do_reshard(int num_shards
,
488 RGWBucketInfo
& new_bucket_info
,
492 Formatter
*formatter
)
494 rgw_bucket
& bucket
= bucket_info
.bucket
;
499 (*out
) << "tenant: " << bucket_info
.bucket
.tenant
<< std::endl
;
500 (*out
) << "bucket name: " << bucket_info
.bucket
.name
<< std::endl
;
501 (*out
) << "old bucket instance id: " << bucket_info
.bucket
.bucket_id
<<
503 (*out
) << "new bucket instance id: " << new_bucket_info
.bucket
.bucket_id
<<
507 /* update bucket info -- in progress*/
508 list
<rgw_cls_bi_entry
> entries
;
510 if (max_entries
< 0) {
511 ldout(store
->ctx(), 0) << __func__
<<
512 ": can't reshard, negative max_entries" << dendl
;
516 // NB: destructor cleans up sharding state if reshard does not
517 // complete successfully
518 BucketInfoReshardUpdate
bucket_info_updater(store
, bucket_info
, bucket_attrs
, new_bucket_info
.bucket
.bucket_id
);
520 ret
= bucket_info_updater
.start();
522 ldout(store
->ctx(), 0) << __func__
<< ": failed to update bucket info ret=" << ret
<< dendl
;
// a non-positive shard count is treated as a single target shard
526 int num_target_shards
= (new_bucket_info
.num_shards
> 0 ? new_bucket_info
.num_shards
: 1);
528 BucketReshardManager
target_shards_mgr(store
, new_bucket_info
, num_target_shards
);
530 bool verbose_json_out
= verbose
&& (formatter
!= nullptr) && (out
!= nullptr);
532 if (verbose_json_out
) {
533 formatter
->open_array_section("entries");
536 uint64_t total_entries
= 0;
538 if (!verbose_json_out
&& out
) {
539 (*out
) << "total entries:";
542 const int num_source_shards
=
543 (bucket_info
.num_shards
> 0 ? bucket_info
.num_shards
: 1);
// walk each source shard; bi_list pages through its entries
545 for (int i
= 0; i
< num_source_shards
; ++i
) {
546 bool is_truncated
= true;
548 while (is_truncated
) {
550 ret
= store
->bi_list(bucket
, i
, string(), marker
, max_entries
, &entries
, &is_truncated
);
551 if (ret
< 0 && ret
!= -ENOENT
) {
552 derr
<< "ERROR: bi_list(): " << cpp_strerror(-ret
) << dendl
;
556 for (auto iter
= entries
.begin(); iter
!= entries
.end(); ++iter
) {
557 rgw_cls_bi_entry
& entry
= *iter
;
558 if (verbose_json_out
) {
559 formatter
->open_object_section("entry");
561 encode_json("shard_id", i
, formatter
);
562 encode_json("num_entry", total_entries
, formatter
);
563 encode_json("entry", entry
, formatter
);
// get_info() extracts the key, category and stats; `account` presumably
// says whether the entry counts toward the bucket stats
570 cls_rgw_obj_key cls_key
;
571 RGWObjCategory category
;
572 rgw_bucket_category_stats stats
;
573 bool account
= entry
.get_info(&cls_key
, &category
, &stats
);
574 rgw_obj_key
key(cls_key
);
575 rgw_obj
obj(new_bucket_info
.bucket
, key
);
577 if (key
.ns
== RGW_OBJ_NS_MULTIPART
&& mp
.from_meta(key
.name
)) {
578 // place the multipart .meta object on the same shard as its head object
579 obj
.index_hash_source
= mp
.get_key();
581 int ret
= store
->get_target_shard_id(new_bucket_info
, obj
.get_hash_object(), &target_shard_id
);
583 lderr(store
->ctx()) << "ERROR: get_target_shard_id() returned ret=" << ret
<< dendl
;
// any non-positive target_shard_id maps to shard 0
587 int shard_index
= (target_shard_id
> 0 ? target_shard_id
: 0);
589 ret
= target_shards_mgr
.add_entry(shard_index
, entry
, account
,
// keep the locks alive while a long copy is in progress
595 Clock::time_point now
= Clock::now();
596 if (reshard_lock
.should_renew(now
)) {
597 // assume outer locks have timespans at least the size of ours, so
598 // can call inside conditional
599 if (outer_reshard_lock
) {
600 ret
= outer_reshard_lock
->renew(now
);
605 ret
= reshard_lock
.renew(now
);
607 lderr(store
->ctx()) << "Error renewing bucket lock: " << ret
<< dendl
;
// close the JSON entry, or print a running count every 1000 entries
612 if (verbose_json_out
) {
613 formatter
->close_section();
614 formatter
->flush(*out
);
615 } else if (out
&& !(total_entries
% 1000)) {
616 (*out
) << " " << total_entries
;
622 if (verbose_json_out
) {
623 formatter
->close_section();
624 formatter
->flush(*out
);
626 (*out
) << " " << total_entries
<< std::endl
;
// flush all target shards and wait for their writes
629 ret
= target_shards_mgr
.finish();
631 lderr(store
->ctx()) << "ERROR: failed to reshard" << dendl
;
635 ret
= rgw_link_bucket(store
, new_bucket_info
.owner
, new_bucket_info
.bucket
, bucket_info
.creation_time
);
637 lderr(store
->ctx()) << "failed to link new bucket instance (bucket_id=" << new_bucket_info
.bucket
.bucket_id
<< ": " << cpp_strerror(-ret
) << ")" << dendl
;
// mark resharding DONE; a failure here is logged but not fatal
641 ret
= bucket_info_updater
.complete();
643 ldout(store
->ctx(), 0) << __func__
<< ": failed to update bucket info ret=" << ret
<< dendl
;
644 /* don't error out, reshard process succeeded */
648 // NB: some error clean-up is done by ~BucketInfoReshardUpdate
649 } // RGWBucketReshard::do_reshard
// Reads the reshard status entry from every index shard object of this
// bucket and appends it to *status. ENOENT from a shard is not logged as
// an error.
651 int RGWBucketReshard::get_status(list
<cls_rgw_bucket_instance_entry
> *status
)
653 librados::IoCtx index_ctx
;
654 map
<int, string
> bucket_objs
;
656 int r
= store
->open_bucket_index(bucket_info
, index_ctx
, bucket_objs
);
// NOTE(review): this iterates by value, copying each (int, string) pair;
// `const auto&` would avoid the copy.
661 for (auto i
: bucket_objs
) {
662 cls_rgw_bucket_instance_entry entry
;
664 int ret
= cls_rgw_get_bucket_resharding(index_ctx
, i
.second
, &entry
);
665 if (ret
< 0 && ret
!= -ENOENT
) {
666 lderr(store
->ctx()) << "ERROR: " << __func__
<< ": cls_rgw_get_bucket_resharding() returned ret=" << ret
<< dendl
;
670 status
->push_back(entry
);
// Reshards this bucket to num_shards shards:
//  1. take reshard_lock and create the new bucket instance;
//  2. record the new instance in reshard_log (presumably only when it is
//     non-null) and mark the current index IN_PROGRESS with
//     set_resharding_status(), then release the lock;
//  3. copy the index with do_reshard();
//  4. on success, remove the old index shards and bucket instance entry
//     (best effort);
//  5. on failure, remove the new instance's index shards and entry.
// NOTE(review): max_op_entries is presumably forwarded to do_reshard as
// its listing batch size — confirm.
677 int RGWBucketReshard::execute(int num_shards
, int max_op_entries
,
678 bool verbose
, ostream
*out
, Formatter
*formatter
,
679 RGWReshard
* reshard_log
)
681 Clock::time_point now
;
683 int ret
= reshard_lock
.lock();
688 RGWBucketInfo new_bucket_info
;
689 ret
= create_new_bucket_instance(num_shards
, new_bucket_info
);
691 // shard state is uncertain, but this will attempt to remove them anyway
696 ret
= reshard_log
->update(bucket_info
, new_bucket_info
);
702 // set resharding status of current bucket_info & shards with
703 // information about planned resharding
704 ret
= set_resharding_status(new_bucket_info
.bucket
.bucket_id
,
705 num_shards
, CLS_RGW_RESHARD_IN_PROGRESS
);
707 reshard_lock
.unlock();
711 ret
= do_reshard(num_shards
,
714 verbose
, out
, formatter
);
719 // at this point we've done the main work; we'll make a best-effort
720 // to clean-up but will not indicate any errors encountered
722 reshard_lock
.unlock();
724 // resharding successful, so remove old bucket index shards; use
725 // best effort and don't report out an error; the lock isn't needed
726 // at this point since all we're doing is a best-effort attempt to remove old
728 ret
= store
->clean_bucket_index(bucket_info
, bucket_info
.num_shards
);
730 lderr(store
->ctx()) << "Error: " << __func__
<<
731 " failed to clean up old shards; " <<
732 "RGWRados::clean_bucket_index returned " << ret
<< dendl
;
735 ret
= rgw_bucket_instance_remove_entry(store
,
736 bucket_info
.bucket
.get_key(),
739 lderr(store
->ctx()) << "Error: " << __func__
<<
740 " failed to clean old bucket info object \"" <<
741 bucket_info
.bucket
.get_key() <<
742 "\"created after successful resharding with error " << ret
<< dendl
;
745 ldout(store
->ctx(), 1) << __func__
<<
746 " INFO: reshard of bucket \"" << bucket_info
.bucket
.name
<< "\" from \"" <<
747 bucket_info
.bucket
.get_key() << "\" to \"" <<
748 new_bucket_info
.bucket
.get_key() << "\" completed successfully" << dendl
;
// error path: release the lock and remove what the failed attempt created
754 reshard_lock
.unlock();
756 // since the real problem is the issue that led to this error code
757 // path, we won't touch ret and instead use another variable to
758 // temporarily hold error codes
759 int ret2
= store
->clean_bucket_index(new_bucket_info
,
760 new_bucket_info
.num_shards
);
762 lderr(store
->ctx()) << "Error: " << __func__
<<
763 " failed to clean up shards from failed incomplete resharding; " <<
764 "RGWRados::clean_bucket_index returned " << ret2
<< dendl
;
767 ret2
= rgw_bucket_instance_remove_entry(store
,
768 new_bucket_info
.bucket
.get_key(),
771 lderr(store
->ctx()) << "Error: " << __func__
<<
772 " failed to clean bucket info object \"" <<
773 new_bucket_info
.bucket
.get_key() <<
774 "\"created during incomplete resharding with error " << ret2
<< dendl
;
// Front end for the reshard queue (the reshard log shards). The number of
// log shards is read from rgw_reshard_num_logs, and instance_lock uses the
// lock name bucket_instance_lock_name.
781 RGWReshard::RGWReshard(RGWRados
* _store
, bool _verbose
, ostream
*_out
,
782 Formatter
*_formatter
) :
783 store(_store
), instance_lock(bucket_instance_lock_name
),
784 verbose(_verbose
), out(_out
), formatter(_formatter
)
786 num_logshards
= store
->ctx()->_conf
.get_val
<uint64_t>("rgw_reshard_num_logs");
// Key hashed to pick a bucket's log shard: "<tenant>:<bucket_name>".
789 string
RGWReshard::get_logshard_key(const string
& tenant
,
790 const string
& bucket_name
)
792 return tenant
+ ":" + bucket_name
;
// Prime modulus applied before the final reduction by num_logshards
// (presumably to spread hashes more evenly).
795 #define MAX_RESHARD_LOGSHARDS_PRIME 7877
// Maps tenant/bucket_name to its reshard log shard object:
// - take the Linux string hash of get_logshard_key();
// - XOR the hash's low byte into its top byte;
// - reduce modulo MAX_RESHARD_LOGSHARDS_PRIME, then modulo num_logshards.
// NOTE(review): num_logshards comes from configuration; a value of 0
// would make the final modulo a division by zero.
797 void RGWReshard::get_bucket_logshard_oid(const string
& tenant
, const string
& bucket_name
, string
*oid
)
799 string key
= get_logshard_key(tenant
, bucket_name
);
801 uint32_t sid
= ceph_str_hash_linux(key
.c_str(), key
.size());
802 uint32_t sid2
= sid
^ ((sid
& 0xFF) << 24);
803 sid
= sid2
% MAX_RESHARD_LOGSHARDS_PRIME
% num_logshards
;
805 get_logshard_oid(int(sid
), oid
);
// Adds `entry` to the reshard log shard chosen from its tenant and bucket
// name, using a cls_rgw_reshard_add write. When can_reshard() is false for
// the zone, this is logged at level 20 (and presumably nothing is added).
// A write failure is logged.
808 int RGWReshard::add(cls_rgw_reshard_entry
& entry
)
810 if (!store
->svc
.zone
->can_reshard()) {
811 ldout(store
->ctx(), 20) << __func__
<< " Resharding is disabled" << dendl
;
817 get_bucket_logshard_oid(entry
.tenant
, entry
.bucket_name
, &logshard_oid
);
819 librados::ObjectWriteOperation op
;
820 cls_rgw_reshard_add(op
, entry
);
822 int ret
= store
->reshard_pool_ctx
.operate(logshard_oid
, &op
);
824 lderr(store
->ctx()) << "ERROR: failed to add entry to reshard log, oid=" << logshard_oid
<< " tenant=" << entry
.tenant
<< " bucket=" << entry
.bucket_name
<< dendl
;
// Records the new bucket instance in the existing queue entry for
// bucket_info. The entry is fetched with get(), and its new_instance_id
// is set to "<bucket name>:<new bucket_id>".
// NOTE(review): the tenant is taken from bucket_info.owner rather than
// bucket_info.bucket. Confirm the two always agree, since the tenant
// selects the log shard looked up by get().
830 int RGWReshard::update(const RGWBucketInfo
& bucket_info
, const RGWBucketInfo
& new_bucket_info
)
832 cls_rgw_reshard_entry entry
;
833 entry
.bucket_name
= bucket_info
.bucket
.name
;
834 entry
.bucket_id
= bucket_info
.bucket
.bucket_id
;
835 entry
.tenant
= bucket_info
.owner
.tenant
;
837 int ret
= get(entry
);
842 entry
.new_instance_id
= new_bucket_info
.bucket
.name
+ ":" + new_bucket_info
.bucket
.bucket_id
;
846 ldout(store
->ctx(), 0) << __func__
<< ":Error in updating entry bucket " << entry
.bucket_name
<< ": " <<
847 cpp_strerror(-ret
) << dendl
;
// Lists up to `max` entries of reshard log shard logshard_num, starting
// from `marker`. A missing log object (ENOENT) is reported as not
// truncated. Other failures are logged, and EACCES additionally logs a
// hint about the pool's access permissions.
854 int RGWReshard::list(int logshard_num
, string
& marker
, uint32_t max
, std::list
<cls_rgw_reshard_entry
>& entries
, bool *is_truncated
)
858 get_logshard_oid(logshard_num
, &logshard_oid
);
860 int ret
= cls_rgw_reshard_list(store
->reshard_pool_ctx
, logshard_oid
, marker
, max
, entries
, is_truncated
);
863 if (ret
== -ENOENT
) {
864 *is_truncated
= false;
867 lderr(store
->ctx()) << "ERROR: failed to list reshard log entries, oid=" << logshard_oid
<< dendl
;
868 if (ret
== -EACCES
) {
869 lderr(store
->ctx()) << "access denied to pool " << store
->svc
.zone
->get_zone_params().reshard_pool
870 << ". Fix the pool access permissions of your client" << dendl
;
// Looks up the queue entry for entry.tenant/entry.bucket_name in the log
// shard it hashes to and fills `entry` from it. Failures other than
// ENOENT are logged.
877 int RGWReshard::get(cls_rgw_reshard_entry
& entry
)
881 get_bucket_logshard_oid(entry
.tenant
, entry
.bucket_name
, &logshard_oid
);
883 int ret
= cls_rgw_reshard_get(store
->reshard_pool_ctx
, logshard_oid
, entry
);
885 if (ret
!= -ENOENT
) {
886 lderr(store
->ctx()) << "ERROR: failed to get entry from reshard log, oid=" << logshard_oid
<< " tenant=" << entry
.tenant
<<
887 " bucket=" << entry
.bucket_name
<< dendl
;
// Removes `entry` from the reshard log shard it hashes to, using a
// cls_rgw_reshard_remove write. A failure is logged.
895 int RGWReshard::remove(cls_rgw_reshard_entry
& entry
)
899 get_bucket_logshard_oid(entry
.tenant
, entry
.bucket_name
, &logshard_oid
);
901 librados::ObjectWriteOperation op
;
902 cls_rgw_reshard_remove(op
, entry
);
904 int ret
= store
->reshard_pool_ctx
.operate(logshard_oid
, &op
);
906 lderr(store
->ctx()) << "ERROR: failed to remove entry from reshard log, oid=" << logshard_oid
<< " tenant=" << entry
.tenant
<< " bucket=" << entry
.bucket_name
<< dendl
;
// Clears the resharding flag on bucket_instance_oid with
// cls_rgw_clear_bucket_resharding(); a failure is logged.
// NOTE(review): the call goes through reshard_pool_ctx and does not use
// `entry`. Confirm that bucket_instance_oid actually lives in the reshard
// pool.
913 int RGWReshard::clear_bucket_resharding(const string
& bucket_instance_oid
, cls_rgw_reshard_entry
& entry
)
915 int ret
= cls_rgw_clear_bucket_resharding(store
->reshard_pool_ctx
, bucket_instance_oid
);
917 lderr(store
->ctx()) << "ERROR: failed to clear bucket resharding, bucket_instance_oid=" << bucket_instance_oid
<< dendl
;
// Waits for `duration`, or until stop() interrupts the wait.
// With Boost.Context (presumably when `y` carries a yield context), a
// Waiter is registered in `waiters` and its timer is awaited
// asynchronously on the coroutine; afterwards it is unregistered.
// Otherwise the thread blocks in cond.wait_for() on `lock`.
924 int RGWReshardWait::wait(optional_yield y
)
926 std::unique_lock
lock(mutex
);
932 #ifdef HAVE_BOOST_CONTEXT
934 auto& context
= y
.get_io_context();
935 auto& yield
= y
.get_yield_context();
937 Waiter
waiter(context
);
938 waiters
.push_back(waiter
);
941 waiter
.timer
.expires_after(duration
);
// the timer's error code (e.g. cancellation by stop()) is captured in ec
943 boost::system::error_code ec
;
944 waiter
.timer
.async_wait(yield
[ec
]);
947 waiters
.erase(waiters
.iterator_to(waiter
));
952 cond
.wait_for(lock
, duration
);
// Interrupts pending waits. Under the mutex, the timer of every
// registered coroutine waiter is cancelled so its async wait returns
// early.
961 void RGWReshardWait::stop()
963 std::scoped_lock
lock(mutex
);
966 for (auto& waiter
: waiters
) {
967 // unblock any waiters with ECANCELED
968 waiter
.timer
.cancel();
// Processes one reshard log shard:
// - takes the logshard lock (third ctor argument false — presumably
//   non-ephemeral);
// - pages through the shard's entries, max_entries at a time;
// - for each entry without a new_instance_id, loads the bucket, reshards
//   it to entry.new_num_shards, and removes the entry from the queue;
// - removes entries whose bucket no longer exists (ENOENT);
// - renews the logshard lock as needed and releases it at the end.
972 int RGWReshard::process_single_logshard(int logshard_num
)
975 bool truncated
= true;
977 CephContext
*cct
= store
->ctx();
978 constexpr uint32_t max_entries
= 1000;
981 get_logshard_oid(logshard_num
, &logshard_oid
);
983 RGWBucketReshardLock
logshard_lock(store
, logshard_oid
, false);
985 int ret
= logshard_lock
.lock();
987 ldout(store
->ctx(), 5) << __func__
<< "(): failed to acquire lock on " <<
988 logshard_oid
<< ", ret = " << ret
<<dendl
;
993 std::list
<cls_rgw_reshard_entry
> entries
;
994 ret
= list(logshard_num
, marker
, max_entries
, entries
, &truncated
);
996 ldout(cct
, 10) << "cannot list all reshards in logshard oid=" <<
997 logshard_oid
<< dendl
;
1001 for(auto& entry
: entries
) { // logshard entries
// an empty new_instance_id means this bucket has not been resharded yet
1002 if(entry
.new_instance_id
.empty()) {
1004 ldout(store
->ctx(), 20) << __func__
<< " resharding " <<
1005 entry
.bucket_name
<< dendl
;
1007 auto obj_ctx
= store
->svc
.sysobj
->init_obj_ctx();
1009 RGWBucketInfo bucket_info
;
1010 map
<string
, bufferlist
> attrs
;
1012 ret
= store
->get_bucket_info(obj_ctx
, entry
.tenant
, entry
.bucket_name
,
1013 bucket_info
, nullptr, &attrs
);
1015 ldout(cct
, 0) << __func__
<<
1016 ": Error in get_bucket_info for bucket " << entry
.bucket_name
<<
1017 ": " << cpp_strerror(-ret
) << dendl
;
1018 if (ret
!= -ENOENT
) {
1019 // any error other than ENOENT will abort
1023 // we've encountered a reshard queue entry for an apparently
1024 // non-existent bucket; let's try to recover by cleaning up
1025 ldout(cct
, 0) << __func__
<<
1026 ": removing reshard queue entry for non-existent bucket " <<
1027 entry
.bucket_name
<< dendl
;
1029 ret
= remove(entry
);
1031 ldout(cct
, 0) << __func__
<<
1032 ": Error removing non-existent bucket " <<
1033 entry
.bucket_name
<< " from resharding queue: " <<
1034 cpp_strerror(-ret
) << dendl
;
1038 // we cleaned up, move on to the next entry
1039 goto finished_entry
;
// reshard the bucket; no outer lock is passed, since this function holds
// only the logshard lock
1042 RGWBucketReshard
br(store
, bucket_info
, attrs
, nullptr);
1043 ret
= br
.execute(entry
.new_num_shards
, max_entries
, false, nullptr,
1046 ldout(store
->ctx(), 0) << __func__
<<
1047 ": Error during resharding bucket " << entry
.bucket_name
<< ":" <<
1048 cpp_strerror(-ret
)<< dendl
;
1052 ldout(store
->ctx(), 20) << __func__
<<
1053 " removing reshard queue entry for bucket " << entry
.bucket_name
<<
1056 ret
= remove(entry
);
1058 ldout(cct
, 0) << __func__
<< ": Error removing bucket " <<
1059 entry
.bucket_name
<< " from resharding queue: " <<
1060 cpp_strerror(-ret
) << dendl
;
1063 } // if new instance id is empty
// keep the logshard lock alive across long runs
1067 Clock::time_point now
= Clock::now();
1068 if (logshard_lock
.should_renew(now
)) {
1069 ret
= logshard_lock
.renew(now
);
// advance the listing marker past this entry
1075 entry
.get_key(&marker
);
1077 } while (truncated
);
1079 logshard_lock
.unlock();
// Builds the reshard log object name: reshard_oid_prefix followed by
// shard_num as a zero-padded 10-digit decimal (e.g. "reshard.0000000003").
1084 void RGWReshard::get_logshard_oid(int shard_num
, string
*logshard
)
1087 snprintf(buf
, sizeof(buf
), "%010u", (unsigned)shard_num
);
1089 string
objname(reshard_oid_prefix
);
1090 *logshard
= objname
+ buf
;
// Processes reshard log shards 0..num_logshards-1 in order with
// process_single_logshard(). When can_reshard() is false for the zone,
// this is logged at level 20 instead.
1093 int RGWReshard::process_all_logshards()
1095 if (!store
->svc
.zone
->can_reshard()) {
1096 ldout(store
->ctx(), 20) << __func__
<< " Resharding is disabled" << dendl
;
1101 for (int i
= 0; i
< num_logshards
; i
++) {
1103 get_logshard_oid(i
, &logshard
);
1105 ldout(store
->ctx(), 20) << "processing logshard = " << logshard
<< dendl
;
1107 ret
= process_single_logshard(i
);
// True once the reshard processor has been asked to stop (presumably by
// stop_processor()); polled by ReshardWorker::entry.
1116 bool RGWReshard::going_down()
// Creates the ReshardWorker and starts it with create("rgw_reshard")
// (presumably a named thread running ReshardWorker::entry).
1121 void RGWReshard::start_processor()
1123 worker
= new ReshardWorker(store
->ctx(), this);
1124 worker
->create("rgw_reshard");
// Stops the ReshardWorker started by start_processor() (presumably also
// making going_down() return true).
1127 void RGWReshard::stop_processor()
// Background loop. Each round processes all reshard log shards. If the
// round took at least rgw_reshard_thread_interval seconds, the next round
// starts immediately; otherwise the worker waits on `cond` for (presumably)
// the remaining interval. The loop exits once going_down() is true.
1138 void *RGWReshard::ReshardWorker::entry() {
1141 utime_t start
= ceph_clock_now();
// NOTE(review): process_all_logshards() returns an int status, so a
// non-zero (presumably error) result takes the "processed properly"
// branch below. Confirm the intended polarity of this test.
1142 if (reshard
->process_all_logshards()) {
1143 /* All shards have been processed properly. Next time we can start
1144 * from this moment. */
1148 if (reshard
->going_down())
1151 utime_t end
= ceph_clock_now();
// NOTE(review): the uint64_t option value is narrowed to int here.
1153 int secs
= cct
->_conf
.get_val
<uint64_t>("rgw_reshard_thread_interval");
1155 if (secs
<= end
.sec())
1156 continue; // next round
1161 cond
.WaitInterval(lock
, utime_t(secs
, 0));
1163 } while (!reshard
->going_down());
1168 void RGWReshard::ReshardWorker::stop()
1170 Mutex::Locker
l(lock
);