// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include <boost/optional.hpp>

#include "common/ceph_json.h"
#include "common/RWLock.h"
#include "common/RefCountedObj.h"
#include "common/WorkQueue.h"
#include "common/Throttle.h"
#include "common/admin_socket.h"
#include "common/errno.h"

#include "rgw_common.h"
#include "rgw_rados.h"
#include "rgw_sync.h"
#include "rgw_metadata.h"
#include "rgw_rest_conn.h"
#include "rgw_tools.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
#include "rgw_http_client.h"

#include "cls/lock/cls_lock_client.h"

#include <boost/asio/yield.hpp>

#define dout_subsys ceph_subsys_rgw

#undef dout_prefix
#define dout_prefix (*_dout << "meta sync: ")

static string mdlog_sync_status_oid = "mdlog.sync-status";
static string mdlog_sync_status_shard_prefix = "mdlog.sync-status.shard";
static string mdlog_sync_full_sync_index_prefix = "meta.full-sync.index";

RGWSyncErrorLogger::RGWSyncErrorLogger(RGWRados *_store, const string &oid_prefix, int _num_shards) : store(_store), num_shards(_num_shards) {
  for (int i = 0; i < num_shards; i++) {
    oids.push_back(get_shard_oid(oid_prefix, i));
  }
}
string RGWSyncErrorLogger::get_shard_oid(const string& oid_prefix, int shard_id) {
  char buf[oid_prefix.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), shard_id);
  return string(buf);
}
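// get_shard_oid() composes the error-log shard object name as
// "<oid_prefix>.<shard_id>", e.g. "<prefix>.3" for shard 3.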

RGWCoroutine *RGWSyncErrorLogger::log_error_cr(const string& source_zone, const string& section, const string& name, uint32_t error_code, const string& message) {
  cls_log_entry entry;

  rgw_sync_error_info info(source_zone, error_code, message);
  bufferlist bl;
  ::encode(info, bl);
  store->time_log_prepare_entry(entry, real_clock::now(), section, name, bl);

  uint32_t shard_id = ++counter % num_shards;

  return new RGWRadosTimelogAddCR(store, oids[shard_id], entry);
}
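// Entries are distributed round-robin across the error-log shards via the
// shared counter, so concurrent errors spread over different shard objects.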

void RGWSyncBackoff::update_wait_time()
{
  if (cur_wait == 0) {
    cur_wait = 1;
  } else {
    cur_wait = (cur_wait << 1);
  }
  if (cur_wait >= max_secs) {
    cur_wait = max_secs;
  }
}
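// The wait time grows exponentially: 1, 2, 4, 8, ... seconds, capped at max_secs.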

void RGWSyncBackoff::backoff_sleep()
{
  update_wait_time();
  sleep(cur_wait);
}

void RGWSyncBackoff::backoff(RGWCoroutine *op)
{
  update_wait_time();
  op->wait(utime_t(cur_wait, 0));
}
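// backoff_sleep() blocks the calling thread, while backoff() suspends only the
// given coroutine, letting the coroutine manager keep running other stacks.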

int RGWBackoffControlCR::operate() {
  reenter(this) {
    // retry the operation until it succeeds
    while (true) {
      yield {
        Mutex::Locker l(lock);
        cr = alloc_cr();
        cr->get();
        call(cr);
      }
      {
        Mutex::Locker l(lock);
        cr->put();
        cr = NULL;
      }
      if (retcode >= 0) {
        break;
      }
      if (retcode != -EBUSY && retcode != -EAGAIN) {
        ldout(cct, 0) << "ERROR: RGWBackoffControlCR: called coroutine returned " << retcode << dendl;
        if (exit_on_error) {
          return set_cr_error(retcode);
        }
      }
      if (reset_backoff) {
        backoff.reset();
      }
      yield backoff.backoff(this);
    }

    // run an optional finisher
    yield call(alloc_finisher_cr());
    if (retcode < 0) {
      ldout(cct, 0) << "ERROR: call to finisher_cr() failed: retcode=" << retcode << dendl;
      return set_cr_error(retcode);
    }
    return set_cr_done();
  }
  return 0;
}

void rgw_mdlog_info::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("num_objects", num_shards, obj);
  JSONDecoder::decode_json("period", period, obj);
  JSONDecoder::decode_json("realm_epoch", realm_epoch, obj);
}

void rgw_mdlog_entry::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("id", id, obj);
  JSONDecoder::decode_json("section", section, obj);
  JSONDecoder::decode_json("name", name, obj);
  utime_t ut;
  JSONDecoder::decode_json("timestamp", ut, obj);
  timestamp = ut.to_real_time();
  JSONDecoder::decode_json("data", log_data, obj);
}

void rgw_mdlog_shard_data::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("marker", marker, obj);
  JSONDecoder::decode_json("truncated", truncated, obj);
  JSONDecoder::decode_json("entries", entries, obj);
}

int RGWShardCollectCR::operate() {
  reenter(this) {
    while (spawn_next()) {
      current_running++;

      while (current_running >= max_concurrent) {
        int child_ret;
        yield wait_for_child();
        if (collect_next(&child_ret)) {
          current_running--;
          if (child_ret < 0 && child_ret != -ENOENT) {
            ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
            status = child_ret;
          }
        }
      }
    }
    while (current_running > 0) {
      int child_ret;
      yield wait_for_child();
      if (collect_next(&child_ret)) {
        current_running--;
        if (child_ret < 0 && child_ret != -ENOENT) {
          ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
          status = child_ret;
        }
      }
    }
    if (status < 0) {
      return set_cr_error(status);
    }
    return set_cr_done();
  }
  return 0;
}
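// RGWShardCollectCR implements a bounded fan-out: spawn_next() is called until
// it returns false, but at most max_concurrent children run at a time; the
// first failure other than -ENOENT is latched in `status` and reported after
// all children have been collected.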

class RGWReadRemoteMDLogInfoCR : public RGWShardCollectCR {
  RGWMetaSyncEnv *sync_env;

  const std::string& period;
  int num_shards;
  map<int, RGWMetadataLogInfo> *mdlog_info;

  int shard_id;
#define READ_MDLOG_MAX_CONCURRENT 10

public:
  RGWReadRemoteMDLogInfoCR(RGWMetaSyncEnv *_sync_env,
                           const std::string& period, int _num_shards,
                           map<int, RGWMetadataLogInfo> *_mdlog_info)
    : RGWShardCollectCR(_sync_env->cct, READ_MDLOG_MAX_CONCURRENT),
      sync_env(_sync_env),
      period(period), num_shards(_num_shards),
      mdlog_info(_mdlog_info), shard_id(0) {}
  bool spawn_next() override;
};

class RGWListRemoteMDLogCR : public RGWShardCollectCR {
  RGWMetaSyncEnv *sync_env;

  const std::string& period;
  map<int, string> shards;
  int max_entries_per_shard;
  map<int, rgw_mdlog_shard_data> *result;

  map<int, string>::iterator iter;
#define READ_MDLOG_MAX_CONCURRENT 10

public:
  RGWListRemoteMDLogCR(RGWMetaSyncEnv *_sync_env,
                       const std::string& period, map<int, string>& _shards,
                       int _max_entries_per_shard,
                       map<int, rgw_mdlog_shard_data> *_result)
    : RGWShardCollectCR(_sync_env->cct, READ_MDLOG_MAX_CONCURRENT),
      sync_env(_sync_env), period(period),
      max_entries_per_shard(_max_entries_per_shard),
      result(_result) {
    shards.swap(_shards);
    iter = shards.begin();
  }
  bool spawn_next() override;
};

RGWRemoteMetaLog::~RGWRemoteMetaLog()
{
  delete error_logger;
}

int RGWRemoteMetaLog::read_log_info(rgw_mdlog_info *log_info)
{
  rgw_http_param_pair pairs[] = { { "type", "metadata" },
                                  { NULL, NULL } };

  int ret = conn->get_json_resource("/admin/log", pairs, *log_info);
  if (ret < 0) {
    ldout(store->ctx(), 0) << "ERROR: failed to fetch mdlog info" << dendl;
    return ret;
  }

  ldout(store->ctx(), 20) << "remote mdlog, num_shards=" << log_info->num_shards << dendl;

  return 0;
}

int RGWRemoteMetaLog::read_master_log_shards_info(const string &master_period, map<int, RGWMetadataLogInfo> *shards_info)
{
  if (store->is_meta_master()) {
    return 0;
  }

  rgw_mdlog_info log_info;
  int ret = read_log_info(&log_info);
  if (ret < 0) {
    return ret;
  }

  return run(new RGWReadRemoteMDLogInfoCR(&sync_env, master_period, log_info.num_shards, shards_info));
}

int RGWRemoteMetaLog::read_master_log_shards_next(const string& period, map<int, string> shard_markers, map<int, rgw_mdlog_shard_data> *result)
{
  if (store->is_meta_master()) {
    return 0;
  }

  return run(new RGWListRemoteMDLogCR(&sync_env, period, shard_markers, 1, result));
}

int RGWRemoteMetaLog::init()
{
  conn = store->rest_master_conn;

  int ret = http_manager.set_threaded();
  if (ret < 0) {
    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
    return ret;
  }

  error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);

  init_sync_env(&sync_env);

  return 0;
}

void RGWRemoteMetaLog::finish()
{
  going_down = true;
  stop();
}

#define CLONE_MAX_ENTRIES 100

int RGWMetaSyncStatusManager::init()
{
  if (store->is_meta_master()) {
    return 0;
  }

  if (!store->rest_master_conn) {
    lderr(store->ctx()) << "no REST connection to master zone" << dendl;
    return -EIO;
  }

  int r = rgw_init_ioctx(store->get_rados_handle(), store->get_zone_params().log_pool, ioctx, true);
  if (r < 0) {
    lderr(store->ctx()) << "ERROR: failed to open log pool (" << store->get_zone_params().log_pool << ") ret=" << r << dendl;
    return r;
  }

  r = master_log.init();
  if (r < 0) {
    lderr(store->ctx()) << "ERROR: failed to init remote log, r=" << r << dendl;
    return r;
  }

  RGWMetaSyncEnv& sync_env = master_log.get_sync_env();

  rgw_meta_sync_status sync_status;
  r = read_sync_status(&sync_status);
  if (r < 0 && r != -ENOENT) {
    lderr(store->ctx()) << "ERROR: failed to read sync status, r=" << r << dendl;
    return r;
  }

  int num_shards = sync_status.sync_info.num_shards;

  for (int i = 0; i < num_shards; i++) {
    shard_objs[i] = rgw_raw_obj(store->get_zone_params().log_pool, sync_env.shard_obj_name(i));
  }

  RWLock::WLocker wl(ts_to_shard_lock);
  for (int i = 0; i < num_shards; i++) {
    clone_markers.push_back(string());
    utime_shard ut;
    ut.shard_id = i;
    ts_to_shard[ut] = i;
  }

  return 0;
}

void RGWMetaSyncEnv::init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
                          RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
                          RGWSyncErrorLogger *_error_logger) {
  cct = _cct;
  store = _store;
  conn = _conn;
  async_rados = _async_rados;
  http_manager = _http_manager;
  error_logger = _error_logger;
}

string RGWMetaSyncEnv::status_oid()
{
  return mdlog_sync_status_oid;
}

string RGWMetaSyncEnv::shard_obj_name(int shard_id)
{
  char buf[mdlog_sync_status_shard_prefix.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%d", mdlog_sync_status_shard_prefix.c_str(), shard_id);

  return string(buf);
}
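// e.g. shard_obj_name(5) returns "mdlog.sync-status.shard.5".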

class RGWAsyncReadMDLogEntries : public RGWAsyncRadosRequest {
  RGWRados *store;
  RGWMetadataLog *mdlog;
  int shard_id;
  string *marker;
  int max_entries;
  list<cls_log_entry> *entries;
  bool *truncated;

protected:
  int _send_request() override {
    real_time from_time;
    real_time end_time;

    void *handle;

    mdlog->init_list_entries(shard_id, from_time, end_time, *marker, &handle);

    int ret = mdlog->list_entries(handle, max_entries, *entries, marker, truncated);

    mdlog->complete_list_entries(handle);

    return ret;
  }
public:
  RGWAsyncReadMDLogEntries(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
                           RGWMetadataLog* mdlog, int _shard_id,
                           string* _marker, int _max_entries,
                           list<cls_log_entry> *_entries, bool *_truncated)
    : RGWAsyncRadosRequest(caller, cn), store(_store), mdlog(mdlog),
      shard_id(_shard_id), marker(_marker), max_entries(_max_entries),
      entries(_entries), truncated(_truncated) {}
};

class RGWReadMDLogEntriesCR : public RGWSimpleCoroutine {
  RGWMetaSyncEnv *sync_env;
  RGWMetadataLog *const mdlog;
  int shard_id;
  string marker;
  string *pmarker;
  int max_entries;
  list<cls_log_entry> *entries;
  bool *truncated;

  RGWAsyncReadMDLogEntries *req{nullptr};

public:
  RGWReadMDLogEntriesCR(RGWMetaSyncEnv *_sync_env, RGWMetadataLog* mdlog,
                        int _shard_id, string*_marker, int _max_entries,
                        list<cls_log_entry> *_entries, bool *_truncated)
    : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env), mdlog(mdlog),
      shard_id(_shard_id), pmarker(_marker), max_entries(_max_entries),
      entries(_entries), truncated(_truncated) {}

  ~RGWReadMDLogEntriesCR() override {
    if (req) {
      req->finish();
    }
  }

  int send_request() override {
    marker = *pmarker;
    req = new RGWAsyncReadMDLogEntries(this, stack->create_completion_notifier(),
                                       sync_env->store, mdlog, shard_id, &marker,
                                       max_entries, entries, truncated);
    sync_env->async_rados->queue(req);
    return 0;
  }

  int request_complete() override {
    int ret = req->get_ret_status();
    if (ret >= 0 && !entries->empty()) {
      *pmarker = marker;
    }
    return ret;
  }
};
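// Marker handling above: send_request() snapshots *pmarker into a member so the
// async request has stable storage, and request_complete() copies the advanced
// marker back only when the read succeeded and returned entries, so a failed or
// empty listing leaves the caller's marker untouched.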

class RGWReadRemoteMDLogShardInfoCR : public RGWCoroutine {
  RGWMetaSyncEnv *env;
  RGWRESTReadResource *http_op;

  const std::string& period;
  int shard_id;
  RGWMetadataLogInfo *shard_info;

public:
  RGWReadRemoteMDLogShardInfoCR(RGWMetaSyncEnv *env, const std::string& period,
                                int _shard_id, RGWMetadataLogInfo *_shard_info)
    : RGWCoroutine(env->store->ctx()), env(env), http_op(NULL),
      period(period), shard_id(_shard_id), shard_info(_shard_info) {}

  int operate() override {
    auto store = env->store;
    RGWRESTConn *conn = store->rest_master_conn;
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type" , "metadata" },
                                        { "id", buf },
                                        { "period", period.c_str() },
                                        { "info" , NULL },
                                        { NULL, NULL } };

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(conn, p, pairs, NULL,
                                          env->http_manager);

        http_op->set_user_info((void *)stack);

        int ret = http_op->aio_read();
        if (ret < 0) {
          ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          http_op->put();
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait(shard_info);
        http_op->put();
        if (ret < 0) {
          return set_cr_error(ret);
        }
        return set_cr_done();
      }
    }
    return 0;
  }
};
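// The two-yield sequence above is the standard pattern for REST reads inside a
// coroutine: the first yield issues the asynchronous request and parks the
// coroutine with io_block(0); once the HTTP manager wakes the stack, the second
// yield decodes the response via wait().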

class RGWListRemoteMDLogShardCR : public RGWSimpleCoroutine {
  RGWMetaSyncEnv *sync_env;
  RGWRESTReadResource *http_op;

  const std::string& period;
  int shard_id;
  string marker;
  uint32_t max_entries;
  rgw_mdlog_shard_data *result;

public:
  RGWListRemoteMDLogShardCR(RGWMetaSyncEnv *env, const std::string& period,
                            int _shard_id, const string& _marker, uint32_t _max_entries,
                            rgw_mdlog_shard_data *_result)
    : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL),
      period(period), shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}

  int send_request() override {
    RGWRESTConn *conn = sync_env->conn;
    RGWRados *store = sync_env->store;

    char buf[32];
    snprintf(buf, sizeof(buf), "%d", shard_id);

    char max_entries_buf[32];
    snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);

    const char *marker_key = (marker.empty() ? "" : "marker");

    rgw_http_param_pair pairs[] = { { "type", "metadata" },
                                    { "id", buf },
                                    { "period", period.c_str() },
                                    { "max-entries", max_entries_buf },
                                    { marker_key, marker.c_str() },
                                    { NULL, NULL } };

    string p = "/admin/log/";

    http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
    http_op->set_user_info((void *)stack);

    int ret = http_op->aio_read();
    if (ret < 0) {
      ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl;
      log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
      http_op->put();
      return ret;
    }

    return 0;
  }

  int request_complete() override {
    int ret = http_op->wait(result);
    http_op->put();
    if (ret < 0 && ret != -ENOENT) {
      ldout(sync_env->store->ctx(), 0) << "ERROR: failed to list remote mdlog shard, ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }
};

bool RGWReadRemoteMDLogInfoCR::spawn_next() {
  if (shard_id >= num_shards) {
    return false;
  }
  spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env, period, shard_id, &(*mdlog_info)[shard_id]), false);
  shard_id++;
  return true;
}

bool RGWListRemoteMDLogCR::spawn_next() {
  if (iter == shards.end()) {
    return false;
  }

  spawn(new RGWListRemoteMDLogShardCR(sync_env, period, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
  ++iter;
  return true;
}

class RGWInitSyncStatusCoroutine : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;

  rgw_meta_sync_info status;
  vector<RGWMetadataLogInfo> shards_info;
  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
public:
  RGWInitSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
                             const rgw_meta_sync_info &status)
    : RGWCoroutine(_sync_env->store->ctx()), sync_env(_sync_env),
      status(status), shards_info(status.num_shards),
      lease_cr(nullptr), lease_stack(nullptr) {}

  ~RGWInitSyncStatusCoroutine() override {
    if (lease_cr) {
      lease_cr->abort();
    }
  }

  int operate() override {
    int ret;
    reenter(this) {
      yield {
        set_status("acquiring sync lock");
        uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
        string lock_name = "sync_lock";
        RGWRados *store = sync_env->store;
        lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                                rgw_raw_obj(store->get_zone_params().log_pool, sync_env->status_oid()),
                                                lock_name, lock_duration, this));
        lease_stack.reset(spawn(lease_cr.get(), false));
      }
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          ldout(cct, 5) << "lease cr failed, done early" << dendl;
          set_status("lease lock failed, early abort");
          return set_cr_error(lease_cr->get_ret_status());
        }
        set_sleeping(true);
        yield;
      }
      yield {
        set_status("writing sync status");
        RGWRados *store = sync_env->store;
        call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados, store,
                                                           rgw_raw_obj(store->get_zone_params().log_pool, sync_env->status_oid()),
                                                           status));
      }

      if (retcode < 0) {
        set_status("failed to write sync status");
        ldout(cct, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
        yield lease_cr->go_down();
        return set_cr_error(retcode);
      }
      /* fetch current position in logs */
      set_status("fetching remote log position");
      yield {
        for (int i = 0; i < (int)status.num_shards; i++) {
          spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env, status.period, i,
                                                  &shards_info[i]), false);
        }
      }

      drain_all_but_stack(lease_stack.get()); /* the lease cr still needs to run */

      yield {
        set_status("updating sync status");
        for (int i = 0; i < (int)status.num_shards; i++) {
          rgw_meta_sync_marker marker;
          RGWMetadataLogInfo& info = shards_info[i];
          marker.next_step_marker = info.marker;
          marker.timestamp = info.last_update;
          RGWRados *store = sync_env->store;
          spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados,
                                                                store,
                                                                rgw_raw_obj(store->get_zone_params().log_pool, sync_env->shard_obj_name(i)),
                                                                marker), true);
        }
      }
      yield {
        set_status("changing sync state: build full sync maps");
        status.state = rgw_meta_sync_info::StateBuildingFullSyncMaps;
        RGWRados *store = sync_env->store;
        call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados, store,
                                                           rgw_raw_obj(store->get_zone_params().log_pool, sync_env->status_oid()),
                                                           status));
      }
      set_status("drop lock lease");
      yield lease_cr->go_down();
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          return set_cr_error(ret);
        }
        yield;
      }
      drain_all();
      return set_cr_done();
    }
    return 0;
  }
};
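// Status initialization runs under a continuous lease on the sync-status
// object, so only one radosgw instance initializes it at a time. The remote
// mdlog position captured per shard becomes next_step_marker: the point where
// incremental sync will take over once the full-sync pass completes.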

class RGWReadSyncStatusMarkersCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  RGWMetaSyncEnv *env;
  const int num_shards;
  int shard_id{0};
  map<uint32_t, rgw_meta_sync_marker>& markers;

public:
  RGWReadSyncStatusMarkersCR(RGWMetaSyncEnv *env, int num_shards,
                             map<uint32_t, rgw_meta_sync_marker>& markers)
    : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS),
      env(env), num_shards(num_shards), markers(markers)
  {}
  bool spawn_next() override;
};

bool RGWReadSyncStatusMarkersCR::spawn_next()
{
  if (shard_id >= num_shards) {
    return false;
  }
  using CR = RGWSimpleRadosReadCR<rgw_meta_sync_marker>;
  rgw_raw_obj obj{env->store->get_zone_params().log_pool,
                  env->shard_obj_name(shard_id)};
  spawn(new CR(env->async_rados, env->store, obj, &markers[shard_id]), false);
  shard_id++;
  return true;
}

class RGWReadSyncStatusCoroutine : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;
  rgw_meta_sync_status *sync_status;

public:
  RGWReadSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
                             rgw_meta_sync_status *_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sync_status(_status)
  {}
  int operate() override;
};

int RGWReadSyncStatusCoroutine::operate()
{
  reenter(this) {
    // read sync info
    using ReadInfoCR = RGWSimpleRadosReadCR<rgw_meta_sync_info>;
    yield {
      bool empty_on_enoent = false; // fail on ENOENT
      rgw_raw_obj obj{sync_env->store->get_zone_params().log_pool,
                      sync_env->status_oid()};
      call(new ReadInfoCR(sync_env->async_rados, sync_env->store, obj,
                          &sync_status->sync_info, empty_on_enoent));
    }
    if (retcode < 0) {
      ldout(sync_env->cct, 4) << "failed to read sync status info with "
                              << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    // read shard markers
    using ReadMarkersCR = RGWReadSyncStatusMarkersCR;
    yield call(new ReadMarkersCR(sync_env, sync_status->sync_info.num_shards,
                                 sync_status->sync_markers));
    if (retcode < 0) {
      ldout(sync_env->cct, 4) << "failed to read sync status markers with "
                              << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    return set_cr_done();
  }
  return 0;
}

class RGWFetchAllMetaCR : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;

  int num_shards;

  int ret_status;

  list<string> sections;
  list<string>::iterator sections_iter;

  struct meta_list_result {
    list<string> keys;
    string marker;
    uint64_t count{0};
    bool truncated{false};

    void decode_json(JSONObj *obj) {
      JSONDecoder::decode_json("keys", keys, obj);
      JSONDecoder::decode_json("marker", marker, obj);
      JSONDecoder::decode_json("count", count, obj);
      JSONDecoder::decode_json("truncated", truncated, obj);
    }
  } result;
  list<string>::iterator iter;

  std::unique_ptr<RGWShardedOmapCRManager> entries_index;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
  bool lost_lock;
  bool failed;

  string marker;

  map<uint32_t, rgw_meta_sync_marker>& markers;

public:
  RGWFetchAllMetaCR(RGWMetaSyncEnv *_sync_env, int _num_shards,
                    map<uint32_t, rgw_meta_sync_marker>& _markers)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      num_shards(_num_shards),
      ret_status(0), lease_cr(nullptr), lease_stack(nullptr),
      lost_lock(false), failed(false), markers(_markers) {
  }

  ~RGWFetchAllMetaCR() override {
  }

  void append_section_from_set(set<string>& all_sections, const string& name) {
    set<string>::iterator iter = all_sections.find(name);
    if (iter != all_sections.end()) {
      sections.emplace_back(std::move(*iter));
      all_sections.erase(iter);
    }
  }
  /*
   * meta sync should go in the following order: user, bucket.instance, bucket
   * then whatever other sections exist (if any)
   */
  void rearrange_sections() {
    set<string> all_sections;
    std::move(sections.begin(), sections.end(),
              std::inserter(all_sections, all_sections.end()));
    sections.clear();

    append_section_from_set(all_sections, "user");
    append_section_from_set(all_sections, "bucket.instance");
    append_section_from_set(all_sections, "bucket");

    std::move(all_sections.begin(), all_sections.end(),
              std::back_inserter(sections));
  }

  int operate() override {
    RGWRESTConn *conn = sync_env->conn;

    reenter(this) {
      yield {
        set_status(string("acquiring lock (") + sync_env->status_oid() + ")");
        uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
        string lock_name = "sync_lock";
        lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados,
                                                sync_env->store,
                                                rgw_raw_obj(sync_env->store->get_zone_params().log_pool, sync_env->status_oid()),
                                                lock_name, lock_duration, this));
        lease_stack.reset(spawn(lease_cr.get(), false));
      }
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          ldout(cct, 5) << "lease cr failed, done early" << dendl;
          set_status("failed acquiring lock");
          return set_cr_error(lease_cr->get_ret_status());
        }
        set_sleeping(true);
        yield;
      }
      entries_index.reset(new RGWShardedOmapCRManager(sync_env->async_rados, sync_env->store, this, num_shards,
                                                      sync_env->store->get_zone_params().log_pool,
                                                      mdlog_sync_full_sync_index_prefix));
      yield {
        call(new RGWReadRESTResourceCR<list<string> >(cct, conn, sync_env->http_manager,
                                                      "/admin/metadata", NULL, &sections));
      }
      if (get_ret_status() < 0) {
        ldout(cct, 0) << "ERROR: failed to fetch metadata sections" << dendl;
        yield entries_index->finish();
        yield lease_cr->go_down();
        drain_all();
        return set_cr_error(get_ret_status());
      }
      rearrange_sections();
      sections_iter = sections.begin();
      for (; sections_iter != sections.end(); ++sections_iter) {
        do {
          yield {
#define META_FULL_SYNC_CHUNK_SIZE "1000"
            string entrypoint = string("/admin/metadata/") + *sections_iter;
            rgw_http_param_pair pairs[] = { { "max-entries", META_FULL_SYNC_CHUNK_SIZE },
                                            { "marker", result.marker.c_str() },
                                            { NULL, NULL } };
            result.keys.clear();
            call(new RGWReadRESTResourceCR<meta_list_result>(cct, conn, sync_env->http_manager,
                                                             entrypoint, pairs, &result));
          }
          if (get_ret_status() < 0) {
            ldout(cct, 0) << "ERROR: failed to fetch metadata section: " << *sections_iter << dendl;
            yield entries_index->finish();
            yield lease_cr->go_down();
            drain_all();
            return set_cr_error(get_ret_status());
          }
          iter = result.keys.begin();
          for (; iter != result.keys.end(); ++iter) {
            if (!lease_cr->is_locked()) {
              lost_lock = true;
              break;
            }
            yield; // allow entries_index consumer to make progress

            ldout(cct, 20) << "list metadata: section=" << *sections_iter << " key=" << *iter << dendl;
            string s = *sections_iter + ":" + *iter;
            int shard_id;
            RGWRados *store = sync_env->store;
            int ret = store->meta_mgr->get_log_shard_id(*sections_iter, *iter, &shard_id);
            if (ret < 0) {
              ldout(cct, 0) << "ERROR: could not determine shard id for " << *sections_iter << ":" << *iter << dendl;
              ret_status = ret;
              break;
            }
            if (!entries_index->append(s, shard_id)) {
              break;
            }
          }
        } while (result.truncated);
      }
      yield {
        if (!entries_index->finish()) {
          failed = true;
        }
      }
      if (!failed) {
        for (map<uint32_t, rgw_meta_sync_marker>::iterator iter = markers.begin(); iter != markers.end(); ++iter) {
          int shard_id = (int)iter->first;
          rgw_meta_sync_marker& marker = iter->second;
          marker.total_entries = entries_index->get_total_entries(shard_id);
          spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados, sync_env->store,
                                                                rgw_raw_obj(sync_env->store->get_zone_params().log_pool, sync_env->shard_obj_name(shard_id)),
                                                                marker), true);
        }
      }

      drain_all_but_stack(lease_stack.get()); /* the lease cr still needs to run */

      yield lease_cr->go_down();

      int ret;
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          return set_cr_error(ret);
        }
        yield;
      }
      drain_all();
      if (failed) {
        yield return set_cr_error(-EIO);
      }
      if (lost_lock) {
        yield return set_cr_error(-EBUSY);
      }

      if (ret_status < 0) {
        yield return set_cr_error(ret_status);
      }

      yield return set_cr_done();
    }
    return 0;
  }
};

static string full_sync_index_shard_oid(int shard_id)
{
  char buf[mdlog_sync_full_sync_index_prefix.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%d", mdlog_sync_full_sync_index_prefix.c_str(), shard_id);
  return string(buf);
}
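// e.g. full_sync_index_shard_oid(0) returns "meta.full-sync.index.0".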

class RGWReadRemoteMetadataCR : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;

  RGWRESTReadResource *http_op;

  string section;
  string key;

  bufferlist *pbl;

public:
  RGWReadRemoteMetadataCR(RGWMetaSyncEnv *_sync_env,
                          const string& _section, const string& _key, bufferlist *_pbl)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      http_op(NULL),
      section(_section),
      key(_key),
      pbl(_pbl) {
  }

  int operate() override {
    RGWRESTConn *conn = sync_env->conn;
    reenter(this) {
      yield {
        rgw_http_param_pair pairs[] = { { "key" , key.c_str()},
                                        { NULL, NULL } };

        string p = string("/admin/metadata/") + section + "/" + key;

        http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);

        http_op->set_user_info((void *)stack);

        int ret = http_op->aio_read();
        if (ret < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to fetch mdlog data" << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          http_op->put();
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait_bl(pbl);
        http_op->put();
        if (ret < 0) {
          return set_cr_error(ret);
        }
        return set_cr_done();
      }
    }
    return 0;
  }
};

class RGWAsyncMetaStoreEntry : public RGWAsyncRadosRequest {
  RGWRados *store;
  string raw_key;
  bufferlist bl;
protected:
  int _send_request() override {
    int ret = store->meta_mgr->put(raw_key, bl, RGWMetadataHandler::APPLY_ALWAYS);
    if (ret < 0) {
      ldout(store->ctx(), 0) << "ERROR: can't store key: " << raw_key << " ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }
public:
  RGWAsyncMetaStoreEntry(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
                         const string& _raw_key,
                         bufferlist& _bl)
    : RGWAsyncRadosRequest(caller, cn), store(_store),
      raw_key(_raw_key), bl(_bl) {}
};


class RGWMetaStoreEntryCR : public RGWSimpleCoroutine {
  RGWMetaSyncEnv *sync_env;
  string raw_key;
  bufferlist bl;

  RGWAsyncMetaStoreEntry *req;

public:
  RGWMetaStoreEntryCR(RGWMetaSyncEnv *_sync_env,
                      const string& _raw_key,
                      bufferlist& _bl)
    : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env),
      raw_key(_raw_key), bl(_bl), req(NULL) {
  }

  ~RGWMetaStoreEntryCR() override {
    if (req) {
      req->finish();
    }
  }

  int send_request() override {
    req = new RGWAsyncMetaStoreEntry(this, stack->create_completion_notifier(),
                                     sync_env->store, raw_key, bl);
    sync_env->async_rados->queue(req);
    return 0;
  }

  int request_complete() override {
    return req->get_ret_status();
  }
};

class RGWAsyncMetaRemoveEntry : public RGWAsyncRadosRequest {
  RGWRados *store;
  string raw_key;
protected:
  int _send_request() override {
    int ret = store->meta_mgr->remove(raw_key);
    if (ret < 0) {
      ldout(store->ctx(), 0) << "ERROR: can't remove key: " << raw_key << " ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }
public:
  RGWAsyncMetaRemoveEntry(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
                          const string& _raw_key)
    : RGWAsyncRadosRequest(caller, cn), store(_store),
      raw_key(_raw_key) {}
};


class RGWMetaRemoveEntryCR : public RGWSimpleCoroutine {
  RGWMetaSyncEnv *sync_env;
  string raw_key;

  RGWAsyncMetaRemoveEntry *req;

public:
  RGWMetaRemoveEntryCR(RGWMetaSyncEnv *_sync_env,
                       const string& _raw_key)
    : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env),
      raw_key(_raw_key), req(NULL) {
  }

  ~RGWMetaRemoveEntryCR() override {
    if (req) {
      req->finish();
    }
  }

  int send_request() override {
    req = new RGWAsyncMetaRemoveEntry(this, stack->create_completion_notifier(),
                                      sync_env->store, raw_key);
    sync_env->async_rados->queue(req);
    return 0;
  }

  int request_complete() override {
    int r = req->get_ret_status();
    if (r == -ENOENT) {
      r = 0;
    }
    return r;
  }
};

#define META_SYNC_UPDATE_MARKER_WINDOW 10
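// META_SYNC_UPDATE_MARKER_WINDOW is the window size for RGWSyncShardMarkerTrack:
// roughly, the number of completed entries that may accumulate before the
// shard's sync marker is flushed to RADOS (see RGWSyncShardMarkerTrack in
// rgw_sync.h for the exact flush policy).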

int RGWLastCallerWinsCR::operate() {
  RGWCoroutine *call_cr;
  reenter(this) {
    while (cr) {
      call_cr = cr;
      cr = nullptr;
      yield call(call_cr);
      /* cr might have been modified at this point */
    }
    return set_cr_done();
  }
  return 0;
}
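// "Last caller wins": while one call is in flight, later callers may overwrite
// the pending `cr`; each loop iteration runs whatever was stored most recently,
// coalescing redundant intermediate updates into the final one.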

class RGWMetaSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
  RGWMetaSyncEnv *sync_env;

  string marker_oid;
  rgw_meta_sync_marker sync_marker;

public:
  RGWMetaSyncShardMarkerTrack(RGWMetaSyncEnv *_sync_env,
                              const string& _marker_oid,
                              const rgw_meta_sync_marker& _marker)
    : RGWSyncShardMarkerTrack(META_SYNC_UPDATE_MARKER_WINDOW),
      sync_env(_sync_env),
      marker_oid(_marker_oid),
      sync_marker(_marker) {}

  RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
    sync_marker.marker = new_marker;
    if (index_pos > 0) {
      sync_marker.pos = index_pos;
    }

    if (!real_clock::is_zero(timestamp)) {
      sync_marker.timestamp = timestamp;
    }

    ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << " realm_epoch=" << sync_marker.realm_epoch << dendl;
    RGWRados *store = sync_env->store;
    return new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados,
                                                           store,
                                                           rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
                                                           sync_marker);
  }

  RGWOrderCallCR *allocate_order_control_cr() {
    return new RGWLastCallerWinsCR(sync_env->cct);
  }
};

int RGWMetaSyncSingleEntryCR::operate() {
  reenter(this) {
#define NUM_TRANSIENT_ERROR_RETRIES 10

    if (error_injection &&
        rand() % 10000 < cct->_conf->rgw_sync_meta_inject_err_probability * 10000.0) {
      ldout(sync_env->cct, 0) << __FILE__ << ":" << __LINE__ << ": injecting meta sync error on key=" << raw_key << dendl;
      return set_cr_error(-EIO);
    }

    if (op_status != MDLOG_STATUS_COMPLETE) {
      ldout(sync_env->cct, 20) << "skipping pending operation" << dendl;
      yield call(marker_tracker->finish(entry_marker));
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    for (tries = 0; tries < NUM_TRANSIENT_ERROR_RETRIES; tries++) {
      yield {
        pos = raw_key.find(':');
        section = raw_key.substr(0, pos);
        key = raw_key.substr(pos + 1);
        ldout(sync_env->cct, 20) << "fetching remote metadata: " << section << ":" << key << (tries == 0 ? "" : " (retry)") << dendl;
        call(new RGWReadRemoteMetadataCR(sync_env, section, key, &md_bl));
      }

      sync_status = retcode;

      if (sync_status == -ENOENT) {
        /* FIXME: do we need to remove the entry from the local zone? */
        break;
      }

      if ((sync_status == -EAGAIN || sync_status == -ECANCELED) && (tries < NUM_TRANSIENT_ERROR_RETRIES - 1)) {
        ldout(sync_env->cct, 20) << *this << ": failed to fetch remote metadata: " << section << ":" << key << ", will retry" << dendl;
        continue;
      }

      if (sync_status < 0) {
        ldout(sync_env->cct, 10) << *this << ": failed to send read remote metadata entry: section=" << section << " key=" << key << " status=" << sync_status << dendl;
        log_error() << "failed to send read remote metadata entry: section=" << section << " key=" << key << " status=" << sync_status << std::endl;
        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), section, key, -sync_status,
                                                        string("failed to read remote metadata entry: ") + cpp_strerror(-sync_status)));
        return set_cr_error(sync_status);
      }

      break;
    }

    retcode = 0;
    for (tries = 0; tries < NUM_TRANSIENT_ERROR_RETRIES; tries++) {
      if (sync_status != -ENOENT) {
        yield call(new RGWMetaStoreEntryCR(sync_env, raw_key, md_bl));
      } else {
        yield call(new RGWMetaRemoveEntryCR(sync_env, raw_key));
      }
      if ((retcode == -EAGAIN || retcode == -ECANCELED) && (tries < NUM_TRANSIENT_ERROR_RETRIES - 1)) {
        ldout(sync_env->cct, 20) << *this << ": failed to store metadata: " << section << ":" << key << ", got retcode=" << retcode << dendl;
        continue;
      }
      break;
    }

    sync_status = retcode;

    if (sync_status == 0 && marker_tracker) {
      /* update marker */
      yield call(marker_tracker->finish(entry_marker));
      sync_status = retcode;
    }
    if (sync_status < 0) {
      return set_cr_error(sync_status);
    }
    return set_cr_done();
  }
  return 0;
}
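// Syncing one mdlog entry is a two-phase operation: fetch the entry from the
// master zone, then apply it locally (store it, or remove it on -ENOENT). Each
// phase retries transient failures (-EAGAIN/-ECANCELED) up to
// NUM_TRANSIENT_ERROR_RETRIES times; a permanent fetch failure is recorded via
// the error logger so it can be retried separately later.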

class RGWCloneMetaLogCoroutine : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;
  RGWMetadataLog *mdlog;

  const std::string& period;
  int shard_id;
  string marker;
  bool truncated = false;
  string *new_marker;

  int max_entries = CLONE_MAX_ENTRIES;

  RGWRESTReadResource *http_op = nullptr;
  boost::intrusive_ptr<RGWMetadataLogInfoCompletion> completion;

  RGWMetadataLogInfo shard_info;
  rgw_mdlog_shard_data data;

public:
  RGWCloneMetaLogCoroutine(RGWMetaSyncEnv *_sync_env, RGWMetadataLog* mdlog,
                           const std::string& period, int _id,
                           const string& _marker, string *_new_marker)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), mdlog(mdlog),
      period(period), shard_id(_id), marker(_marker), new_marker(_new_marker) {
    if (new_marker) {
      *new_marker = marker;
    }
  }
  ~RGWCloneMetaLogCoroutine() override {
    if (http_op) {
      http_op->put();
    }
    if (completion) {
      completion->cancel();
    }
  }

  int operate() override;

  int state_init();
  int state_read_shard_status();
  int state_read_shard_status_complete();
  int state_send_rest_request();
  int state_receive_rest_response();
  int state_store_mdlog_entries();
  int state_store_mdlog_entries_complete();
};

class RGWMetaSyncShardCR : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;

  const rgw_pool& pool;
  const std::string& period; ///< currently syncing period id
  const epoch_t realm_epoch; ///< realm_epoch of period
  RGWMetadataLog* mdlog; ///< log of syncing period
  uint32_t shard_id;
  rgw_meta_sync_marker& sync_marker;
  boost::optional<rgw_meta_sync_marker> temp_marker; ///< for pending updates
  string marker;
  string max_marker;
  const std::string& period_marker; ///< max marker stored in next period

  std::set<std::string> entries;
  std::set<std::string>::iterator iter;

  string oid;

  RGWMetaSyncShardMarkerTrack *marker_tracker = nullptr;

  list<cls_log_entry> log_entries;
  list<cls_log_entry>::iterator log_iter;
  bool truncated = false;

  string mdlog_marker;
  string raw_key;
  rgw_mdlog_entry mdlog_entry;

  Mutex inc_lock;
  Cond inc_cond;

  boost::asio::coroutine incremental_cr;
  boost::asio::coroutine full_cr;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;

  bool lost_lock = false;

  bool *reset_backoff;

  // hold a reference to the cr stack while it's in the map
  using StackRef = boost::intrusive_ptr<RGWCoroutinesStack>;
  map<StackRef, string> stack_to_pos;
  map<string, string> pos_to_prev;

  bool can_adjust_marker = false;
  bool done_with_period = false;

  int total_entries = 0;

public:
  RGWMetaSyncShardCR(RGWMetaSyncEnv *_sync_env, const rgw_pool& _pool,
                     const std::string& period, epoch_t realm_epoch,
                     RGWMetadataLog* mdlog, uint32_t _shard_id,
                     rgw_meta_sync_marker& _marker,
                     const std::string& period_marker, bool *_reset_backoff)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), pool(_pool),
      period(period), realm_epoch(realm_epoch), mdlog(mdlog),
      shard_id(_shard_id), sync_marker(_marker),
      period_marker(period_marker), inc_lock("RGWMetaSyncShardCR::inc_lock"),
      reset_backoff(_reset_backoff) {
    *reset_backoff = false;
  }

  ~RGWMetaSyncShardCR() override {
    delete marker_tracker;
    if (lease_cr) {
      lease_cr->abort();
    }
  }

  void set_marker_tracker(RGWMetaSyncShardMarkerTrack *mt) {
    delete marker_tracker;
    marker_tracker = mt;
  }

  int operate() override {
    int r;
    while (true) {
      switch (sync_marker.state) {
      case rgw_meta_sync_marker::FullSync:
        r = full_sync();
        if (r < 0) {
          ldout(sync_env->cct, 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl;
          return set_cr_error(r);
        }
        return 0;
      case rgw_meta_sync_marker::IncrementalSync:
        r = incremental_sync();
        if (r < 0) {
          ldout(sync_env->cct, 10) << "sync: incremental_sync: shard_id=" << shard_id << " r=" << r << dendl;
          return set_cr_error(r);
        }
        return 0;
      }
    }
    /* unreachable */
    return 0;
  }

  void collect_children()
  {
    int child_ret;
    RGWCoroutinesStack *child;
    while (collect_next(&child_ret, &child)) {
      auto iter = stack_to_pos.find(child);
      if (iter == stack_to_pos.end()) {
        /* some other stack that we don't care about */
        continue;
      }

      string& pos = iter->second;

      if (child_ret < 0) {
        ldout(sync_env->cct, 0) << *this << ": child operation stack=" << child << " entry=" << pos << " returned " << child_ret << dendl;
      }

      map<string, string>::iterator prev_iter = pos_to_prev.find(pos);
      assert(prev_iter != pos_to_prev.end());

      /*
       * we should get -EAGAIN for transient errors, for which we want to retry, so we don't
       * update the marker and abort. We'll get called again for these. Permanent errors will be
       * handled by marking the entry at the error log shard, so that we retry on it separately
       */
      if (child_ret == -EAGAIN) {
        can_adjust_marker = false;
      }

      if (pos_to_prev.size() == 1) {
        if (can_adjust_marker) {
          sync_marker.marker = pos;
        }
        pos_to_prev.erase(prev_iter);
      } else {
        assert(pos_to_prev.size() > 1);
        pos_to_prev.erase(prev_iter);
        prev_iter = pos_to_prev.begin();
        if (can_adjust_marker) {
          sync_marker.marker = prev_iter->second;
        }
      }

      ldout(sync_env->cct, 4) << *this << ": adjusting marker pos=" << sync_marker.marker << dendl;
      stack_to_pos.erase(iter);
    }
  }
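
  // Children may complete out of order, so collect_children() can only advance
  // sync_marker.marker to a position with no earlier entry still outstanding:
  // while other entries are pending, it moves the marker to the predecessor of
  // the earliest pending entry rather than to the entry that just completed.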

  int full_sync() {
#define OMAP_GET_MAX_ENTRIES 100
    int max_entries = OMAP_GET_MAX_ENTRIES;
    reenter(&full_cr) {
      set_status("full_sync");
      oid = full_sync_index_shard_oid(shard_id);
      can_adjust_marker = true;
      /* grab lock */
      yield {
        uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
        string lock_name = "sync_lock";
        RGWRados *store = sync_env->store;
        lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                                rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
                                                lock_name, lock_duration, this));
        lease_stack.reset(spawn(lease_cr.get(), false));
        lost_lock = false;
      }
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          ldout(cct, 5) << "lease cr failed, done early" << dendl;
          drain_all();
          return lease_cr->get_ret_status();
        }
        set_sleeping(true);
        yield;
      }

      /* lock succeeded, a retry now should avoid previous backoff status */
      *reset_backoff = true;

      /* prepare marker tracker */
      set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env,
                                                         sync_env->shard_obj_name(shard_id),
                                                         sync_marker));

      marker = sync_marker.marker;

      total_entries = sync_marker.pos;

      /* sync! */
      do {
        if (!lease_cr->is_locked()) {
          lost_lock = true;
          break;
        }
        yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
                                             marker, &entries, max_entries));
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
          yield lease_cr->go_down();
          drain_all();
          return retcode;
        }
        iter = entries.begin();
        for (; iter != entries.end(); ++iter) {
          marker = *iter;
          ldout(sync_env->cct, 20) << __func__ << ": full sync: " << marker << dendl;
          total_entries++;
          if (!marker_tracker->start(marker, total_entries, real_time())) {
            ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << marker << ". Duplicate entry?" << dendl;
          } else {
            // fetch remote and write locally
            yield {
              RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, marker, marker, MDLOG_STATUS_COMPLETE, marker_tracker), false);
              // stack_to_pos holds a reference to the stack
              stack_to_pos[stack] = marker;
              pos_to_prev[marker] = marker;
            }
          }
        }
        collect_children();
      } while ((int)entries.size() == max_entries && can_adjust_marker);

      while (num_spawned() > 1) {
        yield wait_for_child();
        collect_children();
      }

      if (!lost_lock) {
        /* update marker to reflect we're done with full sync */
        if (can_adjust_marker) {
          // apply updates to a temporary marker, or operate() will send us
          // to incremental_sync() after we yield
          temp_marker = sync_marker;
          temp_marker->state = rgw_meta_sync_marker::IncrementalSync;
          temp_marker->marker = std::move(temp_marker->next_step_marker);
          temp_marker->next_step_marker.clear();
          temp_marker->realm_epoch = realm_epoch;
          ldout(sync_env->cct, 4) << *this << ": saving marker pos=" << temp_marker->marker << " realm_epoch=" << realm_epoch << dendl;

          using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_meta_sync_marker>;
          yield call(new WriteMarkerCR(sync_env->async_rados, sync_env->store,
                                       rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
                                       *temp_marker));
        }

        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
          yield lease_cr->go_down();
          drain_all();
          return retcode;
        }
      }

      /*
       * if we reached here, it means that lost_lock is true, otherwise the state
       * change in the previous block will prevent us from reaching here
       */

      yield lease_cr->go_down();

      lease_cr.reset();

      drain_all();

      if (!can_adjust_marker) {
        return -EAGAIN;
      }

      if (lost_lock) {
        return -EBUSY;
      }

      // apply the sync marker update
      assert(temp_marker);
      sync_marker = std::move(*temp_marker);
      temp_marker = boost::none;
      // must not yield after this point!
    }
    return 0;
  }

  int incremental_sync() {
    reenter(&incremental_cr) {
      set_status("incremental_sync");
      can_adjust_marker = true;
      /* grab lock */
      if (!lease_cr) { /* could have had a lease_cr lock from previous state */
        yield {
          uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
          string lock_name = "sync_lock";
          RGWRados *store = sync_env->store;
          lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                                  rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
                                                  lock_name, lock_duration, this));
          lease_stack.reset(spawn(lease_cr.get(), false));
          lost_lock = false;
        }
        while (!lease_cr->is_locked()) {
          if (lease_cr->is_done()) {
            ldout(cct, 5) << "lease cr failed, done early" << dendl;
            drain_all();
            return lease_cr->get_ret_status();
          }
          set_sleeping(true);
          yield;
        }
      }
      // if the period has advanced, we can't use the existing marker
      if (sync_marker.realm_epoch < realm_epoch) {
        ldout(sync_env->cct, 4) << "clearing marker=" << sync_marker.marker
                                << " from old realm_epoch=" << sync_marker.realm_epoch
                                << " (now " << realm_epoch << ')' << dendl;
        sync_marker.realm_epoch = realm_epoch;
        sync_marker.marker.clear();
      }
      mdlog_marker = sync_marker.marker;
      set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env,
                                                         sync_env->shard_obj_name(shard_id),
                                                         sync_marker));

      /*
       * mdlog_marker: the remote sync marker position
       * sync_marker: the local sync marker position
       * max_marker: the max mdlog position that we fetched
       * marker: the current position we try to sync
       * period_marker: the last marker before the next period begins (optional)
       */
      marker = max_marker = sync_marker.marker;
      /* inc sync */
      do {
        if (!lease_cr->is_locked()) {
          lost_lock = true;
          break;
        }
#define INCREMENTAL_MAX_ENTRIES 100
        ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << dendl;
        if (!period_marker.empty() && period_marker <= mdlog_marker) {
          ldout(cct, 10) << "mdlog_marker past period_marker=" << period_marker << dendl;
          done_with_period = true;
          break;
        }
        if (mdlog_marker <= max_marker) {
          /* we're at the tip, try to bring more entries */
          ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " syncing mdlog for shard_id=" << shard_id << dendl;
          yield call(new RGWCloneMetaLogCoroutine(sync_env, mdlog,
                                                  period, shard_id,
                                                  mdlog_marker, &mdlog_marker));
        }
        if (retcode < 0) {
          ldout(sync_env->cct, 10) << *this << ": failed to fetch more log entries, retcode=" << retcode << dendl;
          yield lease_cr->go_down();
          drain_all();
          *reset_backoff = false; // back off and try again later
          return retcode;
        }
        *reset_backoff = true; /* if we got to this point, all systems function */
        ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
        if (mdlog_marker > max_marker) {
          marker = max_marker;
          yield call(new RGWReadMDLogEntriesCR(sync_env, mdlog, shard_id,
                                               &max_marker, INCREMENTAL_MAX_ENTRIES,
                                               &log_entries, &truncated));
          if (retcode < 0) {
            ldout(sync_env->cct, 10) << *this << ": failed to list mdlog entries, retcode=" << retcode << dendl;
            yield lease_cr->go_down();
            drain_all();
            *reset_backoff = false; // back off and try again later
            return retcode;
          }
          for (log_iter = log_entries.begin(); log_iter != log_entries.end() && !done_with_period; ++log_iter) {
            if (!period_marker.empty() && period_marker <= log_iter->id) {
              done_with_period = true;
              if (period_marker < log_iter->id) {
                ldout(cct, 10) << "found key=" << log_iter->id
                               << " past period_marker=" << period_marker << dendl;
                break;
              }
              ldout(cct, 10) << "found key at period_marker=" << period_marker << dendl;
              // sync this entry, then return control to RGWMetaSyncCR
            }
            if (!mdlog_entry.convert_from(*log_iter)) {
              ldout(sync_env->cct, 0) << __func__ << ":" << __LINE__ << ": ERROR: failed to convert mdlog entry, shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << " ... skipping entry" << dendl;
              continue;
            }
            ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << dendl;
            if (!marker_tracker->start(log_iter->id, 0, log_iter->timestamp.to_real_time())) {
              ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << log_iter->id << ". Duplicate entry?" << dendl;
            } else {
              raw_key = log_iter->section + ":" + log_iter->name;
              yield {
                RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, mdlog_entry.log_data.status, marker_tracker), false);
                assert(stack);
                // stack_to_pos holds a reference to the stack
                stack_to_pos[stack] = log_iter->id;
                pos_to_prev[log_iter->id] = marker;
              }
            }
            marker = log_iter->id;
          }
        }
        collect_children();
        ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " max_marker=" << max_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << dendl;
        if (done_with_period) {
          // return control to RGWMetaSyncCR and advance to the next period
          ldout(sync_env->cct, 10) << *this << ": done with period" << dendl;
          break;
        }
        if (mdlog_marker == max_marker && can_adjust_marker) {
#define INCREMENTAL_INTERVAL 20
          yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
        }
      } while (can_adjust_marker);

      while (num_spawned() > 1) {
        yield wait_for_child();
        collect_children();
      }

      yield lease_cr->go_down();

      drain_all();

      if (lost_lock) {
        return -EBUSY;
      }

      if (!can_adjust_marker) {
        return -EAGAIN;
      }

      return set_cr_done();
    }
    /* TODO */
    return 0;
  }
};

class RGWMetaSyncShardControlCR : public RGWBackoffControlCR
{
  RGWMetaSyncEnv *sync_env;

  const rgw_pool& pool;
  const std::string& period;
  epoch_t realm_epoch;
  RGWMetadataLog* mdlog;
  uint32_t shard_id;
  rgw_meta_sync_marker sync_marker;
  const std::string period_marker;

  static constexpr bool exit_on_error = false; // retry on all errors
public:
  RGWMetaSyncShardControlCR(RGWMetaSyncEnv *_sync_env, const rgw_pool& _pool,
                            const std::string& period, epoch_t realm_epoch,
                            RGWMetadataLog* mdlog, uint32_t _shard_id,
                            const rgw_meta_sync_marker& _marker,
                            std::string&& period_marker)
    : RGWBackoffControlCR(_sync_env->cct, exit_on_error), sync_env(_sync_env),
      pool(_pool), period(period), realm_epoch(realm_epoch), mdlog(mdlog),
      shard_id(_shard_id), sync_marker(_marker),
      period_marker(std::move(period_marker)) {}

  RGWCoroutine *alloc_cr() override {
    return new RGWMetaSyncShardCR(sync_env, pool, period, realm_epoch, mdlog,
                                  shard_id, sync_marker, period_marker, backoff_ptr());
  }

  RGWCoroutine *alloc_finisher_cr() override {
    RGWRados *store = sync_env->store;
    return new RGWSimpleRadosReadCR<rgw_meta_sync_marker>(sync_env->async_rados, store,
                                                          rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
                                                          &sync_marker);
  }
};

class RGWMetaSyncCR : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;
  const rgw_pool& pool;
  RGWPeriodHistory::Cursor cursor; ///< sync position in period history
  RGWPeriodHistory::Cursor next; ///< next period in history
  rgw_meta_sync_status sync_status;

  std::mutex mutex; ///< protect access to shard_crs

  // TODO: it should be enough to hold a reference on the stack only, as calling
  // RGWCoroutinesStack::wakeup() doesn't refer to the RGWCoroutine if it has
  // already completed
  using ControlCRRef = boost::intrusive_ptr<RGWMetaSyncShardControlCR>;
  using StackRef = boost::intrusive_ptr<RGWCoroutinesStack>;
  using RefPair = std::pair<ControlCRRef, StackRef>;
  map<int, RefPair> shard_crs;
  int ret{0};

public:
  RGWMetaSyncCR(RGWMetaSyncEnv *_sync_env, RGWPeriodHistory::Cursor cursor,
                const rgw_meta_sync_status& _sync_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      pool(sync_env->store->get_zone_params().log_pool),
      cursor(cursor), sync_status(_sync_status) {}

  int operate() override {
    reenter(this) {
      // loop through one period at a time
      for (;;) {
        if (cursor == sync_env->store->period_history->get_current()) {
          next = RGWPeriodHistory::Cursor{};
          if (cursor) {
            ldout(cct, 10) << "RGWMetaSyncCR on current period="
                           << cursor.get_period().get_id() << dendl;
          } else {
            ldout(cct, 10) << "RGWMetaSyncCR with no period" << dendl;
          }
        } else {
          next = cursor;
          next.next();
          ldout(cct, 10) << "RGWMetaSyncCR on period="
                         << cursor.get_period().get_id() << ", next="
                         << next.get_period().get_id() << dendl;
        }

        yield {
          // get the mdlog for the current period (may be empty)
          auto& period_id = sync_status.sync_info.period;
          auto realm_epoch = sync_status.sync_info.realm_epoch;
          auto mdlog = sync_env->store->meta_mgr->get_log(period_id);

          // prevent wakeup() from accessing shard_crs while we're spawning them
          std::lock_guard<std::mutex> lock(mutex);

          // sync this period on each shard
          for (const auto& m : sync_status.sync_markers) {
            uint32_t shard_id = m.first;
            auto& marker = m.second;

            std::string period_marker;
            if (next) {
              // read the maximum marker from the next period's sync status
              period_marker = next.get_period().get_sync_status()[shard_id];
              if (period_marker.empty()) {
                // no metadata changes have occurred on this shard, skip it
                ldout(cct, 10) << "RGWMetaSyncCR: skipping shard " << shard_id
                               << " with empty period marker" << dendl;
                continue;
              }
            }

            using ShardCR = RGWMetaSyncShardControlCR;
            auto cr = new ShardCR(sync_env, pool, period_id, realm_epoch,
                                  mdlog, shard_id, marker,
                                  std::move(period_marker));
            auto stack = spawn(cr, false);
            shard_crs[shard_id] = RefPair{cr, stack};
          }
        }
        // wait for each shard to complete
        while (ret == 0 && num_spawned() > 0) {
          yield wait_for_child();
          collect(&ret, nullptr);
        }
        drain_all();
        {
          // drop shard cr refs under lock
          std::lock_guard<std::mutex> lock(mutex);
          shard_crs.clear();
        }
        if (ret < 0) {
          return set_cr_error(ret);
        }
        // advance to the next period
        assert(next);
        cursor = next;

        // write the updated sync info
        sync_status.sync_info.period = cursor.get_period().get_id();
        sync_status.sync_info.realm_epoch = cursor.get_epoch();
        yield call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados,
                                                                 sync_env->store,
                                                                 rgw_raw_obj(pool, sync_env->status_oid()),
                                                                 sync_status.sync_info));
      }
    }
    return 0;
  }

  void wakeup(int shard_id) {
    std::lock_guard<std::mutex> lock(mutex);
1906 auto iter = shard_crs.find(shard_id);
1907 if (iter == shard_crs.end()) {
1908 return;
1909 }
1910 iter->second.first->wakeup();
1911 }
1912 };
1913
1914 void RGWRemoteMetaLog::init_sync_env(RGWMetaSyncEnv *env) {
1915 env->cct = store->ctx();
1916 env->store = store;
1917 env->conn = conn;
1918 env->async_rados = async_rados;
1919 env->http_manager = &http_manager;
1920 env->error_logger = error_logger;
1921 }
1922
1923 int RGWRemoteMetaLog::read_sync_status(rgw_meta_sync_status *sync_status)
1924 {
1925 if (store->is_meta_master()) {
1926 return 0;
1927 }
1928 // cannot run concurrently with run_sync(), so run in a separate manager
1929 RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
1930 RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
1931 int ret = http_manager.set_threaded();
1932 if (ret < 0) {
1933 ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
1934 return ret;
1935 }
1936 RGWMetaSyncEnv sync_env_local = sync_env;
1937 sync_env_local.http_manager = &http_manager;
1938 ret = crs.run(new RGWReadSyncStatusCoroutine(&sync_env_local, sync_status));
1939 http_manager.stop();
1940 return ret;
1941 }
1942
1943 int RGWRemoteMetaLog::init_sync_status()
1944 {
1945 if (store->is_meta_master()) {
1946 return 0;
1947 }
1948
1949 rgw_mdlog_info mdlog_info;
1950 int r = read_log_info(&mdlog_info);
1951 if (r < 0) {
1952 lderr(store->ctx()) << "ERROR: failed to fetch master log info (r=" << r << ")" << dendl;
1953 return r;
1954 }
1955
1956 rgw_meta_sync_info sync_info;
1957 sync_info.num_shards = mdlog_info.num_shards;
1958 auto cursor = store->period_history->get_current();
1959 if (cursor) {
1960 sync_info.period = cursor.get_period().get_id();
1961 sync_info.realm_epoch = cursor.get_epoch();
1962 }
1963
1964 return run(new RGWInitSyncStatusCoroutine(&sync_env, sync_info));
1965 }
1966
1967 int RGWRemoteMetaLog::store_sync_info(const rgw_meta_sync_info& sync_info)
1968 {
1969 return run(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(async_rados, store,
1970 rgw_raw_obj(store->get_zone_params().log_pool, sync_env.status_oid()),
1971 sync_info));
1972 }
1973
1974 // return a cursor to the period at our sync position
1975 static RGWPeriodHistory::Cursor get_period_at(RGWRados* store,
1976 const rgw_meta_sync_info& info)
1977 {
1978 if (info.period.empty()) {
1979 // return an empty cursor with error=0
1980 return RGWPeriodHistory::Cursor{};
1981 }
1982
1983 // look for an existing period in our history
1984 auto cursor = store->period_history->lookup(info.realm_epoch);
1985 if (cursor) {
1986 // verify that the period ids match
1987 auto& existing = cursor.get_period().get_id();
1988 if (existing != info.period) {
1989 lderr(store->ctx()) << "ERROR: sync status period=" << info.period
1990 << " does not match period=" << existing
1991 << " in history at realm epoch=" << info.realm_epoch << dendl;
1992 return RGWPeriodHistory::Cursor{-EEXIST};
1993 }
1994 return cursor;
1995 }
1996
1997 // read the period from rados or pull it from the master
1998 RGWPeriod period;
1999 int r = store->period_puller->pull(info.period, period);
2000 if (r < 0) {
2001 lderr(store->ctx()) << "ERROR: failed to read period id "
2002 << info.period << ": " << cpp_strerror(r) << dendl;
2003 return RGWPeriodHistory::Cursor{r};
2004 }
2005 // attach the period to our history
2006 cursor = store->period_history->attach(std::move(period));
2007 if (!cursor) {
2008 r = cursor.get_error();
2009 lderr(store->ctx()) << "ERROR: failed to read period history back to "
2010 << info.period << ": " << cpp_strerror(r) << dendl;
2011 }
2012 return cursor;
2013 }
2014
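// Top-level metadata sync loop against the master zone. The persisted
// status advances through three states: StateInit (write a fresh status;
// retried with backoff on -EBUSY), StateBuildingFullSyncMaps
// (RGWFetchAllMetaCR builds the full-sync key index), and StateSync
// (RGWMetaSyncCR runs incremental sync per shard, period by period).
// Any other state value fails with -EIO.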
2015 int RGWRemoteMetaLog::run_sync()
2016 {
2017 if (store->is_meta_master()) {
2018 return 0;
2019 }
2020
2021 int r = 0;
2022
2023 // get shard count and oldest log period from master
2024 rgw_mdlog_info mdlog_info;
2025 for (;;) {
2026 if (going_down) {
2027 ldout(store->ctx(), 1) << __func__ << "(): going down" << dendl;
2028 return 0;
2029 }
2030 r = read_log_info(&mdlog_info);
2031 if (r == -EIO || r == -ENOENT) {
2032 // keep retrying if master isn't alive or hasn't initialized the log
2033 ldout(store->ctx(), 10) << __func__ << "(): waiting for master..." << dendl;
2034 backoff.backoff_sleep();
2035 continue;
2036 }
2037 backoff.reset();
2038 if (r < 0) {
2039 lderr(store->ctx()) << "ERROR: failed to fetch master log info (r=" << r << ")" << dendl;
2040 return r;
2041 }
2042 break;
2043 }
2044
2045 rgw_meta_sync_status sync_status;
2046 do {
2047 if (going_down) {
2048 ldout(store->ctx(), 1) << __func__ << "(): going down" << dendl;
2049 return 0;
2050 }
2051 r = run(new RGWReadSyncStatusCoroutine(&sync_env, &sync_status));
2052 if (r < 0 && r != -ENOENT) {
2053 ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status r=" << r << dendl;
2054 return r;
2055 }
2056
2057 if (!mdlog_info.period.empty()) {
2058 // restart sync if the remote has a period, but:
2059 // a) our status does not, or
2060 // b) our sync period comes before the remote's oldest log period
2061 if (sync_status.sync_info.period.empty() ||
2062 sync_status.sync_info.realm_epoch < mdlog_info.realm_epoch) {
2063 sync_status.sync_info.state = rgw_meta_sync_info::StateInit;
2064 ldout(store->ctx(), 1) << "epoch=" << sync_status.sync_info.realm_epoch
2065 << " in sync status comes before remote's oldest mdlog epoch="
2066 << mdlog_info.realm_epoch << ", restarting sync" << dendl;
2067 }
2068 }
2069
2070 if (sync_status.sync_info.state == rgw_meta_sync_info::StateInit) {
2071 ldout(store->ctx(), 20) << __func__ << "(): init" << dendl;
2072 sync_status.sync_info.num_shards = mdlog_info.num_shards;
2073 auto cursor = store->period_history->get_current();
2074 if (cursor) {
2075 // run full sync, then start incremental from the current period/epoch
2076 sync_status.sync_info.period = cursor.get_period().get_id();
2077 sync_status.sync_info.realm_epoch = cursor.get_epoch();
2078 }
2079 r = run(new RGWInitSyncStatusCoroutine(&sync_env, sync_status.sync_info));
2080 if (r == -EBUSY) {
2081 backoff.backoff_sleep();
2082 continue;
2083 }
2084 backoff.reset();
2085 if (r < 0) {
2086 ldout(store->ctx(), 0) << "ERROR: failed to init sync status r=" << r << dendl;
2087 return r;
2088 }
2089 }
2090 } while (sync_status.sync_info.state == rgw_meta_sync_info::StateInit);
2091
2092 auto num_shards = sync_status.sync_info.num_shards;
2093 if (num_shards != mdlog_info.num_shards) {
2094 lderr(store->ctx()) << "ERROR: can't sync, mismatch between num shards, master num_shards=" << mdlog_info.num_shards << " local num_shards=" << num_shards << dendl;
2095 return -EINVAL;
2096 }
2097
2098 RGWPeriodHistory::Cursor cursor;
2099 do {
2100 r = run(new RGWReadSyncStatusCoroutine(&sync_env, &sync_status));
2101 if (r < 0 && r != -ENOENT) {
2102 ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status r=" << r << dendl;
2103 return r;
2104 }
2105
2106 switch ((rgw_meta_sync_info::SyncState)sync_status.sync_info.state) {
2107 case rgw_meta_sync_info::StateBuildingFullSyncMaps:
2108 ldout(store->ctx(), 20) << __func__ << "(): building full sync maps" << dendl;
2109 r = run(new RGWFetchAllMetaCR(&sync_env, num_shards, sync_status.sync_markers));
2110 if (r == -EBUSY || r == -EAGAIN) {
2111 backoff.backoff_sleep();
2112 continue;
2113 }
2114 backoff.reset();
2115 if (r < 0) {
2116 ldout(store->ctx(), 0) << "ERROR: failed to fetch all metadata keys" << dendl;
2117 return r;
2118 }
2119
2120 sync_status.sync_info.state = rgw_meta_sync_info::StateSync;
2121 r = store_sync_info(sync_status.sync_info);
2122 if (r < 0) {
2123 ldout(store->ctx(), 0) << "ERROR: failed to update sync status" << dendl;
2124 return r;
2125 }
2126 /* fall through */
2127 case rgw_meta_sync_info::StateSync:
2128 ldout(store->ctx(), 20) << __func__ << "(): sync" << dendl;
2129 // find our position in the period history (if any)
2130 cursor = get_period_at(store, sync_status.sync_info);
2131 r = cursor.get_error();
2132 if (r < 0) {
2133 return r;
2134 }
2135 meta_sync_cr = new RGWMetaSyncCR(&sync_env, cursor, sync_status);
2136 r = run(meta_sync_cr);
2137 if (r < 0) {
2138 ldout(store->ctx(), 0) << "ERROR: metadata sync run failed r=" << r << dendl;
2139 return r;
2140 }
2141 break;
2142 default:
2143 ldout(store->ctx(), 0) << "ERROR: bad sync state!" << dendl;
2144 return -EIO;
2145 }
2146 } while (!going_down);
2147
2148 return 0;
2149 }
2150
2151 void RGWRemoteMetaLog::wakeup(int shard_id)
2152 {
2153 if (!meta_sync_cr) {
2154 return;
2155 }
2156 meta_sync_cr->wakeup(shard_id);
2157 }
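// Wakeup path: RGWRemoteMetaLog::wakeup() forwards to RGWMetaSyncCR::wakeup(),
// which finds the shard's control coroutine under the mutex and wakes it, so
// an idle shard notices new mdlog entries without waiting for the next poll.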
2158
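// State-machine coroutine: each yield dispatches to a state_*() helper.
// A helper returns 0 to advance to the next yield on the following call to
// operate(), io_block(0) to suspend until an async completion wakes the
// stack, or set_cr_error()/set_cr_done() to finish the coroutine.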
2159 int RGWCloneMetaLogCoroutine::operate()
2160 {
2161 reenter(this) {
2162 do {
2163 yield {
2164 ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": init request" << dendl;
2165 return state_init();
2166 }
2167 yield {
2168 ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status" << dendl;
2169 return state_read_shard_status();
2170 }
2171 yield {
2172 ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status complete" << dendl;
2173 return state_read_shard_status_complete();
2174 }
2175 yield {
2176 ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": sending rest request" << dendl;
2177 return state_send_rest_request();
2178 }
2179 yield {
2180 ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": receiving rest response" << dendl;
2181 return state_receive_rest_response();
2182 }
2183 yield {
2184 ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries" << dendl;
2185 return state_store_mdlog_entries();
2186 }
2187 } while (truncated);
2188 yield {
2189 ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries complete" << dendl;
2190 return state_store_mdlog_entries_complete();
2191 }
2192 }
2193
2194 return 0;
2195 }
2196
2197 int RGWCloneMetaLogCoroutine::state_init()
2198 {
2199 data = rgw_mdlog_shard_data();
2200
2201 return 0;
2202 }
2203
2204 int RGWCloneMetaLogCoroutine::state_read_shard_status()
2205 {
2206 const bool add_ref = false; // default constructs with refs=1
2207
2208 completion.reset(new RGWMetadataLogInfoCompletion(
2209 [this](int ret, const cls_log_header& header) {
2210 if (ret < 0) {
2211 ldout(cct, 1) << "ERROR: failed to read mdlog info with "
2212 << cpp_strerror(ret) << dendl;
2213 } else {
2214 shard_info.marker = header.max_marker;
2215 shard_info.last_update = header.max_time.to_real_time();
2216 }
2217 // wake up parent stack
2218 stack->get_completion_mgr()->complete(nullptr, stack);
2219 }), add_ref);
2220
2221 int ret = mdlog->get_info_async(shard_id, completion.get());
2222 if (ret < 0) {
2223 ldout(cct, 0) << "ERROR: mdlog->get_info_async() returned ret=" << ret << dendl;
2224 return set_cr_error(ret);
2225 }
2226
2227 return io_block(0);
2228 }
2229
2230 int RGWCloneMetaLogCoroutine::state_read_shard_status_complete()
2231 {
2232 completion.reset();
2233
2234 ldout(cct, 20) << "shard_id=" << shard_id << " marker=" << shard_info.marker << " last_update=" << shard_info.last_update << dendl;
2235
2236 marker = shard_info.marker;
2237
2238 return 0;
2239 }
2240
2241 int RGWCloneMetaLogCoroutine::state_send_rest_request()
2242 {
2243 RGWRESTConn *conn = sync_env->conn;
2244
2245 char buf[32];
2246 snprintf(buf, sizeof(buf), "%d", shard_id);
2247
2248 char max_entries_buf[32];
2249 snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", max_entries);
2250
2251 const char *marker_key = (marker.empty() ? "" : "marker");
2252
2253 rgw_http_param_pair pairs[] = { { "type", "metadata" },
2254 { "id", buf },
2255 { "period", period.c_str() },
2256 { "max-entries", max_entries_buf },
2257 { marker_key, marker.c_str() },
2258 { NULL, NULL } };
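  // The pairs above produce a request of the form (values illustrative):
  //   GET /admin/log?type=metadata&id=<shard>&period=<period>&max-entries=<n>&marker=<marker>
  // With no marker yet, marker_key is the empty string, so that the marker
  // parameter drops out of the query.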
2259
2260 http_op = new RGWRESTReadResource(conn, "/admin/log", pairs, NULL, sync_env->http_manager);
2261
2262 http_op->set_user_info((void *)stack);
2263
2264 int ret = http_op->aio_read();
2265 if (ret < 0) {
2266 ldout(cct, 0) << "ERROR: failed to fetch mdlog data" << dendl;
2267 log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
2268 http_op->put();
2269 http_op = NULL;
2270 return set_cr_error(ret);
2271 }
2272
2273 return io_block(0);
2274 }
2275
2276 int RGWCloneMetaLogCoroutine::state_receive_rest_response()
2277 {
2278 int ret = http_op->wait(&data);
2279 if (ret < 0) {
2280 error_stream << "http operation failed: " << http_op->to_str() << " status=" << http_op->get_http_status() << std::endl;
2281 ldout(cct, 5) << "failed to wait for op, ret=" << ret << dendl;
2282 http_op->put();
2283 http_op = NULL;
2284 return set_cr_error(ret);
2285 }
2286 http_op->put();
2287 http_op = NULL;
2288
2289 ldout(cct, 20) << "remote mdlog, shard_id=" << shard_id << " num of shard entries: " << data.entries.size() << dendl;
2290
2291 truncated = ((int)data.entries.size() == max_entries);
2292
2293 if (data.entries.empty()) {
2294 if (new_marker) {
2295 *new_marker = marker;
2296 }
2297 return set_cr_done();
2298 }
2299
2300 if (new_marker) {
2301 *new_marker = data.entries.back().id;
2302 }
2303
2304 return 0;
2305 }
2306
2307
2308 int RGWCloneMetaLogCoroutine::state_store_mdlog_entries()
2309 {
2310 list<cls_log_entry> dest_entries;
2311
2312 vector<rgw_mdlog_entry>::iterator iter;
2313 for (iter = data.entries.begin(); iter != data.entries.end(); ++iter) {
2314 rgw_mdlog_entry& entry = *iter;
2315 ldout(cct, 20) << "entry: name=" << entry.name << dendl;
2316
2317 cls_log_entry dest_entry;
2318 dest_entry.id = entry.id;
2319 dest_entry.section = entry.section;
2320 dest_entry.name = entry.name;
2321 dest_entry.timestamp = utime_t(entry.timestamp);
2322
2323 ::encode(entry.log_data, dest_entry.data);
2324
2325 dest_entries.push_back(dest_entry);
2326
2327 marker = entry.id;
2328 }
2329
2330 RGWAioCompletionNotifier *cn = stack->create_completion_notifier();
2331
2332 int ret = mdlog->store_entries_in_shard(dest_entries, shard_id, cn->completion());
2333 if (ret < 0) {
2334 cn->put();
2335 ldout(cct, 10) << "failed to store md log entries shard_id=" << shard_id << " ret=" << ret << dendl;
2336 return set_cr_error(ret);
2337 }
2338 return io_block(0);
2339 }
2340
2341 int RGWCloneMetaLogCoroutine::state_store_mdlog_entries_complete()
2342 {
2343 return set_cr_done();
2344 }
2345
2346
2347 // TODO: move into rgw_sync_trim.cc
2348 #undef dout_prefix
2349 #define dout_prefix (*_dout << "meta trim: ")
2350
2351 /// purge all log shards for the given mdlog
2352 class PurgeLogShardsCR : public RGWShardCollectCR {
2353 RGWRados *const store;
2354 const RGWMetadataLog* mdlog;
2355 const int num_shards;
2356 rgw_raw_obj obj;
2357 int i{0};
2358
2359 static constexpr int max_concurrent = 16;
2360
2361 public:
2362 PurgeLogShardsCR(RGWRados *store, const RGWMetadataLog* mdlog,
2363 const rgw_pool& pool, int num_shards)
2364 : RGWShardCollectCR(store->ctx(), max_concurrent),
2365 store(store), mdlog(mdlog), num_shards(num_shards), obj(pool, "")
2366 {}
2367
2368 bool spawn_next() override {
2369 if (i == num_shards) {
2370 return false;
2371 }
2372 mdlog->get_shard_oid(i++, obj.oid);
2373 spawn(new RGWRadosRemoveCR(store, obj), false);
2374 return true;
2375 }
2376 };
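// RGWShardCollectCR calls spawn_next() repeatedly until it returns false,
// keeping at most max_concurrent (16) children in flight, so purging a
// many-shard mdlog never issues all of its removes at once.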
2377
2378 using Cursor = RGWPeriodHistory::Cursor;
2379
2380 /// purge mdlogs from the oldest up to (but not including) the given realm_epoch
2381 class PurgePeriodLogsCR : public RGWCoroutine {
2382 RGWRados *const store;
2383 RGWMetadataManager *const metadata;
2384 RGWObjVersionTracker objv;
2385 Cursor cursor;
2386 epoch_t realm_epoch;
2387 epoch_t *last_trim_epoch; //< update last trim on success
2388
2389 public:
2390 PurgePeriodLogsCR(RGWRados *store, epoch_t realm_epoch, epoch_t *last_trim)
2391 : RGWCoroutine(store->ctx()), store(store), metadata(store->meta_mgr),
2392 realm_epoch(realm_epoch), last_trim_epoch(last_trim)
2393 {}
2394
2395 int operate();
2396 };
2397
2398 int PurgePeriodLogsCR::operate()
2399 {
2400 reenter(this) {
2401 // read our current oldest log period
2402 yield call(metadata->read_oldest_log_period_cr(&cursor, &objv));
2403 if (retcode < 0) {
2404 return set_cr_error(retcode);
2405 }
2406 assert(cursor);
2407 ldout(cct, 20) << "oldest log realm_epoch=" << cursor.get_epoch()
2408 << " period=" << cursor.get_period().get_id() << dendl;
2409
2410 // trim up to (but not including) the given realm_epoch
2411 while (cursor.get_epoch() < realm_epoch) {
2412 ldout(cct, 4) << "purging log shards for realm_epoch=" << cursor.get_epoch()
2413 << " period=" << cursor.get_period().get_id() << dendl;
2414 yield {
2415 const auto mdlog = metadata->get_log(cursor.get_period().get_id());
2416 const auto& pool = store->get_zone_params().log_pool;
2417 auto num_shards = cct->_conf->rgw_md_log_max_shards;
2418 call(new PurgeLogShardsCR(store, mdlog, pool, num_shards));
2419 }
2420 if (retcode < 0) {
2421 ldout(cct, 1) << "failed to remove log shards: "
2422 << cpp_strerror(retcode) << dendl;
2423 return set_cr_error(retcode);
2424 }
2425 ldout(cct, 10) << "removed log shards for realm_epoch=" << cursor.get_epoch()
2426 << " period=" << cursor.get_period().get_id() << dendl;
2427
2428 // update our mdlog history
2429 yield call(metadata->trim_log_period_cr(cursor, &objv));
2430 if (retcode == -ENOENT) {
2431 // must have raced to update mdlog history. return success and allow the
2432 // winner to continue purging
2433 ldout(cct, 10) << "already removed log shards for realm_epoch=" << cursor.get_epoch()
2434 << " period=" << cursor.get_period().get_id() << dendl;
2435 return set_cr_done();
2436 } else if (retcode < 0) {
2437 ldout(cct, 1) << "failed to remove log shards for realm_epoch="
2438 << cursor.get_epoch() << " period=" << cursor.get_period().get_id()
2439 << " with: " << cpp_strerror(retcode) << dendl;
2440 return set_cr_error(retcode);
2441 }
2442
2443 if (*last_trim_epoch < cursor.get_epoch()) {
2444 *last_trim_epoch = cursor.get_epoch();
2445 }
2446
2447 assert(cursor.has_next()); // get_current() should always come after
2448 cursor.next();
2449 }
2450 return set_cr_done();
2451 }
2452 return 0;
2453 }
2454
2455 namespace {
2456
2457 using connection_map = std::map<std::string, std::unique_ptr<RGWRESTConn>>;
2458
2459 /// construct a RGWRESTConn for each zone in the realm
2460 template <typename Zonegroups>
2461 connection_map make_peer_connections(RGWRados *store,
2462 const Zonegroups& zonegroups)
2463 {
2464 connection_map connections;
2465 for (auto& g : zonegroups) {
2466 for (auto& z : g.second.zones) {
2467 std::unique_ptr<RGWRESTConn> conn{
2468 new RGWRESTConn(store->ctx(), store, z.first, z.second.endpoints)};
2469 connections.emplace(z.first, std::move(conn));
2470 }
2471 }
2472 return connections;
2473 }
2474
2475 /// return the marker that it's safe to trim up to
2476 const std::string& get_stable_marker(const rgw_meta_sync_marker& m)
2477 {
2478 return m.state == m.FullSync ? m.next_step_marker : m.marker;
2479 }
2480
2481 /// comparison operator for take_min_status()
2482 bool operator<(const rgw_meta_sync_marker& lhs, const rgw_meta_sync_marker& rhs)
2483 {
2484 // sort by stable marker
2485 return get_stable_marker(lhs) < get_stable_marker(rhs);
2486 }
2487
2488 /// populate the status with the minimum stable marker of each shard for any
2489 /// peer whose realm_epoch matches the minimum realm_epoch in the input
2490 template <typename Iter>
2491 int take_min_status(CephContext *cct, Iter first, Iter last,
2492 rgw_meta_sync_status *status)
2493 {
2494 if (first == last) {
2495 return -EINVAL;
2496 }
2497 const size_t num_shards = cct->_conf->rgw_md_log_max_shards;
2498
2499 status->sync_info.realm_epoch = std::numeric_limits<epoch_t>::max();
2500 for (auto p = first; p != last; ++p) {
2501 // validate peer's shard count
2502 if (p->sync_markers.size() != num_shards) {
2503 ldout(cct, 1) << "take_min_status got peer status with "
2504 << p->sync_markers.size() << " shards, expected "
2505 << num_shards << dendl;
2506 return -EINVAL;
2507 }
2508 if (p->sync_info.realm_epoch < status->sync_info.realm_epoch) {
2509 // earlier epoch, take its entire status
2510 *status = std::move(*p);
2511 } else if (p->sync_info.realm_epoch == status->sync_info.realm_epoch) {
2512 // same epoch, take any earlier markers
2513 auto m = status->sync_markers.begin();
2514 for (auto& shard : p->sync_markers) {
2515 if (shard.second < m->second) {
2516 m->second = std::move(shard.second);
2517 }
2518 ++m;
2519 }
2520 }
2521 }
2522 return 0;
2523 }
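// Worked example (hypothetical marker values): given two peers on the same
// realm_epoch=2 reporting
//   peer A: shard 0 stable marker "1_0010", shard 1 "1_0020"
//   peer B: shard 0 stable marker "1_0015", shard 1 "1_0005"
// the result is realm_epoch=2 with shard 0 = "1_0010" and shard 1 = "1_0005".
// Had peer B still been on realm_epoch=1, its whole status would have been
// taken instead, since trimming must not pass the slowest peer.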
2524
2525 struct TrimEnv {
2526 RGWRados *const store;
2527 RGWHTTPManager *const http;
2528 int num_shards;
2529 const std::string& zone;
2530 Cursor current; //< cursor to current period
2531 epoch_t last_trim_epoch{0}; //< epoch of last mdlog that was purged
2532
2533 TrimEnv(RGWRados *store, RGWHTTPManager *http, int num_shards)
2534 : store(store), http(http), num_shards(num_shards),
2535 zone(store->get_zone_params().get_id()),
2536 current(store->period_history->get_current())
2537 {}
2538 };
2539
2540 struct MasterTrimEnv : public TrimEnv {
2541 connection_map connections; //< peer connections
2542 std::vector<rgw_meta_sync_status> peer_status; //< sync status for each peer
2543 /// last trim marker for each shard, only applies to current period's mdlog
2544 std::vector<std::string> last_trim_markers;
2545
2546 MasterTrimEnv(RGWRados *store, RGWHTTPManager *http, int num_shards)
2547 : TrimEnv(store, http, num_shards),
2548 last_trim_markers(num_shards)
2549 {
2550 auto& period = current.get_period();
2551 connections = make_peer_connections(store, period.get_map().zonegroups);
2552 connections.erase(zone);
2553 peer_status.resize(connections.size());
2554 }
2555 };
2556
2557 struct PeerTrimEnv : public TrimEnv {
2558 /// last trim timestamp for each shard, only applies to current period's mdlog
2559 std::vector<ceph::real_time> last_trim_timestamps;
2560
2561 PeerTrimEnv(RGWRados *store, RGWHTTPManager *http, int num_shards)
2562 : TrimEnv(store, http, num_shards),
2563 last_trim_timestamps(num_shards)
2564 {}
2565
2566 void set_num_shards(int num_shards) {
2567 this->num_shards = num_shards;
2568 last_trim_timestamps.resize(num_shards);
2569 }
2570 };
2571
2572 } // anonymous namespace
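// Design note: the master trims its mdlog by marker, using the minimum
// stable marker from each peer's sync status, because those markers name
// positions in the master's own log. Markers are not comparable across
// zones, so peers trim by timestamps derived from the master's log instead
// (see MetaPeerTrimShardCR below).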
2573
2574
2575 /// spawn a trim cr for each shard that needs it, while limiting the number
2576 /// of concurrent shards
2577 class MetaMasterTrimShardCollectCR : public RGWShardCollectCR {
2578 private:
2579 static constexpr int MAX_CONCURRENT_SHARDS = 16;
2580
2581 MasterTrimEnv& env;
2582 RGWMetadataLog *mdlog;
2583 int shard_id{0};
2584 std::string oid;
2585 const rgw_meta_sync_status& sync_status;
2586
2587 public:
2588 MetaMasterTrimShardCollectCR(MasterTrimEnv& env, RGWMetadataLog *mdlog,
2589 const rgw_meta_sync_status& sync_status)
2590 : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
2591 env(env), mdlog(mdlog), sync_status(sync_status)
2592 {}
2593
2594 bool spawn_next() override;
2595 };
2596
2597 bool MetaMasterTrimShardCollectCR::spawn_next()
2598 {
2599 while (shard_id < env.num_shards) {
2600 auto m = sync_status.sync_markers.find(shard_id);
2601 if (m == sync_status.sync_markers.end()) {
2602 shard_id++;
2603 continue;
2604 }
2605 auto& stable = get_stable_marker(m->second);
2606 auto& last_trim = env.last_trim_markers[shard_id];
2607
2608 if (stable <= last_trim) {
2609 // already trimmed
2610 ldout(cct, 20) << "skipping log shard " << shard_id
2611 << " at marker=" << stable
2612 << " last_trim=" << last_trim
2613 << " realm_epoch=" << sync_status.sync_info.realm_epoch << dendl;
2614 shard_id++;
2615 continue;
2616 }
2617
2618 mdlog->get_shard_oid(shard_id, oid);
2619
2620 ldout(cct, 10) << "trimming log shard " << shard_id
2621 << " at marker=" << stable
2622 << " last_trim=" << last_trim
2623 << " realm_epoch=" << sync_status.sync_info.realm_epoch << dendl;
2624 spawn(new RGWSyncLogTrimCR(env.store, oid, stable, &last_trim), false);
2625 shard_id++;
2626 return true;
2627 }
2628 return false;
2629 }
2630
2631 /// spawn rest requests to read each peer's sync status
2632 class MetaMasterStatusCollectCR : public RGWShardCollectCR {
2633 static constexpr int MAX_CONCURRENT_SHARDS = 16;
2634
2635 MasterTrimEnv& env;
2636 connection_map::iterator c;
2637 std::vector<rgw_meta_sync_status>::iterator s;
2638 public:
2639 MetaMasterStatusCollectCR(MasterTrimEnv& env)
2640 : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
2641 env(env), c(env.connections.begin()), s(env.peer_status.begin())
2642 {}
2643
2644 bool spawn_next() override {
2645 if (c == env.connections.end()) {
2646 return false;
2647 }
2648 static rgw_http_param_pair params[] = {
2649 { "type", "metadata" },
2650 { "status", nullptr },
2651 { nullptr, nullptr }
2652 };
2653
2654 ldout(cct, 20) << "query sync status from " << c->first << dendl;
2655 auto conn = c->second.get();
2656 using StatusCR = RGWReadRESTResourceCR<rgw_meta_sync_status>;
2657 spawn(new StatusCR(cct, conn, env.http, "/admin/log/", params, &*s),
2658 false);
2659 ++c;
2660 ++s;
2661 return true;
2662 }
2663 };
2664
2665 class MetaMasterTrimCR : public RGWCoroutine {
2666 MasterTrimEnv& env;
2667 rgw_meta_sync_status min_status; //< minimum sync status of all peers
2668 int ret{0};
2669
2670 public:
2671 MetaMasterTrimCR(MasterTrimEnv& env)
2672 : RGWCoroutine(env.store->ctx()), env(env)
2673 {}
2674
2675 int operate();
2676 };
2677
2678 int MetaMasterTrimCR::operate()
2679 {
2680 reenter(this) {
2681 // TODO: detect this and fail before we spawn the trim thread?
2682 if (env.connections.empty()) {
2683 ldout(cct, 4) << "no peers, exiting" << dendl;
2684 return set_cr_done();
2685 }
2686
2687 ldout(cct, 10) << "fetching sync status for zone " << env.zone << dendl;
2688 // query mdlog sync status from peers
2689 yield call(new MetaMasterStatusCollectCR(env));
2690
2691 // must get a successful reply from all peers to consider trimming
2692 if (ret < 0) {
2693 ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
2694 return set_cr_error(ret);
2695 }
2696
2697 // determine the minimum epoch and markers
2698 ret = take_min_status(env.store->ctx(), env.peer_status.begin(),
2699 env.peer_status.end(), &min_status);
2700 if (ret < 0) {
2701 ldout(cct, 4) << "failed to calculate min sync status from peers" << dendl;
2702 return set_cr_error(ret);
2703 }
2704 yield {
2705 auto store = env.store;
2706 auto epoch = min_status.sync_info.realm_epoch;
2707 ldout(cct, 4) << "realm epoch min=" << epoch
2708 << " current=" << env.current.get_epoch()<< dendl;
2709 if (epoch > env.last_trim_epoch + 1) {
2710 // delete any prior mdlog periods
2711 spawn(new PurgePeriodLogsCR(store, epoch, &env.last_trim_epoch), true);
2712 } else {
2713 ldout(cct, 10) << "mdlogs already purged up to realm_epoch "
2714 << env.last_trim_epoch << dendl;
2715 }
2716
2717 // if realm_epoch == current, trim mdlog based on markers
2718 if (epoch == env.current.get_epoch()) {
2719 auto mdlog = store->meta_mgr->get_log(env.current.get_period().get_id());
2720 spawn(new MetaMasterTrimShardCollectCR(env, mdlog, min_status), true);
2721 }
2722 }
2723 // ignore any errors during purge/trim because we want to hold the lock open
2724 return set_cr_done();
2725 }
2726 return 0;
2727 }
2728
2729
2730 /// read the first entry of the master's mdlog shard and trim to that position
2731 class MetaPeerTrimShardCR : public RGWCoroutine {
2732 RGWMetaSyncEnv& env;
2733 RGWMetadataLog *mdlog;
2734 const std::string& period_id;
2735 const int shard_id;
2736 RGWMetadataLogInfo info;
2737 ceph::real_time stable; //< safe timestamp to trim, according to master
2738 ceph::real_time *last_trim; //< last trimmed timestamp, updated on trim
2739 rgw_mdlog_shard_data result; //< result from master's mdlog listing
2740
2741 public:
2742 MetaPeerTrimShardCR(RGWMetaSyncEnv& env, RGWMetadataLog *mdlog,
2743 const std::string& period_id, int shard_id,
2744 ceph::real_time *last_trim)
2745 : RGWCoroutine(env.store->ctx()), env(env), mdlog(mdlog),
2746 period_id(period_id), shard_id(shard_id), last_trim(last_trim)
2747 {}
2748
2749 int operate() override;
2750 };
2751
2752 int MetaPeerTrimShardCR::operate()
2753 {
2754 reenter(this) {
2755 // query master's first mdlog entry for this shard
2756 yield call(new RGWListRemoteMDLogShardCR(&env, period_id, shard_id,
2757 "", 1, &result));
2758 if (retcode < 0) {
2759 ldout(cct, 5) << "failed to read first entry from master's mdlog shard "
2760 << shard_id << " for period " << period_id
2761 << ": " << cpp_strerror(retcode) << dendl;
2762 return set_cr_error(retcode);
2763 }
2764 if (result.entries.empty()) {
2765 // if there are no mdlog entries, we don't have a timestamp to compare. we
2766 // can't just trim everything, because there could be racing updates since
2767 // this empty reply. query the mdlog shard info to read its max timestamp,
2768 // then retry the listing to make sure it's still empty before trimming to
2769 // that timestamp
2770 ldout(cct, 10) << "empty master mdlog shard " << shard_id
2771 << ", reading last timestamp from shard info" << dendl;
2772 // read the mdlog shard info for the last timestamp
2773 using ShardInfoCR = RGWReadRemoteMDLogShardInfoCR;
2774 yield call(new ShardInfoCR(&env, period_id, shard_id, &info));
2775 if (retcode < 0) {
2776 ldout(cct, 5) << "failed to read info from master's mdlog shard "
2777 << shard_id << " for period " << period_id
2778 << ": " << cpp_strerror(retcode) << dendl;
2779 return set_cr_error(retcode);
2780 }
2781 if (ceph::real_clock::is_zero(info.last_update)) {
2782 return set_cr_done(); // nothing to trim
2783 }
2784 ldout(cct, 10) << "got mdlog shard info with last update="
2785 << info.last_update << dendl;
2786 // re-read the master's first mdlog entry to make sure it hasn't changed
2787 yield call(new RGWListRemoteMDLogShardCR(&env, period_id, shard_id,
2788 "", 1, &result));
2789 if (retcode < 0) {
2790 ldout(cct, 5) << "failed to read first entry from master's mdlog shard "
2791 << shard_id << " for period " << period_id
2792 << ": " << cpp_strerror(retcode) << dendl;
2793 return set_cr_error(retcode);
2794 }
2795 // if the mdlog is still empty, trim to max marker
2796 if (result.entries.empty()) {
2797 stable = info.last_update;
2798 } else {
2799 stable = result.entries.front().timestamp;
2800
2801 // can only trim -up to- master's first timestamp, so subtract a second.
2802 // (this is why we use timestamps instead of markers for the peers)
2803 stable -= std::chrono::seconds(1);
2804 }
2805 } else {
2806 stable = result.entries.front().timestamp;
2807 stable -= std::chrono::seconds(1);
2808 }
2809
2810 if (stable <= *last_trim) {
2811 ldout(cct, 10) << "skipping log shard " << shard_id
2812 << " at timestamp=" << stable
2813 << " last_trim=" << *last_trim << dendl;
2814 return set_cr_done();
2815 }
2816
2817 ldout(cct, 10) << "trimming log shard " << shard_id
2818 << " at timestamp=" << stable
2819 << " last_trim=" << *last_trim << dendl;
2820 yield {
2821 std::string oid;
2822 mdlog->get_shard_oid(shard_id, oid);
2823 call(new RGWRadosTimelogTrimCR(env.store, oid, real_time{}, stable, "", ""));
2824 }
2825 if (retcode < 0 && retcode != -ENODATA) {
2826 ldout(cct, 1) << "failed to trim mdlog shard " << shard_id
2827 << ": " << cpp_strerror(retcode) << dendl;
2828 return set_cr_error(retcode);
2829 }
2830 *last_trim = stable;
2831 return set_cr_done();
2832 }
2833 return 0;
2834 }
2835
2836 class MetaPeerTrimShardCollectCR : public RGWShardCollectCR {
2837 static constexpr int MAX_CONCURRENT_SHARDS = 16;
2838
2839 PeerTrimEnv& env;
2840 RGWMetadataLog *mdlog;
2841 const std::string& period_id;
2842 RGWMetaSyncEnv meta_env; //< for RGWListRemoteMDLogShardCR
2843 int shard_id{0};
2844
2845 public:
2846 MetaPeerTrimShardCollectCR(PeerTrimEnv& env, RGWMetadataLog *mdlog)
2847 : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
2848 env(env), mdlog(mdlog), period_id(env.current.get_period().get_id())
2849 {
2850 meta_env.init(cct, env.store, env.store->rest_master_conn,
2851 env.store->get_async_rados(), env.http, nullptr);
2852 }
2853
2854 bool spawn_next() override;
2855 };
2856
2857 bool MetaPeerTrimShardCollectCR::spawn_next()
2858 {
2859 if (shard_id >= env.num_shards) {
2860 return false;
2861 }
2862 auto& last_trim = env.last_trim_timestamps[shard_id];
2863 spawn(new MetaPeerTrimShardCR(meta_env, mdlog, period_id, shard_id, &last_trim),
2864 false);
2865 shard_id++;
2866 return true;
2867 }
2868
2869 class MetaPeerTrimCR : public RGWCoroutine {
2870 PeerTrimEnv& env;
2871 rgw_mdlog_info mdlog_info; //< master's mdlog info
2872
2873 public:
2874 MetaPeerTrimCR(PeerTrimEnv& env) : RGWCoroutine(env.store->ctx()), env(env) {}
2875
2876 int operate();
2877 };
2878
2879 int MetaPeerTrimCR::operate()
2880 {
2881 reenter(this) {
2882 ldout(cct, 10) << "fetching master mdlog info" << dendl;
2883 yield {
2884 // query mdlog_info from master for oldest_log_period
2885 rgw_http_param_pair params[] = {
2886 { "type", "metadata" },
2887 { nullptr, nullptr }
2888 };
2889
2890 using LogInfoCR = RGWReadRESTResourceCR<rgw_mdlog_info>;
2891 call(new LogInfoCR(cct, env.store->rest_master_conn, env.http,
2892 "/admin/log/", params, &mdlog_info));
2893 }
2894 if (retcode < 0) {
2895 ldout(cct, 4) << "failed to read mdlog info from master" << dendl;
2896 return set_cr_error(retcode);
2897 }
2898 // use the master's shard count rather than our local configuration
2899 env.set_num_shards(mdlog_info.num_shards);
2900
2901 if (mdlog_info.realm_epoch > env.last_trim_epoch + 1) {
2902 // delete any prior mdlog periods
2903 yield call(new PurgePeriodLogsCR(env.store, mdlog_info.realm_epoch,
2904 &env.last_trim_epoch));
2905 } else {
2906 ldout(cct, 10) << "mdlogs already purged through realm_epoch "
2907 << env.last_trim_epoch << dendl;
2908 }
2909
2910 // if realm_epoch == current, trim mdlog based on master's markers
2911 if (mdlog_info.realm_epoch == env.current.get_epoch()) {
2912 yield {
2913 auto meta_mgr = env.store->meta_mgr;
2914 auto mdlog = meta_mgr->get_log(env.current.get_period().get_id());
2915 call(new MetaPeerTrimShardCollectCR(env, mdlog));
2916 // ignore any errors during purge/trim because we want to hold the lock open
2917 }
2918 }
2919 return set_cr_done();
2920 }
2921 return 0;
2922 }
2923
2924 class MetaTrimPollCR : public RGWCoroutine {
2925 RGWRados *const store;
2926 const utime_t interval; //< polling interval
2927 const rgw_raw_obj obj;
2928 const std::string name{"meta_trim"}; //< lock name
2929 const std::string cookie;
2930
2931 protected:
2932 /// allocate the coroutine to run within the lease
2933 virtual RGWCoroutine* alloc_cr() = 0;
2934
2935 public:
2936 MetaTrimPollCR(RGWRados *store, utime_t interval)
2937 : RGWCoroutine(store->ctx()), store(store), interval(interval),
2938 obj(store->get_zone_params().log_pool, RGWMetadataLogHistory::oid),
2939 cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct))
2940 {}
2941
2942 int operate();
2943 };
2944
2945 int MetaTrimPollCR::operate()
2946 {
2947 reenter(this) {
2948 for (;;) {
2949 set_status("sleeping");
2950 wait(interval);
2951
2952 // prevent others from trimming for our entire wait interval
2953 set_status("acquiring trim lock");
2954 yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
2955 obj, name, cookie, interval.sec()));
2956 if (retcode < 0) {
2957 ldout(cct, 4) << "failed to lock: " << cpp_strerror(retcode) << dendl;
2958 continue;
2959 }
2960
2961 set_status("trimming");
2962 yield call(alloc_cr());
2963
2964 if (retcode < 0) {
2965 // on errors, unlock so other gateways can try
2966 set_status("unlocking");
2967 yield call(new RGWSimpleRadosUnlockCR(store->get_async_rados(), store,
2968 obj, name, cookie));
2969 }
2970 }
2971 }
2972 return 0;
2973 }
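// On success the lock is deliberately left held: it was taken for
// interval.sec(), so it expires on its own at roughly the next poll, and no
// other gateway re-runs the trim logic in the meantime. Only on error is it
// released early so another gateway can retry sooner.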
2974
2975 class MetaMasterTrimPollCR : public MetaTrimPollCR {
2976 MasterTrimEnv env; //< trim state to share between calls
2977 RGWCoroutine* alloc_cr() override {
2978 return new MetaMasterTrimCR(env);
2979 }
2980 public:
2981 MetaMasterTrimPollCR(RGWRados *store, RGWHTTPManager *http,
2982 int num_shards, utime_t interval)
2983 : MetaTrimPollCR(store, interval),
2984 env(store, http, num_shards)
2985 {}
2986 };
2987
2988 class MetaPeerTrimPollCR : public MetaTrimPollCR {
2989 PeerTrimEnv env; //< trim state to share between calls
2990 RGWCoroutine* alloc_cr() override {
2991 return new MetaPeerTrimCR(env);
2992 }
2993 public:
2994 MetaPeerTrimPollCR(RGWRados *store, RGWHTTPManager *http,
2995 int num_shards, utime_t interval)
2996 : MetaTrimPollCR(store, interval),
2997 env(store, http, num_shards)
2998 {}
2999 };
3000
3001 RGWCoroutine* create_meta_log_trim_cr(RGWRados *store, RGWHTTPManager *http,
3002 int num_shards, utime_t interval)
3003 {
3004 if (store->is_meta_master()) {
3005 return new MetaMasterTrimPollCR(store, http, num_shards, interval);
3006 }
3007 return new MetaPeerTrimPollCR(store, http, num_shards, interval);
3008 }
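// Usage sketch (illustrative only -- the scaffolding below mirrors the
// manager/http setup in RGWRemoteMetaLog::read_sync_status() above and is
// an assumption, not code from this file):
//
//   RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
//   RGWHTTPManager http(store->ctx(), crs.get_completion_mgr());
//   http.set_threaded();
//   // runs forever, polling and trimming once per interval
//   int r = crs.run(create_meta_log_trim_cr(
//       store, &http, store->ctx()->_conf->rgw_md_log_max_shards,
//       utime_t(store->ctx()->_conf->rgw_sync_log_trim_interval, 0)));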
3009
3010
3011 struct MetaMasterAdminTrimCR : private MasterTrimEnv, public MetaMasterTrimCR {
3012 MetaMasterAdminTrimCR(RGWRados *store, RGWHTTPManager *http, int num_shards)
3013 : MasterTrimEnv(store, http, num_shards),
3014 MetaMasterTrimCR(*static_cast<MasterTrimEnv*>(this))
3015 {}
3016 };
3017
3018 struct MetaPeerAdminTrimCR : private PeerTrimEnv, public MetaPeerTrimCR {
3019 MetaPeerAdminTrimCR(RGWRados *store, RGWHTTPManager *http, int num_shards)
3020 : PeerTrimEnv(store, http, num_shards),
3021 MetaPeerTrimCR(*static_cast<PeerTrimEnv*>(this))
3022 {}
3023 };
3024
3025 RGWCoroutine* create_admin_meta_log_trim_cr(RGWRados *store,
3026 RGWHTTPManager *http,
3027 int num_shards)
3028 {
3029 if (store->is_meta_master()) {
3030 return new MetaMasterAdminTrimCR(store, http, num_shards);
3031 }
3032 return new MetaPeerAdminTrimCR(store, http, num_shards);
3033 }