// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include <boost/optional.hpp>

#include "common/ceph_json.h"
#include "common/RWLock.h"
#include "common/RefCountedObj.h"
#include "common/WorkQueue.h"
#include "common/Throttle.h"
#include "common/admin_socket.h"
#include "common/errno.h"

#include "rgw_common.h"
#include "rgw_rados.h"
#include "rgw_zone.h"
#include "rgw_sync.h"
#include "rgw_metadata.h"
#include "rgw_rest_conn.h"
#include "rgw_tools.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
#include "rgw_http_client.h"
#include "rgw_sync_trace.h"

#include "cls/lock/cls_lock_client.h"

#include "services/svc_zone.h"

#include <boost/asio/yield.hpp>

#define dout_subsys ceph_subsys_rgw

#undef dout_prefix
#define dout_prefix (*_dout << "meta sync: ")
static string mdlog_sync_status_oid = "mdlog.sync-status";
static string mdlog_sync_status_shard_prefix = "mdlog.sync-status.shard";
static string mdlog_sync_full_sync_index_prefix = "meta.full-sync.index";

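/* RGWSyncErrorLogger spreads sync error entries across a fixed number of
 * timelog shard objects; log_error_cr() below picks a shard round-robin. */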
RGWSyncErrorLogger::RGWSyncErrorLogger(RGWRados *_store, const string &oid_prefix, int _num_shards) : store(_store), num_shards(_num_shards) {
  for (int i = 0; i < num_shards; i++) {
    oids.push_back(get_shard_oid(oid_prefix, i));
  }
}
string RGWSyncErrorLogger::get_shard_oid(const string& oid_prefix, int shard_id) {
  char buf[oid_prefix.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), shard_id);
  return string(buf);
}

RGWCoroutine *RGWSyncErrorLogger::log_error_cr(const string& source_zone, const string& section, const string& name, uint32_t error_code, const string& message) {
  cls_log_entry entry;

  rgw_sync_error_info info(source_zone, error_code, message);
  bufferlist bl;
  encode(info, bl);
  store->time_log_prepare_entry(entry, real_clock::now(), section, name, bl);

  uint32_t shard_id = ++counter % num_shards;

  return new RGWRadosTimelogAddCR(store, oids[shard_id], entry);
}

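/* Exponential backoff: the wait time doubles on each failure and is capped
 * at max_secs. */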
void RGWSyncBackoff::update_wait_time()
{
  if (cur_wait == 0) {
    cur_wait = 1;
  } else {
    cur_wait = (cur_wait << 1);
  }
  if (cur_wait >= max_secs) {
    cur_wait = max_secs;
  }
}

void RGWSyncBackoff::backoff_sleep()
{
  update_wait_time();
  sleep(cur_wait);
}

void RGWSyncBackoff::backoff(RGWCoroutine *op)
{
  update_wait_time();
  op->wait(utime_t(cur_wait, 0));
}

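/* RGWBackoffControlCR repeatedly runs the coroutine returned by alloc_cr(),
 * backing off between attempts. -EBUSY and -EAGAIN are treated as transient;
 * any other error either aborts (exit_on_error) or keeps retrying. */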
int RGWBackoffControlCR::operate() {
  reenter(this) {
    // retry the operation until it succeeds
    while (true) {
      yield {
        Mutex::Locker l(lock);
        cr = alloc_cr();
        cr->get();
        call(cr);
      }
      {
        Mutex::Locker l(lock);
        cr->put();
        cr = NULL;
      }
      if (retcode >= 0) {
        break;
      }
      if (retcode != -EBUSY && retcode != -EAGAIN) {
        ldout(cct, 0) << "ERROR: RGWBackoffControlCR called coroutine returned " << retcode << dendl;
        if (exit_on_error) {
          return set_cr_error(retcode);
        }
      }
      if (reset_backoff) {
        backoff.reset();
      }
      yield backoff.backoff(this);
    }

    // run an optional finisher
    yield call(alloc_finisher_cr());
    if (retcode < 0) {
      ldout(cct, 0) << "ERROR: call to finisher_cr() failed: retcode=" << retcode << dendl;
      return set_cr_error(retcode);
    }
    return set_cr_done();
  }
  return 0;
}

void rgw_mdlog_info::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("num_objects", num_shards, obj);
  JSONDecoder::decode_json("period", period, obj);
  JSONDecoder::decode_json("realm_epoch", realm_epoch, obj);
}

void rgw_mdlog_entry::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("id", id, obj);
  JSONDecoder::decode_json("section", section, obj);
  JSONDecoder::decode_json("name", name, obj);
  utime_t ut;
  JSONDecoder::decode_json("timestamp", ut, obj);
  timestamp = ut.to_real_time();
  JSONDecoder::decode_json("data", log_data, obj);
}

void rgw_mdlog_shard_data::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("marker", marker, obj);
  JSONDecoder::decode_json("truncated", truncated, obj);
  JSONDecoder::decode_json("entries", entries, obj);
}

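/* RGWShardCollectCR: spawn per-shard child coroutines with a bounded number
 * in flight (max_concurrent), collect their return codes, and report the
 * last non-ENOENT error once all children have completed. */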
int RGWShardCollectCR::operate() {
  reenter(this) {
    while (spawn_next()) {
      current_running++;

      while (current_running >= max_concurrent) {
        int child_ret;
        yield wait_for_child();
        if (collect_next(&child_ret)) {
          current_running--;
          if (child_ret < 0 && child_ret != -ENOENT) {
            ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
            status = child_ret;
          }
        }
      }
    }
    while (current_running > 0) {
      int child_ret;
      yield wait_for_child();
      if (collect_next(&child_ret)) {
        current_running--;
        if (child_ret < 0 && child_ret != -ENOENT) {
          ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
          status = child_ret;
        }
      }
    }
    if (status < 0) {
      return set_cr_error(status);
    }
    return set_cr_done();
  }
  return 0;
}

class RGWReadRemoteMDLogInfoCR : public RGWShardCollectCR {
  RGWMetaSyncEnv *sync_env;

  const std::string& period;
  int num_shards;
  map<int, RGWMetadataLogInfo> *mdlog_info;

  int shard_id;
#define READ_MDLOG_MAX_CONCURRENT 10

public:
  RGWReadRemoteMDLogInfoCR(RGWMetaSyncEnv *_sync_env,
                           const std::string& period, int _num_shards,
                           map<int, RGWMetadataLogInfo> *_mdlog_info)
    : RGWShardCollectCR(_sync_env->cct, READ_MDLOG_MAX_CONCURRENT),
      sync_env(_sync_env),
      period(period), num_shards(_num_shards),
      mdlog_info(_mdlog_info), shard_id(0) {}
  bool spawn_next() override;
};

class RGWListRemoteMDLogCR : public RGWShardCollectCR {
  RGWMetaSyncEnv *sync_env;

  const std::string& period;
  map<int, string> shards;
  int max_entries_per_shard;
  map<int, rgw_mdlog_shard_data> *result;

  map<int, string>::iterator iter;

public:
  RGWListRemoteMDLogCR(RGWMetaSyncEnv *_sync_env,
                       const std::string& period, map<int, string>& _shards,
                       int _max_entries_per_shard,
                       map<int, rgw_mdlog_shard_data> *_result)
    : RGWShardCollectCR(_sync_env->cct, READ_MDLOG_MAX_CONCURRENT),
      sync_env(_sync_env), period(period),
      max_entries_per_shard(_max_entries_per_shard),
      result(_result) {
    shards.swap(_shards);
    iter = shards.begin();
  }
  bool spawn_next() override;
};

RGWRemoteMetaLog::~RGWRemoteMetaLog()
{
  delete error_logger;
}

int RGWRemoteMetaLog::read_log_info(rgw_mdlog_info *log_info)
{
  rgw_http_param_pair pairs[] = { { "type", "metadata" },
                                  { NULL, NULL } };

  int ret = conn->get_json_resource("/admin/log", pairs, *log_info);
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "ERROR: failed to fetch mdlog info" << dendl;
    return ret;
  }

  ldpp_dout(dpp, 20) << "remote mdlog, num_shards=" << log_info->num_shards << dendl;

  return 0;
}

int RGWRemoteMetaLog::read_master_log_shards_info(const string &master_period, map<int, RGWMetadataLogInfo> *shards_info)
{
  if (store->svc.zone->is_meta_master()) {
    return 0;
  }

  rgw_mdlog_info log_info;
  int ret = read_log_info(&log_info);
  if (ret < 0) {
    return ret;
  }

  return run(new RGWReadRemoteMDLogInfoCR(&sync_env, master_period, log_info.num_shards, shards_info));
}

int RGWRemoteMetaLog::read_master_log_shards_next(const string& period, map<int, string> shard_markers, map<int, rgw_mdlog_shard_data> *result)
{
  if (store->svc.zone->is_meta_master()) {
    return 0;
  }

  return run(new RGWListRemoteMDLogCR(&sync_env, period, shard_markers, 1, result));
}

int RGWRemoteMetaLog::init()
{
  conn = store->svc.zone->get_master_conn();

  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }

  error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);

  init_sync_env(&sync_env);

  tn = sync_env.sync_tracer->add_node(sync_env.sync_tracer->root_node, "meta");

  return 0;
}

void RGWRemoteMetaLog::finish()
{
  going_down = true;
  stop();
}

#define CLONE_MAX_ENTRIES 100

int RGWMetaSyncStatusManager::init()
{
  if (store->svc.zone->is_meta_master()) {
    return 0;
  }

  if (!store->svc.zone->get_master_conn()) {
    lderr(store->ctx()) << "no REST connection to master zone" << dendl;
    return -EIO;
  }

  int r = rgw_init_ioctx(store->get_rados_handle(), store->svc.zone->get_zone_params().log_pool, ioctx, true);
  if (r < 0) {
    lderr(store->ctx()) << "ERROR: failed to open log pool (" << store->svc.zone->get_zone_params().log_pool << ") ret=" << r << dendl;
    return r;
  }

  r = master_log.init();
  if (r < 0) {
    lderr(store->ctx()) << "ERROR: failed to init remote log, r=" << r << dendl;
    return r;
  }

  RGWMetaSyncEnv& sync_env = master_log.get_sync_env();

  rgw_meta_sync_status sync_status;
  r = read_sync_status(&sync_status);
  if (r < 0 && r != -ENOENT) {
    lderr(store->ctx()) << "ERROR: failed to read sync status, r=" << r << dendl;
    return r;
  }

  int num_shards = sync_status.sync_info.num_shards;

  for (int i = 0; i < num_shards; i++) {
    shard_objs[i] = rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, sync_env.shard_obj_name(i));
  }

  RWLock::WLocker wl(ts_to_shard_lock);
  for (int i = 0; i < num_shards; i++) {
    clone_markers.push_back(string());
    utime_shard ut;
    ut.shard_id = i;
    ts_to_shard[ut] = i;
  }

  return 0;
}

unsigned RGWMetaSyncStatusManager::get_subsys() const
{
  return dout_subsys;
}

std::ostream& RGWMetaSyncStatusManager::gen_prefix(std::ostream& out) const
{
  return out << "meta sync: ";
}

void RGWMetaSyncEnv::init(const DoutPrefixProvider *_dpp, CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
                          RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
                          RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer) {
  dpp = _dpp;
  cct = _cct;
  store = _store;
  conn = _conn;
  async_rados = _async_rados;
  http_manager = _http_manager;
  error_logger = _error_logger;
  sync_tracer = _sync_tracer;
}

string RGWMetaSyncEnv::status_oid()
{
  return mdlog_sync_status_oid;
}

string RGWMetaSyncEnv::shard_obj_name(int shard_id)
{
  char buf[mdlog_sync_status_shard_prefix.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%d", mdlog_sync_status_shard_prefix.c_str(), shard_id);

  return string(buf);
}

class RGWAsyncReadMDLogEntries : public RGWAsyncRadosRequest {
  RGWRados *store;
  RGWMetadataLog *mdlog;
  int shard_id;
  string *marker;
  int max_entries;
  list<cls_log_entry> *entries;
  bool *truncated;

protected:
  int _send_request() override {
    real_time from_time;
    real_time end_time;

    void *handle;

    mdlog->init_list_entries(shard_id, from_time, end_time, *marker, &handle);

    int ret = mdlog->list_entries(handle, max_entries, *entries, marker, truncated);

    mdlog->complete_list_entries(handle);

    return ret;
  }
public:
  RGWAsyncReadMDLogEntries(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
                           RGWMetadataLog* mdlog, int _shard_id,
                           string* _marker, int _max_entries,
                           list<cls_log_entry> *_entries, bool *_truncated)
    : RGWAsyncRadosRequest(caller, cn), store(_store), mdlog(mdlog),
      shard_id(_shard_id), marker(_marker), max_entries(_max_entries),
      entries(_entries), truncated(_truncated) {}
};

class RGWReadMDLogEntriesCR : public RGWSimpleCoroutine {
  RGWMetaSyncEnv *sync_env;
  RGWMetadataLog *const mdlog;
  int shard_id;
  string marker;
  string *pmarker;
  int max_entries;
  list<cls_log_entry> *entries;
  bool *truncated;

  RGWAsyncReadMDLogEntries *req{nullptr};

public:
  RGWReadMDLogEntriesCR(RGWMetaSyncEnv *_sync_env, RGWMetadataLog* mdlog,
                        int _shard_id, string*_marker, int _max_entries,
                        list<cls_log_entry> *_entries, bool *_truncated)
    : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env), mdlog(mdlog),
      shard_id(_shard_id), pmarker(_marker), max_entries(_max_entries),
      entries(_entries), truncated(_truncated) {}

  ~RGWReadMDLogEntriesCR() override {
    if (req) {
      req->finish();
    }
  }

  int send_request() override {
    marker = *pmarker;
    req = new RGWAsyncReadMDLogEntries(this, stack->create_completion_notifier(),
                                       sync_env->store, mdlog, shard_id, &marker,
                                       max_entries, entries, truncated);
    sync_env->async_rados->queue(req);
    return 0;
  }

  int request_complete() override {
    int ret = req->get_ret_status();
    if (ret >= 0 && !entries->empty()) {
      *pmarker = marker;
    }
    return ret;
  }
};

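/* Read one remote mdlog shard's info (marker, last update time) from the
 * master zone via "GET /admin/log?type=metadata&id=<shard>&period=...&info". */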
class RGWReadRemoteMDLogShardInfoCR : public RGWCoroutine {
  RGWMetaSyncEnv *env;
  RGWRESTReadResource *http_op;

  const std::string& period;
  int shard_id;
  RGWMetadataLogInfo *shard_info;

public:
  RGWReadRemoteMDLogShardInfoCR(RGWMetaSyncEnv *env, const std::string& period,
                                int _shard_id, RGWMetadataLogInfo *_shard_info)
    : RGWCoroutine(env->store->ctx()), env(env), http_op(NULL),
      period(period), shard_id(_shard_id), shard_info(_shard_info) {}

  int operate() override {
    auto store = env->store;
    RGWRESTConn *conn = store->svc.zone->get_master_conn();
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type" , "metadata" },
                                        { "id", buf },
                                        { "period", period.c_str() },
                                        { "info" , NULL },
                                        { NULL, NULL } };

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(conn, p, pairs, NULL,
                                          env->http_manager);

        init_new_io(http_op);

        int ret = http_op->aio_read();
        if (ret < 0) {
          ldpp_dout(env->dpp, 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          http_op->put();
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait(shard_info);
        http_op->put();
        if (ret < 0) {
          return set_cr_error(ret);
        }
        return set_cr_done();
      }
    }
    return 0;
  }
};

class RGWListRemoteMDLogShardCR : public RGWSimpleCoroutine {
  RGWMetaSyncEnv *sync_env;
  RGWRESTReadResource *http_op;

  const std::string& period;
  int shard_id;
  string marker;
  uint32_t max_entries;
  rgw_mdlog_shard_data *result;

public:
  RGWListRemoteMDLogShardCR(RGWMetaSyncEnv *env, const std::string& period,
                            int _shard_id, const string& _marker, uint32_t _max_entries,
                            rgw_mdlog_shard_data *_result)
    : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL),
      period(period), shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}

  int send_request() override {
    RGWRESTConn *conn = sync_env->conn;

    char buf[32];
    snprintf(buf, sizeof(buf), "%d", shard_id);

    char max_entries_buf[32];
    snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);

    const char *marker_key = (marker.empty() ? "" : "marker");

    rgw_http_param_pair pairs[] = { { "type", "metadata" },
                                    { "id", buf },
                                    { "period", period.c_str() },
                                    { "max-entries", max_entries_buf },
                                    { marker_key, marker.c_str() },
                                    { NULL, NULL } };

    string p = "/admin/log/";

    http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
    init_new_io(http_op);

    int ret = http_op->aio_read();
    if (ret < 0) {
      ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to read from " << p << dendl;
      log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
      http_op->put();
      return ret;
    }

    return 0;
  }

  int request_complete() override {
    int ret = http_op->wait(result);
    http_op->put();
    if (ret < 0 && ret != -ENOENT) {
      ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to list remote mdlog shard, ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }
};

bool RGWReadRemoteMDLogInfoCR::spawn_next() {
  if (shard_id >= num_shards) {
    return false;
  }
  spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env, period, shard_id, &(*mdlog_info)[shard_id]), false);
  shard_id++;
  return true;
}

bool RGWListRemoteMDLogCR::spawn_next() {
  if (iter == shards.end()) {
    return false;
  }

  spawn(new RGWListRemoteMDLogShardCR(sync_env, period, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
  ++iter;
  return true;
}

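/* RGWInitSyncStatusCoroutine bootstraps the sync status objects: take the
 * sync lease, write the global status object, snapshot each remote mdlog
 * shard's current position as the incremental-sync starting point, then
 * flip the state to StateBuildingFullSyncMaps. */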
class RGWInitSyncStatusCoroutine : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;

  rgw_meta_sync_info status;
  vector<RGWMetadataLogInfo> shards_info;
  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
public:
  RGWInitSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
                             const rgw_meta_sync_info &status)
    : RGWCoroutine(_sync_env->store->ctx()), sync_env(_sync_env),
      status(status), shards_info(status.num_shards),
      lease_cr(nullptr), lease_stack(nullptr) {}

  ~RGWInitSyncStatusCoroutine() override {
    if (lease_cr) {
      lease_cr->abort();
    }
  }

  int operate() override {
    int ret;
    reenter(this) {
      yield {
        set_status("acquiring sync lock");
        uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
        string lock_name = "sync_lock";
        RGWRados *store = sync_env->store;
        lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                                rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, sync_env->status_oid()),
                                                lock_name, lock_duration, this));
        lease_stack.reset(spawn(lease_cr.get(), false));
      }
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          ldpp_dout(sync_env->dpp, 5) << "lease cr failed, done early " << dendl;
          set_status("lease lock failed, early abort");
          return set_cr_error(lease_cr->get_ret_status());
        }
        set_sleeping(true);
        yield;
      }
      yield {
        set_status("writing sync status");
        RGWRados *store = sync_env->store;
        call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados, store->svc.sysobj,
                                                           rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, sync_env->status_oid()),
                                                           status));
      }

      if (retcode < 0) {
        set_status("failed to write sync status");
        ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
        yield lease_cr->go_down();
        return set_cr_error(retcode);
      }
      /* fetch current position in logs */
      set_status("fetching remote log position");
      yield {
        for (int i = 0; i < (int)status.num_shards; i++) {
          spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env, status.period, i,
                                                  &shards_info[i]), false);
        }
      }

      drain_all_but_stack(lease_stack.get()); /* the lease cr still needs to run */

      yield {
        set_status("updating sync status");
        for (int i = 0; i < (int)status.num_shards; i++) {
          rgw_meta_sync_marker marker;
          RGWMetadataLogInfo& info = shards_info[i];
          marker.next_step_marker = info.marker;
          marker.timestamp = info.last_update;
          RGWRados *store = sync_env->store;
          spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados,
                                                                store->svc.sysobj,
                                                                rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, sync_env->shard_obj_name(i)),
                                                                marker), true);
        }
      }
      yield {
        set_status("changing sync state: build full sync maps");
        status.state = rgw_meta_sync_info::StateBuildingFullSyncMaps;
        RGWRados *store = sync_env->store;
        call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados, store->svc.sysobj,
                                                           rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, sync_env->status_oid()),
                                                           status));
      }
      set_status("drop lock lease");
      yield lease_cr->go_down();
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          return set_cr_error(ret);
        }
        yield;
      }
      drain_all();
      return set_cr_done();
    }
    return 0;
  }
};

class RGWReadSyncStatusMarkersCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  RGWMetaSyncEnv *env;
  const int num_shards;
  int shard_id{0};
  map<uint32_t, rgw_meta_sync_marker>& markers;

public:
  RGWReadSyncStatusMarkersCR(RGWMetaSyncEnv *env, int num_shards,
                             map<uint32_t, rgw_meta_sync_marker>& markers)
    : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS),
      env(env), num_shards(num_shards), markers(markers)
  {}
  bool spawn_next() override;
};

bool RGWReadSyncStatusMarkersCR::spawn_next()
{
  if (shard_id >= num_shards) {
    return false;
  }
  using CR = RGWSimpleRadosReadCR<rgw_meta_sync_marker>;
  rgw_raw_obj obj{env->store->svc.zone->get_zone_params().log_pool,
                  env->shard_obj_name(shard_id)};
  spawn(new CR(env->async_rados, env->store->svc.sysobj, obj, &markers[shard_id]), false);
  shard_id++;
  return true;
}

class RGWReadSyncStatusCoroutine : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;
  rgw_meta_sync_status *sync_status;

public:
  RGWReadSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
                             rgw_meta_sync_status *_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sync_status(_status)
  {}
  int operate() override;
};

int RGWReadSyncStatusCoroutine::operate()
{
  reenter(this) {
    // read sync info
    using ReadInfoCR = RGWSimpleRadosReadCR<rgw_meta_sync_info>;
    yield {
      bool empty_on_enoent = false; // fail on ENOENT
      rgw_raw_obj obj{sync_env->store->svc.zone->get_zone_params().log_pool,
                      sync_env->status_oid()};
      call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc.sysobj, obj,
                          &sync_status->sync_info, empty_on_enoent));
    }
    if (retcode < 0) {
      ldpp_dout(sync_env->dpp, 4) << "failed to read sync status info with "
                                  << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    // read shard markers
    using ReadMarkersCR = RGWReadSyncStatusMarkersCR;
    yield call(new ReadMarkersCR(sync_env, sync_status->sync_info.num_shards,
                                 sync_status->sync_markers));
    if (retcode < 0) {
      ldpp_dout(sync_env->dpp, 4) << "failed to read sync status markers with "
                                  << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    return set_cr_done();
  }
  return 0;
}

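/* RGWFetchAllMetaCR builds the full-sync index: list every metadata section
 * on the master (users first, then bucket.instance, then bucket, so that
 * dependent entries sync in a sane order), and append each "section:key"
 * to the sharded omap index that full_sync() consumes later. */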
class RGWFetchAllMetaCR : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;

  int num_shards;

  int ret_status;

  list<string> sections;
  list<string>::iterator sections_iter;

  struct meta_list_result {
    list<string> keys;
    string marker;
    uint64_t count{0};
    bool truncated{false};

    void decode_json(JSONObj *obj) {
      JSONDecoder::decode_json("keys", keys, obj);
      JSONDecoder::decode_json("marker", marker, obj);
      JSONDecoder::decode_json("count", count, obj);
      JSONDecoder::decode_json("truncated", truncated, obj);
    }
  } result;
  list<string>::iterator iter;

  std::unique_ptr<RGWShardedOmapCRManager> entries_index;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
  bool lost_lock;
  bool failed;

  string marker;

  map<uint32_t, rgw_meta_sync_marker>& markers;

  RGWSyncTraceNodeRef tn;

public:
  RGWFetchAllMetaCR(RGWMetaSyncEnv *_sync_env, int _num_shards,
                    map<uint32_t, rgw_meta_sync_marker>& _markers,
                    RGWSyncTraceNodeRef& _tn_parent)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      num_shards(_num_shards),
      ret_status(0), lease_cr(nullptr), lease_stack(nullptr),
      lost_lock(false), failed(false), markers(_markers) {
    tn = sync_env->sync_tracer->add_node(_tn_parent, "fetch_all_meta");
  }

  ~RGWFetchAllMetaCR() override {
  }

  void append_section_from_set(set<string>& all_sections, const string& name) {
    set<string>::iterator iter = all_sections.find(name);
    if (iter != all_sections.end()) {
      sections.emplace_back(std::move(*iter));
      all_sections.erase(iter);
    }
  }
  /*
   * meta sync should go in the following order: user, bucket.instance, bucket
   * then whatever other sections exist (if any)
   */
  void rearrange_sections() {
    set<string> all_sections;
    std::move(sections.begin(), sections.end(),
              std::inserter(all_sections, all_sections.end()));
    sections.clear();

    append_section_from_set(all_sections, "user");
    append_section_from_set(all_sections, "bucket.instance");
    append_section_from_set(all_sections, "bucket");

    std::move(all_sections.begin(), all_sections.end(),
              std::back_inserter(sections));
  }

  int operate() override {
    RGWRESTConn *conn = sync_env->conn;

    reenter(this) {
      yield {
        set_status(string("acquiring lock (") + sync_env->status_oid() + ")");
        uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
        string lock_name = "sync_lock";
        lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados,
                                                sync_env->store,
                                                rgw_raw_obj(sync_env->store->svc.zone->get_zone_params().log_pool, sync_env->status_oid()),
                                                lock_name, lock_duration, this));
        lease_stack.reset(spawn(lease_cr.get(), false));
      }
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          ldpp_dout(sync_env->dpp, 5) << "lease cr failed, done early " << dendl;
          set_status("failed acquiring lock");
          return set_cr_error(lease_cr->get_ret_status());
        }
        set_sleeping(true);
        yield;
      }
      entries_index.reset(new RGWShardedOmapCRManager(sync_env->async_rados, sync_env->store, this, num_shards,
                                                      sync_env->store->svc.zone->get_zone_params().log_pool,
                                                      mdlog_sync_full_sync_index_prefix));
      yield {
        call(new RGWReadRESTResourceCR<list<string> >(cct, conn, sync_env->http_manager,
                                                      "/admin/metadata", NULL, &sections));
      }
      if (get_ret_status() < 0) {
        ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to fetch metadata sections" << dendl;
        yield entries_index->finish();
        yield lease_cr->go_down();
        drain_all();
        return set_cr_error(get_ret_status());
      }
      rearrange_sections();
      sections_iter = sections.begin();
      for (; sections_iter != sections.end(); ++sections_iter) {
        do {
          yield {
#define META_FULL_SYNC_CHUNK_SIZE "1000"
            string entrypoint = string("/admin/metadata/") + *sections_iter;
            rgw_http_param_pair pairs[] = { { "max-entries", META_FULL_SYNC_CHUNK_SIZE },
                                            { "marker", result.marker.c_str() },
                                            { NULL, NULL } };
            result.keys.clear();
            call(new RGWReadRESTResourceCR<meta_list_result >(cct, conn, sync_env->http_manager,
                                                              entrypoint, pairs, &result));
          }
          ret_status = get_ret_status();
          if (ret_status == -ENOENT) {
            set_retcode(0); /* reset coroutine status so that we don't return it */
            ret_status = 0;
          }
          if (ret_status < 0) {
            tn->log(0, SSTR("ERROR: failed to fetch metadata section: " << *sections_iter));
            yield entries_index->finish();
            yield lease_cr->go_down();
            drain_all();
            return set_cr_error(ret_status);
          }
          iter = result.keys.begin();
          for (; iter != result.keys.end(); ++iter) {
            if (!lease_cr->is_locked()) {
              lost_lock = true;
              break;
            }
            yield; // allow entries_index consumer to make progress

            tn->log(20, SSTR("list metadata: section=" << *sections_iter << " key=" << *iter));
            string s = *sections_iter + ":" + *iter;
            int shard_id;
            RGWRados *store = sync_env->store;
            int ret = store->meta_mgr->get_log_shard_id(*sections_iter, *iter, &shard_id);
            if (ret < 0) {
              tn->log(0, SSTR("ERROR: could not determine shard id for " << *sections_iter << ":" << *iter));
              ret_status = ret;
              break;
            }
            if (!entries_index->append(s, shard_id)) {
              break;
            }
          }
        } while (result.truncated);
      }
      yield {
        if (!entries_index->finish()) {
          failed = true;
        }
      }
      if (!failed) {
        for (map<uint32_t, rgw_meta_sync_marker>::iterator iter = markers.begin(); iter != markers.end(); ++iter) {
          int shard_id = (int)iter->first;
          rgw_meta_sync_marker& marker = iter->second;
          marker.total_entries = entries_index->get_total_entries(shard_id);
          spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados, sync_env->store->svc.sysobj,
                                                                rgw_raw_obj(sync_env->store->svc.zone->get_zone_params().log_pool, sync_env->shard_obj_name(shard_id)),
                                                                marker), true);
        }
      }

      drain_all_but_stack(lease_stack.get()); /* the lease cr still needs to run */

      yield lease_cr->go_down();

      int ret;
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          return set_cr_error(ret);
        }
        yield;
      }
      drain_all();
      if (failed) {
        yield return set_cr_error(-EIO);
      }
      if (lost_lock) {
        yield return set_cr_error(-EBUSY);
      }

      if (ret_status < 0) {
        yield return set_cr_error(ret_status);
      }

      yield return set_cr_done();
    }
    return 0;
  }
};

static string full_sync_index_shard_oid(int shard_id)
{
  char buf[mdlog_sync_full_sync_index_prefix.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%d", mdlog_sync_full_sync_index_prefix.c_str(), shard_id);
  return string(buf);
}

class RGWReadRemoteMetadataCR : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;

  RGWRESTReadResource *http_op;

  string section;
  string key;

  bufferlist *pbl;

  RGWSyncTraceNodeRef tn;

public:
  RGWReadRemoteMetadataCR(RGWMetaSyncEnv *_sync_env,
                          const string& _section, const string& _key, bufferlist *_pbl,
                          const RGWSyncTraceNodeRef& _tn_parent)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      http_op(NULL),
      section(_section),
      key(_key),
      pbl(_pbl) {
    tn = sync_env->sync_tracer->add_node(_tn_parent, "read_remote_meta",
                                         section + ":" + key);
  }

  int operate() override {
    RGWRESTConn *conn = sync_env->conn;
    reenter(this) {
      yield {
        rgw_http_param_pair pairs[] = { { "key" , key.c_str()},
                                        { NULL, NULL } };

        string p = string("/admin/metadata/") + section + "/" + key;

        http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);

        init_new_io(http_op);

        int ret = http_op->aio_read();
        if (ret < 0) {
          ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to fetch mdlog data" << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          http_op->put();
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait(pbl);
        http_op->put();
        if (ret < 0) {
          return set_cr_error(ret);
        }
        return set_cr_done();
      }
    }
    return 0;
  }
};

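/* The next four classes come in pairs: an RGWAsyncRadosRequest subclass that
 * performs the blocking metadata put/remove on the async-rados thread pool,
 * and a thin RGWSimpleCoroutine wrapper that queues the request and feeds
 * its status back into the coroutine machinery. */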
class RGWAsyncMetaStoreEntry : public RGWAsyncRadosRequest {
  RGWRados *store;
  string raw_key;
  bufferlist bl;
protected:
  int _send_request() override {
    int ret = store->meta_mgr->put(raw_key, bl, RGWMetadataHandler::APPLY_ALWAYS);
    if (ret < 0) {
      ldout(store->ctx(), 0) << "ERROR: can't store key: " << raw_key << " ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }
public:
  RGWAsyncMetaStoreEntry(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
                         const string& _raw_key,
                         bufferlist& _bl)
    : RGWAsyncRadosRequest(caller, cn), store(_store),
      raw_key(_raw_key), bl(_bl) {}
};


class RGWMetaStoreEntryCR : public RGWSimpleCoroutine {
  RGWMetaSyncEnv *sync_env;
  string raw_key;
  bufferlist bl;

  RGWAsyncMetaStoreEntry *req;

public:
  RGWMetaStoreEntryCR(RGWMetaSyncEnv *_sync_env,
                      const string& _raw_key,
                      bufferlist& _bl)
    : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env),
      raw_key(_raw_key), bl(_bl), req(NULL) {
  }

  ~RGWMetaStoreEntryCR() override {
    if (req) {
      req->finish();
    }
  }

  int send_request() override {
    req = new RGWAsyncMetaStoreEntry(this, stack->create_completion_notifier(),
                                     sync_env->store, raw_key, bl);
    sync_env->async_rados->queue(req);
    return 0;
  }

  int request_complete() override {
    return req->get_ret_status();
  }
};

class RGWAsyncMetaRemoveEntry : public RGWAsyncRadosRequest {
  RGWRados *store;
  string raw_key;
protected:
  int _send_request() override {
    int ret = store->meta_mgr->remove(raw_key);
    if (ret < 0) {
      ldout(store->ctx(), 0) << "ERROR: can't remove key: " << raw_key << " ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }
public:
  RGWAsyncMetaRemoveEntry(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
                          const string& _raw_key)
    : RGWAsyncRadosRequest(caller, cn), store(_store),
      raw_key(_raw_key) {}
};


class RGWMetaRemoveEntryCR : public RGWSimpleCoroutine {
  RGWMetaSyncEnv *sync_env;
  string raw_key;

  RGWAsyncMetaRemoveEntry *req;

public:
  RGWMetaRemoveEntryCR(RGWMetaSyncEnv *_sync_env,
                       const string& _raw_key)
    : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env),
      raw_key(_raw_key), req(NULL) {
  }

  ~RGWMetaRemoveEntryCR() override {
    if (req) {
      req->finish();
    }
  }

  int send_request() override {
    req = new RGWAsyncMetaRemoveEntry(this, stack->create_completion_notifier(),
                                      sync_env->store, raw_key);
    sync_env->async_rados->queue(req);
    return 0;
  }

  int request_complete() override {
    int r = req->get_ret_status();
    if (r == -ENOENT) {
      r = 0;
    }
    return r;
  }
};

#define META_SYNC_UPDATE_MARKER_WINDOW 10

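/* RGWLastCallerWinsCR keeps calling whatever coroutine was most recently
 * handed to it: while the current cr runs, a newer one may be stored, and
 * the loop picks it up on the next iteration, discarding intermediates.
 * The marker tracker uses this so only the newest marker write matters. */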
int RGWLastCallerWinsCR::operate() {
  RGWCoroutine *call_cr;
  reenter(this) {
    while (cr) {
      call_cr = cr;
      cr = nullptr;
      yield call(call_cr);
      /* cr might have been modified at this point */
    }
    return set_cr_done();
  }
  return 0;
}

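/* RGWMetaSyncShardMarkerTrack tracks in-flight entries for one shard and
 * persists the high-water marker: an update is flushed roughly every
 * META_SYNC_UPDATE_MARKER_WINDOW completed entries, once everything before
 * the candidate marker has finished. */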
class RGWMetaSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
  RGWMetaSyncEnv *sync_env;

  string marker_oid;
  rgw_meta_sync_marker sync_marker;

  RGWSyncTraceNodeRef tn;

public:
  RGWMetaSyncShardMarkerTrack(RGWMetaSyncEnv *_sync_env,
                              const string& _marker_oid,
                              const rgw_meta_sync_marker& _marker,
                              RGWSyncTraceNodeRef& _tn)
    : RGWSyncShardMarkerTrack(META_SYNC_UPDATE_MARKER_WINDOW),
      sync_env(_sync_env),
      marker_oid(_marker_oid),
      sync_marker(_marker),
      tn(_tn) {}

  RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
    sync_marker.marker = new_marker;
    if (index_pos > 0) {
      sync_marker.pos = index_pos;
    }

    if (!real_clock::is_zero(timestamp)) {
      sync_marker.timestamp = timestamp;
    }

    ldpp_dout(sync_env->dpp, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << " realm_epoch=" << sync_marker.realm_epoch << dendl;
    tn->log(20, SSTR("new marker=" << new_marker));
    RGWRados *store = sync_env->store;
    return new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados,
                                                           store->svc.sysobj,
                                                           rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, marker_oid),
                                                           sync_marker);
  }

  RGWOrderCallCR *allocate_order_control_cr() override {
    return new RGWLastCallerWinsCR(sync_env->cct);
  }
};

RGWMetaSyncSingleEntryCR::RGWMetaSyncSingleEntryCR(RGWMetaSyncEnv *_sync_env,
                                                   const string& _raw_key, const string& _entry_marker,
                                                   const RGWMDLogStatus& _op_status,
                                                   RGWMetaSyncShardMarkerTrack *_marker_tracker, const RGWSyncTraceNodeRef& _tn_parent)
  : RGWCoroutine(_sync_env->cct),
    sync_env(_sync_env),
    raw_key(_raw_key), entry_marker(_entry_marker),
    op_status(_op_status),
    pos(0), sync_status(0),
    marker_tracker(_marker_tracker), tries(0) {
  error_injection = (sync_env->cct->_conf->rgw_sync_meta_inject_err_probability > 0);
  tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", raw_key);
}

int RGWMetaSyncSingleEntryCR::operate() {
  reenter(this) {
#define NUM_TRANSIENT_ERROR_RETRIES 10

    if (error_injection &&
        rand() % 10000 < cct->_conf->rgw_sync_meta_inject_err_probability * 10000.0) {
      ldpp_dout(sync_env->dpp, 0) << __FILE__ << ":" << __LINE__ << ": injecting meta sync error on key=" << raw_key << dendl;
      return set_cr_error(-EIO);
    }

    if (op_status != MDLOG_STATUS_COMPLETE) {
      tn->log(20, "skipping pending operation");
      yield call(marker_tracker->finish(entry_marker));
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    tn->set_flag(RGW_SNS_FLAG_ACTIVE);
    for (tries = 0; tries < NUM_TRANSIENT_ERROR_RETRIES; tries++) {
      yield {
        pos = raw_key.find(':');
        section = raw_key.substr(0, pos);
        key = raw_key.substr(pos + 1);
        tn->log(10, SSTR("fetching remote metadata entry" << (tries == 0 ? "" : " (retry)")));
        call(new RGWReadRemoteMetadataCR(sync_env, section, key, &md_bl, tn));
      }

      sync_status = retcode;

      if (sync_status == -ENOENT) {
        /* FIXME: do we need to remove the entry from the local zone? */
        break;
      }

      if ((sync_status == -EAGAIN || sync_status == -ECANCELED) && (tries < NUM_TRANSIENT_ERROR_RETRIES - 1)) {
        ldpp_dout(sync_env->dpp, 20) << *this << ": failed to fetch remote metadata: " << section << ":" << key << ", will retry" << dendl;
        continue;
      }

      if (sync_status < 0) {
        tn->log(10, SSTR("failed to send read remote metadata entry: section=" << section << " key=" << key << " status=" << sync_status));
        log_error() << "failed to send read remote metadata entry: section=" << section << " key=" << key << " status=" << sync_status << std::endl;
        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), section, key, -sync_status,
                                                        string("failed to read remote metadata entry: ") + cpp_strerror(-sync_status)));
        return set_cr_error(sync_status);
      }

      break;
    }

    retcode = 0;
    for (tries = 0; tries < NUM_TRANSIENT_ERROR_RETRIES; tries++) {
      if (sync_status != -ENOENT) {
        tn->log(10, SSTR("storing local metadata entry"));
        yield call(new RGWMetaStoreEntryCR(sync_env, raw_key, md_bl));
      } else {
        tn->log(10, SSTR("removing local metadata entry"));
        yield call(new RGWMetaRemoveEntryCR(sync_env, raw_key));
      }
      if ((retcode == -EAGAIN || retcode == -ECANCELED) && (tries < NUM_TRANSIENT_ERROR_RETRIES - 1)) {
        ldpp_dout(sync_env->dpp, 20) << *this << ": failed to store metadata: " << section << ":" << key << ", got retcode=" << retcode << dendl;
        continue;
      }
      break;
    }

    sync_status = retcode;

    if (sync_status == 0 && marker_tracker) {
      /* update marker */
      yield call(marker_tracker->finish(entry_marker));
      sync_status = retcode;
    }
    if (sync_status < 0) {
      tn->log(10, SSTR("failed, status=" << sync_status));
      return set_cr_error(sync_status);
    }
    tn->log(10, "success");
    return set_cr_done();
  }
  return 0;
}

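/* RGWCloneMetaLogCoroutine mirrors one remote mdlog shard into the local
 * mdlog: read the remote shard status, list up to CLONE_MAX_ENTRIES entries
 * past the marker over REST, store them locally, and hand the new marker
 * back through *new_marker. */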
class RGWCloneMetaLogCoroutine : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;
  RGWMetadataLog *mdlog;

  const std::string& period;
  int shard_id;
  string marker;
  bool truncated = false;
  string *new_marker;

  int max_entries = CLONE_MAX_ENTRIES;

  RGWRESTReadResource *http_op = nullptr;
  boost::intrusive_ptr<RGWMetadataLogInfoCompletion> completion;

  RGWMetadataLogInfo shard_info;
  rgw_mdlog_shard_data data;

public:
  RGWCloneMetaLogCoroutine(RGWMetaSyncEnv *_sync_env, RGWMetadataLog* mdlog,
                           const std::string& period, int _id,
                           const string& _marker, string *_new_marker)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), mdlog(mdlog),
      period(period), shard_id(_id), marker(_marker), new_marker(_new_marker) {
    if (new_marker) {
      *new_marker = marker;
    }
  }
  ~RGWCloneMetaLogCoroutine() override {
    if (http_op) {
      http_op->put();
    }
    if (completion) {
      completion->cancel();
    }
  }

  int operate() override;

  int state_init();
  int state_read_shard_status();
  int state_read_shard_status_complete();
  int state_send_rest_request();
  int state_receive_rest_response();
  int state_store_mdlog_entries();
  int state_store_mdlog_entries_complete();
};

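/* RGWMetaSyncShardCR drives one metadata shard through its two states:
 * full_sync() replays the omap index built by RGWFetchAllMetaCR, then
 * incremental_sync() tails the cloned mdlog. Both hold a continuous lease
 * on the shard's status object while they run. */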
class RGWMetaSyncShardCR : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;

  const rgw_pool& pool;
  const std::string& period; //< currently syncing period id
  const epoch_t realm_epoch; //< realm_epoch of period
  RGWMetadataLog* mdlog; //< log of syncing period
  uint32_t shard_id;
  rgw_meta_sync_marker& sync_marker;
  boost::optional<rgw_meta_sync_marker> temp_marker; //< for pending updates
  string marker;
  string max_marker;
  const std::string& period_marker; //< max marker stored in next period

  RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
  std::set<std::string> entries;
  std::set<std::string>::iterator iter;

  string oid;

  RGWMetaSyncShardMarkerTrack *marker_tracker = nullptr;

  list<cls_log_entry> log_entries;
  list<cls_log_entry>::iterator log_iter;
  bool truncated = false;

  string mdlog_marker;
  string raw_key;
  rgw_mdlog_entry mdlog_entry;

  Mutex inc_lock;
  Cond inc_cond;

  boost::asio::coroutine incremental_cr;
  boost::asio::coroutine full_cr;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;

  bool lost_lock = false;

  bool *reset_backoff;

  // hold a reference to the cr stack while it's in the map
  using StackRef = boost::intrusive_ptr<RGWCoroutinesStack>;
  map<StackRef, string> stack_to_pos;
  map<string, string> pos_to_prev;

  bool can_adjust_marker = false;
  bool done_with_period = false;

  int total_entries = 0;

  RGWSyncTraceNodeRef tn;
public:
  RGWMetaSyncShardCR(RGWMetaSyncEnv *_sync_env, const rgw_pool& _pool,
                     const std::string& period, epoch_t realm_epoch,
                     RGWMetadataLog* mdlog, uint32_t _shard_id,
                     rgw_meta_sync_marker& _marker,
                     const std::string& period_marker, bool *_reset_backoff,
                     RGWSyncTraceNodeRef& _tn)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), pool(_pool),
      period(period), realm_epoch(realm_epoch), mdlog(mdlog),
      shard_id(_shard_id), sync_marker(_marker),
      period_marker(period_marker), inc_lock("RGWMetaSyncShardCR::inc_lock"),
      reset_backoff(_reset_backoff), tn(_tn) {
    *reset_backoff = false;
  }

  ~RGWMetaSyncShardCR() override {
    delete marker_tracker;
    if (lease_cr) {
      lease_cr->abort();
    }
  }

  void set_marker_tracker(RGWMetaSyncShardMarkerTrack *mt) {
    delete marker_tracker;
    marker_tracker = mt;
  }

  int operate() override {
    int r;
    while (true) {
      switch (sync_marker.state) {
      case rgw_meta_sync_marker::FullSync:
        r = full_sync();
        if (r < 0) {
          ldpp_dout(sync_env->dpp, 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl;
          return set_cr_error(r);
        }
        return 0;
      case rgw_meta_sync_marker::IncrementalSync:
        r = incremental_sync();
        if (r < 0) {
          ldpp_dout(sync_env->dpp, 10) << "sync: incremental_sync: shard_id=" << shard_id << " r=" << r << dendl;
          return set_cr_error(r);
        }
        return 0;
      }
    }
    /* unreachable */
    return 0;
  }

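  /* collect_children() reaps completed entry coroutines and advances the
   * persisted marker conservatively: pos_to_prev maps each in-flight
   * position to its predecessor, so the marker only moves to just before
   * the oldest entry still outstanding and a restart never skips
   * unfinished entries. */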
  void collect_children()
  {
    int child_ret;
    RGWCoroutinesStack *child;
    while (collect_next(&child_ret, &child)) {
      auto iter = stack_to_pos.find(child);
      if (iter == stack_to_pos.end()) {
        /* some other stack that we don't care about */
        continue;
      }

      string& pos = iter->second;

      if (child_ret < 0) {
        ldpp_dout(sync_env->dpp, 0) << *this << ": child operation stack=" << child << " entry=" << pos << " returned " << child_ret << dendl;
      }

      map<string, string>::iterator prev_iter = pos_to_prev.find(pos);
      ceph_assert(prev_iter != pos_to_prev.end());

      /*
       * we should get -EAGAIN for transient errors, for which we want to retry, so we don't
       * update the marker and abort. We'll get called again for these. Permanent errors will be
       * handled by marking the entry at the error log shard, so that we retry on it separately
       */
      if (child_ret == -EAGAIN) {
        can_adjust_marker = false;
      }

      if (pos_to_prev.size() == 1) {
        if (can_adjust_marker) {
          sync_marker.marker = pos;
        }
        pos_to_prev.erase(prev_iter);
      } else {
        ceph_assert(pos_to_prev.size() > 1);
        pos_to_prev.erase(prev_iter);
        prev_iter = pos_to_prev.begin();
        if (can_adjust_marker) {
          sync_marker.marker = prev_iter->second;
        }
      }

      ldpp_dout(sync_env->dpp, 4) << *this << ": adjusting marker pos=" << sync_marker.marker << dendl;
      stack_to_pos.erase(iter);
    }
  }

  int full_sync() {
#define OMAP_GET_MAX_ENTRIES 100
    int max_entries = OMAP_GET_MAX_ENTRIES;
    reenter(&full_cr) {
      set_status("full_sync");
      tn->log(10, "start full sync");
      oid = full_sync_index_shard_oid(shard_id);
      can_adjust_marker = true;
      /* grab lock */
      yield {
        uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
        string lock_name = "sync_lock";
        RGWRados *store = sync_env->store;
        lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                                rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
                                                lock_name, lock_duration, this));
        lease_stack.reset(spawn(lease_cr.get(), false));
        lost_lock = false;
      }
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          drain_all();
          tn->log(5, "failed to take lease");
          return lease_cr->get_ret_status();
        }
        set_sleeping(true);
        yield;
      }
      tn->log(10, "took lease");

      /* lock succeeded, a retry now should avoid previous backoff status */
      *reset_backoff = true;

      /* prepare marker tracker */
      set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env,
                                                         sync_env->shard_obj_name(shard_id),
                                                         sync_marker, tn));

      marker = sync_marker.marker;

      total_entries = sync_marker.pos;

      /* sync! */
      do {
        if (!lease_cr->is_locked()) {
          tn->log(10, "lost lease");
          lost_lock = true;
          break;
        }
        omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
        yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
                                             marker, max_entries, omapkeys));
        if (retcode < 0) {
          ldpp_dout(sync_env->dpp, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
          tn->log(0, SSTR("ERROR: failed to list omap keys, status=" << retcode));
          yield lease_cr->go_down();
          drain_all();
          return retcode;
        }
        entries = std::move(omapkeys->entries);
        tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
        if (entries.size() > 0) {
          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
        }
        iter = entries.begin();
        for (; iter != entries.end(); ++iter) {
          marker = *iter;
          tn->log(20, SSTR("full sync: " << marker));
          total_entries++;
          if (!marker_tracker->start(marker, total_entries, real_time())) {
            tn->log(0, SSTR("ERROR: cannot start syncing " << marker << ". Duplicate entry?"));
          } else {
            // fetch remote and write locally
            yield {
              RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, marker, marker, MDLOG_STATUS_COMPLETE, marker_tracker, tn), false);
              // stack_to_pos holds a reference to the stack
              stack_to_pos[stack] = marker;
              pos_to_prev[marker] = marker;
            }
          }
        }
        collect_children();
      } while (omapkeys->more && can_adjust_marker);

      tn->unset_flag(RGW_SNS_FLAG_ACTIVE); /* done listing entries for this pass */

      while (num_spawned() > 1) {
        yield wait_for_child();
        collect_children();
      }

      if (!lost_lock) {
        /* update marker to reflect we're done with full sync */
        if (can_adjust_marker) {
          // apply updates to a temporary marker, or operate() will send us
          // to incremental_sync() after we yield
          temp_marker = sync_marker;
          temp_marker->state = rgw_meta_sync_marker::IncrementalSync;
          temp_marker->marker = std::move(temp_marker->next_step_marker);
          temp_marker->next_step_marker.clear();
          temp_marker->realm_epoch = realm_epoch;
          ldpp_dout(sync_env->dpp, 4) << *this << ": saving marker pos=" << temp_marker->marker << " realm_epoch=" << realm_epoch << dendl;

          using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_meta_sync_marker>;
          yield call(new WriteMarkerCR(sync_env->async_rados, sync_env->store->svc.sysobj,
                                       rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
                                       *temp_marker));
        }

        if (retcode < 0) {
          ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
          yield lease_cr->go_down();
          drain_all();
          return retcode;
        }
      }

      /*
       * we reach this point whether or not the lock was lost; drop the lease,
       * drain all children, and only then decide between retrying (-EAGAIN,
       * -EBUSY) and completing the full-sync state change below
       */

      yield lease_cr->go_down();

      lease_cr.reset();

      drain_all();

      if (!can_adjust_marker) {
        return -EAGAIN;
      }

      if (lost_lock) {
        return -EBUSY;
      }

      tn->log(10, "full sync complete");

      // apply the sync marker update
      ceph_assert(temp_marker);
      sync_marker = std::move(*temp_marker);
      temp_marker = boost::none;
      // must not yield after this point!
    }
    return 0;
  }

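  /* incremental_sync() tails the shard's mdlog: clone new remote entries
   * into the local mdlog, list them, and spawn RGWMetaSyncSingleEntryCR per
   * entry until period_marker (if set) indicates the next period takes
   * over. */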
  int incremental_sync() {
    reenter(&incremental_cr) {
      set_status("incremental_sync");
      tn->log(10, "start incremental sync");
      can_adjust_marker = true;
      /* grab lock */
      if (!lease_cr) { /* could have had a lease_cr lock from previous state */
        yield {
          uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
          string lock_name = "sync_lock";
          RGWRados *store = sync_env->store;
          lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                                  rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
                                                  lock_name, lock_duration, this));
          lease_stack.reset(spawn(lease_cr.get(), false));
          lost_lock = false;
        }
        while (!lease_cr->is_locked()) {
          if (lease_cr->is_done()) {
            drain_all();
            tn->log(10, "failed to take lease");
            return lease_cr->get_ret_status();
          }
          set_sleeping(true);
          yield;
        }
      }
      tn->log(10, "took lease");
      // if the period has advanced, we can't use the existing marker
      if (sync_marker.realm_epoch < realm_epoch) {
        ldpp_dout(sync_env->dpp, 4) << "clearing marker=" << sync_marker.marker
                                    << " from old realm_epoch=" << sync_marker.realm_epoch
                                    << " (now " << realm_epoch << ')' << dendl;
        sync_marker.realm_epoch = realm_epoch;
        sync_marker.marker.clear();
      }
      mdlog_marker = sync_marker.marker;
      set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env,
                                                         sync_env->shard_obj_name(shard_id),
                                                         sync_marker, tn));

      /*
       * mdlog_marker: the remote sync marker position
       * sync_marker: the local sync marker position
       * max_marker: the max mdlog position that we fetched
       * marker: the current position we try to sync
       * period_marker: the last marker before the next period begins (optional)
       */
1720 marker = max_marker = sync_marker.marker;
1721 /* inc sync */
1722 do {
1723 if (!lease_cr->is_locked()) {
1724 lost_lock = true;
1725 tn->log(10, "lost lease");
1726 break;
1727 }
1728 #define INCREMENTAL_MAX_ENTRIES 100
1729 ldpp_dout(sync_env->dpp, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << dendl;
1730 if (!period_marker.empty() && period_marker <= mdlog_marker) {
1731 tn->log(10, SSTR("finished syncing current period: mdlog_marker=" << mdlog_marker << " sync_marker=" << sync_marker.marker << " period_marker=" << period_marker));
1732 done_with_period = true;
1733 break;
1734 }
1735 if (mdlog_marker <= max_marker) {
1736 /* we're at the tip, try to bring more entries */
1737 ldpp_dout(sync_env->dpp, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " syncing mdlog for shard_id=" << shard_id << dendl;
1738 yield call(new RGWCloneMetaLogCoroutine(sync_env, mdlog,
1739 period, shard_id,
1740 mdlog_marker, &mdlog_marker));
1741 }
1742 if (retcode < 0) {
1743 tn->log(10, SSTR(*this << ": failed to fetch more log entries, retcode=" << retcode));
1744 yield lease_cr->go_down();
1745 drain_all();
1746 *reset_backoff = false; // back off and try again later
1747 return retcode;
1748 }
1749 *reset_backoff = true; /* if we got to this point, all systems function */
1750 if (mdlog_marker > max_marker) {
1751 tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
1752 tn->log(20, SSTR("mdlog_marker=" << mdlog_marker << " sync_marker=" << sync_marker.marker));
1753 marker = max_marker;
1754 yield call(new RGWReadMDLogEntriesCR(sync_env, mdlog, shard_id,
1755 &max_marker, INCREMENTAL_MAX_ENTRIES,
1756 &log_entries, &truncated));
1757 if (retcode < 0) {
1758 tn->log(10, SSTR("failed to list mdlog entries, retcode=" << retcode));
1759 yield lease_cr->go_down();
1760 drain_all();
1761 *reset_backoff = false; // back off and try again later
1762 return retcode;
1763 }
1764 for (log_iter = log_entries.begin(); log_iter != log_entries.end() && !done_with_period; ++log_iter) {
1765 if (!period_marker.empty() && period_marker <= log_iter->id) {
1766 done_with_period = true;
1767 if (period_marker < log_iter->id) {
1768 tn->log(10, SSTR("found key=" << log_iter->id
1769 << " past period_marker=" << period_marker));
1770 break;
1771 }
1772 ldpp_dout(sync_env->dpp, 10) << "found key at period_marker=" << period_marker << dendl;
1773 // sync this entry, then return control to RGWMetaSyncCR
1774 }
1775 if (!mdlog_entry.convert_from(*log_iter)) {
1776 tn->log(0, SSTR("ERROR: failed to convert mdlog entry, shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << " ... skipping entry"));
1777 continue;
1778 }
1779 tn->log(20, SSTR("log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp));
1780 if (!marker_tracker->start(log_iter->id, 0, log_iter->timestamp.to_real_time())) {
1781 ldpp_dout(sync_env->dpp, 0) << "ERROR: cannot start syncing " << log_iter->id << ". Duplicate entry?" << dendl;
1782 } else {
1783 raw_key = log_iter->section + ":" + log_iter->name;
1784 yield {
1785 RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, mdlog_entry.log_data.status, marker_tracker, tn), false);
1786 ceph_assert(stack);
1787 // stack_to_pos holds a reference to the stack
1788 stack_to_pos[stack] = log_iter->id;
1789 pos_to_prev[log_iter->id] = marker;
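// bookkeeping note: stack_to_pos remembers which log position each spawned
// stack is applying, and pos_to_prev maps that position back to the marker
// that preceded it; collect_children() consumes both so the persisted
// sync marker only advances once every earlier position has completed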
1790 }
1791 }
1792 marker = log_iter->id;
1793 }
1794 }
1795 collect_children();
1796 ldpp_dout(sync_env->dpp, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " max_marker=" << max_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << dendl;
1797 if (done_with_period) {
1798 // return control to RGWMetaSyncCR and advance to the next period
1799 tn->log(10, SSTR(*this << ": done with period"));
1800 break;
1801 }
1802 if (mdlog_marker == max_marker && can_adjust_marker) {
1803 tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
1804 #define INCREMENTAL_INTERVAL 20
1805 yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
1806 }
1807 } while (can_adjust_marker);
1808
1809 tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
1810
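// drain the per-entry stacks; num_spawned() stays above 1 while the
// lease-renewal stack is still running, so that stack is excluded here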
1811 while (num_spawned() > 1) {
1812 yield wait_for_child();
1813 collect_children();
1814 }
1815
1816 yield lease_cr->go_down();
1817
1818 drain_all();
1819
1820 if (lost_lock) {
1821 return -EBUSY;
1822 }
1823
1824 if (!can_adjust_marker) {
1825 return -EAGAIN;
1826 }
1827
1828 return set_cr_done();
1829 }
1830 /* TODO */
1831 return 0;
1832 }
1833 };
1834
1835 class RGWMetaSyncShardControlCR : public RGWBackoffControlCR
1836 {
1837 RGWMetaSyncEnv *sync_env;
1838
1839 const rgw_pool& pool;
1840 const std::string& period;
1841 epoch_t realm_epoch;
1842 RGWMetadataLog* mdlog;
1843 uint32_t shard_id;
1844 rgw_meta_sync_marker sync_marker;
1845 const std::string period_marker;
1846
1847 RGWSyncTraceNodeRef tn;
1848
1849 static constexpr bool exit_on_error = false; // retry on all errors
1850 public:
1851 RGWMetaSyncShardControlCR(RGWMetaSyncEnv *_sync_env, const rgw_pool& _pool,
1852 const std::string& period, epoch_t realm_epoch,
1853 RGWMetadataLog* mdlog, uint32_t _shard_id,
1854 const rgw_meta_sync_marker& _marker,
1855 std::string&& period_marker,
1856 RGWSyncTraceNodeRef& _tn_parent)
1857 : RGWBackoffControlCR(_sync_env->cct, exit_on_error), sync_env(_sync_env),
1858 pool(_pool), period(period), realm_epoch(realm_epoch), mdlog(mdlog),
1859 shard_id(_shard_id), sync_marker(_marker),
1860 period_marker(std::move(period_marker)) {
1861 tn = sync_env->sync_tracer->add_node(_tn_parent, "shard",
1862 std::to_string(shard_id));
1863 }
1864
1865 RGWCoroutine *alloc_cr() override {
1866 return new RGWMetaSyncShardCR(sync_env, pool, period, realm_epoch, mdlog,
1867 shard_id, sync_marker, period_marker, backoff_ptr(), tn);
1868 }
1869
1870 RGWCoroutine *alloc_finisher_cr() override {
1871 RGWRados *store = sync_env->store;
1872 return new RGWSimpleRadosReadCR<rgw_meta_sync_marker>(sync_env->async_rados, store->svc.sysobj,
1873 rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
1874 &sync_marker);
1875 }
1876 };
1877
1878 class RGWMetaSyncCR : public RGWCoroutine {
1879 RGWMetaSyncEnv *sync_env;
1880 const rgw_pool& pool;
1881 RGWPeriodHistory::Cursor cursor; //< sync position in period history
1882 RGWPeriodHistory::Cursor next; //< next period in history
1883 rgw_meta_sync_status sync_status;
1884 RGWSyncTraceNodeRef tn;
1885
1886 std::mutex mutex; //< protect access to shard_crs
1887
1888 // TODO: it should be enough to hold a reference on the stack only, as calling
1889 // RGWCoroutinesStack::wakeup() doesn't refer to the RGWCoroutine if it has
1890 // already completed
1891 using ControlCRRef = boost::intrusive_ptr<RGWMetaSyncShardControlCR>;
1892 using StackRef = boost::intrusive_ptr<RGWCoroutinesStack>;
1893 using RefPair = std::pair<ControlCRRef, StackRef>;
1894 map<int, RefPair> shard_crs;
1895 int ret{0};
1896
1897 public:
1898 RGWMetaSyncCR(RGWMetaSyncEnv *_sync_env, const RGWPeriodHistory::Cursor &cursor,
1899 const rgw_meta_sync_status& _sync_status, RGWSyncTraceNodeRef& _tn)
1900 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
1901 pool(sync_env->store->svc.zone->get_zone_params().log_pool),
1902 cursor(cursor), sync_status(_sync_status), tn(_tn) {}
1903
1904 ~RGWMetaSyncCR() {
1905 }
1906
1907 int operate() override {
1908 reenter(this) {
1909 // loop through one period at a time
1910 tn->log(1, "start");
1911 for (;;) {
1912 if (cursor == sync_env->store->period_history->get_current()) {
1913 next = RGWPeriodHistory::Cursor{};
1914 if (cursor) {
1915 ldpp_dout(sync_env->dpp, 10) << "RGWMetaSyncCR on current period="
1916 << cursor.get_period().get_id() << dendl;
1917 } else {
1918 ldpp_dout(sync_env->dpp, 10) << "RGWMetaSyncCR with no period" << dendl;
1919 }
1920 } else {
1921 next = cursor;
1922 next.next();
1923 ldpp_dout(sync_env->dpp, 10) << "RGWMetaSyncCR on period="
1924 << cursor.get_period().get_id() << ", next="
1925 << next.get_period().get_id() << dendl;
1926 }
1927
1928 yield {
1929 // get the mdlog for the current period (may be empty)
1930 auto& period_id = sync_status.sync_info.period;
1931 auto realm_epoch = sync_status.sync_info.realm_epoch;
1932 auto mdlog = sync_env->store->meta_mgr->get_log(period_id);
1933
1934 tn->log(1, SSTR("realm epoch=" << realm_epoch << " period id=" << period_id));
1935
1936 // prevent wakeup() from accessing shard_crs while we're spawning them
1937 std::lock_guard<std::mutex> lock(mutex);
1938
1939 // sync this period on each shard
1940 for (const auto& m : sync_status.sync_markers) {
1941 uint32_t shard_id = m.first;
1942 auto& marker = m.second;
1943
1944 std::string period_marker;
1945 if (next) {
1946 // read the maximum marker from the next period's sync status
1947 period_marker = next.get_period().get_sync_status()[shard_id];
1948 if (period_marker.empty()) {
1949 // no metadata changes have occurred on this shard, skip it
1950 ldpp_dout(sync_env->dpp, 10) << "RGWMetaSyncCR: skipping shard " << shard_id
1951 << " with empty period marker" << dendl;
1952 continue;
1953 }
1954 }
1955
1956 using ShardCR = RGWMetaSyncShardControlCR;
1957 auto cr = new ShardCR(sync_env, pool, period_id, realm_epoch,
1958 mdlog, shard_id, marker,
1959 std::move(period_marker), tn);
1960 auto stack = spawn(cr, false);
1961 shard_crs[shard_id] = RefPair{cr, stack};
1962 }
1963 }
1964 // wait for each shard to complete
1965 while (ret == 0 && num_spawned() > 0) {
1966 yield wait_for_child();
1967 collect(&ret, nullptr);
1968 }
1969 drain_all();
1970 {
1971 // drop shard cr refs under lock
1972 std::lock_guard<std::mutex> lock(mutex);
1973 shard_crs.clear();
1974 }
1975 if (ret < 0) {
1976 return set_cr_error(ret);
1977 }
1978 // advance to the next period
1979 ceph_assert(next);
1980 cursor = next;
1981
1982 // write the updated sync info
1983 sync_status.sync_info.period = cursor.get_period().get_id();
1984 sync_status.sync_info.realm_epoch = cursor.get_epoch();
1985 yield call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados,
1986 sync_env->store->svc.sysobj,
1987 rgw_raw_obj(pool, sync_env->status_oid()),
1988 sync_status.sync_info));
1989 }
1990 }
1991 return 0;
1992 }
1993
1994 void wakeup(int shard_id) {
1995 std::lock_guard<std::mutex> lock(mutex);
1996 auto iter = shard_crs.find(shard_id);
1997 if (iter == shard_crs.end()) {
1998 return;
1999 }
2000 iter->second.first->wakeup();
2001 }
2002 };
2003
2004 void RGWRemoteMetaLog::init_sync_env(RGWMetaSyncEnv *env) {
2005 env->dpp = dpp;
2006 env->cct = store->ctx();
2007 env->store = store;
2008 env->conn = conn;
2009 env->async_rados = async_rados;
2010 env->http_manager = &http_manager;
2011 env->error_logger = error_logger;
2012 env->sync_tracer = store->get_sync_tracer();
2013 }
2014
2015 int RGWRemoteMetaLog::read_sync_status(rgw_meta_sync_status *sync_status)
2016 {
2017 if (store->svc.zone->is_meta_master()) {
2018 return 0;
2019 }
2020 // cannot run concurrently with run_sync(), so run in a separate manager
2021 RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
2022 RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
2023 int ret = http_manager.start();
2024 if (ret < 0) {
2025 ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
2026 return ret;
2027 }
2028 RGWMetaSyncEnv sync_env_local = sync_env;
2029 sync_env_local.http_manager = &http_manager;
2030 tn->log(20, "read sync status");
2031 ret = crs.run(new RGWReadSyncStatusCoroutine(&sync_env_local, sync_status));
2032 http_manager.stop();
2033 return ret;
2034 }
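/* illustrative usage sketch (caller and variable names are hypothetical):
 *
 *   rgw_meta_sync_status status;
 *   int r = remote_mdlog->read_sync_status(&status); // RGWRemoteMetaLog*
 *   if (r >= 0) {
 *     // status.sync_info.state / .period / .realm_epoch now describe the
 *     // local metadata sync position relative to the master
 *   }
 */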
2035
2036 int RGWRemoteMetaLog::init_sync_status()
2037 {
2038 if (store->svc.zone->is_meta_master()) {
2039 return 0;
2040 }
2041
2042 rgw_mdlog_info mdlog_info;
2043 int r = read_log_info(&mdlog_info);
2044 if (r < 0) {
2045 lderr(store->ctx()) << "ERROR: failed to fetch master log info (r=" << r << ")" << dendl;
2046 return r;
2047 }
2048
2049 rgw_meta_sync_info sync_info;
2050 sync_info.num_shards = mdlog_info.num_shards;
2051 auto cursor = store->period_history->get_current();
2052 if (cursor) {
2053 sync_info.period = cursor.get_period().get_id();
2054 sync_info.realm_epoch = cursor.get_epoch();
2055 }
2056
2057 return run(new RGWInitSyncStatusCoroutine(&sync_env, sync_info));
2058 }
2059
2060 int RGWRemoteMetaLog::store_sync_info(const rgw_meta_sync_info& sync_info)
2061 {
2062 tn->log(20, "store sync info");
2063 return run(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(async_rados, store->svc.sysobj,
2064 rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, sync_env.status_oid()),
2065 sync_info));
2066 }
2067
2068 // return a cursor to the period at our sync position
2069 static RGWPeriodHistory::Cursor get_period_at(RGWRados* store,
2070 const rgw_meta_sync_info& info)
2071 {
2072 if (info.period.empty()) {
2073 // return an empty cursor with error=0
2074 return RGWPeriodHistory::Cursor{};
2075 }
2076
2077 // look for an existing period in our history
2078 auto cursor = store->period_history->lookup(info.realm_epoch);
2079 if (cursor) {
2080 // verify that the period ids match
2081 auto& existing = cursor.get_period().get_id();
2082 if (existing != info.period) {
2083 lderr(store->ctx()) << "ERROR: sync status period=" << info.period
2084 << " does not match period=" << existing
2085 << " in history at realm epoch=" << info.realm_epoch << dendl;
2086 return RGWPeriodHistory::Cursor{-EEXIST};
2087 }
2088 return cursor;
2089 }
2090
2091 // read the period from rados or pull it from the master
2092 RGWPeriod period;
2093 int r = store->period_puller->pull(info.period, period);
2094 if (r < 0) {
2095 lderr(store->ctx()) << "ERROR: failed to read period id "
2096 << info.period << ": " << cpp_strerror(r) << dendl;
2097 return RGWPeriodHistory::Cursor{r};
2098 }
2099 // attach the period to our history
2100 cursor = store->period_history->attach(std::move(period));
2101 if (!cursor) {
2102 r = cursor.get_error();
2103 lderr(store->ctx()) << "ERROR: failed to read period history back to "
2104 << info.period << ": " << cpp_strerror(r) << dendl;
2105 }
2106 return cursor;
2107 }
2108
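/* run_sync() drives the metadata sync state machine:
 *   StateInit                 -> persist a fresh status (RGWInitSyncStatusCoroutine)
 *   StateBuildingFullSyncMaps -> list all metadata keys (RGWFetchAllMetaCR)
 *   StateSync                 -> incremental sync (RGWMetaSyncCR) until shutdown
 * transient -EBUSY/-EAGAIN results are retried after an exponential backoff */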
2109 int RGWRemoteMetaLog::run_sync()
2110 {
2111 if (store->svc.zone->is_meta_master()) {
2112 return 0;
2113 }
2114
2115 int r = 0;
2116
2117 // get shard count and oldest log period from master
2118 rgw_mdlog_info mdlog_info;
2119 for (;;) {
2120 if (going_down) {
2121 ldpp_dout(dpp, 1) << __func__ << "(): going down" << dendl;
2122 return 0;
2123 }
2124 r = read_log_info(&mdlog_info);
2125 if (r == -EIO || r == -ENOENT) {
2126 // keep retrying if master isn't alive or hasn't initialized the log
2127 ldpp_dout(dpp, 10) << __func__ << "(): waiting for master..." << dendl;
2128 backoff.backoff_sleep();
2129 continue;
2130 }
2131 backoff.reset();
2132 if (r < 0) {
2133 lderr(store->ctx()) << "ERROR: failed to fetch master log info (r=" << r << ")" << dendl;
2134 return r;
2135 }
2136 break;
2137 }
2138
2139 rgw_meta_sync_status sync_status;
2140 do {
2141 if (going_down) {
2142 ldpp_dout(dpp, 1) << __func__ << "(): going down" << dendl;
2143 return 0;
2144 }
2145 r = run(new RGWReadSyncStatusCoroutine(&sync_env, &sync_status));
2146 if (r < 0 && r != -ENOENT) {
2147 ldpp_dout(dpp, 0) << "ERROR: failed to fetch sync status r=" << r << dendl;
2148 return r;
2149 }
2150
2151 if (!mdlog_info.period.empty()) {
2152 // restart sync if the remote has a period, but:
2153 // a) our status does not, or
2154 // b) our sync period comes before the remote's oldest log period
2155 if (sync_status.sync_info.period.empty() ||
2156 sync_status.sync_info.realm_epoch < mdlog_info.realm_epoch) {
2157 sync_status.sync_info.state = rgw_meta_sync_info::StateInit;
2158 string reason;
2159 if (sync_status.sync_info.period.empty()) {
2160 reason = "period is empty";
2161 } else {
2162 reason = SSTR("sync_info realm epoch is behind: " << sync_status.sync_info.realm_epoch << " < " << mdlog_info.realm_epoch);
2163 }
2164 tn->log(1, "initialize sync (reason: " + reason + ")");
2165 ldpp_dout(dpp, 1) << "epoch=" << sync_status.sync_info.realm_epoch
2166 << " in sync status comes before remote's oldest mdlog epoch="
2167 << mdlog_info.realm_epoch << ", restarting sync" << dendl;
2168 }
2169 }
2170
2171 if (sync_status.sync_info.state == rgw_meta_sync_info::StateInit) {
2172 ldpp_dout(dpp, 20) << __func__ << "(): init" << dendl;
2173 sync_status.sync_info.num_shards = mdlog_info.num_shards;
2174 auto cursor = store->period_history->get_current();
2175 if (cursor) {
2176 // run full sync, then start incremental from the current period/epoch
2177 sync_status.sync_info.period = cursor.get_period().get_id();
2178 sync_status.sync_info.realm_epoch = cursor.get_epoch();
2179 }
2180 r = run(new RGWInitSyncStatusCoroutine(&sync_env, sync_status.sync_info));
2181 if (r == -EBUSY) {
2182 backoff.backoff_sleep();
2183 continue;
2184 }
2185 backoff.reset();
2186 if (r < 0) {
2187 ldpp_dout(dpp, 0) << "ERROR: failed to init sync status r=" << r << dendl;
2188 return r;
2189 }
2190 }
2191 } while (sync_status.sync_info.state == rgw_meta_sync_info::StateInit);
2192
2193 auto num_shards = sync_status.sync_info.num_shards;
2194 if (num_shards != mdlog_info.num_shards) {
2195 lderr(store->ctx()) << "ERROR: can't sync, num_shards mismatch: master num_shards=" << mdlog_info.num_shards << " local num_shards=" << num_shards << dendl;
2196 return -EINVAL;
2197 }
2198
2199 RGWPeriodHistory::Cursor cursor;
2200 do {
2201 r = run(new RGWReadSyncStatusCoroutine(&sync_env, &sync_status));
2202 if (r < 0 && r != -ENOENT) {
2203 tn->log(0, SSTR("ERROR: failed to fetch sync status r=" << r));
2204 return r;
2205 }
2206
2207 switch ((rgw_meta_sync_info::SyncState)sync_status.sync_info.state) {
2208 case rgw_meta_sync_info::StateBuildingFullSyncMaps:
2209 tn->log(20, "building full sync maps");
2210 r = run(new RGWFetchAllMetaCR(&sync_env, num_shards, sync_status.sync_markers, tn));
2211 if (r == -EBUSY || r == -EAGAIN) {
2212 backoff.backoff_sleep();
2213 continue;
2214 }
2215 backoff.reset();
2216 if (r < 0) {
2217 tn->log(0, SSTR("ERROR: failed to fetch all metadata keys (r=" << r << ")"));
2218 return r;
2219 }
2220
2221 sync_status.sync_info.state = rgw_meta_sync_info::StateSync;
2222 r = store_sync_info(sync_status.sync_info);
2223 if (r < 0) {
2224 tn->log(0, SSTR("ERROR: failed to update sync status (r=" << r << ")"));
2225 return r;
2226 }
2227 /* fall through */
2228 case rgw_meta_sync_info::StateSync:
2229 tn->log(20, "sync");
2230 // find our position in the period history (if any)
2231 cursor = get_period_at(store, sync_status.sync_info);
2232 r = cursor.get_error();
2233 if (r < 0) {
2234 return r;
2235 }
2236 meta_sync_cr = new RGWMetaSyncCR(&sync_env, cursor, sync_status, tn);
2237 r = run(meta_sync_cr);
2238 if (r < 0) {
2239 tn->log(0, SSTR("ERROR: failed to run metadata sync (r=" << r << ")"));
2240 return r;
2241 }
2242 break;
2243 default:
2244 tn->log(0, "ERROR: bad sync state!");
2245 return -EIO;
2246 }
2247 } while (!going_down);
2248
2249 return 0;
2250 }
2251
2252 void RGWRemoteMetaLog::wakeup(int shard_id)
2253 {
2254 if (!meta_sync_cr) {
2255 return;
2256 }
2257 meta_sync_cr->wakeup(shard_id);
2258 }
2259
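/* each yield below hands control back to the coroutine manager and resumes
 * on the next operate() call: the state_*() helpers return 0 to fall through
 * to the next yield, io_block(0) to sleep until the async request signals
 * io_complete(), or set_cr_error()/set_cr_done() to finish the coroutine */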
2260 int RGWCloneMetaLogCoroutine::operate()
2261 {
2262 reenter(this) {
2263 do {
2264 yield {
2265 ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": init request" << dendl;
2266 return state_init();
2267 }
2268 yield {
2269 ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status" << dendl;
2270 return state_read_shard_status();
2271 }
2272 yield {
2273 ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status complete" << dendl;
2274 return state_read_shard_status_complete();
2275 }
2276 yield {
2277 ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": sending rest request" << dendl;
2278 return state_send_rest_request();
2279 }
2280 yield {
2281 ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": receiving rest response" << dendl;
2282 return state_receive_rest_response();
2283 }
2284 yield {
2285 ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries" << dendl;
2286 return state_store_mdlog_entries();
2287 }
2288 } while (truncated);
2289 yield {
2290 ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries complete" << dendl;
2291 return state_store_mdlog_entries_complete();
2292 }
2293 }
2294
2295 return 0;
2296 }
2297
2298 int RGWCloneMetaLogCoroutine::state_init()
2299 {
2300 data = rgw_mdlog_shard_data();
2301
2302 return 0;
2303 }
2304
2305 int RGWCloneMetaLogCoroutine::state_read_shard_status()
2306 {
2307 const bool add_ref = false; // default constructs with refs=1
2308
2309 completion.reset(new RGWMetadataLogInfoCompletion(
2310 [this](int ret, const cls_log_header& header) {
2311 if (ret < 0) {
2312 if (ret != -ENOENT) {
2313 ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to read mdlog info with "
2314 << cpp_strerror(ret) << dendl;
2315 }
2316 } else {
2317 shard_info.marker = header.max_marker;
2318 shard_info.last_update = header.max_time.to_real_time();
2319 }
2320 // wake up parent stack
2321 io_complete();
2322 }), add_ref);
2323
2324 int ret = mdlog->get_info_async(shard_id, completion.get());
2325 if (ret < 0) {
2326 ldpp_dout(sync_env->dpp, 0) << "ERROR: mdlog->get_info_async() returned ret=" << ret << dendl;
2327 return set_cr_error(ret);
2328 }
2329
2330 return io_block(0);
2331 }
2332
2333 int RGWCloneMetaLogCoroutine::state_read_shard_status_complete()
2334 {
2335 completion.reset();
2336
2337 ldpp_dout(sync_env->dpp, 20) << "shard_id=" << shard_id << " marker=" << shard_info.marker << " last_update=" << shard_info.last_update << dendl;
2338
2339 marker = shard_info.marker;
2340
2341 return 0;
2342 }
2343
2344 int RGWCloneMetaLogCoroutine::state_send_rest_request()
2345 {
2346 RGWRESTConn *conn = sync_env->conn;
2347
2348 char buf[32];
2349 snprintf(buf, sizeof(buf), "%d", shard_id);
2350
2351 char max_entries_buf[32];
2352 snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", max_entries);
2353
2354 const char *marker_key = (marker.empty() ? "" : "marker");
2355
2356 rgw_http_param_pair pairs[] = { { "type", "metadata" },
2357 { "id", buf },
2358 { "period", period.c_str() },
2359 { "max-entries", max_entries_buf },
2360 { marker_key, marker.c_str() },
2361 { NULL, NULL } };
2362
2363 http_op = new RGWRESTReadResource(conn, "/admin/log", pairs, NULL, sync_env->http_manager);
2364
2365 init_new_io(http_op);
2366
2367 int ret = http_op->aio_read();
2368 if (ret < 0) {
2369 ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to fetch mdlog data" << dendl;
2370 log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
2371 http_op->put();
2372 http_op = NULL;
2373 return set_cr_error(ret);
2374 }
2375
2376 return io_block(0);
2377 }
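/* sketch of the resulting request (marker value illustrative only):
 *
 *   GET /admin/log?type=metadata&id=<shard_id>&period=<period>
 *       &max-entries=<max_entries>[&marker=<marker>]
 *
 * when marker is empty its key is set to "" above, so the pair drops out of
 * the query string and the listing starts from the beginning of the shard */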
2378
2379 int RGWCloneMetaLogCoroutine::state_receive_rest_response()
2380 {
2381 int ret = http_op->wait(&data);
2382 if (ret < 0) {
2383 error_stream << "http operation failed: " << http_op->to_str() << " status=" << http_op->get_http_status() << std::endl;
2384 ldpp_dout(sync_env->dpp, 5) << "failed to wait for op, ret=" << ret << dendl;
2385 http_op->put();
2386 http_op = NULL;
2387 return set_cr_error(ret);
2388 }
2389 http_op->put();
2390 http_op = NULL;
2391
2392 ldpp_dout(sync_env->dpp, 20) << "remote mdlog, shard_id=" << shard_id << " number of shard entries: " << data.entries.size() << dendl;
2393
2394 truncated = ((int)data.entries.size() == max_entries);
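// e.g. with max_entries=100, a page of exactly 100 entries is taken to mean
// more may remain, so the do/while loop in operate() issues another request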
2395
2396 if (data.entries.empty()) {
2397 if (new_marker) {
2398 *new_marker = marker;
2399 }
2400 return set_cr_done();
2401 }
2402
2403 if (new_marker) {
2404 *new_marker = data.entries.back().id;
2405 }
2406
2407 return 0;
2408 }
2409
2410
2411 int RGWCloneMetaLogCoroutine::state_store_mdlog_entries()
2412 {
2413 list<cls_log_entry> dest_entries;
2414
2415 vector<rgw_mdlog_entry>::iterator iter;
2416 for (iter = data.entries.begin(); iter != data.entries.end(); ++iter) {
2417 rgw_mdlog_entry& entry = *iter;
2418 ldpp_dout(sync_env->dpp, 20) << "entry: name=" << entry.name << dendl;
2419
2420 cls_log_entry dest_entry;
2421 dest_entry.id = entry.id;
2422 dest_entry.section = entry.section;
2423 dest_entry.name = entry.name;
2424 dest_entry.timestamp = utime_t(entry.timestamp);
2425
2426 encode(entry.log_data, dest_entry.data);
2427
2428 dest_entries.push_back(dest_entry);
2429
2430 marker = entry.id;
2431 }
2432
2433 RGWAioCompletionNotifier *cn = stack->create_completion_notifier();
2434
2435 int ret = mdlog->store_entries_in_shard(dest_entries, shard_id, cn->completion());
2436 if (ret < 0) {
2437 cn->put();
2438 ldpp_dout(sync_env->dpp, 10) << "failed to store md log entries shard_id=" << shard_id << " ret=" << ret << dendl;
2439 return set_cr_error(ret);
2440 }
2441 return io_block(0);
2442 }
2443
2444 int RGWCloneMetaLogCoroutine::state_store_mdlog_entries_complete()
2445 {
2446 return set_cr_done();
2447 }
2448
2449
2450 // TODO: move into rgw_sync_trim.cc
2451 #undef dout_prefix
2452 #define dout_prefix (*_dout << "meta trim: ")
2453
2454 /// purge all log shards for the given mdlog
2455 class PurgeLogShardsCR : public RGWShardCollectCR {
2456 RGWRados *const store;
2457 const RGWMetadataLog* mdlog;
2458 const int num_shards;
2459 rgw_raw_obj obj;
2460 int i{0};
2461
2462 static constexpr int max_concurrent = 16;
2463
2464 public:
2465 PurgeLogShardsCR(RGWRados *store, const RGWMetadataLog* mdlog,
2466 const rgw_pool& pool, int num_shards)
2467 : RGWShardCollectCR(store->ctx(), max_concurrent),
2468 store(store), mdlog(mdlog), num_shards(num_shards), obj(pool, "")
2469 {}
2470
2471 bool spawn_next() override {
2472 if (i == num_shards) {
2473 return false;
2474 }
2475 mdlog->get_shard_oid(i++, obj.oid);
2476 spawn(new RGWRadosRemoveCR(store, obj), false);
2477 return true;
2478 }
2479 };
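/* RGWShardCollectCR calls spawn_next() repeatedly until it returns false,
 * keeping at most max_concurrent children in flight, so the removals above
 * proceed 16 shards at a time */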
2480
2481 using Cursor = RGWPeriodHistory::Cursor;
2482
2483 /// purge mdlogs from the oldest up to (but not including) the given realm_epoch
2484 class PurgePeriodLogsCR : public RGWCoroutine {
2485 RGWRados *const store;
2486 RGWMetadataManager *const metadata;
2487 RGWObjVersionTracker objv;
2488 Cursor cursor;
2489 epoch_t realm_epoch;
2490 epoch_t *last_trim_epoch; //< update last trim on success
2491
2492 public:
2493 PurgePeriodLogsCR(RGWRados *store, epoch_t realm_epoch, epoch_t *last_trim)
2494 : RGWCoroutine(store->ctx()), store(store), metadata(store->meta_mgr),
2495 realm_epoch(realm_epoch), last_trim_epoch(last_trim)
2496 {}
2497
2498 int operate() override;
2499 };
2500
2501 int PurgePeriodLogsCR::operate()
2502 {
2503 reenter(this) {
2504 // read our current oldest log period
2505 yield call(metadata->read_oldest_log_period_cr(&cursor, &objv));
2506 if (retcode < 0) {
2507 return set_cr_error(retcode);
2508 }
2509 ceph_assert(cursor);
2510 ldout(cct, 20) << "oldest log realm_epoch=" << cursor.get_epoch()
2511 << " period=" << cursor.get_period().get_id() << dendl;
2512
2513 // trim -up to- the given realm_epoch
2514 while (cursor.get_epoch() < realm_epoch) {
2515 ldout(cct, 4) << "purging log shards for realm_epoch=" << cursor.get_epoch()
2516 << " period=" << cursor.get_period().get_id() << dendl;
2517 yield {
2518 const auto mdlog = metadata->get_log(cursor.get_period().get_id());
2519 const auto& pool = store->svc.zone->get_zone_params().log_pool;
2520 auto num_shards = cct->_conf->rgw_md_log_max_shards;
2521 call(new PurgeLogShardsCR(store, mdlog, pool, num_shards));
2522 }
2523 if (retcode < 0) {
2524 ldout(cct, 1) << "failed to remove log shards: "
2525 << cpp_strerror(retcode) << dendl;
2526 return set_cr_error(retcode);
2527 }
2528 ldout(cct, 10) << "removed log shards for realm_epoch=" << cursor.get_epoch()
2529 << " period=" << cursor.get_period().get_id() << dendl;
2530
2531 // update our mdlog history
2532 yield call(metadata->trim_log_period_cr(cursor, &objv));
2533 if (retcode == -ENOENT) {
2534 // must have raced to update mdlog history. return success and allow the
2535 // winner to continue purging
2536 ldout(cct, 10) << "already removed log shards for realm_epoch=" << cursor.get_epoch()
2537 << " period=" << cursor.get_period().get_id() << dendl;
2538 return set_cr_done();
2539 } else if (retcode < 0) {
2540 ldout(cct, 1) << "failed to remove log shards for realm_epoch="
2541 << cursor.get_epoch() << " period=" << cursor.get_period().get_id()
2542 << " with: " << cpp_strerror(retcode) << dendl;
2543 return set_cr_error(retcode);
2544 }
2545
2546 if (*last_trim_epoch < cursor.get_epoch()) {
2547 *last_trim_epoch = cursor.get_epoch();
2548 }
2549
2550 ceph_assert(cursor.has_next()); // we haven't reached get_current() yet, so a next period must exist
2551 cursor.next();
2552 }
2553 return set_cr_done();
2554 }
2555 return 0;
2556 }
2557
2558 namespace {
2559
2560 using connection_map = std::map<std::string, std::unique_ptr<RGWRESTConn>>;
2561
2562 /// construct a RGWRESTConn for each zone in the realm
2563 template <typename Zonegroups>
2564 connection_map make_peer_connections(RGWRados *store,
2565 const Zonegroups& zonegroups)
2566 {
2567 connection_map connections;
2568 for (auto& g : zonegroups) {
2569 for (auto& z : g.second.zones) {
2570 std::unique_ptr<RGWRESTConn> conn{
2571 new RGWRESTConn(store->ctx(), store->svc.zone, z.first, z.second.endpoints)};
2572 connections.emplace(z.first, std::move(conn));
2573 }
2574 }
2575 return connections;
2576 }
2577
2578 /// return the marker up to which it is safe to trim
2579 const std::string& get_stable_marker(const rgw_meta_sync_marker& m)
2580 {
2581 return m.state == m.FullSync ? m.next_step_marker : m.marker;
2582 }
2583
2584 /// comparison operator for take_min_status()
2585 bool operator<(const rgw_meta_sync_marker& lhs, const rgw_meta_sync_marker& rhs)
2586 {
2587 // sort by stable marker
2588 return get_stable_marker(lhs) < get_stable_marker(rhs);
2589 }
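/* e.g. (marker values illustrative) a peer still in FullSync with
 * next_step_marker="1_00042" compares below an incremental peer at
 * marker="1_00100", so trimming is bounded by the full-sync peer */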
2590
2591 /// populate the status with the minimum stable marker of each shard for any
2592 /// peer whose realm_epoch matches the minimum realm_epoch in the input
2593 template <typename Iter>
2594 int take_min_status(CephContext *cct, Iter first, Iter last,
2595 rgw_meta_sync_status *status)
2596 {
2597 if (first == last) {
2598 return -EINVAL;
2599 }
2600 const size_t num_shards = cct->_conf->rgw_md_log_max_shards;
2601
2602 status->sync_info.realm_epoch = std::numeric_limits<epoch_t>::max();
2603 for (auto p = first; p != last; ++p) {
2604 // validate peer's shard count
2605 if (p->sync_markers.size() != num_shards) {
2606 ldout(cct, 1) << "take_min_status got peer status with "
2607 << p->sync_markers.size() << " shards, expected "
2608 << num_shards << dendl;
2609 return -EINVAL;
2610 }
2611 if (p->sync_info.realm_epoch < status->sync_info.realm_epoch) {
2612 // earlier epoch, take its entire status
2613 *status = std::move(*p);
2614 } else if (p->sync_info.realm_epoch == status->sync_info.realm_epoch) {
2615 // same epoch, take any earlier markers
2616 auto m = status->sync_markers.begin();
2617 for (auto& shard : p->sync_markers) {
2618 if (shard.second < m->second) {
2619 m->second = std::move(shard.second);
2620 }
2621 ++m;
2622 }
2623 }
2624 }
2625 return 0;
2626 }
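/* worked example (epoch and marker values illustrative): given peers
 *   A: realm_epoch=2, markers {0:"1_0005", 1:"1_0009"}
 *   B: realm_epoch=3, markers {0:"1_0001", 1:"1_0002"}
 * A's epoch is older, so the result is A's status verbatim; if B instead
 * also had realm_epoch=2, the result would keep the per-shard minima
 * {0:"1_0001", 1:"1_0002"} */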
2627
2628 struct TrimEnv {
2629 const DoutPrefixProvider *dpp;
2630 RGWRados *const store;
2631 RGWHTTPManager *const http;
2632 int num_shards;
2633 const std::string& zone;
2634 Cursor current; //< cursor to current period
2635 epoch_t last_trim_epoch{0}; //< epoch of last mdlog that was purged
2636
2637 TrimEnv(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http, int num_shards)
2638 : dpp(dpp), store(store), http(http), num_shards(num_shards),
2639 zone(store->svc.zone->get_zone_params().get_id()),
2640 current(store->period_history->get_current())
2641 {}
2642 };
2643
2644 struct MasterTrimEnv : public TrimEnv {
2645 connection_map connections; //< peer connections
2646 std::vector<rgw_meta_sync_status> peer_status; //< sync status for each peer
2647 /// last trim marker for each shard, only applies to current period's mdlog
2648 std::vector<std::string> last_trim_markers;
2649
2650 MasterTrimEnv(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http, int num_shards)
2651 : TrimEnv(dpp, store, http, num_shards),
2652 last_trim_markers(num_shards)
2653 {
2654 auto& period = current.get_period();
2655 connections = make_peer_connections(store, period.get_map().zonegroups);
2656 connections.erase(zone);
2657 peer_status.resize(connections.size());
2658 }
2659 };
2660
2661 struct PeerTrimEnv : public TrimEnv {
2662 /// last trim timestamp for each shard, only applies to current period's mdlog
2663 std::vector<ceph::real_time> last_trim_timestamps;
2664
2665 PeerTrimEnv(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http, int num_shards)
2666 : TrimEnv(dpp, store, http, num_shards),
2667 last_trim_timestamps(num_shards)
2668 {}
2669
2670 void set_num_shards(int num_shards) {
2671 this->num_shards = num_shards;
2672 last_trim_timestamps.resize(num_shards);
2673 }
2674 };
2675
2676 } // anonymous namespace
2677
2678
2679 /// spawn a trim cr for each shard that needs it, while limiting the number
2680 /// of concurrent shards
2681 class MetaMasterTrimShardCollectCR : public RGWShardCollectCR {
2682 private:
2683 static constexpr int MAX_CONCURRENT_SHARDS = 16;
2684
2685 MasterTrimEnv& env;
2686 RGWMetadataLog *mdlog;
2687 int shard_id{0};
2688 std::string oid;
2689 const rgw_meta_sync_status& sync_status;
2690
2691 public:
2692 MetaMasterTrimShardCollectCR(MasterTrimEnv& env, RGWMetadataLog *mdlog,
2693 const rgw_meta_sync_status& sync_status)
2694 : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
2695 env(env), mdlog(mdlog), sync_status(sync_status)
2696 {}
2697
2698 bool spawn_next() override;
2699 };
2700
2701 bool MetaMasterTrimShardCollectCR::spawn_next()
2702 {
2703 while (shard_id < env.num_shards) {
2704 auto m = sync_status.sync_markers.find(shard_id);
2705 if (m == sync_status.sync_markers.end()) {
2706 shard_id++;
2707 continue;
2708 }
2709 auto& stable = get_stable_marker(m->second);
2710 auto& last_trim = env.last_trim_markers[shard_id];
2711
2712 if (stable <= last_trim) {
2713 // already trimmed
2714 ldout(cct, 20) << "skipping log shard " << shard_id
2715 << " at marker=" << stable
2716 << " last_trim=" << last_trim
2717 << " realm_epoch=" << sync_status.sync_info.realm_epoch << dendl;
2718 shard_id++;
2719 continue;
2720 }
2721
2722 mdlog->get_shard_oid(shard_id, oid);
2723
2724 ldout(cct, 10) << "trimming log shard " << shard_id
2725 << " at marker=" << stable
2726 << " last_trim=" << last_trim
2727 << " realm_epoch=" << sync_status.sync_info.realm_epoch << dendl;
2728 spawn(new RGWSyncLogTrimCR(env.store, oid, stable, &last_trim), false);
2729 shard_id++;
2730 return true;
2731 }
2732 return false;
2733 }
2734
2735 /// spawn rest requests to read each peer's sync status
2736 class MetaMasterStatusCollectCR : public RGWShardCollectCR {
2737 static constexpr int MAX_CONCURRENT_SHARDS = 16;
2738
2739 MasterTrimEnv& env;
2740 connection_map::iterator c;
2741 std::vector<rgw_meta_sync_status>::iterator s;
2742 public:
2743 explicit MetaMasterStatusCollectCR(MasterTrimEnv& env)
2744 : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
2745 env(env), c(env.connections.begin()), s(env.peer_status.begin())
2746 {}
2747
2748 bool spawn_next() override {
2749 if (c == env.connections.end()) {
2750 return false;
2751 }
2752 static rgw_http_param_pair params[] = {
2753 { "type", "metadata" },
2754 { "status", nullptr },
2755 { nullptr, nullptr }
2756 };
2757
2758 ldout(cct, 20) << "query sync status from " << c->first << dendl;
2759 auto conn = c->second.get();
2760 using StatusCR = RGWReadRESTResourceCR<rgw_meta_sync_status>;
2761 spawn(new StatusCR(cct, conn, env.http, "/admin/log/", params, &*s),
2762 false);
2763 ++c;
2764 ++s;
2765 return true;
2766 }
2767 };
2768
2769 class MetaMasterTrimCR : public RGWCoroutine {
2770 MasterTrimEnv& env;
2771 rgw_meta_sync_status min_status; //< minimum sync status of all peers
2772 int ret{0};
2773
2774 public:
2775 explicit MetaMasterTrimCR(MasterTrimEnv& env)
2776 : RGWCoroutine(env.store->ctx()), env(env)
2777 {}
2778
2779 int operate() override;
2780 };
2781
2782 int MetaMasterTrimCR::operate()
2783 {
2784 reenter(this) {
2785 // TODO: detect this and fail before we spawn the trim thread?
2786 if (env.connections.empty()) {
2787 ldout(cct, 4) << "no peers, exiting" << dendl;
2788 return set_cr_done();
2789 }
2790
2791 ldout(cct, 10) << "fetching sync status for zone " << env.zone << dendl;
2792 // query mdlog sync status from peers
2793 yield call(new MetaMasterStatusCollectCR(env));
2794
2795 // must get a successful reply from all peers to consider trimming
2796 if (ret < 0) {
2797 ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
2798 return set_cr_error(ret);
2799 }
2800
2801 // determine the minimum epoch and markers
2802 ret = take_min_status(env.store->ctx(), env.peer_status.begin(),
2803 env.peer_status.end(), &min_status);
2804 if (ret < 0) {
2805 ldout(cct, 4) << "failed to calculate min sync status from peers" << dendl;
2806 return set_cr_error(ret);
2807 }
2808 yield {
2809 auto store = env.store;
2810 auto epoch = min_status.sync_info.realm_epoch;
2811 ldout(cct, 4) << "realm epoch min=" << epoch
2812 << " current=" << env.current.get_epoch()<< dendl;
2813 if (epoch > env.last_trim_epoch + 1) {
2814 // delete any prior mdlog periods
2815 spawn(new PurgePeriodLogsCR(store, epoch, &env.last_trim_epoch), true);
2816 } else {
2817 ldout(cct, 10) << "mdlogs already purged up to realm_epoch "
2818 << env.last_trim_epoch << dendl;
2819 }
2820
2821 // if realm_epoch == current, trim mdlog based on markers
2822 if (epoch == env.current.get_epoch()) {
2823 auto mdlog = store->meta_mgr->get_log(env.current.get_period().get_id());
2824 spawn(new MetaMasterTrimShardCollectCR(env, mdlog, min_status), true);
2825 }
2826 }
2827 // ignore any errors during purge/trim because we want to hold the lock open
2828 return set_cr_done();
2829 }
2830 return 0;
2831 }
2832
2833
2834 /// read the first entry of the master's mdlog shard and trim to that position
2835 class MetaPeerTrimShardCR : public RGWCoroutine {
2836 RGWMetaSyncEnv& env;
2837 RGWMetadataLog *mdlog;
2838 const std::string& period_id;
2839 const int shard_id;
2840 RGWMetadataLogInfo info;
2841 ceph::real_time stable; //< safe timestamp to trim, according to master
2842 ceph::real_time *last_trim; //< last trimmed timestamp, updated on trim
2843 rgw_mdlog_shard_data result; //< result from master's mdlog listing
2844
2845 public:
2846 MetaPeerTrimShardCR(RGWMetaSyncEnv& env, RGWMetadataLog *mdlog,
2847 const std::string& period_id, int shard_id,
2848 ceph::real_time *last_trim)
2849 : RGWCoroutine(env.store->ctx()), env(env), mdlog(mdlog),
2850 period_id(period_id), shard_id(shard_id), last_trim(last_trim)
2851 {}
2852
2853 int operate() override;
2854 };
2855
2856 int MetaPeerTrimShardCR::operate()
2857 {
2858 reenter(this) {
2859 // query master's first mdlog entry for this shard
2860 yield call(new RGWListRemoteMDLogShardCR(&env, period_id, shard_id,
2861 "", 1, &result));
2862 if (retcode < 0) {
2863 ldpp_dout(env.dpp, 5) << "failed to read first entry from master's mdlog shard "
2864 << shard_id << " for period " << period_id
2865 << ": " << cpp_strerror(retcode) << dendl;
2866 return set_cr_error(retcode);
2867 }
2868 if (result.entries.empty()) {
2869 // if there are no mdlog entries, we have no timestamp to compare against.
2870 // we can't just trim everything, because updates may have raced in since
2871 // this empty reply. query the mdlog shard info for its max timestamp, then
2872 // retry the listing to confirm the shard is still empty before trimming up
2873 // to that timestamp
2874 ldpp_dout(env.dpp, 10) << "empty master mdlog shard " << shard_id
2875 << ", reading last timestamp from shard info" << dendl;
2876 // read the mdlog shard info for the last timestamp
2877 using ShardInfoCR = RGWReadRemoteMDLogShardInfoCR;
2878 yield call(new ShardInfoCR(&env, period_id, shard_id, &info));
2879 if (retcode < 0) {
2880 ldpp_dout(env.dpp, 5) << "failed to read info from master's mdlog shard "
2881 << shard_id << " for period " << period_id
2882 << ": " << cpp_strerror(retcode) << dendl;
2883 return set_cr_error(retcode);
2884 }
2885 if (ceph::real_clock::is_zero(info.last_update)) {
2886 return set_cr_done(); // nothing to trim
2887 }
2888 ldpp_dout(env.dpp, 10) << "got mdlog shard info with last update="
2889 << info.last_update << dendl;
2890 // re-read the master's first mdlog entry to make sure it hasn't changed
2891 yield call(new RGWListRemoteMDLogShardCR(&env, period_id, shard_id,
2892 "", 1, &result));
2893 if (retcode < 0) {
2894 ldpp_dout(env.dpp, 5) << "failed to read first entry from master's mdlog shard "
2895 << shard_id << " for period " << period_id
2896 << ": " << cpp_strerror(retcode) << dendl;
2897 return set_cr_error(retcode);
2898 }
2899 // if the mdlog is still empty, trim to max marker
2900 if (result.entries.empty()) {
2901 stable = info.last_update;
2902 } else {
2903 stable = result.entries.front().timestamp;
2904
2905 // can only trim -up to- master's first timestamp, so subtract a second.
2906 // (this is why we use timestamps instead of markers for the peers)
2907 stable -= std::chrono::seconds(1);
2908 }
2909 } else {
2910 stable = result.entries.front().timestamp;
2911 stable -= std::chrono::seconds(1);
2912 }
2913
2914 if (stable <= *last_trim) {
2915 ldpp_dout(env.dpp, 10) << "skipping log shard " << shard_id
2916 << " at timestamp=" << stable
2917 << " last_trim=" << *last_trim << dendl;
2918 return set_cr_done();
2919 }
2920
2921 ldpp_dout(env.dpp, 10) << "trimming log shard " << shard_id
2922 << " at timestamp=" << stable
2923 << " last_trim=" << *last_trim << dendl;
2924 yield {
2925 std::string oid;
2926 mdlog->get_shard_oid(shard_id, oid);
2927 call(new RGWRadosTimelogTrimCR(env.store, oid, real_time{}, stable, "", ""));
2928 }
2929 if (retcode < 0 && retcode != -ENODATA) {
2930 ldpp_dout(env.dpp, 1) << "failed to trim mdlog shard " << shard_id
2931 << ": " << cpp_strerror(retcode) << dendl;
2932 return set_cr_error(retcode);
2933 }
2934 *last_trim = stable;
2935 return set_cr_done();
2936 }
2937 return 0;
2938 }
2939
2940 class MetaPeerTrimShardCollectCR : public RGWShardCollectCR {
2941 static constexpr int MAX_CONCURRENT_SHARDS = 16;
2942
2943 PeerTrimEnv& env;
2944 RGWMetadataLog *mdlog;
2945 const std::string& period_id;
2946 RGWMetaSyncEnv meta_env; //< for RGWListRemoteMDLogShardCR
2947 int shard_id{0};
2948
2949 public:
2950 MetaPeerTrimShardCollectCR(PeerTrimEnv& env, RGWMetadataLog *mdlog)
2951 : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
2952 env(env), mdlog(mdlog), period_id(env.current.get_period().get_id())
2953 {
2954 meta_env.init(env.dpp, cct, env.store, env.store->svc.zone->get_master_conn(),
2955 env.store->get_async_rados(), env.http, nullptr,
2956 env.store->get_sync_tracer());
2957 }
2958
2959 bool spawn_next() override;
2960 };
2961
2962 bool MetaPeerTrimShardCollectCR::spawn_next()
2963 {
2964 if (shard_id >= env.num_shards) {
2965 return false;
2966 }
2967 auto& last_trim = env.last_trim_timestamps[shard_id];
2968 spawn(new MetaPeerTrimShardCR(meta_env, mdlog, period_id, shard_id, &last_trim),
2969 false);
2970 shard_id++;
2971 return true;
2972 }
2973
2974 class MetaPeerTrimCR : public RGWCoroutine {
2975 PeerTrimEnv& env;
2976 rgw_mdlog_info mdlog_info; //< master's mdlog info
2977
2978 public:
2979 explicit MetaPeerTrimCR(PeerTrimEnv& env) : RGWCoroutine(env.store->ctx()), env(env) {}
2980
2981 int operate() override;
2982 };
2983
2984 int MetaPeerTrimCR::operate()
2985 {
2986 reenter(this) {
2987 ldout(cct, 10) << "fetching master mdlog info" << dendl;
2988 yield {
2989 // query mdlog_info from master for oldest_log_period
2990 rgw_http_param_pair params[] = {
2991 { "type", "metadata" },
2992 { nullptr, nullptr }
2993 };
2994
2995 using LogInfoCR = RGWReadRESTResourceCR<rgw_mdlog_info>;
2996 call(new LogInfoCR(cct, env.store->svc.zone->get_master_conn(), env.http,
2997 "/admin/log/", params, &mdlog_info));
2998 }
2999 if (retcode < 0) {
3000 ldout(cct, 4) << "failed to read mdlog info from master" << dendl;
3001 return set_cr_error(retcode);
3002 }
3003 // use the master's shard count instead of our local value
3004 env.set_num_shards(mdlog_info.num_shards);
3005
3006 if (mdlog_info.realm_epoch > env.last_trim_epoch + 1) {
3007 // delete any prior mdlog periods
3008 yield call(new PurgePeriodLogsCR(env.store, mdlog_info.realm_epoch,
3009 &env.last_trim_epoch));
3010 } else {
3011 ldout(cct, 10) << "mdlogs already purged through realm_epoch "
3012 << env.last_trim_epoch << dendl;
3013 }
3014
3015 // if realm_epoch == current, trim mdlog based on master's markers
3016 if (mdlog_info.realm_epoch == env.current.get_epoch()) {
3017 yield {
3018 auto meta_mgr = env.store->meta_mgr;
3019 auto mdlog = meta_mgr->get_log(env.current.get_period().get_id());
3020 call(new MetaPeerTrimShardCollectCR(env, mdlog));
3021 // ignore any errors during purge/trim because we want to hold the lock open
3022 }
3023 }
3024 return set_cr_done();
3025 }
3026 return 0;
3027 }
3028
3029 class MetaTrimPollCR : public RGWCoroutine {
3030 RGWRados *const store;
3031 const utime_t interval; //< polling interval
3032 const rgw_raw_obj obj;
3033 const std::string name{"meta_trim"}; //< lock name
3034 const std::string cookie;
3035
3036 protected:
3037 /// allocate the coroutine to run within the lease
3038 virtual RGWCoroutine* alloc_cr() = 0;
3039
3040 public:
3041 MetaTrimPollCR(RGWRados *store, utime_t interval)
3042 : RGWCoroutine(store->ctx()), store(store), interval(interval),
3043 obj(store->svc.zone->get_zone_params().log_pool, RGWMetadataLogHistory::oid),
3044 cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct))
3045 {}
3046
3047 int operate() override;
3048 };
3049
3050 int MetaTrimPollCR::operate()
3051 {
3052 reenter(this) {
3053 for (;;) {
3054 set_status("sleeping");
3055 wait(interval);
3056
3057 // prevent others from trimming for our entire wait interval
3058 set_status("acquiring trim lock");
3059 yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
3060 obj, name, cookie, interval.sec()));
3061 if (retcode < 0) {
3062 ldout(cct, 4) << "failed to lock: " << cpp_strerror(retcode) << dendl;
3063 continue;
3064 }
3065
3066 set_status("trimming");
3067 yield call(alloc_cr());
3068
3069 if (retcode < 0) {
3070 // on errors, unlock so other gateways can try
3071 set_status("unlocking");
3072 yield call(new RGWSimpleRadosUnlockCR(store->get_async_rados(), store,
3073 obj, name, cookie));
3074 }
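// on success the lock is deliberately left to expire on its own, so no
// other gateway can start another trim before this interval elapses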
3075 }
3076 }
3077 return 0;
3078 }
3079
3080 class MetaMasterTrimPollCR : public MetaTrimPollCR {
3081 MasterTrimEnv env; //< trim state to share between calls
3082 RGWCoroutine* alloc_cr() override {
3083 return new MetaMasterTrimCR(env);
3084 }
3085 public:
3086 MetaMasterTrimPollCR(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http,
3087 int num_shards, utime_t interval)
3088 : MetaTrimPollCR(store, interval),
3089 env(dpp, store, http, num_shards)
3090 {}
3091 };
3092
3093 class MetaPeerTrimPollCR : public MetaTrimPollCR {
3094 PeerTrimEnv env; //< trim state to share between calls
3095 RGWCoroutine* alloc_cr() override {
3096 return new MetaPeerTrimCR(env);
3097 }
3098 public:
3099 MetaPeerTrimPollCR(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http,
3100 int num_shards, utime_t interval)
3101 : MetaTrimPollCR(store, interval),
3102 env(dpp, store, http, num_shards)
3103 {}
3104 };
3105
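// factory: the metadata master trims according to its peers' sync markers
// (MetaMasterTrimPollCR); non-master zones trim by timestamps derived from
// the master's mdlog (MetaPeerTrimPollCR)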
3106 RGWCoroutine* create_meta_log_trim_cr(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http,
3107 int num_shards, utime_t interval)
3108 {
3109 if (store->svc.zone->is_meta_master()) {
3110 return new MetaMasterTrimPollCR(dpp, store, http, num_shards, interval);
3111 }
3112 return new MetaPeerTrimPollCR(dpp, store, http, num_shards, interval);
3113 }
3114
3115
3116 struct MetaMasterAdminTrimCR : private MasterTrimEnv, public MetaMasterTrimCR {
3117 MetaMasterAdminTrimCR(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http, int num_shards)
3118 : MasterTrimEnv(dpp, store, http, num_shards),
3119 MetaMasterTrimCR(*static_cast<MasterTrimEnv*>(this))
3120 {}
3121 };
3122
3123 struct MetaPeerAdminTrimCR : private PeerTrimEnv, public MetaPeerTrimCR {
3124 MetaPeerAdminTrimCR(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http, int num_shards)
3125 : PeerTrimEnv(dpp, store, http, num_shards),
3126 MetaPeerTrimCR(*static_cast<PeerTrimEnv*>(this))
3127 {}
3128 };
3129
3130 RGWCoroutine* create_admin_meta_log_trim_cr(const DoutPrefixProvider *dpp, RGWRados *store,
3131 RGWHTTPManager *http,
3132 int num_shards)
3133 {
3134 if (store->svc.zone->is_meta_master()) {
3135 return new MetaMasterAdminTrimCR(dpp, store, http, num_shards);
3136 }
3137 return new MetaPeerAdminTrimCR(dpp, store, http, num_shards);
3138 }