]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_sync.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / rgw / rgw_sync.cc
CommitLineData
7c673cae 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
9f95a23c 2// vim: ts=8 sw=2 smarttab ft=cpp
7c673cae
FG
3
4#include <boost/optional.hpp>
5
6#include "common/ceph_json.h"
7#include "common/RWLock.h"
8#include "common/RefCountedObj.h"
9#include "common/WorkQueue.h"
10#include "common/Throttle.h"
11#include "common/admin_socket.h"
12#include "common/errno.h"
13
14#include "rgw_common.h"
15#include "rgw_rados.h"
11fdf7f2 16#include "rgw_zone.h"
7c673cae
FG
17#include "rgw_sync.h"
18#include "rgw_metadata.h"
9f95a23c 19#include "rgw_mdlog_types.h"
7c673cae
FG
20#include "rgw_rest_conn.h"
21#include "rgw_tools.h"
22#include "rgw_cr_rados.h"
23#include "rgw_cr_rest.h"
24#include "rgw_http_client.h"
11fdf7f2 25#include "rgw_sync_trace.h"
7c673cae
FG
26
27#include "cls/lock/cls_lock_client.h"
28
11fdf7f2 29#include "services/svc_zone.h"
9f95a23c
TL
30#include "services/svc_mdlog.h"
31#include "services/svc_meta.h"
32#include "services/svc_cls.h"
11fdf7f2 33
31f18b77
FG
34#include <boost/asio/yield.hpp>
35
7c673cae
FG
36#define dout_subsys ceph_subsys_rgw
37
38#undef dout_prefix
39#define dout_prefix (*_dout << "meta sync: ")
40
41static string mdlog_sync_status_oid = "mdlog.sync-status";
42static string mdlog_sync_status_shard_prefix = "mdlog.sync-status.shard";
43static string mdlog_sync_full_sync_index_prefix = "meta.full-sync.index";
44
9f95a23c 45RGWSyncErrorLogger::RGWSyncErrorLogger(rgw::sal::RGWRadosStore *_store, const string &oid_prefix, int _num_shards) : store(_store), num_shards(_num_shards) {
7c673cae
FG
46 for (int i = 0; i < num_shards; i++) {
47 oids.push_back(get_shard_oid(oid_prefix, i));
48 }
49}
50string RGWSyncErrorLogger::get_shard_oid(const string& oid_prefix, int shard_id) {
51 char buf[oid_prefix.size() + 16];
52 snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), shard_id);
53 return string(buf);
54}
55
56RGWCoroutine *RGWSyncErrorLogger::log_error_cr(const string& source_zone, const string& section, const string& name, uint32_t error_code, const string& message) {
57 cls_log_entry entry;
58
59 rgw_sync_error_info info(source_zone, error_code, message);
60 bufferlist bl;
11fdf7f2 61 encode(info, bl);
9f95a23c 62 store->svc()->cls->timelog.prepare_entry(entry, real_clock::now(), section, name, bl);
7c673cae
FG
63
64 uint32_t shard_id = ++counter % num_shards;
65
66
67 return new RGWRadosTimelogAddCR(store, oids[shard_id], entry);
68}
69
70void RGWSyncBackoff::update_wait_time()
71{
72 if (cur_wait == 0) {
73 cur_wait = 1;
74 } else {
75 cur_wait = (cur_wait << 1);
76 }
77 if (cur_wait >= max_secs) {
78 cur_wait = max_secs;
79 }
80}
81
82void RGWSyncBackoff::backoff_sleep()
83{
84 update_wait_time();
85 sleep(cur_wait);
86}
87
88void RGWSyncBackoff::backoff(RGWCoroutine *op)
89{
90 update_wait_time();
91 op->wait(utime_t(cur_wait, 0));
92}
93
94int RGWBackoffControlCR::operate() {
95 reenter(this) {
96 // retry the operation until it succeeds
97 while (true) {
98 yield {
9f95a23c 99 std::lock_guard l{lock};
7c673cae
FG
100 cr = alloc_cr();
101 cr->get();
102 call(cr);
103 }
104 {
9f95a23c 105 std::lock_guard l{lock};
7c673cae
FG
106 cr->put();
107 cr = NULL;
108 }
109 if (retcode >= 0) {
110 break;
111 }
112 if (retcode != -EBUSY && retcode != -EAGAIN) {
113 ldout(cct, 0) << "ERROR: RGWBackoffControlCR called coroutine returned " << retcode << dendl;
114 if (exit_on_error) {
115 return set_cr_error(retcode);
116 }
117 }
118 if (reset_backoff) {
119 backoff.reset();
120 }
121 yield backoff.backoff(this);
122 }
123
124 // run an optional finisher
125 yield call(alloc_finisher_cr());
126 if (retcode < 0) {
127 ldout(cct, 0) << "ERROR: call to finisher_cr() failed: retcode=" << retcode << dendl;
128 return set_cr_error(retcode);
129 }
130 return set_cr_done();
131 }
132 return 0;
133}
134
135void rgw_mdlog_info::decode_json(JSONObj *obj) {
136 JSONDecoder::decode_json("num_objects", num_shards, obj);
137 JSONDecoder::decode_json("period", period, obj);
138 JSONDecoder::decode_json("realm_epoch", realm_epoch, obj);
139}
140
141void rgw_mdlog_entry::decode_json(JSONObj *obj) {
142 JSONDecoder::decode_json("id", id, obj);
143 JSONDecoder::decode_json("section", section, obj);
144 JSONDecoder::decode_json("name", name, obj);
145 utime_t ut;
146 JSONDecoder::decode_json("timestamp", ut, obj);
147 timestamp = ut.to_real_time();
148 JSONDecoder::decode_json("data", log_data, obj);
149}
150
151void rgw_mdlog_shard_data::decode_json(JSONObj *obj) {
152 JSONDecoder::decode_json("marker", marker, obj);
153 JSONDecoder::decode_json("truncated", truncated, obj);
154 JSONDecoder::decode_json("entries", entries, obj);
155};
156
157int RGWShardCollectCR::operate() {
158 reenter(this) {
159 while (spawn_next()) {
160 current_running++;
161
162 while (current_running >= max_concurrent) {
163 int child_ret;
164 yield wait_for_child();
165 if (collect_next(&child_ret)) {
166 current_running--;
167 if (child_ret < 0 && child_ret != -ENOENT) {
168 ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
169 status = child_ret;
170 }
171 }
172 }
173 }
174 while (current_running > 0) {
175 int child_ret;
176 yield wait_for_child();
177 if (collect_next(&child_ret)) {
178 current_running--;
179 if (child_ret < 0 && child_ret != -ENOENT) {
180 ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
181 status = child_ret;
182 }
183 }
184 }
185 if (status < 0) {
186 return set_cr_error(status);
187 }
188 return set_cr_done();
189 }
190 return 0;
191}
192
193class RGWReadRemoteMDLogInfoCR : public RGWShardCollectCR {
194 RGWMetaSyncEnv *sync_env;
195
196 const std::string& period;
197 int num_shards;
198 map<int, RGWMetadataLogInfo> *mdlog_info;
199
200 int shard_id;
201#define READ_MDLOG_MAX_CONCURRENT 10
202
203public:
204 RGWReadRemoteMDLogInfoCR(RGWMetaSyncEnv *_sync_env,
205 const std::string& period, int _num_shards,
206 map<int, RGWMetadataLogInfo> *_mdlog_info) : RGWShardCollectCR(_sync_env->cct, READ_MDLOG_MAX_CONCURRENT),
207 sync_env(_sync_env),
208 period(period), num_shards(_num_shards),
209 mdlog_info(_mdlog_info), shard_id(0) {}
210 bool spawn_next() override;
211};
212
213class RGWListRemoteMDLogCR : public RGWShardCollectCR {
214 RGWMetaSyncEnv *sync_env;
215
216 const std::string& period;
217 map<int, string> shards;
218 int max_entries_per_shard;
219 map<int, rgw_mdlog_shard_data> *result;
220
221 map<int, string>::iterator iter;
222#define READ_MDLOG_MAX_CONCURRENT 10
223
224public:
225 RGWListRemoteMDLogCR(RGWMetaSyncEnv *_sync_env,
226 const std::string& period, map<int, string>& _shards,
227 int _max_entries_per_shard,
228 map<int, rgw_mdlog_shard_data> *_result) : RGWShardCollectCR(_sync_env->cct, READ_MDLOG_MAX_CONCURRENT),
229 sync_env(_sync_env), period(period),
230 max_entries_per_shard(_max_entries_per_shard),
231 result(_result) {
232 shards.swap(_shards);
233 iter = shards.begin();
234 }
235 bool spawn_next() override;
236};
237
238RGWRemoteMetaLog::~RGWRemoteMetaLog()
239{
240 delete error_logger;
241}
242
243int RGWRemoteMetaLog::read_log_info(rgw_mdlog_info *log_info)
244{
245 rgw_http_param_pair pairs[] = { { "type", "metadata" },
246 { NULL, NULL } };
247
248 int ret = conn->get_json_resource("/admin/log", pairs, *log_info);
249 if (ret < 0) {
11fdf7f2 250 ldpp_dout(dpp, 0) << "ERROR: failed to fetch mdlog info" << dendl;
7c673cae
FG
251 return ret;
252 }
253
11fdf7f2 254 ldpp_dout(dpp, 20) << "remote mdlog, num_shards=" << log_info->num_shards << dendl;
7c673cae
FG
255
256 return 0;
257}
258
259int RGWRemoteMetaLog::read_master_log_shards_info(const string &master_period, map<int, RGWMetadataLogInfo> *shards_info)
260{
9f95a23c 261 if (store->svc()->zone->is_meta_master()) {
7c673cae
FG
262 return 0;
263 }
264
265 rgw_mdlog_info log_info;
266 int ret = read_log_info(&log_info);
267 if (ret < 0) {
268 return ret;
269 }
270
271 return run(new RGWReadRemoteMDLogInfoCR(&sync_env, master_period, log_info.num_shards, shards_info));
272}
273
274int RGWRemoteMetaLog::read_master_log_shards_next(const string& period, map<int, string> shard_markers, map<int, rgw_mdlog_shard_data> *result)
275{
9f95a23c 276 if (store->svc()->zone->is_meta_master()) {
7c673cae
FG
277 return 0;
278 }
279
280 return run(new RGWListRemoteMDLogCR(&sync_env, period, shard_markers, 1, result));
281}
282
283int RGWRemoteMetaLog::init()
284{
9f95a23c 285 conn = store->svc()->zone->get_master_conn();
7c673cae 286
11fdf7f2 287 int ret = http_manager.start();
7c673cae 288 if (ret < 0) {
11fdf7f2 289 ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
7c673cae
FG
290 return ret;
291 }
292
293 error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
294
295 init_sync_env(&sync_env);
296
11fdf7f2
TL
297 tn = sync_env.sync_tracer->add_node(sync_env.sync_tracer->root_node, "meta");
298
7c673cae
FG
299 return 0;
300}
301
302void RGWRemoteMetaLog::finish()
303{
304 going_down = true;
305 stop();
306}
307
308#define CLONE_MAX_ENTRIES 100
309
310int RGWMetaSyncStatusManager::init()
311{
9f95a23c 312 if (store->svc()->zone->is_meta_master()) {
7c673cae
FG
313 return 0;
314 }
315
9f95a23c 316 if (!store->svc()->zone->get_master_conn()) {
7c673cae
FG
317 lderr(store->ctx()) << "no REST connection to master zone" << dendl;
318 return -EIO;
319 }
320
9f95a23c 321 int r = rgw_init_ioctx(store->getRados()->get_rados_handle(), store->svc()->zone->get_zone_params().log_pool, ioctx, true);
7c673cae 322 if (r < 0) {
9f95a23c 323 lderr(store->ctx()) << "ERROR: failed to open log pool (" << store->svc()->zone->get_zone_params().log_pool << " ret=" << r << dendl;
7c673cae
FG
324 return r;
325 }
326
327 r = master_log.init();
328 if (r < 0) {
329 lderr(store->ctx()) << "ERROR: failed to init remote log, r=" << r << dendl;
330 return r;
331 }
332
333 RGWMetaSyncEnv& sync_env = master_log.get_sync_env();
334
335 rgw_meta_sync_status sync_status;
336 r = read_sync_status(&sync_status);
337 if (r < 0 && r != -ENOENT) {
338 lderr(store->ctx()) << "ERROR: failed to read sync status, r=" << r << dendl;
339 return r;
340 }
341
342 int num_shards = sync_status.sync_info.num_shards;
343
344 for (int i = 0; i < num_shards; i++) {
9f95a23c 345 shard_objs[i] = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env.shard_obj_name(i));
7c673cae
FG
346 }
347
9f95a23c 348 std::unique_lock wl{ts_to_shard_lock};
7c673cae
FG
349 for (int i = 0; i < num_shards; i++) {
350 clone_markers.push_back(string());
351 utime_shard ut;
352 ut.shard_id = i;
353 ts_to_shard[ut] = i;
354 }
355
356 return 0;
357}
358
11fdf7f2
TL
359unsigned RGWMetaSyncStatusManager::get_subsys() const
360{
361 return dout_subsys;
362}
363
364std::ostream& RGWMetaSyncStatusManager::gen_prefix(std::ostream& out) const
365{
366 return out << "meta sync: ";
367}
368
9f95a23c 369void RGWMetaSyncEnv::init(const DoutPrefixProvider *_dpp, CephContext *_cct, rgw::sal::RGWRadosStore *_store, RGWRESTConn *_conn,
7c673cae 370 RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
11fdf7f2
TL
371 RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer) {
372 dpp = _dpp;
7c673cae
FG
373 cct = _cct;
374 store = _store;
375 conn = _conn;
376 async_rados = _async_rados;
377 http_manager = _http_manager;
378 error_logger = _error_logger;
11fdf7f2 379 sync_tracer = _sync_tracer;
7c673cae
FG
380}
381
382string RGWMetaSyncEnv::status_oid()
383{
384 return mdlog_sync_status_oid;
385}
386
387string RGWMetaSyncEnv::shard_obj_name(int shard_id)
388{
389 char buf[mdlog_sync_status_shard_prefix.size() + 16];
390 snprintf(buf, sizeof(buf), "%s.%d", mdlog_sync_status_shard_prefix.c_str(), shard_id);
391
392 return string(buf);
393}
394
395class RGWAsyncReadMDLogEntries : public RGWAsyncRadosRequest {
9f95a23c 396 rgw::sal::RGWRadosStore *store;
7c673cae
FG
397 RGWMetadataLog *mdlog;
398 int shard_id;
399 string *marker;
400 int max_entries;
401 list<cls_log_entry> *entries;
402 bool *truncated;
403
404protected:
405 int _send_request() override {
406 real_time from_time;
407 real_time end_time;
408
409 void *handle;
410
411 mdlog->init_list_entries(shard_id, from_time, end_time, *marker, &handle);
412
413 int ret = mdlog->list_entries(handle, max_entries, *entries, marker, truncated);
414
415 mdlog->complete_list_entries(handle);
416
417 return ret;
418 }
419public:
9f95a23c 420 RGWAsyncReadMDLogEntries(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RGWRadosStore *_store,
7c673cae
FG
421 RGWMetadataLog* mdlog, int _shard_id,
422 string* _marker, int _max_entries,
423 list<cls_log_entry> *_entries, bool *_truncated)
424 : RGWAsyncRadosRequest(caller, cn), store(_store), mdlog(mdlog),
425 shard_id(_shard_id), marker(_marker), max_entries(_max_entries),
426 entries(_entries), truncated(_truncated) {}
427};
428
429class RGWReadMDLogEntriesCR : public RGWSimpleCoroutine {
430 RGWMetaSyncEnv *sync_env;
431 RGWMetadataLog *const mdlog;
432 int shard_id;
433 string marker;
434 string *pmarker;
435 int max_entries;
436 list<cls_log_entry> *entries;
437 bool *truncated;
438
224ce89b 439 RGWAsyncReadMDLogEntries *req{nullptr};
7c673cae
FG
440
441public:
442 RGWReadMDLogEntriesCR(RGWMetaSyncEnv *_sync_env, RGWMetadataLog* mdlog,
443 int _shard_id, string*_marker, int _max_entries,
444 list<cls_log_entry> *_entries, bool *_truncated)
445 : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env), mdlog(mdlog),
446 shard_id(_shard_id), pmarker(_marker), max_entries(_max_entries),
447 entries(_entries), truncated(_truncated) {}
448
449 ~RGWReadMDLogEntriesCR() override {
450 if (req) {
451 req->finish();
452 }
453 }
454
455 int send_request() override {
456 marker = *pmarker;
457 req = new RGWAsyncReadMDLogEntries(this, stack->create_completion_notifier(),
458 sync_env->store, mdlog, shard_id, &marker,
459 max_entries, entries, truncated);
460 sync_env->async_rados->queue(req);
461 return 0;
462 }
463
464 int request_complete() override {
465 int ret = req->get_ret_status();
466 if (ret >= 0 && !entries->empty()) {
467 *pmarker = marker;
468 }
469 return req->get_ret_status();
470 }
471};
472
473
474class RGWReadRemoteMDLogShardInfoCR : public RGWCoroutine {
475 RGWMetaSyncEnv *env;
476 RGWRESTReadResource *http_op;
477
478 const std::string& period;
479 int shard_id;
480 RGWMetadataLogInfo *shard_info;
481
482public:
483 RGWReadRemoteMDLogShardInfoCR(RGWMetaSyncEnv *env, const std::string& period,
484 int _shard_id, RGWMetadataLogInfo *_shard_info)
485 : RGWCoroutine(env->store->ctx()), env(env), http_op(NULL),
486 period(period), shard_id(_shard_id), shard_info(_shard_info) {}
487
488 int operate() override {
489 auto store = env->store;
9f95a23c 490 RGWRESTConn *conn = store->svc()->zone->get_master_conn();
7c673cae
FG
491 reenter(this) {
492 yield {
493 char buf[16];
494 snprintf(buf, sizeof(buf), "%d", shard_id);
495 rgw_http_param_pair pairs[] = { { "type" , "metadata" },
496 { "id", buf },
497 { "period", period.c_str() },
498 { "info" , NULL },
499 { NULL, NULL } };
500
501 string p = "/admin/log/";
502
503 http_op = new RGWRESTReadResource(conn, p, pairs, NULL,
504 env->http_manager);
505
11fdf7f2 506 init_new_io(http_op);
7c673cae
FG
507
508 int ret = http_op->aio_read();
509 if (ret < 0) {
11fdf7f2 510 ldpp_dout(env->dpp, 0) << "ERROR: failed to read from " << p << dendl;
7c673cae
FG
511 log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
512 http_op->put();
513 return set_cr_error(ret);
514 }
515
516 return io_block(0);
517 }
518 yield {
9f95a23c 519 int ret = http_op->wait(shard_info, null_yield);
7c673cae
FG
520 http_op->put();
521 if (ret < 0) {
522 return set_cr_error(ret);
523 }
524 return set_cr_done();
525 }
526 }
527 return 0;
528 }
529};
530
9f95a23c
TL
531RGWCoroutine* create_read_remote_mdlog_shard_info_cr(RGWMetaSyncEnv *env,
532 const std::string& period,
533 int shard_id,
534 RGWMetadataLogInfo* info)
535{
536 return new RGWReadRemoteMDLogShardInfoCR(env, period, shard_id, info);
537}
538
7c673cae
FG
539class RGWListRemoteMDLogShardCR : public RGWSimpleCoroutine {
540 RGWMetaSyncEnv *sync_env;
541 RGWRESTReadResource *http_op;
542
543 const std::string& period;
544 int shard_id;
545 string marker;
546 uint32_t max_entries;
547 rgw_mdlog_shard_data *result;
548
549public:
550 RGWListRemoteMDLogShardCR(RGWMetaSyncEnv *env, const std::string& period,
551 int _shard_id, const string& _marker, uint32_t _max_entries,
552 rgw_mdlog_shard_data *_result)
553 : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL),
554 period(period), shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}
555
556 int send_request() override {
557 RGWRESTConn *conn = sync_env->conn;
7c673cae
FG
558
559 char buf[32];
560 snprintf(buf, sizeof(buf), "%d", shard_id);
561
562 char max_entries_buf[32];
563 snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);
564
565 const char *marker_key = (marker.empty() ? "" : "marker");
566
567 rgw_http_param_pair pairs[] = { { "type", "metadata" },
568 { "id", buf },
569 { "period", period.c_str() },
570 { "max-entries", max_entries_buf },
571 { marker_key, marker.c_str() },
572 { NULL, NULL } };
573
574 string p = "/admin/log/";
575
576 http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
11fdf7f2 577 init_new_io(http_op);
7c673cae
FG
578
579 int ret = http_op->aio_read();
580 if (ret < 0) {
11fdf7f2 581 ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to read from " << p << dendl;
7c673cae
FG
582 log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
583 http_op->put();
584 return ret;
585 }
586
587 return 0;
588 }
589
590 int request_complete() override {
9f95a23c 591 int ret = http_op->wait(result, null_yield);
7c673cae
FG
592 http_op->put();
593 if (ret < 0 && ret != -ENOENT) {
11fdf7f2 594 ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to list remote mdlog shard, ret=" << ret << dendl;
7c673cae
FG
595 return ret;
596 }
597 return 0;
598 }
599};
600
9f95a23c
TL
601RGWCoroutine* create_list_remote_mdlog_shard_cr(RGWMetaSyncEnv *env,
602 const std::string& period,
603 int shard_id,
604 const std::string& marker,
605 uint32_t max_entries,
606 rgw_mdlog_shard_data *result)
607{
608 return new RGWListRemoteMDLogShardCR(env, period, shard_id, marker,
609 max_entries, result);
610}
611
7c673cae
FG
612bool RGWReadRemoteMDLogInfoCR::spawn_next() {
613 if (shard_id >= num_shards) {
614 return false;
615 }
616 spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env, period, shard_id, &(*mdlog_info)[shard_id]), false);
617 shard_id++;
618 return true;
619}
620
621bool RGWListRemoteMDLogCR::spawn_next() {
622 if (iter == shards.end()) {
623 return false;
624 }
625
626 spawn(new RGWListRemoteMDLogShardCR(sync_env, period, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
627 ++iter;
628 return true;
629}
630
631class RGWInitSyncStatusCoroutine : public RGWCoroutine {
632 RGWMetaSyncEnv *sync_env;
633
634 rgw_meta_sync_info status;
635 vector<RGWMetadataLogInfo> shards_info;
31f18b77
FG
636 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
637 boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
7c673cae
FG
638public:
639 RGWInitSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
640 const rgw_meta_sync_info &status)
641 : RGWCoroutine(_sync_env->store->ctx()), sync_env(_sync_env),
642 status(status), shards_info(status.num_shards),
643 lease_cr(nullptr), lease_stack(nullptr) {}
644
645 ~RGWInitSyncStatusCoroutine() override {
646 if (lease_cr) {
647 lease_cr->abort();
7c673cae
FG
648 }
649 }
650
651 int operate() override {
652 int ret;
653 reenter(this) {
654 yield {
655 set_status("acquiring sync lock");
656 uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
657 string lock_name = "sync_lock";
9f95a23c 658 rgw::sal::RGWRadosStore *store = sync_env->store;
31f18b77 659 lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
9f95a23c 660 rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
31f18b77
FG
661 lock_name, lock_duration, this));
662 lease_stack.reset(spawn(lease_cr.get(), false));
7c673cae
FG
663 }
664 while (!lease_cr->is_locked()) {
665 if (lease_cr->is_done()) {
11fdf7f2 666 ldpp_dout(sync_env->dpp, 5) << "lease cr failed, done early " << dendl;
7c673cae
FG
667 set_status("lease lock failed, early abort");
668 return set_cr_error(lease_cr->get_ret_status());
669 }
670 set_sleeping(true);
671 yield;
672 }
673 yield {
674 set_status("writing sync status");
9f95a23c
TL
675 rgw::sal::RGWRadosStore *store = sync_env->store;
676 call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados, store->svc()->sysobj,
677 rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
7c673cae
FG
678 status));
679 }
680
681 if (retcode < 0) {
682 set_status("failed to write sync status");
11fdf7f2 683 ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
7c673cae
FG
684 yield lease_cr->go_down();
685 return set_cr_error(retcode);
686 }
687 /* fetch current position in logs */
688 set_status("fetching remote log position");
689 yield {
690 for (int i = 0; i < (int)status.num_shards; i++) {
691 spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env, status.period, i,
692 &shards_info[i]), false);
693 }
694 }
695
31f18b77 696 drain_all_but_stack(lease_stack.get()); /* the lease cr still needs to run */
7c673cae
FG
697
698 yield {
699 set_status("updating sync status");
700 for (int i = 0; i < (int)status.num_shards; i++) {
701 rgw_meta_sync_marker marker;
702 RGWMetadataLogInfo& info = shards_info[i];
703 marker.next_step_marker = info.marker;
704 marker.timestamp = info.last_update;
9f95a23c 705 rgw::sal::RGWRadosStore *store = sync_env->store;
7c673cae 706 spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados,
9f95a23c
TL
707 store->svc()->sysobj,
708 rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->shard_obj_name(i)),
7c673cae
FG
709 marker), true);
710 }
711 }
712 yield {
713 set_status("changing sync state: build full sync maps");
714 status.state = rgw_meta_sync_info::StateBuildingFullSyncMaps;
9f95a23c
TL
715 rgw::sal::RGWRadosStore *store = sync_env->store;
716 call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados, store->svc()->sysobj,
717 rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
7c673cae
FG
718 status));
719 }
720 set_status("drop lock lease");
721 yield lease_cr->go_down();
722 while (collect(&ret, NULL)) {
723 if (ret < 0) {
724 return set_cr_error(ret);
725 }
726 yield;
727 }
728 drain_all();
729 return set_cr_done();
730 }
731 return 0;
732 }
733};
734
735class RGWReadSyncStatusMarkersCR : public RGWShardCollectCR {
736 static constexpr int MAX_CONCURRENT_SHARDS = 16;
737
738 RGWMetaSyncEnv *env;
739 const int num_shards;
740 int shard_id{0};
741 map<uint32_t, rgw_meta_sync_marker>& markers;
742
743 public:
744 RGWReadSyncStatusMarkersCR(RGWMetaSyncEnv *env, int num_shards,
745 map<uint32_t, rgw_meta_sync_marker>& markers)
746 : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS),
747 env(env), num_shards(num_shards), markers(markers)
748 {}
749 bool spawn_next() override;
750};
751
752bool RGWReadSyncStatusMarkersCR::spawn_next()
753{
754 if (shard_id >= num_shards) {
755 return false;
756 }
757 using CR = RGWSimpleRadosReadCR<rgw_meta_sync_marker>;
9f95a23c 758 rgw_raw_obj obj{env->store->svc()->zone->get_zone_params().log_pool,
7c673cae 759 env->shard_obj_name(shard_id)};
9f95a23c 760 spawn(new CR(env->async_rados, env->store->svc()->sysobj, obj, &markers[shard_id]), false);
7c673cae
FG
761 shard_id++;
762 return true;
763}
764
765class RGWReadSyncStatusCoroutine : public RGWCoroutine {
766 RGWMetaSyncEnv *sync_env;
767 rgw_meta_sync_status *sync_status;
768
769public:
770 RGWReadSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
771 rgw_meta_sync_status *_status)
772 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sync_status(_status)
773 {}
774 int operate() override;
775};
776
777int RGWReadSyncStatusCoroutine::operate()
778{
779 reenter(this) {
780 // read sync info
781 using ReadInfoCR = RGWSimpleRadosReadCR<rgw_meta_sync_info>;
782 yield {
783 bool empty_on_enoent = false; // fail on ENOENT
9f95a23c 784 rgw_raw_obj obj{sync_env->store->svc()->zone->get_zone_params().log_pool,
7c673cae 785 sync_env->status_oid()};
9f95a23c 786 call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj, obj,
7c673cae
FG
787 &sync_status->sync_info, empty_on_enoent));
788 }
789 if (retcode < 0) {
11fdf7f2 790 ldpp_dout(sync_env->dpp, 4) << "failed to read sync status info with "
7c673cae
FG
791 << cpp_strerror(retcode) << dendl;
792 return set_cr_error(retcode);
793 }
794 // read shard markers
795 using ReadMarkersCR = RGWReadSyncStatusMarkersCR;
796 yield call(new ReadMarkersCR(sync_env, sync_status->sync_info.num_shards,
797 sync_status->sync_markers));
798 if (retcode < 0) {
11fdf7f2 799 ldpp_dout(sync_env->dpp, 4) << "failed to read sync status markers with "
7c673cae
FG
800 << cpp_strerror(retcode) << dendl;
801 return set_cr_error(retcode);
802 }
803 return set_cr_done();
804 }
805 return 0;
806}
807
808class RGWFetchAllMetaCR : public RGWCoroutine {
809 RGWMetaSyncEnv *sync_env;
810
811 int num_shards;
812
813
814 int ret_status;
815
816 list<string> sections;
817 list<string>::iterator sections_iter;
181888fb
FG
818
819 struct meta_list_result {
820 list<string> keys;
821 string marker;
822 uint64_t count{0};
823 bool truncated{false};
824
825 void decode_json(JSONObj *obj) {
826 JSONDecoder::decode_json("keys", keys, obj);
827 JSONDecoder::decode_json("marker", marker, obj);
828 JSONDecoder::decode_json("count", count, obj);
829 JSONDecoder::decode_json("truncated", truncated, obj);
830 }
831 } result;
7c673cae
FG
832 list<string>::iterator iter;
833
834 std::unique_ptr<RGWShardedOmapCRManager> entries_index;
835
31f18b77
FG
836 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
837 boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
7c673cae
FG
838 bool lost_lock;
839 bool failed;
840
181888fb
FG
841 string marker;
842
7c673cae
FG
843 map<uint32_t, rgw_meta_sync_marker>& markers;
844
11fdf7f2
TL
845 RGWSyncTraceNodeRef tn;
846
7c673cae
FG
847public:
848 RGWFetchAllMetaCR(RGWMetaSyncEnv *_sync_env, int _num_shards,
11fdf7f2
TL
849 map<uint32_t, rgw_meta_sync_marker>& _markers,
850 RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
7c673cae
FG
851 num_shards(_num_shards),
852 ret_status(0), lease_cr(nullptr), lease_stack(nullptr),
853 lost_lock(false), failed(false), markers(_markers) {
11fdf7f2 854 tn = sync_env->sync_tracer->add_node(_tn_parent, "fetch_all_meta");
7c673cae
FG
855 }
856
857 ~RGWFetchAllMetaCR() override {
7c673cae
FG
858 }
859
860 void append_section_from_set(set<string>& all_sections, const string& name) {
861 set<string>::iterator iter = all_sections.find(name);
862 if (iter != all_sections.end()) {
863 sections.emplace_back(std::move(*iter));
864 all_sections.erase(iter);
865 }
866 }
867 /*
868 * meta sync should go in the following order: user, bucket.instance, bucket
869 * then whatever other sections exist (if any)
870 */
871 void rearrange_sections() {
872 set<string> all_sections;
873 std::move(sections.begin(), sections.end(),
874 std::inserter(all_sections, all_sections.end()));
875 sections.clear();
876
877 append_section_from_set(all_sections, "user");
878 append_section_from_set(all_sections, "bucket.instance");
879 append_section_from_set(all_sections, "bucket");
880
881 std::move(all_sections.begin(), all_sections.end(),
882 std::back_inserter(sections));
883 }
884
885 int operate() override {
886 RGWRESTConn *conn = sync_env->conn;
887
888 reenter(this) {
889 yield {
890 set_status(string("acquiring lock (") + sync_env->status_oid() + ")");
891 uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
892 string lock_name = "sync_lock";
31f18b77
FG
893 lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados,
894 sync_env->store,
9f95a23c 895 rgw_raw_obj(sync_env->store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
31f18b77 896 lock_name, lock_duration, this));
224ce89b 897 lease_stack.reset(spawn(lease_cr.get(), false));
7c673cae
FG
898 }
899 while (!lease_cr->is_locked()) {
900 if (lease_cr->is_done()) {
11fdf7f2 901 ldpp_dout(sync_env->dpp, 5) << "lease cr failed, done early " << dendl;
7c673cae
FG
902 set_status("failed acquiring lock");
903 return set_cr_error(lease_cr->get_ret_status());
904 }
905 set_sleeping(true);
906 yield;
907 }
908 entries_index.reset(new RGWShardedOmapCRManager(sync_env->async_rados, sync_env->store, this, num_shards,
9f95a23c 909 sync_env->store->svc()->zone->get_zone_params().log_pool,
7c673cae
FG
910 mdlog_sync_full_sync_index_prefix));
911 yield {
912 call(new RGWReadRESTResourceCR<list<string> >(cct, conn, sync_env->http_manager,
913 "/admin/metadata", NULL, &sections));
914 }
915 if (get_ret_status() < 0) {
11fdf7f2 916 ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to fetch metadata sections" << dendl;
7c673cae
FG
917 yield entries_index->finish();
918 yield lease_cr->go_down();
919 drain_all();
920 return set_cr_error(get_ret_status());
921 }
922 rearrange_sections();
923 sections_iter = sections.begin();
924 for (; sections_iter != sections.end(); ++sections_iter) {
181888fb
FG
925 do {
926 yield {
927#define META_FULL_SYNC_CHUNK_SIZE "1000"
928 string entrypoint = string("/admin/metadata/") + *sections_iter;
929 rgw_http_param_pair pairs[] = { { "max-entries", META_FULL_SYNC_CHUNK_SIZE },
930 { "marker", result.marker.c_str() },
931 { NULL, NULL } };
932 result.keys.clear();
933 call(new RGWReadRESTResourceCR<meta_list_result >(cct, conn, sync_env->http_manager,
934 entrypoint, pairs, &result));
7c673cae 935 }
11fdf7f2
TL
936 ret_status = get_ret_status();
937 if (ret_status == -ENOENT) {
938 set_retcode(0); /* reset coroutine status so that we don't return it */
939 ret_status = 0;
940 }
941 if (ret_status < 0) {
942 tn->log(0, SSTR("ERROR: failed to fetch metadata section: " << *sections_iter));
181888fb
FG
943 yield entries_index->finish();
944 yield lease_cr->go_down();
945 drain_all();
11fdf7f2 946 return set_cr_error(ret_status);
7c673cae 947 }
181888fb
FG
948 iter = result.keys.begin();
949 for (; iter != result.keys.end(); ++iter) {
950 if (!lease_cr->is_locked()) {
951 lost_lock = true;
952 break;
953 }
954 yield; // allow entries_index consumer to make progress
955
11fdf7f2 956 tn->log(20, SSTR("list metadata: section=" << *sections_iter << " key=" << *iter));
181888fb
FG
957 string s = *sections_iter + ":" + *iter;
958 int shard_id;
9f95a23c
TL
959 rgw::sal::RGWRadosStore *store = sync_env->store;
960 int ret = store->ctl()->meta.mgr->get_shard_id(*sections_iter, *iter, &shard_id);
181888fb 961 if (ret < 0) {
11fdf7f2 962 tn->log(0, SSTR("ERROR: could not determine shard id for " << *sections_iter << ":" << *iter));
181888fb
FG
963 ret_status = ret;
964 break;
965 }
966 if (!entries_index->append(s, shard_id)) {
967 break;
968 }
7c673cae 969 }
181888fb 970 } while (result.truncated);
7c673cae
FG
971 }
972 yield {
973 if (!entries_index->finish()) {
974 failed = true;
975 }
976 }
977 if (!failed) {
978 for (map<uint32_t, rgw_meta_sync_marker>::iterator iter = markers.begin(); iter != markers.end(); ++iter) {
979 int shard_id = (int)iter->first;
980 rgw_meta_sync_marker& marker = iter->second;
981 marker.total_entries = entries_index->get_total_entries(shard_id);
9f95a23c
TL
982 spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados, sync_env->store->svc()->sysobj,
983 rgw_raw_obj(sync_env->store->svc()->zone->get_zone_params().log_pool, sync_env->shard_obj_name(shard_id)),
7c673cae
FG
984 marker), true);
985 }
986 }
987
31f18b77 988 drain_all_but_stack(lease_stack.get()); /* the lease cr still needs to run */
7c673cae
FG
989
990 yield lease_cr->go_down();
991
992 int ret;
993 while (collect(&ret, NULL)) {
994 if (ret < 0) {
995 return set_cr_error(ret);
996 }
997 yield;
998 }
999 drain_all();
1000 if (failed) {
1001 yield return set_cr_error(-EIO);
1002 }
1003 if (lost_lock) {
1004 yield return set_cr_error(-EBUSY);
1005 }
1006
1007 if (ret_status < 0) {
1008 yield return set_cr_error(ret_status);
1009 }
1010
1011 yield return set_cr_done();
1012 }
1013 return 0;
1014 }
1015};
1016
1017static string full_sync_index_shard_oid(int shard_id)
1018{
1019 char buf[mdlog_sync_full_sync_index_prefix.size() + 16];
1020 snprintf(buf, sizeof(buf), "%s.%d", mdlog_sync_full_sync_index_prefix.c_str(), shard_id);
1021 return string(buf);
1022}
1023
1024class RGWReadRemoteMetadataCR : public RGWCoroutine {
1025 RGWMetaSyncEnv *sync_env;
1026
1027 RGWRESTReadResource *http_op;
1028
1029 string section;
1030 string key;
1031
1032 bufferlist *pbl;
1033
11fdf7f2
TL
1034 RGWSyncTraceNodeRef tn;
1035
7c673cae
FG
1036public:
1037 RGWReadRemoteMetadataCR(RGWMetaSyncEnv *_sync_env,
11fdf7f2
TL
1038 const string& _section, const string& _key, bufferlist *_pbl,
1039 const RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
7c673cae
FG
1040 http_op(NULL),
1041 section(_section),
1042 key(_key),
1043 pbl(_pbl) {
11fdf7f2
TL
1044 tn = sync_env->sync_tracer->add_node(_tn_parent, "read_remote_meta",
1045 section + ":" + key);
7c673cae
FG
1046 }
1047
1048 int operate() override {
1049 RGWRESTConn *conn = sync_env->conn;
1050 reenter(this) {
1051 yield {
1052 rgw_http_param_pair pairs[] = { { "key" , key.c_str()},
1053 { NULL, NULL } };
1054
1055 string p = string("/admin/metadata/") + section + "/" + key;
1056
1057 http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
1058
11fdf7f2 1059 init_new_io(http_op);
7c673cae
FG
1060
1061 int ret = http_op->aio_read();
1062 if (ret < 0) {
11fdf7f2 1063 ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to fetch mdlog data" << dendl;
7c673cae
FG
1064 log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
1065 http_op->put();
1066 return set_cr_error(ret);
1067 }
1068
1069 return io_block(0);
1070 }
1071 yield {
9f95a23c 1072 int ret = http_op->wait(pbl, null_yield);
7c673cae
FG
1073 http_op->put();
1074 if (ret < 0) {
1075 return set_cr_error(ret);
1076 }
1077 return set_cr_done();
1078 }
1079 }
1080 return 0;
1081 }
1082};
1083
1084class RGWAsyncMetaStoreEntry : public RGWAsyncRadosRequest {
9f95a23c 1085 rgw::sal::RGWRadosStore *store;
7c673cae
FG
1086 string raw_key;
1087 bufferlist bl;
1088protected:
1089 int _send_request() override {
9f95a23c 1090 int ret = store->ctl()->meta.mgr->put(raw_key, bl, null_yield, RGWMDLogSyncType::APPLY_ALWAYS);
7c673cae
FG
1091 if (ret < 0) {
1092 ldout(store->ctx(), 0) << "ERROR: can't store key: " << raw_key << " ret=" << ret << dendl;
1093 return ret;
1094 }
1095 return 0;
1096 }
1097public:
9f95a23c 1098 RGWAsyncMetaStoreEntry(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RGWRadosStore *_store,
7c673cae
FG
1099 const string& _raw_key,
1100 bufferlist& _bl) : RGWAsyncRadosRequest(caller, cn), store(_store),
1101 raw_key(_raw_key), bl(_bl) {}
1102};
1103
1104
1105class RGWMetaStoreEntryCR : public RGWSimpleCoroutine {
1106 RGWMetaSyncEnv *sync_env;
1107 string raw_key;
1108 bufferlist bl;
1109
1110 RGWAsyncMetaStoreEntry *req;
1111
1112public:
1113 RGWMetaStoreEntryCR(RGWMetaSyncEnv *_sync_env,
1114 const string& _raw_key,
1115 bufferlist& _bl) : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env),
1116 raw_key(_raw_key), bl(_bl), req(NULL) {
1117 }
1118
1119 ~RGWMetaStoreEntryCR() override {
1120 if (req) {
1121 req->finish();
1122 }
1123 }
1124
1125 int send_request() override {
1126 req = new RGWAsyncMetaStoreEntry(this, stack->create_completion_notifier(),
1127 sync_env->store, raw_key, bl);
1128 sync_env->async_rados->queue(req);
1129 return 0;
1130 }
1131
1132 int request_complete() override {
1133 return req->get_ret_status();
1134 }
1135};
1136
1137class RGWAsyncMetaRemoveEntry : public RGWAsyncRadosRequest {
9f95a23c 1138 rgw::sal::RGWRadosStore *store;
7c673cae
FG
1139 string raw_key;
1140protected:
1141 int _send_request() override {
9f95a23c 1142 int ret = store->ctl()->meta.mgr->remove(raw_key, null_yield);
7c673cae
FG
1143 if (ret < 0) {
1144 ldout(store->ctx(), 0) << "ERROR: can't remove key: " << raw_key << " ret=" << ret << dendl;
1145 return ret;
1146 }
1147 return 0;
1148 }
1149public:
9f95a23c 1150 RGWAsyncMetaRemoveEntry(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RGWRadosStore *_store,
7c673cae
FG
1151 const string& _raw_key) : RGWAsyncRadosRequest(caller, cn), store(_store),
1152 raw_key(_raw_key) {}
1153};
1154
1155
1156class RGWMetaRemoveEntryCR : public RGWSimpleCoroutine {
1157 RGWMetaSyncEnv *sync_env;
1158 string raw_key;
1159
1160 RGWAsyncMetaRemoveEntry *req;
1161
1162public:
1163 RGWMetaRemoveEntryCR(RGWMetaSyncEnv *_sync_env,
1164 const string& _raw_key) : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env),
1165 raw_key(_raw_key), req(NULL) {
1166 }
1167
1168 ~RGWMetaRemoveEntryCR() override {
1169 if (req) {
1170 req->finish();
1171 }
1172 }
1173
1174 int send_request() override {
1175 req = new RGWAsyncMetaRemoveEntry(this, stack->create_completion_notifier(),
1176 sync_env->store, raw_key);
1177 sync_env->async_rados->queue(req);
1178 return 0;
1179 }
1180
1181 int request_complete() override {
1182 int r = req->get_ret_status();
1183 if (r == -ENOENT) {
1184 r = 0;
1185 }
1186 return r;
1187 }
1188};
1189
1190#define META_SYNC_UPDATE_MARKER_WINDOW 10
1191
91327a77
AA
1192
1193int RGWLastCallerWinsCR::operate() {
1194 RGWCoroutine *call_cr;
1195 reenter(this) {
1196 while (cr) {
1197 call_cr = cr;
1198 cr = nullptr;
1199 yield call(call_cr);
1200 /* cr might have been modified at this point */
1201 }
1202 return set_cr_done();
1203 }
1204 return 0;
1205}
1206
7c673cae
FG
1207class RGWMetaSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
1208 RGWMetaSyncEnv *sync_env;
1209
1210 string marker_oid;
1211 rgw_meta_sync_marker sync_marker;
1212
11fdf7f2 1213 RGWSyncTraceNodeRef tn;
7c673cae
FG
1214
1215public:
1216 RGWMetaSyncShardMarkerTrack(RGWMetaSyncEnv *_sync_env,
1217 const string& _marker_oid,
11fdf7f2
TL
1218 const rgw_meta_sync_marker& _marker,
1219 RGWSyncTraceNodeRef& _tn) : RGWSyncShardMarkerTrack(META_SYNC_UPDATE_MARKER_WINDOW),
7c673cae
FG
1220 sync_env(_sync_env),
1221 marker_oid(_marker_oid),
11fdf7f2
TL
1222 sync_marker(_marker),
1223 tn(_tn){}
7c673cae
FG
1224
1225 RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
1226 sync_marker.marker = new_marker;
1227 if (index_pos > 0) {
1228 sync_marker.pos = index_pos;
1229 }
1230
1231 if (!real_clock::is_zero(timestamp)) {
1232 sync_marker.timestamp = timestamp;
1233 }
1234
11fdf7f2
TL
1235 ldpp_dout(sync_env->dpp, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << " realm_epoch=" << sync_marker.realm_epoch << dendl;
1236 tn->log(20, SSTR("new marker=" << new_marker));
9f95a23c 1237 rgw::sal::RGWRadosStore *store = sync_env->store;
7c673cae 1238 return new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados,
9f95a23c
TL
1239 store->svc()->sysobj,
1240 rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, marker_oid),
7c673cae
FG
1241 sync_marker);
1242 }
91327a77 1243
11fdf7f2 1244 RGWOrderCallCR *allocate_order_control_cr() override {
91327a77
AA
1245 return new RGWLastCallerWinsCR(sync_env->cct);
1246 }
7c673cae
FG
1247};
1248
11fdf7f2
TL
1249RGWMetaSyncSingleEntryCR::RGWMetaSyncSingleEntryCR(RGWMetaSyncEnv *_sync_env,
1250 const string& _raw_key, const string& _entry_marker,
1251 const RGWMDLogStatus& _op_status,
1252 RGWMetaSyncShardMarkerTrack *_marker_tracker, const RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sync_env->cct),
1253 sync_env(_sync_env),
1254 raw_key(_raw_key), entry_marker(_entry_marker),
1255 op_status(_op_status),
1256 pos(0), sync_status(0),
1257 marker_tracker(_marker_tracker), tries(0) {
1258 error_injection = (sync_env->cct->_conf->rgw_sync_meta_inject_err_probability > 0);
1259 tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", raw_key);
1260}
1261
7c673cae
FG
1262int RGWMetaSyncSingleEntryCR::operate() {
1263 reenter(this) {
1264#define NUM_TRANSIENT_ERROR_RETRIES 10
1265
1266 if (error_injection &&
1267 rand() % 10000 < cct->_conf->rgw_sync_meta_inject_err_probability * 10000.0) {
7c673cae
FG
1268 return set_cr_error(-EIO);
1269 }
1270
1271 if (op_status != MDLOG_STATUS_COMPLETE) {
11fdf7f2 1272 tn->log(20, "skipping pending operation");
7c673cae
FG
1273 yield call(marker_tracker->finish(entry_marker));
1274 if (retcode < 0) {
1275 return set_cr_error(retcode);
1276 }
1277 return set_cr_done();
1278 }
11fdf7f2 1279 tn->set_flag(RGW_SNS_FLAG_ACTIVE);
7c673cae
FG
1280 for (tries = 0; tries < NUM_TRANSIENT_ERROR_RETRIES; tries++) {
1281 yield {
1282 pos = raw_key.find(':');
1283 section = raw_key.substr(0, pos);
1284 key = raw_key.substr(pos + 1);
11fdf7f2
TL
1285 tn->log(10, SSTR("fetching remote metadata entry" << (tries == 0 ? "" : " (retry)")));
1286 call(new RGWReadRemoteMetadataCR(sync_env, section, key, &md_bl, tn));
7c673cae
FG
1287 }
1288
1289 sync_status = retcode;
1290
1291 if (sync_status == -ENOENT) {
1292 /* FIXME: do we need to remove the entry from the local zone? */
1293 break;
1294 }
1295
1296 if ((sync_status == -EAGAIN || sync_status == -ECANCELED) && (tries < NUM_TRANSIENT_ERROR_RETRIES - 1)) {
11fdf7f2 1297 ldpp_dout(sync_env->dpp, 20) << *this << ": failed to fetch remote metadata: " << section << ":" << key << ", will retry" << dendl;
7c673cae
FG
1298 continue;
1299 }
1300
1301 if (sync_status < 0) {
11fdf7f2 1302 tn->log(10, SSTR("failed to send read remote metadata entry: section=" << section << " key=" << key << " status=" << sync_status));
7c673cae
FG
1303 log_error() << "failed to send read remote metadata entry: section=" << section << " key=" << key << " status=" << sync_status << std::endl;
1304 yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), section, key, -sync_status,
1305 string("failed to read remote metadata entry: ") + cpp_strerror(-sync_status)));
1306 return set_cr_error(sync_status);
1307 }
1308
1309 break;
1310 }
1311
1312 retcode = 0;
1313 for (tries = 0; tries < NUM_TRANSIENT_ERROR_RETRIES; tries++) {
1314 if (sync_status != -ENOENT) {
11fdf7f2 1315 tn->log(10, SSTR("storing local metadata entry"));
7c673cae
FG
1316 yield call(new RGWMetaStoreEntryCR(sync_env, raw_key, md_bl));
1317 } else {
11fdf7f2 1318 tn->log(10, SSTR("removing local metadata entry"));
7c673cae
FG
1319 yield call(new RGWMetaRemoveEntryCR(sync_env, raw_key));
1320 }
1321 if ((retcode == -EAGAIN || retcode == -ECANCELED) && (tries < NUM_TRANSIENT_ERROR_RETRIES - 1)) {
11fdf7f2 1322 ldpp_dout(sync_env->dpp, 20) << *this << ": failed to store metadata: " << section << ":" << key << ", got retcode=" << retcode << dendl;
7c673cae
FG
1323 continue;
1324 }
1325 break;
1326 }
1327
1328 sync_status = retcode;
1329
1330 if (sync_status == 0 && marker_tracker) {
1331 /* update marker */
1332 yield call(marker_tracker->finish(entry_marker));
1333 sync_status = retcode;
1334 }
1335 if (sync_status < 0) {
11fdf7f2 1336 tn->log(10, SSTR("failed, status=" << sync_status));
7c673cae
FG
1337 return set_cr_error(sync_status);
1338 }
11fdf7f2 1339 tn->log(10, "success");
7c673cae
FG
1340 return set_cr_done();
1341 }
1342 return 0;
1343}
1344
1345class RGWCloneMetaLogCoroutine : public RGWCoroutine {
1346 RGWMetaSyncEnv *sync_env;
1347 RGWMetadataLog *mdlog;
1348
1349 const std::string& period;
1350 int shard_id;
1351 string marker;
1352 bool truncated = false;
1353 string *new_marker;
1354
1355 int max_entries = CLONE_MAX_ENTRIES;
1356
1357 RGWRESTReadResource *http_op = nullptr;
1358 boost::intrusive_ptr<RGWMetadataLogInfoCompletion> completion;
1359
1360 RGWMetadataLogInfo shard_info;
1361 rgw_mdlog_shard_data data;
1362
1363public:
1364 RGWCloneMetaLogCoroutine(RGWMetaSyncEnv *_sync_env, RGWMetadataLog* mdlog,
1365 const std::string& period, int _id,
1366 const string& _marker, string *_new_marker)
1367 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), mdlog(mdlog),
1368 period(period), shard_id(_id), marker(_marker), new_marker(_new_marker) {
1369 if (new_marker) {
1370 *new_marker = marker;
1371 }
1372 }
1373 ~RGWCloneMetaLogCoroutine() override {
1374 if (http_op) {
1375 http_op->put();
1376 }
1377 if (completion) {
1378 completion->cancel();
1379 }
1380 }
1381
1382 int operate() override;
1383
1384 int state_init();
1385 int state_read_shard_status();
1386 int state_read_shard_status_complete();
1387 int state_send_rest_request();
1388 int state_receive_rest_response();
1389 int state_store_mdlog_entries();
1390 int state_store_mdlog_entries_complete();
1391};
1392
1393class RGWMetaSyncShardCR : public RGWCoroutine {
1394 RGWMetaSyncEnv *sync_env;
1395
1396 const rgw_pool& pool;
1397 const std::string& period; //< currently syncing period id
1398 const epoch_t realm_epoch; //< realm_epoch of period
1399 RGWMetadataLog* mdlog; //< log of syncing period
1400 uint32_t shard_id;
1401 rgw_meta_sync_marker& sync_marker;
1402 boost::optional<rgw_meta_sync_marker> temp_marker; //< for pending updates
1403 string marker;
1404 string max_marker;
1405 const std::string& period_marker; //< max marker stored in next period
1406
11fdf7f2 1407 RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
28e407b8
AA
1408 std::set<std::string> entries;
1409 std::set<std::string>::iterator iter;
7c673cae
FG
1410
1411 string oid;
1412
1413 RGWMetaSyncShardMarkerTrack *marker_tracker = nullptr;
1414
1415 list<cls_log_entry> log_entries;
1416 list<cls_log_entry>::iterator log_iter;
1417 bool truncated = false;
1418
1419 string mdlog_marker;
1420 string raw_key;
1421 rgw_mdlog_entry mdlog_entry;
1422
9f95a23c
TL
1423 ceph::mutex inc_lock = ceph::make_mutex("RGWMetaSyncShardCR::inc_lock");
1424 ceph::condition_variable inc_cond;
7c673cae
FG
1425
1426 boost::asio::coroutine incremental_cr;
1427 boost::asio::coroutine full_cr;
1428
31f18b77
FG
1429 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
1430 boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
1431
7c673cae
FG
1432 bool lost_lock = false;
1433
1434 bool *reset_backoff;
1435
1436 // hold a reference to the cr stack while it's in the map
1437 using StackRef = boost::intrusive_ptr<RGWCoroutinesStack>;
1438 map<StackRef, string> stack_to_pos;
1439 map<string, string> pos_to_prev;
1440
1441 bool can_adjust_marker = false;
1442 bool done_with_period = false;
1443
1444 int total_entries = 0;
1445
11fdf7f2 1446 RGWSyncTraceNodeRef tn;
7c673cae
FG
1447public:
1448 RGWMetaSyncShardCR(RGWMetaSyncEnv *_sync_env, const rgw_pool& _pool,
1449 const std::string& period, epoch_t realm_epoch,
1450 RGWMetadataLog* mdlog, uint32_t _shard_id,
1451 rgw_meta_sync_marker& _marker,
11fdf7f2
TL
1452 const std::string& period_marker, bool *_reset_backoff,
1453 RGWSyncTraceNodeRef& _tn)
7c673cae
FG
1454 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), pool(_pool),
1455 period(period), realm_epoch(realm_epoch), mdlog(mdlog),
1456 shard_id(_shard_id), sync_marker(_marker),
9f95a23c 1457 period_marker(period_marker),
11fdf7f2 1458 reset_backoff(_reset_backoff), tn(_tn) {
7c673cae
FG
1459 *reset_backoff = false;
1460 }
1461
1462 ~RGWMetaSyncShardCR() override {
1463 delete marker_tracker;
1464 if (lease_cr) {
1465 lease_cr->abort();
7c673cae
FG
1466 }
1467 }
1468
1469 void set_marker_tracker(RGWMetaSyncShardMarkerTrack *mt) {
1470 delete marker_tracker;
1471 marker_tracker = mt;
1472 }
1473
1474 int operate() override {
1475 int r;
1476 while (true) {
1477 switch (sync_marker.state) {
1478 case rgw_meta_sync_marker::FullSync:
1479 r = full_sync();
1480 if (r < 0) {
11fdf7f2 1481 ldpp_dout(sync_env->dpp, 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl;
7c673cae
FG
1482 return set_cr_error(r);
1483 }
1484 return 0;
1485 case rgw_meta_sync_marker::IncrementalSync:
1486 r = incremental_sync();
1487 if (r < 0) {
11fdf7f2 1488 ldpp_dout(sync_env->dpp, 10) << "sync: incremental_sync: shard_id=" << shard_id << " r=" << r << dendl;
7c673cae
FG
1489 return set_cr_error(r);
1490 }
1491 return 0;
1492 }
1493 }
1494 /* unreachable */
1495 return 0;
1496 }
1497
1498 void collect_children()
1499 {
1500 int child_ret;
1501 RGWCoroutinesStack *child;
1502 while (collect_next(&child_ret, &child)) {
1503 auto iter = stack_to_pos.find(child);
1504 if (iter == stack_to_pos.end()) {
1505 /* some other stack that we don't care about */
1506 continue;
1507 }
1508
1509 string& pos = iter->second;
1510
1511 if (child_ret < 0) {
11fdf7f2 1512 ldpp_dout(sync_env->dpp, 0) << *this << ": child operation stack=" << child << " entry=" << pos << " returned " << child_ret << dendl;
7c673cae
FG
1513 }
1514
1515 map<string, string>::iterator prev_iter = pos_to_prev.find(pos);
11fdf7f2 1516 ceph_assert(prev_iter != pos_to_prev.end());
7c673cae
FG
1517
1518 /*
1519 * we should get -EAGAIN for transient errors, for which we want to retry, so we don't
1520 * update the marker and abort. We'll get called again for these. Permanent errors will be
1521 * handled by marking the entry at the error log shard, so that we retry on it separately
1522 */
1523 if (child_ret == -EAGAIN) {
1524 can_adjust_marker = false;
1525 }
1526
1527 if (pos_to_prev.size() == 1) {
1528 if (can_adjust_marker) {
1529 sync_marker.marker = pos;
1530 }
1531 pos_to_prev.erase(prev_iter);
1532 } else {
11fdf7f2 1533 ceph_assert(pos_to_prev.size() > 1);
7c673cae
FG
1534 pos_to_prev.erase(prev_iter);
1535 prev_iter = pos_to_prev.begin();
1536 if (can_adjust_marker) {
1537 sync_marker.marker = prev_iter->second;
1538 }
1539 }
1540
11fdf7f2 1541 ldpp_dout(sync_env->dpp, 4) << *this << ": adjusting marker pos=" << sync_marker.marker << dendl;
7c673cae
FG
1542 stack_to_pos.erase(iter);
1543 }
1544 }
1545
1546 int full_sync() {
1547#define OMAP_GET_MAX_ENTRIES 100
1548 int max_entries = OMAP_GET_MAX_ENTRIES;
1549 reenter(&full_cr) {
1550 set_status("full_sync");
11fdf7f2 1551 tn->log(10, "start full sync");
7c673cae
FG
1552 oid = full_sync_index_shard_oid(shard_id);
1553 can_adjust_marker = true;
1554 /* grab lock */
1555 yield {
1556 uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
1557 string lock_name = "sync_lock";
9f95a23c 1558 rgw::sal::RGWRadosStore *store = sync_env->store;
31f18b77
FG
1559 lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
1560 rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
1561 lock_name, lock_duration, this));
1562 lease_stack.reset(spawn(lease_cr.get(), false));
7c673cae
FG
1563 lost_lock = false;
1564 }
1565 while (!lease_cr->is_locked()) {
1566 if (lease_cr->is_done()) {
7c673cae 1567 drain_all();
11fdf7f2 1568 tn->log(5, "failed to take lease");
7c673cae
FG
1569 return lease_cr->get_ret_status();
1570 }
1571 set_sleeping(true);
1572 yield;
1573 }
11fdf7f2 1574 tn->log(10, "took lease");
7c673cae
FG
1575
1576 /* lock succeeded, a retry now should avoid previous backoff status */
1577 *reset_backoff = true;
1578
1579 /* prepare marker tracker */
1580 set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env,
1581 sync_env->shard_obj_name(shard_id),
11fdf7f2 1582 sync_marker, tn));
7c673cae
FG
1583
1584 marker = sync_marker.marker;
1585
1586 total_entries = sync_marker.pos;
1587
1588 /* sync! */
1589 do {
1590 if (!lease_cr->is_locked()) {
11fdf7f2 1591 tn->log(10, "lost lease");
7c673cae
FG
1592 lost_lock = true;
1593 break;
1594 }
11fdf7f2 1595 omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
7c673cae 1596 yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
11fdf7f2 1597 marker, max_entries, omapkeys));
7c673cae 1598 if (retcode < 0) {
11fdf7f2
TL
1599 ldpp_dout(sync_env->dpp, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
1600 tn->log(0, SSTR("ERROR: failed to list omap keys, status=" << retcode));
7c673cae
FG
1601 yield lease_cr->go_down();
1602 drain_all();
1603 return retcode;
1604 }
11fdf7f2
TL
1605 entries = std::move(omapkeys->entries);
1606 tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
1607 if (entries.size() > 0) {
1608 tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
1609 }
7c673cae
FG
1610 iter = entries.begin();
1611 for (; iter != entries.end(); ++iter) {
28e407b8 1612 marker = *iter;
11fdf7f2 1613 tn->log(20, SSTR("full sync: " << marker));
7c673cae 1614 total_entries++;
28e407b8 1615 if (!marker_tracker->start(marker, total_entries, real_time())) {
11fdf7f2 1616 tn->log(0, SSTR("ERROR: cannot start syncing " << marker << ". Duplicate entry?"));
7c673cae
FG
1617 } else {
1618 // fetch remote and write locally
1619 yield {
11fdf7f2 1620 RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, marker, marker, MDLOG_STATUS_COMPLETE, marker_tracker, tn), false);
7c673cae 1621 // stack_to_pos holds a reference to the stack
28e407b8
AA
1622 stack_to_pos[stack] = marker;
1623 pos_to_prev[marker] = marker;
7c673cae
FG
1624 }
1625 }
7c673cae
FG
1626 }
1627 collect_children();
11fdf7f2
TL
1628 } while (omapkeys->more && can_adjust_marker);
1629
1630 tn->unset_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
7c673cae
FG
1631
1632 while (num_spawned() > 1) {
1633 yield wait_for_child();
1634 collect_children();
1635 }
1636
1637 if (!lost_lock) {
1638 /* update marker to reflect we're done with full sync */
1639 if (can_adjust_marker) {
1640 // apply updates to a temporary marker, or operate() will send us
1641 // to incremental_sync() after we yield
1642 temp_marker = sync_marker;
1643 temp_marker->state = rgw_meta_sync_marker::IncrementalSync;
1644 temp_marker->marker = std::move(temp_marker->next_step_marker);
1645 temp_marker->next_step_marker.clear();
1646 temp_marker->realm_epoch = realm_epoch;
11fdf7f2 1647 ldpp_dout(sync_env->dpp, 4) << *this << ": saving marker pos=" << temp_marker->marker << " realm_epoch=" << realm_epoch << dendl;
7c673cae
FG
1648
1649 using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_meta_sync_marker>;
9f95a23c 1650 yield call(new WriteMarkerCR(sync_env->async_rados, sync_env->store->svc()->sysobj,
7c673cae
FG
1651 rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
1652 *temp_marker));
1653 }
1654
1655 if (retcode < 0) {
11fdf7f2 1656 ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
224ce89b
WB
1657 yield lease_cr->go_down();
1658 drain_all();
7c673cae
FG
1659 return retcode;
1660 }
1661 }
1662
1663 /*
1664 * if we reached here, it means that lost_lock is true, otherwise the state
1665 * change in the previous block will prevent us from reaching here
1666 */
1667
1668 yield lease_cr->go_down();
1669
31f18b77 1670 lease_cr.reset();
7c673cae
FG
1671
1672 drain_all();
1673
1674 if (!can_adjust_marker) {
1675 return -EAGAIN;
1676 }
1677
1678 if (lost_lock) {
1679 return -EBUSY;
1680 }
1681
11fdf7f2
TL
1682 tn->log(10, "full sync complete");
1683
7c673cae 1684 // apply the sync marker update
11fdf7f2 1685 ceph_assert(temp_marker);
7c673cae
FG
1686 sync_marker = std::move(*temp_marker);
1687 temp_marker = boost::none;
1688 // must not yield after this point!
1689 }
1690 return 0;
1691 }
1692
1693
1694 int incremental_sync() {
1695 reenter(&incremental_cr) {
1696 set_status("incremental_sync");
11fdf7f2 1697 tn->log(10, "start incremental sync");
7c673cae
FG
1698 can_adjust_marker = true;
1699 /* grab lock */
1700 if (!lease_cr) { /* could have had a lease_cr lock from previous state */
1701 yield {
1702 uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
1703 string lock_name = "sync_lock";
9f95a23c 1704 rgw::sal::RGWRadosStore *store = sync_env->store;
31f18b77
FG
1705 lease_cr.reset( new RGWContinuousLeaseCR(sync_env->async_rados, store,
1706 rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
1707 lock_name, lock_duration, this));
1708 lease_stack.reset(spawn(lease_cr.get(), false));
7c673cae
FG
1709 lost_lock = false;
1710 }
1711 while (!lease_cr->is_locked()) {
1712 if (lease_cr->is_done()) {
7c673cae 1713 drain_all();
11fdf7f2 1714 tn->log(10, "failed to take lease");
7c673cae
FG
1715 return lease_cr->get_ret_status();
1716 }
1717 set_sleeping(true);
1718 yield;
1719 }
1720 }
11fdf7f2 1721 tn->log(10, "took lease");
7c673cae
FG
1722 // if the period has advanced, we can't use the existing marker
1723 if (sync_marker.realm_epoch < realm_epoch) {
11fdf7f2 1724 ldpp_dout(sync_env->dpp, 4) << "clearing marker=" << sync_marker.marker
7c673cae
FG
1725 << " from old realm_epoch=" << sync_marker.realm_epoch
1726 << " (now " << realm_epoch << ')' << dendl;
1727 sync_marker.realm_epoch = realm_epoch;
1728 sync_marker.marker.clear();
1729 }
1730 mdlog_marker = sync_marker.marker;
1731 set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env,
1732 sync_env->shard_obj_name(shard_id),
11fdf7f2 1733 sync_marker, tn));
7c673cae
FG
1734
1735 /*
1736  * mdlog_marker: the remote sync marker position
1737 * sync_marker: the local sync marker position
1738 * max_marker: the max mdlog position that we fetched
1739 * marker: the current position we try to sync
1740 * period_marker: the last marker before the next period begins (optional)
1741 */
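      /*
       * in steady state these positions are expected to satisfy
       * sync_marker.marker <= max_marker <= mdlog_marker: we only list up to
       * what was cloned locally, and only persist what has been applied
       */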
1742 marker = max_marker = sync_marker.marker;
1743 /* inc sync */
1744 do {
1745 if (!lease_cr->is_locked()) {
1746 lost_lock = true;
11fdf7f2 1747 tn->log(10, "lost lease");
7c673cae
FG
1748 break;
1749 }
1750#define INCREMENTAL_MAX_ENTRIES 100
11fdf7f2 1751 ldpp_dout(sync_env->dpp, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << dendl;
7c673cae 1752 if (!period_marker.empty() && period_marker <= mdlog_marker) {
11fdf7f2 1753 tn->log(10, SSTR("finished syncing current period: mdlog_marker=" << mdlog_marker << " sync_marker=" << sync_marker.marker << " period_marker=" << period_marker));
7c673cae
FG
1754 done_with_period = true;
1755 break;
1756 }
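        /*
         * RGWCloneMetaLogCoroutine below pulls the next batch of entries for
         * this shard from the master zone over REST (/admin/log) and stores
         * them in the local mdlog shard, advancing mdlog_marker
         */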
1757 if (mdlog_marker <= max_marker) {
1758 /* we're at the tip, try to bring more entries */
11fdf7f2 1759 ldpp_dout(sync_env->dpp, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " syncing mdlog for shard_id=" << shard_id << dendl;
7c673cae
FG
1760 yield call(new RGWCloneMetaLogCoroutine(sync_env, mdlog,
1761 period, shard_id,
1762 mdlog_marker, &mdlog_marker));
1763 }
1764 if (retcode < 0) {
11fdf7f2 1765 tn->log(10, SSTR(*this << ": failed to fetch more log entries, retcode=" << retcode));
7c673cae
FG
1766 yield lease_cr->go_down();
1767 drain_all();
1768 *reset_backoff = false; // back off and try again later
1769 return retcode;
1770 }
1771 *reset_backoff = true; /* if we got to this point, all systems are functioning */
7c673cae 1772 if (mdlog_marker > max_marker) {
11fdf7f2
TL
1773 tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
1774 tn->log(20, SSTR("mdlog_marker=" << mdlog_marker << " sync_marker=" << sync_marker.marker));
7c673cae
FG
1775 marker = max_marker;
1776 yield call(new RGWReadMDLogEntriesCR(sync_env, mdlog, shard_id,
1777 &max_marker, INCREMENTAL_MAX_ENTRIES,
1778 &log_entries, &truncated));
1779 if (retcode < 0) {
11fdf7f2 1780 tn->log(10, SSTR("failed to list mdlog entries, retcode=" << retcode));
7c673cae
FG
1781 yield lease_cr->go_down();
1782 drain_all();
1783 *reset_backoff = false; // back off and try again later
1784 return retcode;
1785 }
1786 for (log_iter = log_entries.begin(); log_iter != log_entries.end() && !done_with_period; ++log_iter) {
1787 if (!period_marker.empty() && period_marker <= log_iter->id) {
1788 done_with_period = true;
1789 if (period_marker < log_iter->id) {
11fdf7f2
TL
1790 tn->log(10, SSTR("found key=" << log_iter->id
1791 << " past period_marker=" << period_marker));
7c673cae
FG
1792 break;
1793 }
11fdf7f2 1794 ldpp_dout(sync_env->dpp, 10) << "found key at period_marker=" << period_marker << dendl;
7c673cae
FG
1795 // sync this entry, then return control to RGWMetaSyncCR
1796 }
1797 if (!mdlog_entry.convert_from(*log_iter)) {
11fdf7f2 1798 tn->log(0, SSTR("ERROR: failed to convert mdlog entry, shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << " ... skipping entry"));
7c673cae
FG
1799 continue;
1800 }
11fdf7f2 1801 tn->log(20, SSTR("log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp));
7c673cae 1802 if (!marker_tracker->start(log_iter->id, 0, log_iter->timestamp.to_real_time())) {
11fdf7f2 1803 ldpp_dout(sync_env->dpp, 0) << "ERROR: cannot start syncing " << log_iter->id << ". Duplicate entry?" << dendl;
7c673cae
FG
1804 } else {
1805 raw_key = log_iter->section + ":" + log_iter->name;
1806 yield {
11fdf7f2
TL
1807 RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, mdlog_entry.log_data.status, marker_tracker, tn), false);
1808 ceph_assert(stack);
7c673cae
FG
1809 // stack_to_pos holds a reference to the stack
1810 stack_to_pos[stack] = log_iter->id;
1811 pos_to_prev[log_iter->id] = marker;
1812 }
1813 }
1814 marker = log_iter->id;
1815 }
1816 }
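        /*
         * reap completed entry stacks; stack_to_pos/pos_to_prev let us advance
         * the recorded position only over a contiguous prefix of finished
         * entries, so an entry completing out of order is not skipped
         */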
1817 collect_children();
11fdf7f2 1818 ldpp_dout(sync_env->dpp, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " max_marker=" << max_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << dendl;
7c673cae
FG
1819 if (done_with_period) {
1820 // return control to RGWMetaSyncCR and advance to the next period
11fdf7f2 1821 tn->log(10, SSTR(*this << ": done with period"));
7c673cae
FG
1822 break;
1823 }
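        /*
         * caught up with the cloned log and nothing new was fetched: go idle
         * and poll again after INCREMENTAL_INTERVAL seconds
         */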
1824 if (mdlog_marker == max_marker && can_adjust_marker) {
11fdf7f2 1825 tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
7c673cae
FG
1826#define INCREMENTAL_INTERVAL 20
1827 yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
1828 }
1829 } while (can_adjust_marker);
1830
11fdf7f2
TL
1831 tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
1832
7c673cae
FG
1833 while (num_spawned() > 1) {
1834 yield wait_for_child();
1835 collect_children();
1836 }
1837
1838 yield lease_cr->go_down();
1839
1840 drain_all();
1841
1842 if (lost_lock) {
1843 return -EBUSY;
1844 }
1845
1846 if (!can_adjust_marker) {
1847 return -EAGAIN;
1848 }
1849
1850 return set_cr_done();
1851 }
1852 /* TODO */
1853 return 0;
1854 }
1855};
1856
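/*
 * RGWMetaSyncShardControlCR wraps RGWMetaSyncShardCR with the retry/backoff
 * behavior of RGWBackoffControlCR: exit_on_error is false, so all failures are
 * retried with exponential backoff, and alloc_finisher_cr() re-reads the
 * persisted per-shard sync marker so a retry starts from the latest stored
 * position.
 */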
1857class RGWMetaSyncShardControlCR : public RGWBackoffControlCR
1858{
1859 RGWMetaSyncEnv *sync_env;
1860
1861 const rgw_pool& pool;
1862 const std::string& period;
1863 epoch_t realm_epoch;
1864 RGWMetadataLog* mdlog;
1865 uint32_t shard_id;
1866 rgw_meta_sync_marker sync_marker;
1867 const std::string period_marker;
1868
11fdf7f2
TL
1869 RGWSyncTraceNodeRef tn;
1870
7c673cae
FG
1871 static constexpr bool exit_on_error = false; // retry on all errors
1872public:
1873 RGWMetaSyncShardControlCR(RGWMetaSyncEnv *_sync_env, const rgw_pool& _pool,
1874 const std::string& period, epoch_t realm_epoch,
1875 RGWMetadataLog* mdlog, uint32_t _shard_id,
1876 const rgw_meta_sync_marker& _marker,
11fdf7f2
TL
1877 std::string&& period_marker,
1878 RGWSyncTraceNodeRef& _tn_parent)
7c673cae
FG
1879 : RGWBackoffControlCR(_sync_env->cct, exit_on_error), sync_env(_sync_env),
1880 pool(_pool), period(period), realm_epoch(realm_epoch), mdlog(mdlog),
1881 shard_id(_shard_id), sync_marker(_marker),
11fdf7f2
TL
1882 period_marker(std::move(period_marker)) {
1883 tn = sync_env->sync_tracer->add_node(_tn_parent, "shard",
1884 std::to_string(shard_id));
1885 }
7c673cae
FG
1886
1887 RGWCoroutine *alloc_cr() override {
1888 return new RGWMetaSyncShardCR(sync_env, pool, period, realm_epoch, mdlog,
11fdf7f2 1889 shard_id, sync_marker, period_marker, backoff_ptr(), tn);
7c673cae
FG
1890 }
1891
1892 RGWCoroutine *alloc_finisher_cr() override {
9f95a23c
TL
1893 rgw::sal::RGWRadosStore *store = sync_env->store;
1894 return new RGWSimpleRadosReadCR<rgw_meta_sync_marker>(sync_env->async_rados, store->svc()->sysobj,
7c673cae
FG
1895 rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
1896 &sync_marker);
1897 }
1898};
1899
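/*
 * RGWMetaSyncCR drives metadata sync across periods: for the period at
 * 'cursor' it spawns one RGWMetaSyncShardControlCR per shard (bounded by the
 * next period's sync status markers, if any), waits for all shards, then
 * advances the cursor and persists the updated sync info before repeating.
 */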
1900class RGWMetaSyncCR : public RGWCoroutine {
1901 RGWMetaSyncEnv *sync_env;
1902 const rgw_pool& pool;
1903 RGWPeriodHistory::Cursor cursor; //< sync position in period history
1904 RGWPeriodHistory::Cursor next; //< next period in history
1905 rgw_meta_sync_status sync_status;
11fdf7f2 1906 RGWSyncTraceNodeRef tn;
7c673cae
FG
1907
1908 std::mutex mutex; //< protect access to shard_crs
1909
1910 // TODO: it should be enough to hold a reference on the stack only, as calling
1911 // RGWCoroutinesStack::wakeup() doesn't refer to the RGWCoroutine if it has
1912 // already completed
1913 using ControlCRRef = boost::intrusive_ptr<RGWMetaSyncShardControlCR>;
1914 using StackRef = boost::intrusive_ptr<RGWCoroutinesStack>;
1915 using RefPair = std::pair<ControlCRRef, StackRef>;
1916 map<int, RefPair> shard_crs;
1917 int ret{0};
1918
1919public:
11fdf7f2
TL
1920 RGWMetaSyncCR(RGWMetaSyncEnv *_sync_env, const RGWPeriodHistory::Cursor &cursor,
1921 const rgw_meta_sync_status& _sync_status, RGWSyncTraceNodeRef& _tn)
7c673cae 1922 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
9f95a23c 1923 pool(sync_env->store->svc()->zone->get_zone_params().log_pool),
11fdf7f2
TL
1924 cursor(cursor), sync_status(_sync_status), tn(_tn) {}
1925
1926 ~RGWMetaSyncCR() {
1927 }
7c673cae
FG
1928
1929 int operate() override {
1930 reenter(this) {
1931 // loop through one period at a time
11fdf7f2 1932 tn->log(1, "start");
7c673cae 1933 for (;;) {
9f95a23c 1934 if (cursor == sync_env->store->svc()->mdlog->get_period_history()->get_current()) {
7c673cae
FG
1935 next = RGWPeriodHistory::Cursor{};
1936 if (cursor) {
11fdf7f2 1937 ldpp_dout(sync_env->dpp, 10) << "RGWMetaSyncCR on current period="
7c673cae
FG
1938 << cursor.get_period().get_id() << dendl;
1939 } else {
11fdf7f2 1940 ldpp_dout(sync_env->dpp, 10) << "RGWMetaSyncCR with no period" << dendl;
7c673cae
FG
1941 }
1942 } else {
1943 next = cursor;
1944 next.next();
11fdf7f2 1945 ldpp_dout(sync_env->dpp, 10) << "RGWMetaSyncCR on period="
7c673cae
FG
1946 << cursor.get_period().get_id() << ", next="
1947 << next.get_period().get_id() << dendl;
1948 }
1949
1950 yield {
1951 // get the mdlog for the current period (may be empty)
1952 auto& period_id = sync_status.sync_info.period;
1953 auto realm_epoch = sync_status.sync_info.realm_epoch;
9f95a23c 1954 auto mdlog = sync_env->store->svc()->mdlog->get_log(period_id);
7c673cae 1955
11fdf7f2
TL
1956 tn->log(1, SSTR("realm epoch=" << realm_epoch << " period id=" << period_id));
1957
7c673cae
FG
1958 // prevent wakeup() from accessing shard_crs while we're spawning them
1959 std::lock_guard<std::mutex> lock(mutex);
1960
1961 // sync this period on each shard
1962 for (const auto& m : sync_status.sync_markers) {
1963 uint32_t shard_id = m.first;
1964 auto& marker = m.second;
1965
1966 std::string period_marker;
1967 if (next) {
1968 // read the maximum marker from the next period's sync status
1969 period_marker = next.get_period().get_sync_status()[shard_id];
1970 if (period_marker.empty()) {
1971 // no metadata changes have occurred on this shard, skip it
11fdf7f2 1972 ldpp_dout(sync_env->dpp, 10) << "RGWMetaSyncCR: skipping shard " << shard_id
7c673cae
FG
1973 << " with empty period marker" << dendl;
1974 continue;
1975 }
1976 }
1977
1978 using ShardCR = RGWMetaSyncShardControlCR;
1979 auto cr = new ShardCR(sync_env, pool, period_id, realm_epoch,
1980 mdlog, shard_id, marker,
11fdf7f2 1981 std::move(period_marker), tn);
7c673cae
FG
1982 auto stack = spawn(cr, false);
1983 shard_crs[shard_id] = RefPair{cr, stack};
1984 }
1985 }
1986 // wait for each shard to complete
1987 while (ret == 0 && num_spawned() > 0) {
1988 yield wait_for_child();
1989 collect(&ret, nullptr);
1990 }
1991 drain_all();
1992 {
1993 // drop shard cr refs under lock
1994 std::lock_guard<std::mutex> lock(mutex);
1995 shard_crs.clear();
1996 }
1997 if (ret < 0) {
1998 return set_cr_error(ret);
1999 }
2000 // advance to the next period
11fdf7f2 2001 ceph_assert(next);
7c673cae
FG
2002 cursor = next;
2003
2004 // write the updated sync info
2005 sync_status.sync_info.period = cursor.get_period().get_id();
2006 sync_status.sync_info.realm_epoch = cursor.get_epoch();
2007 yield call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados,
9f95a23c 2008 sync_env->store->svc()->sysobj,
7c673cae
FG
2009 rgw_raw_obj(pool, sync_env->status_oid()),
2010 sync_status.sync_info));
2011 }
2012 }
2013 return 0;
2014 }
2015
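  // nudge a single shard's control CR so it re-checks the log immediately;
  // reached via RGWRemoteMetaLog::wakeup()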
2016 void wakeup(int shard_id) {
2017 std::lock_guard<std::mutex> lock(mutex);
2018 auto iter = shard_crs.find(shard_id);
2019 if (iter == shard_crs.end()) {
2020 return;
2021 }
2022 iter->second.first->wakeup();
2023 }
2024};
2025
2026void RGWRemoteMetaLog::init_sync_env(RGWMetaSyncEnv *env) {
11fdf7f2 2027 env->dpp = dpp;
7c673cae
FG
2028 env->cct = store->ctx();
2029 env->store = store;
2030 env->conn = conn;
2031 env->async_rados = async_rados;
2032 env->http_manager = &http_manager;
2033 env->error_logger = error_logger;
9f95a23c 2034 env->sync_tracer = store->getRados()->get_sync_tracer();
7c673cae
FG
2035}
2036
2037int RGWRemoteMetaLog::read_sync_status(rgw_meta_sync_status *sync_status)
2038{
9f95a23c 2039 if (store->svc()->zone->is_meta_master()) {
7c673cae
FG
2040 return 0;
2041 }
2042 // cannot run concurrently with run_sync(), so run in a separate manager
9f95a23c 2043 RGWCoroutinesManager crs(store->ctx(), store->getRados()->get_cr_registry());
7c673cae 2044 RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
11fdf7f2 2045 int ret = http_manager.start();
7c673cae 2046 if (ret < 0) {
11fdf7f2 2047 ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
7c673cae
FG
2048 return ret;
2049 }
2050 RGWMetaSyncEnv sync_env_local = sync_env;
2051 sync_env_local.http_manager = &http_manager;
11fdf7f2 2052 tn->log(20, "read sync status");
7c673cae
FG
2053 ret = crs.run(new RGWReadSyncStatusCoroutine(&sync_env_local, sync_status));
2054 http_manager.stop();
2055 return ret;
2056}
2057
2058int RGWRemoteMetaLog::init_sync_status()
2059{
9f95a23c 2060 if (store->svc()->zone->is_meta_master()) {
7c673cae
FG
2061 return 0;
2062 }
2063
2064 rgw_mdlog_info mdlog_info;
2065 int r = read_log_info(&mdlog_info);
2066 if (r < 0) {
2067 lderr(store->ctx()) << "ERROR: failed to fetch master log info (r=" << r << ")" << dendl;
2068 return r;
2069 }
2070
2071 rgw_meta_sync_info sync_info;
2072 sync_info.num_shards = mdlog_info.num_shards;
9f95a23c 2073 auto cursor = store->svc()->mdlog->get_period_history()->get_current();
7c673cae
FG
2074 if (cursor) {
2075 sync_info.period = cursor.get_period().get_id();
2076 sync_info.realm_epoch = cursor.get_epoch();
2077 }
2078
2079 return run(new RGWInitSyncStatusCoroutine(&sync_env, sync_info));
2080}
2081
2082int RGWRemoteMetaLog::store_sync_info(const rgw_meta_sync_info& sync_info)
2083{
11fdf7f2 2084 tn->log(20, "store sync info");
9f95a23c
TL
2085 return run(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(async_rados, store->svc()->sysobj,
2086 rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env.status_oid()),
7c673cae
FG
2087 sync_info));
2088}
2089
2090// return a cursor to the period at our sync position
9f95a23c 2091static RGWPeriodHistory::Cursor get_period_at(rgw::sal::RGWRadosStore* store,
7c673cae
FG
2092 const rgw_meta_sync_info& info)
2093{
2094 if (info.period.empty()) {
2095 // return an empty cursor with error=0
2096 return RGWPeriodHistory::Cursor{};
2097 }
2098
2099 // look for an existing period in our history
9f95a23c 2100 auto cursor = store->svc()->mdlog->get_period_history()->lookup(info.realm_epoch);
7c673cae
FG
2101 if (cursor) {
2102 // verify that the period ids match
2103 auto& existing = cursor.get_period().get_id();
2104 if (existing != info.period) {
2105 lderr(store->ctx()) << "ERROR: sync status period=" << info.period
2106 << " does not match period=" << existing
2107 << " in history at realm epoch=" << info.realm_epoch << dendl;
2108 return RGWPeriodHistory::Cursor{-EEXIST};
2109 }
2110 return cursor;
2111 }
2112
2113 // read the period from rados or pull it from the master
2114 RGWPeriod period;
9f95a23c 2115 int r = store->svc()->mdlog->pull_period(info.period, period);
7c673cae
FG
2116 if (r < 0) {
2117 lderr(store->ctx()) << "ERROR: failed to read period id "
2118 << info.period << ": " << cpp_strerror(r) << dendl;
2119 return RGWPeriodHistory::Cursor{r};
2120 }
2121 // attach the period to our history
9f95a23c 2122 cursor = store->svc()->mdlog->get_period_history()->attach(std::move(period));
7c673cae
FG
2123 if (!cursor) {
2124 r = cursor.get_error();
2125 lderr(store->ctx()) << "ERROR: failed to read period history back to "
2126 << info.period << ": " << cpp_strerror(r) << dendl;
2127 }
2128 return cursor;
2129}
2130
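/*
 * run_sync() is the top-level loop for a non-master zone: fetch the master's
 * mdlog info (retrying with backoff while the master is unreachable),
 * initialize or read the local sync status, build the full-sync maps if
 * needed, then run RGWMetaSyncCR until shutdown.
 */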
2131int RGWRemoteMetaLog::run_sync()
2132{
9f95a23c 2133 if (store->svc()->zone->is_meta_master()) {
7c673cae
FG
2134 return 0;
2135 }
2136
2137 int r = 0;
2138
2139 // get shard count and oldest log period from master
2140 rgw_mdlog_info mdlog_info;
2141 for (;;) {
2142 if (going_down) {
11fdf7f2 2143 ldpp_dout(dpp, 1) << __func__ << "(): going down" << dendl;
7c673cae
FG
2144 return 0;
2145 }
2146 r = read_log_info(&mdlog_info);
2147 if (r == -EIO || r == -ENOENT) {
2148 // keep retrying if master isn't alive or hasn't initialized the log
11fdf7f2 2149 ldpp_dout(dpp, 10) << __func__ << "(): waiting for master..." << dendl;
7c673cae
FG
2150 backoff.backoff_sleep();
2151 continue;
2152 }
2153 backoff.reset();
2154 if (r < 0) {
2155 lderr(store->ctx()) << "ERROR: failed to fetch master log info (r=" << r << ")" << dendl;
2156 return r;
2157 }
2158 break;
2159 }
2160
2161 rgw_meta_sync_status sync_status;
2162 do {
2163 if (going_down) {
11fdf7f2 2164 ldpp_dout(dpp, 1) << __func__ << "(): going down" << dendl;
7c673cae
FG
2165 return 0;
2166 }
2167 r = run(new RGWReadSyncStatusCoroutine(&sync_env, &sync_status));
2168 if (r < 0 && r != -ENOENT) {
11fdf7f2 2169 ldpp_dout(dpp, 0) << "ERROR: failed to fetch sync status r=" << r << dendl;
7c673cae
FG
2170 return r;
2171 }
2172
2173 if (!mdlog_info.period.empty()) {
2174 // restart sync if the remote has a period, but:
2175 // a) our status does not, or
2176 // b) our sync period comes before the remote's oldest log period
2177 if (sync_status.sync_info.period.empty() ||
2178 sync_status.sync_info.realm_epoch < mdlog_info.realm_epoch) {
2179 sync_status.sync_info.state = rgw_meta_sync_info::StateInit;
11fdf7f2
TL
2180 string reason;
2181 if (sync_status.sync_info.period.empty()) {
2182 reason = "period is empty";
2183 } else {
2184 reason = SSTR("sync_info realm epoch is behind: " << sync_status.sync_info.realm_epoch << " < " << mdlog_info.realm_epoch);
2185 }
2186 tn->log(1, "initialize sync (reason: " + reason + ")");
2187 ldpp_dout(dpp, 1) << "epoch=" << sync_status.sync_info.realm_epoch
7c673cae
FG
2188 << " in sync status comes before remote's oldest mdlog epoch="
2189 << mdlog_info.realm_epoch << ", restarting sync" << dendl;
2190 }
2191 }
2192
2193 if (sync_status.sync_info.state == rgw_meta_sync_info::StateInit) {
11fdf7f2 2194 ldpp_dout(dpp, 20) << __func__ << "(): init" << dendl;
7c673cae 2195 sync_status.sync_info.num_shards = mdlog_info.num_shards;
9f95a23c 2196 auto cursor = store->svc()->mdlog->get_period_history()->get_current();
7c673cae
FG
2197 if (cursor) {
2198 // run full sync, then start incremental from the current period/epoch
2199 sync_status.sync_info.period = cursor.get_period().get_id();
2200 sync_status.sync_info.realm_epoch = cursor.get_epoch();
2201 }
2202 r = run(new RGWInitSyncStatusCoroutine(&sync_env, sync_status.sync_info));
2203 if (r == -EBUSY) {
2204 backoff.backoff_sleep();
2205 continue;
2206 }
2207 backoff.reset();
2208 if (r < 0) {
11fdf7f2 2209 ldpp_dout(dpp, 0) << "ERROR: failed to init sync status r=" << r << dendl;
7c673cae
FG
2210 return r;
2211 }
2212 }
2213 } while (sync_status.sync_info.state == rgw_meta_sync_info::StateInit);
2214
2215 auto num_shards = sync_status.sync_info.num_shards;
2216 if (num_shards != mdlog_info.num_shards) {
2217 lderr(store->ctx()) << "ERROR: can't sync, mismatch between num shards, master num_shards=" << mdlog_info.num_shards << " local num_shards=" << num_shards << dendl;
2218 return -EINVAL;
2219 }
2220
2221 RGWPeriodHistory::Cursor cursor;
2222 do {
2223 r = run(new RGWReadSyncStatusCoroutine(&sync_env, &sync_status));
2224 if (r < 0 && r != -ENOENT) {
11fdf7f2 2225 tn->log(0, SSTR("ERROR: failed to fetch sync status r=" << r));
7c673cae
FG
2226 return r;
2227 }
2228
2229 switch ((rgw_meta_sync_info::SyncState)sync_status.sync_info.state) {
2230 case rgw_meta_sync_info::StateBuildingFullSyncMaps:
11fdf7f2
TL
2231 tn->log(20, "building full sync maps");
2232 r = run(new RGWFetchAllMetaCR(&sync_env, num_shards, sync_status.sync_markers, tn));
7c673cae
FG
2233 if (r == -EBUSY || r == -EAGAIN) {
2234 backoff.backoff_sleep();
2235 continue;
2236 }
2237 backoff.reset();
2238 if (r < 0) {
11fdf7f2 2239 tn->log(0, SSTR("ERROR: failed to fetch all metadata keys (r=" << r << ")"));
7c673cae
FG
2240 return r;
2241 }
2242
2243 sync_status.sync_info.state = rgw_meta_sync_info::StateSync;
2244 r = store_sync_info(sync_status.sync_info);
2245 if (r < 0) {
11fdf7f2 2246 tn->log(0, SSTR("ERROR: failed to update sync status (r=" << r << ")"));
7c673cae
FG
2247 return r;
2248 }
2249 /* fall through */
2250 case rgw_meta_sync_info::StateSync:
11fdf7f2 2251 tn->log(20, "sync");
7c673cae
FG
2252 // find our position in the period history (if any)
2253 cursor = get_period_at(store, sync_status.sync_info);
2254 r = cursor.get_error();
2255 if (r < 0) {
2256 return r;
2257 }
11fdf7f2 2258 meta_sync_cr = new RGWMetaSyncCR(&sync_env, cursor, sync_status, tn);
7c673cae
FG
2259 r = run(meta_sync_cr);
2260 if (r < 0) {
11fdf7f2 2261 tn->log(0, "ERROR: metadata sync failed");
7c673cae
FG
2262 return r;
2263 }
2264 break;
2265 default:
11fdf7f2 2266 tn->log(0, "ERROR: bad sync state!");
7c673cae
FG
2267 return -EIO;
2268 }
2269 } while (!going_down);
2270
2271 return 0;
2272}
2273
2274void RGWRemoteMetaLog::wakeup(int shard_id)
2275{
2276 if (!meta_sync_cr) {
2277 return;
2278 }
2279 meta_sync_cr->wakeup(shard_id);
2280}
2281
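/*
 * RGWCloneMetaLogCoroutine state machine: read the local shard's current max
 * marker, request newer entries from the master via /admin/log, and append
 * them to the local mdlog shard; repeat while the response was truncated.
 */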
2282int RGWCloneMetaLogCoroutine::operate()
2283{
2284 reenter(this) {
2285 do {
2286 yield {
11fdf7f2 2287 ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": init request" << dendl;
7c673cae
FG
2288 return state_init();
2289 }
2290 yield {
11fdf7f2 2291 ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status" << dendl;
7c673cae
FG
2292 return state_read_shard_status();
2293 }
2294 yield {
11fdf7f2 2295 ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status complete" << dendl;
7c673cae
FG
2296 return state_read_shard_status_complete();
2297 }
2298 yield {
11fdf7f2 2299 ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": sending rest request" << dendl;
7c673cae
FG
2300 return state_send_rest_request();
2301 }
2302 yield {
11fdf7f2 2303 ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": receiving rest response" << dendl;
7c673cae
FG
2304 return state_receive_rest_response();
2305 }
2306 yield {
11fdf7f2 2307 ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries" << dendl;
7c673cae
FG
2308 return state_store_mdlog_entries();
2309 }
2310 } while (truncated);
2311 yield {
11fdf7f2 2312 ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries complete" << dendl;
7c673cae
FG
2313 return state_store_mdlog_entries_complete();
2314 }
2315 }
2316
2317 return 0;
2318}
2319
2320int RGWCloneMetaLogCoroutine::state_init()
2321{
2322 data = rgw_mdlog_shard_data();
2323
2324 return 0;
2325}
2326
2327int RGWCloneMetaLogCoroutine::state_read_shard_status()
2328{
2329 const bool add_ref = false; // default constructs with refs=1
2330
2331 completion.reset(new RGWMetadataLogInfoCompletion(
2332 [this](int ret, const cls_log_header& header) {
2333 if (ret < 0) {
11fdf7f2
TL
2334 if (ret != -ENOENT) {
2335 ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to read mdlog info with "
2336 << cpp_strerror(ret) << dendl;
2337 }
7c673cae
FG
2338 } else {
2339 shard_info.marker = header.max_marker;
2340 shard_info.last_update = header.max_time.to_real_time();
2341 }
2342 // wake up parent stack
11fdf7f2 2343 io_complete();
7c673cae
FG
2344 }), add_ref);
2345
2346 int ret = mdlog->get_info_async(shard_id, completion.get());
2347 if (ret < 0) {
11fdf7f2 2348 ldpp_dout(sync_env->dpp, 0) << "ERROR: mdlog->get_info_async() returned ret=" << ret << dendl;
7c673cae
FG
2349 return set_cr_error(ret);
2350 }
2351
2352 return io_block(0);
2353}
2354
2355int RGWCloneMetaLogCoroutine::state_read_shard_status_complete()
2356{
2357 completion.reset();
2358
11fdf7f2 2359 ldpp_dout(sync_env->dpp, 20) << "shard_id=" << shard_id << " marker=" << shard_info.marker << " last_update=" << shard_info.last_update << dendl;
7c673cae
FG
2360
2361 marker = shard_info.marker;
2362
2363 return 0;
2364}
2365
2366int RGWCloneMetaLogCoroutine::state_send_rest_request()
2367{
2368 RGWRESTConn *conn = sync_env->conn;
2369
2370 char buf[32];
2371 snprintf(buf, sizeof(buf), "%d", shard_id);
2372
2373 char max_entries_buf[32];
2374 snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", max_entries);
2375
2376 const char *marker_key = (marker.empty() ? "" : "marker");
2377
2378 rgw_http_param_pair pairs[] = { { "type", "metadata" },
2379 { "id", buf },
2380 { "period", period.c_str() },
2381 { "max-entries", max_entries_buf },
2382 { marker_key, marker.c_str() },
2383 { NULL, NULL } };
2384
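  // illustrative resulting request (values depend on runtime state; the
  // marker parameter is omitted when marker is empty):
  //   GET /admin/log?type=metadata&id=<shard_id>&period=<period>&max-entries=<max_entries>&marker=<marker>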
2385 http_op = new RGWRESTReadResource(conn, "/admin/log", pairs, NULL, sync_env->http_manager);
2386
11fdf7f2 2387 init_new_io(http_op);
7c673cae
FG
2388
2389 int ret = http_op->aio_read();
2390 if (ret < 0) {
11fdf7f2 2391 ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to fetch mdlog data" << dendl;
7c673cae
FG
2392 log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
2393 http_op->put();
2394 http_op = NULL;
1adf2230 2395 return set_cr_error(ret);
7c673cae
FG
2396 }
2397
2398 return io_block(0);
2399}
2400
2401int RGWCloneMetaLogCoroutine::state_receive_rest_response()
2402{
9f95a23c 2403 int ret = http_op->wait(&data, null_yield);
7c673cae
FG
2404 if (ret < 0) {
2405 error_stream << "http operation failed: " << http_op->to_str() << " status=" << http_op->get_http_status() << std::endl;
11fdf7f2 2406 ldpp_dout(sync_env->dpp, 5) << "failed to wait for op, ret=" << ret << dendl;
7c673cae
FG
2407 http_op->put();
2408 http_op = NULL;
2409 return set_cr_error(ret);
2410 }
2411 http_op->put();
2412 http_op = NULL;
2413
11fdf7f2 2414 ldpp_dout(sync_env->dpp, 20) << "remote mdlog, shard_id=" << shard_id << " num of shard entries: " << data.entries.size() << dendl;
7c673cae
FG
2415
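  // a full batch implies more entries may remain on the master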
2416 truncated = ((int)data.entries.size() == max_entries);
2417
2418 if (data.entries.empty()) {
2419 if (new_marker) {
2420 *new_marker = marker;
2421 }
2422 return set_cr_done();
2423 }
2424
2425 if (new_marker) {
2426 *new_marker = data.entries.back().id;
2427 }
2428
2429 return 0;
2430}
2431
2432
2433int RGWCloneMetaLogCoroutine::state_store_mdlog_entries()
2434{
2435 list<cls_log_entry> dest_entries;
2436
2437 vector<rgw_mdlog_entry>::iterator iter;
2438 for (iter = data.entries.begin(); iter != data.entries.end(); ++iter) {
2439 rgw_mdlog_entry& entry = *iter;
11fdf7f2 2440 ldpp_dout(sync_env->dpp, 20) << "entry: name=" << entry.name << dendl;
7c673cae
FG
2441
2442 cls_log_entry dest_entry;
2443 dest_entry.id = entry.id;
2444 dest_entry.section = entry.section;
2445 dest_entry.name = entry.name;
2446 dest_entry.timestamp = utime_t(entry.timestamp);
2447
11fdf7f2 2448 encode(entry.log_data, dest_entry.data);
7c673cae
FG
2449
2450 dest_entries.push_back(dest_entry);
2451
2452 marker = entry.id;
2453 }
2454
2455 RGWAioCompletionNotifier *cn = stack->create_completion_notifier();
2456
2457 int ret = mdlog->store_entries_in_shard(dest_entries, shard_id, cn->completion());
2458 if (ret < 0) {
2459 cn->put();
11fdf7f2 2460 ldpp_dout(sync_env->dpp, 10) << "failed to store md log entries shard_id=" << shard_id << " ret=" << ret << dendl;
7c673cae
FG
2461 return set_cr_error(ret);
2462 }
2463 return io_block(0);
2464}
2465
2466int RGWCloneMetaLogCoroutine::state_store_mdlog_entries_complete()
2467{
2468 return set_cr_done();
2469}