// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

#include <boost/optional.hpp>

#include "common/RefCountedObj.h"
#include "common/WorkQueue.h"
#include "common/Throttle.h"
#include "common/admin_socket.h"
#include "common/errno.h"

#include "common/ceph_json.h"
#include "common/Formatter.h"

#include "rgw_common.h"
#include "rgw_zone.h"
#include "rgw_sync.h"
#include "rgw_metadata.h"
#include "rgw_mdlog_types.h"
#include "rgw_rest_conn.h"
#include "rgw_tools.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
#include "rgw_http_client.h"
#include "rgw_sync_trace.h"

#include "cls/lock/cls_lock_client.h"

#include "services/svc_zone.h"
#include "services/svc_mdlog.h"
#include "services/svc_meta.h"
#include "services/svc_cls.h"

#include <boost/asio/yield.hpp>

#define dout_subsys ceph_subsys_rgw

#undef dout_prefix
#define dout_prefix (*_dout << "meta sync: ")

using namespace std;

static string mdlog_sync_status_oid = "mdlog.sync-status";
static string mdlog_sync_status_shard_prefix = "mdlog.sync-status.shard";
static string mdlog_sync_full_sync_index_prefix = "meta.full-sync.index";
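/*
 * These prefixes name the rados objects in the log pool that hold sync
 * state: get_shard_oid()/shard_obj_name() append ".<shard_id>", so e.g.
 * shard 0's status object is "mdlog.sync-status.shard.0".
 */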

RGWContinuousLeaseCR::~RGWContinuousLeaseCR() {}

RGWSyncErrorLogger::RGWSyncErrorLogger(rgw::sal::RadosStore* _store, const string &oid_prefix, int _num_shards) : store(_store), num_shards(_num_shards) {
  for (int i = 0; i < num_shards; i++) {
    oids.push_back(get_shard_oid(oid_prefix, i));
  }
}

string RGWSyncErrorLogger::get_shard_oid(const string& oid_prefix, int shard_id) {
  char buf[oid_prefix.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), shard_id);
  return string(buf);
}

RGWCoroutine *RGWSyncErrorLogger::log_error_cr(const DoutPrefixProvider *dpp, const string& source_zone, const string& section, const string& name, uint32_t error_code, const string& message) {
  cls_log_entry entry;

  rgw_sync_error_info info(source_zone, error_code, message);
  bufferlist bl;
  encode(info, bl);
  store->svc()->cls->timelog.prepare_entry(entry, real_clock::now(), section, name, bl);

  uint32_t shard_id = ++counter % num_shards;

  return new RGWRadosTimelogAddCR(dpp, store, oids[shard_id], entry);
}

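/*
 * Exponential backoff helper: the wait doubles on each failure
 * (1s, 2s, 4s, ...) and is capped at max_secs.
 */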
void RGWSyncBackoff::update_wait_time()
{
  if (cur_wait == 0) {
    cur_wait = 1;
  } else {
    cur_wait = (cur_wait << 1);
  }
  if (cur_wait >= max_secs) {
    cur_wait = max_secs;
  }
}

void RGWSyncBackoff::backoff_sleep()
{
  update_wait_time();
  sleep(cur_wait);
}

void RGWSyncBackoff::backoff(RGWCoroutine *op)
{
  update_wait_time();
  op->wait(utime_t(cur_wait, 0));
}

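/*
 * Wrapper that keeps re-running the coroutine returned by alloc_cr().
 * -EBUSY and -EAGAIN are treated as transient and retried after backoff;
 * any other error terminates the loop when exit_on_error is set. Once the
 * operation succeeds, an optional finisher coroutine runs.
 */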
int RGWBackoffControlCR::operate(const DoutPrefixProvider *dpp) {
  reenter(this) {
    // retry the operation until it succeeds
    while (true) {
      yield {
        std::lock_guard l{lock};
        cr = alloc_cr();
        cr->get();
        call(cr);
      }
      {
        std::lock_guard l{lock};
        cr->put();
        cr = NULL;
      }
      if (retcode >= 0) {
        break;
      }
      if (retcode != -EBUSY && retcode != -EAGAIN) {
        ldout(cct, 0) << "ERROR: RGWBackoffControlCR called coroutine returned " << retcode << dendl;
        if (exit_on_error) {
          return set_cr_error(retcode);
        }
      }
      if (reset_backoff) {
        backoff.reset();
      }
      yield backoff.backoff(this);
    }

    // run an optional finisher
    yield call(alloc_finisher_cr());
    if (retcode < 0) {
      ldout(cct, 0) << "ERROR: call to finisher_cr() failed: retcode=" << retcode << dendl;
      return set_cr_error(retcode);
    }
    return set_cr_done();
  }
  return 0;
}

void rgw_mdlog_info::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("num_objects", num_shards, obj);
  JSONDecoder::decode_json("period", period, obj);
  JSONDecoder::decode_json("realm_epoch", realm_epoch, obj);
}

void rgw_mdlog_entry::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("id", id, obj);
  JSONDecoder::decode_json("section", section, obj);
  JSONDecoder::decode_json("name", name, obj);
  utime_t ut;
  JSONDecoder::decode_json("timestamp", ut, obj);
  timestamp = ut.to_real_time();
  JSONDecoder::decode_json("data", log_data, obj);
}

void rgw_mdlog_shard_data::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("marker", marker, obj);
  JSONDecoder::decode_json("truncated", truncated, obj);
  JSONDecoder::decode_json("entries", entries, obj);
}

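/*
 * For reference, the remote mdlog shard listing decoded above looks
 * roughly like this (field names from the decoders; values illustrative):
 *
 *   {
 *     "marker": "1_1544735500.123456_845.1",
 *     "truncated": false,
 *     "entries": [
 *       { "id": "1_1544735500.123456_845.1",
 *         "section": "user",
 *         "name": "foo",
 *         "timestamp": "2018-12-13 20:31:40.123456Z",
 *         "data": { ... } }
 *     ]
 *   }
 */
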
int RGWShardCollectCR::operate(const DoutPrefixProvider *dpp) {
  reenter(this) {
    while (spawn_next()) {
      current_running++;

      while (current_running >= max_concurrent) {
        int child_ret;
        yield wait_for_child();
        if (collect_next(&child_ret)) {
          current_running--;
          if (child_ret < 0 && child_ret != -ENOENT) {
            ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
            status = child_ret;
          }
        }
      }
    }
    while (current_running > 0) {
      int child_ret;
      yield wait_for_child();
      if (collect_next(&child_ret)) {
        current_running--;
        if (child_ret < 0 && child_ret != -ENOENT) {
          ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
          status = child_ret;
        }
      }
    }
    if (status < 0) {
      return set_cr_error(status);
    }
    return set_cr_done();
  }
  return 0;
}

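/*
 * RGWShardCollectCR fans out per-shard child coroutines via spawn_next(),
 * keeping at most max_concurrent in flight and harvesting completions as
 * they finish; -ENOENT from a child is not treated as an error. The two
 * collectors below use it to query up to READ_MDLOG_MAX_CONCURRENT (10)
 * remote mdlog shards at a time.
 */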
class RGWReadRemoteMDLogInfoCR : public RGWShardCollectCR {
  RGWMetaSyncEnv *sync_env;

  const std::string& period;
  int num_shards;
  map<int, RGWMetadataLogInfo> *mdlog_info;

  int shard_id;
#define READ_MDLOG_MAX_CONCURRENT 10

public:
  RGWReadRemoteMDLogInfoCR(RGWMetaSyncEnv *_sync_env,
                           const std::string& period, int _num_shards,
                           map<int, RGWMetadataLogInfo> *_mdlog_info)
    : RGWShardCollectCR(_sync_env->cct, READ_MDLOG_MAX_CONCURRENT),
      sync_env(_sync_env),
      period(period), num_shards(_num_shards),
      mdlog_info(_mdlog_info), shard_id(0) {}
  bool spawn_next() override;
};

class RGWListRemoteMDLogCR : public RGWShardCollectCR {
  RGWMetaSyncEnv *sync_env;

  const std::string& period;
  map<int, string> shards;
  int max_entries_per_shard;
  map<int, rgw_mdlog_shard_data> *result;

  map<int, string>::iterator iter;

public:
  RGWListRemoteMDLogCR(RGWMetaSyncEnv *_sync_env,
                       const std::string& period, map<int, string>& _shards,
                       int _max_entries_per_shard,
                       map<int, rgw_mdlog_shard_data> *_result)
    : RGWShardCollectCR(_sync_env->cct, READ_MDLOG_MAX_CONCURRENT),
      sync_env(_sync_env), period(period),
      max_entries_per_shard(_max_entries_per_shard),
      result(_result) {
    shards.swap(_shards);
    iter = shards.begin();
  }
  bool spawn_next() override;
};

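/*
 * The calls below drive the remote mdlog admin API: read_log_info()
 * issues "GET /admin/log?type=metadata" against the master zone and
 * decodes the num_objects/period/realm_epoch reply into rgw_mdlog_info.
 */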
int RGWRemoteMetaLog::read_log_info(const DoutPrefixProvider *dpp, rgw_mdlog_info *log_info)
{
  rgw_http_param_pair pairs[] = { { "type", "metadata" },
                                  { NULL, NULL } };

  int ret = conn->get_json_resource(dpp, "/admin/log", pairs, null_yield, *log_info);
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "ERROR: failed to fetch mdlog info" << dendl;
    return ret;
  }

  ldpp_dout(dpp, 20) << "remote mdlog, num_shards=" << log_info->num_shards << dendl;

  return 0;
}

int RGWRemoteMetaLog::read_master_log_shards_info(const DoutPrefixProvider *dpp, const string &master_period, map<int, RGWMetadataLogInfo> *shards_info)
{
  if (store->svc()->zone->is_meta_master()) {
    return 0;
  }

  rgw_mdlog_info log_info;
  int ret = read_log_info(dpp, &log_info);
  if (ret < 0) {
    return ret;
  }

  return run(dpp, new RGWReadRemoteMDLogInfoCR(&sync_env, master_period, log_info.num_shards, shards_info));
}

int RGWRemoteMetaLog::read_master_log_shards_next(const DoutPrefixProvider *dpp, const string& period, map<int, string> shard_markers, map<int, rgw_mdlog_shard_data> *result)
{
  if (store->svc()->zone->is_meta_master()) {
    return 0;
  }

  return run(dpp, new RGWListRemoteMDLogCR(&sync_env, period, shard_markers, 1, result));
}

int RGWRemoteMetaLog::init()
{
  conn = store->svc()->zone->get_master_conn();

  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }

  error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);

  init_sync_env(&sync_env);

  tn = sync_env.sync_tracer->add_node(sync_env.sync_tracer->root_node, "meta");

  return 0;
}

void RGWRemoteMetaLog::finish()
{
  going_down = true;
  stop();
}

#define CLONE_MAX_ENTRIES 100

int RGWMetaSyncStatusManager::init(const DoutPrefixProvider *dpp)
{
  if (store->svc()->zone->is_meta_master()) {
    return 0;
  }

  if (!store->svc()->zone->get_master_conn()) {
    ldpp_dout(dpp, -1) << "no REST connection to master zone" << dendl;
    return -EIO;
  }

  int r = rgw_init_ioctx(dpp, store->getRados()->get_rados_handle(), store->svc()->zone->get_zone_params().log_pool, ioctx, true);
  if (r < 0) {
    ldpp_dout(dpp, -1) << "ERROR: failed to open log pool (" << store->svc()->zone->get_zone_params().log_pool << ") ret=" << r << dendl;
    return r;
  }

  r = master_log.init();
  if (r < 0) {
    ldpp_dout(dpp, -1) << "ERROR: failed to init remote log, r=" << r << dendl;
    return r;
  }

  RGWMetaSyncEnv& sync_env = master_log.get_sync_env();

  rgw_meta_sync_status sync_status;
  r = read_sync_status(dpp, &sync_status);
  if (r < 0 && r != -ENOENT) {
    ldpp_dout(dpp, -1) << "ERROR: failed to read sync status, r=" << r << dendl;
    return r;
  }

  int num_shards = sync_status.sync_info.num_shards;

  for (int i = 0; i < num_shards; i++) {
    shard_objs[i] = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env.shard_obj_name(i));
  }

  std::unique_lock wl{ts_to_shard_lock};
  for (int i = 0; i < num_shards; i++) {
    clone_markers.push_back(string());
    utime_shard ut;
    ut.shard_id = i;
    ts_to_shard[ut] = i;
  }

  return 0;
}

unsigned RGWMetaSyncStatusManager::get_subsys() const
{
  return dout_subsys;
}

std::ostream& RGWMetaSyncStatusManager::gen_prefix(std::ostream& out) const
{
  return out << "meta sync: ";
}

void RGWMetaSyncEnv::init(const DoutPrefixProvider *_dpp, CephContext *_cct, rgw::sal::RadosStore* _store, RGWRESTConn *_conn,
                          RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
                          RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer) {
  dpp = _dpp;
  cct = _cct;
  store = _store;
  conn = _conn;
  async_rados = _async_rados;
  http_manager = _http_manager;
  error_logger = _error_logger;
  sync_tracer = _sync_tracer;
}

string RGWMetaSyncEnv::status_oid()
{
  return mdlog_sync_status_oid;
}

string RGWMetaSyncEnv::shard_obj_name(int shard_id)
{
  char buf[mdlog_sync_status_shard_prefix.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%d", mdlog_sync_status_shard_prefix.c_str(), shard_id);

  return string(buf);
}

class RGWAsyncReadMDLogEntries : public RGWAsyncRadosRequest {
  const DoutPrefixProvider *dpp;
  rgw::sal::RadosStore* store;
  RGWMetadataLog *mdlog;
  int shard_id;
  int max_entries;

protected:
  int _send_request(const DoutPrefixProvider *dpp) override {
    real_time from_time;
    real_time end_time;

    void *handle;

    mdlog->init_list_entries(shard_id, from_time, end_time, marker, &handle);

    int ret = mdlog->list_entries(dpp, handle, max_entries, entries, &marker, &truncated);

    mdlog->complete_list_entries(handle);

    return ret;
  }
public:
  string marker;
  list<cls_log_entry> entries;
  bool truncated;

  RGWAsyncReadMDLogEntries(const DoutPrefixProvider *dpp, RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RadosStore* _store,
                           RGWMetadataLog* mdlog, int _shard_id,
                           std::string _marker, int _max_entries)
    : RGWAsyncRadosRequest(caller, cn), dpp(dpp), store(_store), mdlog(mdlog),
      shard_id(_shard_id), max_entries(_max_entries), marker(std::move(_marker)) {}
};

class RGWReadMDLogEntriesCR : public RGWSimpleCoroutine {
  RGWMetaSyncEnv *sync_env;
  RGWMetadataLog *const mdlog;
  int shard_id;
  string marker;
  string *pmarker;
  int max_entries;
  list<cls_log_entry> *entries;
  bool *truncated;

  RGWAsyncReadMDLogEntries *req{nullptr};

public:
  RGWReadMDLogEntriesCR(RGWMetaSyncEnv *_sync_env, RGWMetadataLog* mdlog,
                        int _shard_id, string *_marker, int _max_entries,
                        list<cls_log_entry> *_entries, bool *_truncated)
    : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env), mdlog(mdlog),
      shard_id(_shard_id), pmarker(_marker), max_entries(_max_entries),
      entries(_entries), truncated(_truncated) {}

  ~RGWReadMDLogEntriesCR() override {
    if (req) {
      req->finish();
    }
  }

  int send_request(const DoutPrefixProvider *dpp) override {
    marker = *pmarker;
    req = new RGWAsyncReadMDLogEntries(dpp, this, stack->create_completion_notifier(),
                                       sync_env->store, mdlog, shard_id, marker,
                                       max_entries);
    sync_env->async_rados->queue(req);
    return 0;
  }

  int request_complete() override {
    *pmarker = std::move(req->marker);
    *entries = std::move(req->entries);
    *truncated = req->truncated;
    return req->get_ret_status();
  }
};

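/*
 * The REST-reading coroutines below follow a common pattern: kick off the
 * request with aio_read(), then io_block(0) to yield until the HTTP manager
 * wakes the coroutine, and finally wait() to decode the response. The
 * http_op reference is released on both the success and error paths.
 */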
class RGWReadRemoteMDLogShardInfoCR : public RGWCoroutine {
  RGWMetaSyncEnv *env;
  RGWRESTReadResource *http_op;

  const std::string& period;
  int shard_id;
  RGWMetadataLogInfo *shard_info;

public:
  RGWReadRemoteMDLogShardInfoCR(RGWMetaSyncEnv *env, const std::string& period,
                                int _shard_id, RGWMetadataLogInfo *_shard_info)
    : RGWCoroutine(env->store->ctx()), env(env), http_op(NULL),
      period(period), shard_id(_shard_id), shard_info(_shard_info) {}

  int operate(const DoutPrefixProvider *dpp) override {
    auto store = env->store;
    RGWRESTConn *conn = store->svc()->zone->get_master_conn();
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type" , "metadata" },
                                        { "id", buf },
                                        { "period", period.c_str() },
                                        { "info" , NULL },
                                        { NULL, NULL } };

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(conn, p, pairs, NULL,
                                          env->http_manager);

        init_new_io(http_op);

        int ret = http_op->aio_read(dpp);
        if (ret < 0) {
          ldpp_dout(env->dpp, 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          http_op->put();
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait(shard_info, null_yield);
        http_op->put();
        if (ret < 0) {
          return set_cr_error(ret);
        }
        return set_cr_done();
      }
    }
    return 0;
  }
};

RGWCoroutine* create_read_remote_mdlog_shard_info_cr(RGWMetaSyncEnv *env,
                                                     const std::string& period,
                                                     int shard_id,
                                                     RGWMetadataLogInfo* info)
{
  return new RGWReadRemoteMDLogShardInfoCR(env, period, shard_id, info);
}

class RGWListRemoteMDLogShardCR : public RGWSimpleCoroutine {
  RGWMetaSyncEnv *sync_env;
  RGWRESTReadResource *http_op;

  const std::string& period;
  int shard_id;
  string marker;
  uint32_t max_entries;
  rgw_mdlog_shard_data *result;

public:
  RGWListRemoteMDLogShardCR(RGWMetaSyncEnv *env, const std::string& period,
                            int _shard_id, const string& _marker, uint32_t _max_entries,
                            rgw_mdlog_shard_data *_result)
    : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL),
      period(period), shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}

  int send_request(const DoutPrefixProvider *dpp) override {
    RGWRESTConn *conn = sync_env->conn;

    char buf[32];
    snprintf(buf, sizeof(buf), "%d", shard_id);

    char max_entries_buf[32];
    snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);

    const char *marker_key = (marker.empty() ? "" : "marker");

    rgw_http_param_pair pairs[] = { { "type", "metadata" },
                                    { "id", buf },
                                    { "period", period.c_str() },
                                    { "max-entries", max_entries_buf },
                                    { marker_key, marker.c_str() },
                                    { NULL, NULL } };

    string p = "/admin/log/";

    http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
    init_new_io(http_op);

    int ret = http_op->aio_read(dpp);
    if (ret < 0) {
      ldpp_dout(dpp, 0) << "ERROR: failed to read from " << p << dendl;
      log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
      http_op->put();
      return ret;
    }

    return 0;
  }

  int request_complete() override {
    int ret = http_op->wait(result, null_yield);
    http_op->put();
    if (ret < 0 && ret != -ENOENT) {
      ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to list remote mdlog shard, ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }
};

RGWCoroutine* create_list_remote_mdlog_shard_cr(RGWMetaSyncEnv *env,
                                                const std::string& period,
                                                int shard_id,
                                                const std::string& marker,
                                                uint32_t max_entries,
                                                rgw_mdlog_shard_data *result)
{
  return new RGWListRemoteMDLogShardCR(env, period, shard_id, marker,
                                       max_entries, result);
}

bool RGWReadRemoteMDLogInfoCR::spawn_next() {
  if (shard_id >= num_shards) {
    return false;
  }
  spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env, period, shard_id, &(*mdlog_info)[shard_id]), false);
  shard_id++;
  return true;
}

bool RGWListRemoteMDLogCR::spawn_next() {
  if (iter == shards.end()) {
    return false;
  }

  spawn(new RGWListRemoteMDLogShardCR(sync_env, period, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
  ++iter;
  return true;
}

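/*
 * Initializes the sync status objects: takes the sync lease, writes the
 * global rgw_meta_sync_info, reads each remote mdlog shard's current
 * position, seeds the per-shard markers from those positions, and finally
 * flips the state to StateBuildingFullSyncMaps before dropping the lease.
 */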
class RGWInitSyncStatusCoroutine : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;

  rgw_meta_sync_info status;
  vector<RGWMetadataLogInfo> shards_info;
  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
public:
  RGWInitSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
                             const rgw_meta_sync_info &status)
    : RGWCoroutine(_sync_env->store->ctx()), sync_env(_sync_env),
      status(status), shards_info(status.num_shards),
      lease_cr(nullptr), lease_stack(nullptr) {}

  ~RGWInitSyncStatusCoroutine() override {
    if (lease_cr) {
      lease_cr->abort();
    }
  }

  int operate(const DoutPrefixProvider *dpp) override {
    int ret;
    reenter(this) {
      yield {
        set_status("acquiring sync lock");
        uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
        string lock_name = "sync_lock";
        rgw::sal::RadosStore* store = sync_env->store;
        lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                                rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
                                                lock_name, lock_duration, this));
        lease_stack.reset(spawn(lease_cr.get(), false));
      }
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          ldpp_dout(dpp, 5) << "lease cr failed, done early " << dendl;
          set_status("lease lock failed, early abort");
          return set_cr_error(lease_cr->get_ret_status());
        }
        set_sleeping(true);
        yield;
      }
      yield {
        set_status("writing sync status");
        rgw::sal::RadosStore* store = sync_env->store;
        call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(dpp, sync_env->async_rados, store->svc()->sysobj,
                                                           rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
                                                           status));
      }

      if (retcode < 0) {
        set_status("failed to write sync status");
        ldpp_dout(dpp, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
        yield lease_cr->go_down();
        return set_cr_error(retcode);
      }
      /* fetch current position in logs */
      set_status("fetching remote log position");
      yield {
        for (int i = 0; i < (int)status.num_shards; i++) {
          spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env, status.period, i,
                                                  &shards_info[i]), false);
        }
      }

      drain_all_but_stack(lease_stack.get()); /* the lease cr still needs to run */

      yield {
        set_status("updating sync status");
        for (int i = 0; i < (int)status.num_shards; i++) {
          rgw_meta_sync_marker marker;
          RGWMetadataLogInfo& info = shards_info[i];
          marker.next_step_marker = info.marker;
          marker.timestamp = info.last_update;
          rgw::sal::RadosStore* store = sync_env->store;
          spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(dpp,
                                                                sync_env->async_rados,
                                                                store->svc()->sysobj,
                                                                rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->shard_obj_name(i)),
                                                                marker), true);
        }
      }
      yield {
        set_status("changing sync state: build full sync maps");
        status.state = rgw_meta_sync_info::StateBuildingFullSyncMaps;
        rgw::sal::RadosStore* store = sync_env->store;
        call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(dpp, sync_env->async_rados, store->svc()->sysobj,
                                                           rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
                                                           status));
      }
      set_status("drop lock lease");
      yield lease_cr->go_down();
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          return set_cr_error(ret);
        }
        yield;
      }
      drain_all();
      return set_cr_done();
    }
    return 0;
  }
};

class RGWReadSyncStatusMarkersCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  RGWMetaSyncEnv *env;
  const int num_shards;
  int shard_id{0};
  map<uint32_t, rgw_meta_sync_marker>& markers;

public:
  RGWReadSyncStatusMarkersCR(RGWMetaSyncEnv *env, int num_shards,
                             map<uint32_t, rgw_meta_sync_marker>& markers)
    : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS),
      env(env), num_shards(num_shards), markers(markers)
  {}
  bool spawn_next() override;
};

bool RGWReadSyncStatusMarkersCR::spawn_next()
{
  if (shard_id >= num_shards) {
    return false;
  }
  using CR = RGWSimpleRadosReadCR<rgw_meta_sync_marker>;
  rgw_raw_obj obj{env->store->svc()->zone->get_zone_params().log_pool,
                  env->shard_obj_name(shard_id)};
  spawn(new CR(env->dpp, env->async_rados, env->store->svc()->sysobj, obj, &markers[shard_id]), false);
  shard_id++;
  return true;
}

class RGWReadSyncStatusCoroutine : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;
  rgw_meta_sync_status *sync_status;

public:
  RGWReadSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
                             rgw_meta_sync_status *_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sync_status(_status)
  {}
  int operate(const DoutPrefixProvider *dpp) override;
};

int RGWReadSyncStatusCoroutine::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    // read sync info
    using ReadInfoCR = RGWSimpleRadosReadCR<rgw_meta_sync_info>;
    yield {
      bool empty_on_enoent = false; // fail on ENOENT
      rgw_raw_obj obj{sync_env->store->svc()->zone->get_zone_params().log_pool,
                      sync_env->status_oid()};
      call(new ReadInfoCR(dpp, sync_env->async_rados, sync_env->store->svc()->sysobj, obj,
                          &sync_status->sync_info, empty_on_enoent));
    }
    if (retcode < 0) {
      ldpp_dout(dpp, 4) << "failed to read sync status info with "
                        << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    // read shard markers
    using ReadMarkersCR = RGWReadSyncStatusMarkersCR;
    yield call(new ReadMarkersCR(sync_env, sync_status->sync_info.num_shards,
                                 sync_status->sync_markers));
    if (retcode < 0) {
      ldpp_dout(dpp, 4) << "failed to read sync status markers with "
                        << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    return set_cr_done();
  }
  return 0;
}

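/*
 * Builds the full-sync index: lists every metadata section from the master
 * ("GET /admin/metadata"), pages through each section's keys in chunks of
 * META_FULL_SYNC_CHUNK_SIZE, and appends "section:key" strings into sharded
 * omap index objects that full_sync() later consumes shard by shard.
 */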
class RGWFetchAllMetaCR : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;

  int num_shards;

  int ret_status;

  list<string> sections;
  list<string>::iterator sections_iter;

  struct meta_list_result {
    list<string> keys;
    string marker;
    uint64_t count{0};
    bool truncated{false};

    void decode_json(JSONObj *obj) {
      JSONDecoder::decode_json("keys", keys, obj);
      JSONDecoder::decode_json("marker", marker, obj);
      JSONDecoder::decode_json("count", count, obj);
      JSONDecoder::decode_json("truncated", truncated, obj);
    }
  } result;
  list<string>::iterator iter;

  std::unique_ptr<RGWShardedOmapCRManager> entries_index;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
  bool lost_lock;
  bool failed;

  string marker;

  map<uint32_t, rgw_meta_sync_marker>& markers;

  RGWSyncTraceNodeRef tn;

public:
  RGWFetchAllMetaCR(RGWMetaSyncEnv *_sync_env, int _num_shards,
                    map<uint32_t, rgw_meta_sync_marker>& _markers,
                    RGWSyncTraceNodeRef& _tn_parent)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      num_shards(_num_shards),
      ret_status(0), lease_cr(nullptr), lease_stack(nullptr),
      lost_lock(false), failed(false), markers(_markers) {
    tn = sync_env->sync_tracer->add_node(_tn_parent, "fetch_all_meta");
  }

  ~RGWFetchAllMetaCR() override {
  }

  void append_section_from_set(set<string>& all_sections, const string& name) {
    set<string>::iterator iter = all_sections.find(name);
    if (iter != all_sections.end()) {
      sections.emplace_back(std::move(*iter));
      all_sections.erase(iter);
    }
  }
  /*
   * meta sync should go in the following order: user, bucket.instance, bucket
   * then whatever other sections exist (if any)
   */
  void rearrange_sections() {
    set<string> all_sections;
    std::move(sections.begin(), sections.end(),
              std::inserter(all_sections, all_sections.end()));
    sections.clear();

    append_section_from_set(all_sections, "user");
    append_section_from_set(all_sections, "bucket.instance");
    append_section_from_set(all_sections, "bucket");

    std::move(all_sections.begin(), all_sections.end(),
              std::back_inserter(sections));
  }

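  /*
   * For example (illustrative section list): {"bucket", "bucket.instance",
   * "otp", "user"} is rearranged to [user, bucket.instance, bucket, otp],
   * so users exist before their buckets, and bucket instances before
   * bucket entrypoints.
   */
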
  int operate(const DoutPrefixProvider *dpp) override {
    RGWRESTConn *conn = sync_env->conn;

    reenter(this) {
      yield {
        set_status(string("acquiring lock (") + sync_env->status_oid() + ")");
        uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
        string lock_name = "sync_lock";
        lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados,
                                                sync_env->store,
                                                rgw_raw_obj(sync_env->store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
                                                lock_name, lock_duration, this));
        lease_stack.reset(spawn(lease_cr.get(), false));
      }
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          ldpp_dout(dpp, 5) << "lease cr failed, done early " << dendl;
          set_status("failed acquiring lock");
          return set_cr_error(lease_cr->get_ret_status());
        }
        set_sleeping(true);
        yield;
      }
      entries_index.reset(new RGWShardedOmapCRManager(sync_env->async_rados, sync_env->store, this, num_shards,
                                                      sync_env->store->svc()->zone->get_zone_params().log_pool,
                                                      mdlog_sync_full_sync_index_prefix));
      yield {
        call(new RGWReadRESTResourceCR<list<string>>(cct, conn, sync_env->http_manager,
                                                     "/admin/metadata", NULL, &sections));
      }
      if (get_ret_status() < 0) {
        ldpp_dout(dpp, 0) << "ERROR: failed to fetch metadata sections" << dendl;
        yield entries_index->finish();
        yield lease_cr->go_down();
        drain_all();
        return set_cr_error(get_ret_status());
      }
      rearrange_sections();
      sections_iter = sections.begin();
      for (; sections_iter != sections.end(); ++sections_iter) {
        do {
          yield {
#define META_FULL_SYNC_CHUNK_SIZE "1000"
            string entrypoint = string("/admin/metadata/") + *sections_iter;
            rgw_http_param_pair pairs[] = { { "max-entries", META_FULL_SYNC_CHUNK_SIZE },
                                            { "marker", result.marker.c_str() },
                                            { NULL, NULL } };
            result.keys.clear();
            call(new RGWReadRESTResourceCR<meta_list_result>(cct, conn, sync_env->http_manager,
                                                             entrypoint, pairs, &result));
          }
          ret_status = get_ret_status();
          if (ret_status == -ENOENT) {
            set_retcode(0); /* reset coroutine status so that we don't return it */
            ret_status = 0;
          }
          if (ret_status < 0) {
            tn->log(0, SSTR("ERROR: failed to fetch metadata section: " << *sections_iter));
            yield entries_index->finish();
            yield lease_cr->go_down();
            drain_all();
            return set_cr_error(ret_status);
          }
          iter = result.keys.begin();
          for (; iter != result.keys.end(); ++iter) {
            if (!lease_cr->is_locked()) {
              lost_lock = true;
              break;
            }
            yield; // allow entries_index consumer to make progress

            tn->log(20, SSTR("list metadata: section=" << *sections_iter << " key=" << *iter));
            string s = *sections_iter + ":" + *iter;
            int shard_id;
            rgw::sal::RadosStore* store = sync_env->store;
            int ret = store->ctl()->meta.mgr->get_shard_id(*sections_iter, *iter, &shard_id);
            if (ret < 0) {
              tn->log(0, SSTR("ERROR: could not determine shard id for " << *sections_iter << ":" << *iter));
              ret_status = ret;
              break;
            }
            if (!entries_index->append(s, shard_id)) {
              break;
            }
          }
        } while (result.truncated);
      }
      yield {
        if (!entries_index->finish()) {
          failed = true;
        }
      }
      if (!failed) {
        for (map<uint32_t, rgw_meta_sync_marker>::iterator iter = markers.begin(); iter != markers.end(); ++iter) {
          int shard_id = (int)iter->first;
          rgw_meta_sync_marker& marker = iter->second;
          marker.total_entries = entries_index->get_total_entries(shard_id);
          spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(dpp, sync_env->async_rados, sync_env->store->svc()->sysobj,
                                                                rgw_raw_obj(sync_env->store->svc()->zone->get_zone_params().log_pool, sync_env->shard_obj_name(shard_id)),
                                                                marker), true);
        }
      }

      drain_all_but_stack(lease_stack.get()); /* the lease cr still needs to run */

      yield lease_cr->go_down();

      int ret;
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          return set_cr_error(ret);
        }
        yield;
      }
      drain_all();
      if (failed) {
        yield return set_cr_error(-EIO);
      }
      if (lost_lock) {
        yield return set_cr_error(-EBUSY);
      }

      if (ret_status < 0) {
        yield return set_cr_error(ret_status);
      }

      yield return set_cr_done();
    }
    return 0;
  }
};

static string full_sync_index_shard_oid(int shard_id)
{
  char buf[mdlog_sync_full_sync_index_prefix.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%d", mdlog_sync_full_sync_index_prefix.c_str(), shard_id);
  return string(buf);
}

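/*
 * Fetches a single metadata entry from the master zone via
 * "GET /admin/metadata/<section>/<url-encoded key>"; the raw response
 * body is handed back in *pbl.
 */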
class RGWReadRemoteMetadataCR : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;

  RGWRESTReadResource *http_op;

  string section;
  string key;

  bufferlist *pbl;

  RGWSyncTraceNodeRef tn;

public:
  RGWReadRemoteMetadataCR(RGWMetaSyncEnv *_sync_env,
                          const string& _section, const string& _key, bufferlist *_pbl,
                          const RGWSyncTraceNodeRef& _tn_parent)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      http_op(NULL),
      section(_section),
      key(_key),
      pbl(_pbl) {
    tn = sync_env->sync_tracer->add_node(_tn_parent, "read_remote_meta",
                                         section + ":" + key);
  }

  int operate(const DoutPrefixProvider *dpp) override {
    RGWRESTConn *conn = sync_env->conn;
    reenter(this) {
      yield {
        string key_encode;
        url_encode(key, key_encode);
        rgw_http_param_pair pairs[] = { { "key" , key.c_str()},
                                        { NULL, NULL } };

        string p = string("/admin/metadata/") + section + "/" + key_encode;

        http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);

        init_new_io(http_op);

        int ret = http_op->aio_read(dpp);
        if (ret < 0) {
          ldpp_dout(dpp, 0) << "ERROR: failed to fetch mdlog data" << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          http_op->put();
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait(pbl, null_yield);
        http_op->put();
        if (ret < 0) {
          return set_cr_error(ret);
        }
        return set_cr_done();
      }
    }
    return 0;
  }
};

class RGWAsyncMetaStoreEntry : public RGWAsyncRadosRequest {
  rgw::sal::RadosStore* store;
  string raw_key;
  bufferlist bl;
  const DoutPrefixProvider *dpp;
protected:
  int _send_request(const DoutPrefixProvider *dpp) override {
    int ret = store->ctl()->meta.mgr->put(raw_key, bl, null_yield, dpp, RGWMDLogSyncType::APPLY_ALWAYS, true);
    if (ret < 0) {
      ldpp_dout(dpp, 0) << "ERROR: can't store key: " << raw_key << " ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }
public:
  RGWAsyncMetaStoreEntry(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RadosStore* _store,
                         const string& _raw_key,
                         bufferlist& _bl,
                         const DoutPrefixProvider *dpp)
    : RGWAsyncRadosRequest(caller, cn), store(_store),
      raw_key(_raw_key), bl(_bl), dpp(dpp) {}
};

class RGWMetaStoreEntryCR : public RGWSimpleCoroutine {
  RGWMetaSyncEnv *sync_env;
  string raw_key;
  bufferlist bl;

  RGWAsyncMetaStoreEntry *req;

public:
  RGWMetaStoreEntryCR(RGWMetaSyncEnv *_sync_env,
                      const string& _raw_key,
                      bufferlist& _bl)
    : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env),
      raw_key(_raw_key), bl(_bl), req(NULL) {
  }

  ~RGWMetaStoreEntryCR() override {
    if (req) {
      req->finish();
    }
  }

  int send_request(const DoutPrefixProvider *dpp) override {
    req = new RGWAsyncMetaStoreEntry(this, stack->create_completion_notifier(),
                                     sync_env->store, raw_key, bl, dpp);
    sync_env->async_rados->queue(req);
    return 0;
  }

  int request_complete() override {
    return req->get_ret_status();
  }
};

class RGWAsyncMetaRemoveEntry : public RGWAsyncRadosRequest {
  rgw::sal::RadosStore* store;
  string raw_key;
  const DoutPrefixProvider *dpp;
protected:
  int _send_request(const DoutPrefixProvider *dpp) override {
    int ret = store->ctl()->meta.mgr->remove(raw_key, null_yield, dpp);
    if (ret < 0) {
      ldpp_dout(dpp, 0) << "ERROR: can't remove key: " << raw_key << " ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }
public:
  RGWAsyncMetaRemoveEntry(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RadosStore* _store,
                          const string& _raw_key, const DoutPrefixProvider *dpp)
    : RGWAsyncRadosRequest(caller, cn), store(_store),
      raw_key(_raw_key), dpp(dpp) {}
};

class RGWMetaRemoveEntryCR : public RGWSimpleCoroutine {
  RGWMetaSyncEnv *sync_env;
  string raw_key;

  RGWAsyncMetaRemoveEntry *req;

public:
  RGWMetaRemoveEntryCR(RGWMetaSyncEnv *_sync_env,
                       const string& _raw_key)
    : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env),
      raw_key(_raw_key), req(NULL) {
  }

  ~RGWMetaRemoveEntryCR() override {
    if (req) {
      req->finish();
    }
  }

  int send_request(const DoutPrefixProvider *dpp) override {
    req = new RGWAsyncMetaRemoveEntry(this, stack->create_completion_notifier(),
                                      sync_env->store, raw_key, dpp);
    sync_env->async_rados->queue(req);
    return 0;
  }

  int request_complete() override {
    int r = req->get_ret_status();
    if (r == -ENOENT) {
      r = 0;
    }
    return r;
  }
};

#define META_SYNC_UPDATE_MARKER_WINDOW 10

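/*
 * "Last caller wins" ordering: while callers keep replacing the queued
 * coroutine, only the most recently assigned one is run on each iteration,
 * coalescing redundant writes of the shard marker object.
 */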
int RGWLastCallerWinsCR::operate(const DoutPrefixProvider *dpp) {
  RGWCoroutine *call_cr;
  reenter(this) {
    while (cr) {
      call_cr = cr;
      cr = nullptr;
      yield call(call_cr);
      /* cr might have been modified at this point */
    }
    return set_cr_done();
  }
  return 0;
}

class RGWMetaSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
  RGWMetaSyncEnv *sync_env;

  string marker_oid;
  rgw_meta_sync_marker sync_marker;

  RGWSyncTraceNodeRef tn;

public:
  RGWMetaSyncShardMarkerTrack(RGWMetaSyncEnv *_sync_env,
                              const string& _marker_oid,
                              const rgw_meta_sync_marker& _marker,
                              RGWSyncTraceNodeRef& _tn)
    : RGWSyncShardMarkerTrack(META_SYNC_UPDATE_MARKER_WINDOW),
      sync_env(_sync_env),
      marker_oid(_marker_oid),
      sync_marker(_marker),
      tn(_tn) {}

  RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
    sync_marker.marker = new_marker;
    if (index_pos > 0) {
      sync_marker.pos = index_pos;
    }

    if (!real_clock::is_zero(timestamp)) {
      sync_marker.timestamp = timestamp;
    }

    ldpp_dout(sync_env->dpp, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << " realm_epoch=" << sync_marker.realm_epoch << dendl;
    tn->log(20, SSTR("new marker=" << new_marker));
    rgw::sal::RadosStore* store = sync_env->store;
    return new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->dpp, sync_env->async_rados,
                                                           store->svc()->sysobj,
                                                           rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, marker_oid),
                                                           sync_marker);
  }

  RGWOrderCallCR *allocate_order_control_cr() override {
    return new RGWLastCallerWinsCR(sync_env->cct);
  }
};

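/*
 * Syncs one mdlog entry: fetch the metadata from the master, then apply it
 * locally (store on success, remove on -ENOENT), retrying transient
 * failures up to NUM_TRANSIENT_ERROR_RETRIES times and logging persistent
 * read failures to the sync error log before giving up.
 */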
RGWMetaSyncSingleEntryCR::RGWMetaSyncSingleEntryCR(RGWMetaSyncEnv *_sync_env,
                                                   const string& _raw_key, const string& _entry_marker,
                                                   const RGWMDLogStatus& _op_status,
                                                   RGWMetaSyncShardMarkerTrack *_marker_tracker, const RGWSyncTraceNodeRef& _tn_parent)
  : RGWCoroutine(_sync_env->cct),
    sync_env(_sync_env),
    raw_key(_raw_key), entry_marker(_entry_marker),
    op_status(_op_status),
    pos(0), sync_status(0),
    marker_tracker(_marker_tracker), tries(0) {
  error_injection = (sync_env->cct->_conf->rgw_sync_meta_inject_err_probability > 0);
  tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", raw_key);
}

int RGWMetaSyncSingleEntryCR::operate(const DoutPrefixProvider *dpp) {
  reenter(this) {
#define NUM_TRANSIENT_ERROR_RETRIES 10

    if (error_injection &&
        rand() % 10000 < cct->_conf->rgw_sync_meta_inject_err_probability * 10000.0) {
      return set_cr_error(-EIO);
    }

    if (op_status != MDLOG_STATUS_COMPLETE) {
      tn->log(20, "skipping pending operation");
      yield call(marker_tracker->finish(entry_marker));
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    tn->set_flag(RGW_SNS_FLAG_ACTIVE);
    for (tries = 0; tries < NUM_TRANSIENT_ERROR_RETRIES; tries++) {
      yield {
        pos = raw_key.find(':');
        section = raw_key.substr(0, pos);
        key = raw_key.substr(pos + 1);
        tn->log(10, SSTR("fetching remote metadata entry" << (tries == 0 ? "" : " (retry)")));
        call(new RGWReadRemoteMetadataCR(sync_env, section, key, &md_bl, tn));
      }

      sync_status = retcode;

      if (sync_status == -ENOENT) {
        /* FIXME: do we need to remove the entry from the local zone? */
        break;
      }

      if (sync_status < 0) {
        if (tries < NUM_TRANSIENT_ERROR_RETRIES - 1) {
          ldpp_dout(dpp, 20) << *this << ": failed to fetch remote metadata: " << section << ":" << key << ", will retry" << dendl;
          continue;
        }

        tn->log(10, SSTR("failed to read remote metadata entry: section=" << section << " key=" << key << " status=" << sync_status));
        log_error() << "failed to read remote metadata entry: section=" << section << " key=" << key << " status=" << sync_status << std::endl;
        yield call(sync_env->error_logger->log_error_cr(dpp, sync_env->conn->get_remote_id(), section, key, -sync_status,
                                                        string("failed to read remote metadata entry: ") + cpp_strerror(-sync_status)));
        return set_cr_error(sync_status);
      }

      break;
    }

    retcode = 0;
    for (tries = 0; tries < NUM_TRANSIENT_ERROR_RETRIES; tries++) {
      if (sync_status != -ENOENT) {
        tn->log(10, SSTR("storing local metadata entry"));
        yield call(new RGWMetaStoreEntryCR(sync_env, raw_key, md_bl));
      } else {
        tn->log(10, SSTR("removing local metadata entry"));
        yield call(new RGWMetaRemoveEntryCR(sync_env, raw_key));
      }
      if ((retcode < 0) && (tries < NUM_TRANSIENT_ERROR_RETRIES - 1)) {
        ldpp_dout(dpp, 20) << *this << ": failed to store metadata: " << section << ":" << key << ", got retcode=" << retcode << dendl;
        continue;
      }
      break;
    }

    sync_status = retcode;

    if (sync_status == 0 && marker_tracker) {
      /* update marker */
      yield call(marker_tracker->finish(entry_marker));
      sync_status = retcode;
    }
    if (sync_status < 0) {
      tn->log(10, SSTR("failed, status=" << sync_status));
      return set_cr_error(sync_status);
    }
    tn->log(10, "success");
    return set_cr_done();
  }
  return 0;
}

class RGWCloneMetaLogCoroutine : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;
  RGWMetadataLog *mdlog;

  const std::string& period;
  int shard_id;
  string marker;
  bool truncated = false;
  string *new_marker;

  int max_entries = CLONE_MAX_ENTRIES;

  RGWRESTReadResource *http_op = nullptr;
  boost::intrusive_ptr<RGWMetadataLogInfoCompletion> completion;

  RGWMetadataLogInfo shard_info;
  rgw_mdlog_shard_data data;

public:
  RGWCloneMetaLogCoroutine(RGWMetaSyncEnv *_sync_env, RGWMetadataLog* mdlog,
                           const std::string& period, int _id,
                           const string& _marker, string *_new_marker)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), mdlog(mdlog),
      period(period), shard_id(_id), marker(_marker), new_marker(_new_marker) {
    if (new_marker) {
      *new_marker = marker;
    }
  }
  ~RGWCloneMetaLogCoroutine() override {
    if (http_op) {
      http_op->put();
    }
    if (completion) {
      completion->cancel();
    }
  }

  int operate(const DoutPrefixProvider *dpp) override;

  int state_init();
  int state_read_shard_status();
  int state_read_shard_status_complete();
  int state_send_rest_request(const DoutPrefixProvider *dpp);
  int state_receive_rest_response();
  int state_store_mdlog_entries();
  int state_store_mdlog_entries_complete();
};

#define META_SYNC_SPAWN_WINDOW 20

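/*
 * Runs the sync state machine for a single shard: full_sync() walks the
 * omap index built by RGWFetchAllMetaCR, then incremental_sync() tails the
 * mdlog. Both phases hold a continuous lease on the shard's status object
 * and keep at most META_SYNC_SPAWN_WINDOW entry coroutines in flight.
 */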
class RGWMetaSyncShardCR : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;

  const rgw_pool& pool;
  const std::string& period; //< currently syncing period id
  const epoch_t realm_epoch; //< realm_epoch of period
  RGWMetadataLog* mdlog; //< log of syncing period
  uint32_t shard_id;
  rgw_meta_sync_marker& sync_marker;
  boost::optional<rgw_meta_sync_marker> temp_marker; //< for pending updates
  string marker;
  string max_marker;
  const std::string& period_marker; //< max marker stored in next period

  RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
  std::set<std::string> entries;
  std::set<std::string>::iterator iter;

  string oid;

  RGWMetaSyncShardMarkerTrack *marker_tracker = nullptr;

  list<cls_log_entry> log_entries;
  list<cls_log_entry>::iterator log_iter;
  bool truncated = false;

  string mdlog_marker;
  string raw_key;
  rgw_mdlog_entry mdlog_entry;

  ceph::mutex inc_lock = ceph::make_mutex("RGWMetaSyncShardCR::inc_lock");
  ceph::condition_variable inc_cond;

  boost::asio::coroutine incremental_cr;
  boost::asio::coroutine full_cr;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;

  bool lost_lock = false;

  bool *reset_backoff;

  // hold a reference to the cr stack while it's in the map
  using StackRef = boost::intrusive_ptr<RGWCoroutinesStack>;
  map<StackRef, string> stack_to_pos;
  map<string, string> pos_to_prev;

  bool can_adjust_marker = false;
  bool done_with_period = false;

  int total_entries = 0;

  RGWSyncTraceNodeRef tn;
public:
  RGWMetaSyncShardCR(RGWMetaSyncEnv *_sync_env, const rgw_pool& _pool,
                     const std::string& period, epoch_t realm_epoch,
                     RGWMetadataLog* mdlog, uint32_t _shard_id,
                     rgw_meta_sync_marker& _marker,
                     const std::string& period_marker, bool *_reset_backoff,
                     RGWSyncTraceNodeRef& _tn)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), pool(_pool),
      period(period), realm_epoch(realm_epoch), mdlog(mdlog),
      shard_id(_shard_id), sync_marker(_marker),
      period_marker(period_marker),
      reset_backoff(_reset_backoff), tn(_tn) {
    *reset_backoff = false;
  }

  ~RGWMetaSyncShardCR() override {
    delete marker_tracker;
    if (lease_cr) {
      lease_cr->abort();
    }
  }

  void set_marker_tracker(RGWMetaSyncShardMarkerTrack *mt) {
    delete marker_tracker;
    marker_tracker = mt;
  }

  int operate(const DoutPrefixProvider *dpp) override {
    int r;
    while (true) {
      switch (sync_marker.state) {
      case rgw_meta_sync_marker::FullSync:
        r = full_sync();
        if (r < 0) {
          ldpp_dout(dpp, 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl;
          return set_cr_error(r);
        }
        return 0;
      case rgw_meta_sync_marker::IncrementalSync:
        r = incremental_sync();
        if (r < 0) {
          ldpp_dout(dpp, 10) << "sync: incremental_sync: shard_id=" << shard_id << " r=" << r << dendl;
          return set_cr_error(r);
        }
        return 0;
      }
    }
    /* unreachable */
    return 0;
  }

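  /*
   * Harvest completed entry coroutines. stack_to_pos maps each outstanding
   * stack to the log position it is syncing, and pos_to_prev tracks the
   * in-flight positions; the shard marker only advances to a position once
   * everything before it has completed, so a failed child pins the marker
   * (can_adjust_marker=false) and forces a rerun from the previous marker.
   */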
  void collect_children()
  {
    int child_ret;
    RGWCoroutinesStack *child;
    while (collect_next(&child_ret, &child)) {
      auto iter = stack_to_pos.find(child);
      if (iter == stack_to_pos.end()) {
        /* some other stack that we don't care about */
        continue;
      }

      string& pos = iter->second;

      if (child_ret < 0) {
        ldpp_dout(sync_env->dpp, 0) << *this << ": child operation stack=" << child << " entry=" << pos << " returned " << child_ret << dendl;
        // on any error code from RGWMetaSyncSingleEntryCR, we do not advance
        // the sync status marker past this entry, and set
        // can_adjust_marker=false to exit out of RGWMetaSyncShardCR.
        // RGWMetaSyncShardControlCR will rerun RGWMetaSyncShardCR from the
        // previous marker and retry
        can_adjust_marker = false;
      }

      map<string, string>::iterator prev_iter = pos_to_prev.find(pos);
      ceph_assert(prev_iter != pos_to_prev.end());

      if (pos_to_prev.size() == 1) {
        if (can_adjust_marker) {
          sync_marker.marker = pos;
        }
        pos_to_prev.erase(prev_iter);
      } else {
        ceph_assert(pos_to_prev.size() > 1);
        pos_to_prev.erase(prev_iter);
        prev_iter = pos_to_prev.begin();
        if (can_adjust_marker) {
          sync_marker.marker = prev_iter->second;
        }
      }

      ldpp_dout(sync_env->dpp, 4) << *this << ": adjusting marker pos=" << sync_marker.marker << dendl;
      stack_to_pos.erase(iter);
    }
  }

  int full_sync() {
#define OMAP_GET_MAX_ENTRIES 100
    int max_entries = OMAP_GET_MAX_ENTRIES;
    reenter(&full_cr) {
      set_status("full_sync");
      tn->log(10, "start full sync");
      oid = full_sync_index_shard_oid(shard_id);
      can_adjust_marker = true;
      /* grab lock */
      yield {
        uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
        string lock_name = "sync_lock";
        rgw::sal::RadosStore* store = sync_env->store;
        lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                                rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
                                                lock_name, lock_duration, this));
        lease_stack.reset(spawn(lease_cr.get(), false));
        lost_lock = false;
      }
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          drain_all();
          tn->log(5, "failed to take lease");
          return lease_cr->get_ret_status();
        }
        set_sleeping(true);
        yield;
      }
      tn->log(10, "took lease");

      /* lock succeeded, a retry now should avoid previous backoff status */
      *reset_backoff = true;

      /* prepare marker tracker */
      set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env,
                                                         sync_env->shard_obj_name(shard_id),
                                                         sync_marker, tn));

      marker = sync_marker.marker;

      total_entries = sync_marker.pos;

      /* sync! */
      do {
        if (!lease_cr->is_locked()) {
          tn->log(10, "lost lease");
          lost_lock = true;
          break;
        }
        omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
        yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
                                             marker, max_entries, omapkeys));
        if (retcode < 0) {
          ldpp_dout(sync_env->dpp, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
          tn->log(0, SSTR("ERROR: failed to list omap keys, status=" << retcode));
          yield lease_cr->go_down();
          drain_all();
          return retcode;
        }
        entries = std::move(omapkeys->entries);
        tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
        if (entries.size() > 0) {
          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
        }
        iter = entries.begin();
        for (; iter != entries.end(); ++iter) {
          marker = *iter;
          tn->log(20, SSTR("full sync: " << marker));
          total_entries++;
          if (!marker_tracker->start(marker, total_entries, real_time())) {
            tn->log(0, SSTR("ERROR: cannot start syncing " << marker << ". Duplicate entry?"));
          } else {
            // fetch remote and write locally
            yield {
              RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, marker, marker, MDLOG_STATUS_COMPLETE, marker_tracker, tn), false);
              // stack_to_pos holds a reference to the stack
              stack_to_pos[stack] = marker;
              pos_to_prev[marker] = marker;
            }
            // limit spawn window
            while (num_spawned() > META_SYNC_SPAWN_WINDOW) {
              yield wait_for_child();
              collect_children();
            }
          }
        }
        collect_children();
      } while (omapkeys->more && can_adjust_marker);

      tn->unset_flag(RGW_SNS_FLAG_ACTIVE); /* done listing entries to sync */

      while (num_spawned() > 1) {
        yield wait_for_child();
        collect_children();
      }

      if (!lost_lock) {
        /* update marker to reflect we're done with full sync */
        if (can_adjust_marker) {
          // apply updates to a temporary marker, or operate() will send us
          // to incremental_sync() after we yield
          temp_marker = sync_marker;
          temp_marker->state = rgw_meta_sync_marker::IncrementalSync;
          temp_marker->marker = std::move(temp_marker->next_step_marker);
          temp_marker->next_step_marker.clear();
          temp_marker->realm_epoch = realm_epoch;
          ldpp_dout(sync_env->dpp, 4) << *this << ": saving marker pos=" << temp_marker->marker << " realm_epoch=" << realm_epoch << dendl;

          using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_meta_sync_marker>;
          yield call(new WriteMarkerCR(sync_env->dpp, sync_env->async_rados, sync_env->store->svc()->sysobj,
                                       rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
                                       *temp_marker));
        }

        if (retcode < 0) {
          ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
          yield lease_cr->go_down();
          drain_all();
          return retcode;
        }
        // clean up full sync index
        yield {
          auto oid = full_sync_index_shard_oid(shard_id);
          call(new RGWRadosRemoveCR(sync_env->store, {pool, oid}));
        }
      }

      /*
       * whether or not we lost the lock, we still need to drop the lease
       * and drain any coroutines we spawned before returning
       */

      yield lease_cr->go_down();

      lease_cr.reset();

      drain_all();

      if (!can_adjust_marker) {
        return -EAGAIN;
      }

      if (lost_lock) {
        return -EBUSY;
      }

      tn->log(10, "full sync complete");

      // apply the sync marker update
      ceph_assert(temp_marker);
      sync_marker = std::move(*temp_marker);
      temp_marker = boost::none;
      // must not yield after this point!
    }
    return 0;
  }
1705
1706
1707 int incremental_sync() {
1708 reenter(&incremental_cr) {
1709 set_status("incremental_sync");
1710 tn->log(10, "start incremental sync");
1711 can_adjust_marker = true;
1712 /* grab lock */
1713 if (!lease_cr) { /* could have had a lease_cr lock from previous state */
1714 yield {
1715 uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
1716 string lock_name = "sync_lock";
1717 rgw::sal::RadosStore* store = sync_env->store;
1718 lease_cr.reset( new RGWContinuousLeaseCR(sync_env->async_rados, store,
1719 rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
1720 lock_name, lock_duration, this));
1721 lease_stack.reset(spawn(lease_cr.get(), false));
1722 lost_lock = false;
1723 }
1724 while (!lease_cr->is_locked()) {
1725 if (lease_cr->is_done()) {
1726 drain_all();
1727 tn->log(10, "failed to take lease");
1728 return lease_cr->get_ret_status();
1729 }
1730 set_sleeping(true);
1731 yield;
1732 }
1733 }
1734 tn->log(10, "took lease");
1735 // if the period has advanced, we can't use the existing marker
1736 if (sync_marker.realm_epoch < realm_epoch) {
1737 ldpp_dout(sync_env->dpp, 4) << "clearing marker=" << sync_marker.marker
1738 << " from old realm_epoch=" << sync_marker.realm_epoch
1739 << " (now " << realm_epoch << ')' << dendl;
1740 sync_marker.realm_epoch = realm_epoch;
1741 sync_marker.marker.clear();
1742 }
1743 mdlog_marker = sync_marker.marker;
1744 set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env,
1745 sync_env->shard_obj_name(shard_id),
1746 sync_marker, tn));
1747
1748 /*
1749 * mdlog_marker: the remote sync marker positiion
1750 * sync_marker: the local sync marker position
1751 * max_marker: the max mdlog position that we fetched
1752 * marker: the current position we try to sync
1753 * period_marker: the last marker before the next period begins (optional)
1754 */
1755 marker = max_marker = sync_marker.marker;
1756 /* inc sync */
1757 do {
1758 if (!lease_cr->is_locked()) {
1759 lost_lock = true;
1760 tn->log(10, "lost lease");
1761 break;
1762 }
1763 #define INCREMENTAL_MAX_ENTRIES 100
1764 ldpp_dout(sync_env->dpp, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << dendl;
1765 if (!period_marker.empty() && period_marker <= mdlog_marker) {
1766 tn->log(10, SSTR("finished syncing current period: mdlog_marker=" << mdlog_marker << " sync_marker=" << sync_marker.marker << " period_marker=" << period_marker));
1767 done_with_period = true;
1768 break;
1769 }
1770 if (mdlog_marker <= max_marker) {
1771 /* we're at the tip, try to bring more entries */
1772 ldpp_dout(sync_env->dpp, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " syncing mdlog for shard_id=" << shard_id << dendl;
1773 yield call(new RGWCloneMetaLogCoroutine(sync_env, mdlog,
1774 period, shard_id,
1775 mdlog_marker, &mdlog_marker));
1776 }
1777 if (retcode < 0) {
1778 tn->log(10, SSTR(*this << ": failed to fetch more log entries, retcode=" << retcode));
1779 yield lease_cr->go_down();
1780 drain_all();
1781 *reset_backoff = false; // back off and try again later
1782 return retcode;
1783 }
1784           *reset_backoff = true; /* if we got this far, everything is functioning */
1785 if (mdlog_marker > max_marker) {
1786 tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
1787 tn->log(20, SSTR("mdlog_marker=" << mdlog_marker << " sync_marker=" << sync_marker.marker));
1788 marker = max_marker;
1789 yield call(new RGWReadMDLogEntriesCR(sync_env, mdlog, shard_id,
1790 &max_marker, INCREMENTAL_MAX_ENTRIES,
1791 &log_entries, &truncated));
1792 if (retcode < 0) {
1793 tn->log(10, SSTR("failed to list mdlog entries, retcode=" << retcode));
1794 yield lease_cr->go_down();
1795 drain_all();
1796 *reset_backoff = false; // back off and try again later
1797 return retcode;
1798 }
1799 for (log_iter = log_entries.begin(); log_iter != log_entries.end() && !done_with_period; ++log_iter) {
1800 if (!period_marker.empty() && period_marker <= log_iter->id) {
1801 done_with_period = true;
1802 if (period_marker < log_iter->id) {
1803 tn->log(10, SSTR("found key=" << log_iter->id
1804 << " past period_marker=" << period_marker));
1805 break;
1806 }
1807 ldpp_dout(sync_env->dpp, 10) << "found key at period_marker=" << period_marker << dendl;
1808 // sync this entry, then return control to RGWMetaSyncCR
1809 }
1810 if (!mdlog_entry.convert_from(*log_iter)) {
1811 tn->log(0, SSTR("ERROR: failed to convert mdlog entry, shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << " ... skipping entry"));
1812 continue;
1813 }
1814 tn->log(20, SSTR("log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp));
1815 if (!marker_tracker->start(log_iter->id, 0, log_iter->timestamp.to_real_time())) {
1816 ldpp_dout(sync_env->dpp, 0) << "ERROR: cannot start syncing " << log_iter->id << ". Duplicate entry?" << dendl;
1817 } else {
1818 raw_key = log_iter->section + ":" + log_iter->name;
1819 yield {
1820 RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, mdlog_entry.log_data.status, marker_tracker, tn), false);
1821 ceph_assert(stack);
1822 // stack_to_pos holds a reference to the stack
1823 stack_to_pos[stack] = log_iter->id;
1824 pos_to_prev[log_iter->id] = marker;
1825 }
1826                 // limit the spawn window: wait for children to finish before spawning more
1827 while (num_spawned() > META_SYNC_SPAWN_WINDOW) {
1828 yield wait_for_child();
1829 collect_children();
1830 }
1831 }
1832 marker = log_iter->id;
1833 }
1834 }
1835 collect_children();
1836 ldpp_dout(sync_env->dpp, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " max_marker=" << max_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << dendl;
1837 if (done_with_period) {
1838 // return control to RGWMetaSyncCR and advance to the next period
1839 tn->log(10, SSTR(*this << ": done with period"));
1840 break;
1841 }
1842 if (mdlog_marker == max_marker && can_adjust_marker) {
1843 tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
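// seconds to sleep between polls of the mdlog once we have caught up to the tip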
1844 #define INCREMENTAL_INTERVAL 20
1845 yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
1846 }
1847 } while (can_adjust_marker);
1848
1849 tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
1850
1851 while (num_spawned() > 1) {
1852 yield wait_for_child();
1853 collect_children();
1854 }
1855
1856 yield lease_cr->go_down();
1857
1858 drain_all();
1859
1860 if (lost_lock) {
1861 return -EBUSY;
1862 }
1863
1864 if (!can_adjust_marker) {
1865 return -EAGAIN;
1866 }
1867
1868 return set_cr_done();
1869 }
1870 /* TODO */
1871 return 0;
1872 }
1873 };
1874
1875 class RGWMetaSyncShardControlCR : public RGWBackoffControlCR
1876 {
1877 RGWMetaSyncEnv *sync_env;
1878
1879 const rgw_pool& pool;
1880 const std::string& period;
1881 epoch_t realm_epoch;
1882 RGWMetadataLog* mdlog;
1883 uint32_t shard_id;
1884 rgw_meta_sync_marker sync_marker;
1885 const std::string period_marker;
1886
1887 RGWSyncTraceNodeRef tn;
1888
1889 static constexpr bool exit_on_error = false; // retry on all errors
1890 public:
1891 RGWMetaSyncShardControlCR(RGWMetaSyncEnv *_sync_env, const rgw_pool& _pool,
1892 const std::string& period, epoch_t realm_epoch,
1893 RGWMetadataLog* mdlog, uint32_t _shard_id,
1894 const rgw_meta_sync_marker& _marker,
1895 std::string&& period_marker,
1896 RGWSyncTraceNodeRef& _tn_parent)
1897 : RGWBackoffControlCR(_sync_env->cct, exit_on_error), sync_env(_sync_env),
1898 pool(_pool), period(period), realm_epoch(realm_epoch), mdlog(mdlog),
1899 shard_id(_shard_id), sync_marker(_marker),
1900 period_marker(std::move(period_marker)) {
1901 tn = sync_env->sync_tracer->add_node(_tn_parent, "shard",
1902 std::to_string(shard_id));
1903 }
1904
1905 RGWCoroutine *alloc_cr() override {
1906 return new RGWMetaSyncShardCR(sync_env, pool, period, realm_epoch, mdlog,
1907 shard_id, sync_marker, period_marker, backoff_ptr(), tn);
1908 }
1909
1910 RGWCoroutine *alloc_finisher_cr() override {
1911 rgw::sal::RadosStore* store = sync_env->store;
1912 return new RGWSimpleRadosReadCR<rgw_meta_sync_marker>(sync_env->dpp, sync_env->async_rados, store->svc()->sysobj,
1913 rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
1914 &sync_marker);
1915 }
1916 };
1917
1918 class RGWMetaSyncCR : public RGWCoroutine {
1919 RGWMetaSyncEnv *sync_env;
1920 const rgw_pool& pool;
1921   RGWPeriodHistory::Cursor cursor; ///< sync position in period history
1922   RGWPeriodHistory::Cursor next; ///< next period in history
1923 rgw_meta_sync_status sync_status;
1924 RGWSyncTraceNodeRef tn;
1925
1926   std::mutex mutex; ///< protect access to shard_crs
1927
1928 // TODO: it should be enough to hold a reference on the stack only, as calling
1929 // RGWCoroutinesStack::wakeup() doesn't refer to the RGWCoroutine if it has
1930 // already completed
1931 using ControlCRRef = boost::intrusive_ptr<RGWMetaSyncShardControlCR>;
1932 using StackRef = boost::intrusive_ptr<RGWCoroutinesStack>;
1933 using RefPair = std::pair<ControlCRRef, StackRef>;
1934 map<int, RefPair> shard_crs;
1935 int ret{0};
1936
1937 public:
1938 RGWMetaSyncCR(RGWMetaSyncEnv *_sync_env, const RGWPeriodHistory::Cursor &cursor,
1939 const rgw_meta_sync_status& _sync_status, RGWSyncTraceNodeRef& _tn)
1940 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
1941 pool(sync_env->store->svc()->zone->get_zone_params().log_pool),
1942 cursor(cursor), sync_status(_sync_status), tn(_tn) {}
1943
1944 ~RGWMetaSyncCR() {
1945 }
1946
1947 int operate(const DoutPrefixProvider *dpp) override {
1948 reenter(this) {
1949 // loop through one period at a time
1950 tn->log(1, "start");
1951 for (;;) {
1952 if (cursor == sync_env->store->svc()->mdlog->get_period_history()->get_current()) {
1953 next = RGWPeriodHistory::Cursor{};
1954 if (cursor) {
1955 ldpp_dout(dpp, 10) << "RGWMetaSyncCR on current period="
1956 << cursor.get_period().get_id() << dendl;
1957 } else {
1958 ldpp_dout(dpp, 10) << "RGWMetaSyncCR with no period" << dendl;
1959 }
1960 } else {
1961 next = cursor;
1962 next.next();
1963 ldpp_dout(dpp, 10) << "RGWMetaSyncCR on period="
1964 << cursor.get_period().get_id() << ", next="
1965 << next.get_period().get_id() << dendl;
1966 }
1967
1968 yield {
1969 // get the mdlog for the current period (may be empty)
1970 auto& period_id = sync_status.sync_info.period;
1971 auto realm_epoch = sync_status.sync_info.realm_epoch;
1972 auto mdlog = sync_env->store->svc()->mdlog->get_log(period_id);
1973
1974 tn->log(1, SSTR("realm epoch=" << realm_epoch << " period id=" << period_id));
1975
1976 // prevent wakeup() from accessing shard_crs while we're spawning them
1977 std::lock_guard<std::mutex> lock(mutex);
1978
1979 // sync this period on each shard
1980 for (const auto& m : sync_status.sync_markers) {
1981 uint32_t shard_id = m.first;
1982 auto& marker = m.second;
1983
1984 std::string period_marker;
1985 if (next) {
1986 // read the maximum marker from the next period's sync status
1987 period_marker = next.get_period().get_sync_status()[shard_id];
1988 if (period_marker.empty()) {
1989 // no metadata changes have occurred on this shard, skip it
1990 ldpp_dout(dpp, 10) << "RGWMetaSyncCR: skipping shard " << shard_id
1991 << " with empty period marker" << dendl;
1992 continue;
1993 }
1994 }
1995
1996 using ShardCR = RGWMetaSyncShardControlCR;
1997 auto cr = new ShardCR(sync_env, pool, period_id, realm_epoch,
1998 mdlog, shard_id, marker,
1999 std::move(period_marker), tn);
2000 auto stack = spawn(cr, false);
2001 shard_crs[shard_id] = RefPair{cr, stack};
2002 }
2003 }
2004 // wait for each shard to complete
2005 while (ret == 0 && num_spawned() > 0) {
2006 yield wait_for_child();
2007 collect(&ret, nullptr);
2008 }
2009 drain_all();
2010 {
2011 // drop shard cr refs under lock
2012 std::lock_guard<std::mutex> lock(mutex);
2013 shard_crs.clear();
2014 }
2015 if (ret < 0) {
2016 return set_cr_error(ret);
2017 }
2018 // advance to the next period
2019 ceph_assert(next);
2020 cursor = next;
2021
2022 // write the updated sync info
2023 sync_status.sync_info.period = cursor.get_period().get_id();
2024 sync_status.sync_info.realm_epoch = cursor.get_epoch();
2025 yield call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(dpp, sync_env->async_rados,
2026 sync_env->store->svc()->sysobj,
2027 rgw_raw_obj(pool, sync_env->status_oid()),
2028 sync_status.sync_info));
2029 }
2030 }
2031 return 0;
2032 }
2033
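  // called from outside the coroutine manager to nudge the control CR for
  // one mdlog shard; the mutex guards shard_crs against concurrent spawn/clear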
2034 void wakeup(int shard_id) {
2035 std::lock_guard<std::mutex> lock(mutex);
2036 auto iter = shard_crs.find(shard_id);
2037 if (iter == shard_crs.end()) {
2038 return;
2039 }
2040 iter->second.first->wakeup();
2041 }
2042 };
2043
2044 void RGWRemoteMetaLog::init_sync_env(RGWMetaSyncEnv *env) {
2045 env->dpp = dpp;
2046 env->cct = store->ctx();
2047 env->store = store;
2048 env->conn = conn;
2049 env->async_rados = async_rados;
2050 env->http_manager = &http_manager;
2051 env->error_logger = error_logger;
2052 env->sync_tracer = store->getRados()->get_sync_tracer();
2053 }
2054
2055 int RGWRemoteMetaLog::read_sync_status(const DoutPrefixProvider *dpp, rgw_meta_sync_status *sync_status)
2056 {
2057 if (store->svc()->zone->is_meta_master()) {
2058 return 0;
2059 }
2060 // cannot run concurrently with run_sync(), so run in a separate manager
2061 RGWCoroutinesManager crs(store->ctx(), store->getRados()->get_cr_registry());
2062 RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
2063 int ret = http_manager.start();
2064 if (ret < 0) {
2065 ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
2066 return ret;
2067 }
2068 RGWMetaSyncEnv sync_env_local = sync_env;
2069 sync_env_local.http_manager = &http_manager;
2070 tn->log(20, "read sync status");
2071 ret = crs.run(dpp, new RGWReadSyncStatusCoroutine(&sync_env_local, sync_status));
2072 http_manager.stop();
2073 return ret;
2074 }
2075
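// build an initial sync status from the master's mdlog info and our current
// period, then persist it via RGWInitSyncStatusCoroutine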
2076 int RGWRemoteMetaLog::init_sync_status(const DoutPrefixProvider *dpp)
2077 {
2078 if (store->svc()->zone->is_meta_master()) {
2079 return 0;
2080 }
2081
2082 rgw_mdlog_info mdlog_info;
2083 int r = read_log_info(dpp, &mdlog_info);
2084 if (r < 0) {
2085     ldpp_dout(dpp, -1) << "ERROR: failed to fetch master log info (r=" << r << ")" << dendl;
2086 return r;
2087 }
2088
2089 rgw_meta_sync_info sync_info;
2090 sync_info.num_shards = mdlog_info.num_shards;
2091 auto cursor = store->svc()->mdlog->get_period_history()->get_current();
2092 if (cursor) {
2093 sync_info.period = cursor.get_period().get_id();
2094 sync_info.realm_epoch = cursor.get_epoch();
2095 }
2096
2097 return run(dpp, new RGWInitSyncStatusCoroutine(&sync_env, sync_info));
2098 }
2099
2100 int RGWRemoteMetaLog::store_sync_info(const DoutPrefixProvider *dpp, const rgw_meta_sync_info& sync_info)
2101 {
2102 tn->log(20, "store sync info");
2103 return run(dpp, new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(dpp, async_rados, store->svc()->sysobj,
2104 rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env.status_oid()),
2105 sync_info));
2106 }
2107
2108 // return a cursor to the period at our sync position
2109 static RGWPeriodHistory::Cursor get_period_at(const DoutPrefixProvider *dpp,
2110 rgw::sal::RadosStore* store,
2111 const rgw_meta_sync_info& info,
2112 optional_yield y)
2113 {
2114 if (info.period.empty()) {
2115 // return an empty cursor with error=0
2116 return RGWPeriodHistory::Cursor{};
2117 }
2118
2119 // look for an existing period in our history
2120 auto cursor = store->svc()->mdlog->get_period_history()->lookup(info.realm_epoch);
2121 if (cursor) {
2122 // verify that the period ids match
2123 auto& existing = cursor.get_period().get_id();
2124 if (existing != info.period) {
2125 ldpp_dout(dpp, -1) << "ERROR: sync status period=" << info.period
2126 << " does not match period=" << existing
2127 << " in history at realm epoch=" << info.realm_epoch << dendl;
2128 return RGWPeriodHistory::Cursor{-EEXIST};
2129 }
2130 return cursor;
2131 }
2132
2133 // read the period from rados or pull it from the master
2134 RGWPeriod period;
2135 int r = store->svc()->mdlog->pull_period(dpp, info.period, period, y);
2136 if (r < 0) {
2137 ldpp_dout(dpp, -1) << "ERROR: failed to read period id "
2138 << info.period << ": " << cpp_strerror(r) << dendl;
2139 return RGWPeriodHistory::Cursor{r};
2140 }
2141 // attach the period to our history
2142 cursor = store->svc()->mdlog->get_period_history()->attach(dpp, std::move(period), y);
2143 if (!cursor) {
2144 r = cursor.get_error();
2145 ldpp_dout(dpp, -1) << "ERROR: failed to read period history back to "
2146 << info.period << ": " << cpp_strerror(r) << dendl;
2147 }
2148 return cursor;
2149 }
2150
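// drive the metadata sync state machine
// (StateInit -> StateBuildingFullSyncMaps -> StateSync),
// backing off and retrying on transient errors until going_down is set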
2151 int RGWRemoteMetaLog::run_sync(const DoutPrefixProvider *dpp, optional_yield y)
2152 {
2153 if (store->svc()->zone->is_meta_master()) {
2154 return 0;
2155 }
2156
2157 int r = 0;
2158
2159 // get shard count and oldest log period from master
2160 rgw_mdlog_info mdlog_info;
2161 for (;;) {
2162 if (going_down) {
2163 ldpp_dout(dpp, 1) << __func__ << "(): going down" << dendl;
2164 return 0;
2165 }
2166 r = read_log_info(dpp, &mdlog_info);
2167 if (r == -EIO || r == -ENOENT) {
2168 // keep retrying if master isn't alive or hasn't initialized the log
2169       ldpp_dout(dpp, 10) << __func__ << "(): waiting for master..." << dendl;
2170 backoff.backoff_sleep();
2171 continue;
2172 }
2173 backoff.reset();
2174 if (r < 0) {
2175       ldpp_dout(dpp, -1) << "ERROR: failed to fetch master log info (r=" << r << ")" << dendl;
2176 return r;
2177 }
2178 break;
2179 }
2180
2181 rgw_meta_sync_status sync_status;
2182 do {
2183 if (going_down) {
2184 ldpp_dout(dpp, 1) << __func__ << "(): going down" << dendl;
2185 return 0;
2186 }
2187 r = run(dpp, new RGWReadSyncStatusCoroutine(&sync_env, &sync_status));
2188 if (r < 0 && r != -ENOENT) {
2189 ldpp_dout(dpp, 0) << "ERROR: failed to fetch sync status r=" << r << dendl;
2190 return r;
2191 }
2192
2193 if (!mdlog_info.period.empty()) {
2194 // restart sync if the remote has a period, but:
2195 // a) our status does not, or
2196 // b) our sync period comes before the remote's oldest log period
2197 if (sync_status.sync_info.period.empty() ||
2198 sync_status.sync_info.realm_epoch < mdlog_info.realm_epoch) {
2199 sync_status.sync_info.state = rgw_meta_sync_info::StateInit;
2200 string reason;
2201 if (sync_status.sync_info.period.empty()) {
2202 reason = "period is empty";
2203 } else {
2204 reason = SSTR("sync_info realm epoch is behind: " << sync_status.sync_info.realm_epoch << " < " << mdlog_info.realm_epoch);
2205 }
2206 tn->log(1, "initialize sync (reason: " + reason + ")");
2207 ldpp_dout(dpp, 1) << "epoch=" << sync_status.sync_info.realm_epoch
2208 << " in sync status comes before remote's oldest mdlog epoch="
2209 << mdlog_info.realm_epoch << ", restarting sync" << dendl;
2210 }
2211 }
2212
2213 if (sync_status.sync_info.state == rgw_meta_sync_info::StateInit) {
2214 ldpp_dout(dpp, 20) << __func__ << "(): init" << dendl;
2215 sync_status.sync_info.num_shards = mdlog_info.num_shards;
2216 auto cursor = store->svc()->mdlog->get_period_history()->get_current();
2217 if (cursor) {
2218 // run full sync, then start incremental from the current period/epoch
2219 sync_status.sync_info.period = cursor.get_period().get_id();
2220 sync_status.sync_info.realm_epoch = cursor.get_epoch();
2221 }
2222 r = run(dpp, new RGWInitSyncStatusCoroutine(&sync_env, sync_status.sync_info));
2223 if (r == -EBUSY) {
2224 backoff.backoff_sleep();
2225 continue;
2226 }
2227 backoff.reset();
2228 if (r < 0) {
2229 ldpp_dout(dpp, 0) << "ERROR: failed to init sync status r=" << r << dendl;
2230 return r;
2231 }
2232 }
2233 } while (sync_status.sync_info.state == rgw_meta_sync_info::StateInit);
2234
2235 auto num_shards = sync_status.sync_info.num_shards;
2236 if (num_shards != mdlog_info.num_shards) {
2237     ldpp_dout(dpp, -1) << "ERROR: can't sync, num_shards mismatch: master num_shards=" << mdlog_info.num_shards << " local num_shards=" << num_shards << dendl;
2238 return -EINVAL;
2239 }
2240
2241 RGWPeriodHistory::Cursor cursor;
2242 do {
2243 r = run(dpp, new RGWReadSyncStatusCoroutine(&sync_env, &sync_status));
2244 if (r < 0 && r != -ENOENT) {
2245 tn->log(0, SSTR("ERROR: failed to fetch sync status r=" << r));
2246 return r;
2247 }
2248
2249 switch ((rgw_meta_sync_info::SyncState)sync_status.sync_info.state) {
2250 case rgw_meta_sync_info::StateBuildingFullSyncMaps:
2251 tn->log(20, "building full sync maps");
2252 r = run(dpp, new RGWFetchAllMetaCR(&sync_env, num_shards, sync_status.sync_markers, tn));
2253 if (r == -EBUSY || r == -EIO) {
2254 backoff.backoff_sleep();
2255 continue;
2256 }
2257 backoff.reset();
2258 if (r < 0) {
2259 tn->log(0, SSTR("ERROR: failed to fetch all metadata keys (r=" << r << ")"));
2260 return r;
2261 }
2262
2263 sync_status.sync_info.state = rgw_meta_sync_info::StateSync;
2264 r = store_sync_info(dpp, sync_status.sync_info);
2265 if (r < 0) {
2266 tn->log(0, SSTR("ERROR: failed to update sync status (r=" << r << ")"));
2267 return r;
2268 }
2269 /* fall through */
2270 case rgw_meta_sync_info::StateSync:
2271 tn->log(20, "sync");
2272 // find our position in the period history (if any)
2273 cursor = get_period_at(dpp, store, sync_status.sync_info, y);
2274 r = cursor.get_error();
2275 if (r < 0) {
2276 return r;
2277 }
2278 meta_sync_cr = new RGWMetaSyncCR(&sync_env, cursor, sync_status, tn);
2279 r = run(dpp, meta_sync_cr);
2280 if (r < 0) {
2281         tn->log(0, SSTR("ERROR: metadata sync failed (r=" << r << ")"));
2282 return r;
2283 }
2284 break;
2285 default:
2286 tn->log(0, "ERROR: bad sync state!");
2287 return -EIO;
2288 }
2289 } while (!going_down);
2290
2291 return 0;
2292 }
2293
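// forward a shard wakeup notification to the running sync coroutine, if any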
2294 void RGWRemoteMetaLog::wakeup(int shard_id)
2295 {
2296 if (!meta_sync_cr) {
2297 return;
2298 }
2299 meta_sync_cr->wakeup(shard_id);
2300 }
2301
2302 int RGWCloneMetaLogCoroutine::operate(const DoutPrefixProvider *dpp)
2303 {
2304 reenter(this) {
2305 do {
2306 yield {
2307 ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": init request" << dendl;
2308 return state_init();
2309 }
2310 yield {
2311 ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status" << dendl;
2312 return state_read_shard_status();
2313 }
2314 yield {
2315 ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status complete" << dendl;
2316 return state_read_shard_status_complete();
2317 }
2318 yield {
2319 ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": sending rest request" << dendl;
2320 return state_send_rest_request(dpp);
2321 }
2322 yield {
2323 ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": receiving rest response" << dendl;
2324 return state_receive_rest_response();
2325 }
2326 yield {
2327 ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries" << dendl;
2328 return state_store_mdlog_entries();
2329 }
2330 } while (truncated);
2331 yield {
2332 ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries complete" << dendl;
2333 return state_store_mdlog_entries_complete();
2334 }
2335 }
2336
2337 return 0;
2338 }
2339
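// reset the response buffer in preparation for the next fetch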
2340 int RGWCloneMetaLogCoroutine::state_init()
2341 {
2342 data = rgw_mdlog_shard_data();
2343
2344 return 0;
2345 }
2346
2347 int RGWCloneMetaLogCoroutine::state_read_shard_status()
2348 {
2349 const bool add_ref = false; // default constructs with refs=1
2350
2351 completion.reset(new RGWMetadataLogInfoCompletion(
2352 [this](int ret, const cls_log_header& header) {
2353 if (ret < 0) {
2354 if (ret != -ENOENT) {
2355 ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to read mdlog info with "
2356 << cpp_strerror(ret) << dendl;
2357 }
2358 } else {
2359 shard_info.marker = header.max_marker;
2360 shard_info.last_update = header.max_time.to_real_time();
2361 }
2362 // wake up parent stack
2363 io_complete();
2364 }), add_ref);
2365
2366 int ret = mdlog->get_info_async(sync_env->dpp, shard_id, completion.get());
2367 if (ret < 0) {
2368 ldpp_dout(sync_env->dpp, 0) << "ERROR: mdlog->get_info_async() returned ret=" << ret << dendl;
2369 return set_cr_error(ret);
2370 }
2371
2372 return io_block(0);
2373 }
2374
2375 int RGWCloneMetaLogCoroutine::state_read_shard_status_complete()
2376 {
2377 completion.reset();
2378
2379 ldpp_dout(sync_env->dpp, 20) << "shard_id=" << shard_id << " marker=" << shard_info.marker << " last_update=" << shard_info.last_update << dendl;
2380
2381 marker = shard_info.marker;
2382
2383 return 0;
2384 }
2385
2386 int RGWCloneMetaLogCoroutine::state_send_rest_request(const DoutPrefixProvider *dpp)
2387 {
2388 RGWRESTConn *conn = sync_env->conn;
2389
2390 char buf[32];
2391 snprintf(buf, sizeof(buf), "%d", shard_id);
2392
2393 char max_entries_buf[32];
2394 snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", max_entries);
2395
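  // only send a "marker" param once we have a position to resume from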
2396 const char *marker_key = (marker.empty() ? "" : "marker");
2397
2398 rgw_http_param_pair pairs[] = { { "type", "metadata" },
2399 { "id", buf },
2400 { "period", period.c_str() },
2401 { "max-entries", max_entries_buf },
2402 { marker_key, marker.c_str() },
2403 { NULL, NULL } };
2404
2405 http_op = new RGWRESTReadResource(conn, "/admin/log", pairs, NULL, sync_env->http_manager);
2406
2407 init_new_io(http_op);
2408
2409 int ret = http_op->aio_read(dpp);
2410 if (ret < 0) {
2411 ldpp_dout(dpp, 0) << "ERROR: failed to fetch mdlog data" << dendl;
2412 log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
2413 http_op->put();
2414 http_op = NULL;
2415 return set_cr_error(ret);
2416 }
2417
2418 return io_block(0);
2419 }
2420
2421 int RGWCloneMetaLogCoroutine::state_receive_rest_response()
2422 {
2423 int ret = http_op->wait(&data, null_yield);
2424 if (ret < 0) {
2425 error_stream << "http operation failed: " << http_op->to_str() << " status=" << http_op->get_http_status() << std::endl;
2426 ldpp_dout(sync_env->dpp, 5) << "failed to wait for op, ret=" << ret << dendl;
2427 http_op->put();
2428 http_op = NULL;
2429 return set_cr_error(ret);
2430 }
2431 http_op->put();
2432 http_op = NULL;
2433
2434   ldpp_dout(sync_env->dpp, 20) << "remote mdlog, shard_id=" << shard_id << " number of entries=" << data.entries.size() << dendl;
2435
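  // a full batch implies more entries may remain on the remote shard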
2436 truncated = ((int)data.entries.size() == max_entries);
2437
2438 if (data.entries.empty()) {
2439 if (new_marker) {
2440 *new_marker = marker;
2441 }
2442 return set_cr_done();
2443 }
2444
2445 if (new_marker) {
2446 *new_marker = data.entries.back().id;
2447 }
2448
2449 return 0;
2450 }
2451
2452
2453 int RGWCloneMetaLogCoroutine::state_store_mdlog_entries()
2454 {
2455 list<cls_log_entry> dest_entries;
2456
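  // convert each remote entry to the local cls_log format, tracking the
  // last id seen as our new marker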
2457 vector<rgw_mdlog_entry>::iterator iter;
2458 for (iter = data.entries.begin(); iter != data.entries.end(); ++iter) {
2459 rgw_mdlog_entry& entry = *iter;
2460 ldpp_dout(sync_env->dpp, 20) << "entry: name=" << entry.name << dendl;
2461
2462 cls_log_entry dest_entry;
2463 dest_entry.id = entry.id;
2464 dest_entry.section = entry.section;
2465 dest_entry.name = entry.name;
2466 dest_entry.timestamp = utime_t(entry.timestamp);
2467
2468 encode(entry.log_data, dest_entry.data);
2469
2470 dest_entries.push_back(dest_entry);
2471
2472 marker = entry.id;
2473 }
2474
2475 RGWAioCompletionNotifier *cn = stack->create_completion_notifier();
2476
2477 int ret = mdlog->store_entries_in_shard(sync_env->dpp, dest_entries, shard_id, cn->completion());
2478 if (ret < 0) {
2479 cn->put();
2480 ldpp_dout(sync_env->dpp, 10) << "failed to store md log entries shard_id=" << shard_id << " ret=" << ret << dendl;
2481 return set_cr_error(ret);
2482 }
2483 return io_block(0);
2484 }
2485
2486 int RGWCloneMetaLogCoroutine::state_store_mdlog_entries_complete()
2487 {
2488 return set_cr_done();
2489 }
2490
2491 void rgw_meta_sync_info::decode_json(JSONObj *obj)
2492 {
2493 string s;
2494 JSONDecoder::decode_json("status", s, obj);
2495 if (s == "init") {
2496 state = StateInit;
2497 } else if (s == "building-full-sync-maps") {
2498 state = StateBuildingFullSyncMaps;
2499 } else if (s == "sync") {
2500 state = StateSync;
2501 }
2502 JSONDecoder::decode_json("num_shards", num_shards, obj);
2503 JSONDecoder::decode_json("period", period, obj);
2504 JSONDecoder::decode_json("realm_epoch", realm_epoch, obj);
2505 }
2506
2507 void rgw_meta_sync_info::dump(Formatter *f) const
2508 {
2509 string s;
2510 switch ((SyncState)state) {
2511 case StateInit:
2512 s = "init";
2513 break;
2514 case StateBuildingFullSyncMaps:
2515 s = "building-full-sync-maps";
2516 break;
2517 case StateSync:
2518 s = "sync";
2519 break;
2520 default:
2521 s = "unknown";
2522 break;
2523 }
2524 encode_json("status", s, f);
2525 encode_json("num_shards", num_shards, f);
2526 encode_json("period", period, f);
2527 encode_json("realm_epoch", realm_epoch, f);
2528 }
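// illustrative example of the resulting JSON (field values are hypothetical):
//   {
//     "status": "sync",
//     "num_shards": 64,
//     "period": "<period-id>",
//     "realm_epoch": 2
//   }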
2529
2530
2531 void rgw_meta_sync_marker::decode_json(JSONObj *obj)
2532 {
2533 int s;
2534 JSONDecoder::decode_json("state", s, obj);
2535 state = s;
2536 JSONDecoder::decode_json("marker", marker, obj);
2537 JSONDecoder::decode_json("next_step_marker", next_step_marker, obj);
2538 JSONDecoder::decode_json("total_entries", total_entries, obj);
2539 JSONDecoder::decode_json("pos", pos, obj);
2540 utime_t ut;
2541 JSONDecoder::decode_json("timestamp", ut, obj);
2542 timestamp = ut.to_real_time();
2543 JSONDecoder::decode_json("realm_epoch", realm_epoch, obj);
2544 }
2545
2546 void rgw_meta_sync_marker::dump(Formatter *f) const
2547 {
2548 encode_json("state", (int)state, f);
2549 encode_json("marker", marker, f);
2550 encode_json("next_step_marker", next_step_marker, f);
2551 encode_json("total_entries", total_entries, f);
2552 encode_json("pos", pos, f);
2553 encode_json("timestamp", utime_t(timestamp), f);
2554 encode_json("realm_epoch", realm_epoch, f);
2555 }
2556
2557 void rgw_meta_sync_status::decode_json(JSONObj *obj)
2558 {
2559 JSONDecoder::decode_json("info", sync_info, obj);
2560 JSONDecoder::decode_json("markers", sync_markers, obj);
2561 }
2562
2563 void rgw_meta_sync_status::dump(Formatter *f) const {
2564 encode_json("info", sync_info, f);
2565 encode_json("markers", sync_markers, f);
2566 }
2567
2568 void rgw_sync_error_info::dump(Formatter *f) const {
2569 encode_json("source_zone", source_zone, f);
2570 encode_json("error_code", error_code, f);
2571 encode_json("message", message, f);
2572 }
2573