// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp
// ceph/src/rgw/rgw_sync.cc (ceph pacific 16.2.5)

#include <boost/optional.hpp>

#include "common/ceph_json.h"
#include "common/RWLock.h"
#include "common/RefCountedObj.h"
#include "common/WorkQueue.h"
#include "common/Throttle.h"
#include "common/admin_socket.h"
#include "common/errno.h"

#include "rgw_common.h"
#include "rgw_zone.h"
#include "rgw_sync.h"
#include "rgw_metadata.h"
#include "rgw_mdlog_types.h"
#include "rgw_rest_conn.h"
#include "rgw_tools.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
#include "rgw_http_client.h"
#include "rgw_sync_trace.h"

#include "cls/lock/cls_lock_client.h"

#include "services/svc_zone.h"
#include "services/svc_mdlog.h"
#include "services/svc_meta.h"
#include "services/svc_cls.h"

#include <boost/asio/yield.hpp>

#define dout_subsys ceph_subsys_rgw

#undef dout_prefix
#define dout_prefix (*_dout << "meta sync: ")

static string mdlog_sync_status_oid = "mdlog.sync-status";
static string mdlog_sync_status_shard_prefix = "mdlog.sync-status.shard";
static string mdlog_sync_full_sync_index_prefix = "meta.full-sync.index";

RGWSyncErrorLogger::RGWSyncErrorLogger(rgw::sal::RGWRadosStore *_store, const string &oid_prefix, int _num_shards) : store(_store), num_shards(_num_shards) {
  for (int i = 0; i < num_shards; i++) {
    oids.push_back(get_shard_oid(oid_prefix, i));
  }
}
string RGWSyncErrorLogger::get_shard_oid(const string& oid_prefix, int shard_id) {
  char buf[oid_prefix.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), shard_id);
  return string(buf);
}

RGWCoroutine *RGWSyncErrorLogger::log_error_cr(const DoutPrefixProvider *dpp, const string& source_zone, const string& section, const string& name, uint32_t error_code, const string& message) {
  cls_log_entry entry;

  rgw_sync_error_info info(source_zone, error_code, message);
  bufferlist bl;
  encode(info, bl);
  store->svc()->cls->timelog.prepare_entry(entry, real_clock::now(), section, name, bl);

  uint32_t shard_id = ++counter % num_shards;


  return new RGWRadosTimelogAddCR(dpp, store, oids[shard_id], entry);
}

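/*
 * RGWSyncBackoff doubles the wait time on every retry, capped at max_secs:
 * 1s, 2s, 4s, 8s, ... up to the cap. backoff_sleep() blocks the calling
 * thread, while backoff(op) suspends the coroutine instead.
 */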
void RGWSyncBackoff::update_wait_time()
{
  if (cur_wait == 0) {
    cur_wait = 1;
  } else {
    cur_wait = (cur_wait << 1);
  }
  if (cur_wait >= max_secs) {
    cur_wait = max_secs;
  }
}

void RGWSyncBackoff::backoff_sleep()
{
  update_wait_time();
  sleep(cur_wait);
}

void RGWSyncBackoff::backoff(RGWCoroutine *op)
{
  update_wait_time();
  op->wait(utime_t(cur_wait, 0));
}

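/*
 * Retry loop around an allocated coroutine: -EBUSY and -EAGAIN are treated
 * as transient and retried after a backoff; any other error is fatal only
 * when exit_on_error is set. On success an optional finisher coroutine runs.
 */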
int RGWBackoffControlCR::operate(const DoutPrefixProvider *dpp) {
  reenter(this) {
    // retry the operation until it succeeds
    while (true) {
      yield {
        std::lock_guard l{lock};
        cr = alloc_cr();
        cr->get();
        call(cr);
      }
      {
        std::lock_guard l{lock};
        cr->put();
        cr = NULL;
      }
      if (retcode >= 0) {
        break;
      }
      if (retcode != -EBUSY && retcode != -EAGAIN) {
        ldout(cct, 0) << "ERROR: RGWBackoffControlCR called coroutine returned " << retcode << dendl;
        if (exit_on_error) {
          return set_cr_error(retcode);
        }
      }
      if (reset_backoff) {
        backoff.reset();
      }
      yield backoff.backoff(this);
    }

    // run an optional finisher
    yield call(alloc_finisher_cr());
    if (retcode < 0) {
      ldout(cct, 0) << "ERROR: call to finisher_cr() failed: retcode=" << retcode << dendl;
      return set_cr_error(retcode);
    }
    return set_cr_done();
  }
  return 0;
}

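/*
 * JSON decoders for the remote mdlog REST responses. Note that the remote
 * "num_objects" field is decoded into num_shards: the admin log API carries
 * the shard count under that name.
 */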
void rgw_mdlog_info::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("num_objects", num_shards, obj);
  JSONDecoder::decode_json("period", period, obj);
  JSONDecoder::decode_json("realm_epoch", realm_epoch, obj);
}

void rgw_mdlog_entry::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("id", id, obj);
  JSONDecoder::decode_json("section", section, obj);
  JSONDecoder::decode_json("name", name, obj);
  utime_t ut;
  JSONDecoder::decode_json("timestamp", ut, obj);
  timestamp = ut.to_real_time();
  JSONDecoder::decode_json("data", log_data, obj);
}

void rgw_mdlog_shard_data::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("marker", marker, obj);
  JSONDecoder::decode_json("truncated", truncated, obj);
  JSONDecoder::decode_json("entries", entries, obj);
};

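/*
 * Generic fan-out/fan-in driver: spawn_next() (implemented by subclasses)
 * launches one child coroutine at a time, keeping at most max_concurrent in
 * flight; the second loop drains the stragglers. -ENOENT from a child is
 * ignored, any other error is remembered and returned once all children
 * have finished.
 */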
int RGWShardCollectCR::operate(const DoutPrefixProvider *dpp) {
  reenter(this) {
    while (spawn_next()) {
      current_running++;

      while (current_running >= max_concurrent) {
        int child_ret;
        yield wait_for_child();
        if (collect_next(&child_ret)) {
          current_running--;
          if (child_ret < 0 && child_ret != -ENOENT) {
            ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
            status = child_ret;
          }
        }
      }
    }
    while (current_running > 0) {
      int child_ret;
      yield wait_for_child();
      if (collect_next(&child_ret)) {
        current_running--;
        if (child_ret < 0 && child_ret != -ENOENT) {
          ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
          status = child_ret;
        }
      }
    }
    if (status < 0) {
      return set_cr_error(status);
    }
    return set_cr_done();
  }
  return 0;
}

class RGWReadRemoteMDLogInfoCR : public RGWShardCollectCR {
  RGWMetaSyncEnv *sync_env;

  const std::string& period;
  int num_shards;
  map<int, RGWMetadataLogInfo> *mdlog_info;

  int shard_id;
#define READ_MDLOG_MAX_CONCURRENT 10

public:
  RGWReadRemoteMDLogInfoCR(RGWMetaSyncEnv *_sync_env,
                           const std::string& period, int _num_shards,
                           map<int, RGWMetadataLogInfo> *_mdlog_info)
    : RGWShardCollectCR(_sync_env->cct, READ_MDLOG_MAX_CONCURRENT),
      sync_env(_sync_env),
      period(period), num_shards(_num_shards),
      mdlog_info(_mdlog_info), shard_id(0) {}
  bool spawn_next() override;
};

class RGWListRemoteMDLogCR : public RGWShardCollectCR {
  RGWMetaSyncEnv *sync_env;

  const std::string& period;
  map<int, string> shards;
  int max_entries_per_shard;
  map<int, rgw_mdlog_shard_data> *result;

  map<int, string>::iterator iter;
#define READ_MDLOG_MAX_CONCURRENT 10

public:
  RGWListRemoteMDLogCR(RGWMetaSyncEnv *_sync_env,
                       const std::string& period, map<int, string>& _shards,
                       int _max_entries_per_shard,
                       map<int, rgw_mdlog_shard_data> *_result)
    : RGWShardCollectCR(_sync_env->cct, READ_MDLOG_MAX_CONCURRENT),
      sync_env(_sync_env), period(period),
      max_entries_per_shard(_max_entries_per_shard),
      result(_result) {
    shards.swap(_shards);
    iter = shards.begin();
  }
  bool spawn_next() override;
};

RGWRemoteMetaLog::~RGWRemoteMetaLog()
{
  delete error_logger;
}

int RGWRemoteMetaLog::read_log_info(const DoutPrefixProvider *dpp, rgw_mdlog_info *log_info)
{
  rgw_http_param_pair pairs[] = { { "type", "metadata" },
                                  { NULL, NULL } };

  int ret = conn->get_json_resource(dpp, "/admin/log", pairs, null_yield, *log_info);
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "ERROR: failed to fetch mdlog info" << dendl;
    return ret;
  }

  ldpp_dout(dpp, 20) << "remote mdlog, num_shards=" << log_info->num_shards << dendl;

  return 0;
}

int RGWRemoteMetaLog::read_master_log_shards_info(const DoutPrefixProvider *dpp, const string &master_period, map<int, RGWMetadataLogInfo> *shards_info)
{
  if (store->svc()->zone->is_meta_master()) {
    return 0;
  }

  rgw_mdlog_info log_info;
  int ret = read_log_info(dpp, &log_info);
  if (ret < 0) {
    return ret;
  }

  return run(dpp, new RGWReadRemoteMDLogInfoCR(&sync_env, master_period, log_info.num_shards, shards_info));
}

int RGWRemoteMetaLog::read_master_log_shards_next(const DoutPrefixProvider *dpp, const string& period, map<int, string> shard_markers, map<int, rgw_mdlog_shard_data> *result)
{
  if (store->svc()->zone->is_meta_master()) {
    return 0;
  }

  return run(dpp, new RGWListRemoteMDLogCR(&sync_env, period, shard_markers, 1, result));
}

int RGWRemoteMetaLog::init()
{
  conn = store->svc()->zone->get_master_conn();

  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }

  error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);

  init_sync_env(&sync_env);

  tn = sync_env.sync_tracer->add_node(sync_env.sync_tracer->root_node, "meta");

  return 0;
}

void RGWRemoteMetaLog::finish()
{
  going_down = true;
  stop();
}

#define CLONE_MAX_ENTRIES 100

int RGWMetaSyncStatusManager::init(const DoutPrefixProvider *dpp)
{
  if (store->svc()->zone->is_meta_master()) {
    return 0;
  }

  if (!store->svc()->zone->get_master_conn()) {
    ldpp_dout(dpp, -1) << "no REST connection to master zone" << dendl;
    return -EIO;
  }

  int r = rgw_init_ioctx(dpp, store->getRados()->get_rados_handle(), store->svc()->zone->get_zone_params().log_pool, ioctx, true);
  if (r < 0) {
    ldpp_dout(dpp, -1) << "ERROR: failed to open log pool (" << store->svc()->zone->get_zone_params().log_pool << ") ret=" << r << dendl;
    return r;
  }

  r = master_log.init();
  if (r < 0) {
    ldpp_dout(dpp, -1) << "ERROR: failed to init remote log, r=" << r << dendl;
    return r;
  }

  RGWMetaSyncEnv& sync_env = master_log.get_sync_env();

  rgw_meta_sync_status sync_status;
  r = read_sync_status(dpp, &sync_status);
  if (r < 0 && r != -ENOENT) {
    ldpp_dout(dpp, -1) << "ERROR: failed to read sync status, r=" << r << dendl;
    return r;
  }

  int num_shards = sync_status.sync_info.num_shards;

  for (int i = 0; i < num_shards; i++) {
    shard_objs[i] = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env.shard_obj_name(i));
  }

  std::unique_lock wl{ts_to_shard_lock};
  for (int i = 0; i < num_shards; i++) {
    clone_markers.push_back(string());
    utime_shard ut;
    ut.shard_id = i;
    ts_to_shard[ut] = i;
  }

  return 0;
}

unsigned RGWMetaSyncStatusManager::get_subsys() const
{
  return dout_subsys;
}

std::ostream& RGWMetaSyncStatusManager::gen_prefix(std::ostream& out) const
{
  return out << "meta sync: ";
}

void RGWMetaSyncEnv::init(const DoutPrefixProvider *_dpp, CephContext *_cct, rgw::sal::RGWRadosStore *_store, RGWRESTConn *_conn,
                          RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
                          RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer) {
  dpp = _dpp;
  cct = _cct;
  store = _store;
  conn = _conn;
  async_rados = _async_rados;
  http_manager = _http_manager;
  error_logger = _error_logger;
  sync_tracer = _sync_tracer;
}

string RGWMetaSyncEnv::status_oid()
{
  return mdlog_sync_status_oid;
}

string RGWMetaSyncEnv::shard_obj_name(int shard_id)
{
  char buf[mdlog_sync_status_shard_prefix.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%d", mdlog_sync_status_shard_prefix.c_str(), shard_id);

  return string(buf);
}

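/* e.g. shard 3 maps to the object "mdlog.sync-status.shard.3" in the log pool */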
class RGWAsyncReadMDLogEntries : public RGWAsyncRadosRequest {
  const DoutPrefixProvider *dpp;
  rgw::sal::RGWRadosStore *store;
  RGWMetadataLog *mdlog;
  int shard_id;
  int max_entries;

protected:
  int _send_request(const DoutPrefixProvider *dpp) override {
    real_time from_time;
    real_time end_time;

    void *handle;

    mdlog->init_list_entries(shard_id, from_time, end_time, marker, &handle);

    int ret = mdlog->list_entries(dpp, handle, max_entries, entries, &marker, &truncated);

    mdlog->complete_list_entries(handle);

    return ret;
  }
public:
  string marker;
  list<cls_log_entry> entries;
  bool truncated;

  RGWAsyncReadMDLogEntries(const DoutPrefixProvider *_dpp, RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RGWRadosStore *_store,
                           RGWMetadataLog* mdlog, int _shard_id,
                           std::string _marker, int _max_entries)
    : RGWAsyncRadosRequest(caller, cn), dpp(_dpp), store(_store), mdlog(mdlog),
      shard_id(_shard_id), max_entries(_max_entries), marker(std::move(_marker)) {}
};

class RGWReadMDLogEntriesCR : public RGWSimpleCoroutine {
  RGWMetaSyncEnv *sync_env;
  RGWMetadataLog *const mdlog;
  int shard_id;
  string marker;
  string *pmarker;
  int max_entries;
  list<cls_log_entry> *entries;
  bool *truncated;

  RGWAsyncReadMDLogEntries *req{nullptr};

public:
  RGWReadMDLogEntriesCR(RGWMetaSyncEnv *_sync_env, RGWMetadataLog* mdlog,
                        int _shard_id, string* _marker, int _max_entries,
                        list<cls_log_entry> *_entries, bool *_truncated)
    : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env), mdlog(mdlog),
      shard_id(_shard_id), pmarker(_marker), max_entries(_max_entries),
      entries(_entries), truncated(_truncated) {}

  ~RGWReadMDLogEntriesCR() override {
    if (req) {
      req->finish();
    }
  }

  int send_request(const DoutPrefixProvider *dpp) override {
    marker = *pmarker;
    req = new RGWAsyncReadMDLogEntries(dpp, this, stack->create_completion_notifier(),
                                       sync_env->store, mdlog, shard_id, marker,
                                       max_entries);
    sync_env->async_rados->queue(req);
    return 0;
  }

  int request_complete() override {
    *pmarker = std::move(req->marker);
    *entries = std::move(req->entries);
    *truncated = req->truncated;
    return req->get_ret_status();
  }
};


class RGWReadRemoteMDLogShardInfoCR : public RGWCoroutine {
  RGWMetaSyncEnv *env;
  RGWRESTReadResource *http_op;

  const std::string& period;
  int shard_id;
  RGWMetadataLogInfo *shard_info;

public:
  RGWReadRemoteMDLogShardInfoCR(RGWMetaSyncEnv *env, const std::string& period,
                                int _shard_id, RGWMetadataLogInfo *_shard_info)
    : RGWCoroutine(env->store->ctx()), env(env), http_op(NULL),
      period(period), shard_id(_shard_id), shard_info(_shard_info) {}

  int operate(const DoutPrefixProvider *dpp) override {
    auto store = env->store;
    RGWRESTConn *conn = store->svc()->zone->get_master_conn();
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type", "metadata" },
                                        { "id", buf },
                                        { "period", period.c_str() },
                                        { "info", NULL },
                                        { NULL, NULL } };

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(conn, p, pairs, NULL,
                                          env->http_manager);

        init_new_io(http_op);

        int ret = http_op->aio_read(dpp);
        if (ret < 0) {
          ldpp_dout(env->dpp, 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          http_op->put();
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait(shard_info, null_yield);
        http_op->put();
        if (ret < 0) {
          return set_cr_error(ret);
        }
        return set_cr_done();
      }
    }
    return 0;
  }
};

RGWCoroutine* create_read_remote_mdlog_shard_info_cr(RGWMetaSyncEnv *env,
                                                     const std::string& period,
                                                     int shard_id,
                                                     RGWMetadataLogInfo* info)
{
  return new RGWReadRemoteMDLogShardInfoCR(env, period, shard_id, info);
}

class RGWListRemoteMDLogShardCR : public RGWSimpleCoroutine {
  RGWMetaSyncEnv *sync_env;
  RGWRESTReadResource *http_op;

  const std::string& period;
  int shard_id;
  string marker;
  uint32_t max_entries;
  rgw_mdlog_shard_data *result;

public:
  RGWListRemoteMDLogShardCR(RGWMetaSyncEnv *env, const std::string& period,
                            int _shard_id, const string& _marker, uint32_t _max_entries,
                            rgw_mdlog_shard_data *_result)
    : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL),
      period(period), shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}

  int send_request(const DoutPrefixProvider *dpp) override {
    RGWRESTConn *conn = sync_env->conn;

    char buf[32];
    snprintf(buf, sizeof(buf), "%d", shard_id);

    char max_entries_buf[32];
    snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);

    const char *marker_key = (marker.empty() ? "" : "marker");

    rgw_http_param_pair pairs[] = { { "type", "metadata" },
                                    { "id", buf },
                                    { "period", period.c_str() },
                                    { "max-entries", max_entries_buf },
                                    { marker_key, marker.c_str() },
                                    { NULL, NULL } };

    string p = "/admin/log/";

    http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
    init_new_io(http_op);

    int ret = http_op->aio_read(dpp);
    if (ret < 0) {
      ldpp_dout(dpp, 0) << "ERROR: failed to read from " << p << dendl;
      log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
      http_op->put();
      return ret;
    }

    return 0;
  }

  int request_complete() override {
    int ret = http_op->wait(result, null_yield);
    http_op->put();
    if (ret < 0 && ret != -ENOENT) {
      ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to list remote mdlog shard, ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }
};

RGWCoroutine* create_list_remote_mdlog_shard_cr(RGWMetaSyncEnv *env,
                                                const std::string& period,
                                                int shard_id,
                                                const std::string& marker,
                                                uint32_t max_entries,
                                                rgw_mdlog_shard_data *result)
{
  return new RGWListRemoteMDLogShardCR(env, period, shard_id, marker,
                                       max_entries, result);
}

7c673cae
FG
610bool RGWReadRemoteMDLogInfoCR::spawn_next() {
611 if (shard_id >= num_shards) {
612 return false;
613 }
614 spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env, period, shard_id, &(*mdlog_info)[shard_id]), false);
615 shard_id++;
616 return true;
617}
618
619bool RGWListRemoteMDLogCR::spawn_next() {
620 if (iter == shards.end()) {
621 return false;
622 }
623
624 spawn(new RGWListRemoteMDLogShardCR(sync_env, period, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
625 ++iter;
626 return true;
627}
628
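/*
 * Initializes the sync status objects: takes the sync lease, writes the
 * global sync info, reads the current position of every remote mdlog shard
 * to seed the per-shard markers, then flips the state to
 * StateBuildingFullSyncMaps before releasing the lease.
 */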
class RGWInitSyncStatusCoroutine : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;

  rgw_meta_sync_info status;
  vector<RGWMetadataLogInfo> shards_info;
  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
public:
  RGWInitSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
                             const rgw_meta_sync_info &status)
    : RGWCoroutine(_sync_env->store->ctx()), sync_env(_sync_env),
      status(status), shards_info(status.num_shards),
      lease_cr(nullptr), lease_stack(nullptr) {}

  ~RGWInitSyncStatusCoroutine() override {
    if (lease_cr) {
      lease_cr->abort();
    }
  }

  int operate(const DoutPrefixProvider *dpp) override {
    int ret;
    reenter(this) {
      yield {
        set_status("acquiring sync lock");
        uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
        string lock_name = "sync_lock";
        rgw::sal::RGWRadosStore *store = sync_env->store;
        lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                                rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
                                                lock_name, lock_duration, this));
        lease_stack.reset(spawn(lease_cr.get(), false));
      }
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          ldpp_dout(dpp, 5) << "lease cr failed, done early " << dendl;
          set_status("lease lock failed, early abort");
          return set_cr_error(lease_cr->get_ret_status());
        }
        set_sleeping(true);
        yield;
      }
      yield {
        set_status("writing sync status");
        rgw::sal::RGWRadosStore *store = sync_env->store;
        call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(dpp, sync_env->async_rados, store->svc()->sysobj,
                                                           rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
                                                           status));
      }

      if (retcode < 0) {
        set_status("failed to write sync status");
        ldpp_dout(dpp, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
        yield lease_cr->go_down();
        return set_cr_error(retcode);
      }
      /* fetch current position in logs */
      set_status("fetching remote log position");
      yield {
        for (int i = 0; i < (int)status.num_shards; i++) {
          spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env, status.period, i,
                                                  &shards_info[i]), false);
        }
      }

      drain_all_but_stack(lease_stack.get()); /* the lease cr still needs to run */

      yield {
        set_status("updating sync status");
        for (int i = 0; i < (int)status.num_shards; i++) {
          rgw_meta_sync_marker marker;
          RGWMetadataLogInfo& info = shards_info[i];
          marker.next_step_marker = info.marker;
          marker.timestamp = info.last_update;
          rgw::sal::RGWRadosStore *store = sync_env->store;
          spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(dpp,
                                                                sync_env->async_rados,
                                                                store->svc()->sysobj,
                                                                rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->shard_obj_name(i)),
                                                                marker), true);
        }
      }
      yield {
        set_status("changing sync state: build full sync maps");
        status.state = rgw_meta_sync_info::StateBuildingFullSyncMaps;
        rgw::sal::RGWRadosStore *store = sync_env->store;
        call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(dpp, sync_env->async_rados, store->svc()->sysobj,
                                                           rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
                                                           status));
      }
      set_status("drop lock lease");
      yield lease_cr->go_down();
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          return set_cr_error(ret);
        }
        yield;
      }
      drain_all();
      return set_cr_done();
    }
    return 0;
  }
};

class RGWReadSyncStatusMarkersCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  RGWMetaSyncEnv *env;
  const int num_shards;
  int shard_id{0};
  map<uint32_t, rgw_meta_sync_marker>& markers;

 public:
  RGWReadSyncStatusMarkersCR(RGWMetaSyncEnv *env, int num_shards,
                             map<uint32_t, rgw_meta_sync_marker>& markers)
    : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS),
      env(env), num_shards(num_shards), markers(markers)
  {}
  bool spawn_next() override;
};

bool RGWReadSyncStatusMarkersCR::spawn_next()
{
  if (shard_id >= num_shards) {
    return false;
  }
  using CR = RGWSimpleRadosReadCR<rgw_meta_sync_marker>;
  rgw_raw_obj obj{env->store->svc()->zone->get_zone_params().log_pool,
                  env->shard_obj_name(shard_id)};
  spawn(new CR(env->dpp, env->async_rados, env->store->svc()->sysobj, obj, &markers[shard_id]), false);
  shard_id++;
  return true;
}

class RGWReadSyncStatusCoroutine : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;
  rgw_meta_sync_status *sync_status;

public:
  RGWReadSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
                             rgw_meta_sync_status *_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sync_status(_status)
  {}
  int operate(const DoutPrefixProvider *dpp) override;
};

int RGWReadSyncStatusCoroutine::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    // read sync info
    using ReadInfoCR = RGWSimpleRadosReadCR<rgw_meta_sync_info>;
    yield {
      bool empty_on_enoent = false; // fail on ENOENT
      rgw_raw_obj obj{sync_env->store->svc()->zone->get_zone_params().log_pool,
                      sync_env->status_oid()};
      call(new ReadInfoCR(dpp, sync_env->async_rados, sync_env->store->svc()->sysobj, obj,
                          &sync_status->sync_info, empty_on_enoent));
    }
    if (retcode < 0) {
      ldpp_dout(dpp, 4) << "failed to read sync status info with "
          << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    // read shard markers
    using ReadMarkersCR = RGWReadSyncStatusMarkersCR;
    yield call(new ReadMarkersCR(sync_env, sync_status->sync_info.num_shards,
                                 sync_status->sync_markers));
    if (retcode < 0) {
      ldpp_dout(dpp, 4) << "failed to read sync status markers with "
          << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    return set_cr_done();
  }
  return 0;
}

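/*
 * Full-sync map builder: lists every metadata section and key on the master
 * zone ("/admin/metadata/<section>" in chunks of 1000) and appends
 * "section:key" entries into the sharded omap index objects that full sync
 * later consumes. The lease guards the whole listing; losing it aborts with
 * -EBUSY.
 */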
class RGWFetchAllMetaCR : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;

  int num_shards;


  int ret_status;

  list<string> sections;
  list<string>::iterator sections_iter;

  struct meta_list_result {
    list<string> keys;
    string marker;
    uint64_t count{0};
    bool truncated{false};

    void decode_json(JSONObj *obj) {
      JSONDecoder::decode_json("keys", keys, obj);
      JSONDecoder::decode_json("marker", marker, obj);
      JSONDecoder::decode_json("count", count, obj);
      JSONDecoder::decode_json("truncated", truncated, obj);
    }
  } result;
  list<string>::iterator iter;

  std::unique_ptr<RGWShardedOmapCRManager> entries_index;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
  bool lost_lock;
  bool failed;

  string marker;

  map<uint32_t, rgw_meta_sync_marker>& markers;

  RGWSyncTraceNodeRef tn;

public:
  RGWFetchAllMetaCR(RGWMetaSyncEnv *_sync_env, int _num_shards,
                    map<uint32_t, rgw_meta_sync_marker>& _markers,
                    RGWSyncTraceNodeRef& _tn_parent)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      num_shards(_num_shards),
      ret_status(0), lease_cr(nullptr), lease_stack(nullptr),
      lost_lock(false), failed(false), markers(_markers) {
    tn = sync_env->sync_tracer->add_node(_tn_parent, "fetch_all_meta");
  }

  ~RGWFetchAllMetaCR() override {
  }

  void append_section_from_set(set<string>& all_sections, const string& name) {
    set<string>::iterator iter = all_sections.find(name);
    if (iter != all_sections.end()) {
      sections.emplace_back(std::move(*iter));
      all_sections.erase(iter);
    }
  }
  /*
   * meta sync should go in the following order: user, bucket.instance, bucket,
   * then whatever other sections exist (if any). Entries in later sections can
   * refer to entries synced earlier (a bucket entry points at its
   * bucket.instance and owning user), so this ordering avoids dangling
   * references during full sync.
   */
  void rearrange_sections() {
    set<string> all_sections;
    std::move(sections.begin(), sections.end(),
              std::inserter(all_sections, all_sections.end()));
    sections.clear();

    append_section_from_set(all_sections, "user");
    append_section_from_set(all_sections, "bucket.instance");
    append_section_from_set(all_sections, "bucket");

    std::move(all_sections.begin(), all_sections.end(),
              std::back_inserter(sections));
  }

  int operate(const DoutPrefixProvider *dpp) override {
    RGWRESTConn *conn = sync_env->conn;

    reenter(this) {
      yield {
        set_status(string("acquiring lock (") + sync_env->status_oid() + ")");
        uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
        string lock_name = "sync_lock";
        lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados,
                                                sync_env->store,
                                                rgw_raw_obj(sync_env->store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
                                                lock_name, lock_duration, this));
        lease_stack.reset(spawn(lease_cr.get(), false));
      }
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          ldpp_dout(dpp, 5) << "lease cr failed, done early " << dendl;
          set_status("failed acquiring lock");
          return set_cr_error(lease_cr->get_ret_status());
        }
        set_sleeping(true);
        yield;
      }
      entries_index.reset(new RGWShardedOmapCRManager(sync_env->async_rados, sync_env->store, this, num_shards,
                                                      sync_env->store->svc()->zone->get_zone_params().log_pool,
                                                      mdlog_sync_full_sync_index_prefix));
      yield {
        call(new RGWReadRESTResourceCR<list<string> >(cct, conn, sync_env->http_manager,
                                                      "/admin/metadata", NULL, &sections));
      }
      if (get_ret_status() < 0) {
        ldpp_dout(dpp, 0) << "ERROR: failed to fetch metadata sections" << dendl;
        yield entries_index->finish();
        yield lease_cr->go_down();
        drain_all();
        return set_cr_error(get_ret_status());
      }
      rearrange_sections();
      sections_iter = sections.begin();
      for (; sections_iter != sections.end(); ++sections_iter) {
        do {
          yield {
#define META_FULL_SYNC_CHUNK_SIZE "1000"
            string entrypoint = string("/admin/metadata/") + *sections_iter;
            rgw_http_param_pair pairs[] = { { "max-entries", META_FULL_SYNC_CHUNK_SIZE },
                                            { "marker", result.marker.c_str() },
                                            { NULL, NULL } };
            result.keys.clear();
            call(new RGWReadRESTResourceCR<meta_list_result>(cct, conn, sync_env->http_manager,
                                                             entrypoint, pairs, &result));
          }
          ret_status = get_ret_status();
          if (ret_status == -ENOENT) {
            set_retcode(0); /* reset coroutine status so that we don't return it */
            ret_status = 0;
          }
          if (ret_status < 0) {
            tn->log(0, SSTR("ERROR: failed to fetch metadata section: " << *sections_iter));
            yield entries_index->finish();
            yield lease_cr->go_down();
            drain_all();
            return set_cr_error(ret_status);
          }
          iter = result.keys.begin();
          for (; iter != result.keys.end(); ++iter) {
            if (!lease_cr->is_locked()) {
              lost_lock = true;
              break;
            }
            yield; // allow entries_index consumer to make progress

            tn->log(20, SSTR("list metadata: section=" << *sections_iter << " key=" << *iter));
            string s = *sections_iter + ":" + *iter;
            int shard_id;
            rgw::sal::RGWRadosStore *store = sync_env->store;
            int ret = store->ctl()->meta.mgr->get_shard_id(*sections_iter, *iter, &shard_id);
            if (ret < 0) {
              tn->log(0, SSTR("ERROR: could not determine shard id for " << *sections_iter << ":" << *iter));
              ret_status = ret;
              break;
            }
            if (!entries_index->append(s, shard_id)) {
              break;
            }
          }
        } while (result.truncated);
      }
      yield {
        if (!entries_index->finish()) {
          failed = true;
        }
      }
      if (!failed) {
        for (map<uint32_t, rgw_meta_sync_marker>::iterator iter = markers.begin(); iter != markers.end(); ++iter) {
          int shard_id = (int)iter->first;
          rgw_meta_sync_marker& marker = iter->second;
          marker.total_entries = entries_index->get_total_entries(shard_id);
          spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(dpp, sync_env->async_rados, sync_env->store->svc()->sysobj,
                                                                rgw_raw_obj(sync_env->store->svc()->zone->get_zone_params().log_pool, sync_env->shard_obj_name(shard_id)),
                                                                marker), true);
        }
      }

      drain_all_but_stack(lease_stack.get()); /* the lease cr still needs to run */

      yield lease_cr->go_down();

      int ret;
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          return set_cr_error(ret);
        }
        yield;
      }
      drain_all();
      if (failed) {
        yield return set_cr_error(-EIO);
      }
      if (lost_lock) {
        yield return set_cr_error(-EBUSY);
      }

      if (ret_status < 0) {
        yield return set_cr_error(ret_status);
      }

      yield return set_cr_done();
    }
    return 0;
  }
};

static string full_sync_index_shard_oid(int shard_id)
{
  char buf[mdlog_sync_full_sync_index_prefix.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%d", mdlog_sync_full_sync_index_prefix.c_str(), shard_id);
  return string(buf);
}

class RGWReadRemoteMetadataCR : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;

  RGWRESTReadResource *http_op;

  string section;
  string key;

  bufferlist *pbl;

  RGWSyncTraceNodeRef tn;

public:
  RGWReadRemoteMetadataCR(RGWMetaSyncEnv *_sync_env,
                          const string& _section, const string& _key, bufferlist *_pbl,
                          const RGWSyncTraceNodeRef& _tn_parent)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      http_op(NULL),
      section(_section),
      key(_key),
      pbl(_pbl) {
    tn = sync_env->sync_tracer->add_node(_tn_parent, "read_remote_meta",
                                         section + ":" + key);
  }

  int operate(const DoutPrefixProvider *dpp) override {
    RGWRESTConn *conn = sync_env->conn;
    reenter(this) {
      yield {
        string key_encode;
        url_encode(key, key_encode);
        rgw_http_param_pair pairs[] = { { "key", key.c_str() },
                                        { NULL, NULL } };

        string p = string("/admin/metadata/") + section + "/" + key_encode;

        http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);

        init_new_io(http_op);

        int ret = http_op->aio_read(dpp);
        if (ret < 0) {
          ldpp_dout(dpp, 0) << "ERROR: failed to fetch mdlog data" << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          http_op->put();
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait(pbl, null_yield);
        http_op->put();
        if (ret < 0) {
          return set_cr_error(ret);
        }
        return set_cr_done();
      }
    }
    return 0;
  }
};

class RGWAsyncMetaStoreEntry : public RGWAsyncRadosRequest {
  rgw::sal::RGWRadosStore *store;
  string raw_key;
  bufferlist bl;
  const DoutPrefixProvider *dpp;
protected:
  int _send_request(const DoutPrefixProvider *dpp) override {
    int ret = store->ctl()->meta.mgr->put(raw_key, bl, null_yield, dpp, RGWMDLogSyncType::APPLY_ALWAYS, true);
    if (ret < 0) {
      ldpp_dout(dpp, 0) << "ERROR: can't store key: " << raw_key << " ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }
public:
  RGWAsyncMetaStoreEntry(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RGWRadosStore *_store,
                         const string& _raw_key,
                         bufferlist& _bl,
                         const DoutPrefixProvider *dpp)
    : RGWAsyncRadosRequest(caller, cn), store(_store),
      raw_key(_raw_key), bl(_bl), dpp(dpp) {}
};


class RGWMetaStoreEntryCR : public RGWSimpleCoroutine {
  RGWMetaSyncEnv *sync_env;
  string raw_key;
  bufferlist bl;

  RGWAsyncMetaStoreEntry *req;

public:
  RGWMetaStoreEntryCR(RGWMetaSyncEnv *_sync_env,
                      const string& _raw_key,
                      bufferlist& _bl)
    : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env),
      raw_key(_raw_key), bl(_bl), req(NULL) {
  }

  ~RGWMetaStoreEntryCR() override {
    if (req) {
      req->finish();
    }
  }

  int send_request(const DoutPrefixProvider *dpp) override {
    req = new RGWAsyncMetaStoreEntry(this, stack->create_completion_notifier(),
                                     sync_env->store, raw_key, bl, dpp);
    sync_env->async_rados->queue(req);
    return 0;
  }

  int request_complete() override {
    return req->get_ret_status();
  }
};

class RGWAsyncMetaRemoveEntry : public RGWAsyncRadosRequest {
  rgw::sal::RGWRadosStore *store;
  string raw_key;
  const DoutPrefixProvider *dpp;
protected:
  int _send_request(const DoutPrefixProvider *dpp) override {
    int ret = store->ctl()->meta.mgr->remove(raw_key, null_yield, dpp);
    if (ret < 0) {
      ldpp_dout(dpp, 0) << "ERROR: can't remove key: " << raw_key << " ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }
public:
  RGWAsyncMetaRemoveEntry(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RGWRadosStore *_store,
                          const string& _raw_key, const DoutPrefixProvider *dpp)
    : RGWAsyncRadosRequest(caller, cn), store(_store),
      raw_key(_raw_key), dpp(dpp) {}
};


class RGWMetaRemoveEntryCR : public RGWSimpleCoroutine {
  RGWMetaSyncEnv *sync_env;
  string raw_key;

  RGWAsyncMetaRemoveEntry *req;

public:
  RGWMetaRemoveEntryCR(RGWMetaSyncEnv *_sync_env,
                       const string& _raw_key)
    : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env),
      raw_key(_raw_key), req(NULL) {
  }

  ~RGWMetaRemoveEntryCR() override {
    if (req) {
      req->finish();
    }
  }

  int send_request(const DoutPrefixProvider *dpp) override {
    req = new RGWAsyncMetaRemoveEntry(this, stack->create_completion_notifier(),
                                      sync_env->store, raw_key, dpp);
    sync_env->async_rados->queue(req);
    return 0;
  }

  int request_complete() override {
    int r = req->get_ret_status();
    if (r == -ENOENT) {
      r = 0;
    }
    return r;
  }
};

#define META_SYNC_UPDATE_MARKER_WINDOW 10


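/*
 * Order-control coroutine for marker writes: each loop iteration runs
 * whatever coroutine "cr" currently holds, so if several updates are queued
 * while one is in flight, only the most recently submitted one survives and
 * stale marker writes are simply dropped.
 */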
int RGWLastCallerWinsCR::operate(const DoutPrefixProvider *dpp) {
  RGWCoroutine *call_cr;
  reenter(this) {
    while (cr) {
      call_cr = cr;
      cr = nullptr;
      yield call(call_cr);
      /* cr might have been modified at this point */
    }
    return set_cr_done();
  }
  return 0;
}

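/*
 * Tracks the high-water mark of completed entries for one shard and persists
 * it with an RGWSimpleRadosWriteCR. The base class batches updates, so the
 * marker object is rewritten roughly once per META_SYNC_UPDATE_MARKER_WINDOW
 * (10) completed entries rather than on every entry.
 */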
class RGWMetaSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
  RGWMetaSyncEnv *sync_env;

  string marker_oid;
  rgw_meta_sync_marker sync_marker;

  RGWSyncTraceNodeRef tn;

public:
  RGWMetaSyncShardMarkerTrack(RGWMetaSyncEnv *_sync_env,
                              const string& _marker_oid,
                              const rgw_meta_sync_marker& _marker,
                              RGWSyncTraceNodeRef& _tn)
    : RGWSyncShardMarkerTrack(META_SYNC_UPDATE_MARKER_WINDOW),
      sync_env(_sync_env),
      marker_oid(_marker_oid),
      sync_marker(_marker),
      tn(_tn) {}

  RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
    sync_marker.marker = new_marker;
    if (index_pos > 0) {
      sync_marker.pos = index_pos;
    }

    if (!real_clock::is_zero(timestamp)) {
      sync_marker.timestamp = timestamp;
    }

    ldpp_dout(sync_env->dpp, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << " realm_epoch=" << sync_marker.realm_epoch << dendl;
    tn->log(20, SSTR("new marker=" << new_marker));
    rgw::sal::RGWRadosStore *store = sync_env->store;
    return new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->dpp, sync_env->async_rados,
                                                           store->svc()->sysobj,
                                                           rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, marker_oid),
                                                           sync_marker);
  }

  RGWOrderCallCR *allocate_order_control_cr() override {
    return new RGWLastCallerWinsCR(sync_env->cct);
  }
};

RGWMetaSyncSingleEntryCR::RGWMetaSyncSingleEntryCR(RGWMetaSyncEnv *_sync_env,
                                                   const string& _raw_key, const string& _entry_marker,
                                                   const RGWMDLogStatus& _op_status,
                                                   RGWMetaSyncShardMarkerTrack *_marker_tracker, const RGWSyncTraceNodeRef& _tn_parent)
  : RGWCoroutine(_sync_env->cct),
    sync_env(_sync_env),
    raw_key(_raw_key), entry_marker(_entry_marker),
    op_status(_op_status),
    pos(0), sync_status(0),
    marker_tracker(_marker_tracker), tries(0) {
  error_injection = (sync_env->cct->_conf->rgw_sync_meta_inject_err_probability > 0);
  tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", raw_key);
}

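/*
 * Sync of a single "section:key" entry: fetch the raw metadata from the
 * master zone, retrying transient -EAGAIN/-ECANCELED errors up to
 * NUM_TRANSIENT_ERROR_RETRIES times, then either store it locally (or remove
 * it if the master returned -ENOENT) and mark the entry finished in the
 * marker tracker. Permanent fetch failures are recorded in the error log
 * before the coroutine fails.
 */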
int RGWMetaSyncSingleEntryCR::operate(const DoutPrefixProvider *dpp) {
  reenter(this) {
#define NUM_TRANSIENT_ERROR_RETRIES 10

    if (error_injection &&
        rand() % 10000 < cct->_conf->rgw_sync_meta_inject_err_probability * 10000.0) {
      return set_cr_error(-EIO);
    }

    if (op_status != MDLOG_STATUS_COMPLETE) {
      tn->log(20, "skipping pending operation");
      yield call(marker_tracker->finish(entry_marker));
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    tn->set_flag(RGW_SNS_FLAG_ACTIVE);
    for (tries = 0; tries < NUM_TRANSIENT_ERROR_RETRIES; tries++) {
      yield {
        pos = raw_key.find(':');
        section = raw_key.substr(0, pos);
        key = raw_key.substr(pos + 1);
        tn->log(10, SSTR("fetching remote metadata entry" << (tries == 0 ? "" : " (retry)")));
        call(new RGWReadRemoteMetadataCR(sync_env, section, key, &md_bl, tn));
      }

      sync_status = retcode;

      if (sync_status == -ENOENT) {
        /* FIXME: do we need to remove the entry from the local zone? */
        break;
      }

      if ((sync_status == -EAGAIN || sync_status == -ECANCELED) && (tries < NUM_TRANSIENT_ERROR_RETRIES - 1)) {
        ldpp_dout(dpp, 20) << *this << ": failed to fetch remote metadata: " << section << ":" << key << ", will retry" << dendl;
        continue;
      }

      if (sync_status < 0) {
        tn->log(10, SSTR("failed to send read remote metadata entry: section=" << section << " key=" << key << " status=" << sync_status));
        log_error() << "failed to send read remote metadata entry: section=" << section << " key=" << key << " status=" << sync_status << std::endl;
        yield call(sync_env->error_logger->log_error_cr(dpp, sync_env->conn->get_remote_id(), section, key, -sync_status,
                                                        string("failed to read remote metadata entry: ") + cpp_strerror(-sync_status)));
        return set_cr_error(sync_status);
      }

      break;
    }

    retcode = 0;
    for (tries = 0; tries < NUM_TRANSIENT_ERROR_RETRIES; tries++) {
      if (sync_status != -ENOENT) {
        tn->log(10, SSTR("storing local metadata entry"));
        yield call(new RGWMetaStoreEntryCR(sync_env, raw_key, md_bl));
      } else {
        tn->log(10, SSTR("removing local metadata entry"));
        yield call(new RGWMetaRemoveEntryCR(sync_env, raw_key));
      }
      if ((retcode == -EAGAIN || retcode == -ECANCELED) && (tries < NUM_TRANSIENT_ERROR_RETRIES - 1)) {
        ldpp_dout(dpp, 20) << *this << ": failed to store metadata: " << section << ":" << key << ", got retcode=" << retcode << dendl;
        continue;
      }
      break;
    }

    sync_status = retcode;

    if (sync_status == 0 && marker_tracker) {
      /* update marker */
      yield call(marker_tracker->finish(entry_marker));
      sync_status = retcode;
    }
    if (sync_status < 0) {
      tn->log(10, SSTR("failed, status=" << sync_status));
      return set_cr_error(sync_status);
    }
    tn->log(10, "success");
    return set_cr_done();
  }
  return 0;
}

class RGWCloneMetaLogCoroutine : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;
  RGWMetadataLog *mdlog;

  const std::string& period;
  int shard_id;
  string marker;
  bool truncated = false;
  string *new_marker;

  int max_entries = CLONE_MAX_ENTRIES;

  RGWRESTReadResource *http_op = nullptr;
  boost::intrusive_ptr<RGWMetadataLogInfoCompletion> completion;

  RGWMetadataLogInfo shard_info;
  rgw_mdlog_shard_data data;

public:
  RGWCloneMetaLogCoroutine(RGWMetaSyncEnv *_sync_env, RGWMetadataLog* mdlog,
                           const std::string& period, int _id,
                           const string& _marker, string *_new_marker)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), mdlog(mdlog),
      period(period), shard_id(_id), marker(_marker), new_marker(_new_marker) {
    if (new_marker) {
      *new_marker = marker;
    }
  }
  ~RGWCloneMetaLogCoroutine() override {
    if (http_op) {
      http_op->put();
    }
    if (completion) {
      completion->cancel();
    }
  }

  int operate(const DoutPrefixProvider *dpp) override;

  int state_init();
  int state_read_shard_status();
  int state_read_shard_status_complete();
  int state_send_rest_request(const DoutPrefixProvider *dpp);
  int state_receive_rest_response();
  int state_store_mdlog_entries();
  int state_store_mdlog_entries_complete();
};

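/*
 * Per-shard sync state machine. full_sync() replays the omap index built by
 * RGWFetchAllMetaCR and then transitions the persisted marker to
 * IncrementalSync; incremental_sync() follows the remote mdlog shard from
 * the saved marker. Both phases run under a continuous lease on the shard's
 * status object.
 */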
class RGWMetaSyncShardCR : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;

  const rgw_pool& pool;
  const std::string& period; //< currently syncing period id
  const epoch_t realm_epoch; //< realm_epoch of period
  RGWMetadataLog* mdlog; //< log of syncing period
  uint32_t shard_id;
  rgw_meta_sync_marker& sync_marker;
  boost::optional<rgw_meta_sync_marker> temp_marker; //< for pending updates
  string marker;
  string max_marker;
  const std::string& period_marker; //< max marker stored in next period

  RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
  std::set<std::string> entries;
  std::set<std::string>::iterator iter;

  string oid;

  RGWMetaSyncShardMarkerTrack *marker_tracker = nullptr;

  list<cls_log_entry> log_entries;
  list<cls_log_entry>::iterator log_iter;
  bool truncated = false;

  string mdlog_marker;
  string raw_key;
  rgw_mdlog_entry mdlog_entry;

  ceph::mutex inc_lock = ceph::make_mutex("RGWMetaSyncShardCR::inc_lock");
  ceph::condition_variable inc_cond;

  boost::asio::coroutine incremental_cr;
  boost::asio::coroutine full_cr;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;

  bool lost_lock = false;

  bool *reset_backoff;

  // hold a reference to the cr stack while it's in the map
  using StackRef = boost::intrusive_ptr<RGWCoroutinesStack>;
  map<StackRef, string> stack_to_pos;
  map<string, string> pos_to_prev;

  bool can_adjust_marker = false;
  bool done_with_period = false;

  int total_entries = 0;

  RGWSyncTraceNodeRef tn;
public:
  RGWMetaSyncShardCR(RGWMetaSyncEnv *_sync_env, const rgw_pool& _pool,
                     const std::string& period, epoch_t realm_epoch,
                     RGWMetadataLog* mdlog, uint32_t _shard_id,
                     rgw_meta_sync_marker& _marker,
                     const std::string& period_marker, bool *_reset_backoff,
                     RGWSyncTraceNodeRef& _tn)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), pool(_pool),
      period(period), realm_epoch(realm_epoch), mdlog(mdlog),
      shard_id(_shard_id), sync_marker(_marker),
      period_marker(period_marker),
      reset_backoff(_reset_backoff), tn(_tn) {
    *reset_backoff = false;
  }

  ~RGWMetaSyncShardCR() override {
    delete marker_tracker;
    if (lease_cr) {
      lease_cr->abort();
    }
  }

  void set_marker_tracker(RGWMetaSyncShardMarkerTrack *mt) {
    delete marker_tracker;
    marker_tracker = mt;
  }

  int operate(const DoutPrefixProvider *dpp) override {
    int r;
    while (true) {
      switch (sync_marker.state) {
      case rgw_meta_sync_marker::FullSync:
        r = full_sync();
        if (r < 0) {
          ldpp_dout(dpp, 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl;
          return set_cr_error(r);
        }
        return 0;
      case rgw_meta_sync_marker::IncrementalSync:
        r = incremental_sync();
        if (r < 0) {
          ldpp_dout(dpp, 10) << "sync: incremental_sync: shard_id=" << shard_id << " r=" << r << dendl;
          return set_cr_error(r);
        }
        return 0;
      }
    }
    /* unreachable */
    return 0;
  }

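  /*
   * Reaps finished child stacks and advances sync_marker conservatively:
   * pos_to_prev remembers the positions still in flight, so the marker only
   * moves up to the oldest entry that has not yet completed. A child failing
   * with -EAGAIN freezes marker adjustment so the entry is retried on the
   * next pass.
   */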
  void collect_children()
  {
    int child_ret;
    RGWCoroutinesStack *child;
    while (collect_next(&child_ret, &child)) {
      auto iter = stack_to_pos.find(child);
      if (iter == stack_to_pos.end()) {
        /* some other stack that we don't care about */
        continue;
      }

      string& pos = iter->second;

      if (child_ret < 0) {
        ldpp_dout(sync_env->dpp, 0) << *this << ": child operation stack=" << child << " entry=" << pos << " returned " << child_ret << dendl;
      }

      map<string, string>::iterator prev_iter = pos_to_prev.find(pos);
      ceph_assert(prev_iter != pos_to_prev.end());

      /*
       * we should get -EAGAIN for transient errors, for which we want to retry, so we don't
       * update the marker and abort. We'll get called again for these. Permanent errors will be
       * handled by marking the entry at the error log shard, so that we retry on it separately
       */
      if (child_ret == -EAGAIN) {
        can_adjust_marker = false;
      }

      if (pos_to_prev.size() == 1) {
        if (can_adjust_marker) {
          sync_marker.marker = pos;
        }
        pos_to_prev.erase(prev_iter);
      } else {
        ceph_assert(pos_to_prev.size() > 1);
        pos_to_prev.erase(prev_iter);
        prev_iter = pos_to_prev.begin();
        if (can_adjust_marker) {
          sync_marker.marker = prev_iter->second;
        }
      }

      ldpp_dout(sync_env->dpp, 4) << *this << ": adjusting marker pos=" << sync_marker.marker << dendl;
      stack_to_pos.erase(iter);
    }
  }

  int full_sync() {
#define OMAP_GET_MAX_ENTRIES 100
    int max_entries = OMAP_GET_MAX_ENTRIES;
    reenter(&full_cr) {
      set_status("full_sync");
      tn->log(10, "start full sync");
      oid = full_sync_index_shard_oid(shard_id);
      can_adjust_marker = true;
      /* grab lock */
      yield {
        uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
        string lock_name = "sync_lock";
        rgw::sal::RGWRadosStore *store = sync_env->store;
        lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                                rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
                                                lock_name, lock_duration, this));
        lease_stack.reset(spawn(lease_cr.get(), false));
        lost_lock = false;
      }
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          drain_all();
          tn->log(5, "failed to take lease");
          return lease_cr->get_ret_status();
        }
        set_sleeping(true);
        yield;
      }
      tn->log(10, "took lease");

      /* lock succeeded, a retry now should avoid previous backoff status */
      *reset_backoff = true;

      /* prepare marker tracker */
      set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env,
                                                         sync_env->shard_obj_name(shard_id),
                                                         sync_marker, tn));

      marker = sync_marker.marker;

      total_entries = sync_marker.pos;

      /* sync! */
      do {
        if (!lease_cr->is_locked()) {
          tn->log(10, "lost lease");
          lost_lock = true;
          break;
        }
        omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
        yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
                                             marker, max_entries, omapkeys));
        if (retcode < 0) {
          ldpp_dout(sync_env->dpp, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
          tn->log(0, SSTR("ERROR: failed to list omap keys, status=" << retcode));
          yield lease_cr->go_down();
          drain_all();
          return retcode;
        }
        entries = std::move(omapkeys->entries);
        tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
        if (entries.size() > 0) {
          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
        }
        iter = entries.begin();
        for (; iter != entries.end(); ++iter) {
          marker = *iter;
          tn->log(20, SSTR("full sync: " << marker));
          total_entries++;
          if (!marker_tracker->start(marker, total_entries, real_time())) {
            tn->log(0, SSTR("ERROR: cannot start syncing " << marker << ". Duplicate entry?"));
          } else {
            // fetch remote and write locally
            yield {
              RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, marker, marker, MDLOG_STATUS_COMPLETE, marker_tracker, tn), false);
              // stack_to_pos holds a reference to the stack
              stack_to_pos[stack] = marker;
              pos_to_prev[marker] = marker;
            }
          }
        }
        collect_children();
      } while (omapkeys->more && can_adjust_marker);

      tn->unset_flag(RGW_SNS_FLAG_ACTIVE); /* done processing this batch of entries */

      while (num_spawned() > 1) {
        yield wait_for_child();
        collect_children();
      }

      if (!lost_lock) {
        /* update marker to reflect we're done with full sync */
        if (can_adjust_marker) {
          // apply updates to a temporary marker, or operate() will send us
          // to incremental_sync() after we yield
          temp_marker = sync_marker;
          temp_marker->state = rgw_meta_sync_marker::IncrementalSync;
          temp_marker->marker = std::move(temp_marker->next_step_marker);
          temp_marker->next_step_marker.clear();
          temp_marker->realm_epoch = realm_epoch;
          ldpp_dout(sync_env->dpp, 4) << *this << ": saving marker pos=" << temp_marker->marker << " realm_epoch=" << realm_epoch << dendl;

          using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_meta_sync_marker>;
          yield call(new WriteMarkerCR(sync_env->dpp, sync_env->async_rados, sync_env->store->svc()->sysobj,
                                       rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
                                       *temp_marker));
        }

        if (retcode < 0) {
          ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
          yield lease_cr->go_down();
          drain_all();
          return retcode;
        }
      }

1667 /*
1668 * if we reached here, it means that lost_lock is true, otherwise the state
1669 * change in the previous block will prevent us from reaching here
1670 */
1671
1672 yield lease_cr->go_down();
1673
31f18b77 1674 lease_cr.reset();
7c673cae
FG
1675
1676 drain_all();
1677
1678 if (!can_adjust_marker) {
1679 return -EAGAIN;
1680 }
1681
1682 if (lost_lock) {
1683 return -EBUSY;
1684 }
1685
11fdf7f2
TL
1686 tn->log(10, "full sync complete");
1687
7c673cae 1688 // apply the sync marker update
11fdf7f2 1689 ceph_assert(temp_marker);
7c673cae
FG
1690 sync_marker = std::move(*temp_marker);
1691 temp_marker = boost::none;
1692 // must not yield after this point!
1693 }
1694 return 0;
1695 }
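
  // A note on the bookkeeping above: stack_to_pos maps each spawned
  // RGWMetaSyncSingleEntryCR stack to the position it syncs, and pos_to_prev
  // maps that position back to its predecessor. collect_children() (defined
  // earlier in this file) reaps completed children in whatever order they
  // finish; a minimal sketch of the idea -- hypothetical code, for
  // illustration only:
  //
  //   int child_ret;
  //   RGWCoroutinesStack *child;
  //   while (collect_next(&child_ret, &child)) {
  //     const std::string& pos = stack_to_pos[child];
  //     // pos_to_prev tells us whether everything before 'pos' has also
  //     // completed; only then may the shard's high marker advance to it
  //   }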

  int incremental_sync() {
    reenter(&incremental_cr) {
      set_status("incremental_sync");
      tn->log(10, "start incremental sync");
      can_adjust_marker = true;
      /* grab lock */
      if (!lease_cr) { /* could have had a lease_cr lock from previous state */
        yield {
          uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
          string lock_name = "sync_lock";
          rgw::sal::RGWRadosStore *store = sync_env->store;
          lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                                  rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
                                                  lock_name, lock_duration, this));
          lease_stack.reset(spawn(lease_cr.get(), false));
          lost_lock = false;
        }
        while (!lease_cr->is_locked()) {
          if (lease_cr->is_done()) {
            drain_all();
            tn->log(10, "failed to take lease");
            return lease_cr->get_ret_status();
          }
          set_sleeping(true);
          yield;
        }
      }
      tn->log(10, "took lease");
      // if the period has advanced, we can't use the existing marker
      if (sync_marker.realm_epoch < realm_epoch) {
        ldpp_dout(sync_env->dpp, 4) << "clearing marker=" << sync_marker.marker
            << " from old realm_epoch=" << sync_marker.realm_epoch
            << " (now " << realm_epoch << ')' << dendl;
        sync_marker.realm_epoch = realm_epoch;
        sync_marker.marker.clear();
      }
      mdlog_marker = sync_marker.marker;
      set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env,
                                                         sync_env->shard_obj_name(shard_id),
                                                         sync_marker, tn));

      /*
       * mdlog_marker: the remote sync marker position
       * sync_marker: the local sync marker position
       * max_marker: the max mdlog position that we fetched
       * marker: the current position we try to sync
       * period_marker: the last marker before the next period begins (optional)
       */
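
      // Informally, the loop below maintains the ordering
      //   sync_marker.marker <= marker <= max_marker <= mdlog_marker:
      // at the tip (mdlog_marker <= max_marker) it clones more of the remote
      // log; otherwise it reads and replays the window between max_marker
      // and mdlog_marker. This is a reading of the code below, not an
      // invariant asserted anywhere.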
      marker = max_marker = sync_marker.marker;
      /* inc sync */
      do {
        if (!lease_cr->is_locked()) {
          lost_lock = true;
          tn->log(10, "lost lease");
          break;
        }
#define INCREMENTAL_MAX_ENTRIES 100
        ldpp_dout(sync_env->dpp, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << dendl;
        if (!period_marker.empty() && period_marker <= mdlog_marker) {
          tn->log(10, SSTR("finished syncing current period: mdlog_marker=" << mdlog_marker << " sync_marker=" << sync_marker.marker << " period_marker=" << period_marker));
          done_with_period = true;
          break;
        }
        if (mdlog_marker <= max_marker) {
          /* we're at the tip, try to bring more entries */
          ldpp_dout(sync_env->dpp, 20) << __func__ << ":" << __LINE__ << ": syncing mdlog for shard_id=" << shard_id << dendl;
          yield call(new RGWCloneMetaLogCoroutine(sync_env, mdlog,
                                                  period, shard_id,
                                                  mdlog_marker, &mdlog_marker));
        }
        if (retcode < 0) {
          tn->log(10, SSTR(*this << ": failed to fetch more log entries, retcode=" << retcode));
          yield lease_cr->go_down();
          drain_all();
          *reset_backoff = false; // back off and try again later
          return retcode;
        }
        *reset_backoff = true; /* if we got to this point, all systems function */
        if (mdlog_marker > max_marker) {
          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
          tn->log(20, SSTR("mdlog_marker=" << mdlog_marker << " sync_marker=" << sync_marker.marker));
          marker = max_marker;
          yield call(new RGWReadMDLogEntriesCR(sync_env, mdlog, shard_id,
                                               &max_marker, INCREMENTAL_MAX_ENTRIES,
                                               &log_entries, &truncated));
          if (retcode < 0) {
            tn->log(10, SSTR("failed to list mdlog entries, retcode=" << retcode));
            yield lease_cr->go_down();
            drain_all();
            *reset_backoff = false; // back off and try again later
            return retcode;
          }
          for (log_iter = log_entries.begin(); log_iter != log_entries.end() && !done_with_period; ++log_iter) {
            if (!period_marker.empty() && period_marker <= log_iter->id) {
              done_with_period = true;
              if (period_marker < log_iter->id) {
                tn->log(10, SSTR("found key=" << log_iter->id
                    << " past period_marker=" << period_marker));
                break;
              }
              ldpp_dout(sync_env->dpp, 10) << "found key at period_marker=" << period_marker << dendl;
              // sync this entry, then return control to RGWMetaSyncCR
            }
            if (!mdlog_entry.convert_from(*log_iter)) {
              tn->log(0, SSTR("ERROR: failed to convert mdlog entry, shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << " ... skipping entry"));
              continue;
            }
            tn->log(20, SSTR("log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp));
            if (!marker_tracker->start(log_iter->id, 0, log_iter->timestamp.to_real_time())) {
              ldpp_dout(sync_env->dpp, 0) << "ERROR: cannot start syncing " << log_iter->id << ". Duplicate entry?" << dendl;
            } else {
              raw_key = log_iter->section + ":" + log_iter->name;
              yield {
                RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, mdlog_entry.log_data.status, marker_tracker, tn), false);
                ceph_assert(stack);
                // stack_to_pos holds a reference to the stack
                stack_to_pos[stack] = log_iter->id;
                pos_to_prev[log_iter->id] = marker;
              }
            }
            marker = log_iter->id;
          }
        }
        collect_children();
        ldpp_dout(sync_env->dpp, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " max_marker=" << max_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << dendl;
        if (done_with_period) {
          // return control to RGWMetaSyncCR and advance to the next period
          tn->log(10, SSTR(*this << ": done with period"));
          break;
        }
        if (mdlog_marker == max_marker && can_adjust_marker) {
          tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
#define INCREMENTAL_INTERVAL 20
          yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
        }
      } while (can_adjust_marker);

      tn->unset_flag(RGW_SNS_FLAG_ACTIVE);

      while (num_spawned() > 1) {
        yield wait_for_child();
        collect_children();
      }

      yield lease_cr->go_down();

      drain_all();

      if (lost_lock) {
        return -EBUSY;
      }

      if (!can_adjust_marker) {
        return -EAGAIN;
      }

      return set_cr_done();
    }
    /* TODO */
    return 0;
  }
};

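// Return-code contract of RGWMetaSyncShardCR, as consumed by the control CR
// below: set_cr_done() on success, -EBUSY when the shard lease was lost, and
// -EAGAIN when the marker could not be adjusted and the whole pass must be
// repeated. With exit_on_error == false ("retry on all errors"), the backoff
// controller re-runs the shard CR in all of these cases.
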
class RGWMetaSyncShardControlCR : public RGWBackoffControlCR
{
  RGWMetaSyncEnv *sync_env;

  const rgw_pool& pool;
  const std::string& period;
  epoch_t realm_epoch;
  RGWMetadataLog* mdlog;
  uint32_t shard_id;
  rgw_meta_sync_marker sync_marker;
  const std::string period_marker;

  RGWSyncTraceNodeRef tn;

  static constexpr bool exit_on_error = false; // retry on all errors
public:
  RGWMetaSyncShardControlCR(RGWMetaSyncEnv *_sync_env, const rgw_pool& _pool,
                            const std::string& period, epoch_t realm_epoch,
                            RGWMetadataLog* mdlog, uint32_t _shard_id,
                            const rgw_meta_sync_marker& _marker,
                            std::string&& period_marker,
                            RGWSyncTraceNodeRef& _tn_parent)
    : RGWBackoffControlCR(_sync_env->cct, exit_on_error), sync_env(_sync_env),
      pool(_pool), period(period), realm_epoch(realm_epoch), mdlog(mdlog),
      shard_id(_shard_id), sync_marker(_marker),
      period_marker(std::move(period_marker)) {
    tn = sync_env->sync_tracer->add_node(_tn_parent, "shard",
                                         std::to_string(shard_id));
  }

  RGWCoroutine *alloc_cr() override {
    return new RGWMetaSyncShardCR(sync_env, pool, period, realm_epoch, mdlog,
                                  shard_id, sync_marker, period_marker, backoff_ptr(), tn);
  }

  RGWCoroutine *alloc_finisher_cr() override {
    rgw::sal::RGWRadosStore *store = sync_env->store;
    return new RGWSimpleRadosReadCR<rgw_meta_sync_marker>(sync_env->dpp, sync_env->async_rados, store->svc()->sysobj,
                                                          rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
                                                          &sync_marker);
  }
};

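// A plausible reading of alloc_finisher_cr() above: between attempts, the
// backoff controller re-reads the persisted shard marker from
// shard_obj_name(shard_id), so a re-allocated RGWMetaSyncShardCR resumes from
// the position last committed by the marker tracker rather than from a stale
// in-memory copy. (Inference; the retry sequencing itself lives in
// RGWBackoffControlCR.)
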
class RGWMetaSyncCR : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;
  const rgw_pool& pool;
  RGWPeriodHistory::Cursor cursor; ///< sync position in period history
  RGWPeriodHistory::Cursor next; ///< next period in history
  rgw_meta_sync_status sync_status;
  RGWSyncTraceNodeRef tn;

  std::mutex mutex; ///< protect access to shard_crs

  // TODO: it should be enough to hold a reference on the stack only, as calling
  // RGWCoroutinesStack::wakeup() doesn't refer to the RGWCoroutine if it has
  // already completed
  using ControlCRRef = boost::intrusive_ptr<RGWMetaSyncShardControlCR>;
  using StackRef = boost::intrusive_ptr<RGWCoroutinesStack>;
  using RefPair = std::pair<ControlCRRef, StackRef>;
  map<int, RefPair> shard_crs;
  int ret{0};

public:
  RGWMetaSyncCR(RGWMetaSyncEnv *_sync_env, const RGWPeriodHistory::Cursor &cursor,
                const rgw_meta_sync_status& _sync_status, RGWSyncTraceNodeRef& _tn)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      pool(sync_env->store->svc()->zone->get_zone_params().log_pool),
      cursor(cursor), sync_status(_sync_status), tn(_tn) {}

  ~RGWMetaSyncCR() {
  }

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      // loop through one period at a time
      tn->log(1, "start");
      for (;;) {
        if (cursor == sync_env->store->svc()->mdlog->get_period_history()->get_current()) {
          next = RGWPeriodHistory::Cursor{};
          if (cursor) {
            ldpp_dout(dpp, 10) << "RGWMetaSyncCR on current period="
                << cursor.get_period().get_id() << dendl;
          } else {
            ldpp_dout(dpp, 10) << "RGWMetaSyncCR with no period" << dendl;
          }
        } else {
          next = cursor;
          next.next();
          ldpp_dout(dpp, 10) << "RGWMetaSyncCR on period="
              << cursor.get_period().get_id() << ", next="
              << next.get_period().get_id() << dendl;
        }

        yield {
          // get the mdlog for the current period (may be empty)
          auto& period_id = sync_status.sync_info.period;
          auto realm_epoch = sync_status.sync_info.realm_epoch;
          auto mdlog = sync_env->store->svc()->mdlog->get_log(period_id);

          tn->log(1, SSTR("realm epoch=" << realm_epoch << " period id=" << period_id));

          // prevent wakeup() from accessing shard_crs while we're spawning them
          std::lock_guard<std::mutex> lock(mutex);

          // sync this period on each shard
          for (const auto& m : sync_status.sync_markers) {
            uint32_t shard_id = m.first;
            auto& marker = m.second;

            std::string period_marker;
            if (next) {
              // read the maximum marker from the next period's sync status
              period_marker = next.get_period().get_sync_status()[shard_id];
              if (period_marker.empty()) {
                // no metadata changes have occurred on this shard, skip it
                ldpp_dout(dpp, 10) << "RGWMetaSyncCR: skipping shard " << shard_id
                    << " with empty period marker" << dendl;
                continue;
              }
            }

            using ShardCR = RGWMetaSyncShardControlCR;
            auto cr = new ShardCR(sync_env, pool, period_id, realm_epoch,
                                  mdlog, shard_id, marker,
                                  std::move(period_marker), tn);
            auto stack = spawn(cr, false);
            shard_crs[shard_id] = RefPair{cr, stack};
          }
        }
        // wait for each shard to complete
        while (ret == 0 && num_spawned() > 0) {
          yield wait_for_child();
          collect(&ret, nullptr);
        }
        drain_all();
        {
          // drop shard cr refs under lock
          std::lock_guard<std::mutex> lock(mutex);
          shard_crs.clear();
        }
        if (ret < 0) {
          return set_cr_error(ret);
        }
        // advance to the next period
        ceph_assert(next);
        cursor = next;

        // write the updated sync info
        sync_status.sync_info.period = cursor.get_period().get_id();
        sync_status.sync_info.realm_epoch = cursor.get_epoch();
        yield call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(dpp, sync_env->async_rados,
                                                                 sync_env->store->svc()->sysobj,
                                                                 rgw_raw_obj(pool, sync_env->status_oid()),
                                                                 sync_status.sync_info));
      }
    }
    return 0;
  }

  void wakeup(int shard_id) {
    std::lock_guard<std::mutex> lock(mutex);
    auto iter = shard_crs.find(shard_id);
    if (iter == shard_crs.end()) {
      return;
    }
    iter->second.first->wakeup();
  }
};

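// Shard wakeups arrive top-down: RGWRemoteMetaLog::wakeup(shard_id) (below)
// forwards to the wakeup() above, which looks up the per-shard control CR
// under the mutex and pokes it. Presumably this is what cuts the
// INCREMENTAL_INTERVAL sleep short when new mdlog entries are announced for
// the shard; the notification source itself lives outside this file.
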
void RGWRemoteMetaLog::init_sync_env(RGWMetaSyncEnv *env) {
  env->dpp = dpp;
  env->cct = store->ctx();
  env->store = store;
  env->conn = conn;
  env->async_rados = async_rados;
  env->http_manager = &http_manager;
  env->error_logger = error_logger;
  env->sync_tracer = store->getRados()->get_sync_tracer();
}

int RGWRemoteMetaLog::read_sync_status(const DoutPrefixProvider *dpp, rgw_meta_sync_status *sync_status)
{
  if (store->svc()->zone->is_meta_master()) {
    return 0;
  }
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWCoroutinesManager crs(store->ctx(), store->getRados()->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }
  RGWMetaSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  tn->log(20, "read sync status");
  ret = crs.run(dpp, new RGWReadSyncStatusCoroutine(&sync_env_local, sync_status));
  http_manager.stop();
  return ret;
}

int RGWRemoteMetaLog::init_sync_status(const DoutPrefixProvider *dpp)
{
  if (store->svc()->zone->is_meta_master()) {
    return 0;
  }

  rgw_mdlog_info mdlog_info;
  int r = read_log_info(dpp, &mdlog_info);
  if (r < 0) {
    ldpp_dout(dpp, -1) << "ERROR: failed to fetch master log info (r=" << r << ")" << dendl;
    return r;
  }

  rgw_meta_sync_info sync_info;
  sync_info.num_shards = mdlog_info.num_shards;
  auto cursor = store->svc()->mdlog->get_period_history()->get_current();
  if (cursor) {
    sync_info.period = cursor.get_period().get_id();
    sync_info.realm_epoch = cursor.get_epoch();
  }

  return run(dpp, new RGWInitSyncStatusCoroutine(&sync_env, sync_info));
}

int RGWRemoteMetaLog::store_sync_info(const DoutPrefixProvider *dpp, const rgw_meta_sync_info& sync_info)
{
  tn->log(20, "store sync info");
  return run(dpp, new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(dpp, async_rados, store->svc()->sysobj,
                                                                rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env.status_oid()),
                                                                sync_info));
}

// return a cursor to the period at our sync position
static RGWPeriodHistory::Cursor get_period_at(const DoutPrefixProvider *dpp,
                                              rgw::sal::RGWRadosStore* store,
                                              const rgw_meta_sync_info& info,
                                              optional_yield y)
{
  if (info.period.empty()) {
    // return an empty cursor with error=0
    return RGWPeriodHistory::Cursor{};
  }

  // look for an existing period in our history
  auto cursor = store->svc()->mdlog->get_period_history()->lookup(info.realm_epoch);
  if (cursor) {
    // verify that the period ids match
    auto& existing = cursor.get_period().get_id();
    if (existing != info.period) {
      ldpp_dout(dpp, -1) << "ERROR: sync status period=" << info.period
          << " does not match period=" << existing
          << " in history at realm epoch=" << info.realm_epoch << dendl;
      return RGWPeriodHistory::Cursor{-EEXIST};
    }
    return cursor;
  }

  // read the period from rados or pull it from the master
  RGWPeriod period;
  int r = store->svc()->mdlog->pull_period(dpp, info.period, period, y);
  if (r < 0) {
    ldpp_dout(dpp, -1) << "ERROR: failed to read period id "
        << info.period << ": " << cpp_strerror(r) << dendl;
    return RGWPeriodHistory::Cursor{r};
  }
  // attach the period to our history
  cursor = store->svc()->mdlog->get_period_history()->attach(dpp, std::move(period), y);
  if (!cursor) {
    r = cursor.get_error();
    ldpp_dout(dpp, -1) << "ERROR: failed to read period history back to "
        << info.period << ": " << cpp_strerror(r) << dendl;
  }
  return cursor;
}

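// Note that RGWPeriodHistory::Cursor doubles as a result-or-error type here:
// a default-constructed cursor means "empty, error=0", Cursor{-EEXIST} or
// Cursor{r} carries a failure, and callers test operator bool and then
// cursor.get_error(), as run_sync() does below.
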
int RGWRemoteMetaLog::run_sync(const DoutPrefixProvider *dpp, optional_yield y)
{
  if (store->svc()->zone->is_meta_master()) {
    return 0;
  }

  int r = 0;

  // get shard count and oldest log period from master
  rgw_mdlog_info mdlog_info;
  for (;;) {
    if (going_down) {
      ldpp_dout(dpp, 1) << __func__ << "(): going down" << dendl;
      return 0;
    }
    r = read_log_info(dpp, &mdlog_info);
    if (r == -EIO || r == -ENOENT) {
      // keep retrying if master isn't alive or hasn't initialized the log
      ldpp_dout(dpp, 10) << __func__ << "(): waiting for master..." << dendl;
      backoff.backoff_sleep();
      continue;
    }
    backoff.reset();
    if (r < 0) {
      ldpp_dout(dpp, -1) << "ERROR: failed to fetch master log info (r=" << r << ")" << dendl;
      return r;
    }
    break;
  }

  rgw_meta_sync_status sync_status;
  do {
    if (going_down) {
      ldpp_dout(dpp, 1) << __func__ << "(): going down" << dendl;
      return 0;
    }
    r = run(dpp, new RGWReadSyncStatusCoroutine(&sync_env, &sync_status));
    if (r < 0 && r != -ENOENT) {
      ldpp_dout(dpp, 0) << "ERROR: failed to fetch sync status r=" << r << dendl;
      return r;
    }

    if (!mdlog_info.period.empty()) {
      // restart sync if the remote has a period, but:
      // a) our status does not, or
      // b) our sync period comes before the remote's oldest log period
      if (sync_status.sync_info.period.empty() ||
          sync_status.sync_info.realm_epoch < mdlog_info.realm_epoch) {
        sync_status.sync_info.state = rgw_meta_sync_info::StateInit;
        string reason;
        if (sync_status.sync_info.period.empty()) {
          reason = "period is empty";
        } else {
          reason = SSTR("sync_info realm epoch is behind: " << sync_status.sync_info.realm_epoch << " < " << mdlog_info.realm_epoch);
        }
        tn->log(1, "initialize sync (reason: " + reason + ")");
        ldpp_dout(dpp, 1) << "epoch=" << sync_status.sync_info.realm_epoch
            << " in sync status comes before remote's oldest mdlog epoch="
            << mdlog_info.realm_epoch << ", restarting sync" << dendl;
      }
    }

    if (sync_status.sync_info.state == rgw_meta_sync_info::StateInit) {
      ldpp_dout(dpp, 20) << __func__ << "(): init" << dendl;
      sync_status.sync_info.num_shards = mdlog_info.num_shards;
      auto cursor = store->svc()->mdlog->get_period_history()->get_current();
      if (cursor) {
        // run full sync, then start incremental from the current period/epoch
        sync_status.sync_info.period = cursor.get_period().get_id();
        sync_status.sync_info.realm_epoch = cursor.get_epoch();
      }
      r = run(dpp, new RGWInitSyncStatusCoroutine(&sync_env, sync_status.sync_info));
      if (r == -EBUSY) {
        backoff.backoff_sleep();
        continue;
      }
      backoff.reset();
      if (r < 0) {
        ldpp_dout(dpp, 0) << "ERROR: failed to init sync status r=" << r << dendl;
        return r;
      }
    }
  } while (sync_status.sync_info.state == rgw_meta_sync_info::StateInit);

  auto num_shards = sync_status.sync_info.num_shards;
  if (num_shards != mdlog_info.num_shards) {
    ldpp_dout(dpp, -1) << "ERROR: can't sync, mismatch between num shards, master num_shards=" << mdlog_info.num_shards << " local num_shards=" << num_shards << dendl;
    return -EINVAL;
  }

  RGWPeriodHistory::Cursor cursor;
  do {
    r = run(dpp, new RGWReadSyncStatusCoroutine(&sync_env, &sync_status));
    if (r < 0 && r != -ENOENT) {
      tn->log(0, SSTR("ERROR: failed to fetch sync status r=" << r));
      return r;
    }

    switch ((rgw_meta_sync_info::SyncState)sync_status.sync_info.state) {
      case rgw_meta_sync_info::StateBuildingFullSyncMaps:
        tn->log(20, "building full sync maps");
        r = run(dpp, new RGWFetchAllMetaCR(&sync_env, num_shards, sync_status.sync_markers, tn));
        if (r == -EBUSY || r == -EAGAIN) {
          backoff.backoff_sleep();
          continue;
        }
        backoff.reset();
        if (r < 0) {
          tn->log(0, SSTR("ERROR: failed to fetch all metadata keys (r=" << r << ")"));
          return r;
        }

        sync_status.sync_info.state = rgw_meta_sync_info::StateSync;
        r = store_sync_info(dpp, sync_status.sync_info);
        if (r < 0) {
          tn->log(0, SSTR("ERROR: failed to update sync status (r=" << r << ")"));
          return r;
        }
        /* fall through */
      case rgw_meta_sync_info::StateSync:
        tn->log(20, "sync");
        // find our position in the period history (if any)
        cursor = get_period_at(dpp, store, sync_status.sync_info, y);
        r = cursor.get_error();
        if (r < 0) {
          return r;
        }
        meta_sync_cr = new RGWMetaSyncCR(&sync_env, cursor, sync_status, tn);
        r = run(dpp, meta_sync_cr);
        if (r < 0) {
          tn->log(0, "ERROR: failed to run metadata sync");
          return r;
        }
        break;
      default:
        tn->log(0, "ERROR: bad sync state!");
        return -EIO;
    }
  } while (!going_down);

  return 0;
}

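// run_sync() above drives the persisted state machine:
//   StateInit -> StateBuildingFullSyncMaps -> StateSync
// and re-reads the status object from RADOS on each loop iteration, so after
// a restart the process resumes from whatever state was last stored via
// store_sync_info(). (The fall-through from the map-building case into
// StateSync is intentional.)
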
void RGWRemoteMetaLog::wakeup(int shard_id)
{
  if (!meta_sync_cr) {
    return;
  }
  meta_sync_cr->wakeup(shard_id);
}

int RGWCloneMetaLogCoroutine::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    do {
      yield {
        ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": init request" << dendl;
        return state_init();
      }
      yield {
        ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status" << dendl;
        return state_read_shard_status();
      }
      yield {
        ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status complete" << dendl;
        return state_read_shard_status_complete();
      }
      yield {
        ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": sending rest request" << dendl;
        return state_send_rest_request(dpp);
      }
      yield {
        ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": receiving rest response" << dendl;
        return state_receive_rest_response();
      }
      yield {
        ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries" << dendl;
        return state_store_mdlog_entries();
      }
    } while (truncated);
    yield {
      ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries complete" << dendl;
      return state_store_mdlog_entries_complete();
    }
  }

  return 0;
}

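// Each iteration of the loop above clones one batch for the shard: read the
// local mdlog shard's high-water mark, fetch up to max_entries newer entries
// from the metadata master over REST, store them into the local shard, and
// repeat while the response still looks truncated.
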
int RGWCloneMetaLogCoroutine::state_init()
{
  data = rgw_mdlog_shard_data();

  return 0;
}

int RGWCloneMetaLogCoroutine::state_read_shard_status()
{
  const bool add_ref = false; // default constructs with refs=1

  completion.reset(new RGWMetadataLogInfoCompletion(
    [this](int ret, const cls_log_header& header) {
      if (ret < 0) {
        if (ret != -ENOENT) {
          ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to read mdlog info with "
              << cpp_strerror(ret) << dendl;
        }
      } else {
        shard_info.marker = header.max_marker;
        shard_info.last_update = header.max_time.to_real_time();
      }
      // wake up parent stack
      io_complete();
    }), add_ref);

  int ret = mdlog->get_info_async(sync_env->dpp, shard_id, completion.get());
  if (ret < 0) {
    ldpp_dout(sync_env->dpp, 0) << "ERROR: mdlog->get_info_async() returned ret=" << ret << dendl;
    return set_cr_error(ret);
  }

  return io_block(0);
}

int RGWCloneMetaLogCoroutine::state_read_shard_status_complete()
{
  completion.reset();

  ldpp_dout(sync_env->dpp, 20) << "shard_id=" << shard_id << " marker=" << shard_info.marker << " last_update=" << shard_info.last_update << dendl;

  marker = shard_info.marker;

  return 0;
}

int RGWCloneMetaLogCoroutine::state_send_rest_request(const DoutPrefixProvider *dpp)
{
  RGWRESTConn *conn = sync_env->conn;

  char buf[32];
  snprintf(buf, sizeof(buf), "%d", shard_id);

  char max_entries_buf[32];
  snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", max_entries);

  const char *marker_key = (marker.empty() ? "" : "marker");

  rgw_http_param_pair pairs[] = { { "type", "metadata" },
                                  { "id", buf },
                                  { "period", period.c_str() },
                                  { "max-entries", max_entries_buf },
                                  { marker_key, marker.c_str() },
                                  { NULL, NULL } };

  http_op = new RGWRESTReadResource(conn, "/admin/log", pairs, NULL, sync_env->http_manager);

  init_new_io(http_op);

  int ret = http_op->aio_read(dpp);
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "ERROR: failed to fetch mdlog data" << dendl;
    log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
    http_op->put();
    http_op = NULL;
    return set_cr_error(ret);
  }

  return io_block(0);
}

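// For reference, the request assembled above is an admin API read roughly of
// the form (values illustrative; <period-id> and <last-marker> are
// placeholders, and the marker parameter is only meaningful when non-empty):
//
//   GET /admin/log?type=metadata&id=3&period=<period-id>&max-entries=100&marker=<last-marker>
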
int RGWCloneMetaLogCoroutine::state_receive_rest_response()
{
  int ret = http_op->wait(&data, null_yield);
  if (ret < 0) {
    error_stream << "http operation failed: " << http_op->to_str() << " status=" << http_op->get_http_status() << std::endl;
    ldpp_dout(sync_env->dpp, 5) << "failed to wait for op, ret=" << ret << dendl;
    http_op->put();
    http_op = NULL;
    return set_cr_error(ret);
  }
  http_op->put();
  http_op = NULL;

  ldpp_dout(sync_env->dpp, 20) << "remote mdlog, shard_id=" << shard_id << " num of shard entries: " << data.entries.size() << dendl;

  truncated = ((int)data.entries.size() == max_entries);

  if (data.entries.empty()) {
    if (new_marker) {
      *new_marker = marker;
    }
    return set_cr_done();
  }

  if (new_marker) {
    *new_marker = data.entries.back().id;
  }

  return 0;
}

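// Note the truncation heuristic above: a full batch (entries.size() ==
// max_entries) is treated as "possibly more to fetch", so operate() loops
// again; a batch that exactly drains the remote log therefore costs one
// extra round trip that comes back empty and completes via set_cr_done().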

int RGWCloneMetaLogCoroutine::state_store_mdlog_entries()
{
  list<cls_log_entry> dest_entries;

  vector<rgw_mdlog_entry>::iterator iter;
  for (iter = data.entries.begin(); iter != data.entries.end(); ++iter) {
    rgw_mdlog_entry& entry = *iter;
    ldpp_dout(sync_env->dpp, 20) << "entry: name=" << entry.name << dendl;

    cls_log_entry dest_entry;
    dest_entry.id = entry.id;
    dest_entry.section = entry.section;
    dest_entry.name = entry.name;
    dest_entry.timestamp = utime_t(entry.timestamp);

    encode(entry.log_data, dest_entry.data);

    dest_entries.push_back(dest_entry);

    marker = entry.id;
  }

  RGWAioCompletionNotifier *cn = stack->create_completion_notifier();

  int ret = mdlog->store_entries_in_shard(sync_env->dpp, dest_entries, shard_id, cn->completion());
  if (ret < 0) {
    cn->put();
    ldpp_dout(sync_env->dpp, 10) << "failed to store md log entries shard_id=" << shard_id << " ret=" << ret << dendl;
    return set_cr_error(ret);
  }
  return io_block(0);
}

int RGWCloneMetaLogCoroutine::state_store_mdlog_entries_complete()
{
  return set_cr_done();
}