1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include <boost/optional.hpp>
5
6 #include "common/ceph_json.h"
7 #include "common/RWLock.h"
8 #include "common/RefCountedObj.h"
9 #include "common/WorkQueue.h"
10 #include "common/Throttle.h"
11 #include "common/admin_socket.h"
12 #include "common/errno.h"
13
14 #include "rgw_common.h"
15 #include "rgw_zone.h"
16 #include "rgw_sync.h"
17 #include "rgw_metadata.h"
18 #include "rgw_mdlog_types.h"
19 #include "rgw_rest_conn.h"
20 #include "rgw_tools.h"
21 #include "rgw_cr_rados.h"
22 #include "rgw_cr_rest.h"
23 #include "rgw_http_client.h"
24 #include "rgw_sync_trace.h"
25
26 #include "cls/lock/cls_lock_client.h"
27
28 #include "services/svc_zone.h"
29 #include "services/svc_mdlog.h"
30 #include "services/svc_meta.h"
31 #include "services/svc_cls.h"
32
33 #include <boost/asio/yield.hpp>
34
35 #define dout_subsys ceph_subsys_rgw
36
37 #undef dout_prefix
38 #define dout_prefix (*_dout << "meta sync: ")
39
40 static string mdlog_sync_status_oid = "mdlog.sync-status";
41 static string mdlog_sync_status_shard_prefix = "mdlog.sync-status.shard";
42 static string mdlog_sync_full_sync_index_prefix = "meta.full-sync.index";
43
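// RGWSyncErrorLogger spreads sync error-log entries across num_shards timelog
// objects, choosing the target shard round-robin so that no single error-log
// object becomes a hot spot.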
44 RGWSyncErrorLogger::RGWSyncErrorLogger(rgw::sal::RGWRadosStore *_store, const string &oid_prefix, int _num_shards) : store(_store), num_shards(_num_shards) {
45 for (int i = 0; i < num_shards; i++) {
46 oids.push_back(get_shard_oid(oid_prefix, i));
47 }
48 }
49 string RGWSyncErrorLogger::get_shard_oid(const string& oid_prefix, int shard_id) {
50 char buf[oid_prefix.size() + 16];
51 snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), shard_id);
52 return string(buf);
53 }
54
55 RGWCoroutine *RGWSyncErrorLogger::log_error_cr(const string& source_zone, const string& section, const string& name, uint32_t error_code, const string& message) {
56 cls_log_entry entry;
57
58 rgw_sync_error_info info(source_zone, error_code, message);
59 bufferlist bl;
60 encode(info, bl);
61 store->svc()->cls->timelog.prepare_entry(entry, real_clock::now(), section, name, bl);
62
63 uint32_t shard_id = ++counter % num_shards;
64
65
66 return new RGWRadosTimelogAddCR(store, oids[shard_id], entry);
67 }
68
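// Exponential backoff helper: the wait time doubles after every failure and is
// capped at max_secs.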
69 void RGWSyncBackoff::update_wait_time()
70 {
71 if (cur_wait == 0) {
72 cur_wait = 1;
73 } else {
74 cur_wait = (cur_wait << 1);
75 }
76 if (cur_wait >= max_secs) {
77 cur_wait = max_secs;
78 }
79 }
80
81 void RGWSyncBackoff::backoff_sleep()
82 {
83 update_wait_time();
84 sleep(cur_wait);
85 }
86
87 void RGWSyncBackoff::backoff(RGWCoroutine *op)
88 {
89 update_wait_time();
90 op->wait(utime_t(cur_wait, 0));
91 }
92
93 int RGWBackoffControlCR::operate() {
94 reenter(this) {
95     // keep retrying until the operation succeeds (non-transient errors abort only when exit_on_error is set)
96 while (true) {
97 yield {
98 std::lock_guard l{lock};
99 cr = alloc_cr();
100 cr->get();
101 call(cr);
102 }
103 {
104 std::lock_guard l{lock};
105 cr->put();
106 cr = NULL;
107 }
108 if (retcode >= 0) {
109 break;
110 }
111 if (retcode != -EBUSY && retcode != -EAGAIN) {
112 ldout(cct, 0) << "ERROR: RGWBackoffControlCR called coroutine returned " << retcode << dendl;
113 if (exit_on_error) {
114 return set_cr_error(retcode);
115 }
116 }
117 if (reset_backoff) {
118 backoff.reset();
119 }
120 yield backoff.backoff(this);
121 }
122
123 // run an optional finisher
124 yield call(alloc_finisher_cr());
125 if (retcode < 0) {
126 ldout(cct, 0) << "ERROR: call to finisher_cr() failed: retcode=" << retcode << dendl;
127 return set_cr_error(retcode);
128 }
129 return set_cr_done();
130 }
131 return 0;
132 }
133
134 void rgw_mdlog_info::decode_json(JSONObj *obj) {
135 JSONDecoder::decode_json("num_objects", num_shards, obj);
136 JSONDecoder::decode_json("period", period, obj);
137 JSONDecoder::decode_json("realm_epoch", realm_epoch, obj);
138 }
139
140 void rgw_mdlog_entry::decode_json(JSONObj *obj) {
141 JSONDecoder::decode_json("id", id, obj);
142 JSONDecoder::decode_json("section", section, obj);
143 JSONDecoder::decode_json("name", name, obj);
144 utime_t ut;
145 JSONDecoder::decode_json("timestamp", ut, obj);
146 timestamp = ut.to_real_time();
147 JSONDecoder::decode_json("data", log_data, obj);
148 }
149
150 void rgw_mdlog_shard_data::decode_json(JSONObj *obj) {
151 JSONDecoder::decode_json("marker", marker, obj);
152 JSONDecoder::decode_json("truncated", truncated, obj);
153 JSONDecoder::decode_json("entries", entries, obj);
154 }
155
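// Generic fan-out helper: keep calling spawn_next() while limiting the number
// of in-flight children to max_concurrent, collect child return codes, and
// remember a failing status (ignoring -ENOENT) to report once all children
// have completed.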
156 int RGWShardCollectCR::operate() {
157 reenter(this) {
158 while (spawn_next()) {
159 current_running++;
160
161 while (current_running >= max_concurrent) {
162 int child_ret;
163 yield wait_for_child();
164 if (collect_next(&child_ret)) {
165 current_running--;
166 if (child_ret < 0 && child_ret != -ENOENT) {
167 ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
168 status = child_ret;
169 }
170 }
171 }
172 }
173 while (current_running > 0) {
174 int child_ret;
175 yield wait_for_child();
176 if (collect_next(&child_ret)) {
177 current_running--;
178 if (child_ret < 0 && child_ret != -ENOENT) {
179 ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
180 status = child_ret;
181 }
182 }
183 }
184 if (status < 0) {
185 return set_cr_error(status);
186 }
187 return set_cr_done();
188 }
189 return 0;
190 }
191
192 class RGWReadRemoteMDLogInfoCR : public RGWShardCollectCR {
193 RGWMetaSyncEnv *sync_env;
194
195 const std::string& period;
196 int num_shards;
197 map<int, RGWMetadataLogInfo> *mdlog_info;
198
199 int shard_id;
200 #define READ_MDLOG_MAX_CONCURRENT 10
201
202 public:
203 RGWReadRemoteMDLogInfoCR(RGWMetaSyncEnv *_sync_env,
204 const std::string& period, int _num_shards,
205 map<int, RGWMetadataLogInfo> *_mdlog_info) : RGWShardCollectCR(_sync_env->cct, READ_MDLOG_MAX_CONCURRENT),
206 sync_env(_sync_env),
207 period(period), num_shards(_num_shards),
208 mdlog_info(_mdlog_info), shard_id(0) {}
209 bool spawn_next() override;
210 };
211
212 class RGWListRemoteMDLogCR : public RGWShardCollectCR {
213 RGWMetaSyncEnv *sync_env;
214
215 const std::string& period;
216 map<int, string> shards;
217 int max_entries_per_shard;
218 map<int, rgw_mdlog_shard_data> *result;
219
220 map<int, string>::iterator iter;
221 #define READ_MDLOG_MAX_CONCURRENT 10
222
223 public:
224 RGWListRemoteMDLogCR(RGWMetaSyncEnv *_sync_env,
225 const std::string& period, map<int, string>& _shards,
226 int _max_entries_per_shard,
227 map<int, rgw_mdlog_shard_data> *_result) : RGWShardCollectCR(_sync_env->cct, READ_MDLOG_MAX_CONCURRENT),
228 sync_env(_sync_env), period(period),
229 max_entries_per_shard(_max_entries_per_shard),
230 result(_result) {
231 shards.swap(_shards);
232 iter = shards.begin();
233 }
234 bool spawn_next() override;
235 };
236
237 RGWRemoteMetaLog::~RGWRemoteMetaLog()
238 {
239 delete error_logger;
240 }
241
242 int RGWRemoteMetaLog::read_log_info(rgw_mdlog_info *log_info)
243 {
244 rgw_http_param_pair pairs[] = { { "type", "metadata" },
245 { NULL, NULL } };
246
247 int ret = conn->get_json_resource("/admin/log", pairs, null_yield, *log_info);
248 if (ret < 0) {
249 ldpp_dout(dpp, 0) << "ERROR: failed to fetch mdlog info" << dendl;
250 return ret;
251 }
252
253 ldpp_dout(dpp, 20) << "remote mdlog, num_shards=" << log_info->num_shards << dendl;
254
255 return 0;
256 }
257
258 int RGWRemoteMetaLog::read_master_log_shards_info(const string &master_period, map<int, RGWMetadataLogInfo> *shards_info)
259 {
260 if (store->svc()->zone->is_meta_master()) {
261 return 0;
262 }
263
264 rgw_mdlog_info log_info;
265 int ret = read_log_info(&log_info);
266 if (ret < 0) {
267 return ret;
268 }
269
270 return run(new RGWReadRemoteMDLogInfoCR(&sync_env, master_period, log_info.num_shards, shards_info));
271 }
272
273 int RGWRemoteMetaLog::read_master_log_shards_next(const string& period, map<int, string> shard_markers, map<int, rgw_mdlog_shard_data> *result)
274 {
275 if (store->svc()->zone->is_meta_master()) {
276 return 0;
277 }
278
279 return run(new RGWListRemoteMDLogCR(&sync_env, period, shard_markers, 1, result));
280 }
281
282 int RGWRemoteMetaLog::init()
283 {
284 conn = store->svc()->zone->get_master_conn();
285
286 int ret = http_manager.start();
287 if (ret < 0) {
288 ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
289 return ret;
290 }
291
292 error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
293
294 init_sync_env(&sync_env);
295
296 tn = sync_env.sync_tracer->add_node(sync_env.sync_tracer->root_node, "meta");
297
298 return 0;
299 }
300
301 void RGWRemoteMetaLog::finish()
302 {
303 going_down = true;
304 stop();
305 }
306
307 #define CLONE_MAX_ENTRIES 100
308
309 int RGWMetaSyncStatusManager::init()
310 {
311 if (store->svc()->zone->is_meta_master()) {
312 return 0;
313 }
314
315 if (!store->svc()->zone->get_master_conn()) {
316 lderr(store->ctx()) << "no REST connection to master zone" << dendl;
317 return -EIO;
318 }
319
320 int r = rgw_init_ioctx(store->getRados()->get_rados_handle(), store->svc()->zone->get_zone_params().log_pool, ioctx, true);
321 if (r < 0) {
322     lderr(store->ctx()) << "ERROR: failed to open log pool (" << store->svc()->zone->get_zone_params().log_pool << ") ret=" << r << dendl;
323 return r;
324 }
325
326 r = master_log.init();
327 if (r < 0) {
328 lderr(store->ctx()) << "ERROR: failed to init remote log, r=" << r << dendl;
329 return r;
330 }
331
332 RGWMetaSyncEnv& sync_env = master_log.get_sync_env();
333
334 rgw_meta_sync_status sync_status;
335 r = read_sync_status(&sync_status);
336 if (r < 0 && r != -ENOENT) {
337 lderr(store->ctx()) << "ERROR: failed to read sync status, r=" << r << dendl;
338 return r;
339 }
340
341 int num_shards = sync_status.sync_info.num_shards;
342
343 for (int i = 0; i < num_shards; i++) {
344 shard_objs[i] = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env.shard_obj_name(i));
345 }
346
347 std::unique_lock wl{ts_to_shard_lock};
348 for (int i = 0; i < num_shards; i++) {
349 clone_markers.push_back(string());
350 utime_shard ut;
351 ut.shard_id = i;
352 ts_to_shard[ut] = i;
353 }
354
355 return 0;
356 }
357
358 unsigned RGWMetaSyncStatusManager::get_subsys() const
359 {
360 return dout_subsys;
361 }
362
363 std::ostream& RGWMetaSyncStatusManager::gen_prefix(std::ostream& out) const
364 {
365 return out << "meta sync: ";
366 }
367
368 void RGWMetaSyncEnv::init(const DoutPrefixProvider *_dpp, CephContext *_cct, rgw::sal::RGWRadosStore *_store, RGWRESTConn *_conn,
369 RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
370 RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer) {
371 dpp = _dpp;
372 cct = _cct;
373 store = _store;
374 conn = _conn;
375 async_rados = _async_rados;
376 http_manager = _http_manager;
377 error_logger = _error_logger;
378 sync_tracer = _sync_tracer;
379 }
380
381 string RGWMetaSyncEnv::status_oid()
382 {
383 return mdlog_sync_status_oid;
384 }
385
386 string RGWMetaSyncEnv::shard_obj_name(int shard_id)
387 {
388 char buf[mdlog_sync_status_shard_prefix.size() + 16];
389 snprintf(buf, sizeof(buf), "%s.%d", mdlog_sync_status_shard_prefix.c_str(), shard_id);
390
391 return string(buf);
392 }
393
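// Async request that lists entries from a local mdlog shard on the async RADOS
// processor's thread pool, so the coroutine manager thread never blocks on the
// listing itself.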
394 class RGWAsyncReadMDLogEntries : public RGWAsyncRadosRequest {
395 rgw::sal::RGWRadosStore *store;
396 RGWMetadataLog *mdlog;
397 int shard_id;
398 int max_entries;
399
400 protected:
401 int _send_request() override {
402 real_time from_time;
403 real_time end_time;
404
405 void *handle;
406
407 mdlog->init_list_entries(shard_id, from_time, end_time, marker, &handle);
408
409 int ret = mdlog->list_entries(handle, max_entries, entries, &marker, &truncated);
410
411 mdlog->complete_list_entries(handle);
412
413 return ret;
414 }
415 public:
416 string marker;
417 list<cls_log_entry> entries;
418 bool truncated;
419
420 RGWAsyncReadMDLogEntries(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RGWRadosStore *_store,
421 RGWMetadataLog* mdlog, int _shard_id,
422 std::string _marker, int _max_entries)
423 : RGWAsyncRadosRequest(caller, cn), store(_store), mdlog(mdlog),
424 shard_id(_shard_id), max_entries(_max_entries), marker(std::move(_marker)) {}
425 };
426
427 class RGWReadMDLogEntriesCR : public RGWSimpleCoroutine {
428 RGWMetaSyncEnv *sync_env;
429 RGWMetadataLog *const mdlog;
430 int shard_id;
431 string marker;
432 string *pmarker;
433 int max_entries;
434 list<cls_log_entry> *entries;
435 bool *truncated;
436
437 RGWAsyncReadMDLogEntries *req{nullptr};
438
439 public:
440 RGWReadMDLogEntriesCR(RGWMetaSyncEnv *_sync_env, RGWMetadataLog* mdlog,
441 int _shard_id, string*_marker, int _max_entries,
442 list<cls_log_entry> *_entries, bool *_truncated)
443 : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env), mdlog(mdlog),
444 shard_id(_shard_id), pmarker(_marker), max_entries(_max_entries),
445 entries(_entries), truncated(_truncated) {}
446
447 ~RGWReadMDLogEntriesCR() override {
448 if (req) {
449 req->finish();
450 }
451 }
452
453 int send_request() override {
454 marker = *pmarker;
455 req = new RGWAsyncReadMDLogEntries(this, stack->create_completion_notifier(),
456 sync_env->store, mdlog, shard_id, marker,
457 max_entries);
458 sync_env->async_rados->queue(req);
459 return 0;
460 }
461
462 int request_complete() override {
463 *pmarker = std::move(req->marker);
464 *entries = std::move(req->entries);
465 *truncated = req->truncated;
466 return req->get_ret_status();
467 }
468 };
469
470
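// Fetch a single remote mdlog shard's status (marker, last update time) from
// the master zone via GET /admin/log with type=metadata, the shard id, the
// period and the info flag.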
471 class RGWReadRemoteMDLogShardInfoCR : public RGWCoroutine {
472 RGWMetaSyncEnv *env;
473 RGWRESTReadResource *http_op;
474
475 const std::string& period;
476 int shard_id;
477 RGWMetadataLogInfo *shard_info;
478
479 public:
480 RGWReadRemoteMDLogShardInfoCR(RGWMetaSyncEnv *env, const std::string& period,
481 int _shard_id, RGWMetadataLogInfo *_shard_info)
482 : RGWCoroutine(env->store->ctx()), env(env), http_op(NULL),
483 period(period), shard_id(_shard_id), shard_info(_shard_info) {}
484
485 int operate() override {
486 auto store = env->store;
487 RGWRESTConn *conn = store->svc()->zone->get_master_conn();
488 reenter(this) {
489 yield {
490 char buf[16];
491 snprintf(buf, sizeof(buf), "%d", shard_id);
492 rgw_http_param_pair pairs[] = { { "type" , "metadata" },
493 { "id", buf },
494 { "period", period.c_str() },
495 { "info" , NULL },
496 { NULL, NULL } };
497
498 string p = "/admin/log/";
499
500 http_op = new RGWRESTReadResource(conn, p, pairs, NULL,
501 env->http_manager);
502
503 init_new_io(http_op);
504
505 int ret = http_op->aio_read();
506 if (ret < 0) {
507 ldpp_dout(env->dpp, 0) << "ERROR: failed to read from " << p << dendl;
508 log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
509 http_op->put();
510 return set_cr_error(ret);
511 }
512
513 return io_block(0);
514 }
515 yield {
516 int ret = http_op->wait(shard_info, null_yield);
517 http_op->put();
518 if (ret < 0) {
519 return set_cr_error(ret);
520 }
521 return set_cr_done();
522 }
523 }
524 return 0;
525 }
526 };
527
528 RGWCoroutine* create_read_remote_mdlog_shard_info_cr(RGWMetaSyncEnv *env,
529 const std::string& period,
530 int shard_id,
531 RGWMetadataLogInfo* info)
532 {
533 return new RGWReadRemoteMDLogShardInfoCR(env, period, shard_id, info);
534 }
535
536 class RGWListRemoteMDLogShardCR : public RGWSimpleCoroutine {
537 RGWMetaSyncEnv *sync_env;
538 RGWRESTReadResource *http_op;
539
540 const std::string& period;
541 int shard_id;
542 string marker;
543 uint32_t max_entries;
544 rgw_mdlog_shard_data *result;
545
546 public:
547 RGWListRemoteMDLogShardCR(RGWMetaSyncEnv *env, const std::string& period,
548 int _shard_id, const string& _marker, uint32_t _max_entries,
549 rgw_mdlog_shard_data *_result)
550 : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL),
551 period(period), shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}
552
553 int send_request() override {
554 RGWRESTConn *conn = sync_env->conn;
555
556 char buf[32];
557 snprintf(buf, sizeof(buf), "%d", shard_id);
558
559 char max_entries_buf[32];
560 snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);
561
562 const char *marker_key = (marker.empty() ? "" : "marker");
563
564 rgw_http_param_pair pairs[] = { { "type", "metadata" },
565 { "id", buf },
566 { "period", period.c_str() },
567 { "max-entries", max_entries_buf },
568 { marker_key, marker.c_str() },
569 { NULL, NULL } };
570
571 string p = "/admin/log/";
572
573 http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
574 init_new_io(http_op);
575
576 int ret = http_op->aio_read();
577 if (ret < 0) {
578 ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to read from " << p << dendl;
579 log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
580 http_op->put();
581 return ret;
582 }
583
584 return 0;
585 }
586
587 int request_complete() override {
588 int ret = http_op->wait(result, null_yield);
589 http_op->put();
590 if (ret < 0 && ret != -ENOENT) {
591 ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to list remote mdlog shard, ret=" << ret << dendl;
592 return ret;
593 }
594 return 0;
595 }
596 };
597
598 RGWCoroutine* create_list_remote_mdlog_shard_cr(RGWMetaSyncEnv *env,
599 const std::string& period,
600 int shard_id,
601 const std::string& marker,
602 uint32_t max_entries,
603 rgw_mdlog_shard_data *result)
604 {
605 return new RGWListRemoteMDLogShardCR(env, period, shard_id, marker,
606 max_entries, result);
607 }
608
609 bool RGWReadRemoteMDLogInfoCR::spawn_next() {
610 if (shard_id >= num_shards) {
611 return false;
612 }
613 spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env, period, shard_id, &(*mdlog_info)[shard_id]), false);
614 shard_id++;
615 return true;
616 }
617
618 bool RGWListRemoteMDLogCR::spawn_next() {
619 if (iter == shards.end()) {
620 return false;
621 }
622
623 spawn(new RGWListRemoteMDLogShardCR(sync_env, period, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
624 ++iter;
625 return true;
626 }
627
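// Initialize the meta sync status: take the sync_lock lease, write the global
// rgw_meta_sync_info, record each remote mdlog shard's current position as the
// starting point for incremental sync, then advance the state to
// StateBuildingFullSyncMaps and drop the lease.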
628 class RGWInitSyncStatusCoroutine : public RGWCoroutine {
629 RGWMetaSyncEnv *sync_env;
630
631 rgw_meta_sync_info status;
632 vector<RGWMetadataLogInfo> shards_info;
633 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
634 boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
635 public:
636 RGWInitSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
637 const rgw_meta_sync_info &status)
638 : RGWCoroutine(_sync_env->store->ctx()), sync_env(_sync_env),
639 status(status), shards_info(status.num_shards),
640 lease_cr(nullptr), lease_stack(nullptr) {}
641
642 ~RGWInitSyncStatusCoroutine() override {
643 if (lease_cr) {
644 lease_cr->abort();
645 }
646 }
647
648 int operate() override {
649 int ret;
650 reenter(this) {
651 yield {
652 set_status("acquiring sync lock");
653 uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
654 string lock_name = "sync_lock";
655 rgw::sal::RGWRadosStore *store = sync_env->store;
656 lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
657 rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
658 lock_name, lock_duration, this));
659 lease_stack.reset(spawn(lease_cr.get(), false));
660 }
661 while (!lease_cr->is_locked()) {
662 if (lease_cr->is_done()) {
663 ldpp_dout(sync_env->dpp, 5) << "lease cr failed, done early " << dendl;
664 set_status("lease lock failed, early abort");
665 return set_cr_error(lease_cr->get_ret_status());
666 }
667 set_sleeping(true);
668 yield;
669 }
670 yield {
671 set_status("writing sync status");
672 rgw::sal::RGWRadosStore *store = sync_env->store;
673 call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados, store->svc()->sysobj,
674 rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
675 status));
676 }
677
678 if (retcode < 0) {
679 set_status("failed to write sync status");
680 ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
681 yield lease_cr->go_down();
682 return set_cr_error(retcode);
683 }
684 /* fetch current position in logs */
685 set_status("fetching remote log position");
686 yield {
687 for (int i = 0; i < (int)status.num_shards; i++) {
688 spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env, status.period, i,
689 &shards_info[i]), false);
690 }
691 }
692
693 drain_all_but_stack(lease_stack.get()); /* the lease cr still needs to run */
694
695 yield {
696 set_status("updating sync status");
697 for (int i = 0; i < (int)status.num_shards; i++) {
698 rgw_meta_sync_marker marker;
699 RGWMetadataLogInfo& info = shards_info[i];
700 marker.next_step_marker = info.marker;
701 marker.timestamp = info.last_update;
702 rgw::sal::RGWRadosStore *store = sync_env->store;
703 spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados,
704 store->svc()->sysobj,
705 rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->shard_obj_name(i)),
706 marker), true);
707 }
708 }
709 yield {
710 set_status("changing sync state: build full sync maps");
711 status.state = rgw_meta_sync_info::StateBuildingFullSyncMaps;
712 rgw::sal::RGWRadosStore *store = sync_env->store;
713 call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados, store->svc()->sysobj,
714 rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
715 status));
716 }
717 set_status("drop lock lease");
718 yield lease_cr->go_down();
719 while (collect(&ret, NULL)) {
720 if (ret < 0) {
721 return set_cr_error(ret);
722 }
723 yield;
724 }
725 drain_all();
726 return set_cr_done();
727 }
728 return 0;
729 }
730 };
731
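// Read the per-shard rgw_meta_sync_marker objects from the log pool, with at
// most MAX_CONCURRENT_SHARDS reads in flight at a time.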
732 class RGWReadSyncStatusMarkersCR : public RGWShardCollectCR {
733 static constexpr int MAX_CONCURRENT_SHARDS = 16;
734
735 RGWMetaSyncEnv *env;
736 const int num_shards;
737 int shard_id{0};
738 map<uint32_t, rgw_meta_sync_marker>& markers;
739
740 public:
741 RGWReadSyncStatusMarkersCR(RGWMetaSyncEnv *env, int num_shards,
742 map<uint32_t, rgw_meta_sync_marker>& markers)
743 : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS),
744 env(env), num_shards(num_shards), markers(markers)
745 {}
746 bool spawn_next() override;
747 };
748
749 bool RGWReadSyncStatusMarkersCR::spawn_next()
750 {
751 if (shard_id >= num_shards) {
752 return false;
753 }
754 using CR = RGWSimpleRadosReadCR<rgw_meta_sync_marker>;
755 rgw_raw_obj obj{env->store->svc()->zone->get_zone_params().log_pool,
756 env->shard_obj_name(shard_id)};
757 spawn(new CR(env->async_rados, env->store->svc()->sysobj, obj, &markers[shard_id]), false);
758 shard_id++;
759 return true;
760 }
761
762 class RGWReadSyncStatusCoroutine : public RGWCoroutine {
763 RGWMetaSyncEnv *sync_env;
764 rgw_meta_sync_status *sync_status;
765
766 public:
767 RGWReadSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
768 rgw_meta_sync_status *_status)
769 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sync_status(_status)
770 {}
771 int operate() override;
772 };
773
774 int RGWReadSyncStatusCoroutine::operate()
775 {
776 reenter(this) {
777 // read sync info
778 using ReadInfoCR = RGWSimpleRadosReadCR<rgw_meta_sync_info>;
779 yield {
780 bool empty_on_enoent = false; // fail on ENOENT
781 rgw_raw_obj obj{sync_env->store->svc()->zone->get_zone_params().log_pool,
782 sync_env->status_oid()};
783 call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj, obj,
784 &sync_status->sync_info, empty_on_enoent));
785 }
786 if (retcode < 0) {
787 ldpp_dout(sync_env->dpp, 4) << "failed to read sync status info with "
788 << cpp_strerror(retcode) << dendl;
789 return set_cr_error(retcode);
790 }
791 // read shard markers
792 using ReadMarkersCR = RGWReadSyncStatusMarkersCR;
793 yield call(new ReadMarkersCR(sync_env, sync_status->sync_info.num_shards,
794 sync_status->sync_markers));
795 if (retcode < 0) {
796 ldpp_dout(sync_env->dpp, 4) << "failed to read sync status markers with "
797 << cpp_strerror(retcode) << dendl;
798 return set_cr_error(retcode);
799 }
800 return set_cr_done();
801 }
802 return 0;
803 }
804
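// Build the full-sync index: under the sync lease, list every metadata section
// and key on the master zone (user, bucket.instance and bucket first), append
// each "section:key" to the sharded omap index, and store the resulting
// per-shard entry totals in the sync markers.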
805 class RGWFetchAllMetaCR : public RGWCoroutine {
806 RGWMetaSyncEnv *sync_env;
807
808 int num_shards;
809
810
811 int ret_status;
812
813 list<string> sections;
814 list<string>::iterator sections_iter;
815
816 struct meta_list_result {
817 list<string> keys;
818 string marker;
819 uint64_t count{0};
820 bool truncated{false};
821
822 void decode_json(JSONObj *obj) {
823 JSONDecoder::decode_json("keys", keys, obj);
824 JSONDecoder::decode_json("marker", marker, obj);
825 JSONDecoder::decode_json("count", count, obj);
826 JSONDecoder::decode_json("truncated", truncated, obj);
827 }
828 } result;
829 list<string>::iterator iter;
830
831 std::unique_ptr<RGWShardedOmapCRManager> entries_index;
832
833 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
834 boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
835 bool lost_lock;
836 bool failed;
837
838 string marker;
839
840 map<uint32_t, rgw_meta_sync_marker>& markers;
841
842 RGWSyncTraceNodeRef tn;
843
844 public:
845 RGWFetchAllMetaCR(RGWMetaSyncEnv *_sync_env, int _num_shards,
846 map<uint32_t, rgw_meta_sync_marker>& _markers,
847 RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
848 num_shards(_num_shards),
849 ret_status(0), lease_cr(nullptr), lease_stack(nullptr),
850 lost_lock(false), failed(false), markers(_markers) {
851 tn = sync_env->sync_tracer->add_node(_tn_parent, "fetch_all_meta");
852 }
853
854 ~RGWFetchAllMetaCR() override {
855 }
856
857 void append_section_from_set(set<string>& all_sections, const string& name) {
858 set<string>::iterator iter = all_sections.find(name);
859 if (iter != all_sections.end()) {
860 sections.emplace_back(std::move(*iter));
861 all_sections.erase(iter);
862 }
863 }
864 /*
865 * meta sync should go in the following order: user, bucket.instance, bucket
866 * then whatever other sections exist (if any)
867 */
868 void rearrange_sections() {
869 set<string> all_sections;
870 std::move(sections.begin(), sections.end(),
871 std::inserter(all_sections, all_sections.end()));
872 sections.clear();
873
874 append_section_from_set(all_sections, "user");
875 append_section_from_set(all_sections, "bucket.instance");
876 append_section_from_set(all_sections, "bucket");
877
878 std::move(all_sections.begin(), all_sections.end(),
879 std::back_inserter(sections));
880 }
881
882 int operate() override {
883 RGWRESTConn *conn = sync_env->conn;
884
885 reenter(this) {
886 yield {
887 set_status(string("acquiring lock (") + sync_env->status_oid() + ")");
888 uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
889 string lock_name = "sync_lock";
890 lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados,
891 sync_env->store,
892 rgw_raw_obj(sync_env->store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
893 lock_name, lock_duration, this));
894 lease_stack.reset(spawn(lease_cr.get(), false));
895 }
896 while (!lease_cr->is_locked()) {
897 if (lease_cr->is_done()) {
898 ldpp_dout(sync_env->dpp, 5) << "lease cr failed, done early " << dendl;
899 set_status("failed acquiring lock");
900 return set_cr_error(lease_cr->get_ret_status());
901 }
902 set_sleeping(true);
903 yield;
904 }
905 entries_index.reset(new RGWShardedOmapCRManager(sync_env->async_rados, sync_env->store, this, num_shards,
906 sync_env->store->svc()->zone->get_zone_params().log_pool,
907 mdlog_sync_full_sync_index_prefix));
908 yield {
909 call(new RGWReadRESTResourceCR<list<string> >(cct, conn, sync_env->http_manager,
910 "/admin/metadata", NULL, &sections));
911 }
912 if (get_ret_status() < 0) {
913 ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to fetch metadata sections" << dendl;
914 yield entries_index->finish();
915 yield lease_cr->go_down();
916 drain_all();
917 return set_cr_error(get_ret_status());
918 }
919 rearrange_sections();
920 sections_iter = sections.begin();
921 for (; sections_iter != sections.end(); ++sections_iter) {
922 do {
923 yield {
924 #define META_FULL_SYNC_CHUNK_SIZE "1000"
925 string entrypoint = string("/admin/metadata/") + *sections_iter;
926 rgw_http_param_pair pairs[] = { { "max-entries", META_FULL_SYNC_CHUNK_SIZE },
927 { "marker", result.marker.c_str() },
928 { NULL, NULL } };
929 result.keys.clear();
930 call(new RGWReadRESTResourceCR<meta_list_result >(cct, conn, sync_env->http_manager,
931 entrypoint, pairs, &result));
932 }
933 ret_status = get_ret_status();
934 if (ret_status == -ENOENT) {
935 set_retcode(0); /* reset coroutine status so that we don't return it */
936 ret_status = 0;
937 }
938 if (ret_status < 0) {
939 tn->log(0, SSTR("ERROR: failed to fetch metadata section: " << *sections_iter));
940 yield entries_index->finish();
941 yield lease_cr->go_down();
942 drain_all();
943 return set_cr_error(ret_status);
944 }
945 iter = result.keys.begin();
946 for (; iter != result.keys.end(); ++iter) {
947 if (!lease_cr->is_locked()) {
948 lost_lock = true;
949 break;
950 }
951 yield; // allow entries_index consumer to make progress
952
953 tn->log(20, SSTR("list metadata: section=" << *sections_iter << " key=" << *iter));
954 string s = *sections_iter + ":" + *iter;
955 int shard_id;
956 rgw::sal::RGWRadosStore *store = sync_env->store;
957 int ret = store->ctl()->meta.mgr->get_shard_id(*sections_iter, *iter, &shard_id);
958 if (ret < 0) {
959 tn->log(0, SSTR("ERROR: could not determine shard id for " << *sections_iter << ":" << *iter));
960 ret_status = ret;
961 break;
962 }
963 if (!entries_index->append(s, shard_id)) {
964 break;
965 }
966 }
967 } while (result.truncated);
968 }
969 yield {
970 if (!entries_index->finish()) {
971 failed = true;
972 }
973 }
974 if (!failed) {
975 for (map<uint32_t, rgw_meta_sync_marker>::iterator iter = markers.begin(); iter != markers.end(); ++iter) {
976 int shard_id = (int)iter->first;
977 rgw_meta_sync_marker& marker = iter->second;
978 marker.total_entries = entries_index->get_total_entries(shard_id);
979 spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados, sync_env->store->svc()->sysobj,
980 rgw_raw_obj(sync_env->store->svc()->zone->get_zone_params().log_pool, sync_env->shard_obj_name(shard_id)),
981 marker), true);
982 }
983 }
984
985 drain_all_but_stack(lease_stack.get()); /* the lease cr still needs to run */
986
987 yield lease_cr->go_down();
988
989 int ret;
990 while (collect(&ret, NULL)) {
991 if (ret < 0) {
992 return set_cr_error(ret);
993 }
994 yield;
995 }
996 drain_all();
997 if (failed) {
998 yield return set_cr_error(-EIO);
999 }
1000 if (lost_lock) {
1001 yield return set_cr_error(-EBUSY);
1002 }
1003
1004 if (ret_status < 0) {
1005 yield return set_cr_error(ret_status);
1006 }
1007
1008 yield return set_cr_done();
1009 }
1010 return 0;
1011 }
1012 };
1013
1014 static string full_sync_index_shard_oid(int shard_id)
1015 {
1016 char buf[mdlog_sync_full_sync_index_prefix.size() + 16];
1017 snprintf(buf, sizeof(buf), "%s.%d", mdlog_sync_full_sync_index_prefix.c_str(), shard_id);
1018 return string(buf);
1019 }
1020
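// Fetch a single metadata entry (section/key) from the master zone's
// /admin/metadata endpoint and hand the raw bufferlist back to the caller.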
1021 class RGWReadRemoteMetadataCR : public RGWCoroutine {
1022 RGWMetaSyncEnv *sync_env;
1023
1024 RGWRESTReadResource *http_op;
1025
1026 string section;
1027 string key;
1028
1029 bufferlist *pbl;
1030
1031 RGWSyncTraceNodeRef tn;
1032
1033 public:
1034 RGWReadRemoteMetadataCR(RGWMetaSyncEnv *_sync_env,
1035 const string& _section, const string& _key, bufferlist *_pbl,
1036 const RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
1037 http_op(NULL),
1038 section(_section),
1039 key(_key),
1040 pbl(_pbl) {
1041 tn = sync_env->sync_tracer->add_node(_tn_parent, "read_remote_meta",
1042 section + ":" + key);
1043 }
1044
1045 int operate() override {
1046 RGWRESTConn *conn = sync_env->conn;
1047 reenter(this) {
1048 yield {
1049 string key_encode;
1050 url_encode(key, key_encode);
1051 rgw_http_param_pair pairs[] = { { "key" , key.c_str()},
1052 { NULL, NULL } };
1053
1054 string p = string("/admin/metadata/") + section + "/" + key_encode;
1055
1056 http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
1057
1058 init_new_io(http_op);
1059
1060 int ret = http_op->aio_read();
1061 if (ret < 0) {
1062 ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to fetch mdlog data" << dendl;
1063 log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
1064 http_op->put();
1065 return set_cr_error(ret);
1066 }
1067
1068 return io_block(0);
1069 }
1070 yield {
1071 int ret = http_op->wait(pbl, null_yield);
1072 http_op->put();
1073 if (ret < 0) {
1074 return set_cr_error(ret);
1075 }
1076 return set_cr_done();
1077 }
1078 }
1079 return 0;
1080 }
1081 };
1082
1083 class RGWAsyncMetaStoreEntry : public RGWAsyncRadosRequest {
1084 rgw::sal::RGWRadosStore *store;
1085 string raw_key;
1086 bufferlist bl;
1087 protected:
1088 int _send_request() override {
1089 int ret = store->ctl()->meta.mgr->put(raw_key, bl, null_yield, RGWMDLogSyncType::APPLY_ALWAYS, true);
1090 if (ret < 0) {
1091 ldout(store->ctx(), 0) << "ERROR: can't store key: " << raw_key << " ret=" << ret << dendl;
1092 return ret;
1093 }
1094 return 0;
1095 }
1096 public:
1097 RGWAsyncMetaStoreEntry(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RGWRadosStore *_store,
1098 const string& _raw_key,
1099 bufferlist& _bl) : RGWAsyncRadosRequest(caller, cn), store(_store),
1100 raw_key(_raw_key), bl(_bl) {}
1101 };
1102
1103
1104 class RGWMetaStoreEntryCR : public RGWSimpleCoroutine {
1105 RGWMetaSyncEnv *sync_env;
1106 string raw_key;
1107 bufferlist bl;
1108
1109 RGWAsyncMetaStoreEntry *req;
1110
1111 public:
1112 RGWMetaStoreEntryCR(RGWMetaSyncEnv *_sync_env,
1113 const string& _raw_key,
1114 bufferlist& _bl) : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env),
1115 raw_key(_raw_key), bl(_bl), req(NULL) {
1116 }
1117
1118 ~RGWMetaStoreEntryCR() override {
1119 if (req) {
1120 req->finish();
1121 }
1122 }
1123
1124 int send_request() override {
1125 req = new RGWAsyncMetaStoreEntry(this, stack->create_completion_notifier(),
1126 sync_env->store, raw_key, bl);
1127 sync_env->async_rados->queue(req);
1128 return 0;
1129 }
1130
1131 int request_complete() override {
1132 return req->get_ret_status();
1133 }
1134 };
1135
1136 class RGWAsyncMetaRemoveEntry : public RGWAsyncRadosRequest {
1137 rgw::sal::RGWRadosStore *store;
1138 string raw_key;
1139 protected:
1140 int _send_request() override {
1141 int ret = store->ctl()->meta.mgr->remove(raw_key, null_yield);
1142 if (ret < 0) {
1143 ldout(store->ctx(), 0) << "ERROR: can't remove key: " << raw_key << " ret=" << ret << dendl;
1144 return ret;
1145 }
1146 return 0;
1147 }
1148 public:
1149 RGWAsyncMetaRemoveEntry(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RGWRadosStore *_store,
1150 const string& _raw_key) : RGWAsyncRadosRequest(caller, cn), store(_store),
1151 raw_key(_raw_key) {}
1152 };
1153
1154
1155 class RGWMetaRemoveEntryCR : public RGWSimpleCoroutine {
1156 RGWMetaSyncEnv *sync_env;
1157 string raw_key;
1158
1159 RGWAsyncMetaRemoveEntry *req;
1160
1161 public:
1162 RGWMetaRemoveEntryCR(RGWMetaSyncEnv *_sync_env,
1163 const string& _raw_key) : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env),
1164 raw_key(_raw_key), req(NULL) {
1165 }
1166
1167 ~RGWMetaRemoveEntryCR() override {
1168 if (req) {
1169 req->finish();
1170 }
1171 }
1172
1173 int send_request() override {
1174 req = new RGWAsyncMetaRemoveEntry(this, stack->create_completion_notifier(),
1175 sync_env->store, raw_key);
1176 sync_env->async_rados->queue(req);
1177 return 0;
1178 }
1179
1180 int request_complete() override {
1181 int r = req->get_ret_status();
1182 if (r == -ENOENT) {
1183 r = 0;
1184 }
1185 return r;
1186 }
1187 };
1188
1189 #define META_SYNC_UPDATE_MARKER_WINDOW 10
1190
1191
1192 int RGWLastCallerWinsCR::operate() {
1193 RGWCoroutine *call_cr;
1194 reenter(this) {
1195 while (cr) {
1196 call_cr = cr;
1197 cr = nullptr;
1198 yield call(call_cr);
1199 /* cr might have been modified at this point */
1200 }
1201 return set_cr_done();
1202 }
1203 return 0;
1204 }
1205
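// Per-shard marker tracker: records completed entries and persists the updated
// rgw_meta_sync_marker to the shard status object via store_marker();
// META_SYNC_UPDATE_MARKER_WINDOW is passed to the base tracker to bound how
// often the marker is flushed.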
1206 class RGWMetaSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
1207 RGWMetaSyncEnv *sync_env;
1208
1209 string marker_oid;
1210 rgw_meta_sync_marker sync_marker;
1211
1212 RGWSyncTraceNodeRef tn;
1213
1214 public:
1215 RGWMetaSyncShardMarkerTrack(RGWMetaSyncEnv *_sync_env,
1216 const string& _marker_oid,
1217 const rgw_meta_sync_marker& _marker,
1218 RGWSyncTraceNodeRef& _tn) : RGWSyncShardMarkerTrack(META_SYNC_UPDATE_MARKER_WINDOW),
1219 sync_env(_sync_env),
1220 marker_oid(_marker_oid),
1221 sync_marker(_marker),
1222 tn(_tn){}
1223
1224 RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
1225 sync_marker.marker = new_marker;
1226 if (index_pos > 0) {
1227 sync_marker.pos = index_pos;
1228 }
1229
1230 if (!real_clock::is_zero(timestamp)) {
1231 sync_marker.timestamp = timestamp;
1232 }
1233
1234 ldpp_dout(sync_env->dpp, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << " realm_epoch=" << sync_marker.realm_epoch << dendl;
1235 tn->log(20, SSTR("new marker=" << new_marker));
1236 rgw::sal::RGWRadosStore *store = sync_env->store;
1237 return new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados,
1238 store->svc()->sysobj,
1239 rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, marker_oid),
1240 sync_marker);
1241 }
1242
1243 RGWOrderCallCR *allocate_order_control_cr() override {
1244 return new RGWLastCallerWinsCR(sync_env->cct);
1245 }
1246 };
1247
1248 RGWMetaSyncSingleEntryCR::RGWMetaSyncSingleEntryCR(RGWMetaSyncEnv *_sync_env,
1249 const string& _raw_key, const string& _entry_marker,
1250 const RGWMDLogStatus& _op_status,
1251 RGWMetaSyncShardMarkerTrack *_marker_tracker, const RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sync_env->cct),
1252 sync_env(_sync_env),
1253 raw_key(_raw_key), entry_marker(_entry_marker),
1254 op_status(_op_status),
1255 pos(0), sync_status(0),
1256 marker_tracker(_marker_tracker), tries(0) {
1257 error_injection = (sync_env->cct->_conf->rgw_sync_meta_inject_err_probability > 0);
1258 tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", raw_key);
1259 }
1260
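// Sync one mdlog entry: fetch section:key from the master zone (retrying
// transient -EAGAIN/-ECANCELED errors), store it locally, or remove the local
// copy when the master returns -ENOENT, then mark the entry complete in the
// marker tracker.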
1261 int RGWMetaSyncSingleEntryCR::operate() {
1262 reenter(this) {
1263 #define NUM_TRANSIENT_ERROR_RETRIES 10
1264
1265 if (error_injection &&
1266 rand() % 10000 < cct->_conf->rgw_sync_meta_inject_err_probability * 10000.0) {
1267 return set_cr_error(-EIO);
1268 }
1269
1270 if (op_status != MDLOG_STATUS_COMPLETE) {
1271 tn->log(20, "skipping pending operation");
1272 yield call(marker_tracker->finish(entry_marker));
1273 if (retcode < 0) {
1274 return set_cr_error(retcode);
1275 }
1276 return set_cr_done();
1277 }
1278 tn->set_flag(RGW_SNS_FLAG_ACTIVE);
1279 for (tries = 0; tries < NUM_TRANSIENT_ERROR_RETRIES; tries++) {
1280 yield {
1281 pos = raw_key.find(':');
1282 section = raw_key.substr(0, pos);
1283 key = raw_key.substr(pos + 1);
1284 tn->log(10, SSTR("fetching remote metadata entry" << (tries == 0 ? "" : " (retry)")));
1285 call(new RGWReadRemoteMetadataCR(sync_env, section, key, &md_bl, tn));
1286 }
1287
1288 sync_status = retcode;
1289
1290 if (sync_status == -ENOENT) {
1291 /* FIXME: do we need to remove the entry from the local zone? */
1292 break;
1293 }
1294
1295 if ((sync_status == -EAGAIN || sync_status == -ECANCELED) && (tries < NUM_TRANSIENT_ERROR_RETRIES - 1)) {
1296 ldpp_dout(sync_env->dpp, 20) << *this << ": failed to fetch remote metadata: " << section << ":" << key << ", will retry" << dendl;
1297 continue;
1298 }
1299
1300 if (sync_status < 0) {
1301 tn->log(10, SSTR("failed to send read remote metadata entry: section=" << section << " key=" << key << " status=" << sync_status));
1302 log_error() << "failed to send read remote metadata entry: section=" << section << " key=" << key << " status=" << sync_status << std::endl;
1303 yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), section, key, -sync_status,
1304 string("failed to read remote metadata entry: ") + cpp_strerror(-sync_status)));
1305 return set_cr_error(sync_status);
1306 }
1307
1308 break;
1309 }
1310
1311 retcode = 0;
1312 for (tries = 0; tries < NUM_TRANSIENT_ERROR_RETRIES; tries++) {
1313 if (sync_status != -ENOENT) {
1314 tn->log(10, SSTR("storing local metadata entry"));
1315 yield call(new RGWMetaStoreEntryCR(sync_env, raw_key, md_bl));
1316 } else {
1317 tn->log(10, SSTR("removing local metadata entry"));
1318 yield call(new RGWMetaRemoveEntryCR(sync_env, raw_key));
1319 }
1320 if ((retcode == -EAGAIN || retcode == -ECANCELED) && (tries < NUM_TRANSIENT_ERROR_RETRIES - 1)) {
1321 ldpp_dout(sync_env->dpp, 20) << *this << ": failed to store metadata: " << section << ":" << key << ", got retcode=" << retcode << dendl;
1322 continue;
1323 }
1324 break;
1325 }
1326
1327 sync_status = retcode;
1328
1329 if (sync_status == 0 && marker_tracker) {
1330 /* update marker */
1331 yield call(marker_tracker->finish(entry_marker));
1332 sync_status = retcode;
1333 }
1334 if (sync_status < 0) {
1335 tn->log(10, SSTR("failed, status=" << sync_status));
1336 return set_cr_error(sync_status);
1337 }
1338 tn->log(10, "success");
1339 return set_cr_done();
1340 }
1341 return 0;
1342 }
1343
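// Clone one shard of the remote metadata log: check the shard's current
// status, fetch up to CLONE_MAX_ENTRIES remote entries over REST, store them
// in the local mdlog, and report the new position through *new_marker.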
1344 class RGWCloneMetaLogCoroutine : public RGWCoroutine {
1345 RGWMetaSyncEnv *sync_env;
1346 RGWMetadataLog *mdlog;
1347
1348 const std::string& period;
1349 int shard_id;
1350 string marker;
1351 bool truncated = false;
1352 string *new_marker;
1353
1354 int max_entries = CLONE_MAX_ENTRIES;
1355
1356 RGWRESTReadResource *http_op = nullptr;
1357 boost::intrusive_ptr<RGWMetadataLogInfoCompletion> completion;
1358
1359 RGWMetadataLogInfo shard_info;
1360 rgw_mdlog_shard_data data;
1361
1362 public:
1363 RGWCloneMetaLogCoroutine(RGWMetaSyncEnv *_sync_env, RGWMetadataLog* mdlog,
1364 const std::string& period, int _id,
1365 const string& _marker, string *_new_marker)
1366 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), mdlog(mdlog),
1367 period(period), shard_id(_id), marker(_marker), new_marker(_new_marker) {
1368 if (new_marker) {
1369 *new_marker = marker;
1370 }
1371 }
1372 ~RGWCloneMetaLogCoroutine() override {
1373 if (http_op) {
1374 http_op->put();
1375 }
1376 if (completion) {
1377 completion->cancel();
1378 }
1379 }
1380
1381 int operate() override;
1382
1383 int state_init();
1384 int state_read_shard_status();
1385 int state_read_shard_status_complete();
1386 int state_send_rest_request();
1387 int state_receive_rest_response();
1388 int state_store_mdlog_entries();
1389 int state_store_mdlog_entries_complete();
1390 };
1391
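// Sync state machine for a single mdlog shard: full_sync() replays the sharded
// full-sync omap index, incremental_sync() tails the remote mdlog; both run
// under a continuous lease on the shard status object and spawn
// RGWMetaSyncSingleEntryCR for each entry.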
1392 class RGWMetaSyncShardCR : public RGWCoroutine {
1393 RGWMetaSyncEnv *sync_env;
1394
1395 const rgw_pool& pool;
1396 const std::string& period; //< currently syncing period id
1397 const epoch_t realm_epoch; //< realm_epoch of period
1398 RGWMetadataLog* mdlog; //< log of syncing period
1399 uint32_t shard_id;
1400 rgw_meta_sync_marker& sync_marker;
1401 boost::optional<rgw_meta_sync_marker> temp_marker; //< for pending updates
1402 string marker;
1403 string max_marker;
1404 const std::string& period_marker; //< max marker stored in next period
1405
1406 RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
1407 std::set<std::string> entries;
1408 std::set<std::string>::iterator iter;
1409
1410 string oid;
1411
1412 RGWMetaSyncShardMarkerTrack *marker_tracker = nullptr;
1413
1414 list<cls_log_entry> log_entries;
1415 list<cls_log_entry>::iterator log_iter;
1416 bool truncated = false;
1417
1418 string mdlog_marker;
1419 string raw_key;
1420 rgw_mdlog_entry mdlog_entry;
1421
1422 ceph::mutex inc_lock = ceph::make_mutex("RGWMetaSyncShardCR::inc_lock");
1423 ceph::condition_variable inc_cond;
1424
1425 boost::asio::coroutine incremental_cr;
1426 boost::asio::coroutine full_cr;
1427
1428 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
1429 boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
1430
1431 bool lost_lock = false;
1432
1433 bool *reset_backoff;
1434
1435 // hold a reference to the cr stack while it's in the map
1436 using StackRef = boost::intrusive_ptr<RGWCoroutinesStack>;
1437 map<StackRef, string> stack_to_pos;
1438 map<string, string> pos_to_prev;
1439
1440 bool can_adjust_marker = false;
1441 bool done_with_period = false;
1442
1443 int total_entries = 0;
1444
1445 RGWSyncTraceNodeRef tn;
1446 public:
1447 RGWMetaSyncShardCR(RGWMetaSyncEnv *_sync_env, const rgw_pool& _pool,
1448 const std::string& period, epoch_t realm_epoch,
1449 RGWMetadataLog* mdlog, uint32_t _shard_id,
1450 rgw_meta_sync_marker& _marker,
1451 const std::string& period_marker, bool *_reset_backoff,
1452 RGWSyncTraceNodeRef& _tn)
1453 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), pool(_pool),
1454 period(period), realm_epoch(realm_epoch), mdlog(mdlog),
1455 shard_id(_shard_id), sync_marker(_marker),
1456 period_marker(period_marker),
1457 reset_backoff(_reset_backoff), tn(_tn) {
1458 *reset_backoff = false;
1459 }
1460
1461 ~RGWMetaSyncShardCR() override {
1462 delete marker_tracker;
1463 if (lease_cr) {
1464 lease_cr->abort();
1465 }
1466 }
1467
1468 void set_marker_tracker(RGWMetaSyncShardMarkerTrack *mt) {
1469 delete marker_tracker;
1470 marker_tracker = mt;
1471 }
1472
1473 int operate() override {
1474 int r;
1475 while (true) {
1476 switch (sync_marker.state) {
1477 case rgw_meta_sync_marker::FullSync:
1478 r = full_sync();
1479 if (r < 0) {
1480 ldpp_dout(sync_env->dpp, 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl;
1481 return set_cr_error(r);
1482 }
1483 return 0;
1484 case rgw_meta_sync_marker::IncrementalSync:
1485 r = incremental_sync();
1486 if (r < 0) {
1487 ldpp_dout(sync_env->dpp, 10) << "sync: incremental_sync: shard_id=" << shard_id << " r=" << r << dendl;
1488 return set_cr_error(r);
1489 }
1490 return 0;
1491 }
1492 }
1493 /* unreachable */
1494 return 0;
1495 }
1496
1497 void collect_children()
1498 {
1499 int child_ret;
1500 RGWCoroutinesStack *child;
1501 while (collect_next(&child_ret, &child)) {
1502 auto iter = stack_to_pos.find(child);
1503 if (iter == stack_to_pos.end()) {
1504 /* some other stack that we don't care about */
1505 continue;
1506 }
1507
1508 string& pos = iter->second;
1509
1510 if (child_ret < 0) {
1511 ldpp_dout(sync_env->dpp, 0) << *this << ": child operation stack=" << child << " entry=" << pos << " returned " << child_ret << dendl;
1512 }
1513
1514 map<string, string>::iterator prev_iter = pos_to_prev.find(pos);
1515 ceph_assert(prev_iter != pos_to_prev.end());
1516
1517 /*
1518 * we should get -EAGAIN for transient errors, for which we want to retry, so we don't
1519 * update the marker and abort. We'll get called again for these. Permanent errors will be
1520 * handled by marking the entry at the error log shard, so that we retry on it separately
1521 */
1522 if (child_ret == -EAGAIN) {
1523 can_adjust_marker = false;
1524 }
1525
1526 if (pos_to_prev.size() == 1) {
1527 if (can_adjust_marker) {
1528 sync_marker.marker = pos;
1529 }
1530 pos_to_prev.erase(prev_iter);
1531 } else {
1532 ceph_assert(pos_to_prev.size() > 1);
1533 pos_to_prev.erase(prev_iter);
1534 prev_iter = pos_to_prev.begin();
1535 if (can_adjust_marker) {
1536 sync_marker.marker = prev_iter->second;
1537 }
1538 }
1539
1540 ldpp_dout(sync_env->dpp, 4) << *this << ": adjusting marker pos=" << sync_marker.marker << dendl;
1541 stack_to_pos.erase(iter);
1542 }
1543 }
1544
1545 int full_sync() {
1546 #define OMAP_GET_MAX_ENTRIES 100
1547 int max_entries = OMAP_GET_MAX_ENTRIES;
1548 reenter(&full_cr) {
1549 set_status("full_sync");
1550 tn->log(10, "start full sync");
1551 oid = full_sync_index_shard_oid(shard_id);
1552 can_adjust_marker = true;
1553 /* grab lock */
1554 yield {
1555 uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
1556 string lock_name = "sync_lock";
1557 rgw::sal::RGWRadosStore *store = sync_env->store;
1558 lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
1559 rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
1560 lock_name, lock_duration, this));
1561 lease_stack.reset(spawn(lease_cr.get(), false));
1562 lost_lock = false;
1563 }
1564 while (!lease_cr->is_locked()) {
1565 if (lease_cr->is_done()) {
1566 drain_all();
1567 tn->log(5, "failed to take lease");
1568 return lease_cr->get_ret_status();
1569 }
1570 set_sleeping(true);
1571 yield;
1572 }
1573 tn->log(10, "took lease");
1574
1575 /* lock succeeded, a retry now should avoid previous backoff status */
1576 *reset_backoff = true;
1577
1578 /* prepare marker tracker */
1579 set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env,
1580 sync_env->shard_obj_name(shard_id),
1581 sync_marker, tn));
1582
1583 marker = sync_marker.marker;
1584
1585 total_entries = sync_marker.pos;
1586
1587 /* sync! */
1588 do {
1589 if (!lease_cr->is_locked()) {
1590 tn->log(10, "lost lease");
1591 lost_lock = true;
1592 break;
1593 }
1594 omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
1595 yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
1596 marker, max_entries, omapkeys));
1597 if (retcode < 0) {
1598 ldpp_dout(sync_env->dpp, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
1599 tn->log(0, SSTR("ERROR: failed to list omap keys, status=" << retcode));
1600 yield lease_cr->go_down();
1601 drain_all();
1602 return retcode;
1603 }
1604 entries = std::move(omapkeys->entries);
1605 tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
1606 if (entries.size() > 0) {
1607 tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
1608 }
1609 iter = entries.begin();
1610 for (; iter != entries.end(); ++iter) {
1611 marker = *iter;
1612 tn->log(20, SSTR("full sync: " << marker));
1613 total_entries++;
1614 if (!marker_tracker->start(marker, total_entries, real_time())) {
1615 tn->log(0, SSTR("ERROR: cannot start syncing " << marker << ". Duplicate entry?"));
1616 } else {
1617 // fetch remote and write locally
1618 yield {
1619 RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, marker, marker, MDLOG_STATUS_COMPLETE, marker_tracker, tn), false);
1620 // stack_to_pos holds a reference to the stack
1621 stack_to_pos[stack] = marker;
1622 pos_to_prev[marker] = marker;
1623 }
1624 }
1625 }
1626 collect_children();
1627 } while (omapkeys->more && can_adjust_marker);
1628
1629       tn->unset_flag(RGW_SNS_FLAG_ACTIVE); /* done listing entries for this shard */
1630
1631 while (num_spawned() > 1) {
1632 yield wait_for_child();
1633 collect_children();
1634 }
1635
1636 if (!lost_lock) {
1637 /* update marker to reflect we're done with full sync */
1638 if (can_adjust_marker) {
1639 // apply updates to a temporary marker, or operate() will send us
1640 // to incremental_sync() after we yield
1641 temp_marker = sync_marker;
1642 temp_marker->state = rgw_meta_sync_marker::IncrementalSync;
1643 temp_marker->marker = std::move(temp_marker->next_step_marker);
1644 temp_marker->next_step_marker.clear();
1645 temp_marker->realm_epoch = realm_epoch;
1646 ldpp_dout(sync_env->dpp, 4) << *this << ": saving marker pos=" << temp_marker->marker << " realm_epoch=" << realm_epoch << dendl;
1647
1648 using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_meta_sync_marker>;
1649 yield call(new WriteMarkerCR(sync_env->async_rados, sync_env->store->svc()->sysobj,
1650 rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
1651 *temp_marker));
1652 }
1653
1654 if (retcode < 0) {
1655 ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
1656 yield lease_cr->go_down();
1657 drain_all();
1658 return retcode;
1659 }
1660 }
1661
1662       /*
1663        * whether we lost the lock, failed to adjust the marker, or completed
1664        * successfully, the lease still needs to be released before returning
1665        */
1666
1667 yield lease_cr->go_down();
1668
1669 lease_cr.reset();
1670
1671 drain_all();
1672
1673 if (!can_adjust_marker) {
1674 return -EAGAIN;
1675 }
1676
1677 if (lost_lock) {
1678 return -EBUSY;
1679 }
1680
1681 tn->log(10, "full sync complete");
1682
1683 // apply the sync marker update
1684 ceph_assert(temp_marker);
1685 sync_marker = std::move(*temp_marker);
1686 temp_marker = boost::none;
1687 // must not yield after this point!
1688 }
1689 return 0;
1690 }
1691
1692
1693 int incremental_sync() {
1694 reenter(&incremental_cr) {
1695 set_status("incremental_sync");
1696 tn->log(10, "start incremental sync");
1697 can_adjust_marker = true;
1698 /* grab lock */
1699 if (!lease_cr) { /* could have had a lease_cr lock from previous state */
1700 yield {
1701 uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
1702 string lock_name = "sync_lock";
1703 rgw::sal::RGWRadosStore *store = sync_env->store;
1704 lease_cr.reset( new RGWContinuousLeaseCR(sync_env->async_rados, store,
1705 rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
1706 lock_name, lock_duration, this));
1707 lease_stack.reset(spawn(lease_cr.get(), false));
1708 lost_lock = false;
1709 }
1710 while (!lease_cr->is_locked()) {
1711 if (lease_cr->is_done()) {
1712 drain_all();
1713 tn->log(10, "failed to take lease");
1714 return lease_cr->get_ret_status();
1715 }
1716 set_sleeping(true);
1717 yield;
1718 }
1719 }
1720 tn->log(10, "took lease");
1721 // if the period has advanced, we can't use the existing marker
1722 if (sync_marker.realm_epoch < realm_epoch) {
1723 ldpp_dout(sync_env->dpp, 4) << "clearing marker=" << sync_marker.marker
1724 << " from old realm_epoch=" << sync_marker.realm_epoch
1725 << " (now " << realm_epoch << ')' << dendl;
1726 sync_marker.realm_epoch = realm_epoch;
1727 sync_marker.marker.clear();
1728 }
1729 mdlog_marker = sync_marker.marker;
1730 set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env,
1731 sync_env->shard_obj_name(shard_id),
1732 sync_marker, tn));
1733
1734 /*
1735        * mdlog_marker: the remote sync marker position
1736 * sync_marker: the local sync marker position
1737 * max_marker: the max mdlog position that we fetched
1738 * marker: the current position we try to sync
1739 * period_marker: the last marker before the next period begins (optional)
1740 */
1741 marker = max_marker = sync_marker.marker;
1742 /* inc sync */
1743 do {
1744 if (!lease_cr->is_locked()) {
1745 lost_lock = true;
1746 tn->log(10, "lost lease");
1747 break;
1748 }
1749 #define INCREMENTAL_MAX_ENTRIES 100
1750 ldpp_dout(sync_env->dpp, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << dendl;
1751 if (!period_marker.empty() && period_marker <= mdlog_marker) {
1752 tn->log(10, SSTR("finished syncing current period: mdlog_marker=" << mdlog_marker << " sync_marker=" << sync_marker.marker << " period_marker=" << period_marker));
1753 done_with_period = true;
1754 break;
1755 }
1756 if (mdlog_marker <= max_marker) {
1757 /* we're at the tip, try to bring more entries */
1758 ldpp_dout(sync_env->dpp, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " syncing mdlog for shard_id=" << shard_id << dendl;
1759 yield call(new RGWCloneMetaLogCoroutine(sync_env, mdlog,
1760 period, shard_id,
1761 mdlog_marker, &mdlog_marker));
1762 }
1763 if (retcode < 0) {
1764 tn->log(10, SSTR(*this << ": failed to fetch more log entries, retcode=" << retcode));
1765 yield lease_cr->go_down();
1766 drain_all();
1767 *reset_backoff = false; // back off and try again later
1768 return retcode;
1769 }
1770 *reset_backoff = true; /* if we got to this point, all systems function */
1771 if (mdlog_marker > max_marker) {
1772 tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
1773 tn->log(20, SSTR("mdlog_marker=" << mdlog_marker << " sync_marker=" << sync_marker.marker));
1774 marker = max_marker;
1775 yield call(new RGWReadMDLogEntriesCR(sync_env, mdlog, shard_id,
1776 &max_marker, INCREMENTAL_MAX_ENTRIES,
1777 &log_entries, &truncated));
1778 if (retcode < 0) {
1779 tn->log(10, SSTR("failed to list mdlog entries, retcode=" << retcode));
1780 yield lease_cr->go_down();
1781 drain_all();
1782 *reset_backoff = false; // back off and try again later
1783 return retcode;
1784 }
1785 for (log_iter = log_entries.begin(); log_iter != log_entries.end() && !done_with_period; ++log_iter) {
1786 if (!period_marker.empty() && period_marker <= log_iter->id) {
1787 done_with_period = true;
1788 if (period_marker < log_iter->id) {
1789 tn->log(10, SSTR("found key=" << log_iter->id
1790 << " past period_marker=" << period_marker));
1791 break;
1792 }
1793 ldpp_dout(sync_env->dpp, 10) << "found key at period_marker=" << period_marker << dendl;
1794 // sync this entry, then return control to RGWMetaSyncCR
1795 }
1796 if (!mdlog_entry.convert_from(*log_iter)) {
1797 tn->log(0, SSTR("ERROR: failed to convert mdlog entry, shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << " ... skipping entry"));
1798 continue;
1799 }
1800 tn->log(20, SSTR("log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp));
1801 if (!marker_tracker->start(log_iter->id, 0, log_iter->timestamp.to_real_time())) {
1802 ldpp_dout(sync_env->dpp, 0) << "ERROR: cannot start syncing " << log_iter->id << ". Duplicate entry?" << dendl;
1803 } else {
1804 raw_key = log_iter->section + ":" + log_iter->name;
1805 yield {
1806 RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, mdlog_entry.log_data.status, marker_tracker, tn), false);
1807 ceph_assert(stack);
1808 // stack_to_pos holds a reference to the stack
1809 stack_to_pos[stack] = log_iter->id;
1810 pos_to_prev[log_iter->id] = marker;
1811 }
1812 }
1813 marker = log_iter->id;
1814 }
1815 }
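// reap any completed RGWMetaSyncSingleEntryCR children; this lets the marker tracker advance the persisted position for this shard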
1816 collect_children();
1817 ldpp_dout(sync_env->dpp, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " max_marker=" << max_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << dendl;
1818 if (done_with_period) {
1819 // return control to RGWMetaSyncCR and advance to the next period
1820 tn->log(10, SSTR(*this << ": done with period"));
1821 break;
1822 }
1823 if (mdlog_marker == max_marker && can_adjust_marker) {
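// we have consumed everything fetched so far; go idle and poll the log again shortly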
1824 tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
1825 #define INCREMENTAL_INTERVAL 20
1826 yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
1827 }
1828 } while (can_adjust_marker);
1829
1830 tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
1831
1832 while (num_spawned() > 1) {
1833 yield wait_for_child();
1834 collect_children();
1835 }
1836
1837 yield lease_cr->go_down();
1838
1839 drain_all();
1840
1841 if (lost_lock) {
1842 return -EBUSY;
1843 }
1844
1845 if (!can_adjust_marker) {
1846 return -EAGAIN;
1847 }
1848
1849 return set_cr_done();
1850 }
1851 /* TODO */
1852 return 0;
1853 }
1854 };
1855
1856 class RGWMetaSyncShardControlCR : public RGWBackoffControlCR
1857 {
1858 RGWMetaSyncEnv *sync_env;
1859
1860 const rgw_pool& pool;
1861 const std::string& period;
1862 epoch_t realm_epoch;
1863 RGWMetadataLog* mdlog;
1864 uint32_t shard_id;
1865 rgw_meta_sync_marker sync_marker;
1866 const std::string period_marker;
1867
1868 RGWSyncTraceNodeRef tn;
1869
1870 static constexpr bool exit_on_error = false; // retry on all errors
1871 public:
1872 RGWMetaSyncShardControlCR(RGWMetaSyncEnv *_sync_env, const rgw_pool& _pool,
1873 const std::string& period, epoch_t realm_epoch,
1874 RGWMetadataLog* mdlog, uint32_t _shard_id,
1875 const rgw_meta_sync_marker& _marker,
1876 std::string&& period_marker,
1877 RGWSyncTraceNodeRef& _tn_parent)
1878 : RGWBackoffControlCR(_sync_env->cct, exit_on_error), sync_env(_sync_env),
1879 pool(_pool), period(period), realm_epoch(realm_epoch), mdlog(mdlog),
1880 shard_id(_shard_id), sync_marker(_marker),
1881 period_marker(std::move(period_marker)) {
1882 tn = sync_env->sync_tracer->add_node(_tn_parent, "shard",
1883 std::to_string(shard_id));
1884 }
1885
1886 RGWCoroutine *alloc_cr() override {
1887 return new RGWMetaSyncShardCR(sync_env, pool, period, realm_epoch, mdlog,
1888 shard_id, sync_marker, period_marker, backoff_ptr(), tn);
1889 }
1890
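// once the shard CR completes successfully, re-read its persisted sync marker so the in-memory copy reflects what was stored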
1891 RGWCoroutine *alloc_finisher_cr() override {
1892 rgw::sal::RGWRadosStore *store = sync_env->store;
1893 return new RGWSimpleRadosReadCR<rgw_meta_sync_marker>(sync_env->async_rados, store->svc()->sysobj,
1894 rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
1895 &sync_marker);
1896 }
1897 };
1898
1899 class RGWMetaSyncCR : public RGWCoroutine {
1900 RGWMetaSyncEnv *sync_env;
1901 const rgw_pool& pool;
1902 RGWPeriodHistory::Cursor cursor; ///< sync position in period history
1903 RGWPeriodHistory::Cursor next; ///< next period in history
1904 rgw_meta_sync_status sync_status;
1905 RGWSyncTraceNodeRef tn;
1906
1907 std::mutex mutex; ///< protect access to shard_crs
1908
1909 // TODO: it should be enough to hold a reference on the stack only, as calling
1910 // RGWCoroutinesStack::wakeup() doesn't refer to the RGWCoroutine if it has
1911 // already completed
1912 using ControlCRRef = boost::intrusive_ptr<RGWMetaSyncShardControlCR>;
1913 using StackRef = boost::intrusive_ptr<RGWCoroutinesStack>;
1914 using RefPair = std::pair<ControlCRRef, StackRef>;
1915 map<int, RefPair> shard_crs;
1916 int ret{0};
1917
1918 public:
1919 RGWMetaSyncCR(RGWMetaSyncEnv *_sync_env, const RGWPeriodHistory::Cursor &cursor,
1920 const rgw_meta_sync_status& _sync_status, RGWSyncTraceNodeRef& _tn)
1921 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
1922 pool(sync_env->store->svc()->zone->get_zone_params().log_pool),
1923 cursor(cursor), sync_status(_sync_status), tn(_tn) {}
1924
1925 ~RGWMetaSyncCR() {
1926 }
1927
1928 int operate() override {
1929 reenter(this) {
1930 // loop through one period at a time
1931 tn->log(1, "start");
1932 for (;;) {
1933 if (cursor == sync_env->store->svc()->mdlog->get_period_history()->get_current()) {
1934 next = RGWPeriodHistory::Cursor{};
1935 if (cursor) {
1936 ldpp_dout(sync_env->dpp, 10) << "RGWMetaSyncCR on current period="
1937 << cursor.get_period().get_id() << dendl;
1938 } else {
1939 ldpp_dout(sync_env->dpp, 10) << "RGWMetaSyncCR with no period" << dendl;
1940 }
1941 } else {
1942 next = cursor;
1943 next.next();
1944 ldpp_dout(sync_env->dpp, 10) << "RGWMetaSyncCR on period="
1945 << cursor.get_period().get_id() << ", next="
1946 << next.get_period().get_id() << dendl;
1947 }
1948
1949 yield {
1950 // get the mdlog for the current period (may be empty)
1951 auto& period_id = sync_status.sync_info.period;
1952 auto realm_epoch = sync_status.sync_info.realm_epoch;
1953 auto mdlog = sync_env->store->svc()->mdlog->get_log(period_id);
1954
1955 tn->log(1, SSTR("realm epoch=" << realm_epoch << " period id=" << period_id));
1956
1957 // prevent wakeup() from accessing shard_crs while we're spawning them
1958 std::lock_guard<std::mutex> lock(mutex);
1959
1960 // sync this period on each shard
1961 for (const auto& m : sync_status.sync_markers) {
1962 uint32_t shard_id = m.first;
1963 auto& marker = m.second;
1964
1965 std::string period_marker;
1966 if (next) {
1967 // read the maximum marker from the next period's sync status
1968 period_marker = next.get_period().get_sync_status()[shard_id];
1969 if (period_marker.empty()) {
1970 // no metadata changes have occurred on this shard, skip it
1971 ldpp_dout(sync_env->dpp, 10) << "RGWMetaSyncCR: skipping shard " << shard_id
1972 << " with empty period marker" << dendl;
1973 continue;
1974 }
1975 }
1976
1977 using ShardCR = RGWMetaSyncShardControlCR;
1978 auto cr = new ShardCR(sync_env, pool, period_id, realm_epoch,
1979 mdlog, shard_id, marker,
1980 std::move(period_marker), tn);
1981 auto stack = spawn(cr, false);
1982 shard_crs[shard_id] = RefPair{cr, stack};
1983 }
1984 }
1985 // wait for each shard to complete
1986 while (ret == 0 && num_spawned() > 0) {
1987 yield wait_for_child();
1988 collect(&ret, nullptr);
1989 }
1990 drain_all();
1991 {
1992 // drop shard cr refs under lock
1993 std::lock_guard<std::mutex> lock(mutex);
1994 shard_crs.clear();
1995 }
1996 if (ret < 0) {
1997 return set_cr_error(ret);
1998 }
1999 // advance to the next period
2000 ceph_assert(next);
2001 cursor = next;
2002
2003 // write the updated sync info
2004 sync_status.sync_info.period = cursor.get_period().get_id();
2005 sync_status.sync_info.realm_epoch = cursor.get_epoch();
2006 yield call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados,
2007 sync_env->store->svc()->sysobj,
2008 rgw_raw_obj(pool, sync_env->status_oid()),
2009 sync_status.sync_info));
2010 }
2011 }
2012 return 0;
2013 }
2014
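// called from RGWRemoteMetaLog::wakeup(); pokes the backoff control CR for the given shard so it resumes syncing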
2015 void wakeup(int shard_id) {
2016 std::lock_guard<std::mutex> lock(mutex);
2017 auto iter = shard_crs.find(shard_id);
2018 if (iter == shard_crs.end()) {
2019 return;
2020 }
2021 iter->second.first->wakeup();
2022 }
2023 };
2024
2025 void RGWRemoteMetaLog::init_sync_env(RGWMetaSyncEnv *env) {
2026 env->dpp = dpp;
2027 env->cct = store->ctx();
2028 env->store = store;
2029 env->conn = conn;
2030 env->async_rados = async_rados;
2031 env->http_manager = &http_manager;
2032 env->error_logger = error_logger;
2033 env->sync_tracer = store->getRados()->get_sync_tracer();
2034 }
2035
2036 int RGWRemoteMetaLog::read_sync_status(rgw_meta_sync_status *sync_status)
2037 {
2038 if (store->svc()->zone->is_meta_master()) {
2039 return 0;
2040 }
2041 // cannot run concurrently with run_sync(), so run in a separate manager
2042 RGWCoroutinesManager crs(store->ctx(), store->getRados()->get_cr_registry());
2043 RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
2044 int ret = http_manager.start();
2045 if (ret < 0) {
2046 ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
2047 return ret;
2048 }
2049 RGWMetaSyncEnv sync_env_local = sync_env;
2050 sync_env_local.http_manager = &http_manager;
2051 tn->log(20, "read sync status");
2052 ret = crs.run(new RGWReadSyncStatusCoroutine(&sync_env_local, sync_status));
2053 http_manager.stop();
2054 return ret;
2055 }
2056
2057 int RGWRemoteMetaLog::init_sync_status()
2058 {
2059 if (store->svc()->zone->is_meta_master()) {
2060 return 0;
2061 }
2062
2063 rgw_mdlog_info mdlog_info;
2064 int r = read_log_info(&mdlog_info);
2065 if (r < 0) {
2066 lderr(store->ctx()) << "ERROR: failed to fetch master log info (r=" << r << ")" << dendl;
2067 return r;
2068 }
2069
2070 rgw_meta_sync_info sync_info;
2071 sync_info.num_shards = mdlog_info.num_shards;
2072 auto cursor = store->svc()->mdlog->get_period_history()->get_current();
2073 if (cursor) {
2074 sync_info.period = cursor.get_period().get_id();
2075 sync_info.realm_epoch = cursor.get_epoch();
2076 }
2077
2078 return run(new RGWInitSyncStatusCoroutine(&sync_env, sync_info));
2079 }
2080
2081 int RGWRemoteMetaLog::store_sync_info(const rgw_meta_sync_info& sync_info)
2082 {
2083 tn->log(20, "store sync info");
2084 return run(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(async_rados, store->svc()->sysobj,
2085 rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env.status_oid()),
2086 sync_info));
2087 }
2088
2089 // return a cursor to the period at our sync position
2090 static RGWPeriodHistory::Cursor get_period_at(rgw::sal::RGWRadosStore* store,
2091 const rgw_meta_sync_info& info,
2092 optional_yield y)
2093 {
2094 if (info.period.empty()) {
2095 // return an empty cursor with error=0
2096 return RGWPeriodHistory::Cursor{};
2097 }
2098
2099 // look for an existing period in our history
2100 auto cursor = store->svc()->mdlog->get_period_history()->lookup(info.realm_epoch);
2101 if (cursor) {
2102 // verify that the period ids match
2103 auto& existing = cursor.get_period().get_id();
2104 if (existing != info.period) {
2105 lderr(store->ctx()) << "ERROR: sync status period=" << info.period
2106 << " does not match period=" << existing
2107 << " in history at realm epoch=" << info.realm_epoch << dendl;
2108 return RGWPeriodHistory::Cursor{-EEXIST};
2109 }
2110 return cursor;
2111 }
2112
2113 // read the period from rados or pull it from the master
2114 RGWPeriod period;
2115 int r = store->svc()->mdlog->pull_period(info.period, period, y);
2116 if (r < 0) {
2117 lderr(store->ctx()) << "ERROR: failed to read period id "
2118 << info.period << ": " << cpp_strerror(r) << dendl;
2119 return RGWPeriodHistory::Cursor{r};
2120 }
2121 // attach the period to our history
2122 cursor = store->svc()->mdlog->get_period_history()->attach(std::move(period), y);
2123 if (!cursor) {
2124 r = cursor.get_error();
2125 lderr(store->ctx()) << "ERROR: failed to read period history back to "
2126 << info.period << ": " << cpp_strerror(r) << dendl;
2127 }
2128 return cursor;
2129 }
2130
2131 int RGWRemoteMetaLog::run_sync(optional_yield y)
2132 {
2133 if (store->svc()->zone->is_meta_master()) {
2134 return 0;
2135 }
2136
2137 int r = 0;
2138
2139 // get shard count and oldest log period from master
2140 rgw_mdlog_info mdlog_info;
2141 for (;;) {
2142 if (going_down) {
2143 ldpp_dout(dpp, 1) << __func__ << "(): going down" << dendl;
2144 return 0;
2145 }
2146 r = read_log_info(&mdlog_info);
2147 if (r == -EIO || r == -ENOENT) {
2148 // keep retrying if master isn't alive or hasn't initialized the log
2149 ldpp_dout(dpp, 10) << __func__ << "(): waiting for master.." << dendl;
2150 backoff.backoff_sleep();
2151 continue;
2152 }
2153 backoff.reset();
2154 if (r < 0) {
2155 lderr(store->ctx()) << "ERROR: failed to fetch master log info (r=" << r << ")" << dendl;
2156 return r;
2157 }
2158 break;
2159 }
2160
2161 rgw_meta_sync_status sync_status;
2162 do {
2163 if (going_down) {
2164 ldpp_dout(dpp, 1) << __func__ << "(): going down" << dendl;
2165 return 0;
2166 }
2167 r = run(new RGWReadSyncStatusCoroutine(&sync_env, &sync_status));
2168 if (r < 0 && r != -ENOENT) {
2169 ldpp_dout(dpp, 0) << "ERROR: failed to fetch sync status r=" << r << dendl;
2170 return r;
2171 }
2172
2173 if (!mdlog_info.period.empty()) {
2174 // restart sync if the remote has a period, but:
2175 // a) our status does not, or
2176 // b) our sync period comes before the remote's oldest log period
2177 if (sync_status.sync_info.period.empty() ||
2178 sync_status.sync_info.realm_epoch < mdlog_info.realm_epoch) {
2179 sync_status.sync_info.state = rgw_meta_sync_info::StateInit;
2180 string reason;
2181 if (sync_status.sync_info.period.empty()) {
2182 reason = "period is empty";
2183 } else {
2184 reason = SSTR("sync_info realm epoch is behind: " << sync_status.sync_info.realm_epoch << " < " << mdlog_info.realm_epoch);
2185 }
2186 tn->log(1, "initialize sync (reason: " + reason + ")");
2187 ldpp_dout(dpp, 1) << "epoch=" << sync_status.sync_info.realm_epoch
2188 << " in sync status comes before remote's oldest mdlog epoch="
2189 << mdlog_info.realm_epoch << ", restarting sync" << dendl;
2190 }
2191 }
2192
2193 if (sync_status.sync_info.state == rgw_meta_sync_info::StateInit) {
2194 ldpp_dout(dpp, 20) << __func__ << "(): init" << dendl;
2195 sync_status.sync_info.num_shards = mdlog_info.num_shards;
2196 auto cursor = store->svc()->mdlog->get_period_history()->get_current();
2197 if (cursor) {
2198 // run full sync, then start incremental from the current period/epoch
2199 sync_status.sync_info.period = cursor.get_period().get_id();
2200 sync_status.sync_info.realm_epoch = cursor.get_epoch();
2201 }
2202 r = run(new RGWInitSyncStatusCoroutine(&sync_env, sync_status.sync_info));
2203 if (r == -EBUSY) {
2204 backoff.backoff_sleep();
2205 continue;
2206 }
2207 backoff.reset();
2208 if (r < 0) {
2209 ldpp_dout(dpp, 0) << "ERROR: failed to init sync status r=" << r << dendl;
2210 return r;
2211 }
2212 }
2213 } while (sync_status.sync_info.state == rgw_meta_sync_info::StateInit);
2214
2215 auto num_shards = sync_status.sync_info.num_shards;
2216 if (num_shards != mdlog_info.num_shards) {
2217 lderr(store->ctx()) << "ERROR: can't sync, mismatch in the number of shards: master num_shards=" << mdlog_info.num_shards << " local num_shards=" << num_shards << dendl;
2218 return -EINVAL;
2219 }
2220
2221 RGWPeriodHistory::Cursor cursor;
2222 do {
2223 r = run(new RGWReadSyncStatusCoroutine(&sync_env, &sync_status));
2224 if (r < 0 && r != -ENOENT) {
2225 tn->log(0, SSTR("ERROR: failed to fetch sync status r=" << r));
2226 return r;
2227 }
2228
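// state machine: StateInit was handled above; StateBuildingFullSyncMaps lists the master's metadata keys via RGWFetchAllMetaCR, then falls through to StateSync, which runs RGWMetaSyncCR until shutdown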
2229 switch ((rgw_meta_sync_info::SyncState)sync_status.sync_info.state) {
2230 case rgw_meta_sync_info::StateBuildingFullSyncMaps:
2231 tn->log(20, "building full sync maps");
2232 r = run(new RGWFetchAllMetaCR(&sync_env, num_shards, sync_status.sync_markers, tn));
2233 if (r == -EBUSY || r == -EAGAIN) {
2234 backoff.backoff_sleep();
2235 continue;
2236 }
2237 backoff.reset();
2238 if (r < 0) {
2239 tn->log(0, SSTR("ERROR: failed to fetch all metadata keys (r=" << r << ")"));
2240 return r;
2241 }
2242
2243 sync_status.sync_info.state = rgw_meta_sync_info::StateSync;
2244 r = store_sync_info(sync_status.sync_info);
2245 if (r < 0) {
2246 tn->log(0, SSTR("ERROR: failed to update sync status (r=" << r << ")"));
2247 return r;
2248 }
2249 /* fall through */
2250 case rgw_meta_sync_info::StateSync:
2251 tn->log(20, "sync");
2252 // find our position in the period history (if any)
2253 cursor = get_period_at(store, sync_status.sync_info, y);
2254 r = cursor.get_error();
2255 if (r < 0) {
2256 return r;
2257 }
2258 meta_sync_cr = new RGWMetaSyncCR(&sync_env, cursor, sync_status, tn);
2259 r = run(meta_sync_cr);
2260 if (r < 0) {
2261 tn->log(0, "ERROR: failed to run metadata sync");
2262 return r;
2263 }
2264 break;
2265 default:
2266 tn->log(0, "ERROR: bad sync state!");
2267 return -EIO;
2268 }
2269 } while (!going_down);
2270
2271 return 0;
2272 }
2273
2274 void RGWRemoteMetaLog::wakeup(int shard_id)
2275 {
2276 if (!meta_sync_cr) {
2277 return;
2278 }
2279 meta_sync_cr->wakeup(shard_id);
2280 }
2281
2282 int RGWCloneMetaLogCoroutine::operate()
2283 {
2284 reenter(this) {
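// each iteration clones one batch of mdlog entries for this shard: read the
// local shard status to find the clone position, request up to max_entries
// from the master's /admin/log endpoint, and store them in the local mdlog;
// repeat while the response indicates more entries remain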
2285 do {
2286 yield {
2287 ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": init request" << dendl;
2288 return state_init();
2289 }
2290 yield {
2291 ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status" << dendl;
2292 return state_read_shard_status();
2293 }
2294 yield {
2295 ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status complete" << dendl;
2296 return state_read_shard_status_complete();
2297 }
2298 yield {
2299 ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": sending rest request" << dendl;
2300 return state_send_rest_request();
2301 }
2302 yield {
2303 ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": receiving rest response" << dendl;
2304 return state_receive_rest_response();
2305 }
2306 yield {
2307 ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries" << dendl;
2308 return state_store_mdlog_entries();
2309 }
2310 } while (truncated);
2311 yield {
2312 ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries complete" << dendl;
2313 return state_store_mdlog_entries_complete();
2314 }
2315 }
2316
2317 return 0;
2318 }
2319
2320 int RGWCloneMetaLogCoroutine::state_init()
2321 {
2322 data = rgw_mdlog_shard_data();
2323
2324 return 0;
2325 }
2326
2327 int RGWCloneMetaLogCoroutine::state_read_shard_status()
2328 {
2329 const bool add_ref = false; // default constructs with refs=1
2330
2331 completion.reset(new RGWMetadataLogInfoCompletion(
2332 [this](int ret, const cls_log_header& header) {
2333 if (ret < 0) {
2334 if (ret != -ENOENT) {
2335 ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to read mdlog info with "
2336 << cpp_strerror(ret) << dendl;
2337 }
2338 } else {
2339 shard_info.marker = header.max_marker;
2340 shard_info.last_update = header.max_time.to_real_time();
2341 }
2342 // wake up parent stack
2343 io_complete();
2344 }), add_ref);
2345
2346 int ret = mdlog->get_info_async(shard_id, completion.get());
2347 if (ret < 0) {
2348 ldpp_dout(sync_env->dpp, 0) << "ERROR: mdlog->get_info_async() returned ret=" << ret << dendl;
2349 return set_cr_error(ret);
2350 }
2351
2352 return io_block(0);
2353 }
2354
2355 int RGWCloneMetaLogCoroutine::state_read_shard_status_complete()
2356 {
2357 completion.reset();
2358
2359 ldpp_dout(sync_env->dpp, 20) << "shard_id=" << shard_id << " marker=" << shard_info.marker << " last_update=" << shard_info.last_update << dendl;
2360
2361 marker = shard_info.marker;
2362
2363 return 0;
2364 }
2365
2366 int RGWCloneMetaLogCoroutine::state_send_rest_request()
2367 {
2368 RGWRESTConn *conn = sync_env->conn;
2369
2370 char buf[32];
2371 snprintf(buf, sizeof(buf), "%d", shard_id);
2372
2373 char max_entries_buf[32];
2374 snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", max_entries);
2375
2376 const char *marker_key = (marker.empty() ? "" : "marker");
2377
2378 rgw_http_param_pair pairs[] = { { "type", "metadata" },
2379 { "id", buf },
2380 { "period", period.c_str() },
2381 { "max-entries", max_entries_buf },
2382 { marker_key, marker.c_str() },
2383 { NULL, NULL } };
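// the resulting request is roughly: GET /admin/log?type=metadata&id=<shard>&period=<period>&max-entries=<n>[&marker=<marker>] (the marker parameter is only sent once we have one)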
2384
2385 http_op = new RGWRESTReadResource(conn, "/admin/log", pairs, NULL, sync_env->http_manager);
2386
2387 init_new_io(http_op);
2388
2389 int ret = http_op->aio_read();
2390 if (ret < 0) {
2391 ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to fetch mdlog data" << dendl;
2392 log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
2393 http_op->put();
2394 http_op = NULL;
2395 return set_cr_error(ret);
2396 }
2397
2398 return io_block(0);
2399 }
2400
2401 int RGWCloneMetaLogCoroutine::state_receive_rest_response()
2402 {
2403 int ret = http_op->wait(&data, null_yield);
2404 if (ret < 0) {
2405 error_stream << "http operation failed: " << http_op->to_str() << " status=" << http_op->get_http_status() << std::endl;
2406 ldpp_dout(sync_env->dpp, 5) << "failed to wait for op, ret=" << ret << dendl;
2407 http_op->put();
2408 http_op = NULL;
2409 return set_cr_error(ret);
2410 }
2411 http_op->put();
2412 http_op = NULL;
2413
2414 ldpp_dout(sync_env->dpp, 20) << "remote mdlog, shard_id=" << shard_id << " number of shard entries: " << data.entries.size() << dendl;
2415
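// a full batch implies the remote shard may still have more entries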
2416 truncated = ((int)data.entries.size() == max_entries);
2417
2418 if (data.entries.empty()) {
2419 if (new_marker) {
2420 *new_marker = marker;
2421 }
2422 return set_cr_done();
2423 }
2424
2425 if (new_marker) {
2426 *new_marker = data.entries.back().id;
2427 }
2428
2429 return 0;
2430 }
2431
2432
2433 int RGWCloneMetaLogCoroutine::state_store_mdlog_entries()
2434 {
2435 list<cls_log_entry> dest_entries;
2436
2437 vector<rgw_mdlog_entry>::iterator iter;
2438 for (iter = data.entries.begin(); iter != data.entries.end(); ++iter) {
2439 rgw_mdlog_entry& entry = *iter;
2440 ldpp_dout(sync_env->dpp, 20) << "entry: name=" << entry.name << dendl;
2441
2442 cls_log_entry dest_entry;
2443 dest_entry.id = entry.id;
2444 dest_entry.section = entry.section;
2445 dest_entry.name = entry.name;
2446 dest_entry.timestamp = utime_t(entry.timestamp);
2447
2448 encode(entry.log_data, dest_entry.data);
2449
2450 dest_entries.push_back(dest_entry);
2451
2452 marker = entry.id;
2453 }
2454
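// write the batch into the local mdlog shard asynchronously; the completion notifier wakes this stack when the write finishes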
2455 RGWAioCompletionNotifier *cn = stack->create_completion_notifier();
2456
2457 int ret = mdlog->store_entries_in_shard(dest_entries, shard_id, cn->completion());
2458 if (ret < 0) {
2459 cn->put();
2460 ldpp_dout(sync_env->dpp, 10) << "failed to store mdlog entries shard_id=" << shard_id << " ret=" << ret << dendl;
2461 return set_cr_error(ret);
2462 }
2463 return io_block(0);
2464 }
2465
2466 int RGWCloneMetaLogCoroutine::state_store_mdlog_entries_complete()
2467 {
2468 return set_cr_done();
2469 }