]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_reshard.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rgw / rgw_reshard.cc
CommitLineData
31f18b77 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
9f95a23c 2// vim: ts=8 sw=2 smarttab ft=cpp
31f18b77 3
f64942e4 4#include <limits>
11fdf7f2 5#include <sstream>
f64942e4 6
11fdf7f2 7#include "rgw_zone.h"
31f18b77
FG
8#include "rgw_bucket.h"
9#include "rgw_reshard.h"
9f95a23c 10#include "rgw_sal.h"
f67539c2 11#include "rgw_sal_rados.h"
31f18b77
FG
12#include "cls/rgw/cls_rgw_client.h"
13#include "cls/lock/cls_lock_client.h"
14#include "common/errno.h"
15#include "common/ceph_json.h"
16
17#include "common/dout.h"
18
11fdf7f2
TL
19#include "services/svc_zone.h"
20#include "services/svc_sys_obj.h"
9f95a23c 21#include "services/svc_tier_rados.h"
11fdf7f2 22
31f18b77
FG
23#define dout_context g_ceph_context
24#define dout_subsys ceph_subsys_rgw
25
20effc67
TL
26using namespace std;
27
31f18b77
FG
// Object-name prefix for the reshard log shard objects in the reshard pool
// (presumably consumed by get_logshard_oid — confirm against rgw_reshard.h).
const string reshard_oid_prefix = "reshard.";
// cls_lock name used by RGWBucketReshardLock while a reshard is processed.
const string reshard_lock_name = "reshard_process";
// cls_lock name used by RGWReshard's per-bucket-instance lock.
const string bucket_instance_lock_name = "bucket_instance_lock";
31
9f95a23c
TL
/* All primes up to 2000 used to attempt to make dynamic sharding use
 * a prime numbers of shards. Note: this list also includes 1 for when
 * 1 shard is the most appropriate, even though 1 is not prime.
 * Kept sorted ascending; lookups elsewhere rely on the ordering.
 */
const std::initializer_list<uint16_t> RGWBucketReshard::reshard_primes = {
  1, 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61,
  67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137,
  139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199, 211,
  223, 227, 229, 233, 239, 241, 251, 257, 263, 269, 271, 277, 281, 283,
  293, 307, 311, 313, 317, 331, 337, 347, 349, 353, 359, 367, 373, 379,
  383, 389, 397, 401, 409, 419, 421, 431, 433, 439, 443, 449, 457, 461,
  463, 467, 479, 487, 491, 499, 503, 509, 521, 523, 541, 547, 557, 563,
  569, 571, 577, 587, 593, 599, 601, 607, 613, 617, 619, 631, 641, 643,
  647, 653, 659, 661, 673, 677, 683, 691, 701, 709, 719, 727, 733, 739,
  743, 751, 757, 761, 769, 773, 787, 797, 809, 811, 821, 823, 827, 829,
  839, 853, 857, 859, 863, 877, 881, 883, 887, 907, 911, 919, 929, 937,
  941, 947, 953, 967, 971, 977, 983, 991, 997, 1009, 1013, 1019, 1021,
  1031, 1033, 1039, 1049, 1051, 1061, 1063, 1069, 1087, 1091, 1093,
  1097, 1103, 1109, 1117, 1123, 1129, 1151, 1153, 1163, 1171, 1181,
  1187, 1193, 1201, 1213, 1217, 1223, 1229, 1231, 1237, 1249, 1259,
  1277, 1279, 1283, 1289, 1291, 1297, 1301, 1303, 1307, 1319, 1321,
  1327, 1361, 1367, 1373, 1381, 1399, 1409, 1423, 1427, 1429, 1433,
  1439, 1447, 1451, 1453, 1459, 1471, 1481, 1483, 1487, 1489, 1493,
  1499, 1511, 1523, 1531, 1543, 1549, 1553, 1559, 1567, 1571, 1579,
  1583, 1597, 1601, 1607, 1609, 1613, 1619, 1621, 1627, 1637, 1657,
  1663, 1667, 1669, 1693, 1697, 1699, 1709, 1721, 1723, 1733, 1741,
  1747, 1753, 1759, 1777, 1783, 1787, 1789, 1801, 1811, 1823, 1831,
  1847, 1861, 1867, 1871, 1873, 1877, 1879, 1889, 1901, 1907, 1913,
  1931, 1933, 1949, 1951, 1973, 1979, 1987, 1993, 1997, 1999
};
f64942e4 62
// Accumulates bucket-index entries destined for one target shard and
// writes them out in batches via asynchronous rados operations.  The
// aio completion deque is shared across all shards (owned by
// BucketReshardManager) so in-flight ops are bounded globally.
class BucketReshardShard {
  rgw::sal::RadosStore* store;
  const RGWBucketInfo& bucket_info;
  int num_shard;  // target shard id, or -1 when the layout has no shards
  const rgw::bucket_index_layout_generation& idx_layout;
  RGWRados::BucketShard bs;
  vector<rgw_cls_bi_entry> entries;  // pending batch, flushed when full
  map<RGWObjCategory, rgw_bucket_category_stats> stats;  // per-category stats for pending batch
  deque<librados::AioCompletion *>& aio_completions;  // shared in-flight ops
  uint64_t max_aio_completions;      // from rgw_reshard_max_aio
  uint64_t reshard_shard_batch_size; // from rgw_reshard_batch_size

  // Pop the oldest in-flight aio, wait for it, and return its result.
  int wait_next_completion() {
    librados::AioCompletion *c = aio_completions.front();
    aio_completions.pop_front();

    c->wait_for_complete();

    int ret = c->get_return_value();
    c->release();

    if (ret < 0) {
      derr << "ERROR: reshard rados operation failed: " << cpp_strerror(-ret) << dendl;
      return ret;
    }

    return 0;
  }

  // Hand out a fresh aio completion, first draining one if the shared
  // in-flight limit has been reached.
  int get_completion(librados::AioCompletion **c) {
    if (aio_completions.size() >= max_aio_completions) {
      int ret = wait_next_completion();
      if (ret < 0) {
        return ret;
      }
    }

    *c = librados::Rados::aio_create_completion(nullptr, nullptr);
    aio_completions.push_back(*c);

    return 0;
  }

public:
  BucketReshardShard(const DoutPrefixProvider *dpp,
                     rgw::sal::RadosStore* _store, const RGWBucketInfo& _bucket_info,
                     int _num_shard, const rgw::bucket_index_layout_generation& _idx_layout,
                     deque<librados::AioCompletion *>& _completions) :
    store(_store), bucket_info(_bucket_info), idx_layout(_idx_layout), bs(store->getRados()),
    aio_completions(_completions)
  {
    num_shard = (idx_layout.layout.normal.num_shards > 0 ? _num_shard : -1);

    // NOTE(review): bs.init()'s return value is ignored here; a failure
    // would surface later when the shard object is operated on — confirm
    // whether init can fail in practice for a freshly created instance.
    bs.init(bucket_info.bucket, num_shard, idx_layout, nullptr /* no RGWBucketInfo */, dpp);

    max_aio_completions =
      store->ctx()->_conf.get_val<uint64_t>("rgw_reshard_max_aio");
    reshard_shard_batch_size =
      store->ctx()->_conf.get_val<uint64_t>("rgw_reshard_batch_size");
  }

  int get_num_shard() {
    return num_shard;
  }

  // Queue one bucket-index entry; if `account` is set, fold its stats
  // into the per-category totals.  Flushes automatically once the batch
  // reaches reshard_shard_batch_size.
  int add_entry(rgw_cls_bi_entry& entry, bool account, RGWObjCategory category,
                const rgw_bucket_category_stats& entry_stats) {
    entries.push_back(entry);
    if (account) {
      rgw_bucket_category_stats& target = stats[category];
      target.num_entries += entry_stats.num_entries;
      target.total_size += entry_stats.total_size;
      target.total_size_rounded += entry_stats.total_size_rounded;
      target.actual_size += entry_stats.actual_size;
    }
    if (entries.size() >= reshard_shard_batch_size) {
      int ret = flush();
      if (ret < 0) {
        return ret;
      }
    }

    return 0;
  }

  // Submit the pending batch (entries + stats update) as a single async
  // write op against this shard's index object.  No-op when empty.
  int flush() {
    if (entries.size() == 0) {
      return 0;
    }

    librados::ObjectWriteOperation op;
    for (auto& entry : entries) {
      store->getRados()->bi_put(op, bs, entry);
    }
    cls_rgw_bucket_update_stats(op, false, stats);

    librados::AioCompletion *c;
    int ret = get_completion(&c);
    if (ret < 0) {
      return ret;
    }
    ret = bs.bucket_obj.aio_operate(c, &op);
    if (ret < 0) {
      derr << "ERROR: failed to store entries in target bucket shard (bs=" << bs.bucket << "/" << bs.shard_id << ") error=" << cpp_strerror(-ret) << dendl;
      return ret;
    }
    entries.clear();
    stats.clear();
    return 0;
  }

  // Drain every in-flight completion; returns the last error seen (0 if none).
  int wait_all_aio() {
    int ret = 0;
    while (!aio_completions.empty()) {
      int r = wait_next_completion();
      if (r < 0) {
        ret = r;
      }
    }
    return ret;
  }
}; // class BucketReshardShard
185
31f18b77
FG
186
187class BucketReshardManager {
20effc67 188 rgw::sal::RadosStore* store;
31f18b77
FG
189 const RGWBucketInfo& target_bucket_info;
190 deque<librados::AioCompletion *> completions;
191 int num_target_shards;
192 vector<BucketReshardShard *> target_shards;
193
194public:
b3b6e05e 195 BucketReshardManager(const DoutPrefixProvider *dpp,
20effc67 196 rgw::sal::RadosStore* _store,
f64942e4
AA
197 const RGWBucketInfo& _target_bucket_info,
198 int _num_target_shards) :
199 store(_store), target_bucket_info(_target_bucket_info),
200 num_target_shards(_num_target_shards)
f67539c2
TL
201 {
202 const auto& idx_layout = target_bucket_info.layout.current_index;
31f18b77
FG
203 target_shards.resize(num_target_shards);
204 for (int i = 0; i < num_target_shards; ++i) {
b3b6e05e 205 target_shards[i] = new BucketReshardShard(dpp, store, target_bucket_info, i, idx_layout, completions);
31f18b77
FG
206 }
207 }
208
209 ~BucketReshardManager() {
210 for (auto& shard : target_shards) {
211 int ret = shard->wait_all_aio();
212 if (ret < 0) {
f64942e4
AA
213 ldout(store->ctx(), 20) << __func__ <<
214 ": shard->wait_all_aio() returned ret=" << ret << dendl;
31f18b77
FG
215 }
216 }
217 }
218
219 int add_entry(int shard_index,
11fdf7f2 220 rgw_cls_bi_entry& entry, bool account, RGWObjCategory category,
31f18b77 221 const rgw_bucket_category_stats& entry_stats) {
f64942e4
AA
222 int ret = target_shards[shard_index]->add_entry(entry, account, category,
223 entry_stats);
31f18b77 224 if (ret < 0) {
f64942e4
AA
225 derr << "ERROR: target_shards.add_entry(" << entry.idx <<
226 ") returned error: " << cpp_strerror(-ret) << dendl;
31f18b77
FG
227 return ret;
228 }
f64942e4 229
31f18b77
FG
230 return 0;
231 }
232
233 int finish() {
234 int ret = 0;
235 for (auto& shard : target_shards) {
236 int r = shard->flush();
237 if (r < 0) {
238 derr << "ERROR: target_shards[" << shard->get_num_shard() << "].flush() returned error: " << cpp_strerror(-r) << dendl;
239 ret = r;
240 }
241 }
242 for (auto& shard : target_shards) {
243 int r = shard->wait_all_aio();
244 if (r < 0) {
245 derr << "ERROR: target_shards[" << shard->get_num_shard() << "].wait_all_aio() returned error: " << cpp_strerror(-r) << dendl;
246 ret = r;
247 }
248 delete shard;
249 }
250 target_shards.clear();
251 return ret;
252 }
f64942e4
AA
253}; // class BucketReshardManager
254
// Bind a reshard operation to one bucket.  `_outer_reshard_lock`, when
// non-null, is a broader lock (held by the caller) that do_reshard()
// renews alongside the per-bucket lock.
RGWBucketReshard::RGWBucketReshard(rgw::sal::RadosStore* _store,
				   const RGWBucketInfo& _bucket_info,
				   const map<string, bufferlist>& _bucket_attrs,
				   RGWBucketReshardLock* _outer_reshard_lock) :
  store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs),
  reshard_lock(store, bucket_info, true),  // true => ephemeral lock
  outer_reshard_lock(_outer_reshard_lock)
{ }
263
b3b6e05e 264int RGWBucketReshard::set_resharding_status(const DoutPrefixProvider *dpp,
20effc67 265 rgw::sal::RadosStore* store,
f64942e4
AA
266 const RGWBucketInfo& bucket_info,
267 const string& new_instance_id,
268 int32_t num_shards,
269 cls_rgw_reshard_status status)
31f18b77
FG
270{
271 if (new_instance_id.empty()) {
b3b6e05e 272 ldpp_dout(dpp, 0) << __func__ << " missing new bucket instance id" << dendl;
31f18b77
FG
273 return -EINVAL;
274 }
275
276 cls_rgw_bucket_instance_entry instance_entry;
277 instance_entry.set_status(new_instance_id, num_shards, status);
278
b3b6e05e 279 int ret = store->getRados()->bucket_set_reshard(dpp, bucket_info, instance_entry);
31f18b77 280 if (ret < 0) {
b3b6e05e 281 ldpp_dout(dpp, 0) << "RGWReshard::" << __func__ << " ERROR: error setting bucket resharding flag on bucket index: "
31f18b77
FG
282 << cpp_strerror(-ret) << dendl;
283 return ret;
284 }
285 return 0;
286}
287
f64942e4 288// reshard lock assumes lock is held
b3b6e05e 289int RGWBucketReshard::clear_resharding(const DoutPrefixProvider *dpp,
20effc67 290 rgw::sal::RadosStore* store,
f64942e4 291 const RGWBucketInfo& bucket_info)
31f18b77 292{
b3b6e05e 293 int ret = clear_index_shard_reshard_status(dpp, store, bucket_info);
f64942e4 294 if (ret < 0) {
b3b6e05e 295 ldpp_dout(dpp, 0) << "RGWBucketReshard::" << __func__ <<
f64942e4
AA
296 " ERROR: error clearing reshard status from index shard " <<
297 cpp_strerror(-ret) << dendl;
298 return ret;
299 }
31f18b77 300
f64942e4 301 cls_rgw_bucket_instance_entry instance_entry;
b3b6e05e 302 ret = store->getRados()->bucket_set_reshard(dpp, bucket_info, instance_entry);
31f18b77 303 if (ret < 0) {
b3b6e05e 304 ldpp_dout(dpp, 0) << "RGWReshard::" << __func__ <<
f64942e4
AA
305 " ERROR: error setting bucket resharding flag on bucket index: " <<
306 cpp_strerror(-ret) << dendl;
31f18b77
FG
307 return ret;
308 }
f64942e4
AA
309
310 return 0;
311}
312
b3b6e05e 313int RGWBucketReshard::clear_index_shard_reshard_status(const DoutPrefixProvider *dpp,
20effc67 314 rgw::sal::RadosStore* store,
f64942e4
AA
315 const RGWBucketInfo& bucket_info)
316{
f67539c2 317 uint32_t num_shards = bucket_info.layout.current_index.layout.normal.num_shards;
f64942e4
AA
318
319 if (num_shards < std::numeric_limits<uint32_t>::max()) {
b3b6e05e 320 int ret = set_resharding_status(dpp, store, bucket_info,
f64942e4
AA
321 bucket_info.bucket.bucket_id,
322 (num_shards < 1 ? 1 : num_shards),
9f95a23c 323 cls_rgw_reshard_status::NOT_RESHARDING);
f64942e4 324 if (ret < 0) {
b3b6e05e 325 ldpp_dout(dpp, 0) << "RGWBucketReshard::" << __func__ <<
f64942e4
AA
326 " ERROR: error clearing reshard status from index shard " <<
327 cpp_strerror(-ret) << dendl;
328 return ret;
329 }
330 }
331
31f18b77
FG
332 return 0;
333}
334
20effc67 335static int create_new_bucket_instance(rgw::sal::RadosStore* store,
31f18b77
FG
336 int new_num_shards,
337 const RGWBucketInfo& bucket_info,
338 map<string, bufferlist>& attrs,
b3b6e05e
TL
339 RGWBucketInfo& new_bucket_info,
340 const DoutPrefixProvider *dpp)
31f18b77
FG
341{
342 new_bucket_info = bucket_info;
343
9f95a23c 344 store->getRados()->create_bucket_id(&new_bucket_info.bucket.bucket_id);
31f18b77 345
f67539c2 346 new_bucket_info.layout.current_index.layout.normal.num_shards = new_num_shards;
31f18b77
FG
347 new_bucket_info.objv_tracker.clear();
348
349 new_bucket_info.new_bucket_instance_id.clear();
9f95a23c 350 new_bucket_info.reshard_status = cls_rgw_reshard_status::NOT_RESHARDING;
31f18b77 351
20effc67 352 int ret = store->getRados()->put_bucket_instance_info(new_bucket_info, true, real_time(), &attrs, dpp);
31f18b77 353 if (ret < 0) {
20effc67 354 cerr << "ERROR: failed to store new bucket instance info: " << cpp_strerror(-ret) << std::endl;
f64942e4 355 return ret;
31f18b77
FG
356 }
357
20effc67 358 ret = store->svc()->bi->init_index(dpp, new_bucket_info);
31f18b77 359 if (ret < 0) {
20effc67 360 cerr << "ERROR: failed to init new bucket indexes: " << cpp_strerror(-ret) << std::endl;
f64942e4 361 return ret;
31f18b77
FG
362 }
363
364 return 0;
365}
366
// Member wrapper around the file-local ::create_new_bucket_instance(),
// supplying this reshard's store, bucket info, and attrs.
int RGWBucketReshard::create_new_bucket_instance(int new_num_shards,
						 RGWBucketInfo& new_bucket_info,
						 const DoutPrefixProvider *dpp)
{
  return ::create_new_bucket_instance(store, new_num_shards,
				      bucket_info, bucket_attrs, new_bucket_info, dpp);
}
374
b3b6e05e 375int RGWBucketReshard::cancel(const DoutPrefixProvider *dpp)
94b18763 376{
20effc67 377 int ret = reshard_lock.lock(dpp);
94b18763
FG
378 if (ret < 0) {
379 return ret;
380 }
381
b3b6e05e 382 ret = clear_resharding(dpp);
94b18763 383
f64942e4
AA
384 reshard_lock.unlock();
385 return ret;
94b18763
FG
386}
387
31f18b77
FG
// RAII guard for the bucket's reshard status.  start() marks the bucket
// IN_PROGRESS; complete() marks it DONE.  If the guard is destroyed
// while still in progress (i.e. the reshard failed mid-way), the
// destructor clears the per-shard status and resets the bucket info to
// NOT_RESHARDING as a best effort.
class BucketInfoReshardUpdate
{
  const DoutPrefixProvider *dpp;
  rgw::sal::RadosStore* store;
  RGWBucketInfo& bucket_info;        // updated in place on status changes
  std::map<string, bufferlist> bucket_attrs;  // copy written back with each status change

  bool in_progress{false};  // true between a successful start() and complete()

  // Persist `s` into bucket_info and write the instance info back.
  int set_status(cls_rgw_reshard_status s, const DoutPrefixProvider *dpp) {
    bucket_info.reshard_status = s;
    int ret = store->getRados()->put_bucket_instance_info(bucket_info, false, real_time(), &bucket_attrs, dpp);
    if (ret < 0) {
      ldpp_dout(dpp, 0) << "ERROR: failed to write bucket info, ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }

public:
  BucketInfoReshardUpdate(const DoutPrefixProvider *_dpp,
			  rgw::sal::RadosStore* _store,
			  RGWBucketInfo& _bucket_info,
			  map<string, bufferlist>& _bucket_attrs,
			  const string& new_bucket_id) :
    dpp(_dpp),
    store(_store),
    bucket_info(_bucket_info),
    bucket_attrs(_bucket_attrs)
  {
    bucket_info.new_bucket_instance_id = new_bucket_id;
  }

  ~BucketInfoReshardUpdate() {
    if (in_progress) {
      // resharding must not have ended correctly, clean up
      int ret =
	RGWBucketReshard::clear_index_shard_reshard_status(dpp, store, bucket_info);
      if (ret < 0) {
	ldpp_dout(dpp, -1) << "Error: " << __func__ <<
	  " clear_index_shard_status returned " << ret << dendl;
      }
      bucket_info.new_bucket_instance_id.clear();

      // clears new_bucket_instance as well; destructor must not throw,
      // so the return value is deliberately ignored (best effort)
      set_status(cls_rgw_reshard_status::NOT_RESHARDING, dpp);
    }
  }

  int start() {
    int ret = set_status(cls_rgw_reshard_status::IN_PROGRESS, dpp);
    if (ret < 0) {
      return ret;
    }
    in_progress = true;
    return 0;
  }

  int complete() {
    int ret = set_status(cls_rgw_reshard_status::DONE, dpp);
    if (ret < 0) {
      return ret;
    }
    in_progress = false;
    return 0;
  }
};
455
f64942e4 456
// Configure (but do not acquire) the reshard lock on `reshard_lock_oid`.
// Duration comes from rgw_reshard_bucket_lock_duration; a random cookie
// distinguishes this holder from other RGW processes.
RGWBucketReshardLock::RGWBucketReshardLock(rgw::sal::RadosStore* _store,
					   const std::string& reshard_lock_oid,
					   bool _ephemeral) :
  store(_store),
  lock_oid(reshard_lock_oid),
  ephemeral(_ephemeral),
  internal_lock(reshard_lock_name)
{
  const int lock_dur_secs = store->ctx()->_conf.get_val<uint64_t>(
    "rgw_reshard_bucket_lock_duration");
  duration = std::chrono::seconds(lock_dur_secs);

#define COOKIE_LEN 16
  char cookie_buf[COOKIE_LEN + 1];
  gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
  cookie_buf[COOKIE_LEN] = '\0';  // gen_rand_alphanumeric fills COOKIE_LEN chars

  internal_lock.set_cookie(cookie_buf);
  internal_lock.set_duration(duration);
}
477
20effc67 478int RGWBucketReshardLock::lock(const DoutPrefixProvider *dpp) {
f64942e4 479 internal_lock.set_must_renew(false);
522d829b 480
f64942e4
AA
481 int ret;
482 if (ephemeral) {
9f95a23c 483 ret = internal_lock.lock_exclusive_ephemeral(&store->getRados()->reshard_pool_ctx,
f64942e4
AA
484 lock_oid);
485 } else {
9f95a23c 486 ret = internal_lock.lock_exclusive(&store->getRados()->reshard_pool_ctx, lock_oid);
f64942e4 487 }
522d829b
TL
488
489 if (ret == -EBUSY) {
490 ldout(store->ctx(), 0) << "INFO: RGWReshardLock::" << __func__ <<
491 " found lock on " << lock_oid <<
492 " to be held by another RGW process; skipping for now" << dendl;
493 return ret;
494 } else if (ret < 0) {
20effc67 495 ldpp_dout(dpp, -1) << "ERROR: RGWReshardLock::" << __func__ <<
522d829b
TL
496 " failed to acquire lock on " << lock_oid << ": " <<
497 cpp_strerror(-ret) << dendl;
f64942e4
AA
498 return ret;
499 }
522d829b 500
f64942e4
AA
501 reset_time(Clock::now());
502
503 return 0;
504}
505
// Release the lock.  A failure is only a warning: the lock has a fixed
// duration and will expire on its own.
void RGWBucketReshardLock::unlock() {
  int ret = internal_lock.unlock(&store->getRados()->reshard_pool_ctx, lock_oid);
  if (ret < 0) {
    ldout(store->ctx(), 0) << "WARNING: RGWBucketReshardLock::" << __func__ <<
      " failed to drop lock on " << lock_oid << " ret=" << ret << dendl;
  }
}
513
// Renew the held lock (must-renew mode ensures we only re-take a lock
// we still hold).  On success the renewal clock is reset to `now`.
int RGWBucketReshardLock::renew(const Clock::time_point& now) {
  internal_lock.set_must_renew(true);
  int ret;
  if (ephemeral) {
    ret = internal_lock.lock_exclusive_ephemeral(&store->getRados()->reshard_pool_ctx,
						 lock_oid);
  } else {
    ret = internal_lock.lock_exclusive(&store->getRados()->reshard_pool_ctx, lock_oid);
  }
  if (ret < 0) { /* expired or already locked by another processor */
    std::stringstream error_s;
    if (-ENOENT == ret) {
      error_s << "ENOENT (lock expired or never initially locked)";
    } else {
      error_s << ret << " (" << cpp_strerror(-ret) << ")";
    }
    ldout(store->ctx(), 5) << __func__ << "(): failed to renew lock on " <<
      lock_oid << " with error " << error_s.str() << dendl;
    return ret;
  }
  // leave must-renew off so a later lock() call can acquire fresh
  internal_lock.set_must_renew(false);

  reset_time(now);
  ldout(store->ctx(), 20) << __func__ << "(): successfully renewed lock on " <<
    lock_oid << dendl;

  return 0;
}
542
543
// Core reshard loop: marks the bucket IN_PROGRESS, walks every source
// index shard via bi_list, routes each entry to its target shard, then
// links the new instance and marks the update DONE.  The reshard lock
// (and the optional outer lock) is renewed periodically during the walk.
// On early return, ~BucketInfoReshardUpdate rolls the status back.
int RGWBucketReshard::do_reshard(int num_shards,
				 RGWBucketInfo& new_bucket_info,
				 int max_entries,
				 bool verbose,
				 ostream *out,
				 Formatter *formatter,
				 const DoutPrefixProvider *dpp)
{
  if (out) {
    const rgw_bucket& bucket = bucket_info.bucket;
    (*out) << "tenant: " << bucket.tenant << std::endl;
    (*out) << "bucket name: " << bucket.name << std::endl;
    (*out) << "old bucket instance id: " << bucket.bucket_id <<
      std::endl;
    (*out) << "new bucket instance id: " << new_bucket_info.bucket.bucket_id <<
      std::endl;
  }

  /* update bucket info -- in progress*/
  list<rgw_cls_bi_entry> entries;

  if (max_entries < 0) {
    ldpp_dout(dpp, 0) << __func__ <<
      ": can't reshard, negative max_entries" << dendl;
    return -EINVAL;
  }

  // NB: destructor cleans up sharding state if reshard does not
  // complete successfully
  BucketInfoReshardUpdate bucket_info_updater(dpp, store, bucket_info, bucket_attrs, new_bucket_info.bucket.bucket_id);

  int ret = bucket_info_updater.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl;
    return ret;
  }

  // a shard count of 0 means a single (unsharded) index object
  int num_target_shards = (new_bucket_info.layout.current_index.layout.normal.num_shards > 0 ? new_bucket_info.layout.current_index.layout.normal.num_shards : 1);

  BucketReshardManager target_shards_mgr(dpp, store, new_bucket_info, num_target_shards);

  bool verbose_json_out = verbose && (formatter != nullptr) && (out != nullptr);

  if (verbose_json_out) {
    formatter->open_array_section("entries");
  }

  uint64_t total_entries = 0;

  if (!verbose_json_out && out) {
    (*out) << "total entries:";
  }

  const int num_source_shards =
    (bucket_info.layout.current_index.layout.normal.num_shards > 0 ? bucket_info.layout.current_index.layout.normal.num_shards : 1);
  string marker;
  for (int i = 0; i < num_source_shards; ++i) {
    bool is_truncated = true;
    marker.clear();
    const std::string null_object_filter; // empty string since we're not filtering by object
    while (is_truncated) {
      entries.clear();
      ret = store->getRados()->bi_list(dpp, bucket_info, i, null_object_filter, marker, max_entries, &entries, &is_truncated);
      if (ret < 0 && ret != -ENOENT) {
	derr << "ERROR: bi_list(): " << cpp_strerror(-ret) << dendl;
	return ret;
      }

      for (auto iter = entries.begin(); iter != entries.end(); ++iter) {
	rgw_cls_bi_entry& entry = *iter;
	if (verbose_json_out) {
	  formatter->open_object_section("entry");

	  encode_json("shard_id", i, formatter);
	  encode_json("num_entry", total_entries, formatter);
	  encode_json("entry", entry, formatter);
	}
	total_entries++;

	marker = entry.idx;  // advance the listing marker

	int target_shard_id;
	cls_rgw_obj_key cls_key;
	RGWObjCategory category;
	rgw_bucket_category_stats stats;
	bool account = entry.get_info(&cls_key, &category, &stats);
	rgw_obj_key key(cls_key);
	rgw_obj obj(new_bucket_info.bucket, key);
	RGWMPObj mp;
	if (key.ns == RGW_OBJ_NS_MULTIPART && mp.from_meta(key.name)) {
	  // place the multipart .meta object on the same shard as its head object
	  obj.index_hash_source = mp.get_key();
	}
	int ret = store->getRados()->get_target_shard_id(new_bucket_info.layout.current_index.layout.normal, obj.get_hash_object(), &target_shard_id);
	if (ret < 0) {
	  ldpp_dout(dpp, -1) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl;
	  return ret;
	}

	int shard_index = (target_shard_id > 0 ? target_shard_id : 0);

	ret = target_shards_mgr.add_entry(shard_index, entry, account,
					  category, stats);
	if (ret < 0) {
	  return ret;
	}

	Clock::time_point now = Clock::now();
	if (reshard_lock.should_renew(now)) {
	  // assume outer locks have timespans at least the size of ours, so
	  // can call inside conditional
	  if (outer_reshard_lock) {
	    ret = outer_reshard_lock->renew(now);
	    if (ret < 0) {
	      return ret;
	    }
	  }
	  ret = reshard_lock.renew(now);
	  if (ret < 0) {
	    ldpp_dout(dpp, -1) << "Error renewing bucket lock: " << ret << dendl;
	    return ret;
	  }
	}
	if (verbose_json_out) {
	  formatter->close_section();
	  formatter->flush(*out);
	} else if (out && !(total_entries % 1000)) {
	  // progress marker every 1000 entries in non-verbose mode
	  (*out) << " " << total_entries;
	}
      } // entries loop
    }
  }

  if (verbose_json_out) {
    formatter->close_section();
    formatter->flush(*out);
  } else if (out) {
    (*out) << " " << total_entries << std::endl;
  }

  ret = target_shards_mgr.finish();
  if (ret < 0) {
    ldpp_dout(dpp, -1) << "ERROR: failed to reshard" << dendl;
    return -EIO;
  }

  ret = store->ctl()->bucket->link_bucket(new_bucket_info.owner, new_bucket_info.bucket, bucket_info.creation_time, null_yield, dpp);
  if (ret < 0) {
    ldpp_dout(dpp, -1) << "failed to link new bucket instance (bucket_id=" << new_bucket_info.bucket.bucket_id << ": " << cpp_strerror(-ret) << ")" << dendl;
    return ret;
  }

  ret = bucket_info_updater.complete();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl;
    /* don't error out, reshard process succeeded */
  }

  return 0;
  // NB: some error clean-up is done by ~BucketInfoReshardUpdate
} // RGWBucketReshard::do_reshard
31f18b77 705
// Fetch the per-shard reshard status entries for this bucket's index.
int RGWBucketReshard::get_status(const DoutPrefixProvider *dpp, list<cls_rgw_bucket_instance_entry> *status)
{
  return store->svc()->bi_rados->get_reshard_status(dpp, bucket_info, status);
}
710
31f18b77 711
// Top-level reshard driver: lock, create the new bucket instance, flag
// the old index as IN_PROGRESS, run do_reshard(), then best-effort
// clean-up of the old shards/instance.  On any failure it jumps to
// error_out, which best-effort removes the *new* instance instead and
// returns the original error.
int RGWBucketReshard::execute(int num_shards, int max_op_entries,
			      const DoutPrefixProvider *dpp,
			      bool verbose, ostream *out, Formatter *formatter,
			      RGWReshard* reshard_log)
{
  int ret = reshard_lock.lock(dpp);
  if (ret < 0) {
    return ret;
  }

  RGWBucketInfo new_bucket_info;
  ret = create_new_bucket_instance(num_shards, new_bucket_info, dpp);
  if (ret < 0) {
    // shard state is uncertain, but this will attempt to remove them anyway
    goto error_out;
  }

  if (reshard_log) {
    ret = reshard_log->update(dpp, bucket_info, new_bucket_info);
    if (ret < 0) {
      goto error_out;
    }
  }

  // set resharding status of current bucket_info & shards with
  // information about planned resharding
  ret = set_resharding_status(dpp, new_bucket_info.bucket.bucket_id,
			      num_shards, cls_rgw_reshard_status::IN_PROGRESS);
  if (ret < 0) {
    goto error_out;
  }

  ret = do_reshard(num_shards,
		   new_bucket_info,
		   max_op_entries,
		   verbose, out, formatter, dpp);
  if (ret < 0) {
    goto error_out;
  }

  // at this point we've done the main work; we'll make a best-effort
  // to clean-up but will not indicate any errors encountered

  reshard_lock.unlock();

  // resharding successful, so remove old bucket index shards; use
  // best effort and don't report out an error; the lock isn't needed
  // at this point since all we're using a best effort to remove old
  // shard objects
  ret = store->svc()->bi->clean_index(dpp, bucket_info);
  if (ret < 0) {
    ldpp_dout(dpp, -1) << "Error: " << __func__ <<
      " failed to clean up old shards; " <<
      "RGWRados::clean_bucket_index returned " << ret << dendl;
  }

  ret = store->ctl()->bucket->remove_bucket_instance_info(bucket_info.bucket,
							  bucket_info, null_yield, dpp);
  if (ret < 0) {
    ldpp_dout(dpp, -1) << "Error: " << __func__ <<
      " failed to clean old bucket info object \"" <<
      bucket_info.bucket.get_key() <<
      "\"created after successful resharding with error " << ret << dendl;
  }

  ldpp_dout(dpp, 1) << __func__ <<
    " INFO: reshard of bucket \"" << bucket_info.bucket.name << "\" from \"" <<
    bucket_info.bucket.get_key() << "\" to \"" <<
    new_bucket_info.bucket.get_key() << "\" completed successfully" << dendl;

  return 0;

error_out:

  reshard_lock.unlock();

  // since the real problem is the issue that led to this error code
  // path, we won't touch ret and instead use another variable to
  // temporarily error codes
  int ret2 = store->svc()->bi->clean_index(dpp, new_bucket_info);
  if (ret2 < 0) {
    ldpp_dout(dpp, -1) << "Error: " << __func__ <<
      " failed to clean up shards from failed incomplete resharding; " <<
      "RGWRados::clean_bucket_index returned " << ret2 << dendl;
  }

  ret2 = store->ctl()->bucket->remove_bucket_instance_info(new_bucket_info.bucket,
							   new_bucket_info,
							   null_yield, dpp);
  if (ret2 < 0) {
    ldpp_dout(dpp, -1) << "Error: " << __func__ <<
      " failed to clean bucket info object \"" <<
      new_bucket_info.bucket.get_key() <<
      "\"created during incomplete resharding with error " << ret2 << dendl;
  }

  return ret;
} // execute
31f18b77
FG
810
811
// Reshard-log manager; shard count of the reshard log comes from the
// rgw_reshard_num_logs config option.
RGWReshard::RGWReshard(rgw::sal::RadosStore* _store, bool _verbose, ostream *_out,
		       Formatter *_formatter) :
  store(_store), instance_lock(bucket_instance_lock_name),
  verbose(_verbose), out(_out), formatter(_formatter)
{
  num_logshards = store->ctx()->_conf.get_val<uint64_t>("rgw_reshard_num_logs");
}
819
f64942e4
AA
820string RGWReshard::get_logshard_key(const string& tenant,
821 const string& bucket_name)
31f18b77
FG
822{
823 return tenant + ":" + bucket_name;
824}
825
826#define MAX_RESHARD_LOGSHARDS_PRIME 7877
827
// Map a tenant/bucket pair to the OID of the reshard log shard that
// holds its queue entry.
void RGWReshard::get_bucket_logshard_oid(const string& tenant, const string& bucket_name, string *oid)
{
  string key = get_logshard_key(tenant, bucket_name);

  uint32_t sid = ceph_str_hash_linux(key.c_str(), key.size());
  // fold the low byte into the high bits before the double modulo
  // (prime, then shard count) — presumably to improve distribution;
  // changing this would remap entries, so it must stay stable
  uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
  sid = sid2 % MAX_RESHARD_LOGSHARDS_PRIME % num_logshards;

  get_logshard_oid(int(sid), oid);
}
838
b3b6e05e 839int RGWReshard::add(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry)
31f18b77 840{
9f95a23c 841 if (!store->svc()->zone->can_reshard()) {
b3b6e05e 842 ldpp_dout(dpp, 20) << __func__ << " Resharding is disabled" << dendl;
3efd9988
FG
843 return 0;
844 }
845
31f18b77
FG
846 string logshard_oid;
847
848 get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
849
850 librados::ObjectWriteOperation op;
851 cls_rgw_reshard_add(op, entry);
852
b3b6e05e 853 int ret = rgw_rados_operate(dpp, store->getRados()->reshard_pool_ctx, logshard_oid, &op, null_yield);
31f18b77 854 if (ret < 0) {
b3b6e05e 855 ldpp_dout(dpp, -1) << "ERROR: failed to add entry to reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
31f18b77
FG
856 return ret;
857 }
858 return 0;
859}
860
b3b6e05e 861int RGWReshard::update(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info)
31f18b77
FG
862{
863 cls_rgw_reshard_entry entry;
864 entry.bucket_name = bucket_info.bucket.name;
865 entry.bucket_id = bucket_info.bucket.bucket_id;
b32b8144 866 entry.tenant = bucket_info.owner.tenant;
31f18b77 867
20effc67 868 int ret = get(dpp, entry);
31f18b77
FG
869 if (ret < 0) {
870 return ret;
871 }
872
873 entry.new_instance_id = new_bucket_info.bucket.name + ":" + new_bucket_info.bucket.bucket_id;
874
b3b6e05e 875 ret = add(dpp, entry);
31f18b77 876 if (ret < 0) {
b3b6e05e 877 ldpp_dout(dpp, 0) << __func__ << ":Error in updating entry bucket " << entry.bucket_name << ": " <<
31f18b77
FG
878 cpp_strerror(-ret) << dendl;
879 }
880
881 return ret;
882}
883
884
20effc67 885int RGWReshard::list(const DoutPrefixProvider *dpp, int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated)
31f18b77
FG
886{
887 string logshard_oid;
888
889 get_logshard_oid(logshard_num, &logshard_oid);
890
9f95a23c 891 int ret = cls_rgw_reshard_list(store->getRados()->reshard_pool_ctx, logshard_oid, marker, max, entries, is_truncated);
c07f9fc5 892
20effc67
TL
893 if (ret == -ENOENT) {
894 // these shard objects aren't created until we actually write something to
895 // them, so treat ENOENT as a successful empty listing
896 *is_truncated = false;
897 ret = 0;
898 } else if (ret == -EACCES) {
899 ldpp_dout(dpp, -1) << "ERROR: access denied to pool " << store->svc()->zone->get_zone_params().reshard_pool
900 << ". Fix the pool access permissions of your client" << dendl;
901 } else if (ret < 0) {
902 ldpp_dout(dpp, -1) << "ERROR: failed to list reshard log entries, oid="
903 << logshard_oid << " marker=" << marker << " " << cpp_strerror(ret) << dendl;
31f18b77 904 }
c07f9fc5
FG
905
906 return ret;
31f18b77
FG
907}
908
20effc67 909int RGWReshard::get(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry)
31f18b77
FG
910{
911 string logshard_oid;
912
913 get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
914
9f95a23c 915 int ret = cls_rgw_reshard_get(store->getRados()->reshard_pool_ctx, logshard_oid, entry);
31f18b77 916 if (ret < 0) {
94b18763 917 if (ret != -ENOENT) {
20effc67 918 ldpp_dout(dpp, -1) << "ERROR: failed to get entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant <<
94b18763
FG
919 " bucket=" << entry.bucket_name << dendl;
920 }
31f18b77
FG
921 return ret;
922 }
923
924 return 0;
925}
926
b3b6e05e 927int RGWReshard::remove(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry)
31f18b77
FG
928{
929 string logshard_oid;
930
931 get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
932
933 librados::ObjectWriteOperation op;
934 cls_rgw_reshard_remove(op, entry);
935
b3b6e05e 936 int ret = rgw_rados_operate(dpp, store->getRados()->reshard_pool_ctx, logshard_oid, &op, null_yield);
31f18b77 937 if (ret < 0) {
b3b6e05e 938 ldpp_dout(dpp, -1) << "ERROR: failed to remove entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
31f18b77
FG
939 return ret;
940 }
941
942 return ret;
943}
944
20effc67 945int RGWReshard::clear_bucket_resharding(const DoutPrefixProvider *dpp, const string& bucket_instance_oid, cls_rgw_reshard_entry& entry)
31f18b77 946{
9f95a23c 947 int ret = cls_rgw_clear_bucket_resharding(store->getRados()->reshard_pool_ctx, bucket_instance_oid);
31f18b77 948 if (ret < 0) {
20effc67 949 ldpp_dout(dpp, -1) << "ERROR: failed to clear bucket resharding, bucket_instance_oid=" << bucket_instance_oid << dendl;
31f18b77
FG
950 return ret;
951 }
952
953 return 0;
954}
955
// Block the caller for 'duration' (while a bucket is being resharded) or
// until stop() is called. Returns 0 on a normal timeout and -ECANCELED when
// shutting down; on the coroutine path a cancelled timer surfaces as the
// negated asio error code.
int RGWReshardWait::wait(optional_yield y)
{
  std::unique_lock lock(mutex);

  if (going_down) {
    return -ECANCELED;
  }

  if (y) {
    // Coroutine path: suspend on an asio timer instead of blocking the thread.
    auto& context = y.get_io_context();
    auto& yield = y.get_yield_context();

    // The waiter lives on this stack frame and is linked into 'waiters' so
    // stop() can cancel its timer; it is unlinked below, under the lock,
    // before this frame returns.
    Waiter waiter(context);
    waiters.push_back(waiter);
    lock.unlock();

    waiter.timer.expires_after(duration);

    boost::system::error_code ec;
    waiter.timer.async_wait(yield[ec]);

    lock.lock();
    waiters.erase(waiters.iterator_to(waiter));
    // normal expiry: ec == 0 -> return 0; cancellation by stop(): nonzero ec
    return -ec.value();
  }

  // Synchronous path: sleep on the condvar; stop() wakes us via notify_all.
  cond.wait_for(lock, duration);

  if (going_down) {
    return -ECANCELED;
  }

  return 0;
}
990
11fdf7f2 991void RGWReshardWait::stop()
31f18b77 992{
11fdf7f2
TL
993 std::scoped_lock lock(mutex);
994 going_down = true;
995 cond.notify_all();
996 for (auto& waiter : waiters) {
997 // unblock any waiters with ECANCELED
998 waiter.timer.cancel();
31f18b77 999 }
31f18b77
FG
1000}
1001
// Drain one reshard log shard: take the shard lock, page through its queue
// entries, reshard each bucket that has not started yet (empty
// new_instance_id), and remove completed/stale entries. Returns 0 when the
// shard has been fully processed, or a negative error on lock/reshard/remove
// failure.
int RGWReshard::process_single_logshard(int logshard_num, const DoutPrefixProvider *dpp)
{
  string marker;
  bool truncated = true;

  constexpr uint32_t max_entries = 1000;

  string logshard_oid;
  get_logshard_oid(logshard_num, &logshard_oid);

  // non-ephemeral lock serializing processing of this shard across radosgw
  // instances
  RGWBucketReshardLock logshard_lock(store, logshard_oid, false);

  int ret = logshard_lock.lock(dpp);
  if (ret < 0) {
    // another instance is likely working on this shard
    ldpp_dout(dpp, 5) << __func__ << "(): failed to acquire lock on " <<
      logshard_oid << ", ret = " << ret <<dendl;
    return ret;
  }

  do {
    std::list<cls_rgw_reshard_entry> entries;
    ret = list(dpp, logshard_num, marker, max_entries, entries, &truncated);
    if (ret < 0) {
      ldpp_dout(dpp, 10) << "cannot list all reshards in logshard oid=" <<
        logshard_oid << dendl;
      // NOTE(review): 'continue' re-tests the loop condition with 'truncated'
      // and 'marker' unchanged, so a persistent list error can spin here
      // retrying indefinitely — confirm whether breaking out was intended.
      continue;
    }

    for(auto& entry: entries) { // logshard entries
      // an empty new_instance_id means resharding has not been started
      if(entry.new_instance_id.empty()) {

        ldpp_dout(dpp, 20) << __func__ << " resharding " <<
          entry.bucket_name << dendl;

        rgw_bucket bucket;
        RGWBucketInfo bucket_info;
        map<string, bufferlist> attrs;

        ret = store->getRados()->get_bucket_info(store->svc(),
                                                 entry.tenant, entry.bucket_name,
                                                 bucket_info, nullptr,
                                                 null_yield, dpp, &attrs);
        // failure to load, or a bucket_id mismatch (bucket already resharded
        // by someone else), both mean this queue entry is stale
        if (ret < 0 || bucket_info.bucket.bucket_id != entry.bucket_id) {
          if (ret < 0) {
            ldpp_dout(dpp, 0) << __func__ <<
              ": Error in get_bucket_info for bucket " << entry.bucket_name <<
              ": " << cpp_strerror(-ret) << dendl;
            if (ret != -ENOENT) {
              // any error other than ENOENT will abort
              // NOTE(review): early returns in this function leave
              // logshard_lock held until it expires rather than unlocking —
              // confirm this is the intended recovery behavior.
              return ret;
            }
          } else {
            ldpp_dout(dpp, 0) << __func__ <<
              ": Bucket: " << entry.bucket_name <<
              " already resharded by someone, skipping " << dendl;
          }

          // we've encountered a reshard queue entry for an apparently
          // non-existent bucket; let's try to recover by cleaning up
          ldpp_dout(dpp, 0) << __func__ <<
            ": removing reshard queue entry for a resharded or non-existent bucket" <<
            entry.bucket_name << dendl;

          ret = remove(dpp, entry);
          if (ret < 0) {
            ldpp_dout(dpp, 0) << __func__ <<
              ": Error removing non-existent bucket " <<
              entry.bucket_name << " from resharding queue: " <<
              cpp_strerror(-ret) << dendl;
            return ret;
          }

          // we cleaned up, move on to the next entry
          goto finished_entry;
        }

        // perform the actual reshard to entry.new_num_shards shards
        RGWBucketReshard br(store, bucket_info, attrs, nullptr);
        ret = br.execute(entry.new_num_shards, max_entries, dpp, false, nullptr,
                         nullptr, this);
        if (ret < 0) {
          ldpp_dout(dpp, 0) << __func__ <<
            ": Error during resharding bucket " << entry.bucket_name << ":" <<
            cpp_strerror(-ret)<< dendl;
          return ret;
        }

        ldpp_dout(dpp, 20) << __func__ <<
          " removing reshard queue entry for bucket " << entry.bucket_name <<
          dendl;

        ret = remove(dpp, entry);
        if (ret < 0) {
          ldpp_dout(dpp, 0) << __func__ << ": Error removing bucket " <<
            entry.bucket_name << " from resharding queue: " <<
            cpp_strerror(-ret) << dendl;
          return ret;
        }
      } // if new instance id is empty

    finished_entry:

      // renew the shard lock if it is nearing expiry, since resharding a
      // large bucket can take longer than the lock duration
      Clock::time_point now = Clock::now();
      if (logshard_lock.should_renew(now)) {
        ret = logshard_lock.renew(now);
        if (ret < 0) {
          return ret;
        }
      }

      // advance the listing marker past this entry
      entry.get_key(&marker);
    } // entry for loop
  } while (truncated);

  logshard_lock.unlock();
  return 0;
}
1118
1119
1120void RGWReshard::get_logshard_oid(int shard_num, string *logshard)
1121{
1122 char buf[32];
1123 snprintf(buf, sizeof(buf), "%010u", (unsigned)shard_num);
1124
1125 string objname(reshard_oid_prefix);
1126 *logshard = objname + buf;
1127}
1128
// One pass of the background worker: process every reshard log shard in
// turn. Errors from individual shards are logged but not propagated — this
// function always returns 0 (0 is also returned immediately when resharding
// is disabled for the zone).
int RGWReshard::process_all_logshards(const DoutPrefixProvider *dpp)
{
  if (!store->svc()->zone->can_reshard()) {
    ldpp_dout(dpp, 20) << __func__ << " Resharding is disabled" << dendl;
    return 0;
  }
  int ret = 0;

  for (int i = 0; i < num_logshards; i++) {
    string logshard;
    get_logshard_oid(i, &logshard);

    ldpp_dout(dpp, 20) << "processing logshard = " << logshard << dendl;

    // per-shard result is only logged; a failing shard does not stop the pass
    ret = process_single_logshard(i, dpp);

    ldpp_dout(dpp, 20) << "finish processing logshard = " << logshard << " , ret = " << ret << dendl;
  }

  return 0;
}
1150
// True once stop_processor() has initiated shutdown; polled by the worker
// thread between (and during) processing rounds.
bool RGWReshard::going_down()
{
  return down_flag;
}
1155
// Spawn the background thread ("rgw_reshard") that periodically drains the
// reshard queue via process_all_logshards().
void RGWReshard::start_processor()
{
  worker = new ReshardWorker(store->ctx(), this);
  worker->create("rgw_reshard");
}
1161
1162void RGWReshard::stop_processor()
1163{
1164 down_flag = true;
1165 if (worker) {
1166 worker->stop();
1167 worker->join();
1168 }
1169 delete worker;
224ce89b 1170 worker = nullptr;
31f18b77
FG
1171}
1172
// Worker thread main loop: run one full queue pass, then sleep for the
// remainder of rgw_reshard_thread_interval (measured from the start of the
// pass), repeating until shutdown is requested.
void *RGWReshard::ReshardWorker::entry() {
  do {
    utime_t start = ceph_clock_now();
    reshard->process_all_logshards(this);

    if (reshard->going_down())
      break;

    // 'end' becomes the elapsed time of this pass
    utime_t end = ceph_clock_now();
    end -= start;
    // NOTE(review): uint64_t config value narrowed to int; assumes the
    // configured interval fits in int — confirm.
    int secs = cct->_conf.get_val<uint64_t>("rgw_reshard_thread_interval");

    if (secs <= end.sec())
      continue; // next round

    // sleep only for the time left in the interval
    secs -= end.sec();

    std::unique_lock locker{lock};
    // stop() notifies this condvar to cut the sleep short on shutdown
    cond.wait_for(locker, std::chrono::seconds(secs));
  } while (!reshard->going_down());

  return NULL;
}
1196
// Wake the worker if it is sleeping between rounds. The loop's exit
// condition is RGWReshard::going_down(), which the caller (stop_processor)
// sets before invoking this.
void RGWReshard::ReshardWorker::stop()
{
  std::lock_guard l{lock};
  cond.notify_all();
}
b3b6e05e
TL
1202
// DoutPrefixProvider interface: context supplying log configuration.
CephContext *RGWReshard::ReshardWorker::get_cct() const
{
  return cct;
}
1207
// DoutPrefixProvider interface: log under the rgw subsystem.
unsigned RGWReshard::ReshardWorker::get_subsys() const
{
  return dout_subsys;
}
1212
// DoutPrefixProvider interface: prefix prepended to every log line emitted
// by this worker thread.
std::ostream& RGWReshard::ReshardWorker::gen_prefix(std::ostream& out) const
{
  return out << "rgw reshard worker thread: ";
}