1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
7 #include "rgw_bucket.h"
8 #include "rgw_reshard.h"
9 #include "cls/rgw/cls_rgw_client.h"
10 #include "cls/lock/cls_lock_client.h"
11 #include "common/errno.h"
12 #include "common/ceph_json.h"
14 #include "common/dout.h"
// Logging configuration (context and subsystem) -- presumably read by the
// dout/ldout/derr macros used throughout this file.
16 #define dout_context g_ceph_context
17 #define dout_subsys ceph_subsys_rgw
// Prefix of the reshard log-shard object names built in get_logshard_oid().
19 const string reshard_oid_prefix
= "reshard.";
// Lock name handed to RGWBucketReshardLock's internal lock.
20 const string reshard_lock_name
= "reshard_process";
// Lock name handed to RGWReshard's instance_lock.
21 const string bucket_instance_lock_name
= "bucket_instance_lock";
// Buffered-entry count compared against in BucketReshardShard::add_entry().
25 #define RESHARD_SHARD_WINDOW 64
// Limit on queued AIO completions checked in BucketReshardShard::get_completion().
26 #define RESHARD_MAX_AIO 128
// Buffers bucket-index entries and per-category stats for one target shard
// and writes them out through asynchronous rados operations.
// NOTE(review): this listing shows only part of each member's body; the
// comments below describe the visible statements only.
29 class BucketReshardShard
{
31 const RGWBucketInfo
& bucket_info
;
33 RGWRados::BucketShard bs
;
// Entries accumulated by add_entry() and written out in one operation.
34 vector
<rgw_cls_bi_entry
> entries
;
// Stats accumulated per category, keyed by the category byte.
35 map
<uint8_t, rgw_bucket_category_stats
> stats
;
// Completion queue held by reference; bound by the constructor to the
// deque the caller passes in.
36 deque
<librados::AioCompletion
*>& aio_completions
;
// Removes the oldest completion from the queue and reads its return value;
// a failure is reported through derr.
38 int wait_next_completion() {
39 librados::AioCompletion
*c
= aio_completions
.front();
40 aio_completions
.pop_front();
44 int ret
= c
->get_return_value();
48 derr
<< "ERROR: reshard rados operation failed: " << cpp_strerror(-ret
) << dendl
;
// Hands back a new completion in *c and appends it to the queue. When the
// queue already holds RESHARD_MAX_AIO completions, the oldest one is waited
// on first.
55 int get_completion(librados::AioCompletion
**c
) {
56 if (aio_completions
.size() >= RESHARD_MAX_AIO
) {
57 int ret
= wait_next_completion();
63 *c
= librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
64 aio_completions
.push_back(*c
);
// Binds the shard to the store, bucket info and shared completion queue, then
// initialises `bs` for the chosen shard number.
70 BucketReshardShard(RGWRados
*_store
, const RGWBucketInfo
& _bucket_info
,
72 deque
<librados::AioCompletion
*>& _completions
) :
73 store(_store
), bucket_info(_bucket_info
), bs(store
),
74 aio_completions(_completions
)
// A bucket whose num_shards is not positive uses -1 as its shard number.
76 num_shard
= (bucket_info
.num_shards
> 0 ? _num_shard
: -1);
77 bs
.init(bucket_info
.bucket
, num_shard
, nullptr /* no RGWBucketInfo */);
// Buffers one entry and folds entry_stats into the running totals for
// `category`; the buffer size is then compared with RESHARD_SHARD_WINDOW.
84 int add_entry(rgw_cls_bi_entry
& entry
, bool account
, uint8_t category
,
85 const rgw_bucket_category_stats
& entry_stats
) {
86 entries
.push_back(entry
);
// operator[] creates the category's stats record on first use.
88 rgw_bucket_category_stats
& target
= stats
[category
];
89 target
.num_entries
+= entry_stats
.num_entries
;
90 target
.total_size
+= entry_stats
.total_size
;
91 target
.total_size_rounded
+= entry_stats
.total_size_rounded
;
92 target
.actual_size
+= entry_stats
.actual_size
;
94 if (entries
.size() >= RESHARD_SHARD_WINDOW
) {
// Write-out path (its function header is not shown in this listing): checks
// for an empty buffer, then packs every buffered entry plus the accumulated
// stats into one write operation and submits it asynchronously.
105 if (entries
.size() == 0) {
109 librados::ObjectWriteOperation op
;
110 for (auto& entry
: entries
) {
111 store
->bi_put(op
, bs
, entry
);
113 cls_rgw_bucket_update_stats(op
, false, stats
);
115 librados::AioCompletion
*c
;
116 int ret
= get_completion(&c
);
120 ret
= bs
.index_ctx
.aio_operate(bs
.bucket_obj
, c
, &op
);
122 derr
<< "ERROR: failed to store entries in target bucket shard (bs=" << bs
.bucket
<< "/" << bs
.shard_id
<< ") error=" << cpp_strerror(-ret
) << dendl
;
// Drain loop: waits on queued completions until the queue is empty.
132 while (!aio_completions
.empty()) {
133 int r
= wait_next_completion();
140 }; // class BucketReshardShard
// Owns one BucketReshardShard per target shard and routes entries to them;
// all shards share the single `completions` queue declared here.
// NOTE(review): this listing shows only part of each member's body.
143 class BucketReshardManager
{
145 const RGWBucketInfo
& target_bucket_info
;
// Completion queue passed by reference to every BucketReshardShard.
146 deque
<librados::AioCompletion
*> completions
;
147 int num_target_shards
;
// NOTE(review): raw pointers created with `new` below; the matching delete
// is not visible in this listing -- confirm ownership is released.
148 vector
<BucketReshardShard
*> target_shards
;
// Creates num_target_shards shard objects, one per index i.
151 BucketReshardManager(RGWRados
*_store
,
152 const RGWBucketInfo
& _target_bucket_info
,
153 int _num_target_shards
) :
154 store(_store
), target_bucket_info(_target_bucket_info
),
155 num_target_shards(_num_target_shards
)
157 target_shards
.resize(num_target_shards
);
158 for (int i
= 0; i
< num_target_shards
; ++i
) {
159 target_shards
[i
] = new BucketReshardShard(store
, target_bucket_info
, i
, completions
);
// Waits for each shard's outstanding AIO and logs the result at level 20.
163 ~BucketReshardManager() {
164 for (auto& shard
: target_shards
) {
165 int ret
= shard
->wait_all_aio();
167 ldout(store
->ctx(), 20) << __func__
<<
168 ": shard->wait_all_aio() returned ret=" << ret
<< dendl
;
// Forwards one entry to the shard at shard_index; a failure is logged with
// the entry's idx.
// NOTE(review): no bounds check on shard_index is visible here.
173 int add_entry(int shard_index
,
174 rgw_cls_bi_entry
& entry
, bool account
, uint8_t category
,
175 const rgw_bucket_category_stats
& entry_stats
) {
176 int ret
= target_shards
[shard_index
]->add_entry(entry
, account
, category
,
179 derr
<< "ERROR: target_shards.add_entry(" << entry
.idx
<<
180 ") returned error: " << cpp_strerror(-ret
) << dendl
;
// Completion path (its function header is not shown; do_reshard() calls
// finish()): first flushes every shard, then waits for every shard's AIO,
// and finally empties target_shards.
189 for (auto& shard
: target_shards
) {
190 int r
= shard
->flush();
192 derr
<< "ERROR: target_shards[" << shard
->get_num_shard() << "].flush() returned error: " << cpp_strerror(-r
) << dendl
;
196 for (auto& shard
: target_shards
) {
197 int r
= shard
->wait_all_aio();
199 derr
<< "ERROR: target_shards[" << shard
->get_num_shard() << "].wait_all_aio() returned error: " << cpp_strerror(-r
) << dendl
;
204 target_shards
.clear();
207 }; // class BucketReshardManager
// Stores the store handle, bucket info and attrs, builds this object's own
// reshard_lock from the bucket info (third argument `true`), and keeps the
// optional outer lock pointer, which do_reshard() renews when it is non-null.
209 RGWBucketReshard::RGWBucketReshard(RGWRados
*_store
,
210 const RGWBucketInfo
& _bucket_info
,
211 const map
<string
, bufferlist
>& _bucket_attrs
,
212 RGWBucketReshardLock
* _outer_reshard_lock
) :
213 store(_store
), bucket_info(_bucket_info
), bucket_attrs(_bucket_attrs
),
214 reshard_lock(store
, bucket_info
, true),
215 outer_reshard_lock(_outer_reshard_lock
)
// Records a resharding status (new instance id, shard count, status) on the
// bucket index through store->bucket_set_reshard().
// An empty new_instance_id is logged as an error before anything is written.
218 int RGWBucketReshard::set_resharding_status(RGWRados
* store
,
219 const RGWBucketInfo
& bucket_info
,
220 const string
& new_instance_id
,
222 cls_rgw_reshard_status status
)
224 if (new_instance_id
.empty()) {
225 ldout(store
->ctx(), 0) << __func__
<< " missing new bucket instance id" << dendl
;
229 cls_rgw_bucket_instance_entry instance_entry
;
230 instance_entry
.set_status(new_instance_id
, num_shards
, status
);
232 int ret
= store
->bucket_set_reshard(bucket_info
, instance_entry
);
234 ldout(store
->ctx(), 0) << "RGWReshard::" << __func__
<< " ERROR: error setting bucket resharding flag on bucket index: "
235 << cpp_strerror(-ret
) << dendl
;
241 // assumes the reshard lock is already held by the caller
// Clears resharding state in two steps: first the per-index-shard status via
// clear_index_shard_reshard_status(), then the bucket-level entry by writing a
// default-constructed cls_rgw_bucket_instance_entry.
242 int RGWBucketReshard::clear_resharding(RGWRados
* store
,
243 const RGWBucketInfo
& bucket_info
)
245 int ret
= clear_index_shard_reshard_status(store
, bucket_info
);
247 ldout(store
->ctx(), 0) << "RGWBucketReshard::" << __func__
<<
248 " ERROR: error clearing reshard status from index shard " <<
249 cpp_strerror(-ret
) << dendl
;
253 cls_rgw_bucket_instance_entry instance_entry
;
254 ret
= store
->bucket_set_reshard(bucket_info
, instance_entry
);
256 ldout(store
->ctx(), 0) << "RGWReshard::" << __func__
<<
257 " ERROR: error setting bucket resharding flag on bucket index: " <<
258 cpp_strerror(-ret
) << dendl
;
// Resets the index-shard reshard status to CLS_RGW_RESHARD_NONE, passing the
// bucket's own bucket_id as the instance id.
265 int RGWBucketReshard::clear_index_shard_reshard_status(RGWRados
* store
,
266 const RGWBucketInfo
& bucket_info
)
268 uint32_t num_shards
= bucket_info
.num_shards
;
// The reset is attempted only when num_shards is below the uint32_t maximum;
// a shard count below 1 is passed on as 1.
270 if (num_shards
< std::numeric_limits
<uint32_t>::max()) {
271 int ret
= set_resharding_status(store
, bucket_info
,
272 bucket_info
.bucket
.bucket_id
,
273 (num_shards
< 1 ? 1 : num_shards
),
274 CLS_RGW_RESHARD_NONE
);
276 ldout(store
->ctx(), 0) << "RGWBucketReshard::" << __func__
<<
277 " ERROR: error clearing reshard status from index shard " <<
278 cpp_strerror(-ret
) << dendl
;
// File-local helper: derives new_bucket_info from bucket_info with a fresh
// bucket id and the new shard count, initialises its bucket index, and stores
// the new bucket instance info together with attrs.
// NOTE(review): failures here are written to cerr rather than through the
// ldout/lderr macros used elsewhere in this file.
286 static int create_new_bucket_instance(RGWRados
*store
,
288 const RGWBucketInfo
& bucket_info
,
289 map
<string
, bufferlist
>& attrs
,
290 RGWBucketInfo
& new_bucket_info
)
// Start from a full copy, then reset the fields that must differ.
292 new_bucket_info
= bucket_info
;
294 store
->create_bucket_id(&new_bucket_info
.bucket
.bucket_id
);
295 new_bucket_info
.bucket
.oid
.clear();
297 new_bucket_info
.num_shards
= new_num_shards
;
298 new_bucket_info
.objv_tracker
.clear();
// The new instance starts with no pending reshard recorded.
300 new_bucket_info
.new_bucket_instance_id
.clear();
301 new_bucket_info
.reshard_status
= 0;
303 int ret
= store
->init_bucket_index(new_bucket_info
, new_bucket_info
.num_shards
);
305 cerr
<< "ERROR: failed to init new bucket indexes: " << cpp_strerror(-ret
) << std::endl
;
309 ret
= store
->put_bucket_instance_info(new_bucket_info
, true, real_time(), &attrs
);
311 cerr
<< "ERROR: failed to store new bucket instance info: " << cpp_strerror(-ret
) << std::endl
;
// Member wrapper: forwards to the file-scope create_new_bucket_instance()
// using this object's store, bucket_info and bucket_attrs.
318 int RGWBucketReshard::create_new_bucket_instance(int new_num_shards
,
319 RGWBucketInfo
& new_bucket_info
)
// The leading :: selects the free function rather than this member.
321 return ::create_new_bucket_instance(store
, new_num_shards
,
322 bucket_info
, bucket_attrs
, new_bucket_info
);
// Cancels a reshard: takes the reshard lock, clears the resharding state,
// then releases the lock.
325 int RGWBucketReshard::cancel()
327 int ret
= reshard_lock
.lock();
332 ret
= clear_resharding();
334 reshard_lock
.unlock();
// Tracks the reshard status stored in a bucket's instance info. It works on
// its own copies of the bucket info and attrs, and its destructor contains
// clean-up for a reshard that did not finish.
// NOTE(review): this listing shows only part of each member's body.
338 class BucketInfoReshardUpdate
341 RGWBucketInfo bucket_info
;
342 std::map
<string
, bufferlist
> bucket_attrs
;
344 bool in_progress
{false};
// Sets reshard_status on the local bucket_info copy and writes the bucket
// instance info back through the store; a failure is logged at level 0.
346 int set_status(cls_rgw_reshard_status s
) {
347 bucket_info
.reshard_status
= s
;
348 int ret
= store
->put_bucket_instance_info(bucket_info
, false, real_time(), &bucket_attrs
);
350 ldout(store
->ctx(), 0) << "ERROR: failed to write bucket info, ret=" << ret
<< dendl
;
// Copies the bucket info and attrs, then records new_bucket_id as the pending
// new instance id on the copy.
357 BucketInfoReshardUpdate(RGWRados
*_store
,
358 RGWBucketInfo
& _bucket_info
,
359 map
<string
, bufferlist
>& _bucket_attrs
,
360 const string
& new_bucket_id
) :
362 bucket_info(_bucket_info
),
363 bucket_attrs(_bucket_attrs
)
365 bucket_info
.new_bucket_instance_id
= new_bucket_id
;
// Clean-up: resets the index-shard status, clears the pending instance id and
// writes status NONE. The condition guarding this is not visible here --
// presumably it depends on in_progress; confirm.
368 ~BucketInfoReshardUpdate() {
370 // resharding must not have ended correctly, clean up
372 RGWBucketReshard::clear_index_shard_reshard_status(store
, bucket_info
);
374 lderr(store
->ctx()) << "Error: " << __func__
<<
375 " clear_index_shard_status returned " << ret
<< dendl
;
377 bucket_info
.new_bucket_instance_id
.clear();
378 set_status(CLS_RGW_RESHARD_NONE
); // clears new_bucket_instance as well
// Body fragment of the start step (do_reshard() calls start()).
383 int ret
= set_status(CLS_RGW_RESHARD_IN_PROGRESS
);
// Body fragment of the completion step (do_reshard() calls complete()).
392 int ret
= set_status(CLS_RGW_RESHARD_DONE
);
// Sets up the lock on object reshard_lock_oid: reads the lock duration from
// configuration and gives the internal lock a random cookie and that duration.
402 RGWBucketReshardLock::RGWBucketReshardLock(RGWRados
* _store
,
403 const std::string
& reshard_lock_oid
,
406 lock_oid(reshard_lock_oid
),
407 ephemeral(_ephemeral
),
408 internal_lock(reshard_lock_name
)
// Duration comes from rgw_reshard_bucket_lock_duration, taken as seconds.
410 const int lock_dur_secs
= store
->ctx()->_conf
->rgw_reshard_bucket_lock_duration
;
411 duration
= std::chrono::seconds(lock_dur_secs
);
// The buffer holds COOKIE_LEN random characters plus a terminating NUL.
413 #define COOKIE_LEN 16
414 char cookie_buf
[COOKIE_LEN
+ 1];
415 gen_rand_alphanumeric(store
->ctx(), cookie_buf
, sizeof(cookie_buf
) - 1);
416 cookie_buf
[COOKIE_LEN
] = '\0';
418 internal_lock
.set_cookie(cookie_buf
);
419 internal_lock
.set_duration(duration
);
// Acquires the exclusive lock on lock_oid in the reshard pool, as a fresh
// acquisition (must_renew is false). Two calls are visible, an ephemeral and
// a non-ephemeral one -- presumably selected by the `ephemeral` member; confirm.
422 int RGWBucketReshardLock::lock() {
423 internal_lock
.set_must_renew(false);
426 ret
= internal_lock
.lock_exclusive_ephemeral(&store
->reshard_pool_ctx
,
429 ret
= internal_lock
.lock_exclusive(&store
->reshard_pool_ctx
, lock_oid
);
432 ldout(store
->ctx(), 0) << "RGWReshardLock::" << __func__
<<
433 " failed to acquire lock on " << lock_oid
<< " ret=" << ret
<< dendl
;
// Records the current time as the reference for later renew decisions.
436 reset_time(Clock::now());
// Releases the lock on lock_oid. The function returns void, so a failure is
// only reported through the WARNING log line.
441 void RGWBucketReshardLock::unlock() {
442 int ret
= internal_lock
.unlock(&store
->reshard_pool_ctx
, lock_oid
);
444 ldout(store
->ctx(), 0) << "WARNING: RGWBucketReshardLock::" << __func__
<<
445 " failed to drop lock on " << lock_oid
<< " ret=" << ret
<< dendl
;
// Renews the already-held lock by re-issuing the exclusive lock call with
// must_renew set to true; the flag is set back to false afterwards.
449 int RGWBucketReshardLock::renew(const Clock::time_point
& now
) {
450 internal_lock
.set_must_renew(true);
453 ret
= internal_lock
.lock_exclusive_ephemeral(&store
->reshard_pool_ctx
,
456 ret
= internal_lock
.lock_exclusive(&store
->reshard_pool_ctx
, lock_oid
);
458 if (ret
< 0) { /* expired or already locked by another processor */
459 ldout(store
->ctx(), 5) << __func__
<< "(): failed to renew lock on " <<
460 lock_oid
<< " with " << cpp_strerror(-ret
) << dendl
;
463 internal_lock
.set_must_renew(false);
466 ldout(store
->ctx(), 20) << __func__
<< "(): successfully renewed lock on " <<
// Copies every bucket-index entry from the source bucket's shards into the
// target shards of new_bucket_info, renewing locks along the way. Afterwards
// it finishes the target shards, links the new bucket instance and marks the
// bucket info as done.
// NOTE(review): this listing shows only part of the body, including only
// part of the parameter list.
473 int RGWBucketReshard::do_reshard(int num_shards
,
474 RGWBucketInfo
& new_bucket_info
,
478 Formatter
*formatter
)
480 rgw_bucket
& bucket
= bucket_info
.bucket
;
// Progress header written to the caller-supplied stream.
485 (*out
) << "tenant: " << bucket_info
.bucket
.tenant
<< std::endl
;
486 (*out
) << "bucket name: " << bucket_info
.bucket
.name
<< std::endl
;
487 (*out
) << "old bucket instance id: " << bucket_info
.bucket
.bucket_id
<<
489 (*out
) << "new bucket instance id: " << new_bucket_info
.bucket
.bucket_id
<<
493 /* update bucket info -- in progress*/
494 list
<rgw_cls_bi_entry
> entries
;
496 if (max_entries
< 0) {
497 ldout(store
->ctx(), 0) << __func__
<<
498 ": can't reshard, negative max_entries" << dendl
;
502 // NB: destructor cleans up sharding state if reshard does not
503 // complete successfully
504 BucketInfoReshardUpdate
bucket_info_updater(store
, bucket_info
, bucket_attrs
, new_bucket_info
.bucket
.bucket_id
);
506 ret
= bucket_info_updater
.start();
508 ldout(store
->ctx(), 0) << __func__
<< ": failed to update bucket info ret=" << ret
<< dendl
;
// A num_shards that is not positive is treated as one shard.
512 int num_target_shards
= (new_bucket_info
.num_shards
> 0 ? new_bucket_info
.num_shards
: 1);
514 BucketReshardManager
target_shards_mgr(store
, new_bucket_info
, num_target_shards
);
// Verbose output requires a formatter.
516 verbose
= verbose
&& (formatter
!= nullptr);
519 formatter
->open_array_section("entries");
522 uint64_t total_entries
= 0;
// NOTE(review): this writes to cout while the rest of the function writes
// through (*out) -- confirm this is intended.
525 cout
<< "total entries:";
528 const int num_source_shards
=
529 (bucket_info
.num_shards
> 0 ? bucket_info
.num_shards
: 1);
// Walk each source shard, listing its index entries in pages of max_entries.
531 for (int i
= 0; i
< num_source_shards
; ++i
) {
532 bool is_truncated
= true;
534 while (is_truncated
) {
536 ret
= store
->bi_list(bucket
, i
, string(), marker
, max_entries
, &entries
, &is_truncated
);
// -ENOENT from bi_list is not reported as an error.
537 if (ret
< 0 && ret
!= -ENOENT
) {
538 derr
<< "ERROR: bi_list(): " << cpp_strerror(-ret
) << dendl
;
542 for (auto iter
= entries
.begin(); iter
!= entries
.end(); ++iter
) {
543 rgw_cls_bi_entry
& entry
= *iter
;
545 formatter
->open_object_section("entry");
547 encode_json("shard_id", i
, formatter
);
548 encode_json("num_entry", total_entries
, formatter
);
549 encode_json("entry", entry
, formatter
);
556 cls_rgw_obj_key cls_key
;
558 rgw_bucket_category_stats stats
;
// get_info() fills in the key, category and stats; its result is passed on
// as the `account` flag.
559 bool account
= entry
.get_info(&cls_key
, &category
, &stats
);
560 rgw_obj_key
key(cls_key
);
561 rgw_obj
obj(new_bucket_info
.bucket
, key
);
// NOTE(review): `int ret` is a new declaration here, while `ret` is assigned
// without a declaration elsewhere in this function -- this looks like it
// shadows an outer variable; confirm.
562 int ret
= store
->get_target_shard_id(new_bucket_info
, obj
.get_hash_object(), &target_shard_id
);
564 lderr(store
->ctx()) << "ERROR: get_target_shard_id() returned ret=" << ret
<< dendl
;
// A target shard id that is not positive maps to index 0.
568 int shard_index
= (target_shard_id
> 0 ? target_shard_id
: 0);
570 ret
= target_shards_mgr
.add_entry(shard_index
, entry
, account
,
576 Clock::time_point now
= Clock::now();
577 if (reshard_lock
.should_renew(now
)) {
578 // assume outer locks have timespans at least the size of ours, so
579 // can call inside conditional
580 if (outer_reshard_lock
) {
581 ret
= outer_reshard_lock
->renew(now
);
586 ret
= reshard_lock
.renew(now
);
588 lderr(store
->ctx()) << "Error renewing bucket lock: " << ret
<< dendl
;
594 formatter
->close_section();
596 formatter
->flush(*out
);
// Non-verbose progress: print the running count on every 1000th entry.
598 } else if (out
&& !(total_entries
% 1000)) {
599 (*out
) << " " << total_entries
;
606 formatter
->close_section();
608 formatter
->flush(*out
);
611 (*out
) << " " << total_entries
<< std::endl
;
// Flush and drain all target shards before linking the new instance.
614 ret
= target_shards_mgr
.finish();
616 lderr(store
->ctx()) << "ERROR: failed to reshard" << dendl
;
620 ret
= rgw_link_bucket(store
, new_bucket_info
.owner
, new_bucket_info
.bucket
, bucket_info
.creation_time
);
622 lderr(store
->ctx()) << "failed to link new bucket instance (bucket_id=" << new_bucket_info
.bucket
.bucket_id
<< ": " << cpp_strerror(-ret
) << ")" << dendl
;
626 ret
= bucket_info_updater
.complete();
628 ldout(store
->ctx(), 0) << __func__
<< ": failed to update bucket info ret=" << ret
<< dendl
;
629 /* don't error out, reshard process succeeded */
633 // NB: some error clean-up is done by ~BucketInfoReshardUpdate
634 } // RGWBucketReshard::do_reshard
// Appends one resharding entry per bucket-index object to *status.
636 int RGWBucketReshard::get_status(list
<cls_rgw_bucket_instance_entry
> *status
)
638 librados::IoCtx index_ctx
;
639 map
<int, string
> bucket_objs
;
641 int r
= store
->open_bucket_index(bucket_info
, index_ctx
, bucket_objs
);
// NOTE(review): `auto i` copies each map element; a reference would avoid
// the copy.
646 for (auto i
: bucket_objs
) {
647 cls_rgw_bucket_instance_entry entry
;
649 int ret
= cls_rgw_get_bucket_resharding(index_ctx
, i
.second
, &entry
);
// -ENOENT is not reported as an error here.
650 if (ret
< 0 && ret
!= -ENOENT
) {
651 lderr(store
->ctx()) << "ERROR: " << __func__
<< ": cls_rgw_get_bucket_resharding() returned ret=" << ret
<< dendl
;
655 status
->push_back(entry
);
// Runs a full reshard of this bucket. Under the reshard lock it creates the
// new bucket instance, updates the reshard log, marks the bucket as
// IN_PROGRESS and calls do_reshard(). Clean-up follows: the old index shards
// and instance entry on success, the new ones on failure.
// NOTE(review): this listing shows only part of the body; the branch and
// label structure separating the success and failure paths is not visible.
662 int RGWBucketReshard::execute(int num_shards
, int max_op_entries
,
663 bool verbose
, ostream
*out
, Formatter
*formatter
,
664 RGWReshard
* reshard_log
)
666 Clock::time_point now
;
668 int ret
= reshard_lock
.lock();
673 RGWBucketInfo new_bucket_info
;
674 ret
= create_new_bucket_instance(num_shards
, new_bucket_info
);
676 // shard state is uncertain, but this will attempt to remove them anyway
681 ret
= reshard_log
->update(bucket_info
, new_bucket_info
);
687 // set resharding status of current bucket_info & shards with
688 // information about planned resharding
689 ret
= set_resharding_status(new_bucket_info
.bucket
.bucket_id
,
690 num_shards
, CLS_RGW_RESHARD_IN_PROGRESS
);
692 reshard_lock
.unlock();
696 ret
= do_reshard(num_shards
,
699 verbose
, out
, formatter
);
704 // at this point we've done the main work; we'll make a best-effort
705 // to clean-up but will not indicate any errors encountered
707 reshard_lock
.unlock();
709 // resharding successful, so remove old bucket index shards; use
710 // best effort and don't report out an error; the lock isn't needed
711 // at this point since we're only making a best effort to remove old
713 ret
= store
->clean_bucket_index(bucket_info
, bucket_info
.num_shards
);
715 lderr(store
->ctx()) << "Error: " << __func__
<<
716 " failed to clean up old shards; " <<
717 "RGWRados::clean_bucket_index returned " << ret
<< dendl
;
720 ret
= rgw_bucket_instance_remove_entry(store
,
721 bucket_info
.bucket
.get_key(),
724 lderr(store
->ctx()) << "Error: " << __func__
<<
725 " failed to clean old bucket info object \"" <<
726 bucket_info
.bucket
.get_key() <<
727 "\"created after successful resharding with error " << ret
<< dendl
;
730 ldout(store
->ctx(), 1) << __func__
<<
731 " INFO: reshard of bucket \"" << bucket_info
.bucket
.name
<< "\" from \"" <<
732 bucket_info
.bucket
.get_key() << "\" to \"" <<
733 new_bucket_info
.bucket
.get_key() << "\" completed successfully" << dendl
;
// Failure clean-up: release the lock, then remove what was created for the
// new bucket instance.
739 reshard_lock
.unlock();
741 // since the real problem is the issue that led to this error code
742 // path, we won't touch ret and instead use another variable to
743 // temporarily hold error codes
744 int ret2
= store
->clean_bucket_index(new_bucket_info
,
745 new_bucket_info
.num_shards
);
747 lderr(store
->ctx()) << "Error: " << __func__
<<
748 " failed to clean up shards from failed incomplete resharding; " <<
749 "RGWRados::clean_bucket_index returned " << ret2
<< dendl
;
752 ret2
= rgw_bucket_instance_remove_entry(store
,
753 new_bucket_info
.bucket
.get_key(),
756 lderr(store
->ctx()) << "Error: " << __func__
<<
757 " failed to clean bucket info object \"" <<
758 new_bucket_info
.bucket
.get_key() <<
759 "\"created during incomplete resharding with error " << ret2
<< dendl
;
// Stores the store handle and output settings, names the instance lock, and
// reads the number of reshard log shards from rgw_reshard_num_logs.
766 RGWReshard::RGWReshard(RGWRados
* _store
, bool _verbose
, ostream
*_out
,
767 Formatter
*_formatter
) :
768 store(_store
), instance_lock(bucket_instance_lock_name
),
769 verbose(_verbose
), out(_out
), formatter(_formatter
)
771 num_logshards
= store
->ctx()->_conf
->rgw_reshard_num_logs
;
// Builds the key that get_bucket_logshard_oid() hashes: "<tenant>:<bucket_name>".
774 string
RGWReshard::get_logshard_key(const string
& tenant
,
775 const string
& bucket_name
)
777 return tenant
+ ":" + bucket_name
;
// Modulus applied to the mixed hash before reducing it to a log-shard index.
780 #define MAX_RESHARD_LOGSHARDS_PRIME 7877
// Maps (tenant, bucket_name) to the object id of its reshard log shard: the
// key is hashed, reduced to a shard index, and the index is turned into an
// object id by get_logshard_oid().
782 void RGWReshard::get_bucket_logshard_oid(const string
& tenant
, const string
& bucket_name
, string
*oid
)
784 string key
= get_logshard_key(tenant
, bucket_name
);
786 uint32_t sid
= ceph_str_hash_linux(key
.c_str(), key
.size());
// XOR the low byte of the hash into its top byte.
787 uint32_t sid2
= sid
^ ((sid
& 0xFF) << 24);
// NOTE(review): no guard against num_logshards being 0 is visible here.
788 sid
= sid2
% MAX_RESHARD_LOGSHARDS_PRIME
% num_logshards
;
790 get_logshard_oid(int(sid
), oid
);
// Adds `entry` to the reshard log shard selected by its tenant and bucket
// name. store->can_reshard() is checked first.
793 int RGWReshard::add(cls_rgw_reshard_entry
& entry
)
795 if (!store
->can_reshard()) {
796 ldout(store
->ctx(), 20) << __func__
<< " Resharding is disabled" << dendl
;
802 get_bucket_logshard_oid(entry
.tenant
, entry
.bucket_name
, &logshard_oid
);
804 librados::ObjectWriteOperation op
;
805 cls_rgw_reshard_add(op
, entry
);
807 int ret
= store
->reshard_pool_ctx
.operate(logshard_oid
, &op
);
809 lderr(store
->ctx()) << "ERROR: failed to add entry to reshard log, oid=" << logshard_oid
<< " tenant=" << entry
.tenant
<< " bucket=" << entry
.bucket_name
<< dendl
;
// Looks up the reshard log entry for bucket_info and sets its new_instance_id
// to "<name>:<bucket_id>" of new_bucket_info. The call that writes the entry
// back is not visible in this listing.
815 int RGWReshard::update(const RGWBucketInfo
& bucket_info
, const RGWBucketInfo
& new_bucket_info
)
817 cls_rgw_reshard_entry entry
;
818 entry
.bucket_name
= bucket_info
.bucket
.name
;
819 entry
.bucket_id
= bucket_info
.bucket
.bucket_id
;
// The tenant is taken from the bucket owner rather than from bucket.tenant.
820 entry
.tenant
= bucket_info
.owner
.tenant
;
822 int ret
= get(entry
);
827 entry
.new_instance_id
= new_bucket_info
.bucket
.name
+ ":" + new_bucket_info
.bucket
.bucket_id
;
831 ldout(store
->ctx(), 0) << __func__
<< ":Error in updating entry bucket " << entry
.bucket_name
<< ": " <<
832 cpp_strerror(-ret
) << dendl
;
// Lists up to `max` entries of log shard `logshard_num`, starting at `marker`.
// Results go into `entries`, and *is_truncated is an output of the listing call.
839 int RGWReshard::list(int logshard_num
, string
& marker
, uint32_t max
, std::list
<cls_rgw_reshard_entry
>& entries
, bool *is_truncated
)
843 get_logshard_oid(logshard_num
, &logshard_oid
);
845 int ret
= cls_rgw_reshard_list(store
->reshard_pool_ctx
, logshard_oid
, marker
, max
, entries
, is_truncated
);
// -ENOENT clears *is_truncated.
848 if (ret
== -ENOENT
) {
849 *is_truncated
= false;
852 lderr(store
->ctx()) << "ERROR: failed to list reshard log entries, oid=" << logshard_oid
<< dendl
;
// -EACCES gets an extra hint about pool permissions.
853 if (ret
== -EACCES
) {
854 lderr(store
->ctx()) << "access denied to pool " << store
->get_zone_params().reshard_pool
855 << ". Fix the pool access permissions of your client" << dendl
;
// Fetches the reshard log entry selected by entry.tenant and
// entry.bucket_name into `entry`.
862 int RGWReshard::get(cls_rgw_reshard_entry
& entry
)
866 get_bucket_logshard_oid(entry
.tenant
, entry
.bucket_name
, &logshard_oid
);
868 int ret
= cls_rgw_reshard_get(store
->reshard_pool_ctx
, logshard_oid
, entry
);
// The error log below is skipped for -ENOENT.
870 if (ret
!= -ENOENT
) {
871 lderr(store
->ctx()) << "ERROR: failed to get entry from reshard log, oid=" << logshard_oid
<< " tenant=" << entry
.tenant
<<
872 " bucket=" << entry
.bucket_name
<< dendl
;
// Removes `entry` from the reshard log shard selected by its tenant and
// bucket name.
880 int RGWReshard::remove(cls_rgw_reshard_entry
& entry
)
884 get_bucket_logshard_oid(entry
.tenant
, entry
.bucket_name
, &logshard_oid
);
886 librados::ObjectWriteOperation op
;
887 cls_rgw_reshard_remove(op
, entry
);
889 int ret
= store
->reshard_pool_ctx
.operate(logshard_oid
, &op
);
891 lderr(store
->ctx()) << "ERROR: failed to remove entry from reshard log, oid=" << logshard_oid
<< " tenant=" << entry
.tenant
<< " bucket=" << entry
.bucket_name
<< dendl
;
// Clears the resharding flag on object bucket_instance_oid.
// NOTE(review): the call goes through reshard_pool_ctx, and `entry` is not
// used in the visible lines -- confirm both are intended.
898 int RGWReshard::clear_bucket_resharding(const string
& bucket_instance_oid
, cls_rgw_reshard_entry
& entry
)
900 int ret
= cls_rgw_clear_bucket_resharding(store
->reshard_pool_ctx
, bucket_instance_oid
);
902 lderr(store
->ctx()) << "ERROR: failed to clear bucket resharding, bucket_instance_oid=" << bucket_instance_oid
<< dendl
;
// Number of polling rounds in RGWReshardWait::block_while_resharding().
909 const int num_retries
= 10;
// Wait interval used by RGWReshardWait::do_wait(), passed as the seconds
// argument of utime_t.
910 const int default_reshard_sleep_duration
= 5;
// Waits on `cond` under `lock` for default_reshard_sleep_duration seconds.
912 int RGWReshardWait::do_wait()
914 Mutex::Locker
l(lock
);
916 cond
.WaitInterval(lock
, utime_t(default_reshard_sleep_duration
, 0));
// Polls the bucket shard's resharding entry up to num_retries times. Once
// resharding is no longer in progress, *new_bucket_id receives the entry's
// new bucket instance id. While it is still in progress, each round tries to
// take the bucket reshard lock and, if that succeeds, clears the resharding
// flags. Returns -ERR_BUSY_RESHARDING when the bucket is still resharding.
// NOTE(review): this listing shows only part of the body.
925 int RGWReshardWait::block_while_resharding(RGWRados::BucketShard
*bs
,
926 string
*new_bucket_id
,
927 const RGWBucketInfo
& bucket_info
)
930 cls_rgw_bucket_instance_entry entry
;
932 for (int i
=0; i
< num_retries
; i
++) {
933 ret
= cls_rgw_get_bucket_resharding(bs
->index_ctx
, bs
->bucket_obj
, &entry
);
935 ldout(store
->ctx(), 0) << __func__
<< " ERROR: failed to get bucket resharding :" <<
936 cpp_strerror(-ret
)<< dendl
;
939 if (!entry
.resharding_in_progress()) {
940 *new_bucket_id
= entry
.new_bucket_instance_id
;
943 ldout(store
->ctx(), 20) << "NOTICE: reshard still in progress; " << (i
< num_retries
- 1 ? "retrying" : "too many retries") << dendl
;
945 if (i
== num_retries
- 1) {
949 // If bucket is erroneously marked as resharding (e.g., crash or
950 // other error) then fix it. If we can take the bucket reshard
951 // lock then it means no other resharding should be taking place,
952 // and we're free to clear the flags.
954 // since we expect to do this rarely, we'll do our work in a
955 // block and erase our work after each try
957 RGWObjectCtx
obj_ctx(bs
->store
);
958 const rgw_bucket
& b
= bs
->bucket
;
959 std::string bucket_id
= b
.get_key();
960 RGWBucketReshardLock
reshard_lock(bs
->store
, bucket_info
, true);
961 ret
= reshard_lock
.lock();
963 ldout(store
->ctx(), 20) << __func__
<<
964 " INFO: failed to take reshard lock for bucket " <<
965 bucket_id
<< "; expected if resharding underway" << dendl
;
967 ldout(store
->ctx(), 10) << __func__
<<
968 " INFO: was able to take reshard lock for bucket " <<
970 ret
= RGWBucketReshard::clear_resharding(bs
->store
, bucket_info
);
// The lock is released in both the failure and the success case below.
972 reshard_lock
.unlock();
973 ldout(store
->ctx(), 0) << __func__
<<
974 " ERROR: failed to clear resharding flags for bucket " <<
977 reshard_lock
.unlock();
978 ldout(store
->ctx(), 5) << __func__
<<
979 " INFO: apparently successfully cleared resharding flags for "
980 "bucket " << bucket_id
<< dendl
;
981 continue; // if we apparently succeed immediately test again
982 } // if clear resharding succeeded
983 } // if taking of lock succeeded
984 } // block to encapsulate recovery from incomplete reshard
// NOTE(review): the same error message is logged at two places below; the
// surrounding control flow is not visible, so confirm both are needed.
988 ldout(store
->ctx(), 0) << __func__
<< " ERROR: bucket is still resharding, please retry" << dendl
;
992 ldout(store
->ctx(), 0) << __func__
<< " ERROR: bucket is still resharding, please retry" << dendl
;
993 return -ERR_BUSY_RESHARDING
;
// Processes one reshard log shard while holding that shard's lock. It lists
// the queued entries in pages and, for each entry with an empty
// new_instance_id, reshards the bucket and then removes the entry from the
// log. The lock is renewed when should_renew() says so.
// NOTE(review): this listing shows only part of the body.
996 int RGWReshard::process_single_logshard(int logshard_num
)
999 bool truncated
= true;
1001 CephContext
*cct
= store
->ctx();
// Used both as the listing page size and as an argument to execute().
1002 constexpr uint32_t max_entries
= 1000;
1004 string logshard_oid
;
1005 get_logshard_oid(logshard_num
, &logshard_oid
);
1007 RGWBucketReshardLock
logshard_lock(store
, logshard_oid
, false);
1009 int ret
= logshard_lock
.lock();
1010 if (ret
== -EBUSY
) { /* already locked by another processor */
1011 ldout(store
->ctx(), 5) << __func__
<< "(): failed to acquire lock on " <<
1012 logshard_oid
<< dendl
;
1017 std::list
<cls_rgw_reshard_entry
> entries
;
1018 ret
= list(logshard_num
, marker
, max_entries
, entries
, &truncated
);
1020 ldout(cct
, 10) << "cannot list all reshards in logshard oid=" <<
1021 logshard_oid
<< dendl
;
1025 for(auto& entry
: entries
) { // logshard entries
// Only entries without a new_instance_id reach the resharding code below.
1026 if(entry
.new_instance_id
.empty()) {
1028 ldout(store
->ctx(), 20) << __func__
<< " resharding " <<
1029 entry
.bucket_name
<< dendl
;
1031 RGWObjectCtx
obj_ctx(store
);
1033 RGWBucketInfo bucket_info
;
1034 map
<string
, bufferlist
> attrs
;
1036 ret
= store
->get_bucket_info(obj_ctx
, entry
.tenant
, entry
.bucket_name
,
1037 bucket_info
, nullptr, &attrs
);
1039 ldout(cct
, 0) << __func__
<< ": Error in get_bucket_info: " <<
1040 cpp_strerror(-ret
) << dendl
;
// No outer lock is handed to the per-bucket resharder (nullptr).
1044 RGWBucketReshard
br(store
, bucket_info
, attrs
, nullptr);
// The unique_ptr takes ownership of the formatter allocated just above.
1046 Formatter
* formatter
= new JSONFormatter(false);
1047 auto formatter_ptr
= std::unique_ptr
<Formatter
>(formatter
);
1048 ret
= br
.execute(entry
.new_num_shards
, max_entries
, true, nullptr,
1051 ldout (store
->ctx(), 0) << __func__
<<
1052 "ERROR in reshard_bucket " << entry
.bucket_name
<< ":" <<
1053 cpp_strerror(-ret
)<< dendl
;
1057 ldout (store
->ctx(), 20) << " removing entry" << entry
.bucket_name
<<
1060 ret
= remove(entry
);
1062 ldout(cct
, 0)<< __func__
<< ":Error removing bucket " <<
1063 entry
.bucket_name
<< " for resharding queue: " <<
1064 cpp_strerror(-ret
) << dendl
;
1069 Clock::time_point now
= Clock::now();
1070 if (logshard_lock
.should_renew(now
)) {
1071 ret
= logshard_lock
.renew(now
);
// Advance the listing marker to this entry's key.
1077 entry
.get_key(&marker
);
1079 } while (truncated
);
1081 logshard_lock
.unlock();
// Writes the object id of log shard shard_num into *logshard:
// reshard_oid_prefix followed by the shard number as a zero-padded,
// 10-digit unsigned decimal.
1086 void RGWReshard::get_logshard_oid(int shard_num
, string
*logshard
)
1089 snprintf(buf
, sizeof(buf
), "%010u", (unsigned)shard_num
);
1091 string
objname(reshard_oid_prefix
);
1092 *logshard
= objname
+ buf
;
// Calls process_single_logshard() for every log shard index in
// [0, num_logshards). store->can_reshard() is checked first.
1095 int RGWReshard::process_all_logshards()
1097 if (!store
->can_reshard()) {
1098 ldout(store
->ctx(), 20) << __func__
<< " Resharding is disabled" << dendl
;
1103 for (int i
= 0; i
< num_logshards
; i
++) {
// The oid is computed here only for the log line below.
1105 get_logshard_oid(i
, &logshard
);
1107 ldout(store
->ctx(), 20) << "proceeding logshard = " << logshard
<< dendl
;
1109 ret
= process_single_logshard(i
);
// Polled by ReshardWorker::entry() to decide when its loop should stop.
// NOTE(review): the body is not visible in this listing.
1118 bool RGWReshard::going_down()
// Allocates the ReshardWorker and calls create("rgw_reshard") on it.
// NOTE(review): `worker` is a raw pointer from `new`; its release is not
// visible here -- presumably handled in stop_processor(); confirm.
1123 void RGWReshard::start_processor()
1125 worker
= new ReshardWorker(store
->ctx(), this);
1126 worker
->create("rgw_reshard");
// Counterpart to start_processor().
// NOTE(review): the body is not visible in this listing -- presumably it
// stops and releases the worker; confirm.
1129 void RGWReshard::stop_processor()
// Worker loop: repeatedly processes all log shards, then waits on `cond`
// before the next round, until reshard->going_down() returns true.
// The interval comes from rgw_reshard_thread_interval.
1140 void *RGWReshard::ReshardWorker::entry() {
1143 utime_t start
= ceph_clock_now();
1144 if (reshard
->process_all_logshards()) {
1145 /* All shards have been processed properly. Next time we can start
1146 * from this moment. */
1150 if (reshard
->going_down())
1153 utime_t end
= ceph_clock_now();
1155 int secs
= cct
->_conf
->rgw_reshard_thread_interval
;
// NOTE(review): presumably `end` has been reduced to the elapsed time on a
// line not shown here; otherwise this compares against absolute time -- confirm.
1157 if (secs
<= end
.sec())
1158 continue; // next round
1163 cond
.WaitInterval(lock
, utime_t(secs
, 0));
1165 } while (!reshard
->going_down());
1170 void RGWReshard::ReshardWorker::stop()
1172 Mutex::Locker
l(lock
);