1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 #include "rgw_bucket.h"
6 #include "rgw_reshard.h"
7 #include "cls/rgw/cls_rgw_client.h"
8 #include "cls/lock/cls_lock_client.h"
9 #include "common/errno.h"
10 #include "common/ceph_json.h"
12 #include "common/dout.h"
14 #define dout_context g_ceph_context
15 #define dout_subsys ceph_subsys_rgw
17 const string reshard_oid_prefix
= "reshard.";
18 const string reshard_lock_name
= "reshard_process";
19 const string bucket_instance_lock_name
= "bucket_instance_lock";
23 #define RESHARD_SHARD_WINDOW 64
24 #define RESHARD_MAX_AIO 128
26 class BucketReshardShard
{
28 const RGWBucketInfo
& bucket_info
;
30 RGWRados::BucketShard bs
;
31 vector
<rgw_cls_bi_entry
> entries
;
32 map
<uint8_t, rgw_bucket_category_stats
> stats
;
33 deque
<librados::AioCompletion
*>& aio_completions
;
35 int wait_next_completion() {
36 librados::AioCompletion
*c
= aio_completions
.front();
37 aio_completions
.pop_front();
41 int ret
= c
->get_return_value();
45 derr
<< "ERROR: reshard rados operation failed: " << cpp_strerror(-ret
) << dendl
;
52 int get_completion(librados::AioCompletion
**c
) {
53 if (aio_completions
.size() >= RESHARD_MAX_AIO
) {
54 int ret
= wait_next_completion();
60 *c
= librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
61 aio_completions
.push_back(*c
);
67 BucketReshardShard(RGWRados
*_store
, const RGWBucketInfo
& _bucket_info
,
69 deque
<librados::AioCompletion
*>& _completions
) : store(_store
), bucket_info(_bucket_info
), bs(store
),
70 aio_completions(_completions
) {
71 num_shard
= (bucket_info
.num_shards
> 0 ? _num_shard
: -1);
72 bs
.init(bucket_info
.bucket
, num_shard
);
79 int add_entry(rgw_cls_bi_entry
& entry
, bool account
, uint8_t category
,
80 const rgw_bucket_category_stats
& entry_stats
) {
81 entries
.push_back(entry
);
83 rgw_bucket_category_stats
& target
= stats
[category
];
84 target
.num_entries
+= entry_stats
.num_entries
;
85 target
.total_size
+= entry_stats
.total_size
;
86 target
.total_size_rounded
+= entry_stats
.total_size_rounded
;
88 if (entries
.size() >= RESHARD_SHARD_WINDOW
) {
97 if (entries
.size() == 0) {
101 librados::ObjectWriteOperation op
;
102 for (auto& entry
: entries
) {
103 store
->bi_put(op
, bs
, entry
);
105 cls_rgw_bucket_update_stats(op
, false, stats
);
107 librados::AioCompletion
*c
;
108 int ret
= get_completion(&c
);
112 ret
= bs
.index_ctx
.aio_operate(bs
.bucket_obj
, c
, &op
);
114 derr
<< "ERROR: failed to store entries in target bucket shard (bs=" << bs
.bucket
<< "/" << bs
.shard_id
<< ") error=" << cpp_strerror(-ret
) << dendl
;
124 while (!aio_completions
.empty()) {
125 int r
= wait_next_completion();
134 class BucketReshardManager
{
136 const RGWBucketInfo
& target_bucket_info
;
137 deque
<librados::AioCompletion
*> completions
;
138 int num_target_shards
;
139 vector
<BucketReshardShard
*> target_shards
;
142 BucketReshardManager(RGWRados
*_store
, const RGWBucketInfo
& _target_bucket_info
, int _num_target_shards
) : store(_store
), target_bucket_info(_target_bucket_info
),
143 num_target_shards(_num_target_shards
) {
144 target_shards
.resize(num_target_shards
);
145 for (int i
= 0; i
< num_target_shards
; ++i
) {
146 target_shards
[i
] = new BucketReshardShard(store
, target_bucket_info
, i
, completions
);
150 ~BucketReshardManager() {
151 for (auto& shard
: target_shards
) {
152 int ret
= shard
->wait_all_aio();
154 ldout(store
->ctx(), 20) << __func__
<< ": shard->wait_all_aio() returned ret=" << ret
<< dendl
;
159 int add_entry(int shard_index
,
160 rgw_cls_bi_entry
& entry
, bool account
, uint8_t category
,
161 const rgw_bucket_category_stats
& entry_stats
) {
162 int ret
= target_shards
[shard_index
]->add_entry(entry
, account
, category
, entry_stats
);
164 derr
<< "ERROR: target_shards.add_entry(" << entry
.idx
<< ") returned error: " << cpp_strerror(-ret
) << dendl
;
172 for (auto& shard
: target_shards
) {
173 int r
= shard
->flush();
175 derr
<< "ERROR: target_shards[" << shard
->get_num_shard() << "].flush() returned error: " << cpp_strerror(-r
) << dendl
;
179 for (auto& shard
: target_shards
) {
180 int r
= shard
->wait_all_aio();
182 derr
<< "ERROR: target_shards[" << shard
->get_num_shard() << "].wait_all_aio() returned error: " << cpp_strerror(-r
) << dendl
;
187 target_shards
.clear();
192 RGWBucketReshard::RGWBucketReshard(RGWRados
*_store
, const RGWBucketInfo
& _bucket_info
, const map
<string
, bufferlist
>& _bucket_attrs
) :
193 store(_store
), bucket_info(_bucket_info
), bucket_attrs(_bucket_attrs
),
194 reshard_lock(reshard_lock_name
) {
195 const rgw_bucket
& b
= bucket_info
.bucket
;
196 reshard_oid
= b
.tenant
+ (b
.tenant
.empty() ? "" : ":") + b
.name
+ ":" + b
.bucket_id
;
198 utime_t
lock_duration(store
->ctx()->_conf
->rgw_reshard_bucket_lock_duration
, 0);
199 #define COOKIE_LEN 16
200 char cookie_buf
[COOKIE_LEN
+ 1];
201 gen_rand_alphanumeric(store
->ctx(), cookie_buf
, sizeof(cookie_buf
) - 1);
202 cookie_buf
[COOKIE_LEN
] = '\0';
204 reshard_lock
.set_cookie(cookie_buf
);
205 reshard_lock
.set_duration(lock_duration
);
208 int RGWBucketReshard::lock_bucket()
210 int ret
= reshard_lock
.lock_exclusive(&store
->reshard_pool_ctx
, reshard_oid
);
212 ldout(store
->ctx(), 0) << "RGWReshard::add failed to acquire lock on " << reshard_oid
<< " ret=" << ret
<< dendl
;
218 void RGWBucketReshard::unlock_bucket()
220 int ret
= reshard_lock
.unlock(&store
->reshard_pool_ctx
, reshard_oid
);
222 ldout(store
->ctx(), 0) << "WARNING: RGWReshard::add failed to drop lock on " << reshard_oid
<< " ret=" << ret
<< dendl
;
226 int RGWBucketReshard::set_resharding_status(const string
& new_instance_id
, int32_t num_shards
, cls_rgw_reshard_status status
)
228 if (new_instance_id
.empty()) {
229 ldout(store
->ctx(), 0) << __func__
<< " missing new bucket instance id" << dendl
;
233 cls_rgw_bucket_instance_entry instance_entry
;
234 instance_entry
.set_status(new_instance_id
, num_shards
, status
);
236 int ret
= store
->bucket_set_reshard(bucket_info
, instance_entry
);
238 ldout(store
->ctx(), 0) << "RGWReshard::" << __func__
<< " ERROR: error setting bucket resharding flag on bucket index: "
239 << cpp_strerror(-ret
) << dendl
;
245 int RGWBucketReshard::clear_resharding()
247 cls_rgw_bucket_instance_entry instance_entry
;
249 int ret
= store
->bucket_set_reshard(bucket_info
, instance_entry
);
251 ldout(store
->ctx(), 0) << "RGWReshard::" << __func__
<< " ERROR: error setting bucket resharding flag on bucket index: "
252 << cpp_strerror(-ret
) << dendl
;
258 static int create_new_bucket_instance(RGWRados
*store
,
260 const RGWBucketInfo
& bucket_info
,
261 map
<string
, bufferlist
>& attrs
,
262 RGWBucketInfo
& new_bucket_info
)
264 new_bucket_info
= bucket_info
;
266 store
->create_bucket_id(&new_bucket_info
.bucket
.bucket_id
);
267 new_bucket_info
.bucket
.oid
.clear();
269 new_bucket_info
.num_shards
= new_num_shards
;
270 new_bucket_info
.objv_tracker
.clear();
272 new_bucket_info
.new_bucket_instance_id
.clear();
273 new_bucket_info
.reshard_status
= 0;
275 int ret
= store
->init_bucket_index(new_bucket_info
, new_bucket_info
.num_shards
);
277 cerr
<< "ERROR: failed to init new bucket indexes: " << cpp_strerror(-ret
) << std::endl
;
281 ret
= store
->put_bucket_instance_info(new_bucket_info
, true, real_time(), &attrs
);
283 cerr
<< "ERROR: failed to store new bucket instance info: " << cpp_strerror(-ret
) << std::endl
;
290 int RGWBucketReshard::create_new_bucket_instance(int new_num_shards
,
291 RGWBucketInfo
& new_bucket_info
)
293 return ::create_new_bucket_instance(store
, new_num_shards
, bucket_info
, bucket_attrs
, new_bucket_info
);
296 int RGWBucketReshard::cancel()
298 int ret
= lock_bucket();
303 ret
= clear_resharding();
309 class BucketInfoReshardUpdate
312 RGWBucketInfo bucket_info
;
313 std::map
<string
, bufferlist
> bucket_attrs
;
315 bool in_progress
{false};
317 int set_status(cls_rgw_reshard_status s
) {
318 bucket_info
.reshard_status
= s
;
319 int ret
= store
->put_bucket_instance_info(bucket_info
, false, real_time(), &bucket_attrs
);
321 ldout(store
->ctx(), 0) << "ERROR: failed to write bucket info, ret=" << ret
<< dendl
;
328 BucketInfoReshardUpdate(RGWRados
*_store
, RGWBucketInfo
& _bucket_info
,
329 map
<string
, bufferlist
>& _bucket_attrs
, const string
& new_bucket_id
) : store(_store
),
330 bucket_info(_bucket_info
),
331 bucket_attrs(_bucket_attrs
) {
332 bucket_info
.new_bucket_instance_id
= new_bucket_id
;
334 ~BucketInfoReshardUpdate() {
336 bucket_info
.new_bucket_instance_id
.clear();
337 set_status(CLS_RGW_RESHARD_NONE
);
342 int ret
= set_status(CLS_RGW_RESHARD_IN_PROGRESS
);
351 int ret
= set_status(CLS_RGW_RESHARD_DONE
);
360 int RGWBucketReshard::do_reshard(
362 RGWBucketInfo
& new_bucket_info
,
366 Formatter
*formatter
)
368 rgw_bucket
& bucket
= bucket_info
.bucket
;
373 (*out
) << "*** NOTICE: operation will not remove old bucket index objects ***" << std::endl
;
374 (*out
) << "*** these will need to be removed manually ***" << std::endl
;
375 (*out
) << "tenant: " << bucket_info
.bucket
.tenant
<< std::endl
;
376 (*out
) << "bucket name: " << bucket_info
.bucket
.name
<< std::endl
;
377 (*out
) << "old bucket instance id: " << bucket_info
.bucket
.bucket_id
<< std::endl
;
378 (*out
) << "new bucket instance id: " << new_bucket_info
.bucket
.bucket_id
<< std::endl
;
381 /* update bucket info -- in progress*/
382 list
<rgw_cls_bi_entry
> entries
;
384 if (max_entries
< 0) {
385 ldout(store
->ctx(), 0) << __func__
<< ": can't reshard, negative max_entries" << dendl
;
389 BucketInfoReshardUpdate
bucket_info_updater(store
, bucket_info
, bucket_attrs
, new_bucket_info
.bucket
.bucket_id
);
391 ret
= bucket_info_updater
.start();
393 ldout(store
->ctx(), 0) << __func__
<< ": failed to update bucket info ret=" << ret
<< dendl
;
397 int num_target_shards
= (new_bucket_info
.num_shards
> 0 ? new_bucket_info
.num_shards
: 1);
399 BucketReshardManager
target_shards_mgr(store
, new_bucket_info
, num_target_shards
);
401 verbose
= verbose
&& (formatter
!= nullptr);
404 formatter
->open_array_section("entries");
407 uint64_t total_entries
= 0;
410 cout
<< "total entries:";
413 int num_source_shards
= (bucket_info
.num_shards
> 0 ? bucket_info
.num_shards
: 1);
415 for (int i
= 0; i
< num_source_shards
; ++i
) {
416 bool is_truncated
= true;
418 while (is_truncated
) {
420 ret
= store
->bi_list(bucket
, i
, string(), marker
, max_entries
, &entries
, &is_truncated
);
421 if (ret
< 0 && ret
!= -ENOENT
) {
422 derr
<< "ERROR: bi_list(): " << cpp_strerror(-ret
) << dendl
;
426 list
<rgw_cls_bi_entry
>::iterator iter
;
427 for (iter
= entries
.begin(); iter
!= entries
.end(); ++iter
) {
428 rgw_cls_bi_entry
& entry
= *iter
;
430 formatter
->open_object_section("entry");
432 encode_json("shard_id", i
, formatter
);
433 encode_json("num_entry", total_entries
, formatter
);
434 encode_json("entry", entry
, formatter
);
441 cls_rgw_obj_key cls_key
;
443 rgw_bucket_category_stats stats
;
444 bool account
= entry
.get_info(&cls_key
, &category
, &stats
);
445 rgw_obj_key
key(cls_key
);
446 rgw_obj
obj(new_bucket_info
.bucket
, key
);
447 int ret
= store
->get_target_shard_id(new_bucket_info
, obj
.get_hash_object(), &target_shard_id
);
449 lderr(store
->ctx()) << "ERROR: get_target_shard_id() returned ret=" << ret
<< dendl
;
453 int shard_index
= (target_shard_id
> 0 ? target_shard_id
: 0);
455 ret
= target_shards_mgr
.add_entry(shard_index
, entry
, account
, category
, stats
);
460 formatter
->close_section();
462 formatter
->flush(*out
);
463 formatter
->flush(*out
);
465 } else if (out
&& !(total_entries
% 1000)) {
466 (*out
) << " " << total_entries
;
472 formatter
->close_section();
474 formatter
->flush(*out
);
477 (*out
) << " " << total_entries
<< std::endl
;
480 ret
= target_shards_mgr
.finish();
482 lderr(store
->ctx()) << "ERROR: failed to reshard" << dendl
;
486 ret
= rgw_link_bucket(store
, new_bucket_info
.owner
, new_bucket_info
.bucket
, bucket_info
.creation_time
);
488 lderr(store
->ctx()) << "failed to link new bucket instance (bucket_id=" << new_bucket_info
.bucket
.bucket_id
<< ": " << cpp_strerror(-ret
) << ")" << dendl
;
492 ret
= bucket_info_updater
.complete();
494 ldout(store
->ctx(), 0) << __func__
<< ": failed to update bucket info ret=" << ret
<< dendl
;
495 /* don't error out, reshard process succeeded */
500 int RGWBucketReshard::get_status(list
<cls_rgw_bucket_instance_entry
> *status
)
502 librados::IoCtx index_ctx
;
503 map
<int, string
> bucket_objs
;
505 int r
= store
->open_bucket_index(bucket_info
, index_ctx
, bucket_objs
);
510 for (auto i
: bucket_objs
) {
511 cls_rgw_bucket_instance_entry entry
;
513 int ret
= cls_rgw_get_bucket_resharding(index_ctx
, i
.second
, &entry
);
514 if (ret
< 0 && ret
!= -ENOENT
) {
515 lderr(store
->ctx()) << "ERROR: " << __func__
<< ": cls_rgw_get_bucket_resharding() returned ret=" << ret
<< dendl
;
519 status
->push_back(entry
);
525 int RGWBucketReshard::execute(int num_shards
, int max_op_entries
,
526 bool verbose
, ostream
*out
, Formatter
*formatter
, RGWReshard
* reshard_log
)
529 int ret
= lock_bucket();
534 RGWBucketInfo new_bucket_info
;
535 ret
= create_new_bucket_instance(num_shards
, new_bucket_info
);
542 ret
= reshard_log
->update(bucket_info
, new_bucket_info
);
549 ret
= set_resharding_status(new_bucket_info
.bucket
.bucket_id
, num_shards
, CLS_RGW_RESHARD_IN_PROGRESS
);
555 ret
= do_reshard(num_shards
,
558 verbose
, out
, formatter
);
565 ret
= set_resharding_status(new_bucket_info
.bucket
.bucket_id
, num_shards
, CLS_RGW_RESHARD_DONE
);
577 RGWReshard::RGWReshard(RGWRados
* _store
, bool _verbose
, ostream
*_out
,
578 Formatter
*_formatter
) : store(_store
), instance_lock(bucket_instance_lock_name
),
579 verbose(_verbose
), out(_out
), formatter(_formatter
)
581 num_logshards
= store
->ctx()->_conf
->rgw_reshard_num_logs
;
584 string
RGWReshard::get_logshard_key(const string
& tenant
, const string
& bucket_name
)
586 return tenant
+ ":" + bucket_name
;
589 #define MAX_RESHARD_LOGSHARDS_PRIME 7877
591 void RGWReshard::get_bucket_logshard_oid(const string
& tenant
, const string
& bucket_name
, string
*oid
)
593 string key
= get_logshard_key(tenant
, bucket_name
);
595 uint32_t sid
= ceph_str_hash_linux(key
.c_str(), key
.size());
596 uint32_t sid2
= sid
^ ((sid
& 0xFF) << 24);
597 sid
= sid2
% MAX_RESHARD_LOGSHARDS_PRIME
% num_logshards
;
598 int logshard
= sid
% num_logshards
;
600 get_logshard_oid(logshard
, oid
);
603 int RGWReshard::add(cls_rgw_reshard_entry
& entry
)
605 if (!store
->can_reshard()) {
606 ldout(store
->ctx(), 20) << __func__
<< " Resharding is disabled" << dendl
;
612 get_bucket_logshard_oid(entry
.tenant
, entry
.bucket_name
, &logshard_oid
);
614 librados::ObjectWriteOperation op
;
615 cls_rgw_reshard_add(op
, entry
);
617 int ret
= store
->reshard_pool_ctx
.operate(logshard_oid
, &op
);
619 lderr(store
->ctx()) << "ERROR: failed to add entry to reshard log, oid=" << logshard_oid
<< " tenant=" << entry
.tenant
<< " bucket=" << entry
.bucket_name
<< dendl
;
625 int RGWReshard::update(const RGWBucketInfo
& bucket_info
, const RGWBucketInfo
& new_bucket_info
)
627 cls_rgw_reshard_entry entry
;
628 entry
.bucket_name
= bucket_info
.bucket
.name
;
629 entry
.bucket_id
= bucket_info
.bucket
.bucket_id
;
630 entry
.tenant
= bucket_info
.owner
.tenant
;
632 int ret
= get(entry
);
637 entry
.new_instance_id
= new_bucket_info
.bucket
.name
+ ":" + new_bucket_info
.bucket
.bucket_id
;
641 ldout(store
->ctx(), 0) << __func__
<< ":Error in updating entry bucket " << entry
.bucket_name
<< ": " <<
642 cpp_strerror(-ret
) << dendl
;
649 int RGWReshard::list(int logshard_num
, string
& marker
, uint32_t max
, std::list
<cls_rgw_reshard_entry
>& entries
, bool *is_truncated
)
653 get_logshard_oid(logshard_num
, &logshard_oid
);
655 int ret
= cls_rgw_reshard_list(store
->reshard_pool_ctx
, logshard_oid
, marker
, max
, entries
, is_truncated
);
658 if (ret
== -ENOENT
) {
659 *is_truncated
= false;
662 lderr(store
->ctx()) << "ERROR: failed to list reshard log entries, oid=" << logshard_oid
<< dendl
;
663 if (ret
== -EACCES
) {
664 lderr(store
->ctx()) << "access denied to pool " << store
->get_zone_params().reshard_pool
665 << ". Fix the pool access permissions of your client" << dendl
;
672 int RGWReshard::get(cls_rgw_reshard_entry
& entry
)
676 get_bucket_logshard_oid(entry
.tenant
, entry
.bucket_name
, &logshard_oid
);
678 int ret
= cls_rgw_reshard_get(store
->reshard_pool_ctx
, logshard_oid
, entry
);
680 if (ret
!= -ENOENT
) {
681 lderr(store
->ctx()) << "ERROR: failed to get entry from reshard log, oid=" << logshard_oid
<< " tenant=" << entry
.tenant
<<
682 " bucket=" << entry
.bucket_name
<< dendl
;
690 int RGWReshard::remove(cls_rgw_reshard_entry
& entry
)
694 get_bucket_logshard_oid(entry
.tenant
, entry
.bucket_name
, &logshard_oid
);
696 librados::ObjectWriteOperation op
;
697 cls_rgw_reshard_remove(op
, entry
);
699 int ret
= store
->reshard_pool_ctx
.operate(logshard_oid
, &op
);
701 lderr(store
->ctx()) << "ERROR: failed to remove entry from reshard log, oid=" << logshard_oid
<< " tenant=" << entry
.tenant
<< " bucket=" << entry
.bucket_name
<< dendl
;
708 int RGWReshard::clear_bucket_resharding(const string
& bucket_instance_oid
, cls_rgw_reshard_entry
& entry
)
710 int ret
= cls_rgw_clear_bucket_resharding(store
->reshard_pool_ctx
, bucket_instance_oid
);
712 lderr(store
->ctx()) << "ERROR: failed to clear bucket resharding, bucket_instance_oid=" << bucket_instance_oid
<< dendl
;
719 const int num_retries
= 10;
720 const int default_reshard_sleep_duration
= 5;
722 int RGWReshardWait::do_wait()
724 Mutex::Locker
l(lock
);
726 cond
.WaitInterval(lock
, utime_t(default_reshard_sleep_duration
, 0));
735 int RGWReshardWait::block_while_resharding(RGWRados::BucketShard
*bs
, string
*new_bucket_id
)
738 cls_rgw_bucket_instance_entry entry
;
740 for (int i
=0; i
< num_retries
;i
++) {
741 ret
= cls_rgw_get_bucket_resharding(bs
->index_ctx
, bs
->bucket_obj
, &entry
);
743 ldout(store
->ctx(), 0) << __func__
<< " ERROR: failed to get bucket resharding :" <<
744 cpp_strerror(-ret
)<< dendl
;
747 if (!entry
.resharding_in_progress()) {
748 *new_bucket_id
= entry
.new_bucket_instance_id
;
751 ldout(store
->ctx(), 20) << "NOTICE: reshard still in progress; " << (i
< num_retries
- 1 ? "retrying" : "too many retries") << dendl
;
752 /* needed to unlock as clear resharding uses the same lock */
754 if (i
== num_retries
- 1) {
760 ldout(store
->ctx(), 0) << __func__
<< " ERROR: bucket is still resharding, please retry" << dendl
;
764 ldout(store
->ctx(), 0) << __func__
<< " ERROR: bucket is still resharding, please retry" << dendl
;
765 return -ERR_BUSY_RESHARDING
;
768 int RGWReshard::process_single_logshard(int logshard_num
)
771 bool truncated
= true;
773 CephContext
*cct
= store
->ctx();
774 int max_entries
= 1000;
777 rados::cls::lock::Lock
l(reshard_lock_name
);
779 utime_t
time(max_secs
, 0);
780 l
.set_duration(time
);
782 char cookie_buf
[COOKIE_LEN
+ 1];
783 gen_rand_alphanumeric(store
->ctx(), cookie_buf
, sizeof(cookie_buf
) - 1);
784 cookie_buf
[COOKIE_LEN
] = '\0';
786 l
.set_cookie(cookie_buf
);
789 get_logshard_oid(logshard_num
, &logshard_oid
);
791 int ret
= l
.lock_exclusive(&store
->reshard_pool_ctx
, logshard_oid
);
792 if (ret
== -EBUSY
) { /* already locked by another processor */
793 ldout(store
->ctx(), 5) << __func__
<< "(): failed to acquire lock on " << logshard_oid
<< dendl
;
797 utime_t lock_start_time
= ceph_clock_now();
800 std::list
<cls_rgw_reshard_entry
> entries
;
801 ret
= list(logshard_num
, marker
, max_entries
, entries
, &truncated
);
803 ldout(cct
, 10) << "cannot list all reshards in logshard oid=" << logshard_oid
<< dendl
;
807 for(auto& entry
: entries
) {
808 if(entry
.new_instance_id
.empty()) {
810 ldout(store
->ctx(), 20) << __func__
<< " resharding " << entry
.bucket_name
<< dendl
;
812 RGWObjectCtx
obj_ctx(store
);
814 RGWBucketInfo bucket_info
;
815 map
<string
, bufferlist
> attrs
;
817 ret
= store
->get_bucket_info(obj_ctx
, entry
.tenant
, entry
.bucket_name
, bucket_info
, nullptr,
820 ldout(cct
, 0) << __func__
<< ": Error in get_bucket_info: " << cpp_strerror(-ret
) << dendl
;
824 RGWBucketReshard
br(store
, bucket_info
, attrs
);
826 Formatter
* formatter
= new JSONFormatter(false);
827 auto formatter_ptr
= std::unique_ptr
<Formatter
>(formatter
);
828 ret
= br
.execute(entry
.new_num_shards
, max_entries
, true,nullptr, formatter
, this);
830 ldout (store
->ctx(), 0) << __func__
<< "ERROR in reshard_bucket " << entry
.bucket_name
<< ":" <<
831 cpp_strerror(-ret
)<< dendl
;
835 ldout (store
->ctx(), 20) << " removing entry" << entry
.bucket_name
<< dendl
;
839 ldout(cct
, 0)<< __func__
<< ":Error removing bucket " << entry
.bucket_name
<< " for resharding queue: "
840 << cpp_strerror(-ret
) << dendl
;
844 utime_t now
= ceph_clock_now();
846 if (now
> lock_start_time
+ max_secs
/ 2) { /* do you need to renew lock? */
848 ret
= l
.lock_exclusive(&store
->reshard_pool_ctx
, logshard_oid
);
849 if (ret
== -EBUSY
) { /* already locked by another processor */
850 ldout(store
->ctx(), 5) << __func__
<< "(): failed to acquire lock on " << logshard_oid
<< dendl
;
853 lock_start_time
= now
;
855 entry
.get_key(&marker
);
859 l
.unlock(&store
->reshard_pool_ctx
, logshard_oid
);
864 void RGWReshard::get_logshard_oid(int shard_num
, string
*logshard
)
867 snprintf(buf
, sizeof(buf
), "%010u", (unsigned)shard_num
);
869 string
objname(reshard_oid_prefix
);
870 *logshard
= objname
+ buf
;
873 int RGWReshard::process_all_logshards()
875 if (!store
->can_reshard()) {
876 ldout(store
->ctx(), 20) << __func__
<< " Resharding is disabled" << dendl
;
881 for (int i
= 0; i
< num_logshards
; i
++) {
883 get_logshard_oid(i
, &logshard
);
885 ldout(store
->ctx(), 20) << "proceeding logshard = " << logshard
<< dendl
;
887 ret
= process_single_logshard(i
);
896 bool RGWReshard::going_down()
901 void RGWReshard::start_processor()
903 worker
= new ReshardWorker(store
->ctx(), this);
904 worker
->create("rgw_reshard");
907 void RGWReshard::stop_processor()
918 void *RGWReshard::ReshardWorker::entry() {
921 utime_t start
= ceph_clock_now();
922 if (reshard
->process_all_logshards()) {
923 /* All shards have been processed properly. Next time we can start
924 * from this moment. */
928 if (reshard
->going_down())
931 utime_t end
= ceph_clock_now();
933 int secs
= cct
->_conf
->rgw_reshard_thread_interval
;
935 if (secs
<= end
.sec())
936 continue; // next round
941 cond
.WaitInterval(lock
, utime_t(secs
, 0));
943 } while (!reshard
->going_down());
948 void RGWReshard::ReshardWorker::stop()
950 Mutex::Locker
l(lock
);