1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
8 #include "rgw_bucket.h"
9 #include "rgw_reshard.h"
11 #include "rgw_sal_rados.h"
12 #include "cls/rgw/cls_rgw_client.h"
13 #include "cls/lock/cls_lock_client.h"
14 #include "common/errno.h"
15 #include "common/ceph_json.h"
17 #include "common/dout.h"
19 #include "services/svc_zone.h"
20 #include "services/svc_sys_obj.h"
21 #include "services/svc_tier_rados.h"
23 #define dout_context g_ceph_context
24 #define dout_subsys ceph_subsys_rgw
// Prefix used to build reshard log shard object names
// (see RGWReshard::get_logshard_oid).
const string reshard_oid_prefix = "reshard.";
// cls_lock name guarding reshard processing (used by RGWBucketReshardLock).
const string reshard_lock_name = "reshard_process";
// cls_lock name for bucket-instance objects (used for RGWReshard's
// instance_lock).
const string bucket_instance_lock_name = "bucket_instance_lock";
32 /* All primes up to 2000 used to attempt to make dynamic sharding use
33 * a prime numbers of shards. Note: this list also includes 1 for when
34 * 1 shard is the most appropriate, even though 1 is not prime.
36 const std::initializer_list
<uint16_t> RGWBucketReshard::reshard_primes
= {
37 1, 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61,
38 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137,
39 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199, 211,
40 223, 227, 229, 233, 239, 241, 251, 257, 263, 269, 271, 277, 281, 283,
41 293, 307, 311, 313, 317, 331, 337, 347, 349, 353, 359, 367, 373, 379,
42 383, 389, 397, 401, 409, 419, 421, 431, 433, 439, 443, 449, 457, 461,
43 463, 467, 479, 487, 491, 499, 503, 509, 521, 523, 541, 547, 557, 563,
44 569, 571, 577, 587, 593, 599, 601, 607, 613, 617, 619, 631, 641, 643,
45 647, 653, 659, 661, 673, 677, 683, 691, 701, 709, 719, 727, 733, 739,
46 743, 751, 757, 761, 769, 773, 787, 797, 809, 811, 821, 823, 827, 829,
47 839, 853, 857, 859, 863, 877, 881, 883, 887, 907, 911, 919, 929, 937,
48 941, 947, 953, 967, 971, 977, 983, 991, 997, 1009, 1013, 1019, 1021,
49 1031, 1033, 1039, 1049, 1051, 1061, 1063, 1069, 1087, 1091, 1093,
50 1097, 1103, 1109, 1117, 1123, 1129, 1151, 1153, 1163, 1171, 1181,
51 1187, 1193, 1201, 1213, 1217, 1223, 1229, 1231, 1237, 1249, 1259,
52 1277, 1279, 1283, 1289, 1291, 1297, 1301, 1303, 1307, 1319, 1321,
53 1327, 1361, 1367, 1373, 1381, 1399, 1409, 1423, 1427, 1429, 1433,
54 1439, 1447, 1451, 1453, 1459, 1471, 1481, 1483, 1487, 1489, 1493,
55 1499, 1511, 1523, 1531, 1543, 1549, 1553, 1559, 1567, 1571, 1579,
56 1583, 1597, 1601, 1607, 1609, 1613, 1619, 1621, 1627, 1637, 1657,
57 1663, 1667, 1669, 1693, 1697, 1699, 1709, 1721, 1723, 1733, 1741,
58 1747, 1753, 1759, 1777, 1783, 1787, 1789, 1801, 1811, 1823, 1831,
59 1847, 1861, 1867, 1871, 1873, 1877, 1879, 1889, 1901, 1907, 1913,
60 1931, 1933, 1949, 1951, 1973, 1979, 1987, 1993, 1997, 1999
63 class BucketReshardShard
{
64 rgw::sal::RadosStore
* store
;
65 const RGWBucketInfo
& bucket_info
;
67 const rgw::bucket_index_layout_generation
& idx_layout
;
68 RGWRados::BucketShard bs
;
69 vector
<rgw_cls_bi_entry
> entries
;
70 map
<RGWObjCategory
, rgw_bucket_category_stats
> stats
;
71 deque
<librados::AioCompletion
*>& aio_completions
;
72 uint64_t max_aio_completions
;
73 uint64_t reshard_shard_batch_size
;
75 int wait_next_completion() {
76 librados::AioCompletion
*c
= aio_completions
.front();
77 aio_completions
.pop_front();
79 c
->wait_for_complete();
81 int ret
= c
->get_return_value();
85 derr
<< "ERROR: reshard rados operation failed: " << cpp_strerror(-ret
) << dendl
;
92 int get_completion(librados::AioCompletion
**c
) {
93 if (aio_completions
.size() >= max_aio_completions
) {
94 int ret
= wait_next_completion();
100 *c
= librados::Rados::aio_create_completion(nullptr, nullptr);
101 aio_completions
.push_back(*c
);
107 BucketReshardShard(const DoutPrefixProvider
*dpp
,
108 rgw::sal::RadosStore
* _store
, const RGWBucketInfo
& _bucket_info
,
109 int _num_shard
, const rgw::bucket_index_layout_generation
& _idx_layout
,
110 deque
<librados::AioCompletion
*>& _completions
) :
111 store(_store
), bucket_info(_bucket_info
), idx_layout(_idx_layout
), bs(store
->getRados()),
112 aio_completions(_completions
)
114 num_shard
= (idx_layout
.layout
.normal
.num_shards
> 0 ? _num_shard
: -1);
116 bs
.init(bucket_info
.bucket
, num_shard
, idx_layout
, nullptr /* no RGWBucketInfo */, dpp
);
118 max_aio_completions
=
119 store
->ctx()->_conf
.get_val
<uint64_t>("rgw_reshard_max_aio");
120 reshard_shard_batch_size
=
121 store
->ctx()->_conf
.get_val
<uint64_t>("rgw_reshard_batch_size");
124 int get_num_shard() {
128 int add_entry(rgw_cls_bi_entry
& entry
, bool account
, RGWObjCategory category
,
129 const rgw_bucket_category_stats
& entry_stats
) {
130 entries
.push_back(entry
);
132 rgw_bucket_category_stats
& target
= stats
[category
];
133 target
.num_entries
+= entry_stats
.num_entries
;
134 target
.total_size
+= entry_stats
.total_size
;
135 target
.total_size_rounded
+= entry_stats
.total_size_rounded
;
136 target
.actual_size
+= entry_stats
.actual_size
;
138 if (entries
.size() >= reshard_shard_batch_size
) {
149 if (entries
.size() == 0) {
153 librados::ObjectWriteOperation op
;
154 for (auto& entry
: entries
) {
155 store
->getRados()->bi_put(op
, bs
, entry
);
157 cls_rgw_bucket_update_stats(op
, false, stats
);
159 librados::AioCompletion
*c
;
160 int ret
= get_completion(&c
);
164 ret
= bs
.bucket_obj
.aio_operate(c
, &op
);
166 derr
<< "ERROR: failed to store entries in target bucket shard (bs=" << bs
.bucket
<< "/" << bs
.shard_id
<< ") error=" << cpp_strerror(-ret
) << dendl
;
176 while (!aio_completions
.empty()) {
177 int r
= wait_next_completion();
184 }; // class BucketReshardShard
187 class BucketReshardManager
{
188 rgw::sal::RadosStore
* store
;
189 const RGWBucketInfo
& target_bucket_info
;
190 deque
<librados::AioCompletion
*> completions
;
191 int num_target_shards
;
192 vector
<BucketReshardShard
*> target_shards
;
195 BucketReshardManager(const DoutPrefixProvider
*dpp
,
196 rgw::sal::RadosStore
* _store
,
197 const RGWBucketInfo
& _target_bucket_info
,
198 int _num_target_shards
) :
199 store(_store
), target_bucket_info(_target_bucket_info
),
200 num_target_shards(_num_target_shards
)
202 const auto& idx_layout
= target_bucket_info
.layout
.current_index
;
203 target_shards
.resize(num_target_shards
);
204 for (int i
= 0; i
< num_target_shards
; ++i
) {
205 target_shards
[i
] = new BucketReshardShard(dpp
, store
, target_bucket_info
, i
, idx_layout
, completions
);
209 ~BucketReshardManager() {
210 for (auto& shard
: target_shards
) {
211 int ret
= shard
->wait_all_aio();
213 ldout(store
->ctx(), 20) << __func__
<<
214 ": shard->wait_all_aio() returned ret=" << ret
<< dendl
;
219 int add_entry(int shard_index
,
220 rgw_cls_bi_entry
& entry
, bool account
, RGWObjCategory category
,
221 const rgw_bucket_category_stats
& entry_stats
) {
222 int ret
= target_shards
[shard_index
]->add_entry(entry
, account
, category
,
225 derr
<< "ERROR: target_shards.add_entry(" << entry
.idx
<<
226 ") returned error: " << cpp_strerror(-ret
) << dendl
;
235 for (auto& shard
: target_shards
) {
236 int r
= shard
->flush();
238 derr
<< "ERROR: target_shards[" << shard
->get_num_shard() << "].flush() returned error: " << cpp_strerror(-r
) << dendl
;
242 for (auto& shard
: target_shards
) {
243 int r
= shard
->wait_all_aio();
245 derr
<< "ERROR: target_shards[" << shard
->get_num_shard() << "].wait_all_aio() returned error: " << cpp_strerror(-r
) << dendl
;
250 target_shards
.clear();
253 }; // class BucketReshardManager
255 RGWBucketReshard::RGWBucketReshard(rgw::sal::RadosStore
* _store
,
256 const RGWBucketInfo
& _bucket_info
,
257 const map
<string
, bufferlist
>& _bucket_attrs
,
258 RGWBucketReshardLock
* _outer_reshard_lock
) :
259 store(_store
), bucket_info(_bucket_info
), bucket_attrs(_bucket_attrs
),
260 reshard_lock(store
, bucket_info
, true),
261 outer_reshard_lock(_outer_reshard_lock
)
// Record resharding status (new bucket instance id + shard count) on the
// bucket's index via bucket_set_reshard.
// NOTE(review): several lines of this function are elided in this view —
// notably a num_shards parameter and the early-return/closing braces;
// comments below describe only what is visible.
int RGWBucketReshard::set_resharding_status(const DoutPrefixProvider *dpp,
                                            rgw::sal::RadosStore* store,
                                            const RGWBucketInfo& bucket_info,
                                            const string& new_instance_id,
                                            cls_rgw_reshard_status status)
  // a new bucket instance id is mandatory when marking reshard status
  if (new_instance_id.empty()) {
    ldpp_dout(dpp, 0) << __func__ << " missing new bucket instance id" << dendl;

  cls_rgw_bucket_instance_entry instance_entry;
  // num_shards is presumably an elided parameter of this function — confirm
  instance_entry.set_status(new_instance_id, num_shards, status);

  int ret = store->getRados()->bucket_set_reshard(dpp, bucket_info, instance_entry);
    ldpp_dout(dpp, 0) << "RGWReshard::" << __func__ << " ERROR: error setting bucket resharding flag on bucket index: "
      << cpp_strerror(-ret) << dendl;
// Clear resharding state for this bucket: first reset the per-index-shard
// reshard status, then write a default-constructed instance entry to the
// bucket index (presumably clearing the resharding flag — confirm against
// cls_rgw defaults). Errors are logged as the flow continues.
// NOTE(review): error-check braces and return statements are elided in
// this view.
// reshard lock assumes lock is held
int RGWBucketReshard::clear_resharding(const DoutPrefixProvider *dpp,
                                       rgw::sal::RadosStore* store,
                                       const RGWBucketInfo& bucket_info)
  int ret = clear_index_shard_reshard_status(dpp, store, bucket_info);
    ldpp_dout(dpp, 0) << "RGWBucketReshard::" << __func__ <<
      " ERROR: error clearing reshard status from index shard " <<
      cpp_strerror(-ret) << dendl;

  // default-constructed entry — no new instance id, no status
  cls_rgw_bucket_instance_entry instance_entry;
  ret = store->getRados()->bucket_set_reshard(dpp, bucket_info, instance_entry);
    ldpp_dout(dpp, 0) << "RGWReshard::" << __func__ <<
      " ERROR: error setting bucket resharding flag on bucket index: " <<
      cpp_strerror(-ret) << dendl;
// Reset each index shard's reshard status to NOT_RESHARDING, identified
// by the bucket's own bucket_id. Skipped when num_shards holds the
// uint32_t max sentinel.
// NOTE(review): closing braces/returns are elided in this view.
int RGWBucketReshard::clear_index_shard_reshard_status(const DoutPrefixProvider *dpp,
                                                       rgw::sal::RadosStore* store,
                                                       const RGWBucketInfo& bucket_info)
  uint32_t num_shards = bucket_info.layout.current_index.layout.normal.num_shards;

  if (num_shards < std::numeric_limits<uint32_t>::max()) {
    // a shard count of 0 is presented to the index as 1
    int ret = set_resharding_status(dpp, store, bucket_info,
                                    bucket_info.bucket.bucket_id,
                                    (num_shards < 1 ? 1 : num_shards),
                                    cls_rgw_reshard_status::NOT_RESHARDING);
      ldpp_dout(dpp, 0) << "RGWBucketReshard::" << __func__ <<
        " ERROR: error clearing reshard status from index shard " <<
        cpp_strerror(-ret) << dendl;
// Build and persist a brand-new bucket instance for resharding: copy the
// source bucket info, mint a fresh bucket_id, set the new shard count,
// reset version tracking and reshard state, store the instance info and
// initialize its index objects.
// NOTE(review): the new_num_shards parameter line and the error-return
// braces are elided in this view.
static int create_new_bucket_instance(rgw::sal::RadosStore* store,
                                      const RGWBucketInfo& bucket_info,
                                      map<string, bufferlist>& attrs,
                                      RGWBucketInfo& new_bucket_info,
                                      const DoutPrefixProvider *dpp)
  // start from the source bucket's info, then override what changes
  new_bucket_info = bucket_info;

  store->getRados()->create_bucket_id(&new_bucket_info.bucket.bucket_id);

  new_bucket_info.layout.current_index.layout.normal.num_shards = new_num_shards;
  new_bucket_info.objv_tracker.clear();

  // the new instance starts with a clean reshard state
  new_bucket_info.new_bucket_instance_id.clear();
  new_bucket_info.reshard_status = cls_rgw_reshard_status::NOT_RESHARDING;

  // exclusive-create (true) the instance object with the source attrs
  int ret = store->getRados()->put_bucket_instance_info(new_bucket_info, true, real_time(), &attrs, dpp);
    cerr << "ERROR: failed to store new bucket instance info: " << cpp_strerror(-ret) << std::endl;

  ret = store->svc()->bi->init_index(dpp, new_bucket_info);
    cerr << "ERROR: failed to init new bucket indexes: " << cpp_strerror(-ret) << std::endl;
367 int RGWBucketReshard::create_new_bucket_instance(int new_num_shards
,
368 RGWBucketInfo
& new_bucket_info
,
369 const DoutPrefixProvider
*dpp
)
371 return ::create_new_bucket_instance(store
, new_num_shards
,
372 bucket_info
, bucket_attrs
, new_bucket_info
, dpp
);
375 int RGWBucketReshard::cancel(const DoutPrefixProvider
*dpp
)
377 int ret
= reshard_lock
.lock(dpp
);
382 ret
= clear_resharding(dpp
);
384 reshard_lock
.unlock();
388 class BucketInfoReshardUpdate
390 const DoutPrefixProvider
*dpp
;
391 rgw::sal::RadosStore
* store
;
392 RGWBucketInfo
& bucket_info
;
393 std::map
<string
, bufferlist
> bucket_attrs
;
395 bool in_progress
{false};
397 int set_status(cls_rgw_reshard_status s
, const DoutPrefixProvider
*dpp
) {
398 bucket_info
.reshard_status
= s
;
399 int ret
= store
->getRados()->put_bucket_instance_info(bucket_info
, false, real_time(), &bucket_attrs
, dpp
);
401 ldpp_dout(dpp
, 0) << "ERROR: failed to write bucket info, ret=" << ret
<< dendl
;
408 BucketInfoReshardUpdate(const DoutPrefixProvider
*_dpp
,
409 rgw::sal::RadosStore
* _store
,
410 RGWBucketInfo
& _bucket_info
,
411 map
<string
, bufferlist
>& _bucket_attrs
,
412 const string
& new_bucket_id
) :
415 bucket_info(_bucket_info
),
416 bucket_attrs(_bucket_attrs
)
418 bucket_info
.new_bucket_instance_id
= new_bucket_id
;
421 ~BucketInfoReshardUpdate() {
423 // resharding must not have ended correctly, clean up
425 RGWBucketReshard::clear_index_shard_reshard_status(dpp
, store
, bucket_info
);
427 ldpp_dout(dpp
, -1) << "Error: " << __func__
<<
428 " clear_index_shard_status returned " << ret
<< dendl
;
430 bucket_info
.new_bucket_instance_id
.clear();
432 // clears new_bucket_instance as well
433 set_status(cls_rgw_reshard_status::NOT_RESHARDING
, dpp
);
438 int ret
= set_status(cls_rgw_reshard_status::IN_PROGRESS
, dpp
);
447 int ret
= set_status(cls_rgw_reshard_status::DONE
, dpp
);
// Construct a reshard lock wrapper around a cls_lock on reshard_lock_oid.
// The lock duration comes from rgw_reshard_bucket_lock_duration and a
// random alphanumeric cookie distinguishes this process's lock.
// NOTE(review): the _ephemeral parameter line and the store(_store)
// initializer are elided in this view.
RGWBucketReshardLock::RGWBucketReshardLock(rgw::sal::RadosStore* _store,
                                           const std::string& reshard_lock_oid,
  lock_oid(reshard_lock_oid),
  ephemeral(_ephemeral),
  internal_lock(reshard_lock_name)
  const int lock_dur_secs = store->ctx()->_conf.get_val<uint64_t>(
    "rgw_reshard_bucket_lock_duration");
  duration = std::chrono::seconds(lock_dur_secs);

#define COOKIE_LEN 16
  char cookie_buf[COOKIE_LEN + 1];
  // random cookie, then explicit NUL terminator
  gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
  cookie_buf[COOKIE_LEN] = '\0';

  internal_lock.set_cookie(cookie_buf);
  internal_lock.set_duration(duration);
// Acquire the reshard lock exclusively (ephemeral variant on one path).
// Contention is logged at INFO as "held by another RGW process"; other
// failures are logged as errors.
// NOTE(review): the ephemeral/else branch structure, the contended-return
// condition and the return statements are partially elided in this view.
int RGWBucketReshardLock::lock(const DoutPrefixProvider *dpp) {
  internal_lock.set_must_renew(false);
  ret = internal_lock.lock_exclusive_ephemeral(&store->getRados()->reshard_pool_ctx,
  ret = internal_lock.lock_exclusive(&store->getRados()->reshard_pool_ctx, lock_oid);
    ldout(store->ctx(), 0) << "INFO: RGWReshardLock::" << __func__ <<
      " found lock on " << lock_oid <<
      " to be held by another RGW process; skipping for now" << dendl;
  } else if (ret < 0) {
    ldpp_dout(dpp, -1) << "ERROR: RGWReshardLock::" << __func__ <<
      " failed to acquire lock on " << lock_oid << ": " <<
      cpp_strerror(-ret) << dendl;
  // remember acquisition time so should_renew()/renew() can track expiry
  reset_time(Clock::now());
506 void RGWBucketReshardLock::unlock() {
507 int ret
= internal_lock
.unlock(&store
->getRados()->reshard_pool_ctx
, lock_oid
);
509 ldout(store
->ctx(), 0) << "WARNING: RGWBucketReshardLock::" << __func__
<<
510 " failed to drop lock on " << lock_oid
<< " ret=" << ret
<< dendl
;
// Renew the held reshard lock (must_renew semantics): re-take the lock
// before it expires, reporting ENOENT specially since it means the lock
// expired or was never taken.
// NOTE(review): the ephemeral/else branch structure, returns and the tail
// of the success log line are elided in this view.
int RGWBucketReshardLock::renew(const Clock::time_point& now) {
  internal_lock.set_must_renew(true);
  ret = internal_lock.lock_exclusive_ephemeral(&store->getRados()->reshard_pool_ctx,
  ret = internal_lock.lock_exclusive(&store->getRados()->reshard_pool_ctx, lock_oid);
  if (ret < 0) { /* expired or already locked by another processor */
    std::stringstream error_s;
    if (-ENOENT == ret) {
      error_s << "ENOENT (lock expired or never initially locked)";
      error_s << ret << " (" << cpp_strerror(-ret) << ")";
    ldout(store->ctx(), 5) << __func__ << "(): failed to renew lock on " <<
      lock_oid << " with error " << error_s.str() << dendl;
  // back to normal (non-renewing) lock mode after a successful renew
  internal_lock.set_must_renew(false);

  ldout(store->ctx(), 20) << __func__ << "(): successfully renewed lock on " <<
544 int RGWBucketReshard::do_reshard(int num_shards
,
545 RGWBucketInfo
& new_bucket_info
,
549 Formatter
*formatter
,
550 const DoutPrefixProvider
*dpp
)
553 const rgw_bucket
& bucket
= bucket_info
.bucket
;
554 (*out
) << "tenant: " << bucket
.tenant
<< std::endl
;
555 (*out
) << "bucket name: " << bucket
.name
<< std::endl
;
556 (*out
) << "old bucket instance id: " << bucket
.bucket_id
<<
558 (*out
) << "new bucket instance id: " << new_bucket_info
.bucket
.bucket_id
<<
562 /* update bucket info -- in progress*/
563 list
<rgw_cls_bi_entry
> entries
;
565 if (max_entries
< 0) {
566 ldpp_dout(dpp
, 0) << __func__
<<
567 ": can't reshard, negative max_entries" << dendl
;
571 // NB: destructor cleans up sharding state if reshard does not
572 // complete successfully
573 BucketInfoReshardUpdate
bucket_info_updater(dpp
, store
, bucket_info
, bucket_attrs
, new_bucket_info
.bucket
.bucket_id
);
575 int ret
= bucket_info_updater
.start();
577 ldpp_dout(dpp
, 0) << __func__
<< ": failed to update bucket info ret=" << ret
<< dendl
;
581 int num_target_shards
= (new_bucket_info
.layout
.current_index
.layout
.normal
.num_shards
> 0 ? new_bucket_info
.layout
.current_index
.layout
.normal
.num_shards
: 1);
583 BucketReshardManager
target_shards_mgr(dpp
, store
, new_bucket_info
, num_target_shards
);
585 bool verbose_json_out
= verbose
&& (formatter
!= nullptr) && (out
!= nullptr);
587 if (verbose_json_out
) {
588 formatter
->open_array_section("entries");
591 uint64_t total_entries
= 0;
593 if (!verbose_json_out
&& out
) {
594 (*out
) << "total entries:";
597 const int num_source_shards
=
598 (bucket_info
.layout
.current_index
.layout
.normal
.num_shards
> 0 ? bucket_info
.layout
.current_index
.layout
.normal
.num_shards
: 1);
600 for (int i
= 0; i
< num_source_shards
; ++i
) {
601 bool is_truncated
= true;
603 const std::string null_object_filter
; // empty string since we're not filtering by object
604 while (is_truncated
) {
606 ret
= store
->getRados()->bi_list(dpp
, bucket_info
, i
, null_object_filter
, marker
, max_entries
, &entries
, &is_truncated
);
607 if (ret
< 0 && ret
!= -ENOENT
) {
608 derr
<< "ERROR: bi_list(): " << cpp_strerror(-ret
) << dendl
;
612 for (auto iter
= entries
.begin(); iter
!= entries
.end(); ++iter
) {
613 rgw_cls_bi_entry
& entry
= *iter
;
614 if (verbose_json_out
) {
615 formatter
->open_object_section("entry");
617 encode_json("shard_id", i
, formatter
);
618 encode_json("num_entry", total_entries
, formatter
);
619 encode_json("entry", entry
, formatter
);
626 cls_rgw_obj_key cls_key
;
627 RGWObjCategory category
;
628 rgw_bucket_category_stats stats
;
629 bool account
= entry
.get_info(&cls_key
, &category
, &stats
);
630 rgw_obj_key
key(cls_key
);
631 if (entry
.type
== BIIndexType::OLH
&& key
.empty()) {
632 // bogus entry created by https://tracker.ceph.com/issues/46456
633 // to fix, skip so it doesn't get include in the new bucket instance
635 ldpp_dout(dpp
, 10) << "Dropping entry with empty name, idx=" << marker
<< dendl
;
638 rgw_obj
obj(new_bucket_info
.bucket
, key
);
640 if (key
.ns
== RGW_OBJ_NS_MULTIPART
&& mp
.from_meta(key
.name
)) {
641 // place the multipart .meta object on the same shard as its head object
642 obj
.index_hash_source
= mp
.get_key();
644 int ret
= store
->getRados()->get_target_shard_id(new_bucket_info
.layout
.current_index
.layout
.normal
, obj
.get_hash_object(), &target_shard_id
);
646 ldpp_dout(dpp
, -1) << "ERROR: get_target_shard_id() returned ret=" << ret
<< dendl
;
650 int shard_index
= (target_shard_id
> 0 ? target_shard_id
: 0);
652 ret
= target_shards_mgr
.add_entry(shard_index
, entry
, account
,
658 Clock::time_point now
= Clock::now();
659 if (reshard_lock
.should_renew(now
)) {
660 // assume outer locks have timespans at least the size of ours, so
661 // can call inside conditional
662 if (outer_reshard_lock
) {
663 ret
= outer_reshard_lock
->renew(now
);
668 ret
= reshard_lock
.renew(now
);
670 ldpp_dout(dpp
, -1) << "Error renewing bucket lock: " << ret
<< dendl
;
674 if (verbose_json_out
) {
675 formatter
->close_section();
676 formatter
->flush(*out
);
677 } else if (out
&& !(total_entries
% 1000)) {
678 (*out
) << " " << total_entries
;
684 if (verbose_json_out
) {
685 formatter
->close_section();
686 formatter
->flush(*out
);
688 (*out
) << " " << total_entries
<< std::endl
;
691 ret
= target_shards_mgr
.finish();
693 ldpp_dout(dpp
, -1) << "ERROR: failed to reshard" << dendl
;
697 ret
= store
->ctl()->bucket
->link_bucket(new_bucket_info
.owner
, new_bucket_info
.bucket
, bucket_info
.creation_time
, null_yield
, dpp
);
699 ldpp_dout(dpp
, -1) << "failed to link new bucket instance (bucket_id=" << new_bucket_info
.bucket
.bucket_id
<< ": " << cpp_strerror(-ret
) << ")" << dendl
;
703 ret
= bucket_info_updater
.complete();
705 ldpp_dout(dpp
, 0) << __func__
<< ": failed to update bucket info ret=" << ret
<< dendl
;
706 /* don't error out, reshard process succeeded */
710 // NB: some error clean-up is done by ~BucketInfoReshardUpdate
711 } // RGWBucketReshard::do_reshard
713 int RGWBucketReshard::get_status(const DoutPrefixProvider
*dpp
, list
<cls_rgw_bucket_instance_entry
> *status
)
715 return store
->svc()->bi_rados
->get_reshard_status(dpp
, bucket_info
, status
);
719 int RGWBucketReshard::execute(int num_shards
, int max_op_entries
,
720 const DoutPrefixProvider
*dpp
,
721 bool verbose
, ostream
*out
, Formatter
*formatter
,
722 RGWReshard
* reshard_log
)
724 int ret
= reshard_lock
.lock(dpp
);
729 RGWBucketInfo new_bucket_info
;
730 ret
= create_new_bucket_instance(num_shards
, new_bucket_info
, dpp
);
732 // shard state is uncertain, but this will attempt to remove them anyway
737 ret
= reshard_log
->update(dpp
, bucket_info
, new_bucket_info
);
743 // set resharding status of current bucket_info & shards with
744 // information about planned resharding
745 ret
= set_resharding_status(dpp
, new_bucket_info
.bucket
.bucket_id
,
746 num_shards
, cls_rgw_reshard_status::IN_PROGRESS
);
751 ret
= do_reshard(num_shards
,
754 verbose
, out
, formatter
, dpp
);
759 // at this point we've done the main work; we'll make a best-effort
760 // to clean-up but will not indicate any errors encountered
762 reshard_lock
.unlock();
764 // resharding successful, so remove old bucket index shards; use
765 // best effort and don't report out an error; the lock isn't needed
766 // at this point since all we're using a best effort to remove old
768 ret
= store
->svc()->bi
->clean_index(dpp
, bucket_info
);
770 ldpp_dout(dpp
, -1) << "Error: " << __func__
<<
771 " failed to clean up old shards; " <<
772 "RGWRados::clean_bucket_index returned " << ret
<< dendl
;
775 ret
= store
->ctl()->bucket
->remove_bucket_instance_info(bucket_info
.bucket
,
776 bucket_info
, null_yield
, dpp
);
778 ldpp_dout(dpp
, -1) << "Error: " << __func__
<<
779 " failed to clean old bucket info object \"" <<
780 bucket_info
.bucket
.get_key() <<
781 "\"created after successful resharding with error " << ret
<< dendl
;
784 ldpp_dout(dpp
, 1) << __func__
<<
785 " INFO: reshard of bucket \"" << bucket_info
.bucket
.name
<< "\" from \"" <<
786 bucket_info
.bucket
.get_key() << "\" to \"" <<
787 new_bucket_info
.bucket
.get_key() << "\" completed successfully" << dendl
;
793 reshard_lock
.unlock();
795 // since the real problem is the issue that led to this error code
796 // path, we won't touch ret and instead use another variable to
797 // temporarily error codes
798 int ret2
= store
->svc()->bi
->clean_index(dpp
, new_bucket_info
);
800 ldpp_dout(dpp
, -1) << "Error: " << __func__
<<
801 " failed to clean up shards from failed incomplete resharding; " <<
802 "RGWRados::clean_bucket_index returned " << ret2
<< dendl
;
805 ret2
= store
->ctl()->bucket
->remove_bucket_instance_info(new_bucket_info
.bucket
,
809 ldpp_dout(dpp
, -1) << "Error: " << __func__
<<
810 " failed to clean bucket info object \"" <<
811 new_bucket_info
.bucket
.get_key() <<
812 "\"created during incomplete resharding with error " << ret2
<< dendl
;
819 RGWReshard::RGWReshard(rgw::sal::RadosStore
* _store
, bool _verbose
, ostream
*_out
,
820 Formatter
*_formatter
) :
821 store(_store
), instance_lock(bucket_instance_lock_name
),
822 verbose(_verbose
), out(_out
), formatter(_formatter
)
824 num_logshards
= store
->ctx()->_conf
.get_val
<uint64_t>("rgw_reshard_num_logs");
827 string
RGWReshard::get_logshard_key(const string
& tenant
,
828 const string
& bucket_name
)
830 return tenant
+ ":" + bucket_name
;
833 #define MAX_RESHARD_LOGSHARDS_PRIME 7877
835 void RGWReshard::get_bucket_logshard_oid(const string
& tenant
, const string
& bucket_name
, string
*oid
)
837 string key
= get_logshard_key(tenant
, bucket_name
);
839 uint32_t sid
= ceph_str_hash_linux(key
.c_str(), key
.size());
840 uint32_t sid2
= sid
^ ((sid
& 0xFF) << 24);
841 sid
= sid2
% MAX_RESHARD_LOGSHARDS_PRIME
% num_logshards
;
843 get_logshard_oid(int(sid
), oid
);
// Queue a bucket for resharding by adding its entry to the reshard log
// shard that owns "<tenant>:<bucket>". Early-out when the zone has
// resharding disabled.
// NOTE(review): the logshard_oid declaration, error-check braces and
// return statements are elided in this view.
int RGWReshard::add(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry)
  if (!store->svc()->zone->can_reshard()) {
    ldpp_dout(dpp, 20) << __func__ << " Resharding is disabled" << dendl;

  get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);

  librados::ObjectWriteOperation op;
  cls_rgw_reshard_add(op, entry);

  int ret = rgw_rados_operate(dpp, store->getRados()->reshard_pool_ctx, logshard_oid, &op, null_yield);
    ldpp_dout(dpp, -1) << "ERROR: failed to add entry to reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
// Update this bucket's reshard-queue entry with the id of its new bucket
// instance ("<name>:<bucket_id>") by fetching the existing entry and
// re-adding it.
// NOTE(review): error-check braces and return statements are elided in
// this view.
int RGWReshard::update(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info)
  cls_rgw_reshard_entry entry;
  entry.bucket_name = bucket_info.bucket.name;
  entry.bucket_id = bucket_info.bucket.bucket_id;
  entry.tenant = bucket_info.owner.tenant;

  // fetch the existing queue entry for this bucket
  int ret = get(dpp, entry);

  entry.new_instance_id = new_bucket_info.bucket.name + ":" + new_bucket_info.bucket.bucket_id;

  // re-adding with the same key overwrites the queued entry
  ret = add(dpp, entry);
    ldpp_dout(dpp, 0) << __func__ << ":Error in updating entry bucket " << entry.bucket_name << ": " <<
      cpp_strerror(-ret) << dendl;
// List up to `max` reshard-queue entries from one log shard, starting
// after `marker`. ENOENT is treated as a successful empty listing since
// shard objects are created lazily.
// NOTE(review): the logshard_oid declaration, the ret=0 reset on ENOENT
// and the return statements are elided in this view.
int RGWReshard::list(const DoutPrefixProvider *dpp, int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated)
  get_logshard_oid(logshard_num, &logshard_oid);

  int ret = cls_rgw_reshard_list(store->getRados()->reshard_pool_ctx, logshard_oid, marker, max, entries, is_truncated);

  if (ret == -ENOENT) {
    // these shard objects aren't created until we actually write something to
    // them, so treat ENOENT as a successful empty listing
    *is_truncated = false;
  } else if (ret == -EACCES) {
    ldpp_dout(dpp, -1) << "ERROR: access denied to pool " << store->svc()->zone->get_zone_params().reshard_pool
                      << ". Fix the pool access permissions of your client" << dendl;
  } else if (ret < 0) {
    ldpp_dout(dpp, -1) << "ERROR: failed to list reshard log entries, oid="
        << logshard_oid << " marker=" << marker << " " << cpp_strerror(ret) << dendl;
// Look up this bucket's entry in its reshard log shard; entry.tenant and
// entry.bucket_name select the shard and key, and entry is filled in on
// success.
// NOTE(review): the logshard_oid declaration, the enclosing error check
// and return statements are elided in this view.
int RGWReshard::get(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry)
  get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);

  int ret = cls_rgw_reshard_get(store->getRados()->reshard_pool_ctx, logshard_oid, entry);
    // ENOENT ("bucket not queued") is expected and stays quiet
    if (ret != -ENOENT) {
      ldpp_dout(dpp, -1) << "ERROR: failed to get entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant <<
        " bucket=" << entry.bucket_name << dendl;
// Remove this bucket's entry from its reshard log shard.
// NOTE(review): the logshard_oid declaration, error-check braces and
// return statements are elided in this view.
int RGWReshard::remove(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry)
  get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);

  librados::ObjectWriteOperation op;
  cls_rgw_reshard_remove(op, entry);

  int ret = rgw_rados_operate(dpp, store->getRados()->reshard_pool_ctx, logshard_oid, &op, null_yield);
    ldpp_dout(dpp, -1) << "ERROR: failed to remove entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
952 int RGWReshard::clear_bucket_resharding(const DoutPrefixProvider
*dpp
, const string
& bucket_instance_oid
, cls_rgw_reshard_entry
& entry
)
954 int ret
= cls_rgw_clear_bucket_resharding(store
->getRados()->reshard_pool_ctx
, bucket_instance_oid
);
956 ldpp_dout(dpp
, -1) << "ERROR: failed to clear bucket resharding, bucket_instance_oid=" << bucket_instance_oid
<< dendl
;
// Block the caller for the wait period: with a coroutine context
// (optional_yield) an asio timer is awaited on the yield context,
// otherwise fall back to a condition-variable wait.
// NOTE(review): the going-down check, the yield/no-yield branch
// structure, lock release/reacquire around the async wait, and return
// statements are elided in this view.
int RGWReshardWait::wait(optional_yield y)
  std::unique_lock lock(mutex);

  auto& context = y.get_io_context();
  auto& yield = y.get_yield_context();

  // register the waiter so stop() can cancel its timer
  Waiter waiter(context);
  waiters.push_back(waiter);

  waiter.timer.expires_after(duration);

  boost::system::error_code ec;
  waiter.timer.async_wait(yield[ec]);

  waiters.erase(waiters.iterator_to(waiter));

  // non-coroutine fallback: plain timed condition wait
  cond.wait_for(lock, duration);
// Shut down the waiter machinery: under the mutex, cancel every
// registered waiter's timer so blocked callers wake up.
// NOTE(review): the going-down flag set / cond notification and closing
// braces are elided in this view.
void RGWReshardWait::stop()
  std::scoped_lock lock(mutex);
  for (auto& waiter : waiters) {
    // unblock any waiters with ECANCELED
    waiter.timer.cancel();
1009 int RGWReshard::process_single_logshard(int logshard_num
, const DoutPrefixProvider
*dpp
)
1012 bool truncated
= true;
1014 constexpr uint32_t max_entries
= 1000;
1016 string logshard_oid
;
1017 get_logshard_oid(logshard_num
, &logshard_oid
);
1019 RGWBucketReshardLock
logshard_lock(store
, logshard_oid
, false);
1021 int ret
= logshard_lock
.lock(dpp
);
1023 ldpp_dout(dpp
, 5) << __func__
<< "(): failed to acquire lock on " <<
1024 logshard_oid
<< ", ret = " << ret
<<dendl
;
1029 std::list
<cls_rgw_reshard_entry
> entries
;
1030 ret
= list(dpp
, logshard_num
, marker
, max_entries
, entries
, &truncated
);
1032 ldpp_dout(dpp
, 10) << "cannot list all reshards in logshard oid=" <<
1033 logshard_oid
<< dendl
;
1037 for(auto& entry
: entries
) { // logshard entries
1038 if(entry
.new_instance_id
.empty()) {
1040 ldpp_dout(dpp
, 20) << __func__
<< " resharding " <<
1041 entry
.bucket_name
<< dendl
;
1044 RGWBucketInfo bucket_info
;
1045 map
<string
, bufferlist
> attrs
;
1047 ret
= store
->getRados()->get_bucket_info(store
->svc(),
1048 entry
.tenant
, entry
.bucket_name
,
1049 bucket_info
, nullptr,
1050 null_yield
, dpp
, &attrs
);
1051 if (ret
< 0 || bucket_info
.bucket
.bucket_id
!= entry
.bucket_id
) {
1053 ldpp_dout(dpp
, 0) << __func__
<<
1054 ": Error in get_bucket_info for bucket " << entry
.bucket_name
<<
1055 ": " << cpp_strerror(-ret
) << dendl
;
1056 if (ret
!= -ENOENT
) {
1057 // any error other than ENOENT will abort
1061 ldpp_dout(dpp
, 0) << __func__
<<
1062 ": Bucket: " << entry
.bucket_name
<<
1063 " already resharded by someone, skipping " << dendl
;
1066 // we've encountered a reshard queue entry for an apparently
1067 // non-existent bucket; let's try to recover by cleaning up
1068 ldpp_dout(dpp
, 0) << __func__
<<
1069 ": removing reshard queue entry for a resharded or non-existent bucket" <<
1070 entry
.bucket_name
<< dendl
;
1072 ret
= remove(dpp
, entry
);
1074 ldpp_dout(dpp
, 0) << __func__
<<
1075 ": Error removing non-existent bucket " <<
1076 entry
.bucket_name
<< " from resharding queue: " <<
1077 cpp_strerror(-ret
) << dendl
;
1081 // we cleaned up, move on to the next entry
1082 goto finished_entry
;
1085 RGWBucketReshard
br(store
, bucket_info
, attrs
, nullptr);
1086 ret
= br
.execute(entry
.new_num_shards
, max_entries
, dpp
, false, nullptr,
1089 ldpp_dout(dpp
, 0) << __func__
<<
1090 ": Error during resharding bucket " << entry
.bucket_name
<< ":" <<
1091 cpp_strerror(-ret
)<< dendl
;
1095 ldpp_dout(dpp
, 20) << __func__
<<
1096 " removing reshard queue entry for bucket " << entry
.bucket_name
<<
1099 ret
= remove(dpp
, entry
);
1101 ldpp_dout(dpp
, 0) << __func__
<< ": Error removing bucket " <<
1102 entry
.bucket_name
<< " from resharding queue: " <<
1103 cpp_strerror(-ret
) << dendl
;
1106 } // if new instance id is empty
1110 Clock::time_point now
= Clock::now();
1111 if (logshard_lock
.should_renew(now
)) {
1112 ret
= logshard_lock
.renew(now
);
1118 entry
.get_key(&marker
);
1120 } while (truncated
);
1122 logshard_lock
.unlock();
1127 void RGWReshard::get_logshard_oid(int shard_num
, string
*logshard
)
1130 snprintf(buf
, sizeof(buf
), "%010u", (unsigned)shard_num
);
1132 string
objname(reshard_oid_prefix
);
1133 *logshard
= objname
+ buf
;
// Iterate every reshard log shard and process it, logging before and
// after each shard. Early-out when the zone has resharding disabled.
// NOTE(review): the logshard/ret declarations, error returns and closing
// braces are elided in this view.
int RGWReshard::process_all_logshards(const DoutPrefixProvider *dpp)
  if (!store->svc()->zone->can_reshard()) {
    ldpp_dout(dpp, 20) << __func__ << " Resharding is disabled" << dendl;

  for (int i = 0; i < num_logshards; i++) {
    get_logshard_oid(i, &logshard);

    ldpp_dout(dpp, 20) << "processing logshard = " << logshard << dendl;

    ret = process_single_logshard(i, dpp);

    ldpp_dout(dpp, 20) << "finish processing logshard = " << logshard << " , ret = " << ret << dendl;
1158 bool RGWReshard::going_down()
1163 void RGWReshard::start_processor()
1165 worker
= new ReshardWorker(store
->ctx(), this);
1166 worker
->create("rgw_reshard");
1169 void RGWReshard::stop_processor()
// Background worker thread loop: process all log shards, then sleep for
// the remainder of rgw_reshard_thread_interval (skipping the sleep when
// processing already took that long), until shutdown is requested.
// NOTE(review): the do{ opener, the break on going_down, the
// end -= start adjustment, secs -= end.sec(), and the final return are
// elided in this view.
void *RGWReshard::ReshardWorker::entry() {
  utime_t start = ceph_clock_now();
  reshard->process_all_logshards(this);

  if (reshard->going_down())

  utime_t end = ceph_clock_now();

  int secs = cct->_conf.get_val<uint64_t>("rgw_reshard_thread_interval");

  // if a pass took at least the full interval, start the next immediately
  if (secs <= end.sec())
    continue; // next round

  std::unique_lock locker{lock};
  cond.wait_for(locker, std::chrono::seconds(secs));
  } while (!reshard->going_down());
1204 void RGWReshard::ReshardWorker::stop()
1206 std::lock_guard l
{lock
};
1210 CephContext
*RGWReshard::ReshardWorker::get_cct() const
1215 unsigned RGWReshard::ReshardWorker::get_subsys() const
1220 std::ostream
& RGWReshard::ReshardWorker::gen_prefix(std::ostream
& out
) const
1222 return out
<< "rgw reshard worker thread: ";