]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_reshard.cc
bump version to 12.2.10-pve1
[ceph.git] / ceph / src / rgw / rgw_reshard.cc
CommitLineData
31f18b77
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "rgw_rados.h"
5#include "rgw_bucket.h"
6#include "rgw_reshard.h"
7#include "cls/rgw/cls_rgw_client.h"
8#include "cls/lock/cls_lock_client.h"
9#include "common/errno.h"
10#include "common/ceph_json.h"
11
12#include "common/dout.h"
13
14#define dout_context g_ceph_context
15#define dout_subsys ceph_subsys_rgw
16
/* Prefix of the reshard log shard object names; the zero-padded shard number
 * is appended in RGWReshard::get_logshard_oid(). */
const std::string reshard_oid_prefix = "reshard.";
/* Name of the cls lock taken on a bucket's reshard object and on a log shard
 * while it is being processed. */
const std::string reshard_lock_name = "reshard_process";
/* Name passed to RGWReshard::instance_lock. */
const std::string bucket_instance_lock_name = "bucket_instance_lock";

using namespace std;

/* Number of index entries a target shard buffers before issuing a write. */
#define RESHARD_SHARD_WINDOW 64
/* Completion queue length at which a new write first waits for the oldest one. */
#define RESHARD_MAX_AIO 128
25
26class BucketReshardShard {
27 RGWRados *store;
28 const RGWBucketInfo& bucket_info;
29 int num_shard;
30 RGWRados::BucketShard bs;
31 vector<rgw_cls_bi_entry> entries;
32 map<uint8_t, rgw_bucket_category_stats> stats;
33 deque<librados::AioCompletion *>& aio_completions;
34
35 int wait_next_completion() {
36 librados::AioCompletion *c = aio_completions.front();
37 aio_completions.pop_front();
38
39 c->wait_for_safe();
40
41 int ret = c->get_return_value();
42 c->release();
43
44 if (ret < 0) {
45 derr << "ERROR: reshard rados operation failed: " << cpp_strerror(-ret) << dendl;
46 return ret;
47 }
48
49 return 0;
50 }
51
52 int get_completion(librados::AioCompletion **c) {
53 if (aio_completions.size() >= RESHARD_MAX_AIO) {
54 int ret = wait_next_completion();
55 if (ret < 0) {
56 return ret;
57 }
58 }
59
60 *c = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
61 aio_completions.push_back(*c);
62
63 return 0;
64 }
65
66public:
67 BucketReshardShard(RGWRados *_store, const RGWBucketInfo& _bucket_info,
68 int _num_shard,
69 deque<librados::AioCompletion *>& _completions) : store(_store), bucket_info(_bucket_info), bs(store),
70 aio_completions(_completions) {
71 num_shard = (bucket_info.num_shards > 0 ? _num_shard : -1);
72 bs.init(bucket_info.bucket, num_shard);
73 }
74
75 int get_num_shard() {
76 return num_shard;
77 }
78
79 int add_entry(rgw_cls_bi_entry& entry, bool account, uint8_t category,
80 const rgw_bucket_category_stats& entry_stats) {
81 entries.push_back(entry);
82 if (account) {
83 rgw_bucket_category_stats& target = stats[category];
84 target.num_entries += entry_stats.num_entries;
85 target.total_size += entry_stats.total_size;
86 target.total_size_rounded += entry_stats.total_size_rounded;
91327a77 87 target.actual_size += entry_stats.actual_size;
31f18b77
FG
88 }
89 if (entries.size() >= RESHARD_SHARD_WINDOW) {
90 int ret = flush();
91 if (ret < 0) {
92 return ret;
93 }
94 }
95 return 0;
96 }
97 int flush() {
98 if (entries.size() == 0) {
99 return 0;
100 }
101
102 librados::ObjectWriteOperation op;
103 for (auto& entry : entries) {
104 store->bi_put(op, bs, entry);
105 }
106 cls_rgw_bucket_update_stats(op, false, stats);
107
108 librados::AioCompletion *c;
109 int ret = get_completion(&c);
110 if (ret < 0) {
111 return ret;
112 }
113 ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &op);
114 if (ret < 0) {
115 derr << "ERROR: failed to store entries in target bucket shard (bs=" << bs.bucket << "/" << bs.shard_id << ") error=" << cpp_strerror(-ret) << dendl;
116 return ret;
117 }
118 entries.clear();
119 stats.clear();
120 return 0;
121 }
122
123 int wait_all_aio() {
124 int ret = 0;
125 while (!aio_completions.empty()) {
126 int r = wait_next_completion();
127 if (r < 0) {
128 ret = r;
129 }
130 }
131 return ret;
132 }
133};
134
135class BucketReshardManager {
136 RGWRados *store;
137 const RGWBucketInfo& target_bucket_info;
138 deque<librados::AioCompletion *> completions;
139 int num_target_shards;
140 vector<BucketReshardShard *> target_shards;
141
142public:
143 BucketReshardManager(RGWRados *_store, const RGWBucketInfo& _target_bucket_info, int _num_target_shards) : store(_store), target_bucket_info(_target_bucket_info),
144 num_target_shards(_num_target_shards) {
145 target_shards.resize(num_target_shards);
146 for (int i = 0; i < num_target_shards; ++i) {
147 target_shards[i] = new BucketReshardShard(store, target_bucket_info, i, completions);
148 }
149 }
150
151 ~BucketReshardManager() {
152 for (auto& shard : target_shards) {
153 int ret = shard->wait_all_aio();
154 if (ret < 0) {
155 ldout(store->ctx(), 20) << __func__ << ": shard->wait_all_aio() returned ret=" << ret << dendl;
156 }
157 }
158 }
159
160 int add_entry(int shard_index,
161 rgw_cls_bi_entry& entry, bool account, uint8_t category,
162 const rgw_bucket_category_stats& entry_stats) {
163 int ret = target_shards[shard_index]->add_entry(entry, account, category, entry_stats);
164 if (ret < 0) {
165 derr << "ERROR: target_shards.add_entry(" << entry.idx << ") returned error: " << cpp_strerror(-ret) << dendl;
166 return ret;
167 }
168 return 0;
169 }
170
171 int finish() {
172 int ret = 0;
173 for (auto& shard : target_shards) {
174 int r = shard->flush();
175 if (r < 0) {
176 derr << "ERROR: target_shards[" << shard->get_num_shard() << "].flush() returned error: " << cpp_strerror(-r) << dendl;
177 ret = r;
178 }
179 }
180 for (auto& shard : target_shards) {
181 int r = shard->wait_all_aio();
182 if (r < 0) {
183 derr << "ERROR: target_shards[" << shard->get_num_shard() << "].wait_all_aio() returned error: " << cpp_strerror(-r) << dendl;
184 ret = r;
185 }
186 delete shard;
187 }
188 target_shards.clear();
189 return ret;
190 }
191};
192
193RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info, const map<string, bufferlist>& _bucket_attrs) :
194 store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs),
195 reshard_lock(reshard_lock_name) {
196 const rgw_bucket& b = bucket_info.bucket;
197 reshard_oid = b.tenant + (b.tenant.empty() ? "" : ":") + b.name + ":" + b.bucket_id;
198
199 utime_t lock_duration(store->ctx()->_conf->rgw_reshard_bucket_lock_duration, 0);
200#define COOKIE_LEN 16
201 char cookie_buf[COOKIE_LEN + 1];
202 gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
203 cookie_buf[COOKIE_LEN] = '\0';
204
205 reshard_lock.set_cookie(cookie_buf);
206 reshard_lock.set_duration(lock_duration);
207}
208
209int RGWBucketReshard::lock_bucket()
210{
211 int ret = reshard_lock.lock_exclusive(&store->reshard_pool_ctx, reshard_oid);
212 if (ret < 0) {
213 ldout(store->ctx(), 0) << "RGWReshard::add failed to acquire lock on " << reshard_oid << " ret=" << ret << dendl;
214 return ret;
215 }
216 return 0;
217}
218
219void RGWBucketReshard::unlock_bucket()
220{
221 int ret = reshard_lock.unlock(&store->reshard_pool_ctx, reshard_oid);
222 if (ret < 0) {
223 ldout(store->ctx(), 0) << "WARNING: RGWReshard::add failed to drop lock on " << reshard_oid << " ret=" << ret << dendl;
224 }
225}
226
227int RGWBucketReshard::set_resharding_status(const string& new_instance_id, int32_t num_shards, cls_rgw_reshard_status status)
228{
229 if (new_instance_id.empty()) {
230 ldout(store->ctx(), 0) << __func__ << " missing new bucket instance id" << dendl;
231 return -EINVAL;
232 }
233
234 cls_rgw_bucket_instance_entry instance_entry;
235 instance_entry.set_status(new_instance_id, num_shards, status);
236
237 int ret = store->bucket_set_reshard(bucket_info, instance_entry);
238 if (ret < 0) {
239 ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: error setting bucket resharding flag on bucket index: "
240 << cpp_strerror(-ret) << dendl;
241 return ret;
242 }
243 return 0;
244}
245
246int RGWBucketReshard::clear_resharding()
247{
248 cls_rgw_bucket_instance_entry instance_entry;
249
250 int ret = store->bucket_set_reshard(bucket_info, instance_entry);
251 if (ret < 0) {
252 ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: error setting bucket resharding flag on bucket index: "
253 << cpp_strerror(-ret) << dendl;
254 return ret;
255 }
256 return 0;
257}
258
259static int create_new_bucket_instance(RGWRados *store,
260 int new_num_shards,
261 const RGWBucketInfo& bucket_info,
262 map<string, bufferlist>& attrs,
263 RGWBucketInfo& new_bucket_info)
264{
265 new_bucket_info = bucket_info;
266
267 store->create_bucket_id(&new_bucket_info.bucket.bucket_id);
268 new_bucket_info.bucket.oid.clear();
269
270 new_bucket_info.num_shards = new_num_shards;
271 new_bucket_info.objv_tracker.clear();
272
273 new_bucket_info.new_bucket_instance_id.clear();
274 new_bucket_info.reshard_status = 0;
275
276 int ret = store->init_bucket_index(new_bucket_info, new_bucket_info.num_shards);
277 if (ret < 0) {
278 cerr << "ERROR: failed to init new bucket indexes: " << cpp_strerror(-ret) << std::endl;
279 return -ret;
280 }
281
282 ret = store->put_bucket_instance_info(new_bucket_info, true, real_time(), &attrs);
283 if (ret < 0) {
284 cerr << "ERROR: failed to store new bucket instance info: " << cpp_strerror(-ret) << std::endl;
285 return -ret;
286 }
287
288 return 0;
289}
290
291int RGWBucketReshard::create_new_bucket_instance(int new_num_shards,
292 RGWBucketInfo& new_bucket_info)
293{
294 return ::create_new_bucket_instance(store, new_num_shards, bucket_info, bucket_attrs, new_bucket_info);
295}
296
94b18763
FG
297int RGWBucketReshard::cancel()
298{
299 int ret = lock_bucket();
300 if (ret < 0) {
301 return ret;
302 }
303
304 ret = clear_resharding();
305
306 unlock_bucket();
307 return 0;
308}
309
31f18b77
FG
310class BucketInfoReshardUpdate
311{
312 RGWRados *store;
313 RGWBucketInfo bucket_info;
314 std::map<string, bufferlist> bucket_attrs;
315
316 bool in_progress{false};
317
318 int set_status(cls_rgw_reshard_status s) {
319 bucket_info.reshard_status = s;
320 int ret = store->put_bucket_instance_info(bucket_info, false, real_time(), &bucket_attrs);
321 if (ret < 0) {
322 ldout(store->ctx(), 0) << "ERROR: failed to write bucket info, ret=" << ret << dendl;
323 return ret;
324 }
325 return 0;
326 }
327
328public:
329 BucketInfoReshardUpdate(RGWRados *_store, RGWBucketInfo& _bucket_info,
330 map<string, bufferlist>& _bucket_attrs, const string& new_bucket_id) : store(_store),
331 bucket_info(_bucket_info),
332 bucket_attrs(_bucket_attrs) {
333 bucket_info.new_bucket_instance_id = new_bucket_id;
334 }
335 ~BucketInfoReshardUpdate() {
336 if (in_progress) {
337 bucket_info.new_bucket_instance_id.clear();
338 set_status(CLS_RGW_RESHARD_NONE);
339 }
340 }
341
342 int start() {
343 int ret = set_status(CLS_RGW_RESHARD_IN_PROGRESS);
344 if (ret < 0) {
345 return ret;
346 }
347 in_progress = true;
348 return 0;
349 }
350
351 int complete() {
352 int ret = set_status(CLS_RGW_RESHARD_DONE);
353 if (ret < 0) {
354 return ret;
355 }
356 in_progress = false;
357 return 0;
358 }
359};
360
361int RGWBucketReshard::do_reshard(
362 int num_shards,
b32b8144 363 RGWBucketInfo& new_bucket_info,
31f18b77
FG
364 int max_entries,
365 bool verbose,
366 ostream *out,
367 Formatter *formatter)
368{
369 rgw_bucket& bucket = bucket_info.bucket;
370
371 int ret = 0;
372
373 if (out) {
374 (*out) << "*** NOTICE: operation will not remove old bucket index objects ***" << std::endl;
375 (*out) << "*** these will need to be removed manually ***" << std::endl;
376 (*out) << "tenant: " << bucket_info.bucket.tenant << std::endl;
377 (*out) << "bucket name: " << bucket_info.bucket.name << std::endl;
378 (*out) << "old bucket instance id: " << bucket_info.bucket.bucket_id << std::endl;
379 (*out) << "new bucket instance id: " << new_bucket_info.bucket.bucket_id << std::endl;
380 }
381
382 /* update bucket info -- in progress*/
383 list<rgw_cls_bi_entry> entries;
384
385 if (max_entries < 0) {
386 ldout(store->ctx(), 0) << __func__ << ": can't reshard, negative max_entries" << dendl;
387 return -EINVAL;
388 }
389
390 BucketInfoReshardUpdate bucket_info_updater(store, bucket_info, bucket_attrs, new_bucket_info.bucket.bucket_id);
391
392 ret = bucket_info_updater.start();
393 if (ret < 0) {
394 ldout(store->ctx(), 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl;
395 return ret;
396 }
397
398 int num_target_shards = (new_bucket_info.num_shards > 0 ? new_bucket_info.num_shards : 1);
399
400 BucketReshardManager target_shards_mgr(store, new_bucket_info, num_target_shards);
401
402 verbose = verbose && (formatter != nullptr);
403
404 if (verbose) {
405 formatter->open_array_section("entries");
406 }
407
408 uint64_t total_entries = 0;
409
410 if (!verbose) {
411 cout << "total entries:";
412 }
413
414 int num_source_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
415 string marker;
416 for (int i = 0; i < num_source_shards; ++i) {
417 bool is_truncated = true;
418 marker.clear();
419 while (is_truncated) {
420 entries.clear();
421 ret = store->bi_list(bucket, i, string(), marker, max_entries, &entries, &is_truncated);
422 if (ret < 0 && ret != -ENOENT) {
423 derr << "ERROR: bi_list(): " << cpp_strerror(-ret) << dendl;
424 return -ret;
425 }
426
427 list<rgw_cls_bi_entry>::iterator iter;
428 for (iter = entries.begin(); iter != entries.end(); ++iter) {
429 rgw_cls_bi_entry& entry = *iter;
430 if (verbose) {
431 formatter->open_object_section("entry");
432
433 encode_json("shard_id", i, formatter);
434 encode_json("num_entry", total_entries, formatter);
435 encode_json("entry", entry, formatter);
436 }
437 total_entries++;
438
439 marker = entry.idx;
440
441 int target_shard_id;
442 cls_rgw_obj_key cls_key;
443 uint8_t category;
444 rgw_bucket_category_stats stats;
445 bool account = entry.get_info(&cls_key, &category, &stats);
446 rgw_obj_key key(cls_key);
447 rgw_obj obj(new_bucket_info.bucket, key);
448 int ret = store->get_target_shard_id(new_bucket_info, obj.get_hash_object(), &target_shard_id);
449 if (ret < 0) {
450 lderr(store->ctx()) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl;
451 return ret;
452 }
453
454 int shard_index = (target_shard_id > 0 ? target_shard_id : 0);
455
456 ret = target_shards_mgr.add_entry(shard_index, entry, account, category, stats);
457 if (ret < 0) {
458 return ret;
459 }
460 if (verbose) {
461 formatter->close_section();
462 if (out) {
463 formatter->flush(*out);
464 formatter->flush(*out);
465 }
466 } else if (out && !(total_entries % 1000)) {
467 (*out) << " " << total_entries;
468 }
469 }
470 }
471 }
472 if (verbose) {
473 formatter->close_section();
474 if (out) {
475 formatter->flush(*out);
476 }
477 } else if (out) {
478 (*out) << " " << total_entries << std::endl;
479 }
480
481 ret = target_shards_mgr.finish();
482 if (ret < 0) {
483 lderr(store->ctx()) << "ERROR: failed to reshard" << dendl;
484 return EIO;
485 }
486
b32b8144
FG
487 ret = rgw_link_bucket(store, new_bucket_info.owner, new_bucket_info.bucket, bucket_info.creation_time);
488 if (ret < 0) {
489 lderr(store->ctx()) << "failed to link new bucket instance (bucket_id=" << new_bucket_info.bucket.bucket_id << ": " << cpp_strerror(-ret) << ")" << dendl;
490 return -ret;
31f18b77
FG
491 }
492
493 ret = bucket_info_updater.complete();
494 if (ret < 0) {
495 ldout(store->ctx(), 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl;
496 /* don't error out, reshard process succeeded */
497 }
498 return 0;
499}
500
501int RGWBucketReshard::get_status(list<cls_rgw_bucket_instance_entry> *status)
502{
503 librados::IoCtx index_ctx;
504 map<int, string> bucket_objs;
505
506 int r = store->open_bucket_index(bucket_info, index_ctx, bucket_objs);
507 if (r < 0) {
508 return r;
509 }
510
511 for (auto i : bucket_objs) {
512 cls_rgw_bucket_instance_entry entry;
513
514 int ret = cls_rgw_get_bucket_resharding(index_ctx, i.second, &entry);
515 if (ret < 0 && ret != -ENOENT) {
516 lderr(store->ctx()) << "ERROR: " << __func__ << ": cls_rgw_get_bucket_resharding() returned ret=" << ret << dendl;
517 return ret;
518 }
519
520 status->push_back(entry);
521 }
522
523 return 0;
524}
525
526int RGWBucketReshard::execute(int num_shards, int max_op_entries,
527 bool verbose, ostream *out, Formatter *formatter, RGWReshard* reshard_log)
528
529{
530 int ret = lock_bucket();
531 if (ret < 0) {
532 return ret;
533 }
534
535 RGWBucketInfo new_bucket_info;
536 ret = create_new_bucket_instance(num_shards, new_bucket_info);
537 if (ret < 0) {
538 unlock_bucket();
539 return ret;
540 }
541
542 if (reshard_log) {
543 ret = reshard_log->update(bucket_info, new_bucket_info);
544 if (ret < 0) {
c07f9fc5 545 unlock_bucket();
31f18b77
FG
546 return ret;
547 }
548 }
549
550 ret = set_resharding_status(new_bucket_info.bucket.bucket_id, num_shards, CLS_RGW_RESHARD_IN_PROGRESS);
551 if (ret < 0) {
552 unlock_bucket();
553 return ret;
554 }
555
556 ret = do_reshard(num_shards,
557 new_bucket_info,
558 max_op_entries,
559 verbose, out, formatter);
560
561 if (ret < 0) {
562 unlock_bucket();
563 return ret;
564 }
565
566 ret = set_resharding_status(new_bucket_info.bucket.bucket_id, num_shards, CLS_RGW_RESHARD_DONE);
567 if (ret < 0) {
568 unlock_bucket();
569 return ret;
570 }
571
572 unlock_bucket();
573
574 return 0;
575}
576
577
578RGWReshard::RGWReshard(RGWRados* _store, bool _verbose, ostream *_out,
579 Formatter *_formatter) : store(_store), instance_lock(bucket_instance_lock_name),
580 verbose(_verbose), out(_out), formatter(_formatter)
581{
582 num_logshards = store->ctx()->_conf->rgw_reshard_num_logs;
583}
584
585string RGWReshard::get_logshard_key(const string& tenant, const string& bucket_name)
586{
587 return tenant + ":" + bucket_name;
588}
589
590#define MAX_RESHARD_LOGSHARDS_PRIME 7877
591
592void RGWReshard::get_bucket_logshard_oid(const string& tenant, const string& bucket_name, string *oid)
593{
594 string key = get_logshard_key(tenant, bucket_name);
595
596 uint32_t sid = ceph_str_hash_linux(key.c_str(), key.size());
597 uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
598 sid = sid2 % MAX_RESHARD_LOGSHARDS_PRIME % num_logshards;
31f18b77 599
1adf2230 600 get_logshard_oid(int(sid), oid);
31f18b77
FG
601}
602
603int RGWReshard::add(cls_rgw_reshard_entry& entry)
604{
3efd9988
FG
605 if (!store->can_reshard()) {
606 ldout(store->ctx(), 20) << __func__ << " Resharding is disabled" << dendl;
607 return 0;
608 }
609
31f18b77
FG
610 string logshard_oid;
611
612 get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
613
614 librados::ObjectWriteOperation op;
615 cls_rgw_reshard_add(op, entry);
616
617 int ret = store->reshard_pool_ctx.operate(logshard_oid, &op);
618 if (ret < 0) {
619 lderr(store->ctx()) << "ERROR: failed to add entry to reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
620 return ret;
621 }
622 return 0;
623}
624
625int RGWReshard::update(const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info)
626{
627 cls_rgw_reshard_entry entry;
628 entry.bucket_name = bucket_info.bucket.name;
629 entry.bucket_id = bucket_info.bucket.bucket_id;
b32b8144 630 entry.tenant = bucket_info.owner.tenant;
31f18b77
FG
631
632 int ret = get(entry);
633 if (ret < 0) {
634 return ret;
635 }
636
637 entry.new_instance_id = new_bucket_info.bucket.name + ":" + new_bucket_info.bucket.bucket_id;
638
639 ret = add(entry);
640 if (ret < 0) {
641 ldout(store->ctx(), 0) << __func__ << ":Error in updating entry bucket " << entry.bucket_name << ": " <<
642 cpp_strerror(-ret) << dendl;
643 }
644
645 return ret;
646}
647
648
649int RGWReshard::list(int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated)
650{
651 string logshard_oid;
652
653 get_logshard_oid(logshard_num, &logshard_oid);
654
655 int ret = cls_rgw_reshard_list(store->reshard_pool_ctx, logshard_oid, marker, max, entries, is_truncated);
c07f9fc5
FG
656
657 if (ret < 0) {
658 if (ret == -ENOENT) {
659 *is_truncated = false;
660 ret = 0;
661 }
31f18b77 662 lderr(store->ctx()) << "ERROR: failed to list reshard log entries, oid=" << logshard_oid << dendl;
c07f9fc5
FG
663 if (ret == -EACCES) {
664 lderr(store->ctx()) << "access denied to pool " << store->get_zone_params().reshard_pool
665 << ". Fix the pool access permissions of your client" << dendl;
666 }
31f18b77 667 }
c07f9fc5
FG
668
669 return ret;
31f18b77
FG
670}
671
672int RGWReshard::get(cls_rgw_reshard_entry& entry)
673{
674 string logshard_oid;
675
676 get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
677
678 int ret = cls_rgw_reshard_get(store->reshard_pool_ctx, logshard_oid, entry);
679 if (ret < 0) {
94b18763
FG
680 if (ret != -ENOENT) {
681 lderr(store->ctx()) << "ERROR: failed to get entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant <<
682 " bucket=" << entry.bucket_name << dendl;
683 }
31f18b77
FG
684 return ret;
685 }
686
687 return 0;
688}
689
690int RGWReshard::remove(cls_rgw_reshard_entry& entry)
691{
692 string logshard_oid;
693
694 get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
695
696 librados::ObjectWriteOperation op;
697 cls_rgw_reshard_remove(op, entry);
698
699 int ret = store->reshard_pool_ctx.operate(logshard_oid, &op);
700 if (ret < 0) {
701 lderr(store->ctx()) << "ERROR: failed to remove entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
702 return ret;
703 }
704
705 return ret;
706}
707
708int RGWReshard::clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry)
709{
710 int ret = cls_rgw_clear_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid);
711 if (ret < 0) {
712 lderr(store->ctx()) << "ERROR: failed to clear bucket resharding, bucket_instance_oid=" << bucket_instance_oid << dendl;
713 return ret;
714 }
715
716 return 0;
717}
718
719const int num_retries = 10;
720const int default_reshard_sleep_duration = 5;
721
722int RGWReshardWait::do_wait()
723{
724 Mutex::Locker l(lock);
725
726 cond.WaitInterval(lock, utime_t(default_reshard_sleep_duration, 0));
727
728 if (going_down) {
729 return -ECANCELED;
730 }
731
732 return 0;
733}
734
735int RGWReshardWait::block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id)
736{
737 int ret = 0;
738 cls_rgw_bucket_instance_entry entry;
739
740 for (int i=0; i < num_retries;i++) {
741 ret = cls_rgw_get_bucket_resharding(bs->index_ctx, bs->bucket_obj, &entry);
742 if (ret < 0) {
743 ldout(store->ctx(), 0) << __func__ << " ERROR: failed to get bucket resharding :" <<
744 cpp_strerror(-ret)<< dendl;
745 return ret;
746 }
747 if (!entry.resharding_in_progress()) {
748 *new_bucket_id = entry.new_bucket_instance_id;
749 return 0;
750 }
751 ldout(store->ctx(), 20) << "NOTICE: reshard still in progress; " << (i < num_retries - 1 ? "retrying" : "too many retries") << dendl;
752 /* needed to unlock as clear resharding uses the same lock */
753
754 if (i == num_retries - 1) {
755 break;
756 }
757
758 ret = do_wait();
759 if (ret < 0) {
760 ldout(store->ctx(), 0) << __func__ << " ERROR: bucket is still resharding, please retry" << dendl;
761 return ret;
762 }
763 }
764 ldout(store->ctx(), 0) << __func__ << " ERROR: bucket is still resharding, please retry" << dendl;
765 return -ERR_BUSY_RESHARDING;
766}
767
768int RGWReshard::process_single_logshard(int logshard_num)
769{
770 string marker;
771 bool truncated = true;
772
773 CephContext *cct = store->ctx();
774 int max_entries = 1000;
775 int max_secs = 60;
776
777 rados::cls::lock::Lock l(reshard_lock_name);
778
779 utime_t time(max_secs, 0);
780 l.set_duration(time);
781
782 char cookie_buf[COOKIE_LEN + 1];
783 gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
784 cookie_buf[COOKIE_LEN] = '\0';
785
786 l.set_cookie(cookie_buf);
787
788 string logshard_oid;
789 get_logshard_oid(logshard_num, &logshard_oid);
790
791 int ret = l.lock_exclusive(&store->reshard_pool_ctx, logshard_oid);
792 if (ret == -EBUSY) { /* already locked by another processor */
793 ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << logshard_oid << dendl;
794 return ret;
795 }
796
797 utime_t lock_start_time = ceph_clock_now();
798
799 do {
800 std::list<cls_rgw_reshard_entry> entries;
801 ret = list(logshard_num, marker, max_entries, entries, &truncated);
802 if (ret < 0) {
803 ldout(cct, 10) << "cannot list all reshards in logshard oid=" << logshard_oid << dendl;
804 continue;
805 }
806
807 for(auto& entry: entries) {
808 if(entry.new_instance_id.empty()) {
809
810 ldout(store->ctx(), 20) << __func__ << " resharding " << entry.bucket_name << dendl;
811
812 RGWObjectCtx obj_ctx(store);
813 rgw_bucket bucket;
814 RGWBucketInfo bucket_info;
815 map<string, bufferlist> attrs;
816
817 ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name, bucket_info, nullptr,
818 &attrs);
819 if (ret < 0) {
820 ldout(cct, 0) << __func__ << ": Error in get_bucket_info: " << cpp_strerror(-ret) << dendl;
821 return -ret;
822 }
823
824 RGWBucketReshard br(store, bucket_info, attrs);
825
826 Formatter* formatter = new JSONFormatter(false);
827 auto formatter_ptr = std::unique_ptr<Formatter>(formatter);
828 ret = br.execute(entry.new_num_shards, max_entries, true,nullptr, formatter, this);
829 if (ret < 0) {
830 ldout (store->ctx(), 0) << __func__ << "ERROR in reshard_bucket " << entry.bucket_name << ":" <<
831 cpp_strerror(-ret)<< dendl;
832 return ret;
833 }
834
835 ldout (store->ctx(), 20) << " removing entry" << entry.bucket_name<< dendl;
836
837 ret = remove(entry);
838 if (ret < 0) {
839 ldout(cct, 0)<< __func__ << ":Error removing bucket " << entry.bucket_name << " for resharding queue: "
840 << cpp_strerror(-ret) << dendl;
841 return ret;
842 }
843 }
844 utime_t now = ceph_clock_now();
845
846 if (now > lock_start_time + max_secs / 2) { /* do you need to renew lock? */
847 l.set_renew(true);
848 ret = l.lock_exclusive(&store->reshard_pool_ctx, logshard_oid);
849 if (ret == -EBUSY) { /* already locked by another processor */
850 ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << logshard_oid << dendl;
851 return ret;
852 }
853 lock_start_time = now;
854 }
855 entry.get_key(&marker);
856 }
857 } while (truncated);
858
859 l.unlock(&store->reshard_pool_ctx, logshard_oid);
860 return 0;
861}
862
863
864void RGWReshard::get_logshard_oid(int shard_num, string *logshard)
865{
866 char buf[32];
867 snprintf(buf, sizeof(buf), "%010u", (unsigned)shard_num);
868
869 string objname(reshard_oid_prefix);
870 *logshard = objname + buf;
871}
872
873int RGWReshard::process_all_logshards()
874{
3efd9988
FG
875 if (!store->can_reshard()) {
876 ldout(store->ctx(), 20) << __func__ << " Resharding is disabled" << dendl;
877 return 0;
878 }
31f18b77
FG
879 int ret = 0;
880
881 for (int i = 0; i < num_logshards; i++) {
882 string logshard;
883 get_logshard_oid(i, &logshard);
884
885 ldout(store->ctx(), 20) << "proceeding logshard = " << logshard << dendl;
886
887 ret = process_single_logshard(i);
888 if (ret <0) {
889 return ret;
890 }
891 }
892
893 return 0;
894}
895
896bool RGWReshard::going_down()
897{
898 return down_flag;
899}
900
901void RGWReshard::start_processor()
902{
903 worker = new ReshardWorker(store->ctx(), this);
904 worker->create("rgw_reshard");
905}
906
907void RGWReshard::stop_processor()
908{
909 down_flag = true;
910 if (worker) {
911 worker->stop();
912 worker->join();
913 }
914 delete worker;
224ce89b 915 worker = nullptr;
31f18b77
FG
916}
917
918void *RGWReshard::ReshardWorker::entry() {
919 utime_t last_run;
920 do {
921 utime_t start = ceph_clock_now();
31f18b77
FG
922 if (reshard->process_all_logshards()) {
923 /* All shards have been processed properly. Next time we can start
924 * from this moment. */
925 last_run = start;
926 }
31f18b77
FG
927
928 if (reshard->going_down())
929 break;
930
931 utime_t end = ceph_clock_now();
932 end -= start;
933 int secs = cct->_conf->rgw_reshard_thread_interval;
934
935 if (secs <= end.sec())
936 continue; // next round
937
938 secs -= end.sec();
939
940 lock.Lock();
941 cond.WaitInterval(lock, utime_t(secs, 0));
942 lock.Unlock();
943 } while (!reshard->going_down());
944
945 return NULL;
946}
947
948void RGWReshard::ReshardWorker::stop()
949{
950 Mutex::Locker l(lock);
951 cond.Signal();
952}