]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_reshard.cc
update sources to 12.2.8
[ceph.git] / ceph / src / rgw / rgw_reshard.cc
CommitLineData
31f18b77
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "rgw_rados.h"
5#include "rgw_bucket.h"
6#include "rgw_reshard.h"
7#include "cls/rgw/cls_rgw_client.h"
8#include "cls/lock/cls_lock_client.h"
9#include "common/errno.h"
10#include "common/ceph_json.h"
11
12#include "common/dout.h"
13
14#define dout_context g_ceph_context
15#define dout_subsys ceph_subsys_rgw
16
// Name prefix of the RADOS objects holding the reshard log shards
// (RGWReshard::get_logshard_oid() appends a zero-padded shard number).
const std::string reshard_oid_prefix{"reshard."};
// cls_lock name used both on a bucket's reshard object (RGWBucketReshard)
// and on a reshard log shard while it is processed.
const std::string reshard_lock_name{"reshard_process"};
// Name of RGWReshard's instance_lock.
const std::string bucket_instance_lock_name{"bucket_instance_lock"};

using namespace std;

// Index entries buffered per target shard before a batched write is issued.
#define RESHARD_SHARD_WINDOW 64
// Maximum number of in-flight asynchronous index writes during a reshard.
#define RESHARD_MAX_AIO 128
25
26class BucketReshardShard {
27 RGWRados *store;
28 const RGWBucketInfo& bucket_info;
29 int num_shard;
30 RGWRados::BucketShard bs;
31 vector<rgw_cls_bi_entry> entries;
32 map<uint8_t, rgw_bucket_category_stats> stats;
33 deque<librados::AioCompletion *>& aio_completions;
34
35 int wait_next_completion() {
36 librados::AioCompletion *c = aio_completions.front();
37 aio_completions.pop_front();
38
39 c->wait_for_safe();
40
41 int ret = c->get_return_value();
42 c->release();
43
44 if (ret < 0) {
45 derr << "ERROR: reshard rados operation failed: " << cpp_strerror(-ret) << dendl;
46 return ret;
47 }
48
49 return 0;
50 }
51
52 int get_completion(librados::AioCompletion **c) {
53 if (aio_completions.size() >= RESHARD_MAX_AIO) {
54 int ret = wait_next_completion();
55 if (ret < 0) {
56 return ret;
57 }
58 }
59
60 *c = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
61 aio_completions.push_back(*c);
62
63 return 0;
64 }
65
66public:
67 BucketReshardShard(RGWRados *_store, const RGWBucketInfo& _bucket_info,
68 int _num_shard,
69 deque<librados::AioCompletion *>& _completions) : store(_store), bucket_info(_bucket_info), bs(store),
70 aio_completions(_completions) {
71 num_shard = (bucket_info.num_shards > 0 ? _num_shard : -1);
72 bs.init(bucket_info.bucket, num_shard);
73 }
74
75 int get_num_shard() {
76 return num_shard;
77 }
78
79 int add_entry(rgw_cls_bi_entry& entry, bool account, uint8_t category,
80 const rgw_bucket_category_stats& entry_stats) {
81 entries.push_back(entry);
82 if (account) {
83 rgw_bucket_category_stats& target = stats[category];
84 target.num_entries += entry_stats.num_entries;
85 target.total_size += entry_stats.total_size;
86 target.total_size_rounded += entry_stats.total_size_rounded;
87 }
88 if (entries.size() >= RESHARD_SHARD_WINDOW) {
89 int ret = flush();
90 if (ret < 0) {
91 return ret;
92 }
93 }
94 return 0;
95 }
96 int flush() {
97 if (entries.size() == 0) {
98 return 0;
99 }
100
101 librados::ObjectWriteOperation op;
102 for (auto& entry : entries) {
103 store->bi_put(op, bs, entry);
104 }
105 cls_rgw_bucket_update_stats(op, false, stats);
106
107 librados::AioCompletion *c;
108 int ret = get_completion(&c);
109 if (ret < 0) {
110 return ret;
111 }
112 ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &op);
113 if (ret < 0) {
114 derr << "ERROR: failed to store entries in target bucket shard (bs=" << bs.bucket << "/" << bs.shard_id << ") error=" << cpp_strerror(-ret) << dendl;
115 return ret;
116 }
117 entries.clear();
118 stats.clear();
119 return 0;
120 }
121
122 int wait_all_aio() {
123 int ret = 0;
124 while (!aio_completions.empty()) {
125 int r = wait_next_completion();
126 if (r < 0) {
127 ret = r;
128 }
129 }
130 return ret;
131 }
132};
133
134class BucketReshardManager {
135 RGWRados *store;
136 const RGWBucketInfo& target_bucket_info;
137 deque<librados::AioCompletion *> completions;
138 int num_target_shards;
139 vector<BucketReshardShard *> target_shards;
140
141public:
142 BucketReshardManager(RGWRados *_store, const RGWBucketInfo& _target_bucket_info, int _num_target_shards) : store(_store), target_bucket_info(_target_bucket_info),
143 num_target_shards(_num_target_shards) {
144 target_shards.resize(num_target_shards);
145 for (int i = 0; i < num_target_shards; ++i) {
146 target_shards[i] = new BucketReshardShard(store, target_bucket_info, i, completions);
147 }
148 }
149
150 ~BucketReshardManager() {
151 for (auto& shard : target_shards) {
152 int ret = shard->wait_all_aio();
153 if (ret < 0) {
154 ldout(store->ctx(), 20) << __func__ << ": shard->wait_all_aio() returned ret=" << ret << dendl;
155 }
156 }
157 }
158
159 int add_entry(int shard_index,
160 rgw_cls_bi_entry& entry, bool account, uint8_t category,
161 const rgw_bucket_category_stats& entry_stats) {
162 int ret = target_shards[shard_index]->add_entry(entry, account, category, entry_stats);
163 if (ret < 0) {
164 derr << "ERROR: target_shards.add_entry(" << entry.idx << ") returned error: " << cpp_strerror(-ret) << dendl;
165 return ret;
166 }
167 return 0;
168 }
169
170 int finish() {
171 int ret = 0;
172 for (auto& shard : target_shards) {
173 int r = shard->flush();
174 if (r < 0) {
175 derr << "ERROR: target_shards[" << shard->get_num_shard() << "].flush() returned error: " << cpp_strerror(-r) << dendl;
176 ret = r;
177 }
178 }
179 for (auto& shard : target_shards) {
180 int r = shard->wait_all_aio();
181 if (r < 0) {
182 derr << "ERROR: target_shards[" << shard->get_num_shard() << "].wait_all_aio() returned error: " << cpp_strerror(-r) << dendl;
183 ret = r;
184 }
185 delete shard;
186 }
187 target_shards.clear();
188 return ret;
189 }
190};
191
192RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info, const map<string, bufferlist>& _bucket_attrs) :
193 store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs),
194 reshard_lock(reshard_lock_name) {
195 const rgw_bucket& b = bucket_info.bucket;
196 reshard_oid = b.tenant + (b.tenant.empty() ? "" : ":") + b.name + ":" + b.bucket_id;
197
198 utime_t lock_duration(store->ctx()->_conf->rgw_reshard_bucket_lock_duration, 0);
199#define COOKIE_LEN 16
200 char cookie_buf[COOKIE_LEN + 1];
201 gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
202 cookie_buf[COOKIE_LEN] = '\0';
203
204 reshard_lock.set_cookie(cookie_buf);
205 reshard_lock.set_duration(lock_duration);
206}
207
208int RGWBucketReshard::lock_bucket()
209{
210 int ret = reshard_lock.lock_exclusive(&store->reshard_pool_ctx, reshard_oid);
211 if (ret < 0) {
212 ldout(store->ctx(), 0) << "RGWReshard::add failed to acquire lock on " << reshard_oid << " ret=" << ret << dendl;
213 return ret;
214 }
215 return 0;
216}
217
218void RGWBucketReshard::unlock_bucket()
219{
220 int ret = reshard_lock.unlock(&store->reshard_pool_ctx, reshard_oid);
221 if (ret < 0) {
222 ldout(store->ctx(), 0) << "WARNING: RGWReshard::add failed to drop lock on " << reshard_oid << " ret=" << ret << dendl;
223 }
224}
225
226int RGWBucketReshard::set_resharding_status(const string& new_instance_id, int32_t num_shards, cls_rgw_reshard_status status)
227{
228 if (new_instance_id.empty()) {
229 ldout(store->ctx(), 0) << __func__ << " missing new bucket instance id" << dendl;
230 return -EINVAL;
231 }
232
233 cls_rgw_bucket_instance_entry instance_entry;
234 instance_entry.set_status(new_instance_id, num_shards, status);
235
236 int ret = store->bucket_set_reshard(bucket_info, instance_entry);
237 if (ret < 0) {
238 ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: error setting bucket resharding flag on bucket index: "
239 << cpp_strerror(-ret) << dendl;
240 return ret;
241 }
242 return 0;
243}
244
245int RGWBucketReshard::clear_resharding()
246{
247 cls_rgw_bucket_instance_entry instance_entry;
248
249 int ret = store->bucket_set_reshard(bucket_info, instance_entry);
250 if (ret < 0) {
251 ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: error setting bucket resharding flag on bucket index: "
252 << cpp_strerror(-ret) << dendl;
253 return ret;
254 }
255 return 0;
256}
257
258static int create_new_bucket_instance(RGWRados *store,
259 int new_num_shards,
260 const RGWBucketInfo& bucket_info,
261 map<string, bufferlist>& attrs,
262 RGWBucketInfo& new_bucket_info)
263{
264 new_bucket_info = bucket_info;
265
266 store->create_bucket_id(&new_bucket_info.bucket.bucket_id);
267 new_bucket_info.bucket.oid.clear();
268
269 new_bucket_info.num_shards = new_num_shards;
270 new_bucket_info.objv_tracker.clear();
271
272 new_bucket_info.new_bucket_instance_id.clear();
273 new_bucket_info.reshard_status = 0;
274
275 int ret = store->init_bucket_index(new_bucket_info, new_bucket_info.num_shards);
276 if (ret < 0) {
277 cerr << "ERROR: failed to init new bucket indexes: " << cpp_strerror(-ret) << std::endl;
278 return -ret;
279 }
280
281 ret = store->put_bucket_instance_info(new_bucket_info, true, real_time(), &attrs);
282 if (ret < 0) {
283 cerr << "ERROR: failed to store new bucket instance info: " << cpp_strerror(-ret) << std::endl;
284 return -ret;
285 }
286
287 return 0;
288}
289
290int RGWBucketReshard::create_new_bucket_instance(int new_num_shards,
291 RGWBucketInfo& new_bucket_info)
292{
293 return ::create_new_bucket_instance(store, new_num_shards, bucket_info, bucket_attrs, new_bucket_info);
294}
295
94b18763
FG
296int RGWBucketReshard::cancel()
297{
298 int ret = lock_bucket();
299 if (ret < 0) {
300 return ret;
301 }
302
303 ret = clear_resharding();
304
305 unlock_bucket();
306 return 0;
307}
308
31f18b77
FG
309class BucketInfoReshardUpdate
310{
311 RGWRados *store;
312 RGWBucketInfo bucket_info;
313 std::map<string, bufferlist> bucket_attrs;
314
315 bool in_progress{false};
316
317 int set_status(cls_rgw_reshard_status s) {
318 bucket_info.reshard_status = s;
319 int ret = store->put_bucket_instance_info(bucket_info, false, real_time(), &bucket_attrs);
320 if (ret < 0) {
321 ldout(store->ctx(), 0) << "ERROR: failed to write bucket info, ret=" << ret << dendl;
322 return ret;
323 }
324 return 0;
325 }
326
327public:
328 BucketInfoReshardUpdate(RGWRados *_store, RGWBucketInfo& _bucket_info,
329 map<string, bufferlist>& _bucket_attrs, const string& new_bucket_id) : store(_store),
330 bucket_info(_bucket_info),
331 bucket_attrs(_bucket_attrs) {
332 bucket_info.new_bucket_instance_id = new_bucket_id;
333 }
334 ~BucketInfoReshardUpdate() {
335 if (in_progress) {
336 bucket_info.new_bucket_instance_id.clear();
337 set_status(CLS_RGW_RESHARD_NONE);
338 }
339 }
340
341 int start() {
342 int ret = set_status(CLS_RGW_RESHARD_IN_PROGRESS);
343 if (ret < 0) {
344 return ret;
345 }
346 in_progress = true;
347 return 0;
348 }
349
350 int complete() {
351 int ret = set_status(CLS_RGW_RESHARD_DONE);
352 if (ret < 0) {
353 return ret;
354 }
355 in_progress = false;
356 return 0;
357 }
358};
359
360int RGWBucketReshard::do_reshard(
361 int num_shards,
b32b8144 362 RGWBucketInfo& new_bucket_info,
31f18b77
FG
363 int max_entries,
364 bool verbose,
365 ostream *out,
366 Formatter *formatter)
367{
368 rgw_bucket& bucket = bucket_info.bucket;
369
370 int ret = 0;
371
372 if (out) {
373 (*out) << "*** NOTICE: operation will not remove old bucket index objects ***" << std::endl;
374 (*out) << "*** these will need to be removed manually ***" << std::endl;
375 (*out) << "tenant: " << bucket_info.bucket.tenant << std::endl;
376 (*out) << "bucket name: " << bucket_info.bucket.name << std::endl;
377 (*out) << "old bucket instance id: " << bucket_info.bucket.bucket_id << std::endl;
378 (*out) << "new bucket instance id: " << new_bucket_info.bucket.bucket_id << std::endl;
379 }
380
381 /* update bucket info -- in progress*/
382 list<rgw_cls_bi_entry> entries;
383
384 if (max_entries < 0) {
385 ldout(store->ctx(), 0) << __func__ << ": can't reshard, negative max_entries" << dendl;
386 return -EINVAL;
387 }
388
389 BucketInfoReshardUpdate bucket_info_updater(store, bucket_info, bucket_attrs, new_bucket_info.bucket.bucket_id);
390
391 ret = bucket_info_updater.start();
392 if (ret < 0) {
393 ldout(store->ctx(), 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl;
394 return ret;
395 }
396
397 int num_target_shards = (new_bucket_info.num_shards > 0 ? new_bucket_info.num_shards : 1);
398
399 BucketReshardManager target_shards_mgr(store, new_bucket_info, num_target_shards);
400
401 verbose = verbose && (formatter != nullptr);
402
403 if (verbose) {
404 formatter->open_array_section("entries");
405 }
406
407 uint64_t total_entries = 0;
408
409 if (!verbose) {
410 cout << "total entries:";
411 }
412
413 int num_source_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
414 string marker;
415 for (int i = 0; i < num_source_shards; ++i) {
416 bool is_truncated = true;
417 marker.clear();
418 while (is_truncated) {
419 entries.clear();
420 ret = store->bi_list(bucket, i, string(), marker, max_entries, &entries, &is_truncated);
421 if (ret < 0 && ret != -ENOENT) {
422 derr << "ERROR: bi_list(): " << cpp_strerror(-ret) << dendl;
423 return -ret;
424 }
425
426 list<rgw_cls_bi_entry>::iterator iter;
427 for (iter = entries.begin(); iter != entries.end(); ++iter) {
428 rgw_cls_bi_entry& entry = *iter;
429 if (verbose) {
430 formatter->open_object_section("entry");
431
432 encode_json("shard_id", i, formatter);
433 encode_json("num_entry", total_entries, formatter);
434 encode_json("entry", entry, formatter);
435 }
436 total_entries++;
437
438 marker = entry.idx;
439
440 int target_shard_id;
441 cls_rgw_obj_key cls_key;
442 uint8_t category;
443 rgw_bucket_category_stats stats;
444 bool account = entry.get_info(&cls_key, &category, &stats);
445 rgw_obj_key key(cls_key);
446 rgw_obj obj(new_bucket_info.bucket, key);
447 int ret = store->get_target_shard_id(new_bucket_info, obj.get_hash_object(), &target_shard_id);
448 if (ret < 0) {
449 lderr(store->ctx()) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl;
450 return ret;
451 }
452
453 int shard_index = (target_shard_id > 0 ? target_shard_id : 0);
454
455 ret = target_shards_mgr.add_entry(shard_index, entry, account, category, stats);
456 if (ret < 0) {
457 return ret;
458 }
459 if (verbose) {
460 formatter->close_section();
461 if (out) {
462 formatter->flush(*out);
463 formatter->flush(*out);
464 }
465 } else if (out && !(total_entries % 1000)) {
466 (*out) << " " << total_entries;
467 }
468 }
469 }
470 }
471 if (verbose) {
472 formatter->close_section();
473 if (out) {
474 formatter->flush(*out);
475 }
476 } else if (out) {
477 (*out) << " " << total_entries << std::endl;
478 }
479
480 ret = target_shards_mgr.finish();
481 if (ret < 0) {
482 lderr(store->ctx()) << "ERROR: failed to reshard" << dendl;
483 return EIO;
484 }
485
b32b8144
FG
486 ret = rgw_link_bucket(store, new_bucket_info.owner, new_bucket_info.bucket, bucket_info.creation_time);
487 if (ret < 0) {
488 lderr(store->ctx()) << "failed to link new bucket instance (bucket_id=" << new_bucket_info.bucket.bucket_id << ": " << cpp_strerror(-ret) << ")" << dendl;
489 return -ret;
31f18b77
FG
490 }
491
492 ret = bucket_info_updater.complete();
493 if (ret < 0) {
494 ldout(store->ctx(), 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl;
495 /* don't error out, reshard process succeeded */
496 }
497 return 0;
498}
499
500int RGWBucketReshard::get_status(list<cls_rgw_bucket_instance_entry> *status)
501{
502 librados::IoCtx index_ctx;
503 map<int, string> bucket_objs;
504
505 int r = store->open_bucket_index(bucket_info, index_ctx, bucket_objs);
506 if (r < 0) {
507 return r;
508 }
509
510 for (auto i : bucket_objs) {
511 cls_rgw_bucket_instance_entry entry;
512
513 int ret = cls_rgw_get_bucket_resharding(index_ctx, i.second, &entry);
514 if (ret < 0 && ret != -ENOENT) {
515 lderr(store->ctx()) << "ERROR: " << __func__ << ": cls_rgw_get_bucket_resharding() returned ret=" << ret << dendl;
516 return ret;
517 }
518
519 status->push_back(entry);
520 }
521
522 return 0;
523}
524
525int RGWBucketReshard::execute(int num_shards, int max_op_entries,
526 bool verbose, ostream *out, Formatter *formatter, RGWReshard* reshard_log)
527
528{
529 int ret = lock_bucket();
530 if (ret < 0) {
531 return ret;
532 }
533
534 RGWBucketInfo new_bucket_info;
535 ret = create_new_bucket_instance(num_shards, new_bucket_info);
536 if (ret < 0) {
537 unlock_bucket();
538 return ret;
539 }
540
541 if (reshard_log) {
542 ret = reshard_log->update(bucket_info, new_bucket_info);
543 if (ret < 0) {
c07f9fc5 544 unlock_bucket();
31f18b77
FG
545 return ret;
546 }
547 }
548
549 ret = set_resharding_status(new_bucket_info.bucket.bucket_id, num_shards, CLS_RGW_RESHARD_IN_PROGRESS);
550 if (ret < 0) {
551 unlock_bucket();
552 return ret;
553 }
554
555 ret = do_reshard(num_shards,
556 new_bucket_info,
557 max_op_entries,
558 verbose, out, formatter);
559
560 if (ret < 0) {
561 unlock_bucket();
562 return ret;
563 }
564
565 ret = set_resharding_status(new_bucket_info.bucket.bucket_id, num_shards, CLS_RGW_RESHARD_DONE);
566 if (ret < 0) {
567 unlock_bucket();
568 return ret;
569 }
570
571 unlock_bucket();
572
573 return 0;
574}
575
576
577RGWReshard::RGWReshard(RGWRados* _store, bool _verbose, ostream *_out,
578 Formatter *_formatter) : store(_store), instance_lock(bucket_instance_lock_name),
579 verbose(_verbose), out(_out), formatter(_formatter)
580{
581 num_logshards = store->ctx()->_conf->rgw_reshard_num_logs;
582}
583
584string RGWReshard::get_logshard_key(const string& tenant, const string& bucket_name)
585{
586 return tenant + ":" + bucket_name;
587}
588
589#define MAX_RESHARD_LOGSHARDS_PRIME 7877
590
591void RGWReshard::get_bucket_logshard_oid(const string& tenant, const string& bucket_name, string *oid)
592{
593 string key = get_logshard_key(tenant, bucket_name);
594
595 uint32_t sid = ceph_str_hash_linux(key.c_str(), key.size());
596 uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
597 sid = sid2 % MAX_RESHARD_LOGSHARDS_PRIME % num_logshards;
31f18b77 598
1adf2230 599 get_logshard_oid(int(sid), oid);
31f18b77
FG
600}
601
602int RGWReshard::add(cls_rgw_reshard_entry& entry)
603{
3efd9988
FG
604 if (!store->can_reshard()) {
605 ldout(store->ctx(), 20) << __func__ << " Resharding is disabled" << dendl;
606 return 0;
607 }
608
31f18b77
FG
609 string logshard_oid;
610
611 get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
612
613 librados::ObjectWriteOperation op;
614 cls_rgw_reshard_add(op, entry);
615
616 int ret = store->reshard_pool_ctx.operate(logshard_oid, &op);
617 if (ret < 0) {
618 lderr(store->ctx()) << "ERROR: failed to add entry to reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
619 return ret;
620 }
621 return 0;
622}
623
624int RGWReshard::update(const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info)
625{
626 cls_rgw_reshard_entry entry;
627 entry.bucket_name = bucket_info.bucket.name;
628 entry.bucket_id = bucket_info.bucket.bucket_id;
b32b8144 629 entry.tenant = bucket_info.owner.tenant;
31f18b77
FG
630
631 int ret = get(entry);
632 if (ret < 0) {
633 return ret;
634 }
635
636 entry.new_instance_id = new_bucket_info.bucket.name + ":" + new_bucket_info.bucket.bucket_id;
637
638 ret = add(entry);
639 if (ret < 0) {
640 ldout(store->ctx(), 0) << __func__ << ":Error in updating entry bucket " << entry.bucket_name << ": " <<
641 cpp_strerror(-ret) << dendl;
642 }
643
644 return ret;
645}
646
647
648int RGWReshard::list(int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated)
649{
650 string logshard_oid;
651
652 get_logshard_oid(logshard_num, &logshard_oid);
653
654 int ret = cls_rgw_reshard_list(store->reshard_pool_ctx, logshard_oid, marker, max, entries, is_truncated);
c07f9fc5
FG
655
656 if (ret < 0) {
657 if (ret == -ENOENT) {
658 *is_truncated = false;
659 ret = 0;
660 }
31f18b77 661 lderr(store->ctx()) << "ERROR: failed to list reshard log entries, oid=" << logshard_oid << dendl;
c07f9fc5
FG
662 if (ret == -EACCES) {
663 lderr(store->ctx()) << "access denied to pool " << store->get_zone_params().reshard_pool
664 << ". Fix the pool access permissions of your client" << dendl;
665 }
31f18b77 666 }
c07f9fc5
FG
667
668 return ret;
31f18b77
FG
669}
670
671int RGWReshard::get(cls_rgw_reshard_entry& entry)
672{
673 string logshard_oid;
674
675 get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
676
677 int ret = cls_rgw_reshard_get(store->reshard_pool_ctx, logshard_oid, entry);
678 if (ret < 0) {
94b18763
FG
679 if (ret != -ENOENT) {
680 lderr(store->ctx()) << "ERROR: failed to get entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant <<
681 " bucket=" << entry.bucket_name << dendl;
682 }
31f18b77
FG
683 return ret;
684 }
685
686 return 0;
687}
688
689int RGWReshard::remove(cls_rgw_reshard_entry& entry)
690{
691 string logshard_oid;
692
693 get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
694
695 librados::ObjectWriteOperation op;
696 cls_rgw_reshard_remove(op, entry);
697
698 int ret = store->reshard_pool_ctx.operate(logshard_oid, &op);
699 if (ret < 0) {
700 lderr(store->ctx()) << "ERROR: failed to remove entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
701 return ret;
702 }
703
704 return ret;
705}
706
707int RGWReshard::clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry)
708{
709 int ret = cls_rgw_clear_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid);
710 if (ret < 0) {
711 lderr(store->ctx()) << "ERROR: failed to clear bucket resharding, bucket_instance_oid=" << bucket_instance_oid << dendl;
712 return ret;
713 }
714
715 return 0;
716}
717
// Polls of the bucket index made by RGWReshardWait::block_while_resharding()
// before it gives up with -ERR_BUSY_RESHARDING.
const int num_retries{10};
// Seconds RGWReshardWait::do_wait() waits between those polls.
const int default_reshard_sleep_duration{5};
720
721int RGWReshardWait::do_wait()
722{
723 Mutex::Locker l(lock);
724
725 cond.WaitInterval(lock, utime_t(default_reshard_sleep_duration, 0));
726
727 if (going_down) {
728 return -ECANCELED;
729 }
730
731 return 0;
732}
733
734int RGWReshardWait::block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id)
735{
736 int ret = 0;
737 cls_rgw_bucket_instance_entry entry;
738
739 for (int i=0; i < num_retries;i++) {
740 ret = cls_rgw_get_bucket_resharding(bs->index_ctx, bs->bucket_obj, &entry);
741 if (ret < 0) {
742 ldout(store->ctx(), 0) << __func__ << " ERROR: failed to get bucket resharding :" <<
743 cpp_strerror(-ret)<< dendl;
744 return ret;
745 }
746 if (!entry.resharding_in_progress()) {
747 *new_bucket_id = entry.new_bucket_instance_id;
748 return 0;
749 }
750 ldout(store->ctx(), 20) << "NOTICE: reshard still in progress; " << (i < num_retries - 1 ? "retrying" : "too many retries") << dendl;
751 /* needed to unlock as clear resharding uses the same lock */
752
753 if (i == num_retries - 1) {
754 break;
755 }
756
757 ret = do_wait();
758 if (ret < 0) {
759 ldout(store->ctx(), 0) << __func__ << " ERROR: bucket is still resharding, please retry" << dendl;
760 return ret;
761 }
762 }
763 ldout(store->ctx(), 0) << __func__ << " ERROR: bucket is still resharding, please retry" << dendl;
764 return -ERR_BUSY_RESHARDING;
765}
766
767int RGWReshard::process_single_logshard(int logshard_num)
768{
769 string marker;
770 bool truncated = true;
771
772 CephContext *cct = store->ctx();
773 int max_entries = 1000;
774 int max_secs = 60;
775
776 rados::cls::lock::Lock l(reshard_lock_name);
777
778 utime_t time(max_secs, 0);
779 l.set_duration(time);
780
781 char cookie_buf[COOKIE_LEN + 1];
782 gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
783 cookie_buf[COOKIE_LEN] = '\0';
784
785 l.set_cookie(cookie_buf);
786
787 string logshard_oid;
788 get_logshard_oid(logshard_num, &logshard_oid);
789
790 int ret = l.lock_exclusive(&store->reshard_pool_ctx, logshard_oid);
791 if (ret == -EBUSY) { /* already locked by another processor */
792 ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << logshard_oid << dendl;
793 return ret;
794 }
795
796 utime_t lock_start_time = ceph_clock_now();
797
798 do {
799 std::list<cls_rgw_reshard_entry> entries;
800 ret = list(logshard_num, marker, max_entries, entries, &truncated);
801 if (ret < 0) {
802 ldout(cct, 10) << "cannot list all reshards in logshard oid=" << logshard_oid << dendl;
803 continue;
804 }
805
806 for(auto& entry: entries) {
807 if(entry.new_instance_id.empty()) {
808
809 ldout(store->ctx(), 20) << __func__ << " resharding " << entry.bucket_name << dendl;
810
811 RGWObjectCtx obj_ctx(store);
812 rgw_bucket bucket;
813 RGWBucketInfo bucket_info;
814 map<string, bufferlist> attrs;
815
816 ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name, bucket_info, nullptr,
817 &attrs);
818 if (ret < 0) {
819 ldout(cct, 0) << __func__ << ": Error in get_bucket_info: " << cpp_strerror(-ret) << dendl;
820 return -ret;
821 }
822
823 RGWBucketReshard br(store, bucket_info, attrs);
824
825 Formatter* formatter = new JSONFormatter(false);
826 auto formatter_ptr = std::unique_ptr<Formatter>(formatter);
827 ret = br.execute(entry.new_num_shards, max_entries, true,nullptr, formatter, this);
828 if (ret < 0) {
829 ldout (store->ctx(), 0) << __func__ << "ERROR in reshard_bucket " << entry.bucket_name << ":" <<
830 cpp_strerror(-ret)<< dendl;
831 return ret;
832 }
833
834 ldout (store->ctx(), 20) << " removing entry" << entry.bucket_name<< dendl;
835
836 ret = remove(entry);
837 if (ret < 0) {
838 ldout(cct, 0)<< __func__ << ":Error removing bucket " << entry.bucket_name << " for resharding queue: "
839 << cpp_strerror(-ret) << dendl;
840 return ret;
841 }
842 }
843 utime_t now = ceph_clock_now();
844
845 if (now > lock_start_time + max_secs / 2) { /* do you need to renew lock? */
846 l.set_renew(true);
847 ret = l.lock_exclusive(&store->reshard_pool_ctx, logshard_oid);
848 if (ret == -EBUSY) { /* already locked by another processor */
849 ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << logshard_oid << dendl;
850 return ret;
851 }
852 lock_start_time = now;
853 }
854 entry.get_key(&marker);
855 }
856 } while (truncated);
857
858 l.unlock(&store->reshard_pool_ctx, logshard_oid);
859 return 0;
860}
861
862
863void RGWReshard::get_logshard_oid(int shard_num, string *logshard)
864{
865 char buf[32];
866 snprintf(buf, sizeof(buf), "%010u", (unsigned)shard_num);
867
868 string objname(reshard_oid_prefix);
869 *logshard = objname + buf;
870}
871
872int RGWReshard::process_all_logshards()
873{
3efd9988
FG
874 if (!store->can_reshard()) {
875 ldout(store->ctx(), 20) << __func__ << " Resharding is disabled" << dendl;
876 return 0;
877 }
31f18b77
FG
878 int ret = 0;
879
880 for (int i = 0; i < num_logshards; i++) {
881 string logshard;
882 get_logshard_oid(i, &logshard);
883
884 ldout(store->ctx(), 20) << "proceeding logshard = " << logshard << dendl;
885
886 ret = process_single_logshard(i);
887 if (ret <0) {
888 return ret;
889 }
890 }
891
892 return 0;
893}
894
895bool RGWReshard::going_down()
896{
897 return down_flag;
898}
899
900void RGWReshard::start_processor()
901{
902 worker = new ReshardWorker(store->ctx(), this);
903 worker->create("rgw_reshard");
904}
905
906void RGWReshard::stop_processor()
907{
908 down_flag = true;
909 if (worker) {
910 worker->stop();
911 worker->join();
912 }
913 delete worker;
224ce89b 914 worker = nullptr;
31f18b77
FG
915}
916
917void *RGWReshard::ReshardWorker::entry() {
918 utime_t last_run;
919 do {
920 utime_t start = ceph_clock_now();
31f18b77
FG
921 if (reshard->process_all_logshards()) {
922 /* All shards have been processed properly. Next time we can start
923 * from this moment. */
924 last_run = start;
925 }
31f18b77
FG
926
927 if (reshard->going_down())
928 break;
929
930 utime_t end = ceph_clock_now();
931 end -= start;
932 int secs = cct->_conf->rgw_reshard_thread_interval;
933
934 if (secs <= end.sec())
935 continue; // next round
936
937 secs -= end.sec();
938
939 lock.Lock();
940 cond.WaitInterval(lock, utime_t(secs, 0));
941 lock.Unlock();
942 } while (!reshard->going_down());
943
944 return NULL;
945}
946
947void RGWReshard::ReshardWorker::stop()
948{
949 Mutex::Locker l(lock);
950 cond.Signal();
951}