]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_reshard.cc
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / rgw / rgw_reshard.cc
CommitLineData
31f18b77
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
f64942e4 4#include <limits>
11fdf7f2 5#include <sstream>
f64942e4 6
31f18b77 7#include "rgw_rados.h"
11fdf7f2 8#include "rgw_zone.h"
31f18b77
FG
9#include "rgw_bucket.h"
10#include "rgw_reshard.h"
11#include "cls/rgw/cls_rgw_client.h"
12#include "cls/lock/cls_lock_client.h"
13#include "common/errno.h"
14#include "common/ceph_json.h"
15
16#include "common/dout.h"
17
11fdf7f2
TL
18#include "services/svc_zone.h"
19#include "services/svc_sys_obj.h"
20
31f18b77
FG
21#define dout_context g_ceph_context
22#define dout_subsys ceph_subsys_rgw
23
24const string reshard_oid_prefix = "reshard.";
25const string reshard_lock_name = "reshard_process";
26const string bucket_instance_lock_name = "bucket_instance_lock";
27
f64942e4 28
31f18b77
FG
29class BucketReshardShard {
30 RGWRados *store;
31 const RGWBucketInfo& bucket_info;
32 int num_shard;
33 RGWRados::BucketShard bs;
34 vector<rgw_cls_bi_entry> entries;
11fdf7f2 35 map<RGWObjCategory, rgw_bucket_category_stats> stats;
31f18b77 36 deque<librados::AioCompletion *>& aio_completions;
11fdf7f2
TL
37 uint64_t max_aio_completions;
38 uint64_t reshard_shard_batch_size;
31f18b77
FG
39
40 int wait_next_completion() {
41 librados::AioCompletion *c = aio_completions.front();
42 aio_completions.pop_front();
43
44 c->wait_for_safe();
45
46 int ret = c->get_return_value();
47 c->release();
48
49 if (ret < 0) {
50 derr << "ERROR: reshard rados operation failed: " << cpp_strerror(-ret) << dendl;
51 return ret;
52 }
53
54 return 0;
55 }
56
57 int get_completion(librados::AioCompletion **c) {
11fdf7f2 58 if (aio_completions.size() >= max_aio_completions) {
31f18b77
FG
59 int ret = wait_next_completion();
60 if (ret < 0) {
61 return ret;
62 }
63 }
64
65 *c = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
66 aio_completions.push_back(*c);
67
68 return 0;
69 }
70
71public:
72 BucketReshardShard(RGWRados *_store, const RGWBucketInfo& _bucket_info,
73 int _num_shard,
f64942e4
AA
74 deque<librados::AioCompletion *>& _completions) :
75 store(_store), bucket_info(_bucket_info), bs(store),
76 aio_completions(_completions)
77 {
31f18b77 78 num_shard = (bucket_info.num_shards > 0 ? _num_shard : -1);
f64942e4 79 bs.init(bucket_info.bucket, num_shard, nullptr /* no RGWBucketInfo */);
11fdf7f2
TL
80
81 max_aio_completions =
82 store->ctx()->_conf.get_val<uint64_t>("rgw_reshard_max_aio");
83 reshard_shard_batch_size =
84 store->ctx()->_conf.get_val<uint64_t>("rgw_reshard_batch_size");
31f18b77
FG
85 }
86
87 int get_num_shard() {
88 return num_shard;
89 }
90
11fdf7f2 91 int add_entry(rgw_cls_bi_entry& entry, bool account, RGWObjCategory category,
31f18b77
FG
92 const rgw_bucket_category_stats& entry_stats) {
93 entries.push_back(entry);
94 if (account) {
95 rgw_bucket_category_stats& target = stats[category];
96 target.num_entries += entry_stats.num_entries;
97 target.total_size += entry_stats.total_size;
98 target.total_size_rounded += entry_stats.total_size_rounded;
91327a77 99 target.actual_size += entry_stats.actual_size;
31f18b77 100 }
11fdf7f2 101 if (entries.size() >= reshard_shard_batch_size) {
31f18b77
FG
102 int ret = flush();
103 if (ret < 0) {
104 return ret;
105 }
106 }
f64942e4 107
31f18b77
FG
108 return 0;
109 }
f64942e4 110
31f18b77
FG
111 int flush() {
112 if (entries.size() == 0) {
113 return 0;
114 }
115
116 librados::ObjectWriteOperation op;
117 for (auto& entry : entries) {
118 store->bi_put(op, bs, entry);
119 }
120 cls_rgw_bucket_update_stats(op, false, stats);
121
122 librados::AioCompletion *c;
123 int ret = get_completion(&c);
124 if (ret < 0) {
125 return ret;
126 }
127 ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &op);
128 if (ret < 0) {
129 derr << "ERROR: failed to store entries in target bucket shard (bs=" << bs.bucket << "/" << bs.shard_id << ") error=" << cpp_strerror(-ret) << dendl;
130 return ret;
131 }
132 entries.clear();
133 stats.clear();
134 return 0;
135 }
136
137 int wait_all_aio() {
138 int ret = 0;
139 while (!aio_completions.empty()) {
140 int r = wait_next_completion();
141 if (r < 0) {
142 ret = r;
143 }
144 }
145 return ret;
146 }
f64942e4
AA
147}; // class BucketReshardShard
148
31f18b77
FG
149
150class BucketReshardManager {
151 RGWRados *store;
152 const RGWBucketInfo& target_bucket_info;
153 deque<librados::AioCompletion *> completions;
154 int num_target_shards;
155 vector<BucketReshardShard *> target_shards;
156
157public:
f64942e4
AA
158 BucketReshardManager(RGWRados *_store,
159 const RGWBucketInfo& _target_bucket_info,
160 int _num_target_shards) :
161 store(_store), target_bucket_info(_target_bucket_info),
162 num_target_shards(_num_target_shards)
163 {
31f18b77
FG
164 target_shards.resize(num_target_shards);
165 for (int i = 0; i < num_target_shards; ++i) {
166 target_shards[i] = new BucketReshardShard(store, target_bucket_info, i, completions);
167 }
168 }
169
170 ~BucketReshardManager() {
171 for (auto& shard : target_shards) {
172 int ret = shard->wait_all_aio();
173 if (ret < 0) {
f64942e4
AA
174 ldout(store->ctx(), 20) << __func__ <<
175 ": shard->wait_all_aio() returned ret=" << ret << dendl;
31f18b77
FG
176 }
177 }
178 }
179
180 int add_entry(int shard_index,
11fdf7f2 181 rgw_cls_bi_entry& entry, bool account, RGWObjCategory category,
31f18b77 182 const rgw_bucket_category_stats& entry_stats) {
f64942e4
AA
183 int ret = target_shards[shard_index]->add_entry(entry, account, category,
184 entry_stats);
31f18b77 185 if (ret < 0) {
f64942e4
AA
186 derr << "ERROR: target_shards.add_entry(" << entry.idx <<
187 ") returned error: " << cpp_strerror(-ret) << dendl;
31f18b77
FG
188 return ret;
189 }
f64942e4 190
31f18b77
FG
191 return 0;
192 }
193
194 int finish() {
195 int ret = 0;
196 for (auto& shard : target_shards) {
197 int r = shard->flush();
198 if (r < 0) {
199 derr << "ERROR: target_shards[" << shard->get_num_shard() << "].flush() returned error: " << cpp_strerror(-r) << dendl;
200 ret = r;
201 }
202 }
203 for (auto& shard : target_shards) {
204 int r = shard->wait_all_aio();
205 if (r < 0) {
206 derr << "ERROR: target_shards[" << shard->get_num_shard() << "].wait_all_aio() returned error: " << cpp_strerror(-r) << dendl;
207 ret = r;
208 }
209 delete shard;
210 }
211 target_shards.clear();
212 return ret;
213 }
f64942e4
AA
214}; // class BucketReshardManager
215
216RGWBucketReshard::RGWBucketReshard(RGWRados *_store,
217 const RGWBucketInfo& _bucket_info,
218 const map<string, bufferlist>& _bucket_attrs,
219 RGWBucketReshardLock* _outer_reshard_lock) :
220 store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs),
221 reshard_lock(store, bucket_info, true),
222 outer_reshard_lock(_outer_reshard_lock)
223{ }
224
225int RGWBucketReshard::set_resharding_status(RGWRados* store,
226 const RGWBucketInfo& bucket_info,
227 const string& new_instance_id,
228 int32_t num_shards,
229 cls_rgw_reshard_status status)
31f18b77
FG
230{
231 if (new_instance_id.empty()) {
232 ldout(store->ctx(), 0) << __func__ << " missing new bucket instance id" << dendl;
233 return -EINVAL;
234 }
235
236 cls_rgw_bucket_instance_entry instance_entry;
237 instance_entry.set_status(new_instance_id, num_shards, status);
238
239 int ret = store->bucket_set_reshard(bucket_info, instance_entry);
240 if (ret < 0) {
241 ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: error setting bucket resharding flag on bucket index: "
242 << cpp_strerror(-ret) << dendl;
243 return ret;
244 }
245 return 0;
246}
247
f64942e4
AA
248// reshard lock assumes lock is held
249int RGWBucketReshard::clear_resharding(RGWRados* store,
250 const RGWBucketInfo& bucket_info)
31f18b77 251{
f64942e4
AA
252 int ret = clear_index_shard_reshard_status(store, bucket_info);
253 if (ret < 0) {
254 ldout(store->ctx(), 0) << "RGWBucketReshard::" << __func__ <<
255 " ERROR: error clearing reshard status from index shard " <<
256 cpp_strerror(-ret) << dendl;
257 return ret;
258 }
31f18b77 259
f64942e4
AA
260 cls_rgw_bucket_instance_entry instance_entry;
261 ret = store->bucket_set_reshard(bucket_info, instance_entry);
31f18b77 262 if (ret < 0) {
f64942e4
AA
263 ldout(store->ctx(), 0) << "RGWReshard::" << __func__ <<
264 " ERROR: error setting bucket resharding flag on bucket index: " <<
265 cpp_strerror(-ret) << dendl;
31f18b77
FG
266 return ret;
267 }
f64942e4
AA
268
269 return 0;
270}
271
272int RGWBucketReshard::clear_index_shard_reshard_status(RGWRados* store,
273 const RGWBucketInfo& bucket_info)
274{
275 uint32_t num_shards = bucket_info.num_shards;
276
277 if (num_shards < std::numeric_limits<uint32_t>::max()) {
278 int ret = set_resharding_status(store, bucket_info,
279 bucket_info.bucket.bucket_id,
280 (num_shards < 1 ? 1 : num_shards),
494da23a 281 CLS_RGW_RESHARD_NOT_RESHARDING);
f64942e4
AA
282 if (ret < 0) {
283 ldout(store->ctx(), 0) << "RGWBucketReshard::" << __func__ <<
284 " ERROR: error clearing reshard status from index shard " <<
285 cpp_strerror(-ret) << dendl;
286 return ret;
287 }
288 }
289
31f18b77
FG
290 return 0;
291}
292
293static int create_new_bucket_instance(RGWRados *store,
294 int new_num_shards,
295 const RGWBucketInfo& bucket_info,
296 map<string, bufferlist>& attrs,
297 RGWBucketInfo& new_bucket_info)
298{
299 new_bucket_info = bucket_info;
300
301 store->create_bucket_id(&new_bucket_info.bucket.bucket_id);
302 new_bucket_info.bucket.oid.clear();
303
304 new_bucket_info.num_shards = new_num_shards;
305 new_bucket_info.objv_tracker.clear();
306
307 new_bucket_info.new_bucket_instance_id.clear();
308 new_bucket_info.reshard_status = 0;
309
310 int ret = store->init_bucket_index(new_bucket_info, new_bucket_info.num_shards);
311 if (ret < 0) {
312 cerr << "ERROR: failed to init new bucket indexes: " << cpp_strerror(-ret) << std::endl;
f64942e4 313 return ret;
31f18b77
FG
314 }
315
316 ret = store->put_bucket_instance_info(new_bucket_info, true, real_time(), &attrs);
317 if (ret < 0) {
318 cerr << "ERROR: failed to store new bucket instance info: " << cpp_strerror(-ret) << std::endl;
f64942e4 319 return ret;
31f18b77
FG
320 }
321
322 return 0;
323}
324
325int RGWBucketReshard::create_new_bucket_instance(int new_num_shards,
326 RGWBucketInfo& new_bucket_info)
327{
f64942e4
AA
328 return ::create_new_bucket_instance(store, new_num_shards,
329 bucket_info, bucket_attrs, new_bucket_info);
31f18b77
FG
330}
331
94b18763
FG
332int RGWBucketReshard::cancel()
333{
f64942e4 334 int ret = reshard_lock.lock();
94b18763
FG
335 if (ret < 0) {
336 return ret;
337 }
338
339 ret = clear_resharding();
340
f64942e4
AA
341 reshard_lock.unlock();
342 return ret;
94b18763
FG
343}
344
31f18b77
FG
345class BucketInfoReshardUpdate
346{
347 RGWRados *store;
92f5a8d4 348 RGWBucketInfo& bucket_info;
31f18b77
FG
349 std::map<string, bufferlist> bucket_attrs;
350
351 bool in_progress{false};
352
353 int set_status(cls_rgw_reshard_status s) {
354 bucket_info.reshard_status = s;
355 int ret = store->put_bucket_instance_info(bucket_info, false, real_time(), &bucket_attrs);
356 if (ret < 0) {
357 ldout(store->ctx(), 0) << "ERROR: failed to write bucket info, ret=" << ret << dendl;
358 return ret;
359 }
360 return 0;
361 }
362
363public:
f64942e4
AA
364 BucketInfoReshardUpdate(RGWRados *_store,
365 RGWBucketInfo& _bucket_info,
366 map<string, bufferlist>& _bucket_attrs,
367 const string& new_bucket_id) :
368 store(_store),
369 bucket_info(_bucket_info),
370 bucket_attrs(_bucket_attrs)
371 {
31f18b77
FG
372 bucket_info.new_bucket_instance_id = new_bucket_id;
373 }
f64942e4 374
31f18b77
FG
375 ~BucketInfoReshardUpdate() {
376 if (in_progress) {
f64942e4
AA
377 // resharding must not have ended correctly, clean up
378 int ret =
379 RGWBucketReshard::clear_index_shard_reshard_status(store, bucket_info);
380 if (ret < 0) {
381 lderr(store->ctx()) << "Error: " << __func__ <<
382 " clear_index_shard_status returned " << ret << dendl;
383 }
31f18b77 384 bucket_info.new_bucket_instance_id.clear();
494da23a 385 set_status(CLS_RGW_RESHARD_NOT_RESHARDING); // clears new_bucket_instance as well
31f18b77
FG
386 }
387 }
388
389 int start() {
390 int ret = set_status(CLS_RGW_RESHARD_IN_PROGRESS);
391 if (ret < 0) {
392 return ret;
393 }
394 in_progress = true;
395 return 0;
396 }
397
398 int complete() {
399 int ret = set_status(CLS_RGW_RESHARD_DONE);
400 if (ret < 0) {
401 return ret;
402 }
403 in_progress = false;
404 return 0;
405 }
406};
407
f64942e4
AA
408
409RGWBucketReshardLock::RGWBucketReshardLock(RGWRados* _store,
410 const std::string& reshard_lock_oid,
411 bool _ephemeral) :
412 store(_store),
413 lock_oid(reshard_lock_oid),
414 ephemeral(_ephemeral),
415 internal_lock(reshard_lock_name)
416{
11fdf7f2
TL
417 const int lock_dur_secs = store->ctx()->_conf.get_val<uint64_t>(
418 "rgw_reshard_bucket_lock_duration");
f64942e4
AA
419 duration = std::chrono::seconds(lock_dur_secs);
420
421#define COOKIE_LEN 16
422 char cookie_buf[COOKIE_LEN + 1];
423 gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
424 cookie_buf[COOKIE_LEN] = '\0';
425
426 internal_lock.set_cookie(cookie_buf);
427 internal_lock.set_duration(duration);
428}
429
430int RGWBucketReshardLock::lock() {
431 internal_lock.set_must_renew(false);
432 int ret;
433 if (ephemeral) {
434 ret = internal_lock.lock_exclusive_ephemeral(&store->reshard_pool_ctx,
435 lock_oid);
436 } else {
437 ret = internal_lock.lock_exclusive(&store->reshard_pool_ctx, lock_oid);
438 }
439 if (ret < 0) {
440 ldout(store->ctx(), 0) << "RGWReshardLock::" << __func__ <<
441 " failed to acquire lock on " << lock_oid << " ret=" << ret << dendl;
442 return ret;
443 }
444 reset_time(Clock::now());
445
446 return 0;
447}
448
449void RGWBucketReshardLock::unlock() {
450 int ret = internal_lock.unlock(&store->reshard_pool_ctx, lock_oid);
451 if (ret < 0) {
452 ldout(store->ctx(), 0) << "WARNING: RGWBucketReshardLock::" << __func__ <<
453 " failed to drop lock on " << lock_oid << " ret=" << ret << dendl;
454 }
455}
456
457int RGWBucketReshardLock::renew(const Clock::time_point& now) {
458 internal_lock.set_must_renew(true);
459 int ret;
460 if (ephemeral) {
461 ret = internal_lock.lock_exclusive_ephemeral(&store->reshard_pool_ctx,
462 lock_oid);
463 } else {
464 ret = internal_lock.lock_exclusive(&store->reshard_pool_ctx, lock_oid);
465 }
466 if (ret < 0) { /* expired or already locked by another processor */
11fdf7f2
TL
467 std::stringstream error_s;
468 if (-ENOENT == ret) {
469 error_s << "ENOENT (lock expired or never initially locked)";
470 } else {
471 error_s << ret << " (" << cpp_strerror(-ret) << ")";
472 }
f64942e4 473 ldout(store->ctx(), 5) << __func__ << "(): failed to renew lock on " <<
11fdf7f2 474 lock_oid << " with error " << error_s.str() << dendl;
f64942e4
AA
475 return ret;
476 }
477 internal_lock.set_must_renew(false);
478
479 reset_time(now);
480 ldout(store->ctx(), 20) << __func__ << "(): successfully renewed lock on " <<
481 lock_oid << dendl;
482
483 return 0;
484}
485
486
487int RGWBucketReshard::do_reshard(int num_shards,
488 RGWBucketInfo& new_bucket_info,
489 int max_entries,
490 bool verbose,
491 ostream *out,
492 Formatter *formatter)
31f18b77
FG
493{
494 rgw_bucket& bucket = bucket_info.bucket;
495
496 int ret = 0;
497
498 if (out) {
31f18b77
FG
499 (*out) << "tenant: " << bucket_info.bucket.tenant << std::endl;
500 (*out) << "bucket name: " << bucket_info.bucket.name << std::endl;
f64942e4
AA
501 (*out) << "old bucket instance id: " << bucket_info.bucket.bucket_id <<
502 std::endl;
503 (*out) << "new bucket instance id: " << new_bucket_info.bucket.bucket_id <<
504 std::endl;
31f18b77
FG
505 }
506
f64942e4 507 /* update bucket info -- in progress*/
31f18b77
FG
508 list<rgw_cls_bi_entry> entries;
509
510 if (max_entries < 0) {
f64942e4
AA
511 ldout(store->ctx(), 0) << __func__ <<
512 ": can't reshard, negative max_entries" << dendl;
31f18b77
FG
513 return -EINVAL;
514 }
515
f64942e4
AA
516 // NB: destructor cleans up sharding state if reshard does not
517 // complete successfully
31f18b77
FG
518 BucketInfoReshardUpdate bucket_info_updater(store, bucket_info, bucket_attrs, new_bucket_info.bucket.bucket_id);
519
520 ret = bucket_info_updater.start();
521 if (ret < 0) {
522 ldout(store->ctx(), 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl;
523 return ret;
524 }
525
526 int num_target_shards = (new_bucket_info.num_shards > 0 ? new_bucket_info.num_shards : 1);
527
528 BucketReshardManager target_shards_mgr(store, new_bucket_info, num_target_shards);
529
92f5a8d4 530 bool verbose_json_out = verbose && (formatter != nullptr) && (out != nullptr);
31f18b77 531
92f5a8d4 532 if (verbose_json_out) {
31f18b77
FG
533 formatter->open_array_section("entries");
534 }
535
536 uint64_t total_entries = 0;
537
92f5a8d4
TL
538 if (!verbose_json_out && out) {
539 (*out) << "total entries:";
31f18b77
FG
540 }
541
f64942e4
AA
542 const int num_source_shards =
543 (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
31f18b77
FG
544 string marker;
545 for (int i = 0; i < num_source_shards; ++i) {
546 bool is_truncated = true;
547 marker.clear();
548 while (is_truncated) {
549 entries.clear();
550 ret = store->bi_list(bucket, i, string(), marker, max_entries, &entries, &is_truncated);
551 if (ret < 0 && ret != -ENOENT) {
552 derr << "ERROR: bi_list(): " << cpp_strerror(-ret) << dendl;
f64942e4 553 return ret;
31f18b77
FG
554 }
555
f64942e4 556 for (auto iter = entries.begin(); iter != entries.end(); ++iter) {
31f18b77 557 rgw_cls_bi_entry& entry = *iter;
92f5a8d4 558 if (verbose_json_out) {
31f18b77
FG
559 formatter->open_object_section("entry");
560
561 encode_json("shard_id", i, formatter);
562 encode_json("num_entry", total_entries, formatter);
563 encode_json("entry", entry, formatter);
564 }
565 total_entries++;
566
567 marker = entry.idx;
568
569 int target_shard_id;
570 cls_rgw_obj_key cls_key;
11fdf7f2 571 RGWObjCategory category;
31f18b77
FG
572 rgw_bucket_category_stats stats;
573 bool account = entry.get_info(&cls_key, &category, &stats);
574 rgw_obj_key key(cls_key);
575 rgw_obj obj(new_bucket_info.bucket, key);
92f5a8d4
TL
576 RGWMPObj mp;
577 if (key.ns == RGW_OBJ_NS_MULTIPART && mp.from_meta(key.name)) {
578 // place the multipart .meta object on the same shard as its head object
579 obj.index_hash_source = mp.get_key();
580 }
31f18b77
FG
581 int ret = store->get_target_shard_id(new_bucket_info, obj.get_hash_object(), &target_shard_id);
582 if (ret < 0) {
583 lderr(store->ctx()) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl;
584 return ret;
585 }
586
587 int shard_index = (target_shard_id > 0 ? target_shard_id : 0);
588
f64942e4
AA
589 ret = target_shards_mgr.add_entry(shard_index, entry, account,
590 category, stats);
31f18b77
FG
591 if (ret < 0) {
592 return ret;
593 }
f64942e4
AA
594
595 Clock::time_point now = Clock::now();
596 if (reshard_lock.should_renew(now)) {
597 // assume outer locks have timespans at least the size of ours, so
598 // can call inside conditional
599 if (outer_reshard_lock) {
600 ret = outer_reshard_lock->renew(now);
601 if (ret < 0) {
602 return ret;
603 }
604 }
605 ret = reshard_lock.renew(now);
606 if (ret < 0) {
607 lderr(store->ctx()) << "Error renewing bucket lock: " << ret << dendl;
608 return ret;
609 }
610 }
611
92f5a8d4 612 if (verbose_json_out) {
31f18b77 613 formatter->close_section();
92f5a8d4 614 formatter->flush(*out);
31f18b77
FG
615 } else if (out && !(total_entries % 1000)) {
616 (*out) << " " << total_entries;
617 }
f64942e4 618 } // entries loop
31f18b77
FG
619 }
620 }
f64942e4 621
92f5a8d4 622 if (verbose_json_out) {
31f18b77 623 formatter->close_section();
92f5a8d4 624 formatter->flush(*out);
31f18b77
FG
625 } else if (out) {
626 (*out) << " " << total_entries << std::endl;
627 }
628
629 ret = target_shards_mgr.finish();
630 if (ret < 0) {
631 lderr(store->ctx()) << "ERROR: failed to reshard" << dendl;
f64942e4 632 return -EIO;
31f18b77
FG
633 }
634
b32b8144
FG
635 ret = rgw_link_bucket(store, new_bucket_info.owner, new_bucket_info.bucket, bucket_info.creation_time);
636 if (ret < 0) {
637 lderr(store->ctx()) << "failed to link new bucket instance (bucket_id=" << new_bucket_info.bucket.bucket_id << ": " << cpp_strerror(-ret) << ")" << dendl;
f64942e4 638 return ret;
31f18b77
FG
639 }
640
641 ret = bucket_info_updater.complete();
642 if (ret < 0) {
643 ldout(store->ctx(), 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl;
644 /* don't error out, reshard process succeeded */
645 }
f64942e4 646
31f18b77 647 return 0;
f64942e4
AA
648 // NB: some error clean-up is done by ~BucketInfoReshardUpdate
649} // RGWBucketReshard::do_reshard
31f18b77
FG
650
651int RGWBucketReshard::get_status(list<cls_rgw_bucket_instance_entry> *status)
652{
653 librados::IoCtx index_ctx;
654 map<int, string> bucket_objs;
655
656 int r = store->open_bucket_index(bucket_info, index_ctx, bucket_objs);
657 if (r < 0) {
658 return r;
659 }
660
661 for (auto i : bucket_objs) {
662 cls_rgw_bucket_instance_entry entry;
663
664 int ret = cls_rgw_get_bucket_resharding(index_ctx, i.second, &entry);
665 if (ret < 0 && ret != -ENOENT) {
666 lderr(store->ctx()) << "ERROR: " << __func__ << ": cls_rgw_get_bucket_resharding() returned ret=" << ret << dendl;
667 return ret;
668 }
669
670 status->push_back(entry);
671 }
672
673 return 0;
674}
675
31f18b77 676
f64942e4
AA
677int RGWBucketReshard::execute(int num_shards, int max_op_entries,
678 bool verbose, ostream *out, Formatter *formatter,
679 RGWReshard* reshard_log)
31f18b77 680{
f64942e4
AA
681 Clock::time_point now;
682
683 int ret = reshard_lock.lock();
31f18b77
FG
684 if (ret < 0) {
685 return ret;
686 }
687
688 RGWBucketInfo new_bucket_info;
689 ret = create_new_bucket_instance(num_shards, new_bucket_info);
690 if (ret < 0) {
f64942e4
AA
691 // shard state is uncertain, but this will attempt to remove them anyway
692 goto error_out;
31f18b77
FG
693 }
694
695 if (reshard_log) {
696 ret = reshard_log->update(bucket_info, new_bucket_info);
697 if (ret < 0) {
f64942e4 698 goto error_out;
31f18b77
FG
699 }
700 }
701
f64942e4
AA
702 // set resharding status of current bucket_info & shards with
703 // information about planned resharding
704 ret = set_resharding_status(new_bucket_info.bucket.bucket_id,
705 num_shards, CLS_RGW_RESHARD_IN_PROGRESS);
31f18b77 706 if (ret < 0) {
f64942e4 707 reshard_lock.unlock();
31f18b77
FG
708 return ret;
709 }
710
711 ret = do_reshard(num_shards,
712 new_bucket_info,
713 max_op_entries,
714 verbose, out, formatter);
31f18b77 715 if (ret < 0) {
f64942e4 716 goto error_out;
31f18b77
FG
717 }
718
f64942e4
AA
719 // at this point we've done the main work; we'll make a best-effort
720 // to clean-up but will not indicate any errors encountered
721
722 reshard_lock.unlock();
723
724 // resharding successful, so remove old bucket index shards; use
725 // best effort and don't report out an error; the lock isn't needed
726 // at this point since all we're using a best effor to to remove old
727 // shard objects
728 ret = store->clean_bucket_index(bucket_info, bucket_info.num_shards);
31f18b77 729 if (ret < 0) {
f64942e4
AA
730 lderr(store->ctx()) << "Error: " << __func__ <<
731 " failed to clean up old shards; " <<
732 "RGWRados::clean_bucket_index returned " << ret << dendl;
31f18b77
FG
733 }
734
f64942e4
AA
735 ret = rgw_bucket_instance_remove_entry(store,
736 bucket_info.bucket.get_key(),
737 nullptr);
738 if (ret < 0) {
739 lderr(store->ctx()) << "Error: " << __func__ <<
740 " failed to clean old bucket info object \"" <<
741 bucket_info.bucket.get_key() <<
742 "\"created after successful resharding with error " << ret << dendl;
743 }
31f18b77 744
a8e16298
TL
745 ldout(store->ctx(), 1) << __func__ <<
746 " INFO: reshard of bucket \"" << bucket_info.bucket.name << "\" from \"" <<
747 bucket_info.bucket.get_key() << "\" to \"" <<
748 new_bucket_info.bucket.get_key() << "\" completed successfully" << dendl;
749
31f18b77 750 return 0;
f64942e4
AA
751
752error_out:
753
754 reshard_lock.unlock();
755
756 // since the real problem is the issue that led to this error code
757 // path, we won't touch ret and instead use another variable to
758 // temporarily error codes
759 int ret2 = store->clean_bucket_index(new_bucket_info,
760 new_bucket_info.num_shards);
761 if (ret2 < 0) {
762 lderr(store->ctx()) << "Error: " << __func__ <<
763 " failed to clean up shards from failed incomplete resharding; " <<
764 "RGWRados::clean_bucket_index returned " << ret2 << dendl;
765 }
766
767 ret2 = rgw_bucket_instance_remove_entry(store,
768 new_bucket_info.bucket.get_key(),
769 nullptr);
770 if (ret2 < 0) {
771 lderr(store->ctx()) << "Error: " << __func__ <<
772 " failed to clean bucket info object \"" <<
773 new_bucket_info.bucket.get_key() <<
774 "\"created during incomplete resharding with error " << ret2 << dendl;
775 }
776
777 return ret;
778} // execute
31f18b77
FG
779
780
781RGWReshard::RGWReshard(RGWRados* _store, bool _verbose, ostream *_out,
f64942e4
AA
782 Formatter *_formatter) :
783 store(_store), instance_lock(bucket_instance_lock_name),
784 verbose(_verbose), out(_out), formatter(_formatter)
31f18b77 785{
11fdf7f2 786 num_logshards = store->ctx()->_conf.get_val<uint64_t>("rgw_reshard_num_logs");
31f18b77
FG
787}
788
f64942e4
AA
789string RGWReshard::get_logshard_key(const string& tenant,
790 const string& bucket_name)
31f18b77
FG
791{
792 return tenant + ":" + bucket_name;
793}
794
795#define MAX_RESHARD_LOGSHARDS_PRIME 7877
796
797void RGWReshard::get_bucket_logshard_oid(const string& tenant, const string& bucket_name, string *oid)
798{
799 string key = get_logshard_key(tenant, bucket_name);
800
801 uint32_t sid = ceph_str_hash_linux(key.c_str(), key.size());
802 uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
803 sid = sid2 % MAX_RESHARD_LOGSHARDS_PRIME % num_logshards;
31f18b77 804
1adf2230 805 get_logshard_oid(int(sid), oid);
31f18b77
FG
806}
807
808int RGWReshard::add(cls_rgw_reshard_entry& entry)
809{
11fdf7f2 810 if (!store->svc.zone->can_reshard()) {
3efd9988
FG
811 ldout(store->ctx(), 20) << __func__ << " Resharding is disabled" << dendl;
812 return 0;
813 }
814
31f18b77
FG
815 string logshard_oid;
816
817 get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
818
819 librados::ObjectWriteOperation op;
820 cls_rgw_reshard_add(op, entry);
821
822 int ret = store->reshard_pool_ctx.operate(logshard_oid, &op);
823 if (ret < 0) {
824 lderr(store->ctx()) << "ERROR: failed to add entry to reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
825 return ret;
826 }
827 return 0;
828}
829
830int RGWReshard::update(const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info)
831{
832 cls_rgw_reshard_entry entry;
833 entry.bucket_name = bucket_info.bucket.name;
834 entry.bucket_id = bucket_info.bucket.bucket_id;
b32b8144 835 entry.tenant = bucket_info.owner.tenant;
31f18b77
FG
836
837 int ret = get(entry);
838 if (ret < 0) {
839 return ret;
840 }
841
842 entry.new_instance_id = new_bucket_info.bucket.name + ":" + new_bucket_info.bucket.bucket_id;
843
844 ret = add(entry);
845 if (ret < 0) {
846 ldout(store->ctx(), 0) << __func__ << ":Error in updating entry bucket " << entry.bucket_name << ": " <<
847 cpp_strerror(-ret) << dendl;
848 }
849
850 return ret;
851}
852
853
854int RGWReshard::list(int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated)
855{
856 string logshard_oid;
857
858 get_logshard_oid(logshard_num, &logshard_oid);
859
860 int ret = cls_rgw_reshard_list(store->reshard_pool_ctx, logshard_oid, marker, max, entries, is_truncated);
c07f9fc5
FG
861
862 if (ret < 0) {
863 if (ret == -ENOENT) {
864 *is_truncated = false;
865 ret = 0;
866 }
31f18b77 867 lderr(store->ctx()) << "ERROR: failed to list reshard log entries, oid=" << logshard_oid << dendl;
c07f9fc5 868 if (ret == -EACCES) {
11fdf7f2 869 lderr(store->ctx()) << "access denied to pool " << store->svc.zone->get_zone_params().reshard_pool
c07f9fc5
FG
870 << ". Fix the pool access permissions of your client" << dendl;
871 }
31f18b77 872 }
c07f9fc5
FG
873
874 return ret;
31f18b77
FG
875}
876
877int RGWReshard::get(cls_rgw_reshard_entry& entry)
878{
879 string logshard_oid;
880
881 get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
882
883 int ret = cls_rgw_reshard_get(store->reshard_pool_ctx, logshard_oid, entry);
884 if (ret < 0) {
94b18763
FG
885 if (ret != -ENOENT) {
886 lderr(store->ctx()) << "ERROR: failed to get entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant <<
887 " bucket=" << entry.bucket_name << dendl;
888 }
31f18b77
FG
889 return ret;
890 }
891
892 return 0;
893}
894
895int RGWReshard::remove(cls_rgw_reshard_entry& entry)
896{
897 string logshard_oid;
898
899 get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
900
901 librados::ObjectWriteOperation op;
902 cls_rgw_reshard_remove(op, entry);
903
904 int ret = store->reshard_pool_ctx.operate(logshard_oid, &op);
905 if (ret < 0) {
906 lderr(store->ctx()) << "ERROR: failed to remove entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
907 return ret;
908 }
909
910 return ret;
911}
912
913int RGWReshard::clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry)
914{
915 int ret = cls_rgw_clear_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid);
916 if (ret < 0) {
917 lderr(store->ctx()) << "ERROR: failed to clear bucket resharding, bucket_instance_oid=" << bucket_instance_oid << dendl;
918 return ret;
919 }
920
921 return 0;
922}
923
11fdf7f2 924int RGWReshardWait::wait(optional_yield y)
31f18b77 925{
11fdf7f2
TL
926 std::unique_lock lock(mutex);
927
928 if (going_down) {
929 return -ECANCELED;
930 }
931
932#ifdef HAVE_BOOST_CONTEXT
933 if (y) {
934 auto& context = y.get_io_context();
935 auto& yield = y.get_yield_context();
936
937 Waiter waiter(context);
938 waiters.push_back(waiter);
939 lock.unlock();
940
941 waiter.timer.expires_after(duration);
942
943 boost::system::error_code ec;
944 waiter.timer.async_wait(yield[ec]);
945
946 lock.lock();
947 waiters.erase(waiters.iterator_to(waiter));
948 return -ec.value();
949 }
950#endif
31f18b77 951
11fdf7f2 952 cond.wait_for(lock, duration);
31f18b77
FG
953
954 if (going_down) {
955 return -ECANCELED;
956 }
957
958 return 0;
959}
960
11fdf7f2 961void RGWReshardWait::stop()
31f18b77 962{
11fdf7f2
TL
963 std::scoped_lock lock(mutex);
964 going_down = true;
965 cond.notify_all();
966 for (auto& waiter : waiters) {
967 // unblock any waiters with ECANCELED
968 waiter.timer.cancel();
31f18b77 969 }
31f18b77
FG
970}
971
972int RGWReshard::process_single_logshard(int logshard_num)
973{
974 string marker;
975 bool truncated = true;
976
977 CephContext *cct = store->ctx();
f64942e4 978 constexpr uint32_t max_entries = 1000;
31f18b77
FG
979
980 string logshard_oid;
981 get_logshard_oid(logshard_num, &logshard_oid);
982
f64942e4
AA
983 RGWBucketReshardLock logshard_lock(store, logshard_oid, false);
984
985 int ret = logshard_lock.lock();
92f5a8d4 986 if (ret < 0) {
f64942e4 987 ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " <<
92f5a8d4 988 logshard_oid << ", ret = " << ret <<dendl;
31f18b77
FG
989 return ret;
990 }
92f5a8d4 991
31f18b77
FG
992 do {
993 std::list<cls_rgw_reshard_entry> entries;
994 ret = list(logshard_num, marker, max_entries, entries, &truncated);
995 if (ret < 0) {
f64942e4
AA
996 ldout(cct, 10) << "cannot list all reshards in logshard oid=" <<
997 logshard_oid << dendl;
31f18b77
FG
998 continue;
999 }
1000
f64942e4 1001 for(auto& entry: entries) { // logshard entries
31f18b77
FG
1002 if(entry.new_instance_id.empty()) {
1003
f64942e4
AA
1004 ldout(store->ctx(), 20) << __func__ << " resharding " <<
1005 entry.bucket_name << dendl;
31f18b77 1006
11fdf7f2 1007 auto obj_ctx = store->svc.sysobj->init_obj_ctx();
31f18b77
FG
1008 rgw_bucket bucket;
1009 RGWBucketInfo bucket_info;
1010 map<string, bufferlist> attrs;
1011
f64942e4
AA
1012 ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name,
1013 bucket_info, nullptr, &attrs);
31f18b77 1014 if (ret < 0) {
92f5a8d4
TL
1015 ldout(cct, 0) << __func__ <<
1016 ": Error in get_bucket_info for bucket " << entry.bucket_name <<
1017 ": " << cpp_strerror(-ret) << dendl;
1018 if (ret != -ENOENT) {
1019 // any error other than ENOENT will abort
1020 return ret;
1021 }
1022
1023 // we've encountered a reshard queue entry for an apparently
1024 // non-existent bucket; let's try to recover by cleaning up
1025 ldout(cct, 0) << __func__ <<
1026 ": removing reshard queue entry for non-existent bucket " <<
1027 entry.bucket_name << dendl;
1028
1029 ret = remove(entry);
1030 if (ret < 0) {
1031 ldout(cct, 0) << __func__ <<
1032 ": Error removing non-existent bucket " <<
1033 entry.bucket_name << " from resharding queue: " <<
1034 cpp_strerror(-ret) << dendl;
1035 return ret;
1036 }
1037
1038 // we cleaned up, move on to the next entry
1039 goto finished_entry;
31f18b77
FG
1040 }
1041
f64942e4 1042 RGWBucketReshard br(store, bucket_info, attrs, nullptr);
92f5a8d4
TL
1043 ret = br.execute(entry.new_num_shards, max_entries, false, nullptr,
1044 nullptr, this);
31f18b77 1045 if (ret < 0) {
92f5a8d4
TL
1046 ldout(store->ctx(), 0) << __func__ <<
1047 ": Error during resharding bucket " << entry.bucket_name << ":" <<
31f18b77
FG
1048 cpp_strerror(-ret)<< dendl;
1049 return ret;
1050 }
1051
92f5a8d4
TL
1052 ldout(store->ctx(), 20) << __func__ <<
1053 " removing reshard queue entry for bucket " << entry.bucket_name <<
f64942e4 1054 dendl;
31f18b77
FG
1055
1056 ret = remove(entry);
1057 if (ret < 0) {
92f5a8d4
TL
1058 ldout(cct, 0) << __func__ << ": Error removing bucket " <<
1059 entry.bucket_name << " from resharding queue: " <<
f64942e4 1060 cpp_strerror(-ret) << dendl;
31f18b77
FG
1061 return ret;
1062 }
92f5a8d4
TL
1063 } // if new instance id is empty
1064
1065 finished_entry:
f64942e4
AA
1066
1067 Clock::time_point now = Clock::now();
1068 if (logshard_lock.should_renew(now)) {
1069 ret = logshard_lock.renew(now);
1070 if (ret < 0) {
1071 return ret;
1072 }
31f18b77 1073 }
f64942e4 1074
31f18b77 1075 entry.get_key(&marker);
92f5a8d4 1076 } // entry for loop
31f18b77
FG
1077 } while (truncated);
1078
f64942e4 1079 logshard_lock.unlock();
31f18b77
FG
1080 return 0;
1081}
1082
1083
1084void RGWReshard::get_logshard_oid(int shard_num, string *logshard)
1085{
1086 char buf[32];
1087 snprintf(buf, sizeof(buf), "%010u", (unsigned)shard_num);
1088
1089 string objname(reshard_oid_prefix);
1090 *logshard = objname + buf;
1091}
1092
1093int RGWReshard::process_all_logshards()
1094{
11fdf7f2 1095 if (!store->svc.zone->can_reshard()) {
3efd9988
FG
1096 ldout(store->ctx(), 20) << __func__ << " Resharding is disabled" << dendl;
1097 return 0;
1098 }
31f18b77
FG
1099 int ret = 0;
1100
1101 for (int i = 0; i < num_logshards; i++) {
1102 string logshard;
1103 get_logshard_oid(i, &logshard);
1104
81eedcae 1105 ldout(store->ctx(), 20) << "processing logshard = " << logshard << dendl;
31f18b77
FG
1106
1107 ret = process_single_logshard(i);
1108 if (ret <0) {
1109 return ret;
1110 }
1111 }
1112
1113 return 0;
1114}
1115
1116bool RGWReshard::going_down()
1117{
1118 return down_flag;
1119}
1120
1121void RGWReshard::start_processor()
1122{
1123 worker = new ReshardWorker(store->ctx(), this);
1124 worker->create("rgw_reshard");
1125}
1126
1127void RGWReshard::stop_processor()
1128{
1129 down_flag = true;
1130 if (worker) {
1131 worker->stop();
1132 worker->join();
1133 }
1134 delete worker;
224ce89b 1135 worker = nullptr;
31f18b77
FG
1136}
1137
1138void *RGWReshard::ReshardWorker::entry() {
1139 utime_t last_run;
1140 do {
1141 utime_t start = ceph_clock_now();
31f18b77
FG
1142 if (reshard->process_all_logshards()) {
1143 /* All shards have been processed properly. Next time we can start
1144 * from this moment. */
1145 last_run = start;
1146 }
31f18b77
FG
1147
1148 if (reshard->going_down())
1149 break;
1150
1151 utime_t end = ceph_clock_now();
1152 end -= start;
11fdf7f2 1153 int secs = cct->_conf.get_val<uint64_t>("rgw_reshard_thread_interval");
31f18b77
FG
1154
1155 if (secs <= end.sec())
1156 continue; // next round
1157
1158 secs -= end.sec();
1159
1160 lock.Lock();
1161 cond.WaitInterval(lock, utime_t(secs, 0));
1162 lock.Unlock();
1163 } while (!reshard->going_down());
1164
1165 return NULL;
1166}
1167
1168void RGWReshard::ReshardWorker::stop()
1169{
1170 Mutex::Locker l(lock);
1171 cond.Signal();
1172}