]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_reshard.cc
bump version to 15.2.11-pve1
[ceph.git] / ceph / src / rgw / rgw_reshard.cc
CommitLineData
31f18b77 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
9f95a23c 2// vim: ts=8 sw=2 smarttab ft=cpp
31f18b77 3
f64942e4 4#include <limits>
11fdf7f2 5#include <sstream>
f64942e4 6
31f18b77 7#include "rgw_rados.h"
11fdf7f2 8#include "rgw_zone.h"
31f18b77
FG
9#include "rgw_bucket.h"
10#include "rgw_reshard.h"
9f95a23c 11#include "rgw_sal.h"
31f18b77
FG
12#include "cls/rgw/cls_rgw_client.h"
13#include "cls/lock/cls_lock_client.h"
14#include "common/errno.h"
15#include "common/ceph_json.h"
16
17#include "common/dout.h"
18
11fdf7f2
TL
19#include "services/svc_zone.h"
20#include "services/svc_sys_obj.h"
9f95a23c 21#include "services/svc_tier_rados.h"
11fdf7f2 22
31f18b77
FG
23#define dout_context g_ceph_context
24#define dout_subsys ceph_subsys_rgw
25
26const string reshard_oid_prefix = "reshard.";
27const string reshard_lock_name = "reshard_process";
28const string bucket_instance_lock_name = "bucket_instance_lock";
29
9f95a23c
TL
30/* All primes up to 2000 used to attempt to make dynamic sharding use
31 * a prime numbers of shards. Note: this list also includes 1 for when
32 * 1 shard is the most appropriate, even though 1 is not prime.
33 */
34const std::initializer_list<uint16_t> RGWBucketReshard::reshard_primes = {
35 1, 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61,
36 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137,
37 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199, 211,
38 223, 227, 229, 233, 239, 241, 251, 257, 263, 269, 271, 277, 281, 283,
39 293, 307, 311, 313, 317, 331, 337, 347, 349, 353, 359, 367, 373, 379,
40 383, 389, 397, 401, 409, 419, 421, 431, 433, 439, 443, 449, 457, 461,
41 463, 467, 479, 487, 491, 499, 503, 509, 521, 523, 541, 547, 557, 563,
42 569, 571, 577, 587, 593, 599, 601, 607, 613, 617, 619, 631, 641, 643,
43 647, 653, 659, 661, 673, 677, 683, 691, 701, 709, 719, 727, 733, 739,
44 743, 751, 757, 761, 769, 773, 787, 797, 809, 811, 821, 823, 827, 829,
45 839, 853, 857, 859, 863, 877, 881, 883, 887, 907, 911, 919, 929, 937,
46 941, 947, 953, 967, 971, 977, 983, 991, 997, 1009, 1013, 1019, 1021,
47 1031, 1033, 1039, 1049, 1051, 1061, 1063, 1069, 1087, 1091, 1093,
48 1097, 1103, 1109, 1117, 1123, 1129, 1151, 1153, 1163, 1171, 1181,
49 1187, 1193, 1201, 1213, 1217, 1223, 1229, 1231, 1237, 1249, 1259,
50 1277, 1279, 1283, 1289, 1291, 1297, 1301, 1303, 1307, 1319, 1321,
51 1327, 1361, 1367, 1373, 1381, 1399, 1409, 1423, 1427, 1429, 1433,
52 1439, 1447, 1451, 1453, 1459, 1471, 1481, 1483, 1487, 1489, 1493,
53 1499, 1511, 1523, 1531, 1543, 1549, 1553, 1559, 1567, 1571, 1579,
54 1583, 1597, 1601, 1607, 1609, 1613, 1619, 1621, 1627, 1637, 1657,
55 1663, 1667, 1669, 1693, 1697, 1699, 1709, 1721, 1723, 1733, 1741,
56 1747, 1753, 1759, 1777, 1783, 1787, 1789, 1801, 1811, 1823, 1831,
57 1847, 1861, 1867, 1871, 1873, 1877, 1879, 1889, 1901, 1907, 1913,
58 1931, 1933, 1949, 1951, 1973, 1979, 1987, 1993, 1997, 1999
59};
f64942e4 60
31f18b77 61class BucketReshardShard {
9f95a23c 62 rgw::sal::RGWRadosStore *store;
31f18b77
FG
63 const RGWBucketInfo& bucket_info;
64 int num_shard;
65 RGWRados::BucketShard bs;
66 vector<rgw_cls_bi_entry> entries;
11fdf7f2 67 map<RGWObjCategory, rgw_bucket_category_stats> stats;
31f18b77 68 deque<librados::AioCompletion *>& aio_completions;
11fdf7f2
TL
69 uint64_t max_aio_completions;
70 uint64_t reshard_shard_batch_size;
31f18b77
FG
71
72 int wait_next_completion() {
73 librados::AioCompletion *c = aio_completions.front();
74 aio_completions.pop_front();
75
9f95a23c 76 c->wait_for_complete();
31f18b77
FG
77
78 int ret = c->get_return_value();
79 c->release();
80
81 if (ret < 0) {
82 derr << "ERROR: reshard rados operation failed: " << cpp_strerror(-ret) << dendl;
83 return ret;
84 }
85
86 return 0;
87 }
88
89 int get_completion(librados::AioCompletion **c) {
11fdf7f2 90 if (aio_completions.size() >= max_aio_completions) {
31f18b77
FG
91 int ret = wait_next_completion();
92 if (ret < 0) {
93 return ret;
94 }
95 }
96
9f95a23c 97 *c = librados::Rados::aio_create_completion(nullptr, nullptr);
31f18b77
FG
98 aio_completions.push_back(*c);
99
100 return 0;
101 }
102
103public:
9f95a23c 104 BucketReshardShard(rgw::sal::RGWRadosStore *_store, const RGWBucketInfo& _bucket_info,
31f18b77 105 int _num_shard,
f64942e4 106 deque<librados::AioCompletion *>& _completions) :
9f95a23c 107 store(_store), bucket_info(_bucket_info), bs(store->getRados()),
f64942e4
AA
108 aio_completions(_completions)
109 {
31f18b77 110 num_shard = (bucket_info.num_shards > 0 ? _num_shard : -1);
f64942e4 111 bs.init(bucket_info.bucket, num_shard, nullptr /* no RGWBucketInfo */);
11fdf7f2
TL
112
113 max_aio_completions =
114 store->ctx()->_conf.get_val<uint64_t>("rgw_reshard_max_aio");
115 reshard_shard_batch_size =
116 store->ctx()->_conf.get_val<uint64_t>("rgw_reshard_batch_size");
31f18b77
FG
117 }
118
119 int get_num_shard() {
120 return num_shard;
121 }
122
11fdf7f2 123 int add_entry(rgw_cls_bi_entry& entry, bool account, RGWObjCategory category,
31f18b77
FG
124 const rgw_bucket_category_stats& entry_stats) {
125 entries.push_back(entry);
126 if (account) {
127 rgw_bucket_category_stats& target = stats[category];
128 target.num_entries += entry_stats.num_entries;
129 target.total_size += entry_stats.total_size;
130 target.total_size_rounded += entry_stats.total_size_rounded;
91327a77 131 target.actual_size += entry_stats.actual_size;
31f18b77 132 }
11fdf7f2 133 if (entries.size() >= reshard_shard_batch_size) {
31f18b77
FG
134 int ret = flush();
135 if (ret < 0) {
136 return ret;
137 }
138 }
f64942e4 139
31f18b77
FG
140 return 0;
141 }
f64942e4 142
31f18b77
FG
143 int flush() {
144 if (entries.size() == 0) {
145 return 0;
146 }
147
148 librados::ObjectWriteOperation op;
149 for (auto& entry : entries) {
9f95a23c 150 store->getRados()->bi_put(op, bs, entry);
31f18b77
FG
151 }
152 cls_rgw_bucket_update_stats(op, false, stats);
153
154 librados::AioCompletion *c;
155 int ret = get_completion(&c);
156 if (ret < 0) {
157 return ret;
158 }
9f95a23c 159 ret = bs.bucket_obj.aio_operate(c, &op);
31f18b77
FG
160 if (ret < 0) {
161 derr << "ERROR: failed to store entries in target bucket shard (bs=" << bs.bucket << "/" << bs.shard_id << ") error=" << cpp_strerror(-ret) << dendl;
162 return ret;
163 }
164 entries.clear();
165 stats.clear();
166 return 0;
167 }
168
169 int wait_all_aio() {
170 int ret = 0;
171 while (!aio_completions.empty()) {
172 int r = wait_next_completion();
173 if (r < 0) {
174 ret = r;
175 }
176 }
177 return ret;
178 }
f64942e4
AA
179}; // class BucketReshardShard
180
31f18b77
FG
181
182class BucketReshardManager {
9f95a23c 183 rgw::sal::RGWRadosStore *store;
31f18b77
FG
184 const RGWBucketInfo& target_bucket_info;
185 deque<librados::AioCompletion *> completions;
186 int num_target_shards;
187 vector<BucketReshardShard *> target_shards;
188
189public:
9f95a23c 190 BucketReshardManager(rgw::sal::RGWRadosStore *_store,
f64942e4
AA
191 const RGWBucketInfo& _target_bucket_info,
192 int _num_target_shards) :
193 store(_store), target_bucket_info(_target_bucket_info),
194 num_target_shards(_num_target_shards)
195 {
31f18b77
FG
196 target_shards.resize(num_target_shards);
197 for (int i = 0; i < num_target_shards; ++i) {
198 target_shards[i] = new BucketReshardShard(store, target_bucket_info, i, completions);
199 }
200 }
201
202 ~BucketReshardManager() {
203 for (auto& shard : target_shards) {
204 int ret = shard->wait_all_aio();
205 if (ret < 0) {
f64942e4
AA
206 ldout(store->ctx(), 20) << __func__ <<
207 ": shard->wait_all_aio() returned ret=" << ret << dendl;
31f18b77
FG
208 }
209 }
210 }
211
212 int add_entry(int shard_index,
11fdf7f2 213 rgw_cls_bi_entry& entry, bool account, RGWObjCategory category,
31f18b77 214 const rgw_bucket_category_stats& entry_stats) {
f64942e4
AA
215 int ret = target_shards[shard_index]->add_entry(entry, account, category,
216 entry_stats);
31f18b77 217 if (ret < 0) {
f64942e4
AA
218 derr << "ERROR: target_shards.add_entry(" << entry.idx <<
219 ") returned error: " << cpp_strerror(-ret) << dendl;
31f18b77
FG
220 return ret;
221 }
f64942e4 222
31f18b77
FG
223 return 0;
224 }
225
226 int finish() {
227 int ret = 0;
228 for (auto& shard : target_shards) {
229 int r = shard->flush();
230 if (r < 0) {
231 derr << "ERROR: target_shards[" << shard->get_num_shard() << "].flush() returned error: " << cpp_strerror(-r) << dendl;
232 ret = r;
233 }
234 }
235 for (auto& shard : target_shards) {
236 int r = shard->wait_all_aio();
237 if (r < 0) {
238 derr << "ERROR: target_shards[" << shard->get_num_shard() << "].wait_all_aio() returned error: " << cpp_strerror(-r) << dendl;
239 ret = r;
240 }
241 delete shard;
242 }
243 target_shards.clear();
244 return ret;
245 }
f64942e4
AA
246}; // class BucketReshardManager
247
9f95a23c 248RGWBucketReshard::RGWBucketReshard(rgw::sal::RGWRadosStore *_store,
f64942e4
AA
249 const RGWBucketInfo& _bucket_info,
250 const map<string, bufferlist>& _bucket_attrs,
251 RGWBucketReshardLock* _outer_reshard_lock) :
252 store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs),
253 reshard_lock(store, bucket_info, true),
254 outer_reshard_lock(_outer_reshard_lock)
255{ }
256
9f95a23c 257int RGWBucketReshard::set_resharding_status(rgw::sal::RGWRadosStore* store,
f64942e4
AA
258 const RGWBucketInfo& bucket_info,
259 const string& new_instance_id,
260 int32_t num_shards,
261 cls_rgw_reshard_status status)
31f18b77
FG
262{
263 if (new_instance_id.empty()) {
264 ldout(store->ctx(), 0) << __func__ << " missing new bucket instance id" << dendl;
265 return -EINVAL;
266 }
267
268 cls_rgw_bucket_instance_entry instance_entry;
269 instance_entry.set_status(new_instance_id, num_shards, status);
270
9f95a23c 271 int ret = store->getRados()->bucket_set_reshard(bucket_info, instance_entry);
31f18b77
FG
272 if (ret < 0) {
273 ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: error setting bucket resharding flag on bucket index: "
274 << cpp_strerror(-ret) << dendl;
275 return ret;
276 }
277 return 0;
278}
279
f64942e4 280// reshard lock assumes lock is held
9f95a23c 281int RGWBucketReshard::clear_resharding(rgw::sal::RGWRadosStore* store,
f64942e4 282 const RGWBucketInfo& bucket_info)
31f18b77 283{
f64942e4
AA
284 int ret = clear_index_shard_reshard_status(store, bucket_info);
285 if (ret < 0) {
286 ldout(store->ctx(), 0) << "RGWBucketReshard::" << __func__ <<
287 " ERROR: error clearing reshard status from index shard " <<
288 cpp_strerror(-ret) << dendl;
289 return ret;
290 }
31f18b77 291
f64942e4 292 cls_rgw_bucket_instance_entry instance_entry;
9f95a23c 293 ret = store->getRados()->bucket_set_reshard(bucket_info, instance_entry);
31f18b77 294 if (ret < 0) {
f64942e4
AA
295 ldout(store->ctx(), 0) << "RGWReshard::" << __func__ <<
296 " ERROR: error setting bucket resharding flag on bucket index: " <<
297 cpp_strerror(-ret) << dendl;
31f18b77
FG
298 return ret;
299 }
f64942e4
AA
300
301 return 0;
302}
303
9f95a23c 304int RGWBucketReshard::clear_index_shard_reshard_status(rgw::sal::RGWRadosStore* store,
f64942e4
AA
305 const RGWBucketInfo& bucket_info)
306{
307 uint32_t num_shards = bucket_info.num_shards;
308
309 if (num_shards < std::numeric_limits<uint32_t>::max()) {
310 int ret = set_resharding_status(store, bucket_info,
311 bucket_info.bucket.bucket_id,
312 (num_shards < 1 ? 1 : num_shards),
9f95a23c 313 cls_rgw_reshard_status::NOT_RESHARDING);
f64942e4
AA
314 if (ret < 0) {
315 ldout(store->ctx(), 0) << "RGWBucketReshard::" << __func__ <<
316 " ERROR: error clearing reshard status from index shard " <<
317 cpp_strerror(-ret) << dendl;
318 return ret;
319 }
320 }
321
31f18b77
FG
322 return 0;
323}
324
9f95a23c 325static int create_new_bucket_instance(rgw::sal::RGWRadosStore *store,
31f18b77
FG
326 int new_num_shards,
327 const RGWBucketInfo& bucket_info,
328 map<string, bufferlist>& attrs,
329 RGWBucketInfo& new_bucket_info)
330{
331 new_bucket_info = bucket_info;
332
9f95a23c 333 store->getRados()->create_bucket_id(&new_bucket_info.bucket.bucket_id);
31f18b77
FG
334
335 new_bucket_info.num_shards = new_num_shards;
336 new_bucket_info.objv_tracker.clear();
337
338 new_bucket_info.new_bucket_instance_id.clear();
9f95a23c 339 new_bucket_info.reshard_status = cls_rgw_reshard_status::NOT_RESHARDING;
31f18b77 340
9f95a23c 341 int ret = store->svc()->bi->init_index(new_bucket_info);
31f18b77
FG
342 if (ret < 0) {
343 cerr << "ERROR: failed to init new bucket indexes: " << cpp_strerror(-ret) << std::endl;
f64942e4 344 return ret;
31f18b77
FG
345 }
346
9f95a23c 347 ret = store->getRados()->put_bucket_instance_info(new_bucket_info, true, real_time(), &attrs);
31f18b77
FG
348 if (ret < 0) {
349 cerr << "ERROR: failed to store new bucket instance info: " << cpp_strerror(-ret) << std::endl;
f64942e4 350 return ret;
31f18b77
FG
351 }
352
353 return 0;
354}
355
356int RGWBucketReshard::create_new_bucket_instance(int new_num_shards,
357 RGWBucketInfo& new_bucket_info)
358{
f64942e4
AA
359 return ::create_new_bucket_instance(store, new_num_shards,
360 bucket_info, bucket_attrs, new_bucket_info);
31f18b77
FG
361}
362
94b18763
FG
363int RGWBucketReshard::cancel()
364{
f64942e4 365 int ret = reshard_lock.lock();
94b18763
FG
366 if (ret < 0) {
367 return ret;
368 }
369
370 ret = clear_resharding();
371
f64942e4
AA
372 reshard_lock.unlock();
373 return ret;
94b18763
FG
374}
375
31f18b77
FG
376class BucketInfoReshardUpdate
377{
9f95a23c 378 rgw::sal::RGWRadosStore *store;
92f5a8d4 379 RGWBucketInfo& bucket_info;
31f18b77
FG
380 std::map<string, bufferlist> bucket_attrs;
381
382 bool in_progress{false};
383
384 int set_status(cls_rgw_reshard_status s) {
385 bucket_info.reshard_status = s;
9f95a23c 386 int ret = store->getRados()->put_bucket_instance_info(bucket_info, false, real_time(), &bucket_attrs);
31f18b77
FG
387 if (ret < 0) {
388 ldout(store->ctx(), 0) << "ERROR: failed to write bucket info, ret=" << ret << dendl;
389 return ret;
390 }
391 return 0;
392 }
393
394public:
9f95a23c 395 BucketInfoReshardUpdate(rgw::sal::RGWRadosStore *_store,
f64942e4
AA
396 RGWBucketInfo& _bucket_info,
397 map<string, bufferlist>& _bucket_attrs,
398 const string& new_bucket_id) :
399 store(_store),
400 bucket_info(_bucket_info),
401 bucket_attrs(_bucket_attrs)
402 {
31f18b77
FG
403 bucket_info.new_bucket_instance_id = new_bucket_id;
404 }
f64942e4 405
31f18b77
FG
406 ~BucketInfoReshardUpdate() {
407 if (in_progress) {
f64942e4
AA
408 // resharding must not have ended correctly, clean up
409 int ret =
410 RGWBucketReshard::clear_index_shard_reshard_status(store, bucket_info);
411 if (ret < 0) {
412 lderr(store->ctx()) << "Error: " << __func__ <<
413 " clear_index_shard_status returned " << ret << dendl;
414 }
31f18b77 415 bucket_info.new_bucket_instance_id.clear();
9f95a23c
TL
416
417 // clears new_bucket_instance as well
418 set_status(cls_rgw_reshard_status::NOT_RESHARDING);
31f18b77
FG
419 }
420 }
421
422 int start() {
9f95a23c 423 int ret = set_status(cls_rgw_reshard_status::IN_PROGRESS);
31f18b77
FG
424 if (ret < 0) {
425 return ret;
426 }
427 in_progress = true;
428 return 0;
429 }
430
431 int complete() {
9f95a23c 432 int ret = set_status(cls_rgw_reshard_status::DONE);
31f18b77
FG
433 if (ret < 0) {
434 return ret;
435 }
436 in_progress = false;
437 return 0;
438 }
439};
440
f64942e4 441
9f95a23c 442RGWBucketReshardLock::RGWBucketReshardLock(rgw::sal::RGWRadosStore* _store,
f64942e4
AA
443 const std::string& reshard_lock_oid,
444 bool _ephemeral) :
445 store(_store),
446 lock_oid(reshard_lock_oid),
447 ephemeral(_ephemeral),
448 internal_lock(reshard_lock_name)
449{
11fdf7f2
TL
450 const int lock_dur_secs = store->ctx()->_conf.get_val<uint64_t>(
451 "rgw_reshard_bucket_lock_duration");
f64942e4
AA
452 duration = std::chrono::seconds(lock_dur_secs);
453
454#define COOKIE_LEN 16
455 char cookie_buf[COOKIE_LEN + 1];
456 gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
457 cookie_buf[COOKIE_LEN] = '\0';
458
459 internal_lock.set_cookie(cookie_buf);
460 internal_lock.set_duration(duration);
461}
462
463int RGWBucketReshardLock::lock() {
464 internal_lock.set_must_renew(false);
465 int ret;
466 if (ephemeral) {
9f95a23c 467 ret = internal_lock.lock_exclusive_ephemeral(&store->getRados()->reshard_pool_ctx,
f64942e4
AA
468 lock_oid);
469 } else {
9f95a23c 470 ret = internal_lock.lock_exclusive(&store->getRados()->reshard_pool_ctx, lock_oid);
f64942e4
AA
471 }
472 if (ret < 0) {
473 ldout(store->ctx(), 0) << "RGWReshardLock::" << __func__ <<
474 " failed to acquire lock on " << lock_oid << " ret=" << ret << dendl;
475 return ret;
476 }
477 reset_time(Clock::now());
478
479 return 0;
480}
481
482void RGWBucketReshardLock::unlock() {
9f95a23c 483 int ret = internal_lock.unlock(&store->getRados()->reshard_pool_ctx, lock_oid);
f64942e4
AA
484 if (ret < 0) {
485 ldout(store->ctx(), 0) << "WARNING: RGWBucketReshardLock::" << __func__ <<
486 " failed to drop lock on " << lock_oid << " ret=" << ret << dendl;
487 }
488}
489
490int RGWBucketReshardLock::renew(const Clock::time_point& now) {
491 internal_lock.set_must_renew(true);
492 int ret;
493 if (ephemeral) {
9f95a23c 494 ret = internal_lock.lock_exclusive_ephemeral(&store->getRados()->reshard_pool_ctx,
f64942e4
AA
495 lock_oid);
496 } else {
9f95a23c 497 ret = internal_lock.lock_exclusive(&store->getRados()->reshard_pool_ctx, lock_oid);
f64942e4
AA
498 }
499 if (ret < 0) { /* expired or already locked by another processor */
11fdf7f2
TL
500 std::stringstream error_s;
501 if (-ENOENT == ret) {
502 error_s << "ENOENT (lock expired or never initially locked)";
503 } else {
504 error_s << ret << " (" << cpp_strerror(-ret) << ")";
505 }
f64942e4 506 ldout(store->ctx(), 5) << __func__ << "(): failed to renew lock on " <<
11fdf7f2 507 lock_oid << " with error " << error_s.str() << dendl;
f64942e4
AA
508 return ret;
509 }
510 internal_lock.set_must_renew(false);
511
512 reset_time(now);
513 ldout(store->ctx(), 20) << __func__ << "(): successfully renewed lock on " <<
514 lock_oid << dendl;
515
516 return 0;
517}
518
519
520int RGWBucketReshard::do_reshard(int num_shards,
521 RGWBucketInfo& new_bucket_info,
522 int max_entries,
523 bool verbose,
524 ostream *out,
525 Formatter *formatter)
31f18b77
FG
526{
527 rgw_bucket& bucket = bucket_info.bucket;
528
529 int ret = 0;
530
531 if (out) {
31f18b77
FG
532 (*out) << "tenant: " << bucket_info.bucket.tenant << std::endl;
533 (*out) << "bucket name: " << bucket_info.bucket.name << std::endl;
f64942e4
AA
534 (*out) << "old bucket instance id: " << bucket_info.bucket.bucket_id <<
535 std::endl;
536 (*out) << "new bucket instance id: " << new_bucket_info.bucket.bucket_id <<
537 std::endl;
31f18b77
FG
538 }
539
f64942e4 540 /* update bucket info -- in progress*/
31f18b77
FG
541 list<rgw_cls_bi_entry> entries;
542
543 if (max_entries < 0) {
f64942e4
AA
544 ldout(store->ctx(), 0) << __func__ <<
545 ": can't reshard, negative max_entries" << dendl;
31f18b77
FG
546 return -EINVAL;
547 }
548
f64942e4
AA
549 // NB: destructor cleans up sharding state if reshard does not
550 // complete successfully
31f18b77
FG
551 BucketInfoReshardUpdate bucket_info_updater(store, bucket_info, bucket_attrs, new_bucket_info.bucket.bucket_id);
552
553 ret = bucket_info_updater.start();
554 if (ret < 0) {
555 ldout(store->ctx(), 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl;
556 return ret;
557 }
558
559 int num_target_shards = (new_bucket_info.num_shards > 0 ? new_bucket_info.num_shards : 1);
560
561 BucketReshardManager target_shards_mgr(store, new_bucket_info, num_target_shards);
562
92f5a8d4 563 bool verbose_json_out = verbose && (formatter != nullptr) && (out != nullptr);
31f18b77 564
92f5a8d4 565 if (verbose_json_out) {
31f18b77
FG
566 formatter->open_array_section("entries");
567 }
568
569 uint64_t total_entries = 0;
570
92f5a8d4
TL
571 if (!verbose_json_out && out) {
572 (*out) << "total entries:";
31f18b77
FG
573 }
574
f64942e4
AA
575 const int num_source_shards =
576 (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
31f18b77
FG
577 string marker;
578 for (int i = 0; i < num_source_shards; ++i) {
579 bool is_truncated = true;
580 marker.clear();
581 while (is_truncated) {
582 entries.clear();
9f95a23c 583 ret = store->getRados()->bi_list(bucket, i, string(), marker, max_entries, &entries, &is_truncated);
31f18b77
FG
584 if (ret < 0 && ret != -ENOENT) {
585 derr << "ERROR: bi_list(): " << cpp_strerror(-ret) << dendl;
f64942e4 586 return ret;
31f18b77
FG
587 }
588
f64942e4 589 for (auto iter = entries.begin(); iter != entries.end(); ++iter) {
31f18b77 590 rgw_cls_bi_entry& entry = *iter;
92f5a8d4 591 if (verbose_json_out) {
31f18b77
FG
592 formatter->open_object_section("entry");
593
594 encode_json("shard_id", i, formatter);
595 encode_json("num_entry", total_entries, formatter);
596 encode_json("entry", entry, formatter);
597 }
598 total_entries++;
599
600 marker = entry.idx;
601
602 int target_shard_id;
603 cls_rgw_obj_key cls_key;
11fdf7f2 604 RGWObjCategory category;
31f18b77
FG
605 rgw_bucket_category_stats stats;
606 bool account = entry.get_info(&cls_key, &category, &stats);
607 rgw_obj_key key(cls_key);
608 rgw_obj obj(new_bucket_info.bucket, key);
92f5a8d4
TL
609 RGWMPObj mp;
610 if (key.ns == RGW_OBJ_NS_MULTIPART && mp.from_meta(key.name)) {
611 // place the multipart .meta object on the same shard as its head object
612 obj.index_hash_source = mp.get_key();
613 }
9f95a23c 614 int ret = store->getRados()->get_target_shard_id(new_bucket_info, obj.get_hash_object(), &target_shard_id);
31f18b77
FG
615 if (ret < 0) {
616 lderr(store->ctx()) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl;
617 return ret;
618 }
619
620 int shard_index = (target_shard_id > 0 ? target_shard_id : 0);
621
f64942e4
AA
622 ret = target_shards_mgr.add_entry(shard_index, entry, account,
623 category, stats);
31f18b77
FG
624 if (ret < 0) {
625 return ret;
626 }
f64942e4
AA
627
628 Clock::time_point now = Clock::now();
629 if (reshard_lock.should_renew(now)) {
630 // assume outer locks have timespans at least the size of ours, so
631 // can call inside conditional
632 if (outer_reshard_lock) {
633 ret = outer_reshard_lock->renew(now);
634 if (ret < 0) {
635 return ret;
636 }
637 }
638 ret = reshard_lock.renew(now);
639 if (ret < 0) {
640 lderr(store->ctx()) << "Error renewing bucket lock: " << ret << dendl;
641 return ret;
642 }
643 }
92f5a8d4 644 if (verbose_json_out) {
31f18b77 645 formatter->close_section();
92f5a8d4 646 formatter->flush(*out);
31f18b77
FG
647 } else if (out && !(total_entries % 1000)) {
648 (*out) << " " << total_entries;
649 }
f64942e4 650 } // entries loop
31f18b77
FG
651 }
652 }
f64942e4 653
92f5a8d4 654 if (verbose_json_out) {
31f18b77 655 formatter->close_section();
92f5a8d4 656 formatter->flush(*out);
31f18b77
FG
657 } else if (out) {
658 (*out) << " " << total_entries << std::endl;
659 }
660
661 ret = target_shards_mgr.finish();
662 if (ret < 0) {
663 lderr(store->ctx()) << "ERROR: failed to reshard" << dendl;
f64942e4 664 return -EIO;
31f18b77
FG
665 }
666
9f95a23c 667 ret = store->ctl()->bucket->link_bucket(new_bucket_info.owner, new_bucket_info.bucket, bucket_info.creation_time, null_yield);
b32b8144
FG
668 if (ret < 0) {
669 lderr(store->ctx()) << "failed to link new bucket instance (bucket_id=" << new_bucket_info.bucket.bucket_id << ": " << cpp_strerror(-ret) << ")" << dendl;
f64942e4 670 return ret;
31f18b77
FG
671 }
672
673 ret = bucket_info_updater.complete();
674 if (ret < 0) {
675 ldout(store->ctx(), 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl;
676 /* don't error out, reshard process succeeded */
677 }
f64942e4 678
31f18b77 679 return 0;
f64942e4
AA
680 // NB: some error clean-up is done by ~BucketInfoReshardUpdate
681} // RGWBucketReshard::do_reshard
31f18b77
FG
682
683int RGWBucketReshard::get_status(list<cls_rgw_bucket_instance_entry> *status)
684{
9f95a23c 685 return store->svc()->bi_rados->get_reshard_status(bucket_info, status);
31f18b77
FG
686}
687
31f18b77 688
f64942e4
AA
689int RGWBucketReshard::execute(int num_shards, int max_op_entries,
690 bool verbose, ostream *out, Formatter *formatter,
691 RGWReshard* reshard_log)
31f18b77 692{
f64942e4 693 int ret = reshard_lock.lock();
31f18b77
FG
694 if (ret < 0) {
695 return ret;
696 }
697
698 RGWBucketInfo new_bucket_info;
699 ret = create_new_bucket_instance(num_shards, new_bucket_info);
700 if (ret < 0) {
f64942e4
AA
701 // shard state is uncertain, but this will attempt to remove them anyway
702 goto error_out;
31f18b77
FG
703 }
704
705 if (reshard_log) {
706 ret = reshard_log->update(bucket_info, new_bucket_info);
707 if (ret < 0) {
f64942e4 708 goto error_out;
31f18b77
FG
709 }
710 }
711
f64942e4
AA
712 // set resharding status of current bucket_info & shards with
713 // information about planned resharding
714 ret = set_resharding_status(new_bucket_info.bucket.bucket_id,
9f95a23c 715 num_shards, cls_rgw_reshard_status::IN_PROGRESS);
31f18b77 716 if (ret < 0) {
9f95a23c 717 goto error_out;
31f18b77
FG
718 }
719
720 ret = do_reshard(num_shards,
721 new_bucket_info,
722 max_op_entries,
723 verbose, out, formatter);
31f18b77 724 if (ret < 0) {
f64942e4 725 goto error_out;
31f18b77
FG
726 }
727
f64942e4
AA
728 // at this point we've done the main work; we'll make a best-effort
729 // to clean-up but will not indicate any errors encountered
730
731 reshard_lock.unlock();
732
733 // resharding successful, so remove old bucket index shards; use
734 // best effort and don't report out an error; the lock isn't needed
735 // at this point since all we're using a best effor to to remove old
736 // shard objects
9f95a23c 737 ret = store->svc()->bi->clean_index(bucket_info);
31f18b77 738 if (ret < 0) {
f64942e4
AA
739 lderr(store->ctx()) << "Error: " << __func__ <<
740 " failed to clean up old shards; " <<
741 "RGWRados::clean_bucket_index returned " << ret << dendl;
31f18b77
FG
742 }
743
9f95a23c
TL
744 ret = store->ctl()->bucket->remove_bucket_instance_info(bucket_info.bucket,
745 bucket_info, null_yield);
f64942e4
AA
746 if (ret < 0) {
747 lderr(store->ctx()) << "Error: " << __func__ <<
748 " failed to clean old bucket info object \"" <<
749 bucket_info.bucket.get_key() <<
750 "\"created after successful resharding with error " << ret << dendl;
751 }
31f18b77 752
a8e16298
TL
753 ldout(store->ctx(), 1) << __func__ <<
754 " INFO: reshard of bucket \"" << bucket_info.bucket.name << "\" from \"" <<
755 bucket_info.bucket.get_key() << "\" to \"" <<
756 new_bucket_info.bucket.get_key() << "\" completed successfully" << dendl;
757
31f18b77 758 return 0;
f64942e4
AA
759
760error_out:
761
762 reshard_lock.unlock();
763
764 // since the real problem is the issue that led to this error code
765 // path, we won't touch ret and instead use another variable to
766 // temporarily error codes
9f95a23c 767 int ret2 = store->svc()->bi->clean_index(new_bucket_info);
f64942e4
AA
768 if (ret2 < 0) {
769 lderr(store->ctx()) << "Error: " << __func__ <<
770 " failed to clean up shards from failed incomplete resharding; " <<
771 "RGWRados::clean_bucket_index returned " << ret2 << dendl;
772 }
773
9f95a23c
TL
774 ret2 = store->ctl()->bucket->remove_bucket_instance_info(new_bucket_info.bucket,
775 new_bucket_info,
776 null_yield);
f64942e4
AA
777 if (ret2 < 0) {
778 lderr(store->ctx()) << "Error: " << __func__ <<
779 " failed to clean bucket info object \"" <<
780 new_bucket_info.bucket.get_key() <<
781 "\"created during incomplete resharding with error " << ret2 << dendl;
782 }
783
784 return ret;
785} // execute
31f18b77
FG
786
787
9f95a23c 788RGWReshard::RGWReshard(rgw::sal::RGWRadosStore* _store, bool _verbose, ostream *_out,
f64942e4
AA
789 Formatter *_formatter) :
790 store(_store), instance_lock(bucket_instance_lock_name),
791 verbose(_verbose), out(_out), formatter(_formatter)
31f18b77 792{
11fdf7f2 793 num_logshards = store->ctx()->_conf.get_val<uint64_t>("rgw_reshard_num_logs");
31f18b77
FG
794}
795
f64942e4
AA
796string RGWReshard::get_logshard_key(const string& tenant,
797 const string& bucket_name)
31f18b77
FG
798{
799 return tenant + ":" + bucket_name;
800}
801
802#define MAX_RESHARD_LOGSHARDS_PRIME 7877
803
804void RGWReshard::get_bucket_logshard_oid(const string& tenant, const string& bucket_name, string *oid)
805{
806 string key = get_logshard_key(tenant, bucket_name);
807
808 uint32_t sid = ceph_str_hash_linux(key.c_str(), key.size());
809 uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
810 sid = sid2 % MAX_RESHARD_LOGSHARDS_PRIME % num_logshards;
31f18b77 811
1adf2230 812 get_logshard_oid(int(sid), oid);
31f18b77
FG
813}
814
815int RGWReshard::add(cls_rgw_reshard_entry& entry)
816{
9f95a23c 817 if (!store->svc()->zone->can_reshard()) {
3efd9988
FG
818 ldout(store->ctx(), 20) << __func__ << " Resharding is disabled" << dendl;
819 return 0;
820 }
821
31f18b77
FG
822 string logshard_oid;
823
824 get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
825
826 librados::ObjectWriteOperation op;
827 cls_rgw_reshard_add(op, entry);
828
9f95a23c 829 int ret = rgw_rados_operate(store->getRados()->reshard_pool_ctx, logshard_oid, &op, null_yield);
31f18b77
FG
830 if (ret < 0) {
831 lderr(store->ctx()) << "ERROR: failed to add entry to reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
832 return ret;
833 }
834 return 0;
835}
836
837int RGWReshard::update(const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info)
838{
839 cls_rgw_reshard_entry entry;
840 entry.bucket_name = bucket_info.bucket.name;
841 entry.bucket_id = bucket_info.bucket.bucket_id;
b32b8144 842 entry.tenant = bucket_info.owner.tenant;
31f18b77
FG
843
844 int ret = get(entry);
845 if (ret < 0) {
846 return ret;
847 }
848
849 entry.new_instance_id = new_bucket_info.bucket.name + ":" + new_bucket_info.bucket.bucket_id;
850
851 ret = add(entry);
852 if (ret < 0) {
853 ldout(store->ctx(), 0) << __func__ << ":Error in updating entry bucket " << entry.bucket_name << ": " <<
854 cpp_strerror(-ret) << dendl;
855 }
856
857 return ret;
858}
859
860
861int RGWReshard::list(int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated)
862{
863 string logshard_oid;
864
865 get_logshard_oid(logshard_num, &logshard_oid);
866
9f95a23c 867 int ret = cls_rgw_reshard_list(store->getRados()->reshard_pool_ctx, logshard_oid, marker, max, entries, is_truncated);
c07f9fc5
FG
868
869 if (ret < 0) {
870 if (ret == -ENOENT) {
871 *is_truncated = false;
872 ret = 0;
9f95a23c
TL
873 } else {
874 lderr(store->ctx()) << "ERROR: failed to list reshard log entries, oid=" << logshard_oid << dendl;
875 if (ret == -EACCES) {
876 lderr(store->ctx()) << "access denied to pool " << store->svc()->zone->get_zone_params().reshard_pool
c07f9fc5 877 << ". Fix the pool access permissions of your client" << dendl;
9f95a23c
TL
878 }
879 }
31f18b77 880 }
c07f9fc5
FG
881
882 return ret;
31f18b77
FG
883}
884
885int RGWReshard::get(cls_rgw_reshard_entry& entry)
886{
887 string logshard_oid;
888
889 get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
890
9f95a23c 891 int ret = cls_rgw_reshard_get(store->getRados()->reshard_pool_ctx, logshard_oid, entry);
31f18b77 892 if (ret < 0) {
94b18763
FG
893 if (ret != -ENOENT) {
894 lderr(store->ctx()) << "ERROR: failed to get entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant <<
895 " bucket=" << entry.bucket_name << dendl;
896 }
31f18b77
FG
897 return ret;
898 }
899
900 return 0;
901}
902
903int RGWReshard::remove(cls_rgw_reshard_entry& entry)
904{
905 string logshard_oid;
906
907 get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
908
909 librados::ObjectWriteOperation op;
910 cls_rgw_reshard_remove(op, entry);
911
9f95a23c 912 int ret = rgw_rados_operate(store->getRados()->reshard_pool_ctx, logshard_oid, &op, null_yield);
31f18b77
FG
913 if (ret < 0) {
914 lderr(store->ctx()) << "ERROR: failed to remove entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
915 return ret;
916 }
917
918 return ret;
919}
920
921int RGWReshard::clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry)
922{
9f95a23c 923 int ret = cls_rgw_clear_bucket_resharding(store->getRados()->reshard_pool_ctx, bucket_instance_oid);
31f18b77
FG
924 if (ret < 0) {
925 lderr(store->ctx()) << "ERROR: failed to clear bucket resharding, bucket_instance_oid=" << bucket_instance_oid << dendl;
926 return ret;
927 }
928
929 return 0;
930}
931
11fdf7f2 932int RGWReshardWait::wait(optional_yield y)
31f18b77 933{
11fdf7f2
TL
934 std::unique_lock lock(mutex);
935
936 if (going_down) {
937 return -ECANCELED;
938 }
939
940#ifdef HAVE_BOOST_CONTEXT
941 if (y) {
942 auto& context = y.get_io_context();
943 auto& yield = y.get_yield_context();
944
945 Waiter waiter(context);
946 waiters.push_back(waiter);
947 lock.unlock();
948
949 waiter.timer.expires_after(duration);
950
951 boost::system::error_code ec;
952 waiter.timer.async_wait(yield[ec]);
953
954 lock.lock();
955 waiters.erase(waiters.iterator_to(waiter));
956 return -ec.value();
957 }
958#endif
31f18b77 959
11fdf7f2 960 cond.wait_for(lock, duration);
31f18b77
FG
961
962 if (going_down) {
963 return -ECANCELED;
964 }
965
966 return 0;
967}
968
11fdf7f2 969void RGWReshardWait::stop()
31f18b77 970{
11fdf7f2
TL
971 std::scoped_lock lock(mutex);
972 going_down = true;
973 cond.notify_all();
974 for (auto& waiter : waiters) {
975 // unblock any waiters with ECANCELED
976 waiter.timer.cancel();
31f18b77 977 }
31f18b77
FG
978}
979
980int RGWReshard::process_single_logshard(int logshard_num)
981{
982 string marker;
983 bool truncated = true;
984
985 CephContext *cct = store->ctx();
f64942e4 986 constexpr uint32_t max_entries = 1000;
31f18b77
FG
987
988 string logshard_oid;
989 get_logshard_oid(logshard_num, &logshard_oid);
990
f64942e4
AA
991 RGWBucketReshardLock logshard_lock(store, logshard_oid, false);
992
993 int ret = logshard_lock.lock();
92f5a8d4 994 if (ret < 0) {
f64942e4 995 ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " <<
92f5a8d4 996 logshard_oid << ", ret = " << ret <<dendl;
31f18b77
FG
997 return ret;
998 }
92f5a8d4 999
31f18b77
FG
1000 do {
1001 std::list<cls_rgw_reshard_entry> entries;
1002 ret = list(logshard_num, marker, max_entries, entries, &truncated);
1003 if (ret < 0) {
f64942e4
AA
1004 ldout(cct, 10) << "cannot list all reshards in logshard oid=" <<
1005 logshard_oid << dendl;
31f18b77
FG
1006 continue;
1007 }
1008
f64942e4 1009 for(auto& entry: entries) { // logshard entries
31f18b77
FG
1010 if(entry.new_instance_id.empty()) {
1011
f64942e4
AA
1012 ldout(store->ctx(), 20) << __func__ << " resharding " <<
1013 entry.bucket_name << dendl;
31f18b77 1014
31f18b77
FG
1015 rgw_bucket bucket;
1016 RGWBucketInfo bucket_info;
1017 map<string, bufferlist> attrs;
1018
9f95a23c
TL
1019 ret = store->getRados()->get_bucket_info(store->svc(),
1020 entry.tenant, entry.bucket_name,
1021 bucket_info, nullptr,
1022 null_yield, &attrs);
1911f103
TL
1023 if (ret < 0 || bucket_info.bucket.bucket_id != entry.bucket_id) {
1024 if (ret < 0) {
1025 ldout(cct, 0) << __func__ <<
1026 ": Error in get_bucket_info for bucket " << entry.bucket_name <<
1027 ": " << cpp_strerror(-ret) << dendl;
1028 if (ret != -ENOENT) {
1029 // any error other than ENOENT will abort
1030 return ret;
1031 }
1032 } else {
1033 ldout(cct,0) << __func__ <<
1034 ": Bucket: " << entry.bucket_name <<
1035 " already resharded by someone, skipping " << dendl;
92f5a8d4
TL
1036 }
1037
1038 // we've encountered a reshard queue entry for an apparently
1039 // non-existent bucket; let's try to recover by cleaning up
1040 ldout(cct, 0) << __func__ <<
1911f103 1041 ": removing reshard queue entry for a resharded or non-existent bucket" <<
92f5a8d4
TL
1042 entry.bucket_name << dendl;
1043
1044 ret = remove(entry);
1045 if (ret < 0) {
1046 ldout(cct, 0) << __func__ <<
1047 ": Error removing non-existent bucket " <<
1048 entry.bucket_name << " from resharding queue: " <<
1049 cpp_strerror(-ret) << dendl;
1050 return ret;
1051 }
1052
1053 // we cleaned up, move on to the next entry
1054 goto finished_entry;
31f18b77
FG
1055 }
1056
f64942e4 1057 RGWBucketReshard br(store, bucket_info, attrs, nullptr);
92f5a8d4
TL
1058 ret = br.execute(entry.new_num_shards, max_entries, false, nullptr,
1059 nullptr, this);
31f18b77 1060 if (ret < 0) {
92f5a8d4
TL
1061 ldout(store->ctx(), 0) << __func__ <<
1062 ": Error during resharding bucket " << entry.bucket_name << ":" <<
31f18b77
FG
1063 cpp_strerror(-ret)<< dendl;
1064 return ret;
1065 }
1066
92f5a8d4
TL
1067 ldout(store->ctx(), 20) << __func__ <<
1068 " removing reshard queue entry for bucket " << entry.bucket_name <<
f64942e4 1069 dendl;
31f18b77
FG
1070
1071 ret = remove(entry);
1072 if (ret < 0) {
92f5a8d4
TL
1073 ldout(cct, 0) << __func__ << ": Error removing bucket " <<
1074 entry.bucket_name << " from resharding queue: " <<
f64942e4 1075 cpp_strerror(-ret) << dendl;
31f18b77
FG
1076 return ret;
1077 }
92f5a8d4
TL
1078 } // if new instance id is empty
1079
1080 finished_entry:
f64942e4
AA
1081
1082 Clock::time_point now = Clock::now();
1083 if (logshard_lock.should_renew(now)) {
1084 ret = logshard_lock.renew(now);
1085 if (ret < 0) {
1086 return ret;
1087 }
31f18b77 1088 }
f64942e4 1089
31f18b77 1090 entry.get_key(&marker);
92f5a8d4 1091 } // entry for loop
31f18b77
FG
1092 } while (truncated);
1093
f64942e4 1094 logshard_lock.unlock();
31f18b77
FG
1095 return 0;
1096}
1097
1098
1099void RGWReshard::get_logshard_oid(int shard_num, string *logshard)
1100{
1101 char buf[32];
1102 snprintf(buf, sizeof(buf), "%010u", (unsigned)shard_num);
1103
1104 string objname(reshard_oid_prefix);
1105 *logshard = objname + buf;
1106}
1107
1108int RGWReshard::process_all_logshards()
1109{
9f95a23c 1110 if (!store->svc()->zone->can_reshard()) {
3efd9988
FG
1111 ldout(store->ctx(), 20) << __func__ << " Resharding is disabled" << dendl;
1112 return 0;
1113 }
31f18b77
FG
1114 int ret = 0;
1115
1116 for (int i = 0; i < num_logshards; i++) {
1117 string logshard;
1118 get_logshard_oid(i, &logshard);
1119
81eedcae 1120 ldout(store->ctx(), 20) << "processing logshard = " << logshard << dendl;
31f18b77
FG
1121
1122 ret = process_single_logshard(i);
9f95a23c
TL
1123
1124 ldout(store->ctx(), 20) << "finish processing logshard = " << logshard << " , ret = " << ret << dendl;
31f18b77
FG
1125 }
1126
1127 return 0;
1128}
1129
1130bool RGWReshard::going_down()
1131{
1132 return down_flag;
1133}
1134
1135void RGWReshard::start_processor()
1136{
1137 worker = new ReshardWorker(store->ctx(), this);
1138 worker->create("rgw_reshard");
1139}
1140
1141void RGWReshard::stop_processor()
1142{
1143 down_flag = true;
1144 if (worker) {
1145 worker->stop();
1146 worker->join();
1147 }
1148 delete worker;
224ce89b 1149 worker = nullptr;
31f18b77
FG
1150}
1151
1152void *RGWReshard::ReshardWorker::entry() {
31f18b77
FG
1153 do {
1154 utime_t start = ceph_clock_now();
9f95a23c 1155 reshard->process_all_logshards();
31f18b77
FG
1156
1157 if (reshard->going_down())
1158 break;
1159
1160 utime_t end = ceph_clock_now();
1161 end -= start;
11fdf7f2 1162 int secs = cct->_conf.get_val<uint64_t>("rgw_reshard_thread_interval");
31f18b77
FG
1163
1164 if (secs <= end.sec())
1165 continue; // next round
1166
1167 secs -= end.sec();
1168
9f95a23c
TL
1169 std::unique_lock locker{lock};
1170 cond.wait_for(locker, std::chrono::seconds(secs));
31f18b77
FG
1171 } while (!reshard->going_down());
1172
1173 return NULL;
1174}
1175
1176void RGWReshard::ReshardWorker::stop()
1177{
9f95a23c
TL
1178 std::lock_guard l{lock};
1179 cond.notify_all();
31f18b77 1180}