]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_reshard.cc
import ceph 16.2.6
[ceph.git] / ceph / src / rgw / rgw_reshard.cc
CommitLineData
31f18b77 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
9f95a23c 2// vim: ts=8 sw=2 smarttab ft=cpp
31f18b77 3
f64942e4 4#include <limits>
11fdf7f2 5#include <sstream>
f64942e4 6
11fdf7f2 7#include "rgw_zone.h"
31f18b77
FG
8#include "rgw_bucket.h"
9#include "rgw_reshard.h"
9f95a23c 10#include "rgw_sal.h"
f67539c2 11#include "rgw_sal_rados.h"
31f18b77
FG
12#include "cls/rgw/cls_rgw_client.h"
13#include "cls/lock/cls_lock_client.h"
14#include "common/errno.h"
15#include "common/ceph_json.h"
16
17#include "common/dout.h"
18
11fdf7f2
TL
19#include "services/svc_zone.h"
20#include "services/svc_sys_obj.h"
9f95a23c 21#include "services/svc_tier_rados.h"
11fdf7f2 22
31f18b77
FG
23#define dout_context g_ceph_context
24#define dout_subsys ceph_subsys_rgw
25
// Object-name prefix for reshard log shard objects in the reshard pool.
const string reshard_oid_prefix = "reshard.";
// Lock names used with the cls_lock machinery below.
const string reshard_lock_name = "reshard_process";
const string bucket_instance_lock_name = "bucket_instance_lock";

/* All primes up to 2000 used to attempt to make dynamic sharding use
 * a prime numbers of shards. Note: this list also includes 1 for when
 * 1 shard is the most appropriate, even though 1 is not prime.
 */
const std::initializer_list<uint16_t> RGWBucketReshard::reshard_primes = {
  1, 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61,
  67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137,
  139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199, 211,
  223, 227, 229, 233, 239, 241, 251, 257, 263, 269, 271, 277, 281, 283,
  293, 307, 311, 313, 317, 331, 337, 347, 349, 353, 359, 367, 373, 379,
  383, 389, 397, 401, 409, 419, 421, 431, 433, 439, 443, 449, 457, 461,
  463, 467, 479, 487, 491, 499, 503, 509, 521, 523, 541, 547, 557, 563,
  569, 571, 577, 587, 593, 599, 601, 607, 613, 617, 619, 631, 641, 643,
  647, 653, 659, 661, 673, 677, 683, 691, 701, 709, 719, 727, 733, 739,
  743, 751, 757, 761, 769, 773, 787, 797, 809, 811, 821, 823, 827, 829,
  839, 853, 857, 859, 863, 877, 881, 883, 887, 907, 911, 919, 929, 937,
  941, 947, 953, 967, 971, 977, 983, 991, 997, 1009, 1013, 1019, 1021,
  1031, 1033, 1039, 1049, 1051, 1061, 1063, 1069, 1087, 1091, 1093,
  1097, 1103, 1109, 1117, 1123, 1129, 1151, 1153, 1163, 1171, 1181,
  1187, 1193, 1201, 1213, 1217, 1223, 1229, 1231, 1237, 1249, 1259,
  1277, 1279, 1283, 1289, 1291, 1297, 1301, 1303, 1307, 1319, 1321,
  1327, 1361, 1367, 1373, 1381, 1399, 1409, 1423, 1427, 1429, 1433,
  1439, 1447, 1451, 1453, 1459, 1471, 1481, 1483, 1487, 1489, 1493,
  1499, 1511, 1523, 1531, 1543, 1549, 1553, 1559, 1567, 1571, 1579,
  1583, 1597, 1601, 1607, 1609, 1613, 1619, 1621, 1627, 1637, 1657,
  1663, 1667, 1669, 1693, 1697, 1699, 1709, 1721, 1723, 1733, 1741,
  1747, 1753, 1759, 1777, 1783, 1787, 1789, 1801, 1811, 1823, 1831,
  1847, 1861, 1867, 1871, 1873, 1877, 1879, 1889, 1901, 1907, 1913,
  1931, 1933, 1949, 1951, 1973, 1979, 1987, 1993, 1997, 1999
};
f64942e4 60
/* Accumulates bucket-index entries destined for one target shard and
 * writes them out in batches via asynchronous rados operations.  The
 * AioCompletion deque is shared across all shards (owned by
 * BucketReshardManager) so that in-flight operations are bounded
 * globally by rgw_reshard_max_aio. */
class BucketReshardShard {
  rgw::sal::RGWRadosStore *store;
  const RGWBucketInfo& bucket_info;
  int num_shard;      // target shard id, or -1 for an unsharded index
  const rgw::bucket_index_layout_generation& idx_layout;
  RGWRados::BucketShard bs;
  vector<rgw_cls_bi_entry> entries;  // pending batch, flushed when full
  map<RGWObjCategory, rgw_bucket_category_stats> stats;  // stats delta for pending batch
  deque<librados::AioCompletion *>& aio_completions;     // shared, FIFO order
  uint64_t max_aio_completions;      // rgw_reshard_max_aio
  uint64_t reshard_shard_batch_size; // rgw_reshard_batch_size

  // Reap the oldest in-flight aio; returns its (negative) result on failure.
  int wait_next_completion() {
    librados::AioCompletion *c = aio_completions.front();
    aio_completions.pop_front();

    c->wait_for_complete();

    int ret = c->get_return_value();
    c->release();

    if (ret < 0) {
      derr << "ERROR: reshard rados operation failed: " << cpp_strerror(-ret) << dendl;
      return ret;
    }

    return 0;
  }

  // Hand out a fresh completion, first draining one if we are at the
  // in-flight limit so total outstanding aios stays bounded.
  int get_completion(librados::AioCompletion **c) {
    if (aio_completions.size() >= max_aio_completions) {
      int ret = wait_next_completion();
      if (ret < 0) {
        return ret;
      }
    }

    *c = librados::Rados::aio_create_completion(nullptr, nullptr);
    aio_completions.push_back(*c);

    return 0;
  }

public:
  BucketReshardShard(const DoutPrefixProvider *dpp,
                     rgw::sal::RGWRadosStore *_store, const RGWBucketInfo& _bucket_info,
                     int _num_shard, const rgw::bucket_index_layout_generation& _idx_layout,
                     deque<librados::AioCompletion *>& _completions) :
    store(_store), bucket_info(_bucket_info), idx_layout(_idx_layout), bs(store->getRados()),
    aio_completions(_completions)
  {
    // a layout with zero shards means a single, unsharded index object
    num_shard = (idx_layout.layout.normal.num_shards > 0 ? _num_shard : -1);

    // NOTE(review): bs.init() return value is ignored here; a failure
    // would surface later on the first aio_operate — confirm intended.
    bs.init(bucket_info.bucket, num_shard, idx_layout, nullptr /* no RGWBucketInfo */, dpp);

    max_aio_completions =
      store->ctx()->_conf.get_val<uint64_t>("rgw_reshard_max_aio");
    reshard_shard_batch_size =
      store->ctx()->_conf.get_val<uint64_t>("rgw_reshard_batch_size");
  }

  int get_num_shard() {
    return num_shard;
  }

  // Queue one bucket-index entry; if `account` is set, fold its stats
  // into the per-category delta applied at flush time.  Triggers a
  // flush once the batch reaches reshard_shard_batch_size entries.
  int add_entry(rgw_cls_bi_entry& entry, bool account, RGWObjCategory category,
                const rgw_bucket_category_stats& entry_stats) {
    entries.push_back(entry);
    if (account) {
      rgw_bucket_category_stats& target = stats[category];
      target.num_entries += entry_stats.num_entries;
      target.total_size += entry_stats.total_size;
      target.total_size_rounded += entry_stats.total_size_rounded;
      target.actual_size += entry_stats.actual_size;
    }
    if (entries.size() >= reshard_shard_batch_size) {
      int ret = flush();
      if (ret < 0) {
        return ret;
      }
    }

    return 0;
  }

  // Write the pending batch (bi entries + stats update) to the target
  // shard as a single async write op; clears the batch on success.
  int flush() {
    if (entries.size() == 0) {
      return 0;
    }

    librados::ObjectWriteOperation op;
    for (auto& entry : entries) {
      store->getRados()->bi_put(op, bs, entry);
    }
    cls_rgw_bucket_update_stats(op, false, stats);

    librados::AioCompletion *c;
    int ret = get_completion(&c);
    if (ret < 0) {
      return ret;
    }
    ret = bs.bucket_obj.aio_operate(c, &op);
    if (ret < 0) {
      derr << "ERROR: failed to store entries in target bucket shard (bs=" << bs.bucket << "/" << bs.shard_id << ") error=" << cpp_strerror(-ret) << dendl;
      return ret;
    }
    entries.clear();
    stats.clear();
    return 0;
  }

  // Drain every outstanding aio (across all shards, since the deque is
  // shared); returns the last error seen, or 0.
  int wait_all_aio() {
    int ret = 0;
    while (!aio_completions.empty()) {
      int r = wait_next_completion();
      if (r < 0) {
        ret = r;
      }
    }
    return ret;
  }
}; // class BucketReshardShard
183
31f18b77
FG
184
185class BucketReshardManager {
9f95a23c 186 rgw::sal::RGWRadosStore *store;
31f18b77
FG
187 const RGWBucketInfo& target_bucket_info;
188 deque<librados::AioCompletion *> completions;
189 int num_target_shards;
190 vector<BucketReshardShard *> target_shards;
191
192public:
b3b6e05e
TL
193 BucketReshardManager(const DoutPrefixProvider *dpp,
194 rgw::sal::RGWRadosStore *_store,
f64942e4
AA
195 const RGWBucketInfo& _target_bucket_info,
196 int _num_target_shards) :
197 store(_store), target_bucket_info(_target_bucket_info),
198 num_target_shards(_num_target_shards)
f67539c2
TL
199 {
200 const auto& idx_layout = target_bucket_info.layout.current_index;
31f18b77
FG
201 target_shards.resize(num_target_shards);
202 for (int i = 0; i < num_target_shards; ++i) {
b3b6e05e 203 target_shards[i] = new BucketReshardShard(dpp, store, target_bucket_info, i, idx_layout, completions);
31f18b77
FG
204 }
205 }
206
207 ~BucketReshardManager() {
208 for (auto& shard : target_shards) {
209 int ret = shard->wait_all_aio();
210 if (ret < 0) {
f64942e4
AA
211 ldout(store->ctx(), 20) << __func__ <<
212 ": shard->wait_all_aio() returned ret=" << ret << dendl;
31f18b77
FG
213 }
214 }
215 }
216
217 int add_entry(int shard_index,
11fdf7f2 218 rgw_cls_bi_entry& entry, bool account, RGWObjCategory category,
31f18b77 219 const rgw_bucket_category_stats& entry_stats) {
f64942e4
AA
220 int ret = target_shards[shard_index]->add_entry(entry, account, category,
221 entry_stats);
31f18b77 222 if (ret < 0) {
f64942e4
AA
223 derr << "ERROR: target_shards.add_entry(" << entry.idx <<
224 ") returned error: " << cpp_strerror(-ret) << dendl;
31f18b77
FG
225 return ret;
226 }
f64942e4 227
31f18b77
FG
228 return 0;
229 }
230
231 int finish() {
232 int ret = 0;
233 for (auto& shard : target_shards) {
234 int r = shard->flush();
235 if (r < 0) {
236 derr << "ERROR: target_shards[" << shard->get_num_shard() << "].flush() returned error: " << cpp_strerror(-r) << dendl;
237 ret = r;
238 }
239 }
240 for (auto& shard : target_shards) {
241 int r = shard->wait_all_aio();
242 if (r < 0) {
243 derr << "ERROR: target_shards[" << shard->get_num_shard() << "].wait_all_aio() returned error: " << cpp_strerror(-r) << dendl;
244 ret = r;
245 }
246 delete shard;
247 }
248 target_shards.clear();
249 return ret;
250 }
f64942e4
AA
251}; // class BucketReshardManager
252
// Construct a resharder for one bucket.  The internal reshard_lock is
// created ephemeral (true); outer_reshard_lock, when non-null, is a
// caller-held logshard lock that will be renewed alongside ours.
RGWBucketReshard::RGWBucketReshard(rgw::sal::RGWRadosStore *_store,
				   const RGWBucketInfo& _bucket_info,
				   const map<string, bufferlist>& _bucket_attrs,
				   RGWBucketReshardLock* _outer_reshard_lock) :
  store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs),
  reshard_lock(store, bucket_info, true),
  outer_reshard_lock(_outer_reshard_lock)
{ }
261
b3b6e05e
TL
262int RGWBucketReshard::set_resharding_status(const DoutPrefixProvider *dpp,
263 rgw::sal::RGWRadosStore* store,
f64942e4
AA
264 const RGWBucketInfo& bucket_info,
265 const string& new_instance_id,
266 int32_t num_shards,
267 cls_rgw_reshard_status status)
31f18b77
FG
268{
269 if (new_instance_id.empty()) {
b3b6e05e 270 ldpp_dout(dpp, 0) << __func__ << " missing new bucket instance id" << dendl;
31f18b77
FG
271 return -EINVAL;
272 }
273
274 cls_rgw_bucket_instance_entry instance_entry;
275 instance_entry.set_status(new_instance_id, num_shards, status);
276
b3b6e05e 277 int ret = store->getRados()->bucket_set_reshard(dpp, bucket_info, instance_entry);
31f18b77 278 if (ret < 0) {
b3b6e05e 279 ldpp_dout(dpp, 0) << "RGWReshard::" << __func__ << " ERROR: error setting bucket resharding flag on bucket index: "
31f18b77
FG
280 << cpp_strerror(-ret) << dendl;
281 return ret;
282 }
283 return 0;
284}
285
// reshard lock assumes lock is held
// Clear resharding state both from the index shards (status flag) and
// from the bucket-index reshard entry itself.
int RGWBucketReshard::clear_resharding(const DoutPrefixProvider *dpp,
				       rgw::sal::RGWRadosStore* store,
				       const RGWBucketInfo& bucket_info)
{
  // first reset per-shard status to NOT_RESHARDING
  int ret = clear_index_shard_reshard_status(dpp, store, bucket_info);
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "RGWBucketReshard::" << __func__ <<
      " ERROR: error clearing reshard status from index shard " <<
      cpp_strerror(-ret) << dendl;
    return ret;
  }

  // then clear the reshard flag with a default-constructed (empty) entry
  cls_rgw_bucket_instance_entry instance_entry;
  ret = store->getRados()->bucket_set_reshard(dpp, bucket_info, instance_entry);
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "RGWReshard::" << __func__ <<
      " ERROR: error setting bucket resharding flag on bucket index: " <<
      cpp_strerror(-ret) << dendl;
    return ret;
  }

  return 0;
}
310
b3b6e05e
TL
// Reset the reshard status on all of the bucket's index shards back to
// NOT_RESHARDING; a no-op when num_shards is the uint32 sentinel max.
int RGWBucketReshard::clear_index_shard_reshard_status(const DoutPrefixProvider *dpp,
						       rgw::sal::RGWRadosStore* store,
						       const RGWBucketInfo& bucket_info)
{
  uint32_t num_shards = bucket_info.layout.current_index.layout.normal.num_shards;

  if (num_shards < std::numeric_limits<uint32_t>::max()) {
    // treat an unsharded (0) index as a single shard
    int ret = set_resharding_status(dpp, store, bucket_info,
				    bucket_info.bucket.bucket_id,
				    (num_shards < 1 ? 1 : num_shards),
				    cls_rgw_reshard_status::NOT_RESHARDING);
    if (ret < 0) {
      ldpp_dout(dpp, 0) << "RGWBucketReshard::" << __func__ <<
	" ERROR: error clearing reshard status from index shard " <<
	cpp_strerror(-ret) << dendl;
      return ret;
    }
  }

  return 0;
}
332
// Create a brand-new bucket instance (fresh bucket_id, requested shard
// count) cloned from bucket_info, initialize its index objects, and
// persist the new instance info with the supplied attrs.
static int create_new_bucket_instance(rgw::sal::RGWRadosStore *store,
				      int new_num_shards,
				      const RGWBucketInfo& bucket_info,
				      map<string, bufferlist>& attrs,
				      RGWBucketInfo& new_bucket_info,
				      const DoutPrefixProvider *dpp)
{
  new_bucket_info = bucket_info;

  // fresh bucket_id distinguishes the new instance from the source
  store->getRados()->create_bucket_id(&new_bucket_info.bucket.bucket_id);

  new_bucket_info.layout.current_index.layout.normal.num_shards = new_num_shards;
  new_bucket_info.objv_tracker.clear();

  // the clone must not inherit any in-progress reshard markers
  new_bucket_info.new_bucket_instance_id.clear();
  new_bucket_info.reshard_status = cls_rgw_reshard_status::NOT_RESHARDING;

  int ret = store->svc()->bi->init_index(dpp, new_bucket_info);
  if (ret < 0) {
    cerr << "ERROR: failed to init new bucket indexes: " << cpp_strerror(-ret) << std::endl;
    return ret;
  }

  // exclusive create (true) with a zero mtime
  ret = store->getRados()->put_bucket_instance_info(new_bucket_info, true, real_time(), &attrs, dpp);
  if (ret < 0) {
    cerr << "ERROR: failed to store new bucket instance info: " << cpp_strerror(-ret) << std::endl;
    return ret;
  }

  return 0;
}
364
// Member wrapper: forwards to the file-local create_new_bucket_instance()
// using this resharder's store, bucket_info, and cached bucket attrs.
int RGWBucketReshard::create_new_bucket_instance(int new_num_shards,
						 RGWBucketInfo& new_bucket_info,
						 const DoutPrefixProvider *dpp)
{
  return ::create_new_bucket_instance(store, new_num_shards,
				      bucket_info, bucket_attrs, new_bucket_info, dpp);
}
372
b3b6e05e 373int RGWBucketReshard::cancel(const DoutPrefixProvider *dpp)
94b18763 374{
f64942e4 375 int ret = reshard_lock.lock();
94b18763
FG
376 if (ret < 0) {
377 return ret;
378 }
379
b3b6e05e 380 ret = clear_resharding(dpp);
94b18763 381
f64942e4
AA
382 reshard_lock.unlock();
383 return ret;
94b18763
FG
384}
385
31f18b77
FG
/* RAII helper that tracks the reshard status stored in the bucket
 * instance info.  start() marks IN_PROGRESS, complete() marks DONE;
 * if the object is destroyed while still in progress (i.e. an error
 * path), the destructor rolls the state back to NOT_RESHARDING. */
class BucketInfoReshardUpdate
{
  const DoutPrefixProvider *dpp;
  rgw::sal::RGWRadosStore *store;
  RGWBucketInfo& bucket_info;
  std::map<string, bufferlist> bucket_attrs;

  bool in_progress{false};  // true between successful start() and complete()

  // Persist the given status into the bucket instance info.
  int set_status(cls_rgw_reshard_status s, const DoutPrefixProvider *dpp) {
    bucket_info.reshard_status = s;
    int ret = store->getRados()->put_bucket_instance_info(bucket_info, false, real_time(), &bucket_attrs, dpp);
    if (ret < 0) {
      ldpp_dout(dpp, 0) << "ERROR: failed to write bucket info, ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }

public:
  BucketInfoReshardUpdate(const DoutPrefixProvider *_dpp,
			  rgw::sal::RGWRadosStore *_store,
			  RGWBucketInfo& _bucket_info,
                          map<string, bufferlist>& _bucket_attrs,
			  const string& new_bucket_id) :
    dpp(_dpp),
    store(_store),
    bucket_info(_bucket_info),
    bucket_attrs(_bucket_attrs)
  {
    // record which instance we are resharding into
    bucket_info.new_bucket_instance_id = new_bucket_id;
  }

  ~BucketInfoReshardUpdate() {
    if (in_progress) {
      // resharding must not have ended correctly, clean up
      int ret =
	RGWBucketReshard::clear_index_shard_reshard_status(dpp, store, bucket_info);
      if (ret < 0) {
	ldpp_dout(dpp, -1) << "Error: " << __func__ <<
	  " clear_index_shard_status returned " << ret << dendl;
      }
      bucket_info.new_bucket_instance_id.clear();

      // clears new_bucket_instance as well
      set_status(cls_rgw_reshard_status::NOT_RESHARDING, dpp);
    }
  }

  // Mark the reshard as started; destructor cleanup is armed on success.
  int start() {
    int ret = set_status(cls_rgw_reshard_status::IN_PROGRESS, dpp);
    if (ret < 0) {
      return ret;
    }
    in_progress = true;
    return 0;
  }

  // Mark the reshard as finished; disarms destructor cleanup.
  int complete() {
    int ret = set_status(cls_rgw_reshard_status::DONE, dpp);
    if (ret < 0) {
      return ret;
    }
    in_progress = false;
    return 0;
  }
};
453
f64942e4 454
// Build a cls_lock-backed reshard lock on the given oid.  Duration
// comes from rgw_reshard_bucket_lock_duration; a random cookie makes
// this process's lock instance distinguishable from others'.
RGWBucketReshardLock::RGWBucketReshardLock(rgw::sal::RGWRadosStore* _store,
					   const std::string& reshard_lock_oid,
					   bool _ephemeral) :
  store(_store),
  lock_oid(reshard_lock_oid),
  ephemeral(_ephemeral),
  internal_lock(reshard_lock_name)
{
  const int lock_dur_secs = store->ctx()->_conf.get_val<uint64_t>(
    "rgw_reshard_bucket_lock_duration");
  duration = std::chrono::seconds(lock_dur_secs);

#define COOKIE_LEN 16
  char cookie_buf[COOKIE_LEN + 1];
  // fill COOKIE_LEN chars; gen_rand_alphanumeric sizing leaves room for NUL
  gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
  cookie_buf[COOKIE_LEN] = '\0';

  internal_lock.set_cookie(cookie_buf);
  internal_lock.set_duration(duration);
}
475
476int RGWBucketReshardLock::lock() {
477 internal_lock.set_must_renew(false);
522d829b 478
f64942e4
AA
479 int ret;
480 if (ephemeral) {
9f95a23c 481 ret = internal_lock.lock_exclusive_ephemeral(&store->getRados()->reshard_pool_ctx,
f64942e4
AA
482 lock_oid);
483 } else {
9f95a23c 484 ret = internal_lock.lock_exclusive(&store->getRados()->reshard_pool_ctx, lock_oid);
f64942e4 485 }
522d829b
TL
486
487 if (ret == -EBUSY) {
488 ldout(store->ctx(), 0) << "INFO: RGWReshardLock::" << __func__ <<
489 " found lock on " << lock_oid <<
490 " to be held by another RGW process; skipping for now" << dendl;
491 return ret;
492 } else if (ret < 0) {
493 lderr(store->ctx()) << "ERROR: RGWReshardLock::" << __func__ <<
494 " failed to acquire lock on " << lock_oid << ": " <<
495 cpp_strerror(-ret) << dendl;
f64942e4
AA
496 return ret;
497 }
522d829b 498
f64942e4
AA
499 reset_time(Clock::now());
500
501 return 0;
502}
503
504void RGWBucketReshardLock::unlock() {
9f95a23c 505 int ret = internal_lock.unlock(&store->getRados()->reshard_pool_ctx, lock_oid);
f64942e4
AA
506 if (ret < 0) {
507 ldout(store->ctx(), 0) << "WARNING: RGWBucketReshardLock::" << __func__ <<
508 " failed to drop lock on " << lock_oid << " ret=" << ret << dendl;
509 }
510}
511
// Renew the held lock (must_renew makes cls_lock require that we
// already hold it).  On success the renewal clock is reset to `now`.
int RGWBucketReshardLock::renew(const Clock::time_point& now) {
  internal_lock.set_must_renew(true);
  int ret;
  if (ephemeral) {
    ret = internal_lock.lock_exclusive_ephemeral(&store->getRados()->reshard_pool_ctx,
						 lock_oid);
  } else {
    ret = internal_lock.lock_exclusive(&store->getRados()->reshard_pool_ctx, lock_oid);
  }
  if (ret < 0) { /* expired or already locked by another processor */
    std::stringstream error_s;
    if (-ENOENT == ret) {
      error_s << "ENOENT (lock expired or never initially locked)";
    } else {
      error_s << ret << " (" << cpp_strerror(-ret) << ")";
    }
    ldout(store->ctx(), 5) << __func__ << "(): failed to renew lock on " <<
      lock_oid << " with error " << error_s.str() << dendl;
    return ret;
  }
  // leave the lock usable for a plain lock() next time
  internal_lock.set_must_renew(false);

  reset_time(now);
  ldout(store->ctx(), 20) << __func__ << "(): successfully renewed lock on " <<
    lock_oid << dendl;

  return 0;
}
540
541
// Core reshard copy loop: walk every entry of every source index shard
// (via bi_list paging), route each to its target shard in the new
// instance, and link the new instance when done.  Renews the reshard
// lock (and the optional outer logshard lock) as needed mid-copy.
// Assumes reshard_lock is already held by the caller (execute()).
//
// num_shards      requested shard count (informational; the target
//                 layout was already set when new_bucket_info was made)
// new_bucket_info the freshly-created target bucket instance
// max_entries     page size for bi_list; must be non-negative
// verbose/out/formatter  optional progress/JSON reporting sinks
int RGWBucketReshard::do_reshard(int num_shards,
				 RGWBucketInfo& new_bucket_info,
				 int max_entries,
				 bool verbose,
				 ostream *out,
				 Formatter *formatter,
				 const DoutPrefixProvider *dpp)
{
  if (out) {
    const rgw_bucket& bucket = bucket_info.bucket;
    (*out) << "tenant: " << bucket.tenant << std::endl;
    (*out) << "bucket name: " << bucket.name << std::endl;
    (*out) << "old bucket instance id: " << bucket.bucket_id <<
      std::endl;
    (*out) << "new bucket instance id: " << new_bucket_info.bucket.bucket_id <<
      std::endl;
  }

  /* update bucket info -- in progress*/
  list<rgw_cls_bi_entry> entries;

  if (max_entries < 0) {
    ldpp_dout(dpp, 0) << __func__ <<
      ": can't reshard, negative max_entries" << dendl;
    return -EINVAL;
  }

  // NB: destructor cleans up sharding state if reshard does not
  // complete successfully
  BucketInfoReshardUpdate bucket_info_updater(dpp, store, bucket_info, bucket_attrs, new_bucket_info.bucket.bucket_id);

  int ret = bucket_info_updater.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl;
    return ret;
  }

  // 0 shards in the layout means one unsharded index object
  int num_target_shards = (new_bucket_info.layout.current_index.layout.normal.num_shards > 0 ? new_bucket_info.layout.current_index.layout.normal.num_shards : 1);

  BucketReshardManager target_shards_mgr(dpp, store, new_bucket_info, num_target_shards);

  bool verbose_json_out = verbose && (formatter != nullptr) && (out != nullptr);

  if (verbose_json_out) {
    formatter->open_array_section("entries");
  }

  uint64_t total_entries = 0;

  if (!verbose_json_out && out) {
    (*out) << "total entries:";
  }

  const int num_source_shards =
    (bucket_info.layout.current_index.layout.normal.num_shards > 0 ? bucket_info.layout.current_index.layout.normal.num_shards : 1);
  string marker;
  for (int i = 0; i < num_source_shards; ++i) {
    bool is_truncated = true;
    marker.clear();
    // page through this source shard's index entries
    while (is_truncated) {
      entries.clear();
      ret = store->getRados()->bi_list(dpp, bucket_info, i, string(), marker, max_entries, &entries, &is_truncated);
      if (ret < 0 && ret != -ENOENT) {
	derr << "ERROR: bi_list(): " << cpp_strerror(-ret) << dendl;
	return ret;
      }

      for (auto iter = entries.begin(); iter != entries.end(); ++iter) {
	rgw_cls_bi_entry& entry = *iter;
	if (verbose_json_out) {
	  formatter->open_object_section("entry");

	  encode_json("shard_id", i, formatter);
	  encode_json("num_entry", total_entries, formatter);
	  encode_json("entry", entry, formatter);
	}
	total_entries++;

	marker = entry.idx;

	int target_shard_id;
	cls_rgw_obj_key cls_key;
	RGWObjCategory category;
	rgw_bucket_category_stats stats;
	bool account = entry.get_info(&cls_key, &category, &stats);
	rgw_obj_key key(cls_key);
	rgw_obj obj(new_bucket_info.bucket, key);
	RGWMPObj mp;
	if (key.ns == RGW_OBJ_NS_MULTIPART && mp.from_meta(key.name)) {
	  // place the multipart .meta object on the same shard as its head object
	  obj.index_hash_source = mp.get_key();
	}
	int ret = store->getRados()->get_target_shard_id(new_bucket_info.layout.current_index.layout.normal, obj.get_hash_object(), &target_shard_id);
	if (ret < 0) {
	  ldpp_dout(dpp, -1) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl;
	  return ret;
	}

	// get_target_shard_id may report -1 for a single-shard layout
	int shard_index = (target_shard_id > 0 ? target_shard_id : 0);

	ret = target_shards_mgr.add_entry(shard_index, entry, account,
					  category, stats);
	if (ret < 0) {
	  return ret;
	}

	Clock::time_point now = Clock::now();
	if (reshard_lock.should_renew(now)) {
	  // assume outer locks have timespans at least the size of ours, so
	  // can call inside conditional
	  if (outer_reshard_lock) {
	    ret = outer_reshard_lock->renew(now);
	    if (ret < 0) {
	      return ret;
	    }
	  }
	  ret = reshard_lock.renew(now);
	  if (ret < 0) {
	    ldpp_dout(dpp, -1) << "Error renewing bucket lock: " << ret << dendl;
	    return ret;
	  }
	}
	if (verbose_json_out) {
	  formatter->close_section();
	  formatter->flush(*out);
	} else if (out && !(total_entries % 1000)) {
	  (*out) << " " << total_entries;
	}
      } // entries loop
    }
  }

  if (verbose_json_out) {
    formatter->close_section();
    formatter->flush(*out);
  } else if (out) {
    (*out) << " " << total_entries << std::endl;
  }

  ret = target_shards_mgr.finish();
  if (ret < 0) {
    ldpp_dout(dpp, -1) << "ERROR: failed to reshard" << dendl;
    return -EIO;
  }

  // point the bucket entrypoint at the new instance
  ret = store->ctl()->bucket->link_bucket(new_bucket_info.owner, new_bucket_info.bucket, bucket_info.creation_time, null_yield, dpp);
  if (ret < 0) {
    ldpp_dout(dpp, -1) << "failed to link new bucket instance (bucket_id=" << new_bucket_info.bucket.bucket_id << ": " << cpp_strerror(-ret) << ")" << dendl;
    return ret;
  }

  ret = bucket_info_updater.complete();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl;
    /* don't error out, reshard process succeeded */
  }

  return 0;
  // NB: some error clean-up is done by ~BucketInfoReshardUpdate
} // RGWBucketReshard::do_reshard
31f18b77 702
// Fetch the per-shard resharding status entries for this bucket.
int RGWBucketReshard::get_status(const DoutPrefixProvider *dpp, list<cls_rgw_bucket_instance_entry> *status)
{
  return store->svc()->bi_rados->get_reshard_status(dpp, bucket_info, status);
}
707
31f18b77 708
// Top-level reshard driver: lock, create the target instance, mark the
// source as IN_PROGRESS, copy entries via do_reshard(), then clean up
// the old index shards and instance info (best effort).  On any error
// the new instance's shards and info object are removed instead.
// reshard_log, when non-null, receives a record of the old->new mapping.
int RGWBucketReshard::execute(int num_shards, int max_op_entries,
                              const DoutPrefixProvider *dpp,
                              bool verbose, ostream *out, Formatter *formatter,
			      RGWReshard* reshard_log)
{
  int ret = reshard_lock.lock();
  if (ret < 0) {
    return ret;
  }

  RGWBucketInfo new_bucket_info;
  ret = create_new_bucket_instance(num_shards, new_bucket_info, dpp);
  if (ret < 0) {
    // shard state is uncertain, but this will attempt to remove them anyway
    goto error_out;
  }

  if (reshard_log) {
    ret = reshard_log->update(dpp, bucket_info, new_bucket_info);
    if (ret < 0) {
      goto error_out;
    }
  }

  // set resharding status of current bucket_info & shards with
  // information about planned resharding
  ret = set_resharding_status(dpp, new_bucket_info.bucket.bucket_id,
			      num_shards, cls_rgw_reshard_status::IN_PROGRESS);
  if (ret < 0) {
    goto error_out;
  }

  ret = do_reshard(num_shards,
		   new_bucket_info,
		   max_op_entries,
		   verbose, out, formatter, dpp);
  if (ret < 0) {
    goto error_out;
  }

  // at this point we've done the main work; we'll make a best-effort
  // to clean-up but will not indicate any errors encountered

  reshard_lock.unlock();

  // resharding successful, so remove old bucket index shards; use
  // best effort and don't report out an error; the lock isn't needed
  // at this point since all we're using a best effor to to remove old
  // shard objects
  ret = store->svc()->bi->clean_index(dpp, bucket_info);
  if (ret < 0) {
    ldpp_dout(dpp, -1) << "Error: " << __func__ <<
      " failed to clean up old shards; " <<
      "RGWRados::clean_bucket_index returned " << ret << dendl;
  }

  ret = store->ctl()->bucket->remove_bucket_instance_info(bucket_info.bucket,
							  bucket_info, null_yield, dpp);
  if (ret < 0) {
    ldpp_dout(dpp, -1) << "Error: " << __func__ <<
      " failed to clean old bucket info object \"" <<
      bucket_info.bucket.get_key() <<
      "\"created after successful resharding with error " << ret << dendl;
  }

  ldpp_dout(dpp, 1) << __func__ <<
    " INFO: reshard of bucket \"" << bucket_info.bucket.name << "\" from \"" <<
    bucket_info.bucket.get_key() << "\" to \"" <<
    new_bucket_info.bucket.get_key() << "\" completed successfully" << dendl;

  return 0;

error_out:

  reshard_lock.unlock();

  // since the real problem is the issue that led to this error code
  // path, we won't touch ret and instead use another variable to
  // temporarily error codes
  int ret2 = store->svc()->bi->clean_index(dpp, new_bucket_info);
  if (ret2 < 0) {
    ldpp_dout(dpp, -1) << "Error: " << __func__ <<
      " failed to clean up shards from failed incomplete resharding; " <<
      "RGWRados::clean_bucket_index returned " << ret2 << dendl;
  }

  ret2 = store->ctl()->bucket->remove_bucket_instance_info(new_bucket_info.bucket,
							   new_bucket_info,
							   null_yield, dpp);
  if (ret2 < 0) {
    ldpp_dout(dpp, -1) << "Error: " << __func__ <<
      " failed to clean bucket info object \"" <<
      new_bucket_info.bucket.get_key() <<
      "\"created during incomplete resharding with error " << ret2 << dendl;
  }

  return ret;
} // execute
31f18b77
FG
807
808
// Reshard-log manager; shard count for the log comes from the
// rgw_reshard_num_logs config option.
RGWReshard::RGWReshard(rgw::sal::RGWRadosStore* _store, bool _verbose, ostream *_out,
                       Formatter *_formatter) :
  store(_store), instance_lock(bucket_instance_lock_name),
  verbose(_verbose), out(_out), formatter(_formatter)
{
  num_logshards = store->ctx()->_conf.get_val<uint64_t>("rgw_reshard_num_logs");
}
816
f64942e4
AA
817string RGWReshard::get_logshard_key(const string& tenant,
818 const string& bucket_name)
31f18b77
FG
819{
820 return tenant + ":" + bucket_name;
821}
822
823#define MAX_RESHARD_LOGSHARDS_PRIME 7877
824
// Map a bucket (tenant + name) to the oid of its reshard log shard:
// hash the key, mix the low byte into the high bits, then reduce via a
// prime modulus before folding into [0, num_logshards).
void RGWReshard::get_bucket_logshard_oid(const string& tenant, const string& bucket_name, string *oid)
{
  string key = get_logshard_key(tenant, bucket_name);

  uint32_t sid = ceph_str_hash_linux(key.c_str(), key.size());
  uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);  // extra mixing of the low byte
  sid = sid2 % MAX_RESHARD_LOGSHARDS_PRIME % num_logshards;

  get_logshard_oid(int(sid), oid);
}
835
// Append (or overwrite) a reshard-queue entry for the bucket named in
// `entry` on its log shard.  Silently a no-op (returns 0) when the
// zone does not support resharding.
int RGWReshard::add(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry)
{
  if (!store->svc()->zone->can_reshard()) {
    ldpp_dout(dpp, 20) << __func__ << " Resharding is disabled" << dendl;
    return 0;
  }

  string logshard_oid;

  get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);

  librados::ObjectWriteOperation op;
  cls_rgw_reshard_add(op, entry);

  int ret = rgw_rados_operate(dpp, store->getRados()->reshard_pool_ctx, logshard_oid, &op, null_yield);
  if (ret < 0) {
    ldpp_dout(dpp, -1) << "ERROR: failed to add entry to reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
    return ret;
  }
  return 0;
}
857
// Refresh the reshard-log entry for bucket_info with the id of the new
// bucket instance.  Fails (propagating get()'s error) if no entry for
// the bucket currently exists in the log.
int RGWReshard::update(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info)
{
  cls_rgw_reshard_entry entry;
  entry.bucket_name = bucket_info.bucket.name;
  entry.bucket_id = bucket_info.bucket.bucket_id;
  entry.tenant = bucket_info.owner.tenant;

  // fetch the existing entry so other fields are preserved
  int ret = get(entry);
  if (ret < 0) {
    return ret;
  }

  entry.new_instance_id = new_bucket_info.bucket.name + ":" + new_bucket_info.bucket.bucket_id;

  // add() overwrites the entry on the log shard
  ret = add(dpp, entry);
  if (ret < 0) {
    ldpp_dout(dpp, 0) << __func__ << ":Error in updating entry bucket " << entry.bucket_name << ": " <<
      cpp_strerror(-ret) << dendl;
  }

  return ret;
}
880
881
882int RGWReshard::list(int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated)
883{
884 string logshard_oid;
885
886 get_logshard_oid(logshard_num, &logshard_oid);
887
9f95a23c 888 int ret = cls_rgw_reshard_list(store->getRados()->reshard_pool_ctx, logshard_oid, marker, max, entries, is_truncated);
c07f9fc5
FG
889
890 if (ret < 0) {
f67539c2
TL
891 lderr(store->ctx()) << "ERROR: failed to list reshard log entries, oid=" << logshard_oid << " "
892 << "marker=" << marker << " " << cpp_strerror(ret) << dendl;
c07f9fc5
FG
893 if (ret == -ENOENT) {
894 *is_truncated = false;
895 ret = 0;
9f95a23c 896 } else {
9f95a23c
TL
897 if (ret == -EACCES) {
898 lderr(store->ctx()) << "access denied to pool " << store->svc()->zone->get_zone_params().reshard_pool
c07f9fc5 899 << ". Fix the pool access permissions of your client" << dendl;
9f95a23c
TL
900 }
901 }
31f18b77 902 }
c07f9fc5
FG
903
904 return ret;
31f18b77
FG
905}
906
907int RGWReshard::get(cls_rgw_reshard_entry& entry)
908{
909 string logshard_oid;
910
911 get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
912
9f95a23c 913 int ret = cls_rgw_reshard_get(store->getRados()->reshard_pool_ctx, logshard_oid, entry);
31f18b77 914 if (ret < 0) {
94b18763
FG
915 if (ret != -ENOENT) {
916 lderr(store->ctx()) << "ERROR: failed to get entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant <<
917 " bucket=" << entry.bucket_name << dendl;
918 }
31f18b77
FG
919 return ret;
920 }
921
922 return 0;
923}
924
b3b6e05e 925int RGWReshard::remove(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry)
31f18b77
FG
926{
927 string logshard_oid;
928
929 get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
930
931 librados::ObjectWriteOperation op;
932 cls_rgw_reshard_remove(op, entry);
933
b3b6e05e 934 int ret = rgw_rados_operate(dpp, store->getRados()->reshard_pool_ctx, logshard_oid, &op, null_yield);
31f18b77 935 if (ret < 0) {
b3b6e05e 936 ldpp_dout(dpp, -1) << "ERROR: failed to remove entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
31f18b77
FG
937 return ret;
938 }
939
940 return ret;
941}
942
/**
 * Clear the resharding status flag on a bucket-index instance object.
 *
 * NOTE(review): the 'entry' parameter is not used in this body;
 * presumably kept for interface symmetry with the other queue ops.
 *
 * Returns 0 on success, negative errno on failure.
 */
int RGWReshard::clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry)
{
  int ret = cls_rgw_clear_bucket_resharding(store->getRados()->reshard_pool_ctx, bucket_instance_oid);
  if (ret < 0) {
    lderr(store->ctx()) << "ERROR: failed to clear bucket resharding, bucket_instance_oid=" << bucket_instance_oid << dendl;
    return ret;
  }

  return 0;
}
953
11fdf7f2 954int RGWReshardWait::wait(optional_yield y)
31f18b77 955{
11fdf7f2
TL
956 std::unique_lock lock(mutex);
957
958 if (going_down) {
959 return -ECANCELED;
960 }
961
11fdf7f2
TL
962 if (y) {
963 auto& context = y.get_io_context();
964 auto& yield = y.get_yield_context();
965
966 Waiter waiter(context);
967 waiters.push_back(waiter);
968 lock.unlock();
969
970 waiter.timer.expires_after(duration);
971
972 boost::system::error_code ec;
973 waiter.timer.async_wait(yield[ec]);
974
975 lock.lock();
976 waiters.erase(waiters.iterator_to(waiter));
977 return -ec.value();
978 }
31f18b77 979
11fdf7f2 980 cond.wait_for(lock, duration);
31f18b77
FG
981
982 if (going_down) {
983 return -ECANCELED;
984 }
985
986 return 0;
987}
988
11fdf7f2 989void RGWReshardWait::stop()
31f18b77 990{
11fdf7f2
TL
991 std::scoped_lock lock(mutex);
992 going_down = true;
993 cond.notify_all();
994 for (auto& waiter : waiters) {
995 // unblock any waiters with ECANCELED
996 waiter.timer.cancel();
31f18b77 997 }
31f18b77
FG
998}
999
b3b6e05e 1000int RGWReshard::process_single_logshard(int logshard_num, const DoutPrefixProvider *dpp)
31f18b77
FG
1001{
1002 string marker;
1003 bool truncated = true;
1004
f64942e4 1005 constexpr uint32_t max_entries = 1000;
31f18b77
FG
1006
1007 string logshard_oid;
1008 get_logshard_oid(logshard_num, &logshard_oid);
1009
f64942e4
AA
1010 RGWBucketReshardLock logshard_lock(store, logshard_oid, false);
1011
1012 int ret = logshard_lock.lock();
92f5a8d4 1013 if (ret < 0) {
b3b6e05e 1014 ldpp_dout(dpp, 5) << __func__ << "(): failed to acquire lock on " <<
92f5a8d4 1015 logshard_oid << ", ret = " << ret <<dendl;
31f18b77
FG
1016 return ret;
1017 }
92f5a8d4 1018
31f18b77
FG
1019 do {
1020 std::list<cls_rgw_reshard_entry> entries;
1021 ret = list(logshard_num, marker, max_entries, entries, &truncated);
1022 if (ret < 0) {
b3b6e05e 1023 ldpp_dout(dpp, 10) << "cannot list all reshards in logshard oid=" <<
f64942e4 1024 logshard_oid << dendl;
31f18b77
FG
1025 continue;
1026 }
1027
f64942e4 1028 for(auto& entry: entries) { // logshard entries
31f18b77
FG
1029 if(entry.new_instance_id.empty()) {
1030
b3b6e05e 1031 ldpp_dout(dpp, 20) << __func__ << " resharding " <<
f64942e4 1032 entry.bucket_name << dendl;
31f18b77 1033
31f18b77
FG
1034 rgw_bucket bucket;
1035 RGWBucketInfo bucket_info;
1036 map<string, bufferlist> attrs;
1037
9f95a23c
TL
1038 ret = store->getRados()->get_bucket_info(store->svc(),
1039 entry.tenant, entry.bucket_name,
1040 bucket_info, nullptr,
b3b6e05e 1041 null_yield, dpp, &attrs);
1911f103
TL
1042 if (ret < 0 || bucket_info.bucket.bucket_id != entry.bucket_id) {
1043 if (ret < 0) {
b3b6e05e 1044 ldpp_dout(dpp, 0) << __func__ <<
1911f103
TL
1045 ": Error in get_bucket_info for bucket " << entry.bucket_name <<
1046 ": " << cpp_strerror(-ret) << dendl;
1047 if (ret != -ENOENT) {
1048 // any error other than ENOENT will abort
1049 return ret;
1050 }
1051 } else {
b3b6e05e 1052 ldpp_dout(dpp, 0) << __func__ <<
1911f103
TL
1053 ": Bucket: " << entry.bucket_name <<
1054 " already resharded by someone, skipping " << dendl;
92f5a8d4
TL
1055 }
1056
1057 // we've encountered a reshard queue entry for an apparently
1058 // non-existent bucket; let's try to recover by cleaning up
b3b6e05e 1059 ldpp_dout(dpp, 0) << __func__ <<
1911f103 1060 ": removing reshard queue entry for a resharded or non-existent bucket" <<
92f5a8d4
TL
1061 entry.bucket_name << dendl;
1062
b3b6e05e 1063 ret = remove(dpp, entry);
92f5a8d4 1064 if (ret < 0) {
b3b6e05e 1065 ldpp_dout(dpp, 0) << __func__ <<
92f5a8d4
TL
1066 ": Error removing non-existent bucket " <<
1067 entry.bucket_name << " from resharding queue: " <<
1068 cpp_strerror(-ret) << dendl;
1069 return ret;
1070 }
1071
1072 // we cleaned up, move on to the next entry
1073 goto finished_entry;
31f18b77
FG
1074 }
1075
f64942e4 1076 RGWBucketReshard br(store, bucket_info, attrs, nullptr);
b3b6e05e 1077 ret = br.execute(entry.new_num_shards, max_entries, dpp, false, nullptr,
92f5a8d4 1078 nullptr, this);
31f18b77 1079 if (ret < 0) {
b3b6e05e 1080 ldpp_dout(dpp, 0) << __func__ <<
92f5a8d4 1081 ": Error during resharding bucket " << entry.bucket_name << ":" <<
31f18b77
FG
1082 cpp_strerror(-ret)<< dendl;
1083 return ret;
1084 }
1085
b3b6e05e 1086 ldpp_dout(dpp, 20) << __func__ <<
92f5a8d4 1087 " removing reshard queue entry for bucket " << entry.bucket_name <<
f64942e4 1088 dendl;
31f18b77 1089
b3b6e05e 1090 ret = remove(dpp, entry);
31f18b77 1091 if (ret < 0) {
b3b6e05e 1092 ldpp_dout(dpp, 0) << __func__ << ": Error removing bucket " <<
92f5a8d4 1093 entry.bucket_name << " from resharding queue: " <<
f64942e4 1094 cpp_strerror(-ret) << dendl;
31f18b77
FG
1095 return ret;
1096 }
92f5a8d4
TL
1097 } // if new instance id is empty
1098
1099 finished_entry:
f64942e4
AA
1100
1101 Clock::time_point now = Clock::now();
1102 if (logshard_lock.should_renew(now)) {
1103 ret = logshard_lock.renew(now);
1104 if (ret < 0) {
1105 return ret;
1106 }
31f18b77 1107 }
f64942e4 1108
31f18b77 1109 entry.get_key(&marker);
92f5a8d4 1110 } // entry for loop
31f18b77
FG
1111 } while (truncated);
1112
f64942e4 1113 logshard_lock.unlock();
31f18b77
FG
1114 return 0;
1115}
1116
1117
1118void RGWReshard::get_logshard_oid(int shard_num, string *logshard)
1119{
1120 char buf[32];
1121 snprintf(buf, sizeof(buf), "%010u", (unsigned)shard_num);
1122
1123 string objname(reshard_oid_prefix);
1124 *logshard = objname + buf;
1125}
1126
b3b6e05e 1127int RGWReshard::process_all_logshards(const DoutPrefixProvider *dpp)
31f18b77 1128{
9f95a23c 1129 if (!store->svc()->zone->can_reshard()) {
b3b6e05e 1130 ldpp_dout(dpp, 20) << __func__ << " Resharding is disabled" << dendl;
3efd9988
FG
1131 return 0;
1132 }
31f18b77
FG
1133 int ret = 0;
1134
1135 for (int i = 0; i < num_logshards; i++) {
1136 string logshard;
1137 get_logshard_oid(i, &logshard);
1138
b3b6e05e 1139 ldpp_dout(dpp, 20) << "processing logshard = " << logshard << dendl;
31f18b77 1140
b3b6e05e 1141 ret = process_single_logshard(i, dpp);
9f95a23c 1142
b3b6e05e 1143 ldpp_dout(dpp, 20) << "finish processing logshard = " << logshard << " , ret = " << ret << dendl;
31f18b77
FG
1144 }
1145
1146 return 0;
1147}
1148
// Whether shutdown has been requested via stop_processor().
bool RGWReshard::going_down()
{
  return down_flag;
}
1153
// Spawn the background worker thread ("rgw_reshard") that periodically
// processes the reshard queue.
void RGWReshard::start_processor()
{
  worker = new ReshardWorker(store->ctx(), this);
  worker->create("rgw_reshard");
}
1159
// Signal shutdown, wake and join the worker thread, then release it.
// Safe to call even when start_processor() was never invoked.
void RGWReshard::stop_processor()
{
  down_flag = true;
  if (worker) {
    worker->stop();
    worker->join();
  }
  delete worker;
  worker = nullptr;
}
1170
// Worker-thread main loop: run one pass over all logshards, then
// sleep for the remainder of rgw_reshard_thread_interval (the pass
// duration is subtracted), until shutdown is requested.
void *RGWReshard::ReshardWorker::entry() {
  do {
    utime_t start = ceph_clock_now();
    reshard->process_all_logshards(this);

    if (reshard->going_down())
      break;

    // compute how long the pass took
    utime_t end = ceph_clock_now();
    end -= start;
    int secs = cct->_conf.get_val<uint64_t>("rgw_reshard_thread_interval");

    if (secs <= end.sec())
      continue; // next round

    // sleep only for the time left in the interval
    secs -= end.sec();

    // interruptible sleep: stop() notifies the condvar to wake early
    std::unique_lock locker{lock};
    cond.wait_for(locker, std::chrono::seconds(secs));
  } while (!reshard->going_down());

  return NULL;
}
1194
// Wake the worker from its inter-round sleep; the thread then exits
// once RGWReshard::going_down() (set by stop_processor()) reads true.
void RGWReshard::ReshardWorker::stop()
{
  std::lock_guard l{lock};
  cond.notify_all();
}
b3b6e05e
TL
1200
// DoutPrefixProvider interface: the context used for log output.
CephContext *RGWReshard::ReshardWorker::get_cct() const
{
  return cct;
}
1205
// DoutPrefixProvider interface: log under the rgw debug subsystem.
unsigned RGWReshard::ReshardWorker::get_subsys() const
{
  return dout_subsys;
}
1210
// DoutPrefixProvider interface: prefix prepended to this worker's
// log lines.
std::ostream& RGWReshard::ReshardWorker::gen_prefix(std::ostream& out) const
{
  return out << "rgw reshard worker thread: ";
}