]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_sync.cc
import 15.2.5
[ceph.git] / ceph / src / rgw / rgw_sync.cc
CommitLineData
7c673cae 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
9f95a23c 2// vim: ts=8 sw=2 smarttab ft=cpp
7c673cae
FG
3
4#include <boost/optional.hpp>
5
6#include "common/ceph_json.h"
7#include "common/RWLock.h"
8#include "common/RefCountedObj.h"
9#include "common/WorkQueue.h"
10#include "common/Throttle.h"
11#include "common/admin_socket.h"
12#include "common/errno.h"
13
14#include "rgw_common.h"
15#include "rgw_rados.h"
11fdf7f2 16#include "rgw_zone.h"
7c673cae
FG
17#include "rgw_sync.h"
18#include "rgw_metadata.h"
9f95a23c 19#include "rgw_mdlog_types.h"
7c673cae
FG
20#include "rgw_rest_conn.h"
21#include "rgw_tools.h"
22#include "rgw_cr_rados.h"
23#include "rgw_cr_rest.h"
24#include "rgw_http_client.h"
11fdf7f2 25#include "rgw_sync_trace.h"
7c673cae
FG
26
27#include "cls/lock/cls_lock_client.h"
28
11fdf7f2 29#include "services/svc_zone.h"
9f95a23c
TL
30#include "services/svc_mdlog.h"
31#include "services/svc_meta.h"
32#include "services/svc_cls.h"
11fdf7f2 33
31f18b77
FG
34#include <boost/asio/yield.hpp>
35
7c673cae
FG
36#define dout_subsys ceph_subsys_rgw
37
38#undef dout_prefix
39#define dout_prefix (*_dout << "meta sync: ")
40
41static string mdlog_sync_status_oid = "mdlog.sync-status";
42static string mdlog_sync_status_shard_prefix = "mdlog.sync-status.shard";
43static string mdlog_sync_full_sync_index_prefix = "meta.full-sync.index";
44
9f95a23c 45RGWSyncErrorLogger::RGWSyncErrorLogger(rgw::sal::RGWRadosStore *_store, const string &oid_prefix, int _num_shards) : store(_store), num_shards(_num_shards) {
7c673cae
FG
46 for (int i = 0; i < num_shards; i++) {
47 oids.push_back(get_shard_oid(oid_prefix, i));
48 }
49}
50string RGWSyncErrorLogger::get_shard_oid(const string& oid_prefix, int shard_id) {
51 char buf[oid_prefix.size() + 16];
52 snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), shard_id);
53 return string(buf);
54}
55
56RGWCoroutine *RGWSyncErrorLogger::log_error_cr(const string& source_zone, const string& section, const string& name, uint32_t error_code, const string& message) {
57 cls_log_entry entry;
58
59 rgw_sync_error_info info(source_zone, error_code, message);
60 bufferlist bl;
11fdf7f2 61 encode(info, bl);
9f95a23c 62 store->svc()->cls->timelog.prepare_entry(entry, real_clock::now(), section, name, bl);
7c673cae
FG
63
64 uint32_t shard_id = ++counter % num_shards;
65
66
67 return new RGWRadosTimelogAddCR(store, oids[shard_id], entry);
68}
69
70void RGWSyncBackoff::update_wait_time()
71{
72 if (cur_wait == 0) {
73 cur_wait = 1;
74 } else {
75 cur_wait = (cur_wait << 1);
76 }
77 if (cur_wait >= max_secs) {
78 cur_wait = max_secs;
79 }
80}
81
82void RGWSyncBackoff::backoff_sleep()
83{
84 update_wait_time();
85 sleep(cur_wait);
86}
87
88void RGWSyncBackoff::backoff(RGWCoroutine *op)
89{
90 update_wait_time();
91 op->wait(utime_t(cur_wait, 0));
92}
93
94int RGWBackoffControlCR::operate() {
95 reenter(this) {
96 // retry the operation until it succeeds
97 while (true) {
98 yield {
9f95a23c 99 std::lock_guard l{lock};
7c673cae
FG
100 cr = alloc_cr();
101 cr->get();
102 call(cr);
103 }
104 {
9f95a23c 105 std::lock_guard l{lock};
7c673cae
FG
106 cr->put();
107 cr = NULL;
108 }
109 if (retcode >= 0) {
110 break;
111 }
112 if (retcode != -EBUSY && retcode != -EAGAIN) {
113 ldout(cct, 0) << "ERROR: RGWBackoffControlCR called coroutine returned " << retcode << dendl;
114 if (exit_on_error) {
115 return set_cr_error(retcode);
116 }
117 }
118 if (reset_backoff) {
119 backoff.reset();
120 }
121 yield backoff.backoff(this);
122 }
123
124 // run an optional finisher
125 yield call(alloc_finisher_cr());
126 if (retcode < 0) {
127 ldout(cct, 0) << "ERROR: call to finisher_cr() failed: retcode=" << retcode << dendl;
128 return set_cr_error(retcode);
129 }
130 return set_cr_done();
131 }
132 return 0;
133}
134
135void rgw_mdlog_info::decode_json(JSONObj *obj) {
136 JSONDecoder::decode_json("num_objects", num_shards, obj);
137 JSONDecoder::decode_json("period", period, obj);
138 JSONDecoder::decode_json("realm_epoch", realm_epoch, obj);
139}
140
141void rgw_mdlog_entry::decode_json(JSONObj *obj) {
142 JSONDecoder::decode_json("id", id, obj);
143 JSONDecoder::decode_json("section", section, obj);
144 JSONDecoder::decode_json("name", name, obj);
145 utime_t ut;
146 JSONDecoder::decode_json("timestamp", ut, obj);
147 timestamp = ut.to_real_time();
148 JSONDecoder::decode_json("data", log_data, obj);
149}
150
151void rgw_mdlog_shard_data::decode_json(JSONObj *obj) {
152 JSONDecoder::decode_json("marker", marker, obj);
153 JSONDecoder::decode_json("truncated", truncated, obj);
154 JSONDecoder::decode_json("entries", entries, obj);
155};
156
157int RGWShardCollectCR::operate() {
158 reenter(this) {
159 while (spawn_next()) {
160 current_running++;
161
162 while (current_running >= max_concurrent) {
163 int child_ret;
164 yield wait_for_child();
165 if (collect_next(&child_ret)) {
166 current_running--;
167 if (child_ret < 0 && child_ret != -ENOENT) {
168 ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
169 status = child_ret;
170 }
171 }
172 }
173 }
174 while (current_running > 0) {
175 int child_ret;
176 yield wait_for_child();
177 if (collect_next(&child_ret)) {
178 current_running--;
179 if (child_ret < 0 && child_ret != -ENOENT) {
180 ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
181 status = child_ret;
182 }
183 }
184 }
185 if (status < 0) {
186 return set_cr_error(status);
187 }
188 return set_cr_done();
189 }
190 return 0;
191}
192
193class RGWReadRemoteMDLogInfoCR : public RGWShardCollectCR {
194 RGWMetaSyncEnv *sync_env;
195
196 const std::string& period;
197 int num_shards;
198 map<int, RGWMetadataLogInfo> *mdlog_info;
199
200 int shard_id;
201#define READ_MDLOG_MAX_CONCURRENT 10
202
203public:
204 RGWReadRemoteMDLogInfoCR(RGWMetaSyncEnv *_sync_env,
205 const std::string& period, int _num_shards,
206 map<int, RGWMetadataLogInfo> *_mdlog_info) : RGWShardCollectCR(_sync_env->cct, READ_MDLOG_MAX_CONCURRENT),
207 sync_env(_sync_env),
208 period(period), num_shards(_num_shards),
209 mdlog_info(_mdlog_info), shard_id(0) {}
210 bool spawn_next() override;
211};
212
213class RGWListRemoteMDLogCR : public RGWShardCollectCR {
214 RGWMetaSyncEnv *sync_env;
215
216 const std::string& period;
217 map<int, string> shards;
218 int max_entries_per_shard;
219 map<int, rgw_mdlog_shard_data> *result;
220
221 map<int, string>::iterator iter;
222#define READ_MDLOG_MAX_CONCURRENT 10
223
224public:
225 RGWListRemoteMDLogCR(RGWMetaSyncEnv *_sync_env,
226 const std::string& period, map<int, string>& _shards,
227 int _max_entries_per_shard,
228 map<int, rgw_mdlog_shard_data> *_result) : RGWShardCollectCR(_sync_env->cct, READ_MDLOG_MAX_CONCURRENT),
229 sync_env(_sync_env), period(period),
230 max_entries_per_shard(_max_entries_per_shard),
231 result(_result) {
232 shards.swap(_shards);
233 iter = shards.begin();
234 }
235 bool spawn_next() override;
236};
237
238RGWRemoteMetaLog::~RGWRemoteMetaLog()
239{
240 delete error_logger;
241}
242
243int RGWRemoteMetaLog::read_log_info(rgw_mdlog_info *log_info)
244{
245 rgw_http_param_pair pairs[] = { { "type", "metadata" },
246 { NULL, NULL } };
247
248 int ret = conn->get_json_resource("/admin/log", pairs, *log_info);
249 if (ret < 0) {
11fdf7f2 250 ldpp_dout(dpp, 0) << "ERROR: failed to fetch mdlog info" << dendl;
7c673cae
FG
251 return ret;
252 }
253
11fdf7f2 254 ldpp_dout(dpp, 20) << "remote mdlog, num_shards=" << log_info->num_shards << dendl;
7c673cae
FG
255
256 return 0;
257}
258
259int RGWRemoteMetaLog::read_master_log_shards_info(const string &master_period, map<int, RGWMetadataLogInfo> *shards_info)
260{
9f95a23c 261 if (store->svc()->zone->is_meta_master()) {
7c673cae
FG
262 return 0;
263 }
264
265 rgw_mdlog_info log_info;
266 int ret = read_log_info(&log_info);
267 if (ret < 0) {
268 return ret;
269 }
270
271 return run(new RGWReadRemoteMDLogInfoCR(&sync_env, master_period, log_info.num_shards, shards_info));
272}
273
274int RGWRemoteMetaLog::read_master_log_shards_next(const string& period, map<int, string> shard_markers, map<int, rgw_mdlog_shard_data> *result)
275{
9f95a23c 276 if (store->svc()->zone->is_meta_master()) {
7c673cae
FG
277 return 0;
278 }
279
280 return run(new RGWListRemoteMDLogCR(&sync_env, period, shard_markers, 1, result));
281}
282
283int RGWRemoteMetaLog::init()
284{
9f95a23c 285 conn = store->svc()->zone->get_master_conn();
7c673cae 286
11fdf7f2 287 int ret = http_manager.start();
7c673cae 288 if (ret < 0) {
11fdf7f2 289 ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
7c673cae
FG
290 return ret;
291 }
292
293 error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
294
295 init_sync_env(&sync_env);
296
11fdf7f2
TL
297 tn = sync_env.sync_tracer->add_node(sync_env.sync_tracer->root_node, "meta");
298
7c673cae
FG
299 return 0;
300}
301
302void RGWRemoteMetaLog::finish()
303{
304 going_down = true;
305 stop();
306}
307
308#define CLONE_MAX_ENTRIES 100
309
310int RGWMetaSyncStatusManager::init()
311{
9f95a23c 312 if (store->svc()->zone->is_meta_master()) {
7c673cae
FG
313 return 0;
314 }
315
9f95a23c 316 if (!store->svc()->zone->get_master_conn()) {
7c673cae
FG
317 lderr(store->ctx()) << "no REST connection to master zone" << dendl;
318 return -EIO;
319 }
320
9f95a23c 321 int r = rgw_init_ioctx(store->getRados()->get_rados_handle(), store->svc()->zone->get_zone_params().log_pool, ioctx, true);
7c673cae 322 if (r < 0) {
9f95a23c 323 lderr(store->ctx()) << "ERROR: failed to open log pool (" << store->svc()->zone->get_zone_params().log_pool << " ret=" << r << dendl;
7c673cae
FG
324 return r;
325 }
326
327 r = master_log.init();
328 if (r < 0) {
329 lderr(store->ctx()) << "ERROR: failed to init remote log, r=" << r << dendl;
330 return r;
331 }
332
333 RGWMetaSyncEnv& sync_env = master_log.get_sync_env();
334
335 rgw_meta_sync_status sync_status;
336 r = read_sync_status(&sync_status);
337 if (r < 0 && r != -ENOENT) {
338 lderr(store->ctx()) << "ERROR: failed to read sync status, r=" << r << dendl;
339 return r;
340 }
341
342 int num_shards = sync_status.sync_info.num_shards;
343
344 for (int i = 0; i < num_shards; i++) {
9f95a23c 345 shard_objs[i] = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env.shard_obj_name(i));
7c673cae
FG
346 }
347
9f95a23c 348 std::unique_lock wl{ts_to_shard_lock};
7c673cae
FG
349 for (int i = 0; i < num_shards; i++) {
350 clone_markers.push_back(string());
351 utime_shard ut;
352 ut.shard_id = i;
353 ts_to_shard[ut] = i;
354 }
355
356 return 0;
357}
358
11fdf7f2
TL
359unsigned RGWMetaSyncStatusManager::get_subsys() const
360{
361 return dout_subsys;
362}
363
364std::ostream& RGWMetaSyncStatusManager::gen_prefix(std::ostream& out) const
365{
366 return out << "meta sync: ";
367}
368
9f95a23c 369void RGWMetaSyncEnv::init(const DoutPrefixProvider *_dpp, CephContext *_cct, rgw::sal::RGWRadosStore *_store, RGWRESTConn *_conn,
7c673cae 370 RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
11fdf7f2
TL
371 RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer) {
372 dpp = _dpp;
7c673cae
FG
373 cct = _cct;
374 store = _store;
375 conn = _conn;
376 async_rados = _async_rados;
377 http_manager = _http_manager;
378 error_logger = _error_logger;
11fdf7f2 379 sync_tracer = _sync_tracer;
7c673cae
FG
380}
381
382string RGWMetaSyncEnv::status_oid()
383{
384 return mdlog_sync_status_oid;
385}
386
387string RGWMetaSyncEnv::shard_obj_name(int shard_id)
388{
389 char buf[mdlog_sync_status_shard_prefix.size() + 16];
390 snprintf(buf, sizeof(buf), "%s.%d", mdlog_sync_status_shard_prefix.c_str(), shard_id);
391
392 return string(buf);
393}
394
395class RGWAsyncReadMDLogEntries : public RGWAsyncRadosRequest {
9f95a23c 396 rgw::sal::RGWRadosStore *store;
7c673cae
FG
397 RGWMetadataLog *mdlog;
398 int shard_id;
7c673cae 399 int max_entries;
7c673cae
FG
400
401protected:
402 int _send_request() override {
403 real_time from_time;
404 real_time end_time;
405
406 void *handle;
407
f6b5b4d7 408 mdlog->init_list_entries(shard_id, from_time, end_time, marker, &handle);
7c673cae 409
f6b5b4d7 410 int ret = mdlog->list_entries(handle, max_entries, entries, &marker, &truncated);
7c673cae
FG
411
412 mdlog->complete_list_entries(handle);
413
414 return ret;
415 }
416public:
f6b5b4d7
TL
417 string marker;
418 list<cls_log_entry> entries;
419 bool truncated;
420
9f95a23c 421 RGWAsyncReadMDLogEntries(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RGWRadosStore *_store,
7c673cae 422 RGWMetadataLog* mdlog, int _shard_id,
f6b5b4d7 423 std::string _marker, int _max_entries)
7c673cae 424 : RGWAsyncRadosRequest(caller, cn), store(_store), mdlog(mdlog),
f6b5b4d7 425 shard_id(_shard_id), max_entries(_max_entries), marker(std::move(_marker)) {}
7c673cae
FG
426};
427
428class RGWReadMDLogEntriesCR : public RGWSimpleCoroutine {
429 RGWMetaSyncEnv *sync_env;
430 RGWMetadataLog *const mdlog;
431 int shard_id;
432 string marker;
433 string *pmarker;
434 int max_entries;
435 list<cls_log_entry> *entries;
436 bool *truncated;
437
224ce89b 438 RGWAsyncReadMDLogEntries *req{nullptr};
7c673cae
FG
439
440public:
441 RGWReadMDLogEntriesCR(RGWMetaSyncEnv *_sync_env, RGWMetadataLog* mdlog,
442 int _shard_id, string*_marker, int _max_entries,
443 list<cls_log_entry> *_entries, bool *_truncated)
444 : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env), mdlog(mdlog),
445 shard_id(_shard_id), pmarker(_marker), max_entries(_max_entries),
446 entries(_entries), truncated(_truncated) {}
447
448 ~RGWReadMDLogEntriesCR() override {
449 if (req) {
450 req->finish();
451 }
452 }
453
454 int send_request() override {
455 marker = *pmarker;
456 req = new RGWAsyncReadMDLogEntries(this, stack->create_completion_notifier(),
f6b5b4d7
TL
457 sync_env->store, mdlog, shard_id, marker,
458 max_entries);
7c673cae
FG
459 sync_env->async_rados->queue(req);
460 return 0;
461 }
462
463 int request_complete() override {
f6b5b4d7
TL
464 *pmarker = std::move(req->marker);
465 *entries = std::move(req->entries);
466 *truncated = req->truncated;
7c673cae
FG
467 return req->get_ret_status();
468 }
469};
470
471
472class RGWReadRemoteMDLogShardInfoCR : public RGWCoroutine {
473 RGWMetaSyncEnv *env;
474 RGWRESTReadResource *http_op;
475
476 const std::string& period;
477 int shard_id;
478 RGWMetadataLogInfo *shard_info;
479
480public:
481 RGWReadRemoteMDLogShardInfoCR(RGWMetaSyncEnv *env, const std::string& period,
482 int _shard_id, RGWMetadataLogInfo *_shard_info)
483 : RGWCoroutine(env->store->ctx()), env(env), http_op(NULL),
484 period(period), shard_id(_shard_id), shard_info(_shard_info) {}
485
486 int operate() override {
487 auto store = env->store;
9f95a23c 488 RGWRESTConn *conn = store->svc()->zone->get_master_conn();
7c673cae
FG
489 reenter(this) {
490 yield {
491 char buf[16];
492 snprintf(buf, sizeof(buf), "%d", shard_id);
493 rgw_http_param_pair pairs[] = { { "type" , "metadata" },
494 { "id", buf },
495 { "period", period.c_str() },
496 { "info" , NULL },
497 { NULL, NULL } };
498
499 string p = "/admin/log/";
500
501 http_op = new RGWRESTReadResource(conn, p, pairs, NULL,
502 env->http_manager);
503
11fdf7f2 504 init_new_io(http_op);
7c673cae
FG
505
506 int ret = http_op->aio_read();
507 if (ret < 0) {
11fdf7f2 508 ldpp_dout(env->dpp, 0) << "ERROR: failed to read from " << p << dendl;
7c673cae
FG
509 log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
510 http_op->put();
511 return set_cr_error(ret);
512 }
513
514 return io_block(0);
515 }
516 yield {
9f95a23c 517 int ret = http_op->wait(shard_info, null_yield);
7c673cae
FG
518 http_op->put();
519 if (ret < 0) {
520 return set_cr_error(ret);
521 }
522 return set_cr_done();
523 }
524 }
525 return 0;
526 }
527};
528
9f95a23c
TL
529RGWCoroutine* create_read_remote_mdlog_shard_info_cr(RGWMetaSyncEnv *env,
530 const std::string& period,
531 int shard_id,
532 RGWMetadataLogInfo* info)
533{
534 return new RGWReadRemoteMDLogShardInfoCR(env, period, shard_id, info);
535}
536
7c673cae
FG
537class RGWListRemoteMDLogShardCR : public RGWSimpleCoroutine {
538 RGWMetaSyncEnv *sync_env;
539 RGWRESTReadResource *http_op;
540
541 const std::string& period;
542 int shard_id;
543 string marker;
544 uint32_t max_entries;
545 rgw_mdlog_shard_data *result;
546
547public:
548 RGWListRemoteMDLogShardCR(RGWMetaSyncEnv *env, const std::string& period,
549 int _shard_id, const string& _marker, uint32_t _max_entries,
550 rgw_mdlog_shard_data *_result)
551 : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL),
552 period(period), shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}
553
554 int send_request() override {
555 RGWRESTConn *conn = sync_env->conn;
7c673cae
FG
556
557 char buf[32];
558 snprintf(buf, sizeof(buf), "%d", shard_id);
559
560 char max_entries_buf[32];
561 snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);
562
563 const char *marker_key = (marker.empty() ? "" : "marker");
564
565 rgw_http_param_pair pairs[] = { { "type", "metadata" },
566 { "id", buf },
567 { "period", period.c_str() },
568 { "max-entries", max_entries_buf },
569 { marker_key, marker.c_str() },
570 { NULL, NULL } };
571
572 string p = "/admin/log/";
573
574 http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
11fdf7f2 575 init_new_io(http_op);
7c673cae
FG
576
577 int ret = http_op->aio_read();
578 if (ret < 0) {
11fdf7f2 579 ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to read from " << p << dendl;
7c673cae
FG
580 log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
581 http_op->put();
582 return ret;
583 }
584
585 return 0;
586 }
587
588 int request_complete() override {
9f95a23c 589 int ret = http_op->wait(result, null_yield);
7c673cae
FG
590 http_op->put();
591 if (ret < 0 && ret != -ENOENT) {
11fdf7f2 592 ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to list remote mdlog shard, ret=" << ret << dendl;
7c673cae
FG
593 return ret;
594 }
595 return 0;
596 }
597};
598
9f95a23c
TL
599RGWCoroutine* create_list_remote_mdlog_shard_cr(RGWMetaSyncEnv *env,
600 const std::string& period,
601 int shard_id,
602 const std::string& marker,
603 uint32_t max_entries,
604 rgw_mdlog_shard_data *result)
605{
606 return new RGWListRemoteMDLogShardCR(env, period, shard_id, marker,
607 max_entries, result);
608}
609
7c673cae
FG
610bool RGWReadRemoteMDLogInfoCR::spawn_next() {
611 if (shard_id >= num_shards) {
612 return false;
613 }
614 spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env, period, shard_id, &(*mdlog_info)[shard_id]), false);
615 shard_id++;
616 return true;
617}
618
619bool RGWListRemoteMDLogCR::spawn_next() {
620 if (iter == shards.end()) {
621 return false;
622 }
623
624 spawn(new RGWListRemoteMDLogShardCR(sync_env, period, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
625 ++iter;
626 return true;
627}
628
629class RGWInitSyncStatusCoroutine : public RGWCoroutine {
630 RGWMetaSyncEnv *sync_env;
631
632 rgw_meta_sync_info status;
633 vector<RGWMetadataLogInfo> shards_info;
31f18b77
FG
634 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
635 boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
7c673cae
FG
636public:
637 RGWInitSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
638 const rgw_meta_sync_info &status)
639 : RGWCoroutine(_sync_env->store->ctx()), sync_env(_sync_env),
640 status(status), shards_info(status.num_shards),
641 lease_cr(nullptr), lease_stack(nullptr) {}
642
643 ~RGWInitSyncStatusCoroutine() override {
644 if (lease_cr) {
645 lease_cr->abort();
7c673cae
FG
646 }
647 }
648
649 int operate() override {
650 int ret;
651 reenter(this) {
652 yield {
653 set_status("acquiring sync lock");
654 uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
655 string lock_name = "sync_lock";
9f95a23c 656 rgw::sal::RGWRadosStore *store = sync_env->store;
31f18b77 657 lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
9f95a23c 658 rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
31f18b77
FG
659 lock_name, lock_duration, this));
660 lease_stack.reset(spawn(lease_cr.get(), false));
7c673cae
FG
661 }
662 while (!lease_cr->is_locked()) {
663 if (lease_cr->is_done()) {
11fdf7f2 664 ldpp_dout(sync_env->dpp, 5) << "lease cr failed, done early " << dendl;
7c673cae
FG
665 set_status("lease lock failed, early abort");
666 return set_cr_error(lease_cr->get_ret_status());
667 }
668 set_sleeping(true);
669 yield;
670 }
671 yield {
672 set_status("writing sync status");
9f95a23c
TL
673 rgw::sal::RGWRadosStore *store = sync_env->store;
674 call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados, store->svc()->sysobj,
675 rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
7c673cae
FG
676 status));
677 }
678
679 if (retcode < 0) {
680 set_status("failed to write sync status");
11fdf7f2 681 ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
7c673cae
FG
682 yield lease_cr->go_down();
683 return set_cr_error(retcode);
684 }
685 /* fetch current position in logs */
686 set_status("fetching remote log position");
687 yield {
688 for (int i = 0; i < (int)status.num_shards; i++) {
689 spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env, status.period, i,
690 &shards_info[i]), false);
691 }
692 }
693
31f18b77 694 drain_all_but_stack(lease_stack.get()); /* the lease cr still needs to run */
7c673cae
FG
695
696 yield {
697 set_status("updating sync status");
698 for (int i = 0; i < (int)status.num_shards; i++) {
699 rgw_meta_sync_marker marker;
700 RGWMetadataLogInfo& info = shards_info[i];
701 marker.next_step_marker = info.marker;
702 marker.timestamp = info.last_update;
9f95a23c 703 rgw::sal::RGWRadosStore *store = sync_env->store;
7c673cae 704 spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados,
9f95a23c
TL
705 store->svc()->sysobj,
706 rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->shard_obj_name(i)),
7c673cae
FG
707 marker), true);
708 }
709 }
710 yield {
711 set_status("changing sync state: build full sync maps");
712 status.state = rgw_meta_sync_info::StateBuildingFullSyncMaps;
9f95a23c
TL
713 rgw::sal::RGWRadosStore *store = sync_env->store;
714 call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados, store->svc()->sysobj,
715 rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
7c673cae
FG
716 status));
717 }
718 set_status("drop lock lease");
719 yield lease_cr->go_down();
720 while (collect(&ret, NULL)) {
721 if (ret < 0) {
722 return set_cr_error(ret);
723 }
724 yield;
725 }
726 drain_all();
727 return set_cr_done();
728 }
729 return 0;
730 }
731};
732
733class RGWReadSyncStatusMarkersCR : public RGWShardCollectCR {
734 static constexpr int MAX_CONCURRENT_SHARDS = 16;
735
736 RGWMetaSyncEnv *env;
737 const int num_shards;
738 int shard_id{0};
739 map<uint32_t, rgw_meta_sync_marker>& markers;
740
741 public:
742 RGWReadSyncStatusMarkersCR(RGWMetaSyncEnv *env, int num_shards,
743 map<uint32_t, rgw_meta_sync_marker>& markers)
744 : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS),
745 env(env), num_shards(num_shards), markers(markers)
746 {}
747 bool spawn_next() override;
748};
749
750bool RGWReadSyncStatusMarkersCR::spawn_next()
751{
752 if (shard_id >= num_shards) {
753 return false;
754 }
755 using CR = RGWSimpleRadosReadCR<rgw_meta_sync_marker>;
9f95a23c 756 rgw_raw_obj obj{env->store->svc()->zone->get_zone_params().log_pool,
7c673cae 757 env->shard_obj_name(shard_id)};
9f95a23c 758 spawn(new CR(env->async_rados, env->store->svc()->sysobj, obj, &markers[shard_id]), false);
7c673cae
FG
759 shard_id++;
760 return true;
761}
762
763class RGWReadSyncStatusCoroutine : public RGWCoroutine {
764 RGWMetaSyncEnv *sync_env;
765 rgw_meta_sync_status *sync_status;
766
767public:
768 RGWReadSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
769 rgw_meta_sync_status *_status)
770 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sync_status(_status)
771 {}
772 int operate() override;
773};
774
775int RGWReadSyncStatusCoroutine::operate()
776{
777 reenter(this) {
778 // read sync info
779 using ReadInfoCR = RGWSimpleRadosReadCR<rgw_meta_sync_info>;
780 yield {
781 bool empty_on_enoent = false; // fail on ENOENT
9f95a23c 782 rgw_raw_obj obj{sync_env->store->svc()->zone->get_zone_params().log_pool,
7c673cae 783 sync_env->status_oid()};
9f95a23c 784 call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj, obj,
7c673cae
FG
785 &sync_status->sync_info, empty_on_enoent));
786 }
787 if (retcode < 0) {
11fdf7f2 788 ldpp_dout(sync_env->dpp, 4) << "failed to read sync status info with "
7c673cae
FG
789 << cpp_strerror(retcode) << dendl;
790 return set_cr_error(retcode);
791 }
792 // read shard markers
793 using ReadMarkersCR = RGWReadSyncStatusMarkersCR;
794 yield call(new ReadMarkersCR(sync_env, sync_status->sync_info.num_shards,
795 sync_status->sync_markers));
796 if (retcode < 0) {
11fdf7f2 797 ldpp_dout(sync_env->dpp, 4) << "failed to read sync status markers with "
7c673cae
FG
798 << cpp_strerror(retcode) << dendl;
799 return set_cr_error(retcode);
800 }
801 return set_cr_done();
802 }
803 return 0;
804}
805
806class RGWFetchAllMetaCR : public RGWCoroutine {
807 RGWMetaSyncEnv *sync_env;
808
809 int num_shards;
810
811
812 int ret_status;
813
814 list<string> sections;
815 list<string>::iterator sections_iter;
181888fb
FG
816
817 struct meta_list_result {
818 list<string> keys;
819 string marker;
820 uint64_t count{0};
821 bool truncated{false};
822
823 void decode_json(JSONObj *obj) {
824 JSONDecoder::decode_json("keys", keys, obj);
825 JSONDecoder::decode_json("marker", marker, obj);
826 JSONDecoder::decode_json("count", count, obj);
827 JSONDecoder::decode_json("truncated", truncated, obj);
828 }
829 } result;
7c673cae
FG
830 list<string>::iterator iter;
831
832 std::unique_ptr<RGWShardedOmapCRManager> entries_index;
833
31f18b77
FG
834 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
835 boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
7c673cae
FG
836 bool lost_lock;
837 bool failed;
838
181888fb
FG
839 string marker;
840
7c673cae
FG
841 map<uint32_t, rgw_meta_sync_marker>& markers;
842
11fdf7f2
TL
843 RGWSyncTraceNodeRef tn;
844
7c673cae
FG
845public:
846 RGWFetchAllMetaCR(RGWMetaSyncEnv *_sync_env, int _num_shards,
11fdf7f2
TL
847 map<uint32_t, rgw_meta_sync_marker>& _markers,
848 RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
7c673cae
FG
849 num_shards(_num_shards),
850 ret_status(0), lease_cr(nullptr), lease_stack(nullptr),
851 lost_lock(false), failed(false), markers(_markers) {
11fdf7f2 852 tn = sync_env->sync_tracer->add_node(_tn_parent, "fetch_all_meta");
7c673cae
FG
853 }
854
855 ~RGWFetchAllMetaCR() override {
7c673cae
FG
856 }
857
858 void append_section_from_set(set<string>& all_sections, const string& name) {
859 set<string>::iterator iter = all_sections.find(name);
860 if (iter != all_sections.end()) {
861 sections.emplace_back(std::move(*iter));
862 all_sections.erase(iter);
863 }
864 }
865 /*
866 * meta sync should go in the following order: user, bucket.instance, bucket
867 * then whatever other sections exist (if any)
868 */
869 void rearrange_sections() {
870 set<string> all_sections;
871 std::move(sections.begin(), sections.end(),
872 std::inserter(all_sections, all_sections.end()));
873 sections.clear();
874
875 append_section_from_set(all_sections, "user");
876 append_section_from_set(all_sections, "bucket.instance");
877 append_section_from_set(all_sections, "bucket");
878
879 std::move(all_sections.begin(), all_sections.end(),
880 std::back_inserter(sections));
881 }
882
883 int operate() override {
884 RGWRESTConn *conn = sync_env->conn;
885
886 reenter(this) {
887 yield {
888 set_status(string("acquiring lock (") + sync_env->status_oid() + ")");
889 uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
890 string lock_name = "sync_lock";
31f18b77
FG
891 lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados,
892 sync_env->store,
9f95a23c 893 rgw_raw_obj(sync_env->store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
31f18b77 894 lock_name, lock_duration, this));
224ce89b 895 lease_stack.reset(spawn(lease_cr.get(), false));
7c673cae
FG
896 }
897 while (!lease_cr->is_locked()) {
898 if (lease_cr->is_done()) {
11fdf7f2 899 ldpp_dout(sync_env->dpp, 5) << "lease cr failed, done early " << dendl;
7c673cae
FG
900 set_status("failed acquiring lock");
901 return set_cr_error(lease_cr->get_ret_status());
902 }
903 set_sleeping(true);
904 yield;
905 }
906 entries_index.reset(new RGWShardedOmapCRManager(sync_env->async_rados, sync_env->store, this, num_shards,
9f95a23c 907 sync_env->store->svc()->zone->get_zone_params().log_pool,
7c673cae
FG
908 mdlog_sync_full_sync_index_prefix));
909 yield {
910 call(new RGWReadRESTResourceCR<list<string> >(cct, conn, sync_env->http_manager,
911 "/admin/metadata", NULL, &sections));
912 }
913 if (get_ret_status() < 0) {
11fdf7f2 914 ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to fetch metadata sections" << dendl;
7c673cae
FG
915 yield entries_index->finish();
916 yield lease_cr->go_down();
917 drain_all();
918 return set_cr_error(get_ret_status());
919 }
920 rearrange_sections();
921 sections_iter = sections.begin();
922 for (; sections_iter != sections.end(); ++sections_iter) {
181888fb
FG
923 do {
924 yield {
925#define META_FULL_SYNC_CHUNK_SIZE "1000"
926 string entrypoint = string("/admin/metadata/") + *sections_iter;
927 rgw_http_param_pair pairs[] = { { "max-entries", META_FULL_SYNC_CHUNK_SIZE },
928 { "marker", result.marker.c_str() },
929 { NULL, NULL } };
930 result.keys.clear();
931 call(new RGWReadRESTResourceCR<meta_list_result >(cct, conn, sync_env->http_manager,
932 entrypoint, pairs, &result));
7c673cae 933 }
11fdf7f2
TL
934 ret_status = get_ret_status();
935 if (ret_status == -ENOENT) {
936 set_retcode(0); /* reset coroutine status so that we don't return it */
937 ret_status = 0;
938 }
939 if (ret_status < 0) {
940 tn->log(0, SSTR("ERROR: failed to fetch metadata section: " << *sections_iter));
181888fb
FG
941 yield entries_index->finish();
942 yield lease_cr->go_down();
943 drain_all();
11fdf7f2 944 return set_cr_error(ret_status);
7c673cae 945 }
181888fb
FG
946 iter = result.keys.begin();
947 for (; iter != result.keys.end(); ++iter) {
948 if (!lease_cr->is_locked()) {
949 lost_lock = true;
950 break;
951 }
952 yield; // allow entries_index consumer to make progress
953
11fdf7f2 954 tn->log(20, SSTR("list metadata: section=" << *sections_iter << " key=" << *iter));
181888fb
FG
955 string s = *sections_iter + ":" + *iter;
956 int shard_id;
9f95a23c
TL
957 rgw::sal::RGWRadosStore *store = sync_env->store;
958 int ret = store->ctl()->meta.mgr->get_shard_id(*sections_iter, *iter, &shard_id);
181888fb 959 if (ret < 0) {
11fdf7f2 960 tn->log(0, SSTR("ERROR: could not determine shard id for " << *sections_iter << ":" << *iter));
181888fb
FG
961 ret_status = ret;
962 break;
963 }
964 if (!entries_index->append(s, shard_id)) {
965 break;
966 }
7c673cae 967 }
181888fb 968 } while (result.truncated);
7c673cae
FG
969 }
970 yield {
971 if (!entries_index->finish()) {
972 failed = true;
973 }
974 }
975 if (!failed) {
976 for (map<uint32_t, rgw_meta_sync_marker>::iterator iter = markers.begin(); iter != markers.end(); ++iter) {
977 int shard_id = (int)iter->first;
978 rgw_meta_sync_marker& marker = iter->second;
979 marker.total_entries = entries_index->get_total_entries(shard_id);
9f95a23c
TL
980 spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados, sync_env->store->svc()->sysobj,
981 rgw_raw_obj(sync_env->store->svc()->zone->get_zone_params().log_pool, sync_env->shard_obj_name(shard_id)),
7c673cae
FG
982 marker), true);
983 }
984 }
985
31f18b77 986 drain_all_but_stack(lease_stack.get()); /* the lease cr still needs to run */
7c673cae
FG
987
988 yield lease_cr->go_down();
989
990 int ret;
991 while (collect(&ret, NULL)) {
992 if (ret < 0) {
993 return set_cr_error(ret);
994 }
995 yield;
996 }
997 drain_all();
998 if (failed) {
999 yield return set_cr_error(-EIO);
1000 }
1001 if (lost_lock) {
1002 yield return set_cr_error(-EBUSY);
1003 }
1004
1005 if (ret_status < 0) {
1006 yield return set_cr_error(ret_status);
1007 }
1008
1009 yield return set_cr_done();
1010 }
1011 return 0;
1012 }
1013};
1014
1015static string full_sync_index_shard_oid(int shard_id)
1016{
1017 char buf[mdlog_sync_full_sync_index_prefix.size() + 16];
1018 snprintf(buf, sizeof(buf), "%s.%d", mdlog_sync_full_sync_index_prefix.c_str(), shard_id);
1019 return string(buf);
1020}
1021
1022class RGWReadRemoteMetadataCR : public RGWCoroutine {
1023 RGWMetaSyncEnv *sync_env;
1024
1025 RGWRESTReadResource *http_op;
1026
1027 string section;
1028 string key;
1029
1030 bufferlist *pbl;
1031
11fdf7f2
TL
1032 RGWSyncTraceNodeRef tn;
1033
7c673cae
FG
1034public:
1035 RGWReadRemoteMetadataCR(RGWMetaSyncEnv *_sync_env,
11fdf7f2
TL
1036 const string& _section, const string& _key, bufferlist *_pbl,
1037 const RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
7c673cae
FG
1038 http_op(NULL),
1039 section(_section),
1040 key(_key),
1041 pbl(_pbl) {
11fdf7f2
TL
1042 tn = sync_env->sync_tracer->add_node(_tn_parent, "read_remote_meta",
1043 section + ":" + key);
7c673cae
FG
1044 }
1045
1046 int operate() override {
1047 RGWRESTConn *conn = sync_env->conn;
1048 reenter(this) {
1049 yield {
1050 rgw_http_param_pair pairs[] = { { "key" , key.c_str()},
1051 { NULL, NULL } };
1052
1053 string p = string("/admin/metadata/") + section + "/" + key;
1054
1055 http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
1056
11fdf7f2 1057 init_new_io(http_op);
7c673cae
FG
1058
1059 int ret = http_op->aio_read();
1060 if (ret < 0) {
11fdf7f2 1061 ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to fetch mdlog data" << dendl;
7c673cae
FG
1062 log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
1063 http_op->put();
1064 return set_cr_error(ret);
1065 }
1066
1067 return io_block(0);
1068 }
1069 yield {
9f95a23c 1070 int ret = http_op->wait(pbl, null_yield);
7c673cae
FG
1071 http_op->put();
1072 if (ret < 0) {
1073 return set_cr_error(ret);
1074 }
1075 return set_cr_done();
1076 }
1077 }
1078 return 0;
1079 }
1080};
1081
1082class RGWAsyncMetaStoreEntry : public RGWAsyncRadosRequest {
9f95a23c 1083 rgw::sal::RGWRadosStore *store;
7c673cae
FG
1084 string raw_key;
1085 bufferlist bl;
1086protected:
1087 int _send_request() override {
9f95a23c 1088 int ret = store->ctl()->meta.mgr->put(raw_key, bl, null_yield, RGWMDLogSyncType::APPLY_ALWAYS);
7c673cae
FG
1089 if (ret < 0) {
1090 ldout(store->ctx(), 0) << "ERROR: can't store key: " << raw_key << " ret=" << ret << dendl;
1091 return ret;
1092 }
1093 return 0;
1094 }
1095public:
9f95a23c 1096 RGWAsyncMetaStoreEntry(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RGWRadosStore *_store,
7c673cae
FG
1097 const string& _raw_key,
1098 bufferlist& _bl) : RGWAsyncRadosRequest(caller, cn), store(_store),
1099 raw_key(_raw_key), bl(_bl) {}
1100};
1101
1102
1103class RGWMetaStoreEntryCR : public RGWSimpleCoroutine {
1104 RGWMetaSyncEnv *sync_env;
1105 string raw_key;
1106 bufferlist bl;
1107
1108 RGWAsyncMetaStoreEntry *req;
1109
1110public:
1111 RGWMetaStoreEntryCR(RGWMetaSyncEnv *_sync_env,
1112 const string& _raw_key,
1113 bufferlist& _bl) : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env),
1114 raw_key(_raw_key), bl(_bl), req(NULL) {
1115 }
1116
1117 ~RGWMetaStoreEntryCR() override {
1118 if (req) {
1119 req->finish();
1120 }
1121 }
1122
1123 int send_request() override {
1124 req = new RGWAsyncMetaStoreEntry(this, stack->create_completion_notifier(),
1125 sync_env->store, raw_key, bl);
1126 sync_env->async_rados->queue(req);
1127 return 0;
1128 }
1129
1130 int request_complete() override {
1131 return req->get_ret_status();
1132 }
1133};
1134
1135class RGWAsyncMetaRemoveEntry : public RGWAsyncRadosRequest {
9f95a23c 1136 rgw::sal::RGWRadosStore *store;
7c673cae
FG
1137 string raw_key;
1138protected:
1139 int _send_request() override {
9f95a23c 1140 int ret = store->ctl()->meta.mgr->remove(raw_key, null_yield);
7c673cae
FG
1141 if (ret < 0) {
1142 ldout(store->ctx(), 0) << "ERROR: can't remove key: " << raw_key << " ret=" << ret << dendl;
1143 return ret;
1144 }
1145 return 0;
1146 }
1147public:
9f95a23c 1148 RGWAsyncMetaRemoveEntry(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RGWRadosStore *_store,
7c673cae
FG
1149 const string& _raw_key) : RGWAsyncRadosRequest(caller, cn), store(_store),
1150 raw_key(_raw_key) {}
1151};
1152
1153
1154class RGWMetaRemoveEntryCR : public RGWSimpleCoroutine {
1155 RGWMetaSyncEnv *sync_env;
1156 string raw_key;
1157
1158 RGWAsyncMetaRemoveEntry *req;
1159
1160public:
1161 RGWMetaRemoveEntryCR(RGWMetaSyncEnv *_sync_env,
1162 const string& _raw_key) : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env),
1163 raw_key(_raw_key), req(NULL) {
1164 }
1165
1166 ~RGWMetaRemoveEntryCR() override {
1167 if (req) {
1168 req->finish();
1169 }
1170 }
1171
1172 int send_request() override {
1173 req = new RGWAsyncMetaRemoveEntry(this, stack->create_completion_notifier(),
1174 sync_env->store, raw_key);
1175 sync_env->async_rados->queue(req);
1176 return 0;
1177 }
1178
1179 int request_complete() override {
1180 int r = req->get_ret_status();
1181 if (r == -ENOENT) {
1182 r = 0;
1183 }
1184 return r;
1185 }
1186};
1187
1188#define META_SYNC_UPDATE_MARKER_WINDOW 10
1189
91327a77
AA
1190
1191int RGWLastCallerWinsCR::operate() {
1192 RGWCoroutine *call_cr;
1193 reenter(this) {
1194 while (cr) {
1195 call_cr = cr;
1196 cr = nullptr;
1197 yield call(call_cr);
1198 /* cr might have been modified at this point */
1199 }
1200 return set_cr_done();
1201 }
1202 return 0;
1203}
1204
7c673cae
FG
1205class RGWMetaSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
1206 RGWMetaSyncEnv *sync_env;
1207
1208 string marker_oid;
1209 rgw_meta_sync_marker sync_marker;
1210
11fdf7f2 1211 RGWSyncTraceNodeRef tn;
7c673cae
FG
1212
1213public:
1214 RGWMetaSyncShardMarkerTrack(RGWMetaSyncEnv *_sync_env,
1215 const string& _marker_oid,
11fdf7f2
TL
1216 const rgw_meta_sync_marker& _marker,
1217 RGWSyncTraceNodeRef& _tn) : RGWSyncShardMarkerTrack(META_SYNC_UPDATE_MARKER_WINDOW),
7c673cae
FG
1218 sync_env(_sync_env),
1219 marker_oid(_marker_oid),
11fdf7f2
TL
1220 sync_marker(_marker),
1221 tn(_tn){}
7c673cae
FG
1222
1223 RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
1224 sync_marker.marker = new_marker;
1225 if (index_pos > 0) {
1226 sync_marker.pos = index_pos;
1227 }
1228
1229 if (!real_clock::is_zero(timestamp)) {
1230 sync_marker.timestamp = timestamp;
1231 }
1232
11fdf7f2
TL
1233 ldpp_dout(sync_env->dpp, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << " realm_epoch=" << sync_marker.realm_epoch << dendl;
1234 tn->log(20, SSTR("new marker=" << new_marker));
9f95a23c 1235 rgw::sal::RGWRadosStore *store = sync_env->store;
7c673cae 1236 return new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados,
9f95a23c
TL
1237 store->svc()->sysobj,
1238 rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, marker_oid),
7c673cae
FG
1239 sync_marker);
1240 }
91327a77 1241
11fdf7f2 1242 RGWOrderCallCR *allocate_order_control_cr() override {
91327a77
AA
1243 return new RGWLastCallerWinsCR(sync_env->cct);
1244 }
7c673cae
FG
1245};
1246
11fdf7f2
TL
1247RGWMetaSyncSingleEntryCR::RGWMetaSyncSingleEntryCR(RGWMetaSyncEnv *_sync_env,
1248 const string& _raw_key, const string& _entry_marker,
1249 const RGWMDLogStatus& _op_status,
1250 RGWMetaSyncShardMarkerTrack *_marker_tracker, const RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sync_env->cct),
1251 sync_env(_sync_env),
1252 raw_key(_raw_key), entry_marker(_entry_marker),
1253 op_status(_op_status),
1254 pos(0), sync_status(0),
1255 marker_tracker(_marker_tracker), tries(0) {
1256 error_injection = (sync_env->cct->_conf->rgw_sync_meta_inject_err_probability > 0);
1257 tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", raw_key);
1258}
1259
7c673cae
FG
1260int RGWMetaSyncSingleEntryCR::operate() {
1261 reenter(this) {
1262#define NUM_TRANSIENT_ERROR_RETRIES 10
1263
1264 if (error_injection &&
1265 rand() % 10000 < cct->_conf->rgw_sync_meta_inject_err_probability * 10000.0) {
7c673cae
FG
1266 return set_cr_error(-EIO);
1267 }
1268
1269 if (op_status != MDLOG_STATUS_COMPLETE) {
11fdf7f2 1270 tn->log(20, "skipping pending operation");
7c673cae
FG
1271 yield call(marker_tracker->finish(entry_marker));
1272 if (retcode < 0) {
1273 return set_cr_error(retcode);
1274 }
1275 return set_cr_done();
1276 }
11fdf7f2 1277 tn->set_flag(RGW_SNS_FLAG_ACTIVE);
7c673cae
FG
1278 for (tries = 0; tries < NUM_TRANSIENT_ERROR_RETRIES; tries++) {
1279 yield {
1280 pos = raw_key.find(':');
1281 section = raw_key.substr(0, pos);
1282 key = raw_key.substr(pos + 1);
11fdf7f2
TL
1283 tn->log(10, SSTR("fetching remote metadata entry" << (tries == 0 ? "" : " (retry)")));
1284 call(new RGWReadRemoteMetadataCR(sync_env, section, key, &md_bl, tn));
7c673cae
FG
1285 }
1286
1287 sync_status = retcode;
1288
1289 if (sync_status == -ENOENT) {
1290 /* FIXME: do we need to remove the entry from the local zone? */
1291 break;
1292 }
1293
1294 if ((sync_status == -EAGAIN || sync_status == -ECANCELED) && (tries < NUM_TRANSIENT_ERROR_RETRIES - 1)) {
11fdf7f2 1295 ldpp_dout(sync_env->dpp, 20) << *this << ": failed to fetch remote metadata: " << section << ":" << key << ", will retry" << dendl;
7c673cae
FG
1296 continue;
1297 }
1298
1299 if (sync_status < 0) {
11fdf7f2 1300 tn->log(10, SSTR("failed to send read remote metadata entry: section=" << section << " key=" << key << " status=" << sync_status));
7c673cae
FG
1301 log_error() << "failed to send read remote metadata entry: section=" << section << " key=" << key << " status=" << sync_status << std::endl;
1302 yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), section, key, -sync_status,
1303 string("failed to read remote metadata entry: ") + cpp_strerror(-sync_status)));
1304 return set_cr_error(sync_status);
1305 }
1306
1307 break;
1308 }
1309
1310 retcode = 0;
1311 for (tries = 0; tries < NUM_TRANSIENT_ERROR_RETRIES; tries++) {
1312 if (sync_status != -ENOENT) {
11fdf7f2 1313 tn->log(10, SSTR("storing local metadata entry"));
7c673cae
FG
1314 yield call(new RGWMetaStoreEntryCR(sync_env, raw_key, md_bl));
1315 } else {
11fdf7f2 1316 tn->log(10, SSTR("removing local metadata entry"));
7c673cae
FG
1317 yield call(new RGWMetaRemoveEntryCR(sync_env, raw_key));
1318 }
1319 if ((retcode == -EAGAIN || retcode == -ECANCELED) && (tries < NUM_TRANSIENT_ERROR_RETRIES - 1)) {
11fdf7f2 1320 ldpp_dout(sync_env->dpp, 20) << *this << ": failed to store metadata: " << section << ":" << key << ", got retcode=" << retcode << dendl;
7c673cae
FG
1321 continue;
1322 }
1323 break;
1324 }
1325
1326 sync_status = retcode;
1327
1328 if (sync_status == 0 && marker_tracker) {
1329 /* update marker */
1330 yield call(marker_tracker->finish(entry_marker));
1331 sync_status = retcode;
1332 }
1333 if (sync_status < 0) {
11fdf7f2 1334 tn->log(10, SSTR("failed, status=" << sync_status));
7c673cae
FG
1335 return set_cr_error(sync_status);
1336 }
11fdf7f2 1337 tn->log(10, "success");
7c673cae
FG
1338 return set_cr_done();
1339 }
1340 return 0;
1341}
1342
1343class RGWCloneMetaLogCoroutine : public RGWCoroutine {
1344 RGWMetaSyncEnv *sync_env;
1345 RGWMetadataLog *mdlog;
1346
1347 const std::string& period;
1348 int shard_id;
1349 string marker;
1350 bool truncated = false;
1351 string *new_marker;
1352
1353 int max_entries = CLONE_MAX_ENTRIES;
1354
1355 RGWRESTReadResource *http_op = nullptr;
1356 boost::intrusive_ptr<RGWMetadataLogInfoCompletion> completion;
1357
1358 RGWMetadataLogInfo shard_info;
1359 rgw_mdlog_shard_data data;
1360
1361public:
1362 RGWCloneMetaLogCoroutine(RGWMetaSyncEnv *_sync_env, RGWMetadataLog* mdlog,
1363 const std::string& period, int _id,
1364 const string& _marker, string *_new_marker)
1365 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), mdlog(mdlog),
1366 period(period), shard_id(_id), marker(_marker), new_marker(_new_marker) {
1367 if (new_marker) {
1368 *new_marker = marker;
1369 }
1370 }
1371 ~RGWCloneMetaLogCoroutine() override {
1372 if (http_op) {
1373 http_op->put();
1374 }
1375 if (completion) {
1376 completion->cancel();
1377 }
1378 }
1379
1380 int operate() override;
1381
1382 int state_init();
1383 int state_read_shard_status();
1384 int state_read_shard_status_complete();
1385 int state_send_rest_request();
1386 int state_receive_rest_response();
1387 int state_store_mdlog_entries();
1388 int state_store_mdlog_entries_complete();
1389};
1390
1391class RGWMetaSyncShardCR : public RGWCoroutine {
1392 RGWMetaSyncEnv *sync_env;
1393
1394 const rgw_pool& pool;
1395 const std::string& period; //< currently syncing period id
1396 const epoch_t realm_epoch; //< realm_epoch of period
1397 RGWMetadataLog* mdlog; //< log of syncing period
1398 uint32_t shard_id;
1399 rgw_meta_sync_marker& sync_marker;
1400 boost::optional<rgw_meta_sync_marker> temp_marker; //< for pending updates
1401 string marker;
1402 string max_marker;
1403 const std::string& period_marker; //< max marker stored in next period
1404
11fdf7f2 1405 RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
28e407b8
AA
1406 std::set<std::string> entries;
1407 std::set<std::string>::iterator iter;
7c673cae
FG
1408
1409 string oid;
1410
1411 RGWMetaSyncShardMarkerTrack *marker_tracker = nullptr;
1412
1413 list<cls_log_entry> log_entries;
1414 list<cls_log_entry>::iterator log_iter;
1415 bool truncated = false;
1416
1417 string mdlog_marker;
1418 string raw_key;
1419 rgw_mdlog_entry mdlog_entry;
1420
9f95a23c
TL
1421 ceph::mutex inc_lock = ceph::make_mutex("RGWMetaSyncShardCR::inc_lock");
1422 ceph::condition_variable inc_cond;
7c673cae
FG
1423
1424 boost::asio::coroutine incremental_cr;
1425 boost::asio::coroutine full_cr;
1426
31f18b77
FG
1427 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
1428 boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
1429
7c673cae
FG
1430 bool lost_lock = false;
1431
1432 bool *reset_backoff;
1433
1434 // hold a reference to the cr stack while it's in the map
1435 using StackRef = boost::intrusive_ptr<RGWCoroutinesStack>;
1436 map<StackRef, string> stack_to_pos;
1437 map<string, string> pos_to_prev;
1438
1439 bool can_adjust_marker = false;
1440 bool done_with_period = false;
1441
1442 int total_entries = 0;
1443
11fdf7f2 1444 RGWSyncTraceNodeRef tn;
7c673cae
FG
1445public:
1446 RGWMetaSyncShardCR(RGWMetaSyncEnv *_sync_env, const rgw_pool& _pool,
1447 const std::string& period, epoch_t realm_epoch,
1448 RGWMetadataLog* mdlog, uint32_t _shard_id,
1449 rgw_meta_sync_marker& _marker,
11fdf7f2
TL
1450 const std::string& period_marker, bool *_reset_backoff,
1451 RGWSyncTraceNodeRef& _tn)
7c673cae
FG
1452 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), pool(_pool),
1453 period(period), realm_epoch(realm_epoch), mdlog(mdlog),
1454 shard_id(_shard_id), sync_marker(_marker),
9f95a23c 1455 period_marker(period_marker),
11fdf7f2 1456 reset_backoff(_reset_backoff), tn(_tn) {
7c673cae
FG
1457 *reset_backoff = false;
1458 }
1459
1460 ~RGWMetaSyncShardCR() override {
1461 delete marker_tracker;
1462 if (lease_cr) {
1463 lease_cr->abort();
7c673cae
FG
1464 }
1465 }
1466
1467 void set_marker_tracker(RGWMetaSyncShardMarkerTrack *mt) {
1468 delete marker_tracker;
1469 marker_tracker = mt;
1470 }
1471
1472 int operate() override {
1473 int r;
1474 while (true) {
1475 switch (sync_marker.state) {
1476 case rgw_meta_sync_marker::FullSync:
1477 r = full_sync();
1478 if (r < 0) {
11fdf7f2 1479 ldpp_dout(sync_env->dpp, 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl;
7c673cae
FG
1480 return set_cr_error(r);
1481 }
1482 return 0;
1483 case rgw_meta_sync_marker::IncrementalSync:
1484 r = incremental_sync();
1485 if (r < 0) {
11fdf7f2 1486 ldpp_dout(sync_env->dpp, 10) << "sync: incremental_sync: shard_id=" << shard_id << " r=" << r << dendl;
7c673cae
FG
1487 return set_cr_error(r);
1488 }
1489 return 0;
1490 }
1491 }
1492 /* unreachable */
1493 return 0;
1494 }
1495
1496 void collect_children()
1497 {
1498 int child_ret;
1499 RGWCoroutinesStack *child;
1500 while (collect_next(&child_ret, &child)) {
1501 auto iter = stack_to_pos.find(child);
1502 if (iter == stack_to_pos.end()) {
1503 /* some other stack that we don't care about */
1504 continue;
1505 }
1506
1507 string& pos = iter->second;
1508
1509 if (child_ret < 0) {
11fdf7f2 1510 ldpp_dout(sync_env->dpp, 0) << *this << ": child operation stack=" << child << " entry=" << pos << " returned " << child_ret << dendl;
7c673cae
FG
1511 }
1512
1513 map<string, string>::iterator prev_iter = pos_to_prev.find(pos);
11fdf7f2 1514 ceph_assert(prev_iter != pos_to_prev.end());
7c673cae
FG
1515
1516 /*
1517 * we should get -EAGAIN for transient errors, for which we want to retry, so we don't
1518 * update the marker and abort. We'll get called again for these. Permanent errors will be
1519 * handled by marking the entry at the error log shard, so that we retry on it separately
1520 */
1521 if (child_ret == -EAGAIN) {
1522 can_adjust_marker = false;
1523 }
1524
1525 if (pos_to_prev.size() == 1) {
1526 if (can_adjust_marker) {
1527 sync_marker.marker = pos;
1528 }
1529 pos_to_prev.erase(prev_iter);
1530 } else {
11fdf7f2 1531 ceph_assert(pos_to_prev.size() > 1);
7c673cae
FG
1532 pos_to_prev.erase(prev_iter);
1533 prev_iter = pos_to_prev.begin();
1534 if (can_adjust_marker) {
1535 sync_marker.marker = prev_iter->second;
1536 }
1537 }
1538
11fdf7f2 1539 ldpp_dout(sync_env->dpp, 4) << *this << ": adjusting marker pos=" << sync_marker.marker << dendl;
7c673cae
FG
1540 stack_to_pos.erase(iter);
1541 }
1542 }
1543
1544 int full_sync() {
1545#define OMAP_GET_MAX_ENTRIES 100
1546 int max_entries = OMAP_GET_MAX_ENTRIES;
1547 reenter(&full_cr) {
1548 set_status("full_sync");
11fdf7f2 1549 tn->log(10, "start full sync");
7c673cae
FG
1550 oid = full_sync_index_shard_oid(shard_id);
1551 can_adjust_marker = true;
1552 /* grab lock */
1553 yield {
1554 uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
1555 string lock_name = "sync_lock";
9f95a23c 1556 rgw::sal::RGWRadosStore *store = sync_env->store;
31f18b77
FG
1557 lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
1558 rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
1559 lock_name, lock_duration, this));
1560 lease_stack.reset(spawn(lease_cr.get(), false));
7c673cae
FG
1561 lost_lock = false;
1562 }
1563 while (!lease_cr->is_locked()) {
1564 if (lease_cr->is_done()) {
7c673cae 1565 drain_all();
11fdf7f2 1566 tn->log(5, "failed to take lease");
7c673cae
FG
1567 return lease_cr->get_ret_status();
1568 }
1569 set_sleeping(true);
1570 yield;
1571 }
11fdf7f2 1572 tn->log(10, "took lease");
7c673cae
FG
1573
1574 /* lock succeeded, a retry now should avoid previous backoff status */
1575 *reset_backoff = true;
1576
1577 /* prepare marker tracker */
1578 set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env,
1579 sync_env->shard_obj_name(shard_id),
11fdf7f2 1580 sync_marker, tn));
7c673cae
FG
1581
1582 marker = sync_marker.marker;
1583
1584 total_entries = sync_marker.pos;
1585
1586 /* sync! */
1587 do {
1588 if (!lease_cr->is_locked()) {
11fdf7f2 1589 tn->log(10, "lost lease");
7c673cae
FG
1590 lost_lock = true;
1591 break;
1592 }
11fdf7f2 1593 omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
7c673cae 1594 yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
11fdf7f2 1595 marker, max_entries, omapkeys));
7c673cae 1596 if (retcode < 0) {
11fdf7f2
TL
1597 ldpp_dout(sync_env->dpp, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
1598 tn->log(0, SSTR("ERROR: failed to list omap keys, status=" << retcode));
7c673cae
FG
1599 yield lease_cr->go_down();
1600 drain_all();
1601 return retcode;
1602 }
11fdf7f2
TL
1603 entries = std::move(omapkeys->entries);
1604 tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
1605 if (entries.size() > 0) {
1606 tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
1607 }
7c673cae
FG
1608 iter = entries.begin();
1609 for (; iter != entries.end(); ++iter) {
28e407b8 1610 marker = *iter;
11fdf7f2 1611 tn->log(20, SSTR("full sync: " << marker));
7c673cae 1612 total_entries++;
28e407b8 1613 if (!marker_tracker->start(marker, total_entries, real_time())) {
11fdf7f2 1614 tn->log(0, SSTR("ERROR: cannot start syncing " << marker << ". Duplicate entry?"));
7c673cae
FG
1615 } else {
1616 // fetch remote and write locally
1617 yield {
11fdf7f2 1618 RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, marker, marker, MDLOG_STATUS_COMPLETE, marker_tracker, tn), false);
7c673cae 1619 // stack_to_pos holds a reference to the stack
28e407b8
AA
1620 stack_to_pos[stack] = marker;
1621 pos_to_prev[marker] = marker;
7c673cae
FG
1622 }
1623 }
7c673cae
FG
1624 }
1625 collect_children();
11fdf7f2
TL
1626 } while (omapkeys->more && can_adjust_marker);
1627
1628 tn->unset_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
7c673cae
FG
1629
1630 while (num_spawned() > 1) {
1631 yield wait_for_child();
1632 collect_children();
1633 }
1634
1635 if (!lost_lock) {
1636 /* update marker to reflect we're done with full sync */
1637 if (can_adjust_marker) {
1638 // apply updates to a temporary marker, or operate() will send us
1639 // to incremental_sync() after we yield
1640 temp_marker = sync_marker;
1641 temp_marker->state = rgw_meta_sync_marker::IncrementalSync;
1642 temp_marker->marker = std::move(temp_marker->next_step_marker);
1643 temp_marker->next_step_marker.clear();
1644 temp_marker->realm_epoch = realm_epoch;
11fdf7f2 1645 ldpp_dout(sync_env->dpp, 4) << *this << ": saving marker pos=" << temp_marker->marker << " realm_epoch=" << realm_epoch << dendl;
7c673cae
FG
1646
1647 using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_meta_sync_marker>;
9f95a23c 1648 yield call(new WriteMarkerCR(sync_env->async_rados, sync_env->store->svc()->sysobj,
7c673cae
FG
1649 rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
1650 *temp_marker));
1651 }
1652
1653 if (retcode < 0) {
11fdf7f2 1654 ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
224ce89b
WB
1655 yield lease_cr->go_down();
1656 drain_all();
7c673cae
FG
1657 return retcode;
1658 }
1659 }
1660
1661 /*
1662 * if we reached here, it means that lost_lock is true, otherwise the state
1663 * change in the previous block will prevent us from reaching here
1664 */
1665
1666 yield lease_cr->go_down();
1667
31f18b77 1668 lease_cr.reset();
7c673cae
FG
1669
1670 drain_all();
1671
1672 if (!can_adjust_marker) {
1673 return -EAGAIN;
1674 }
1675
1676 if (lost_lock) {
1677 return -EBUSY;
1678 }
1679
11fdf7f2
TL
1680 tn->log(10, "full sync complete");
1681
7c673cae 1682 // apply the sync marker update
11fdf7f2 1683 ceph_assert(temp_marker);
7c673cae
FG
1684 sync_marker = std::move(*temp_marker);
1685 temp_marker = boost::none;
1686 // must not yield after this point!
1687 }
1688 return 0;
1689 }
1690
1691
1692 int incremental_sync() {
1693 reenter(&incremental_cr) {
1694 set_status("incremental_sync");
11fdf7f2 1695 tn->log(10, "start incremental sync");
7c673cae
FG
1696 can_adjust_marker = true;
1697 /* grab lock */
1698 if (!lease_cr) { /* could have had a lease_cr lock from previous state */
1699 yield {
1700 uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
1701 string lock_name = "sync_lock";
9f95a23c 1702 rgw::sal::RGWRadosStore *store = sync_env->store;
31f18b77
FG
1703 lease_cr.reset( new RGWContinuousLeaseCR(sync_env->async_rados, store,
1704 rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
1705 lock_name, lock_duration, this));
1706 lease_stack.reset(spawn(lease_cr.get(), false));
7c673cae
FG
1707 lost_lock = false;
1708 }
1709 while (!lease_cr->is_locked()) {
1710 if (lease_cr->is_done()) {
7c673cae 1711 drain_all();
11fdf7f2 1712 tn->log(10, "failed to take lease");
7c673cae
FG
1713 return lease_cr->get_ret_status();
1714 }
1715 set_sleeping(true);
1716 yield;
1717 }
1718 }
11fdf7f2 1719 tn->log(10, "took lease");
7c673cae
FG
1720 // if the period has advanced, we can't use the existing marker
1721 if (sync_marker.realm_epoch < realm_epoch) {
11fdf7f2 1722 ldpp_dout(sync_env->dpp, 4) << "clearing marker=" << sync_marker.marker
1723 << " from old realm_epoch=" << sync_marker.realm_epoch
1724 << " (now " << realm_epoch << ')' << dendl;
1725 sync_marker.realm_epoch = realm_epoch;
1726 sync_marker.marker.clear();
1727 }
1728 mdlog_marker = sync_marker.marker;
1729 set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env,
1730 sync_env->shard_obj_name(shard_id),
11fdf7f2 1731 sync_marker, tn));
1732
1733 /*
1734 * mdlog_marker: the remote sync marker position
1735 * sync_marker: the local sync marker position
1736 * max_marker: the max mdlog position that we fetched
1737 * marker: the current position we try to sync
1738 * period_marker: the last marker before the next period begins (optional)
1739 */
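      //
      // note: markers are opaque, lexically ordered strings; roughly, the loop
      // below keeps cloning the remote mdlog while mdlog_marker has not advanced
      // past max_marker, lists and syncs the local entries once it has, and idles
      // for a while when the two are equal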
1740 marker = max_marker = sync_marker.marker;
1741 /* inc sync */
1742 do {
1743 if (!lease_cr->is_locked()) {
1744 lost_lock = true;
11fdf7f2 1745 tn->log(10, "lost lease");
1746 break;
1747 }
1748#define INCREMENTAL_MAX_ENTRIES 100
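        // cap on how many mdlog entries are listed from the local shard copy per
        // iteration (see the RGWReadMDLogEntriesCR call below)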
11fdf7f2 1749 ldpp_dout(sync_env->dpp, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << dendl;
7c673cae 1750 if (!period_marker.empty() && period_marker <= mdlog_marker) {
11fdf7f2 1751 tn->log(10, SSTR("finished syncing current period: mdlog_marker=" << mdlog_marker << " sync_marker=" << sync_marker.marker << " period_marker=" << period_marker));
1752 done_with_period = true;
1753 break;
1754 }
1755 if (mdlog_marker <= max_marker) {
1756 /* we're at the tip, try to bring more entries */
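          // RGWCloneMetaLogCoroutine (defined later in this file) fetches the
          // remote shard's mdlog entries over REST and stores them in the local
          // mdlog copy, advancing mdlog_marker as it goes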
11fdf7f2 1757 ldpp_dout(sync_env->dpp, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << ": syncing mdlog" << dendl;
1758 yield call(new RGWCloneMetaLogCoroutine(sync_env, mdlog,
1759 period, shard_id,
1760 mdlog_marker, &mdlog_marker));
1761 }
1762 if (retcode < 0) {
11fdf7f2 1763 tn->log(10, SSTR(*this << ": failed to fetch more log entries, retcode=" << retcode));
1764 yield lease_cr->go_down();
1765 drain_all();
1766 *reset_backoff = false; // back off and try again later
1767 return retcode;
1768 }
1769 *reset_backoff = true; /* if we got to this point, everything is functioning properly */
7c673cae 1770 if (mdlog_marker > max_marker) {
1771 tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
1772 tn->log(20, SSTR("mdlog_marker=" << mdlog_marker << " sync_marker=" << sync_marker.marker));
1773 marker = max_marker;
1774 yield call(new RGWReadMDLogEntriesCR(sync_env, mdlog, shard_id,
1775 &max_marker, INCREMENTAL_MAX_ENTRIES,
1776 &log_entries, &truncated));
1777 if (retcode < 0) {
11fdf7f2 1778 tn->log(10, SSTR("failed to list mdlog entries, retcode=" << retcode));
1779 yield lease_cr->go_down();
1780 drain_all();
1781 *reset_backoff = false; // back off and try again later
1782 return retcode;
1783 }
1784 for (log_iter = log_entries.begin(); log_iter != log_entries.end() && !done_with_period; ++log_iter) {
1785 if (!period_marker.empty() && period_marker <= log_iter->id) {
1786 done_with_period = true;
1787 if (period_marker < log_iter->id) {
1788 tn->log(10, SSTR("found key=" << log_iter->id
1789 << " past period_marker=" << period_marker));
1790 break;
1791 }
11fdf7f2 1792 ldpp_dout(sync_env->dpp, 10) << "found key at period_marker=" << period_marker << dendl;
1793 // sync this entry, then return control to RGWMetaSyncCR
1794 }
1795 if (!mdlog_entry.convert_from(*log_iter)) {
11fdf7f2 1796 tn->log(0, SSTR("ERROR: failed to convert mdlog entry, shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << " ... skipping entry"));
1797 continue;
1798 }
11fdf7f2 1799 tn->log(20, SSTR("log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp));
7c673cae 1800 if (!marker_tracker->start(log_iter->id, 0, log_iter->timestamp.to_real_time())) {
11fdf7f2 1801 ldpp_dout(sync_env->dpp, 0) << "ERROR: cannot start syncing " << log_iter->id << ". Duplicate entry?" << dendl;
1802 } else {
1803 raw_key = log_iter->section + ":" + log_iter->name;
1804 yield {
1805 RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, mdlog_entry.log_data.status, marker_tracker, tn), false);
1806 ceph_assert(stack);
1807 // stack_to_pos holds a reference to the stack
1808 stack_to_pos[stack] = log_iter->id;
1809 pos_to_prev[log_iter->id] = marker;
1810 }
1811 }
1812 marker = log_iter->id;
1813 }
1814 }
1815 collect_children();
11fdf7f2 1816 ldpp_dout(sync_env->dpp, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " max_marker=" << max_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << dendl;
1817 if (done_with_period) {
1818 // return control to RGWMetaSyncCR and advance to the next period
11fdf7f2 1819 tn->log(10, SSTR(*this << ": done with period"));
1820 break;
1821 }
1822 if (mdlog_marker == max_marker && can_adjust_marker) {
11fdf7f2 1823 tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
1824#define INCREMENTAL_INTERVAL 20
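          // nothing new to sync; idle for INCREMENTAL_INTERVAL seconds before
          // polling the remote mdlog again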
1825 yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
1826 }
1827 } while (can_adjust_marker);
1828
1829 tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
1830
1831 while (num_spawned() > 1) {
1832 yield wait_for_child();
1833 collect_children();
1834 }
1835
1836 yield lease_cr->go_down();
1837
1838 drain_all();
1839
1840 if (lost_lock) {
1841 return -EBUSY;
1842 }
1843
1844 if (!can_adjust_marker) {
1845 return -EAGAIN;
1846 }
1847
1848 return set_cr_done();
1849 }
1850 /* TODO */
1851 return 0;
1852 }
1853};
1854
1855class RGWMetaSyncShardControlCR : public RGWBackoffControlCR
1856{
1857 RGWMetaSyncEnv *sync_env;
1858
1859 const rgw_pool& pool;
1860 const std::string& period;
1861 epoch_t realm_epoch;
1862 RGWMetadataLog* mdlog;
1863 uint32_t shard_id;
1864 rgw_meta_sync_marker sync_marker;
1865 const std::string period_marker;
1866
1867 RGWSyncTraceNodeRef tn;
1868
1869 static constexpr bool exit_on_error = false; // retry on all errors
1870public:
1871 RGWMetaSyncShardControlCR(RGWMetaSyncEnv *_sync_env, const rgw_pool& _pool,
1872 const std::string& period, epoch_t realm_epoch,
1873 RGWMetadataLog* mdlog, uint32_t _shard_id,
1874 const rgw_meta_sync_marker& _marker,
1875 std::string&& period_marker,
1876 RGWSyncTraceNodeRef& _tn_parent)
1877 : RGWBackoffControlCR(_sync_env->cct, exit_on_error), sync_env(_sync_env),
1878 pool(_pool), period(period), realm_epoch(realm_epoch), mdlog(mdlog),
1879 shard_id(_shard_id), sync_marker(_marker),
1880 period_marker(std::move(period_marker)) {
1881 tn = sync_env->sync_tracer->add_node(_tn_parent, "shard",
1882 std::to_string(shard_id));
1883 }
1884
1885 RGWCoroutine *alloc_cr() override {
1886 return new RGWMetaSyncShardCR(sync_env, pool, period, realm_epoch, mdlog,
11fdf7f2 1887 shard_id, sync_marker, period_marker, backoff_ptr(), tn);
1888 }
1889
1890 RGWCoroutine *alloc_finisher_cr() override {
1891 rgw::sal::RGWRadosStore *store = sync_env->store;
1892 return new RGWSimpleRadosReadCR<rgw_meta_sync_marker>(sync_env->async_rados, store->svc()->sysobj,
1893 rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
1894 &sync_marker);
1895 }
1896};
1897
1898class RGWMetaSyncCR : public RGWCoroutine {
1899 RGWMetaSyncEnv *sync_env;
1900 const rgw_pool& pool;
1901 RGWPeriodHistory::Cursor cursor; //< sync position in period history
1902 RGWPeriodHistory::Cursor next; //< next period in history
1903 rgw_meta_sync_status sync_status;
11fdf7f2 1904 RGWSyncTraceNodeRef tn;
1905
1906 std::mutex mutex; //< protect access to shard_crs
1907
1908 // TODO: it should be enough to hold a reference on the stack only, as calling
1909 // RGWCoroutinesStack::wakeup() doesn't refer to the RGWCoroutine if it has
1910 // already completed
1911 using ControlCRRef = boost::intrusive_ptr<RGWMetaSyncShardControlCR>;
1912 using StackRef = boost::intrusive_ptr<RGWCoroutinesStack>;
1913 using RefPair = std::pair<ControlCRRef, StackRef>;
1914 map<int, RefPair> shard_crs;
1915 int ret{0};
1916
1917public:
1918 RGWMetaSyncCR(RGWMetaSyncEnv *_sync_env, const RGWPeriodHistory::Cursor &cursor,
1919 const rgw_meta_sync_status& _sync_status, RGWSyncTraceNodeRef& _tn)
7c673cae 1920 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
9f95a23c 1921 pool(sync_env->store->svc()->zone->get_zone_params().log_pool),
1922 cursor(cursor), sync_status(_sync_status), tn(_tn) {}
1923
1924 ~RGWMetaSyncCR() {
1925 }
1926
1927 int operate() override {
1928 reenter(this) {
1929 // loop through one period at a time
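      // roughly: each iteration spawns one shard control coroutine per shard for
      // the current period, waits for all of them, then advances the cursor and
      // persists the updated sync info; once the cursor reaches the current
      // period, 'next' stays empty and the shards remain in incremental sync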
11fdf7f2 1930 tn->log(1, "start");
7c673cae 1931 for (;;) {
9f95a23c 1932 if (cursor == sync_env->store->svc()->mdlog->get_period_history()->get_current()) {
1933 next = RGWPeriodHistory::Cursor{};
1934 if (cursor) {
11fdf7f2 1935 ldpp_dout(sync_env->dpp, 10) << "RGWMetaSyncCR on current period="
1936 << cursor.get_period().get_id() << dendl;
1937 } else {
11fdf7f2 1938 ldpp_dout(sync_env->dpp, 10) << "RGWMetaSyncCR with no period" << dendl;
1939 }
1940 } else {
1941 next = cursor;
1942 next.next();
11fdf7f2 1943 ldpp_dout(sync_env->dpp, 10) << "RGWMetaSyncCR on period="
1944 << cursor.get_period().get_id() << ", next="
1945 << next.get_period().get_id() << dendl;
1946 }
1947
1948 yield {
1949 // get the mdlog for the current period (may be empty)
1950 auto& period_id = sync_status.sync_info.period;
1951 auto realm_epoch = sync_status.sync_info.realm_epoch;
9f95a23c 1952 auto mdlog = sync_env->store->svc()->mdlog->get_log(period_id);
7c673cae 1953
1954 tn->log(1, SSTR("realm epoch=" << realm_epoch << " period id=" << period_id));
1955
1956 // prevent wakeup() from accessing shard_crs while we're spawning them
1957 std::lock_guard<std::mutex> lock(mutex);
1958
1959 // sync this period on each shard
1960 for (const auto& m : sync_status.sync_markers) {
1961 uint32_t shard_id = m.first;
1962 auto& marker = m.second;
1963
1964 std::string period_marker;
1965 if (next) {
1966 // read the maximum marker from the next period's sync status
1967 period_marker = next.get_period().get_sync_status()[shard_id];
1968 if (period_marker.empty()) {
1969 // no metadata changes have occurred on this shard, skip it
11fdf7f2 1970 ldpp_dout(sync_env->dpp, 10) << "RGWMetaSyncCR: skipping shard " << shard_id
1971 << " with empty period marker" << dendl;
1972 continue;
1973 }
1974 }
1975
1976 using ShardCR = RGWMetaSyncShardControlCR;
1977 auto cr = new ShardCR(sync_env, pool, period_id, realm_epoch,
1978 mdlog, shard_id, marker,
11fdf7f2 1979 std::move(period_marker), tn);
1980 auto stack = spawn(cr, false);
1981 shard_crs[shard_id] = RefPair{cr, stack};
1982 }
1983 }
1984 // wait for each shard to complete
1985 while (ret == 0 && num_spawned() > 0) {
1986 yield wait_for_child();
1987 collect(&ret, nullptr);
1988 }
1989 drain_all();
1990 {
1991 // drop shard cr refs under lock
1992 std::lock_guard<std::mutex> lock(mutex);
1993 shard_crs.clear();
1994 }
1995 if (ret < 0) {
1996 return set_cr_error(ret);
1997 }
1998 // advance to the next period
11fdf7f2 1999 ceph_assert(next);
2000 cursor = next;
2001
2002 // write the updated sync info
2003 sync_status.sync_info.period = cursor.get_period().get_id();
2004 sync_status.sync_info.realm_epoch = cursor.get_epoch();
2005 yield call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados,
9f95a23c 2006 sync_env->store->svc()->sysobj,
2007 rgw_raw_obj(pool, sync_env->status_oid()),
2008 sync_status.sync_info));
2009 }
2010 }
2011 return 0;
2012 }
2013
2014 void wakeup(int shard_id) {
2015 std::lock_guard<std::mutex> lock(mutex);
2016 auto iter = shard_crs.find(shard_id);
2017 if (iter == shard_crs.end()) {
2018 return;
2019 }
2020 iter->second.first->wakeup();
2021 }
2022};
2023
2024void RGWRemoteMetaLog::init_sync_env(RGWMetaSyncEnv *env) {
11fdf7f2 2025 env->dpp = dpp;
2026 env->cct = store->ctx();
2027 env->store = store;
2028 env->conn = conn;
2029 env->async_rados = async_rados;
2030 env->http_manager = &http_manager;
2031 env->error_logger = error_logger;
9f95a23c 2032 env->sync_tracer = store->getRados()->get_sync_tracer();
2033}
2034
2035int RGWRemoteMetaLog::read_sync_status(rgw_meta_sync_status *sync_status)
2036{
9f95a23c 2037 if (store->svc()->zone->is_meta_master()) {
2038 return 0;
2039 }
2040 // cannot run concurrently with run_sync(), so run in a separate manager
9f95a23c 2041 RGWCoroutinesManager crs(store->ctx(), store->getRados()->get_cr_registry());
7c673cae 2042 RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
11fdf7f2 2043 int ret = http_manager.start();
7c673cae 2044 if (ret < 0) {
11fdf7f2 2045 ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
2046 return ret;
2047 }
2048 RGWMetaSyncEnv sync_env_local = sync_env;
2049 sync_env_local.http_manager = &http_manager;
11fdf7f2 2050 tn->log(20, "read sync status");
2051 ret = crs.run(new RGWReadSyncStatusCoroutine(&sync_env_local, sync_status));
2052 http_manager.stop();
2053 return ret;
2054}
2055
2056int RGWRemoteMetaLog::init_sync_status()
2057{
9f95a23c 2058 if (store->svc()->zone->is_meta_master()) {
2059 return 0;
2060 }
2061
2062 rgw_mdlog_info mdlog_info;
2063 int r = read_log_info(&mdlog_info);
2064 if (r < 0) {
2065 lderr(store->ctx()) << "ERROR: failed to fetch master log info (r=" << r << ")" << dendl;
2066 return r;
2067 }
2068
2069 rgw_meta_sync_info sync_info;
2070 sync_info.num_shards = mdlog_info.num_shards;
9f95a23c 2071 auto cursor = store->svc()->mdlog->get_period_history()->get_current();
2072 if (cursor) {
2073 sync_info.period = cursor.get_period().get_id();
2074 sync_info.realm_epoch = cursor.get_epoch();
2075 }
2076
2077 return run(new RGWInitSyncStatusCoroutine(&sync_env, sync_info));
2078}
2079
2080int RGWRemoteMetaLog::store_sync_info(const rgw_meta_sync_info& sync_info)
2081{
11fdf7f2 2082 tn->log(20, "store sync info");
2083 return run(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(async_rados, store->svc()->sysobj,
2084 rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env.status_oid()),
2085 sync_info));
2086}
2087
2088// return a cursor to the period at our sync position
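// (looks the period up in the local period history first; if it isn't there,
// it is pulled from the master zone and attached to the history)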
9f95a23c 2089static RGWPeriodHistory::Cursor get_period_at(rgw::sal::RGWRadosStore* store,
2090 const rgw_meta_sync_info& info)
2091{
2092 if (info.period.empty()) {
2093 // return an empty cursor with error=0
2094 return RGWPeriodHistory::Cursor{};
2095 }
2096
2097 // look for an existing period in our history
9f95a23c 2098 auto cursor = store->svc()->mdlog->get_period_history()->lookup(info.realm_epoch);
2099 if (cursor) {
2100 // verify that the period ids match
2101 auto& existing = cursor.get_period().get_id();
2102 if (existing != info.period) {
2103 lderr(store->ctx()) << "ERROR: sync status period=" << info.period
2104 << " does not match period=" << existing
2105 << " in history at realm epoch=" << info.realm_epoch << dendl;
2106 return RGWPeriodHistory::Cursor{-EEXIST};
2107 }
2108 return cursor;
2109 }
2110
2111 // read the period from rados or pull it from the master
2112 RGWPeriod period;
9f95a23c 2113 int r = store->svc()->mdlog->pull_period(info.period, period);
2114 if (r < 0) {
2115 lderr(store->ctx()) << "ERROR: failed to read period id "
2116 << info.period << ": " << cpp_strerror(r) << dendl;
2117 return RGWPeriodHistory::Cursor{r};
2118 }
2119 // attach the period to our history
9f95a23c 2120 cursor = store->svc()->mdlog->get_period_history()->attach(std::move(period));
2121 if (!cursor) {
2122 r = cursor.get_error();
2123 lderr(store->ctx()) << "ERROR: failed to read period history back to "
2124 << info.period << ": " << cpp_strerror(r) << dendl;
2125 }
2126 return cursor;
2127}
2128
2129int RGWRemoteMetaLog::run_sync()
2130{
9f95a23c 2131 if (store->svc()->zone->is_meta_master()) {
2132 return 0;
2133 }
2134
2135 int r = 0;
2136
2137 // get shard count and oldest log period from master
2138 rgw_mdlog_info mdlog_info;
2139 for (;;) {
2140 if (going_down) {
11fdf7f2 2141 ldpp_dout(dpp, 1) << __func__ << "(): going down" << dendl;
2142 return 0;
2143 }
2144 r = read_log_info(&mdlog_info);
2145 if (r == -EIO || r == -ENOENT) {
2146 // keep retrying if master isn't alive or hasn't initialized the log
11fdf7f2 2147 ldpp_dout(dpp, 10) << __func__ << "(): waiting for master.." << dendl;
2148 backoff.backoff_sleep();
2149 continue;
2150 }
2151 backoff.reset();
2152 if (r < 0) {
2153 lderr(store->ctx()) << "ERROR: failed to fetch master log info (r=" << r << ")" << dendl;
2154 return r;
2155 }
2156 break;
2157 }
2158
2159 rgw_meta_sync_status sync_status;
2160 do {
2161 if (going_down) {
11fdf7f2 2162 ldpp_dout(dpp, 1) << __func__ << "(): going down" << dendl;
2163 return 0;
2164 }
2165 r = run(new RGWReadSyncStatusCoroutine(&sync_env, &sync_status));
2166 if (r < 0 && r != -ENOENT) {
11fdf7f2 2167 ldpp_dout(dpp, 0) << "ERROR: failed to fetch sync status r=" << r << dendl;
2168 return r;
2169 }
2170
2171 if (!mdlog_info.period.empty()) {
2172 // restart sync if the remote has a period, but:
2173 // a) our status does not, or
2174 // b) our sync period comes before the remote's oldest log period
2175 if (sync_status.sync_info.period.empty() ||
2176 sync_status.sync_info.realm_epoch < mdlog_info.realm_epoch) {
2177 sync_status.sync_info.state = rgw_meta_sync_info::StateInit;
2178 string reason;
2179 if (sync_status.sync_info.period.empty()) {
2180 reason = "period is empty";
2181 } else {
2182 reason = SSTR("sync_info realm epoch is behind: " << sync_status.sync_info.realm_epoch << " < " << mdlog_info.realm_epoch);
2183 }
2184 tn->log(1, "initialize sync (reason: " + reason + ")");
2185 ldpp_dout(dpp, 1) << "epoch=" << sync_status.sync_info.realm_epoch
2186 << " in sync status comes before remote's oldest mdlog epoch="
2187 << mdlog_info.realm_epoch << ", restarting sync" << dendl;
2188 }
2189 }
2190
2191 if (sync_status.sync_info.state == rgw_meta_sync_info::StateInit) {
11fdf7f2 2192 ldpp_dout(dpp, 20) << __func__ << "(): init" << dendl;
7c673cae 2193 sync_status.sync_info.num_shards = mdlog_info.num_shards;
9f95a23c 2194 auto cursor = store->svc()->mdlog->get_period_history()->get_current();
2195 if (cursor) {
2196 // run full sync, then start incremental from the current period/epoch
2197 sync_status.sync_info.period = cursor.get_period().get_id();
2198 sync_status.sync_info.realm_epoch = cursor.get_epoch();
2199 }
2200 r = run(new RGWInitSyncStatusCoroutine(&sync_env, sync_status.sync_info));
2201 if (r == -EBUSY) {
2202 backoff.backoff_sleep();
2203 continue;
2204 }
2205 backoff.reset();
2206 if (r < 0) {
11fdf7f2 2207 ldpp_dout(dpp, 0) << "ERROR: failed to init sync status r=" << r << dendl;
2208 return r;
2209 }
2210 }
2211 } while (sync_status.sync_info.state == rgw_meta_sync_info::StateInit);
2212
2213 auto num_shards = sync_status.sync_info.num_shards;
2214 if (num_shards != mdlog_info.num_shards) {
2215 lderr(store->ctx()) << "ERROR: can't sync, mismatch between num shards, master num_shards=" << mdlog_info.num_shards << " local num_shards=" << num_shards << dendl;
2216 return -EINVAL;
2217 }
2218
2219 RGWPeriodHistory::Cursor cursor;
2220 do {
2221 r = run(new RGWReadSyncStatusCoroutine(&sync_env, &sync_status));
2222 if (r < 0 && r != -ENOENT) {
11fdf7f2 2223 tn->log(0, SSTR("ERROR: failed to fetch sync status r=" << r));
2224 return r;
2225 }
2226
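    // overall progression: StateInit (handled in the loop above) ->
    // StateBuildingFullSyncMaps -> StateSync; the building case below falls
    // through to sync once the maps are written and the status is updated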
2227 switch ((rgw_meta_sync_info::SyncState)sync_status.sync_info.state) {
2228 case rgw_meta_sync_info::StateBuildingFullSyncMaps:
2229 tn->log(20, "building full sync maps");
2230 r = run(new RGWFetchAllMetaCR(&sync_env, num_shards, sync_status.sync_markers, tn));
2231 if (r == -EBUSY || r == -EAGAIN) {
2232 backoff.backoff_sleep();
2233 continue;
2234 }
2235 backoff.reset();
2236 if (r < 0) {
11fdf7f2 2237 tn->log(0, SSTR("ERROR: failed to fetch all metadata keys (r=" << r << ")"));
2238 return r;
2239 }
2240
2241 sync_status.sync_info.state = rgw_meta_sync_info::StateSync;
2242 r = store_sync_info(sync_status.sync_info);
2243 if (r < 0) {
11fdf7f2 2244 tn->log(0, SSTR("ERROR: failed to update sync status (r=" << r << ")"));
2245 return r;
2246 }
2247 /* fall through */
2248 case rgw_meta_sync_info::StateSync:
11fdf7f2 2249 tn->log(20, "sync");
2250 // find our position in the period history (if any)
2251 cursor = get_period_at(store, sync_status.sync_info);
2252 r = cursor.get_error();
2253 if (r < 0) {
2254 return r;
2255 }
11fdf7f2 2256 meta_sync_cr = new RGWMetaSyncCR(&sync_env, cursor, sync_status, tn);
2257 r = run(meta_sync_cr);
2258 if (r < 0) {
11fdf7f2 2259 tn->log(0, "ERROR: failed to run metadata sync");
2260 return r;
2261 }
2262 break;
2263 default:
11fdf7f2 2264 tn->log(0, "ERROR: bad sync state!");
2265 return -EIO;
2266 }
2267 } while (!going_down);
2268
2269 return 0;
2270}
2271
2272void RGWRemoteMetaLog::wakeup(int shard_id)
2273{
2274 if (!meta_sync_cr) {
2275 return;
2276 }
2277 meta_sync_cr->wakeup(shard_id);
2278}
2279
2280int RGWCloneMetaLogCoroutine::operate()
2281{
2282 reenter(this) {
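    // per-iteration flow: init -> read local shard status -> send a REST request
    // to the master's /admin/log -> receive the response -> store the fetched
    // entries in the local mdlog shard, repeating while the listing is truncated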
2283 do {
2284 yield {
11fdf7f2 2285 ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": init request" << dendl;
2286 return state_init();
2287 }
2288 yield {
11fdf7f2 2289 ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status" << dendl;
2290 return state_read_shard_status();
2291 }
2292 yield {
11fdf7f2 2293 ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status complete" << dendl;
2294 return state_read_shard_status_complete();
2295 }
2296 yield {
11fdf7f2 2297 ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": sending rest request" << dendl;
2298 return state_send_rest_request();
2299 }
2300 yield {
11fdf7f2 2301 ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": receiving rest response" << dendl;
2302 return state_receive_rest_response();
2303 }
2304 yield {
11fdf7f2 2305 ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries" << dendl;
2306 return state_store_mdlog_entries();
2307 }
2308 } while (truncated);
2309 yield {
11fdf7f2 2310 ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries complete" << dendl;
2311 return state_store_mdlog_entries_complete();
2312 }
2313 }
2314
2315 return 0;
2316}
2317
2318int RGWCloneMetaLogCoroutine::state_init()
2319{
2320 data = rgw_mdlog_shard_data();
2321
2322 return 0;
2323}
2324
2325int RGWCloneMetaLogCoroutine::state_read_shard_status()
2326{
2327 const bool add_ref = false; // default constructs with refs=1
2328
2329 completion.reset(new RGWMetadataLogInfoCompletion(
2330 [this](int ret, const cls_log_header& header) {
2331 if (ret < 0) {
2332 if (ret != -ENOENT) {
2333 ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to read mdlog info with "
2334 << cpp_strerror(ret) << dendl;
2335 }
2336 } else {
2337 shard_info.marker = header.max_marker;
2338 shard_info.last_update = header.max_time.to_real_time();
2339 }
2340 // wake up parent stack
11fdf7f2 2341 io_complete();
2342 }), add_ref);
2343
2344 int ret = mdlog->get_info_async(shard_id, completion.get());
2345 if (ret < 0) {
11fdf7f2 2346 ldpp_dout(sync_env->dpp, 0) << "ERROR: mdlog->get_info_async() returned ret=" << ret << dendl;
2347 return set_cr_error(ret);
2348 }
2349
2350 return io_block(0);
2351}
2352
2353int RGWCloneMetaLogCoroutine::state_read_shard_status_complete()
2354{
2355 completion.reset();
2356
11fdf7f2 2357 ldpp_dout(sync_env->dpp, 20) << "shard_id=" << shard_id << " marker=" << shard_info.marker << " last_update=" << shard_info.last_update << dendl;
2358
2359 marker = shard_info.marker;
2360
2361 return 0;
2362}
2363
2364int RGWCloneMetaLogCoroutine::state_send_rest_request()
2365{
2366 RGWRESTConn *conn = sync_env->conn;
2367
2368 char buf[32];
2369 snprintf(buf, sizeof(buf), "%d", shard_id);
2370
2371 char max_entries_buf[32];
2372 snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", max_entries);
2373
2374 const char *marker_key = (marker.empty() ? "" : "marker");
2375
2376 rgw_http_param_pair pairs[] = { { "type", "metadata" },
2377 { "id", buf },
2378 { "period", period.c_str() },
2379 { "max-entries", max_entries_buf },
2380 { marker_key, marker.c_str() },
2381 { NULL, NULL } };
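  // the resulting request is roughly:
  //   GET /admin/log?type=metadata&id=<shard_id>&period=<period>&max-entries=<max_entries>&marker=<marker>
  // (the marker parameter is only meaningful once we have a marker to resume from)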
2382
2383 http_op = new RGWRESTReadResource(conn, "/admin/log", pairs, NULL, sync_env->http_manager);
2384
11fdf7f2 2385 init_new_io(http_op);
2386
2387 int ret = http_op->aio_read();
2388 if (ret < 0) {
11fdf7f2 2389 ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to fetch mdlog data" << dendl;
2390 log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
2391 http_op->put();
2392 http_op = NULL;
1adf2230 2393 return set_cr_error(ret);
2394 }
2395
2396 return io_block(0);
2397}
2398
2399int RGWCloneMetaLogCoroutine::state_receive_rest_response()
2400{
9f95a23c 2401 int ret = http_op->wait(&data, null_yield);
2402 if (ret < 0) {
2403 error_stream << "http operation failed: " << http_op->to_str() << " status=" << http_op->get_http_status() << std::endl;
11fdf7f2 2404 ldpp_dout(sync_env->dpp, 5) << "failed to wait for op, ret=" << ret << dendl;
2405 http_op->put();
2406 http_op = NULL;
2407 return set_cr_error(ret);
2408 }
2409 http_op->put();
2410 http_op = NULL;
2411
11fdf7f2 2412 ldpp_dout(sync_env->dpp, 20) << "remote mdlog, shard_id=" << shard_id << " num of shard entries: " << data.entries.size() << dendl;
2413
2414 truncated = ((int)data.entries.size() == max_entries);
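  // a full page implies there may be more entries; operate() will loop and issue
  // another request starting from the updated marker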
2415
2416 if (data.entries.empty()) {
2417 if (new_marker) {
2418 *new_marker = marker;
2419 }
2420 return set_cr_done();
2421 }
2422
2423 if (new_marker) {
2424 *new_marker = data.entries.back().id;
2425 }
2426
2427 return 0;
2428}
2429
2430
2431int RGWCloneMetaLogCoroutine::state_store_mdlog_entries()
2432{
2433 list<cls_log_entry> dest_entries;
2434
2435 vector<rgw_mdlog_entry>::iterator iter;
2436 for (iter = data.entries.begin(); iter != data.entries.end(); ++iter) {
2437 rgw_mdlog_entry& entry = *iter;
11fdf7f2 2438 ldpp_dout(sync_env->dpp, 20) << "entry: name=" << entry.name << dendl;
2439
2440 cls_log_entry dest_entry;
2441 dest_entry.id = entry.id;
2442 dest_entry.section = entry.section;
2443 dest_entry.name = entry.name;
2444 dest_entry.timestamp = utime_t(entry.timestamp);
2445
11fdf7f2 2446 encode(entry.log_data, dest_entry.data);
2447
2448 dest_entries.push_back(dest_entry);
2449
2450 marker = entry.id;
2451 }
2452
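  // write the converted entries into the local mdlog shard asynchronously; the
  // completion notifier wakes this coroutine back up once the write finishes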
2453 RGWAioCompletionNotifier *cn = stack->create_completion_notifier();
2454
2455 int ret = mdlog->store_entries_in_shard(dest_entries, shard_id, cn->completion());
2456 if (ret < 0) {
2457 cn->put();
11fdf7f2 2458 ldpp_dout(sync_env->dpp, 10) << "failed to store md log entries shard_id=" << shard_id << " ret=" << ret << dendl;
2459 return set_cr_error(ret);
2460 }
2461 return io_block(0);
2462}
2463
2464int RGWCloneMetaLogCoroutine::state_store_mdlog_entries_complete()
2465{
2466 return set_cr_done();
2467}