// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp
#include <boost/optional.hpp>

#include "common/RefCountedObj.h"
#include "common/WorkQueue.h"
#include "common/Throttle.h"
#include "common/admin_socket.h"
#include "common/errno.h"

#include "common/ceph_json.h"
#include "common/Formatter.h"

#include "rgw_common.h"
#include "rgw_sync.h"
#include "rgw_metadata.h"
#include "rgw_mdlog_types.h"
#include "rgw_rest_conn.h"
#include "rgw_tools.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
#include "rgw_http_client.h"
#include "rgw_sync_trace.h"

#include "cls/lock/cls_lock_client.h"

#include "services/svc_zone.h"
#include "services/svc_mdlog.h"
#include "services/svc_meta.h"
#include "services/svc_cls.h"

#include <boost/asio/yield.hpp>
#define dout_subsys ceph_subsys_rgw

#undef dout_prefix
#define dout_prefix (*_dout << "meta sync: ")
static string mdlog_sync_status_oid = "mdlog.sync-status";
static string mdlog_sync_status_shard_prefix = "mdlog.sync-status.shard";
static string mdlog_sync_full_sync_index_prefix = "meta.full-sync.index";
RGWContinuousLeaseCR::~RGWContinuousLeaseCR() {}
RGWSyncErrorLogger::RGWSyncErrorLogger(rgw::sal::RadosStore* _store, const string &oid_prefix, int _num_shards) : store(_store), num_shards(_num_shards) {
  for (int i = 0; i < num_shards; i++) {
    oids.push_back(get_shard_oid(oid_prefix, i));
  }
}
string RGWSyncErrorLogger::get_shard_oid(const string& oid_prefix, int shard_id) {
  char buf[oid_prefix.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), shard_id);
  return string(buf);
}
RGWCoroutine *RGWSyncErrorLogger::log_error_cr(const DoutPrefixProvider *dpp, const string& source_zone, const string& section, const string& name, uint32_t error_code, const string& message) {
  cls_log_entry entry;

  rgw_sync_error_info info(source_zone, error_code, message);
  bufferlist bl;
  encode(info, bl);
  store->svc()->cls->timelog.prepare_entry(entry, real_clock::now(), section, name, bl);

  uint32_t shard_id = ++counter % num_shards;

  return new RGWRadosTimelogAddCR(dpp, store, oids[shard_id], entry);
}
void RGWSyncBackoff::update_wait_time()
{
  if (cur_wait == 0) {
    cur_wait = 1;
  } else {
    cur_wait = (cur_wait << 1);
  }
  if (cur_wait >= max_secs) {
    cur_wait = max_secs;
  }
}
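/*
 * Illustrative note: starting from zero, successive calls above yield waits
 * of 1, 2, 4, 8, ... seconds, clamped at max_secs. A minimal sketch of the
 * same doubling-with-cap policy (hypothetical helper, not part of this
 * file):
 *
 *   uint32_t next_wait(uint32_t cur, uint32_t max) {
 *     return cur == 0 ? 1 : std::min(cur << 1, max);
 *   }
 */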
void RGWSyncBackoff::backoff_sleep()
{
  update_wait_time();
  sleep(cur_wait);
}
void RGWSyncBackoff::backoff(RGWCoroutine *op)
{
  update_wait_time();
  op->wait(utime_t(cur_wait, 0));
}
int RGWBackoffControlCR::operate(const DoutPrefixProvider *dpp) {
  reenter(this) {
    // retry the operation until it succeeds
    while (true) {
      yield {
        std::lock_guard l{lock};
        cr = alloc_cr();
        cr->get();
        call(cr);
      }
      {
        std::lock_guard l{lock};
        cr->put();
        cr = NULL;
      }
      if (retcode >= 0) {
        break;
      }
      if (retcode != -EBUSY && retcode != -EAGAIN) {
        ldout(cct, 0) << "ERROR: RGWBackoffControlCR called coroutine returned " << retcode << dendl;
        if (exit_on_error) {
          return set_cr_error(retcode);
        }
      }
      if (reset_backoff) {
        backoff.reset();
      }
      yield backoff.backoff(this);
    }

    // run an optional finisher
    yield call(alloc_finisher_cr());
    if (retcode < 0) {
      ldout(cct, 0) << "ERROR: call to finisher_cr() failed: retcode=" << retcode << dendl;
      return set_cr_error(retcode);
    }
    return set_cr_done();
  }
  return 0;
}
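/*
 * A subclass of RGWBackoffControlCR only needs to supply the operation to
 * retry (and, optionally, a finisher to run once it succeeds); a minimal
 * hypothetical sketch, not taken from this file:
 *
 *   class MyControlCR : public RGWBackoffControlCR {
 *     RGWCoroutine *alloc_cr() override { return new MyOpCR(); }
 *   };
 *
 * RGWMetaSyncShardControlCR further below is the real in-tree example.
 */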
void rgw_mdlog_info::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("num_objects", num_shards, obj);
  JSONDecoder::decode_json("period", period, obj);
  JSONDecoder::decode_json("realm_epoch", realm_epoch, obj);
}
void rgw_mdlog_entry::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("id", id, obj);
  JSONDecoder::decode_json("section", section, obj);
  JSONDecoder::decode_json("name", name, obj);
  utime_t ut;
  JSONDecoder::decode_json("timestamp", ut, obj);
  timestamp = ut.to_real_time();
  JSONDecoder::decode_json("data", log_data, obj);
}
void rgw_mdlog_shard_data::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("marker", marker, obj);
  JSONDecoder::decode_json("truncated", truncated, obj);
  JSONDecoder::decode_json("entries", entries, obj);
}
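/*
 * The decoders above consume the master zone's mdlog REST responses; a
 * representative shard listing looks like (all values hypothetical):
 *
 *   {
 *     "marker": "1_1633046400.123456_42.1",
 *     "truncated": false,
 *     "entries": [
 *       { "id": "...", "section": "user", "name": "...",
 *         "timestamp": "...", "data": { "status": "complete" } }
 *     ]
 *   }
 */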
int RGWShardCollectCR::operate(const DoutPrefixProvider *dpp) {
  reenter(this) {
    while (spawn_next()) {
      current_running++;

      while (current_running >= max_concurrent) {
        int child_ret;
        yield wait_for_child();
        if (collect_next(&child_ret)) {
          current_running--;
          if (child_ret < 0 && child_ret != -ENOENT) {
            ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
            status = child_ret;
          }
        }
      }
    }
    while (current_running > 0) {
      int child_ret;
      yield wait_for_child();
      if (collect_next(&child_ret)) {
        current_running--;
        if (child_ret < 0 && child_ret != -ENOENT) {
          ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
          status = child_ret;
        }
      }
    }
    if (status < 0) {
      return set_cr_error(status);
    }
    return set_cr_done();
  }
  return 0;
}
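/*
 * RGWShardCollectCR is a bounded fan-out helper: spawn_next() is invoked
 * until it returns false, while collect_next() reaps finished children so
 * that at most max_concurrent stacks run at once. A minimal hypothetical
 * subclass sketch (make_shard_cr() is an assumed factory, not part of this
 * file):
 *
 *   class CollectAllShardsCR : public RGWShardCollectCR {
 *     int shard = 0, num_shards;
 *     bool spawn_next() override {
 *       if (shard >= num_shards) return false;
 *       spawn(make_shard_cr(shard++), false);
 *       return true;
 *     }
 *   };
 */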
class RGWReadRemoteMDLogInfoCR : public RGWShardCollectCR {
  RGWMetaSyncEnv *sync_env;

  const std::string& period;
  int num_shards;
  map<int, RGWMetadataLogInfo> *mdlog_info;

  int shard_id;
#define READ_MDLOG_MAX_CONCURRENT 10

public:
  RGWReadRemoteMDLogInfoCR(RGWMetaSyncEnv *_sync_env,
                           const std::string& period, int _num_shards,
                           map<int, RGWMetadataLogInfo> *_mdlog_info)
    : RGWShardCollectCR(_sync_env->cct, READ_MDLOG_MAX_CONCURRENT),
      sync_env(_sync_env),
      period(period), num_shards(_num_shards),
      mdlog_info(_mdlog_info), shard_id(0) {}
  bool spawn_next() override;
};
class RGWListRemoteMDLogCR : public RGWShardCollectCR {
  RGWMetaSyncEnv *sync_env;

  const std::string& period;
  map<int, string> shards;
  int max_entries_per_shard;
  map<int, rgw_mdlog_shard_data> *result;

  map<int, string>::iterator iter;
#define READ_MDLOG_MAX_CONCURRENT 10

public:
  RGWListRemoteMDLogCR(RGWMetaSyncEnv *_sync_env,
                       const std::string& period, map<int, string>& _shards,
                       int _max_entries_per_shard,
                       map<int, rgw_mdlog_shard_data> *_result)
    : RGWShardCollectCR(_sync_env->cct, READ_MDLOG_MAX_CONCURRENT),
      sync_env(_sync_env), period(period),
      max_entries_per_shard(_max_entries_per_shard),
      result(_result) {
    shards.swap(_shards);
    iter = shards.begin();
  }
  bool spawn_next() override;
};
int RGWRemoteMetaLog::read_log_info(const DoutPrefixProvider *dpp, rgw_mdlog_info *log_info)
{
  rgw_http_param_pair pairs[] = { { "type", "metadata" },
                                  { NULL, NULL } };

  int ret = conn->get_json_resource(dpp, "/admin/log", pairs, null_yield, *log_info);
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "ERROR: failed to fetch mdlog info" << dendl;
    return ret;
  }

  ldpp_dout(dpp, 20) << "remote mdlog, num_shards=" << log_info->num_shards << dendl;

  return 0;
}
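/*
 * The call above amounts to GET /admin/log?type=metadata against the
 * master zone; rgw_mdlog_info::decode_json() then extracts fields from a
 * response shaped like (values hypothetical):
 *
 *   { "num_objects": 64, "period": "<period id>", "realm_epoch": 2 }
 */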
int RGWRemoteMetaLog::read_master_log_shards_info(const DoutPrefixProvider *dpp, const string& master_period, map<int, RGWMetadataLogInfo> *shards_info)
{
  if (store->svc()->zone->is_meta_master()) {
    return 0;
  }

  rgw_mdlog_info log_info;
  int ret = read_log_info(dpp, &log_info);
  if (ret < 0) {
    return ret;
  }

  return run(dpp, new RGWReadRemoteMDLogInfoCR(&sync_env, master_period, log_info.num_shards, shards_info));
}
int RGWRemoteMetaLog::read_master_log_shards_next(const DoutPrefixProvider *dpp, const string& period, map<int, string> shard_markers, map<int, rgw_mdlog_shard_data> *result)
{
  if (store->svc()->zone->is_meta_master()) {
    return 0;
  }

  return run(dpp, new RGWListRemoteMDLogCR(&sync_env, period, shard_markers, 1, result));
}
int RGWRemoteMetaLog::init()
{
  conn = store->svc()->zone->get_master_conn();

  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }

  error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);

  init_sync_env(&sync_env);

  tn = sync_env.sync_tracer->add_node(sync_env.sync_tracer->root_node, "meta");

  return 0;
}
void RGWRemoteMetaLog::finish()
{
  going_down = true;
  stop();
}

#define CLONE_MAX_ENTRIES 100
int RGWMetaSyncStatusManager::init(const DoutPrefixProvider *dpp)
{
  if (store->svc()->zone->is_meta_master()) {
    return 0;
  }

  if (!store->svc()->zone->get_master_conn()) {
    ldpp_dout(dpp, -1) << "no REST connection to master zone" << dendl;
    return -EIO;
  }

  int r = rgw_init_ioctx(dpp, store->getRados()->get_rados_handle(), store->svc()->zone->get_zone_params().log_pool, ioctx, true);
  if (r < 0) {
    ldpp_dout(dpp, -1) << "ERROR: failed to open log pool (" << store->svc()->zone->get_zone_params().log_pool << " ret=" << r << dendl;
    return r;
  }

  r = master_log.init();
  if (r < 0) {
    ldpp_dout(dpp, -1) << "ERROR: failed to init remote log, r=" << r << dendl;
    return r;
  }

  RGWMetaSyncEnv& sync_env = master_log.get_sync_env();

  rgw_meta_sync_status sync_status;
  r = read_sync_status(dpp, &sync_status);
  if (r < 0 && r != -ENOENT) {
    ldpp_dout(dpp, -1) << "ERROR: failed to read sync status, r=" << r << dendl;
    return r;
  }

  int num_shards = sync_status.sync_info.num_shards;

  for (int i = 0; i < num_shards; i++) {
    shard_objs[i] = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env.shard_obj_name(i));
  }

  std::unique_lock wl{ts_to_shard_lock};
  for (int i = 0; i < num_shards; i++) {
    clone_markers.push_back(string());
  }

  return 0;
}
unsigned RGWMetaSyncStatusManager::get_subsys() const
{
  return dout_subsys;
}
std::ostream& RGWMetaSyncStatusManager::gen_prefix(std::ostream& out) const
{
  return out << "meta sync: ";
}
void RGWMetaSyncEnv::init(const DoutPrefixProvider *_dpp, CephContext *_cct, rgw::sal::RadosStore* _store, RGWRESTConn *_conn,
                          RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
                          RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer) {
  dpp = _dpp;
  cct = _cct;
  store = _store;
  conn = _conn;
  async_rados = _async_rados;
  http_manager = _http_manager;
  error_logger = _error_logger;
  sync_tracer = _sync_tracer;
}
string RGWMetaSyncEnv::status_oid()
{
  return mdlog_sync_status_oid;
}
string RGWMetaSyncEnv::shard_obj_name(int shard_id)
{
  char buf[mdlog_sync_status_shard_prefix.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%d", mdlog_sync_status_shard_prefix.c_str(), shard_id);

  return string(buf);
}
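/*
 * Example: with the "mdlog.sync-status.shard" prefix defined at the top of
 * this file, shard_obj_name(5) yields "mdlog.sync-status.shard.5".
 */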
class RGWAsyncReadMDLogEntries : public RGWAsyncRadosRequest {
  const DoutPrefixProvider *dpp;
  rgw::sal::RadosStore* store;
  RGWMetadataLog *mdlog;
  int shard_id;
  int max_entries;

protected:
  int _send_request(const DoutPrefixProvider *dpp) override {
    real_time from_time;
    real_time end_time;

    void *handle;

    mdlog->init_list_entries(shard_id, from_time, end_time, marker, &handle);

    int ret = mdlog->list_entries(dpp, handle, max_entries, entries, &marker, &truncated);

    mdlog->complete_list_entries(handle);

    return ret;
  }
public:
  string marker;
  list<cls_log_entry> entries;
  bool truncated;

  RGWAsyncReadMDLogEntries(const DoutPrefixProvider *dpp, RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RadosStore* _store,
                           RGWMetadataLog* mdlog, int _shard_id,
                           std::string _marker, int _max_entries)
    : RGWAsyncRadosRequest(caller, cn), dpp(dpp), store(_store), mdlog(mdlog),
      shard_id(_shard_id), max_entries(_max_entries), marker(std::move(_marker)) {}
};
class RGWReadMDLogEntriesCR : public RGWSimpleCoroutine {
  RGWMetaSyncEnv *sync_env;
  RGWMetadataLog *const mdlog;
  int shard_id;
  string marker;
  string *pmarker;
  int max_entries;
  list<cls_log_entry> *entries;
  bool *truncated;

  RGWAsyncReadMDLogEntries *req{nullptr};

public:
  RGWReadMDLogEntriesCR(RGWMetaSyncEnv *_sync_env, RGWMetadataLog* mdlog,
                        int _shard_id, string* _marker, int _max_entries,
                        list<cls_log_entry> *_entries, bool *_truncated)
    : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env), mdlog(mdlog),
      shard_id(_shard_id), pmarker(_marker), max_entries(_max_entries),
      entries(_entries), truncated(_truncated) {}

  ~RGWReadMDLogEntriesCR() override {
    if (req) {
      req->finish();
    }
  }

  int send_request(const DoutPrefixProvider *dpp) override {
    marker = *pmarker;
    req = new RGWAsyncReadMDLogEntries(dpp, this, stack->create_completion_notifier(),
                                       sync_env->store, mdlog, shard_id, marker,
                                       max_entries);
    sync_env->async_rados->queue(req);
    return 0;
  }

  int request_complete() override {
    *pmarker = std::move(req->marker);
    *entries = std::move(req->entries);
    *truncated = req->truncated;
    return req->get_ret_status();
  }
};
class RGWReadRemoteMDLogShardInfoCR : public RGWCoroutine {
  RGWMetaSyncEnv *env;
  RGWRESTReadResource *http_op;

  const std::string& period;
  int shard_id;
  RGWMetadataLogInfo *shard_info;

public:
  RGWReadRemoteMDLogShardInfoCR(RGWMetaSyncEnv *env, const std::string& period,
                                int _shard_id, RGWMetadataLogInfo *_shard_info)
    : RGWCoroutine(env->store->ctx()), env(env), http_op(NULL),
      period(period), shard_id(_shard_id), shard_info(_shard_info) {}

  int operate(const DoutPrefixProvider *dpp) override {
    auto store = env->store;
    RGWRESTConn *conn = store->svc()->zone->get_master_conn();
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type" , "metadata" },
                                        { "id", buf },
                                        { "period", period.c_str() },
                                        { "info" , NULL },
                                        { NULL, NULL } };

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(conn, p, pairs, NULL,
                                          env->http_manager);

        init_new_io(http_op);

        int ret = http_op->aio_read(dpp);
        if (ret < 0) {
          ldpp_dout(env->dpp, 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          http_op->put();
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait(shard_info, null_yield);
        http_op->put();
        if (ret < 0) {
          return set_cr_error(ret);
        }
        return set_cr_done();
      }
    }
    return 0;
  }
};
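/*
 * The coroutine above boils down to one REST round trip per shard; e.g.
 * for shard 3 (period id elided) the request formed is roughly:
 *
 *   GET /admin/log/?type=metadata&id=3&period=...&info
 */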
RGWCoroutine* create_read_remote_mdlog_shard_info_cr(RGWMetaSyncEnv *env,
                                                     const std::string& period,
                                                     int shard_id,
                                                     RGWMetadataLogInfo* info)
{
  return new RGWReadRemoteMDLogShardInfoCR(env, period, shard_id, info);
}
class RGWListRemoteMDLogShardCR : public RGWSimpleCoroutine {
  RGWMetaSyncEnv *sync_env;
  RGWRESTReadResource *http_op;

  const std::string& period;
  int shard_id;
  string marker;
  uint32_t max_entries;
  rgw_mdlog_shard_data *result;

public:
  RGWListRemoteMDLogShardCR(RGWMetaSyncEnv *env, const std::string& period,
                            int _shard_id, const string& _marker, uint32_t _max_entries,
                            rgw_mdlog_shard_data *_result)
    : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL),
      period(period), shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}

  int send_request(const DoutPrefixProvider *dpp) override {
    RGWRESTConn *conn = sync_env->conn;

    char buf[32];
    snprintf(buf, sizeof(buf), "%d", shard_id);

    char max_entries_buf[32];
    snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);

    const char *marker_key = (marker.empty() ? "" : "marker");

    rgw_http_param_pair pairs[] = { { "type", "metadata" },
                                    { "id", buf },
                                    { "period", period.c_str() },
                                    { "max-entries", max_entries_buf },
                                    { marker_key, marker.c_str() },
                                    { NULL, NULL } };

    string p = "/admin/log/";

    http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
    init_new_io(http_op);

    int ret = http_op->aio_read(dpp);
    if (ret < 0) {
      ldpp_dout(dpp, 0) << "ERROR: failed to read from " << p << dendl;
      log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
      http_op->put();
      return ret;
    }

    return 0;
  }

  int request_complete() override {
    int ret = http_op->wait(result, null_yield);
    http_op->put();
    if (ret < 0 && ret != -ENOENT) {
      ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to list remote mdlog shard, ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }
};

RGWCoroutine* create_list_remote_mdlog_shard_cr(RGWMetaSyncEnv *env,
                                                const std::string& period,
                                                int shard_id,
                                                const std::string& marker,
                                                uint32_t max_entries,
                                                rgw_mdlog_shard_data *result)
{
  return new RGWListRemoteMDLogShardCR(env, period, shard_id, marker,
                                       max_entries, result);
}
bool RGWReadRemoteMDLogInfoCR::spawn_next() {
  if (shard_id >= num_shards) {
    return false;
  }
  spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env, period, shard_id, &(*mdlog_info)[shard_id]), false);
  shard_id++;
  return true;
}

bool RGWListRemoteMDLogCR::spawn_next() {
  if (iter == shards.end()) {
    return false;
  }

  spawn(new RGWListRemoteMDLogShardCR(sync_env, period, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
  ++iter;
  return true;
}
class RGWInitSyncStatusCoroutine : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;

  rgw_meta_sync_info status;
  vector<RGWMetadataLogInfo> shards_info;
  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
public:
  RGWInitSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
                             const rgw_meta_sync_info &status)
    : RGWCoroutine(_sync_env->store->ctx()), sync_env(_sync_env),
      status(status), shards_info(status.num_shards),
      lease_cr(nullptr), lease_stack(nullptr) {}

  ~RGWInitSyncStatusCoroutine() override {
    if (lease_cr) {
      lease_cr->abort();
    }
  }

  int operate(const DoutPrefixProvider *dpp) override {
    int ret;
    reenter(this) {
      yield {
        set_status("acquiring sync lock");
        uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
        string lock_name = "sync_lock";
        rgw::sal::RadosStore* store = sync_env->store;
        lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                                rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
                                                lock_name, lock_duration, this));
        lease_stack.reset(spawn(lease_cr.get(), false));
      }
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          ldpp_dout(dpp, 5) << "lease cr failed, done early " << dendl;
          set_status("lease lock failed, early abort");
          return set_cr_error(lease_cr->get_ret_status());
        }
        set_sleeping(true);
        yield;
      }
      yield {
        set_status("writing sync status");
        rgw::sal::RadosStore* store = sync_env->store;
        call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(dpp, sync_env->async_rados, store->svc()->sysobj,
                                                           rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
                                                           status));
      }

      if (retcode < 0) {
        set_status("failed to write sync status");
        ldpp_dout(dpp, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
        yield lease_cr->go_down();
        return set_cr_error(retcode);
      }
      /* fetch current position in logs */
      set_status("fetching remote log position");
      yield {
        for (int i = 0; i < (int)status.num_shards; i++) {
          spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env, status.period, i,
                                                  &shards_info[i]), false);
        }
      }

      drain_all_but_stack(lease_stack.get()); /* the lease cr still needs to run */

      yield {
        set_status("updating sync status");
        for (int i = 0; i < (int)status.num_shards; i++) {
          rgw_meta_sync_marker marker;
          RGWMetadataLogInfo& info = shards_info[i];
          marker.next_step_marker = info.marker;
          marker.timestamp = info.last_update;
          rgw::sal::RadosStore* store = sync_env->store;
          spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(dpp,
                                                                sync_env->async_rados,
                                                                store->svc()->sysobj,
                                                                rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->shard_obj_name(i)),
                                                                marker), true);
        }
      }
      yield {
        set_status("changing sync state: build full sync maps");
        status.state = rgw_meta_sync_info::StateBuildingFullSyncMaps;
        rgw::sal::RadosStore* store = sync_env->store;
        call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(dpp, sync_env->async_rados, store->svc()->sysobj,
                                                           rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
                                                           status));
      }
      set_status("drop lock lease");
      yield lease_cr->go_down();
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          return set_cr_error(ret);
        }
        yield;
      }
      drain_all();
      return set_cr_done();
    }
    return 0;
  }
};
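/*
 * Sequence implemented above: take the sync-status lease, persist the
 * initial rgw_meta_sync_info, snapshot every remote mdlog shard position,
 * write one rgw_meta_sync_marker per shard, flip the state to
 * StateBuildingFullSyncMaps, then drop the lease.
 */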
class RGWReadSyncStatusMarkersCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  RGWMetaSyncEnv *env;
  const int num_shards;
  int shard_id{0};
  map<uint32_t, rgw_meta_sync_marker>& markers;

 public:
  RGWReadSyncStatusMarkersCR(RGWMetaSyncEnv *env, int num_shards,
                             map<uint32_t, rgw_meta_sync_marker>& markers)
    : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS),
      env(env), num_shards(num_shards), markers(markers)
  {}
  bool spawn_next() override;
};

bool RGWReadSyncStatusMarkersCR::spawn_next()
{
  if (shard_id >= num_shards) {
    return false;
  }
  using CR = RGWSimpleRadosReadCR<rgw_meta_sync_marker>;
  rgw_raw_obj obj{env->store->svc()->zone->get_zone_params().log_pool,
                  env->shard_obj_name(shard_id)};
  spawn(new CR(env->dpp, env->async_rados, env->store->svc()->sysobj, obj, &markers[shard_id]), false);
  shard_id++;
  return true;
}
class RGWReadSyncStatusCoroutine : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;
  rgw_meta_sync_status *sync_status;

public:
  RGWReadSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
                             rgw_meta_sync_status *_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sync_status(_status)
  {}
  int operate(const DoutPrefixProvider *dpp) override;
};

int RGWReadSyncStatusCoroutine::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    // read sync info
    using ReadInfoCR = RGWSimpleRadosReadCR<rgw_meta_sync_info>;
    yield {
      bool empty_on_enoent = false; // fail on ENOENT
      rgw_raw_obj obj{sync_env->store->svc()->zone->get_zone_params().log_pool,
                      sync_env->status_oid()};
      call(new ReadInfoCR(dpp, sync_env->async_rados, sync_env->store->svc()->sysobj, obj,
                          &sync_status->sync_info, empty_on_enoent));
    }
    if (retcode < 0) {
      ldpp_dout(dpp, 4) << "failed to read sync status info with "
          << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    // read shard markers
    using ReadMarkersCR = RGWReadSyncStatusMarkersCR;
    yield call(new ReadMarkersCR(sync_env, sync_status->sync_info.num_shards,
                                 sync_status->sync_markers));
    if (retcode < 0) {
      ldpp_dout(dpp, 4) << "failed to read sync status markers with "
          << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    return set_cr_done();
  }
  return 0;
}
class RGWFetchAllMetaCR : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;

  int num_shards;

  int ret_status;

  list<string> sections;
  list<string>::iterator sections_iter;

  struct meta_list_result {
    list<string> keys;
    string marker;
    uint64_t count{0};
    bool truncated{false};

    void decode_json(JSONObj *obj) {
      JSONDecoder::decode_json("keys", keys, obj);
      JSONDecoder::decode_json("marker", marker, obj);
      JSONDecoder::decode_json("count", count, obj);
      JSONDecoder::decode_json("truncated", truncated, obj);
    }
  } result;

  list<string>::iterator iter;

  std::unique_ptr<RGWShardedOmapCRManager> entries_index;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
  bool lost_lock;
  bool failed;

  map<uint32_t, rgw_meta_sync_marker>& markers;

  RGWSyncTraceNodeRef tn;

public:
  RGWFetchAllMetaCR(RGWMetaSyncEnv *_sync_env, int _num_shards,
                    map<uint32_t, rgw_meta_sync_marker>& _markers,
                    RGWSyncTraceNodeRef& _tn_parent)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      num_shards(_num_shards),
      ret_status(0), lease_cr(nullptr), lease_stack(nullptr),
      lost_lock(false), failed(false), markers(_markers) {
    tn = sync_env->sync_tracer->add_node(_tn_parent, "fetch_all_meta");
  }

  ~RGWFetchAllMetaCR() override {
  }
  void append_section_from_set(set<string>& all_sections, const string& name) {
    set<string>::iterator iter = all_sections.find(name);
    if (iter != all_sections.end()) {
      sections.emplace_back(std::move(*iter));
      all_sections.erase(iter);
    }
  }
  /*
   * meta sync should go in the following order: user, bucket.instance, bucket
   * then whatever other sections exist (if any)
   */
  void rearrange_sections() {
    set<string> all_sections;
    std::move(sections.begin(), sections.end(),
              std::inserter(all_sections, all_sections.end()));
    sections.clear();

    append_section_from_set(all_sections, "user");
    append_section_from_set(all_sections, "bucket.instance");
    append_section_from_set(all_sections, "bucket");

    std::move(all_sections.begin(), all_sections.end(),
              std::back_inserter(sections));
  }
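  /*
   * Example: given sections {"bucket", "bucket.instance", "otp", "user"},
   * rearrange_sections() produces ["user", "bucket.instance", "bucket",
   * "otp"], so users are synced before their buckets and bucket instances
   * before bucket entrypoints ("otp" here is just an illustrative extra
   * section).
   */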
  int operate(const DoutPrefixProvider *dpp) override {
    RGWRESTConn *conn = sync_env->conn;

    reenter(this) {
      yield {
        set_status(string("acquiring lock (") + sync_env->status_oid() + ")");
        uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
        string lock_name = "sync_lock";
        lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados,
                                                sync_env->store,
                                                rgw_raw_obj(sync_env->store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
                                                lock_name, lock_duration, this));
        lease_stack.reset(spawn(lease_cr.get(), false));
      }
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          ldpp_dout(dpp, 5) << "lease cr failed, done early " << dendl;
          set_status("failed acquiring lock");
          return set_cr_error(lease_cr->get_ret_status());
        }
        set_sleeping(true);
        yield;
      }
      entries_index.reset(new RGWShardedOmapCRManager(sync_env->async_rados, sync_env->store, this, num_shards,
                                                      sync_env->store->svc()->zone->get_zone_params().log_pool,
                                                      mdlog_sync_full_sync_index_prefix));
      yield call(new RGWReadRESTResourceCR<list<string> >(cct, conn, sync_env->http_manager,
                                                          "/admin/metadata", NULL, &sections));
      if (get_ret_status() < 0) {
        ldpp_dout(dpp, 0) << "ERROR: failed to fetch metadata sections" << dendl;
        yield entries_index->finish();
        yield lease_cr->go_down();
        drain_all();
        return set_cr_error(get_ret_status());
      }
      rearrange_sections();
      sections_iter = sections.begin();
      for (; sections_iter != sections.end(); ++sections_iter) {
        do {
          yield {
#define META_FULL_SYNC_CHUNK_SIZE "1000"
            string entrypoint = string("/admin/metadata/") + *sections_iter;
            rgw_http_param_pair pairs[] = { { "max-entries", META_FULL_SYNC_CHUNK_SIZE },
                                            { "marker", result.marker.c_str() },
                                            { NULL, NULL } };
            call(new RGWReadRESTResourceCR<meta_list_result>(cct, conn, sync_env->http_manager,
                                                             entrypoint, pairs, &result));
          }
          ret_status = get_ret_status();
          if (ret_status == -ENOENT) {
            set_retcode(0); /* reset coroutine status so that we don't return it */
            ret_status = 0;
          }
          if (ret_status < 0) {
            tn->log(0, SSTR("ERROR: failed to fetch metadata section: " << *sections_iter));
            yield entries_index->finish();
            yield lease_cr->go_down();
            drain_all();
            return set_cr_error(ret_status);
          }
          iter = result.keys.begin();
          for (; iter != result.keys.end(); ++iter) {
            if (!lease_cr->is_locked()) {
              lost_lock = true;
              break;
            }
            yield; // allow entries_index consumer to make progress

            tn->log(20, SSTR("list metadata: section=" << *sections_iter << " key=" << *iter));
            string s = *sections_iter + ":" + *iter;
            int shard_id;
            rgw::sal::RadosStore* store = sync_env->store;
            int ret = store->ctl()->meta.mgr->get_shard_id(*sections_iter, *iter, &shard_id);
            if (ret < 0) {
              tn->log(0, SSTR("ERROR: could not determine shard id for " << *sections_iter << ":" << *iter));
              ret_status = ret;
              break;
            }
            if (!entries_index->append(s, shard_id)) {
              break;
            }
          }
        } while (result.truncated);
      }
      yield {
        if (!entries_index->finish()) {
          failed = true;
        }
      }
      if (!failed) {
        for (map<uint32_t, rgw_meta_sync_marker>::iterator iter = markers.begin(); iter != markers.end(); ++iter) {
          int shard_id = (int)iter->first;
          rgw_meta_sync_marker& marker = iter->second;
          marker.total_entries = entries_index->get_total_entries(shard_id);
          spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(dpp, sync_env->async_rados, sync_env->store->svc()->sysobj,
                                                                rgw_raw_obj(sync_env->store->svc()->zone->get_zone_params().log_pool, sync_env->shard_obj_name(shard_id)),
                                                                marker), true);
        }
      }

      drain_all_but_stack(lease_stack.get()); /* the lease cr still needs to run */

      yield lease_cr->go_down();

      int ret;
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          return set_cr_error(ret);
        }
        yield;
      }
      drain_all();

      if (failed) {
        yield return set_cr_error(-EIO);
      }
      if (lost_lock) {
        yield return set_cr_error(-EBUSY);
      }

      if (ret_status < 0) {
        yield return set_cr_error(ret_status);
      }

      yield return set_cr_done();
    }
    return 0;
  }
};
static string full_sync_index_shard_oid(int shard_id)
{
  char buf[mdlog_sync_full_sync_index_prefix.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%d", mdlog_sync_full_sync_index_prefix.c_str(), shard_id);
  return string(buf);
}
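/*
 * Example: full_sync_index_shard_oid(7) yields "meta.full-sync.index.7";
 * RGWFetchAllMetaCR fills one such omap object per shard with the keys to
 * sync.
 */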
class RGWReadRemoteMetadataCR : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;

  RGWRESTReadResource *http_op;

  string section;
  string key;

  bufferlist *pbl;

  RGWSyncTraceNodeRef tn;

public:
  RGWReadRemoteMetadataCR(RGWMetaSyncEnv *_sync_env,
                          const string& _section, const string& _key, bufferlist *_pbl,
                          const RGWSyncTraceNodeRef& _tn_parent)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      http_op(NULL),
      section(_section), key(_key), pbl(_pbl) {
    tn = sync_env->sync_tracer->add_node(_tn_parent, "read_remote_meta",
                                         section + ":" + key);
  }

  int operate(const DoutPrefixProvider *dpp) override {
    RGWRESTConn *conn = sync_env->conn;
    reenter(this) {
      yield {
        string key_encode;
        url_encode(key, key_encode);
        rgw_http_param_pair pairs[] = { { "key" , key.c_str()},
                                        { NULL, NULL } };

        string p = string("/admin/metadata/") + section + "/" + key_encode;

        http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);

        init_new_io(http_op);

        int ret = http_op->aio_read(dpp);
        if (ret < 0) {
          ldpp_dout(dpp, 0) << "ERROR: failed to fetch mdlog data" << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          http_op->put();
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait(pbl, null_yield);
        http_op->put();
        if (ret < 0) {
          return set_cr_error(ret);
        }
        return set_cr_done();
      }
    }
    return 0;
  }
};
class RGWAsyncMetaStoreEntry : public RGWAsyncRadosRequest {
  rgw::sal::RadosStore* store;
  string raw_key;
  bufferlist bl;
  const DoutPrefixProvider *dpp;
protected:
  int _send_request(const DoutPrefixProvider *dpp) override {
    int ret = store->ctl()->meta.mgr->put(raw_key, bl, null_yield, dpp, RGWMDLogSyncType::APPLY_ALWAYS, true);
    if (ret < 0) {
      ldpp_dout(dpp, 0) << "ERROR: can't store key: " << raw_key << " ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }
public:
  RGWAsyncMetaStoreEntry(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RadosStore* _store,
                         const string& _raw_key,
                         bufferlist& _bl,
                         const DoutPrefixProvider *dpp)
    : RGWAsyncRadosRequest(caller, cn), store(_store),
      raw_key(_raw_key), bl(_bl), dpp(dpp) {}
};
class RGWMetaStoreEntryCR : public RGWSimpleCoroutine {
  RGWMetaSyncEnv *sync_env;
  string raw_key;
  bufferlist bl;
  RGWAsyncMetaStoreEntry *req;

public:
  RGWMetaStoreEntryCR(RGWMetaSyncEnv *_sync_env,
                      const string& _raw_key,
                      bufferlist& _bl)
    : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env),
      raw_key(_raw_key), bl(_bl), req(NULL) {
  }

  ~RGWMetaStoreEntryCR() override {
    if (req) {
      req->finish();
    }
  }

  int send_request(const DoutPrefixProvider *dpp) override {
    req = new RGWAsyncMetaStoreEntry(this, stack->create_completion_notifier(),
                                     sync_env->store, raw_key, bl, dpp);
    sync_env->async_rados->queue(req);
    return 0;
  }

  int request_complete() override {
    return req->get_ret_status();
  }
};
class RGWAsyncMetaRemoveEntry : public RGWAsyncRadosRequest {
  rgw::sal::RadosStore* store;
  string raw_key;
  const DoutPrefixProvider *dpp;
protected:
  int _send_request(const DoutPrefixProvider *dpp) override {
    int ret = store->ctl()->meta.mgr->remove(raw_key, null_yield, dpp);
    if (ret < 0) {
      ldpp_dout(dpp, 0) << "ERROR: can't remove key: " << raw_key << " ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }
public:
  RGWAsyncMetaRemoveEntry(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RadosStore* _store,
                          const string& _raw_key, const DoutPrefixProvider *dpp)
    : RGWAsyncRadosRequest(caller, cn), store(_store),
      raw_key(_raw_key), dpp(dpp) {}
};
class RGWMetaRemoveEntryCR : public RGWSimpleCoroutine {
  RGWMetaSyncEnv *sync_env;
  string raw_key;
  RGWAsyncMetaRemoveEntry *req;

public:
  RGWMetaRemoveEntryCR(RGWMetaSyncEnv *_sync_env,
                       const string& _raw_key)
    : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env),
      raw_key(_raw_key), req(NULL) {
  }

  ~RGWMetaRemoveEntryCR() override {
    if (req) {
      req->finish();
    }
  }

  int send_request(const DoutPrefixProvider *dpp) override {
    req = new RGWAsyncMetaRemoveEntry(this, stack->create_completion_notifier(),
                                      sync_env->store, raw_key, dpp);
    sync_env->async_rados->queue(req);
    return 0;
  }

  int request_complete() override {
    int r = req->get_ret_status();
    if (r == -ENOENT) { /* key was already gone; treat as success */
      r = 0;
    }
    return r;
  }
};
#define META_SYNC_UPDATE_MARKER_WINDOW 10


int RGWLastCallerWinsCR::operate(const DoutPrefixProvider *dpp) {
  RGWCoroutine *call_cr;
  reenter(this) {
    while (cr) {
      call_cr = cr;
      cr = nullptr;
      yield call(call_cr);
      /* cr might have been modified at this point */
    }
    return set_cr_done();
  }
  return 0;
}
class RGWMetaSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
  RGWMetaSyncEnv *sync_env;

  string marker_oid;
  rgw_meta_sync_marker sync_marker;

  RGWSyncTraceNodeRef tn;

public:
  RGWMetaSyncShardMarkerTrack(RGWMetaSyncEnv *_sync_env,
                              const string& _marker_oid,
                              const rgw_meta_sync_marker& _marker,
                              RGWSyncTraceNodeRef& _tn) : RGWSyncShardMarkerTrack(META_SYNC_UPDATE_MARKER_WINDOW),
                                                          sync_env(_sync_env),
                                                          marker_oid(_marker_oid),
                                                          sync_marker(_marker),
                                                          tn(_tn) {}

  RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
    sync_marker.marker = new_marker;
    if (index_pos > 0) {
      sync_marker.pos = index_pos;
    }

    if (!real_clock::is_zero(timestamp)) {
      sync_marker.timestamp = timestamp;
    }

    ldpp_dout(sync_env->dpp, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << " realm_epoch=" << sync_marker.realm_epoch << dendl;
    tn->log(20, SSTR("new marker=" << new_marker));
    rgw::sal::RadosStore* store = sync_env->store;
    return new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->dpp, sync_env->async_rados,
                                                           store->svc()->sysobj,
                                                           rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, marker_oid),
                                                           sync_marker);
  }

  RGWOrderCallCR *allocate_order_control_cr() override {
    return new RGWLastCallerWinsCR(sync_env->cct);
  }
};
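/*
 * Marker-tracking note: the RGWSyncShardMarkerTrack base batches updates,
 * so store_marker() above is only called roughly once per window of
 * META_SYNC_UPDATE_MARKER_WINDOW (10) completed entries rather than after
 * every synced entry, which bounds the rate of RADOS status writes.
 */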
RGWMetaSyncSingleEntryCR::RGWMetaSyncSingleEntryCR(RGWMetaSyncEnv *_sync_env,
                                                   const string& _raw_key, const string& _entry_marker,
                                                   const RGWMDLogStatus& _op_status,
                                                   RGWMetaSyncShardMarkerTrack *_marker_tracker, const RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sync_env->cct),
                                                                                                                                          sync_env(_sync_env),
                                                                                                                                          raw_key(_raw_key), entry_marker(_entry_marker),
                                                                                                                                          op_status(_op_status),
                                                                                                                                          pos(0), sync_status(0),
                                                                                                                                          marker_tracker(_marker_tracker), tries(0) {
  error_injection = (sync_env->cct->_conf->rgw_sync_meta_inject_err_probability > 0);
  tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", raw_key);
}
int RGWMetaSyncSingleEntryCR::operate(const DoutPrefixProvider *dpp) {
  reenter(this) {
#define NUM_TRANSIENT_ERROR_RETRIES 10

    if (error_injection &&
        rand() % 10000 < cct->_conf->rgw_sync_meta_inject_err_probability * 10000.0) {
      return set_cr_error(-EIO);
    }

    if (op_status != MDLOG_STATUS_COMPLETE) {
      tn->log(20, "skipping pending operation");
      yield call(marker_tracker->finish(entry_marker));
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    tn->set_flag(RGW_SNS_FLAG_ACTIVE);
    for (tries = 0; tries < NUM_TRANSIENT_ERROR_RETRIES; tries++) {
      yield {
        pos = raw_key.find(':');
        section = raw_key.substr(0, pos);
        key = raw_key.substr(pos + 1);
        tn->log(10, SSTR("fetching remote metadata entry" << (tries == 0 ? "" : " (retry)")));
        call(new RGWReadRemoteMetadataCR(sync_env, section, key, &md_bl, tn));
      }

      sync_status = retcode;

      if (sync_status == -ENOENT) {
        /* FIXME: do we need to remove the entry from the local zone? */
        break;
      }

      if (sync_status < 0) {
        if (tries < NUM_TRANSIENT_ERROR_RETRIES - 1) {
          ldpp_dout(dpp, 20) << *this << ": failed to fetch remote metadata: " << section << ":" << key << ", will retry" << dendl;
          continue;
        }

        tn->log(10, SSTR("failed to read remote metadata entry: section=" << section << " key=" << key << " status=" << sync_status));
        log_error() << "failed to read remote metadata entry: section=" << section << " key=" << key << " status=" << sync_status << std::endl;
        yield call(sync_env->error_logger->log_error_cr(dpp, sync_env->conn->get_remote_id(), section, key, -sync_status,
                                                        string("failed to read remote metadata entry: ") + cpp_strerror(-sync_status)));
        return set_cr_error(sync_status);
      }

      break;
    }

    for (tries = 0; tries < NUM_TRANSIENT_ERROR_RETRIES; tries++) {
      if (sync_status != -ENOENT) {
        tn->log(10, SSTR("storing local metadata entry"));
        yield call(new RGWMetaStoreEntryCR(sync_env, raw_key, md_bl));
      } else {
        tn->log(10, SSTR("removing local metadata entry"));
        yield call(new RGWMetaRemoveEntryCR(sync_env, raw_key));
      }
      if ((retcode < 0) && (tries < NUM_TRANSIENT_ERROR_RETRIES - 1)) {
        ldpp_dout(dpp, 20) << *this << ": failed to store metadata: " << section << ":" << key << ", got retcode=" << retcode << dendl;
        continue;
      }
      break;
    }

    sync_status = retcode;

    if (sync_status == 0 && marker_tracker) {
      /* update marker */
      yield call(marker_tracker->finish(entry_marker));
      sync_status = retcode;
    }
    if (sync_status < 0) {
      tn->log(10, SSTR("failed, status=" << sync_status));
      return set_cr_error(sync_status);
    }
    tn->log(10, "success");
    return set_cr_done();
  }
  return 0;
}
class RGWCloneMetaLogCoroutine : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;
  RGWMetadataLog *mdlog;

  const std::string& period;
  int shard_id;
  string marker;
  bool truncated = false;
  string *new_marker;

  int max_entries = CLONE_MAX_ENTRIES;

  RGWRESTReadResource *http_op = nullptr;
  boost::intrusive_ptr<RGWMetadataLogInfoCompletion> completion;

  RGWMetadataLogInfo shard_info;
  rgw_mdlog_shard_data data;

public:
  RGWCloneMetaLogCoroutine(RGWMetaSyncEnv *_sync_env, RGWMetadataLog* mdlog,
                           const std::string& period, int _id,
                           const string& _marker, string *_new_marker)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), mdlog(mdlog),
      period(period), shard_id(_id), marker(_marker), new_marker(_new_marker) {
    if (new_marker) {
      *new_marker = marker;
    }
  }
  ~RGWCloneMetaLogCoroutine() override {
    if (http_op) {
      http_op->put();
    }
    if (completion) {
      completion->cancel();
    }
  }

  int operate(const DoutPrefixProvider *dpp) override;

  int state_init();
  int state_read_shard_status();
  int state_read_shard_status_complete();
  int state_send_rest_request(const DoutPrefixProvider *dpp);
  int state_receive_rest_response();
  int state_store_mdlog_entries();
  int state_store_mdlog_entries_complete();
};

#define META_SYNC_SPAWN_WINDOW 20
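/*
 * RGWCloneMetaLogCoroutine above is a small state machine: read the remote
 * shard status, send the REST listing request, receive the response, then
 * store the fetched entries into the local copy of the mdlog shard.
 */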
class RGWMetaSyncShardCR : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;

  const rgw_pool& pool;
  const std::string& period; //< currently syncing period id
  const epoch_t realm_epoch; //< realm_epoch of period
  RGWMetadataLog* mdlog; //< log of syncing period
  uint32_t shard_id;
  rgw_meta_sync_marker& sync_marker;
  boost::optional<rgw_meta_sync_marker> temp_marker; //< for pending updates
  string marker;
  string max_marker;
  const std::string& period_marker; //< max marker stored in next period

  RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
  std::set<std::string> entries;
  std::set<std::string>::iterator iter;

  string oid;

  RGWMetaSyncShardMarkerTrack *marker_tracker = nullptr;

  list<cls_log_entry> log_entries;
  list<cls_log_entry>::iterator log_iter;
  bool truncated = false;

  string mdlog_marker;
  string raw_key;
  rgw_mdlog_entry mdlog_entry;

  ceph::mutex inc_lock = ceph::make_mutex("RGWMetaSyncShardCR::inc_lock");
  ceph::condition_variable inc_cond;

  boost::asio::coroutine incremental_cr;
  boost::asio::coroutine full_cr;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;

  bool lost_lock = false;

  bool *reset_backoff;

  // hold a reference to the cr stack while it's in the map
  using StackRef = boost::intrusive_ptr<RGWCoroutinesStack>;
  map<StackRef, string> stack_to_pos;
  map<string, string> pos_to_prev;

  bool can_adjust_marker = false;
  bool done_with_period = false;

  int total_entries = 0;

  RGWSyncTraceNodeRef tn;
public:
  RGWMetaSyncShardCR(RGWMetaSyncEnv *_sync_env, const rgw_pool& _pool,
                     const std::string& period, epoch_t realm_epoch,
                     RGWMetadataLog* mdlog, uint32_t _shard_id,
                     rgw_meta_sync_marker& _marker,
                     const std::string& period_marker, bool *_reset_backoff,
                     RGWSyncTraceNodeRef& _tn)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), pool(_pool),
      period(period), realm_epoch(realm_epoch), mdlog(mdlog),
      shard_id(_shard_id), sync_marker(_marker),
      period_marker(period_marker),
      reset_backoff(_reset_backoff), tn(_tn) {
    *reset_backoff = false;
  }

  ~RGWMetaSyncShardCR() override {
    delete marker_tracker;
    if (lease_cr) {
      lease_cr->abort();
    }
  }
  void set_marker_tracker(RGWMetaSyncShardMarkerTrack *mt) {
    delete marker_tracker;
    marker_tracker = mt;
  }
  int operate(const DoutPrefixProvider *dpp) override {
    int r;
    while (true) {
      switch (sync_marker.state) {
      case rgw_meta_sync_marker::FullSync:
        r = full_sync();
        if (r < 0) {
          ldpp_dout(dpp, 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl;
          return set_cr_error(r);
        }
        return 0;
      case rgw_meta_sync_marker::IncrementalSync:
        r = incremental_sync();
        if (r < 0) {
          ldpp_dout(dpp, 10) << "sync: incremental_sync: shard_id=" << shard_id << " r=" << r << dendl;
          return set_cr_error(r);
        }
        return 0;
      }
    }
    /* unreachable */
    return 0;
  }
  void collect_children()
  {
    int child_ret;
    RGWCoroutinesStack *child;
    while (collect_next(&child_ret, &child)) {
      auto iter = stack_to_pos.find(child);
      if (iter == stack_to_pos.end()) {
        /* some other stack that we don't care about */
        continue;
      }

      string& pos = iter->second;

      if (child_ret < 0) {
        ldpp_dout(sync_env->dpp, 0) << *this << ": child operation stack=" << child << " entry=" << pos << " returned " << child_ret << dendl;
        // on any error code from RGWMetaSyncSingleEntryCR, we do not advance
        // the sync status marker past this entry, and set
        // can_adjust_marker=false to exit out of RGWMetaSyncShardCR.
        // RGWMetaSyncShardControlCR will rerun RGWMetaSyncShardCR from the
        // previous marker and retry
        can_adjust_marker = false;
      }

      map<string, string>::iterator prev_iter = pos_to_prev.find(pos);
      ceph_assert(prev_iter != pos_to_prev.end());

      if (pos_to_prev.size() == 1) {
        if (can_adjust_marker) {
          sync_marker.marker = pos;
        }
        pos_to_prev.erase(prev_iter);
      } else {
        ceph_assert(pos_to_prev.size() > 1);
        pos_to_prev.erase(prev_iter);
        prev_iter = pos_to_prev.begin();
        if (can_adjust_marker) {
          sync_marker.marker = prev_iter->second;
        }
      }

      ldpp_dout(sync_env->dpp, 4) << *this << ": adjusting marker pos=" << sync_marker.marker << dendl;
      stack_to_pos.erase(iter);
    }
  }
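  /*
   * Bookkeeping note for collect_children(): pos_to_prev maps each
   * in-flight position to its predecessor, so the persisted marker only
   * ever advances to just before the oldest entry that is still pending;
   * a retry after a crash therefore never skips unfinished work.
   */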
  int full_sync() {
#define OMAP_GET_MAX_ENTRIES 100
    int max_entries = OMAP_GET_MAX_ENTRIES;
    reenter(&full_cr) {
      set_status("full_sync");
      tn->log(10, "start full sync");
      oid = full_sync_index_shard_oid(shard_id);
      can_adjust_marker = true;
      /* grab lock */
      yield {
        uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
        string lock_name = "sync_lock";
        rgw::sal::RadosStore* store = sync_env->store;
        lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                                rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
                                                lock_name, lock_duration, this));
        lease_stack.reset(spawn(lease_cr.get(), false));
      }
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          drain_all();
          tn->log(5, "failed to take lease");
          return lease_cr->get_ret_status();
        }
        set_sleeping(true);
        yield;
      }
      tn->log(10, "took lease");

      /* lock succeeded, a retry now should avoid previous backoff status */
      *reset_backoff = true;

      /* prepare marker tracker */
      set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env,
                                                         sync_env->shard_obj_name(shard_id),
                                                         sync_marker, tn));

      marker = sync_marker.marker;

      total_entries = sync_marker.pos;

      /* sync! */
      do {
        if (!lease_cr->is_locked()) {
          tn->log(10, "lost lease");
          lost_lock = true;
          break;
        }
        omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
        yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
                                             marker, max_entries, omapkeys));
        if (retcode < 0) {
          ldpp_dout(sync_env->dpp, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
          tn->log(0, SSTR("ERROR: failed to list omap keys, status=" << retcode));
          yield lease_cr->go_down();
          drain_all();
          return retcode;
        }
        entries = std::move(omapkeys->entries);
        tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
        if (entries.size() > 0) {
          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
        }
        iter = entries.begin();
        for (; iter != entries.end(); ++iter) {
          marker = *iter;
          tn->log(20, SSTR("full sync: " << marker));
          total_entries++;
          if (!marker_tracker->start(marker, total_entries, real_time())) {
            tn->log(0, SSTR("ERROR: cannot start syncing " << marker << ". Duplicate entry?"));
          } else {
            // fetch remote and write locally
            yield {
              RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, marker, marker, MDLOG_STATUS_COMPLETE, marker_tracker, tn), false);
              // stack_to_pos holds a reference to the stack
              stack_to_pos[stack] = marker;
              pos_to_prev[marker] = marker;
            }
            // limit spawn window
            while (num_spawned() > META_SYNC_SPAWN_WINDOW) {
              yield wait_for_child();
              collect_children();
            }
          }
        }
        collect_children();
      } while (omapkeys->more && can_adjust_marker);

      tn->unset_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */

      while (num_spawned() > 1) {
        yield wait_for_child();
        collect_children();
      }

      if (!lost_lock) {
        /* update marker to reflect we're done with full sync */
        if (can_adjust_marker) {
          // apply updates to a temporary marker, or operate() will send us
          // to incremental_sync() after we yield
          temp_marker = sync_marker;
          temp_marker->state = rgw_meta_sync_marker::IncrementalSync;
          temp_marker->marker = std::move(temp_marker->next_step_marker);
          temp_marker->next_step_marker.clear();
          temp_marker->realm_epoch = realm_epoch;
          ldpp_dout(sync_env->dpp, 4) << *this << ": saving marker pos=" << temp_marker->marker << " realm_epoch=" << realm_epoch << dendl;

          using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_meta_sync_marker>;
          yield call(new WriteMarkerCR(sync_env->dpp, sync_env->async_rados, sync_env->store->svc()->sysobj,
                                       rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
                                       *temp_marker));
        }

        if (retcode < 0) {
          ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
          yield lease_cr->go_down();
          drain_all();
          return retcode;
        }
        // clean up full sync index
        yield {
          auto oid = full_sync_index_shard_oid(shard_id);
          call(new RGWRadosRemoveCR(sync_env->store, {pool, oid}));
        }
      }

      /*
       * if we reached here, it means that lost_lock is true, otherwise the state
       * change in the previous block will prevent us from reaching here
       */

      yield lease_cr->go_down();

      lease_cr.reset();

      drain_all();

      if (!can_adjust_marker) {
        return -EAGAIN;
      }

      if (lost_lock) {
        return -EBUSY;
      }

      tn->log(10, "full sync complete");

      // apply the sync marker update
      ceph_assert(temp_marker);
      sync_marker = std::move(*temp_marker);
      temp_marker = boost::none;
      // must not yield after this point!
    }
    return 0;
  }
  int incremental_sync() {
    reenter(&incremental_cr) {
      set_status("incremental_sync");
      tn->log(10, "start incremental sync");
      can_adjust_marker = true;
      /* grab lock */
      if (!lease_cr) { /* could have had a lease_cr lock from previous state */
        yield {
          uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
          string lock_name = "sync_lock";
          rgw::sal::RadosStore* store = sync_env->store;
          lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                                  rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
                                                  lock_name, lock_duration, this));
          lease_stack.reset(spawn(lease_cr.get(), false));
        }
        while (!lease_cr->is_locked()) {
          if (lease_cr->is_done()) {
            drain_all();
            tn->log(10, "failed to take lease");
            return lease_cr->get_ret_status();
          }
          set_sleeping(true);
          yield;
        }
      }
      tn->log(10, "took lease");
      // if the period has advanced, we can't use the existing marker
      if (sync_marker.realm_epoch < realm_epoch) {
        ldpp_dout(sync_env->dpp, 4) << "clearing marker=" << sync_marker.marker
            << " from old realm_epoch=" << sync_marker.realm_epoch
            << " (now " << realm_epoch << ')' << dendl;
        sync_marker.realm_epoch = realm_epoch;
        sync_marker.marker.clear();
      }
      mdlog_marker = sync_marker.marker;
      set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env,
                                                         sync_env->shard_obj_name(shard_id),
                                                         sync_marker, tn));

      /*
       * mdlog_marker: the remote sync marker position
       * sync_marker: the local sync marker position
       * max_marker: the max mdlog position that we fetched
       * marker: the current position we try to sync
       * period_marker: the last marker before the next period begins (optional)
       */
      marker = max_marker = sync_marker.marker;
      /* inc sync */
      do {
        if (!lease_cr->is_locked()) {
          lost_lock = true;
          tn->log(10, "lost lease");
          break;
        }
#define INCREMENTAL_MAX_ENTRIES 100
        ldpp_dout(sync_env->dpp, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << dendl;
        if (!period_marker.empty() && period_marker <= mdlog_marker) {
          tn->log(10, SSTR("finished syncing current period: mdlog_marker=" << mdlog_marker << " sync_marker=" << sync_marker.marker << " period_marker=" << period_marker));
          done_with_period = true;
          break;
        }
        if (mdlog_marker <= max_marker) {
          /* we're at the tip, try to bring more entries */
          ldpp_dout(sync_env->dpp, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " syncing mdlog for shard_id=" << shard_id << dendl;
          yield call(new RGWCloneMetaLogCoroutine(sync_env, mdlog,
                                                  period, shard_id,
                                                  mdlog_marker, &mdlog_marker));
        }
        if (retcode < 0) {
          tn->log(10, SSTR(*this << ": failed to fetch more log entries, retcode=" << retcode));
          yield lease_cr->go_down();
          drain_all();
          *reset_backoff = false; // back off and try again later
          return retcode;
        }
        *reset_backoff = true; /* if we got to this point, all systems function */
        if (mdlog_marker > max_marker) {
          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
          tn->log(20, SSTR("mdlog_marker=" << mdlog_marker << " sync_marker=" << sync_marker.marker));
          marker = max_marker;
          yield call(new RGWReadMDLogEntriesCR(sync_env, mdlog, shard_id,
                                               &max_marker, INCREMENTAL_MAX_ENTRIES,
                                               &log_entries, &truncated));
          if (retcode < 0) {
            tn->log(10, SSTR("failed to list mdlog entries, retcode=" << retcode));
            yield lease_cr->go_down();
            drain_all();
            *reset_backoff = false; // back off and try again later
            return retcode;
          }
          for (log_iter = log_entries.begin(); log_iter != log_entries.end() && !done_with_period; ++log_iter) {
            if (!period_marker.empty() && period_marker <= log_iter->id) {
              done_with_period = true;
              if (period_marker < log_iter->id) {
                tn->log(10, SSTR("found key=" << log_iter->id
                    << " past period_marker=" << period_marker));
                break;
              }
              ldpp_dout(sync_env->dpp, 10) << "found key at period_marker=" << period_marker << dendl;
              // sync this entry, then return control to RGWMetaSyncCR
            }
            if (!mdlog_entry.convert_from(*log_iter)) {
              tn->log(0, SSTR("ERROR: failed to convert mdlog entry, shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << " ... skipping entry"));
              continue;
            }
            tn->log(20, SSTR("log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp));
            if (!marker_tracker->start(log_iter->id, 0, log_iter->timestamp.to_real_time())) {
              ldpp_dout(sync_env->dpp, 0) << "ERROR: cannot start syncing " << log_iter->id << ". Duplicate entry?" << dendl;
            } else {
              raw_key = log_iter->section + ":" + log_iter->name;
              yield {
                RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, mdlog_entry.log_data.status, marker_tracker, tn), false);
                ceph_assert(stack);
                // stack_to_pos holds a reference to the stack
                stack_to_pos[stack] = log_iter->id;
                pos_to_prev[log_iter->id] = marker;
              }
              // limit spawn window
              while (num_spawned() > META_SYNC_SPAWN_WINDOW) {
                yield wait_for_child();
                collect_children();
              }
            }
            marker = log_iter->id;
          }
        }
        collect_children();
        ldpp_dout(sync_env->dpp, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " max_marker=" << max_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << dendl;
        if (done_with_period) {
          // return control to RGWMetaSyncCR and advance to the next period
          tn->log(10, SSTR(*this << ": done with period"));
          break;
        }
        if (mdlog_marker == max_marker && can_adjust_marker) {
          tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
#define INCREMENTAL_INTERVAL 20
          yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
        }
      } while (can_adjust_marker);

      tn->unset_flag(RGW_SNS_FLAG_ACTIVE);

      while (num_spawned() > 1) {
        yield wait_for_child();
        collect_children();
      }

      yield lease_cr->go_down();

      drain_all();

      if (lost_lock) {
        return -EBUSY;
      }

      if (!can_adjust_marker) {
        return -EAGAIN;
      }

      return set_cr_done();
    }
    return 0;
  }
};
class RGWMetaSyncShardControlCR : public RGWBackoffControlCR
{
  RGWMetaSyncEnv *sync_env;

  const rgw_pool& pool;
  const std::string& period;
  epoch_t realm_epoch;
  RGWMetadataLog* mdlog;
  uint32_t shard_id;
  rgw_meta_sync_marker sync_marker;
  const std::string period_marker;

  RGWSyncTraceNodeRef tn;

  static constexpr bool exit_on_error = false; // retry on all errors
public:
  RGWMetaSyncShardControlCR(RGWMetaSyncEnv *_sync_env, const rgw_pool& _pool,
                            const std::string& period, epoch_t realm_epoch,
                            RGWMetadataLog* mdlog, uint32_t _shard_id,
                            const rgw_meta_sync_marker& _marker,
                            std::string&& period_marker,
                            RGWSyncTraceNodeRef& _tn_parent)
    : RGWBackoffControlCR(_sync_env->cct, exit_on_error), sync_env(_sync_env),
      pool(_pool), period(period), realm_epoch(realm_epoch), mdlog(mdlog),
      shard_id(_shard_id), sync_marker(_marker),
      period_marker(std::move(period_marker)) {
    tn = sync_env->sync_tracer->add_node(_tn_parent, "shard",
                                         std::to_string(shard_id));
  }

  RGWCoroutine *alloc_cr() override {
    return new RGWMetaSyncShardCR(sync_env, pool, period, realm_epoch, mdlog,
                                  shard_id, sync_marker, period_marker, backoff_ptr(), tn);
  }

  RGWCoroutine *alloc_finisher_cr() override {
    rgw::sal::RadosStore* store = sync_env->store;
    return new RGWSimpleRadosReadCR<rgw_meta_sync_marker>(sync_env->dpp, sync_env->async_rados, store->svc()->sysobj,
                                                          rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
                                                          &sync_marker);
  }
};
1918 class RGWMetaSyncCR
: public RGWCoroutine
{
1919 RGWMetaSyncEnv
*sync_env
;
1920 const rgw_pool
& pool
;
1921 RGWPeriodHistory::Cursor cursor
; //< sync position in period history
1922 RGWPeriodHistory::Cursor next
; //< next period in history
1923 rgw_meta_sync_status sync_status
;
1924 RGWSyncTraceNodeRef tn
;
1926 std::mutex mutex
; //< protect access to shard_crs
1928 // TODO: it should be enough to hold a reference on the stack only, as calling
1929 // RGWCoroutinesStack::wakeup() doesn't refer to the RGWCoroutine if it has
1930 // already completed
1931 using ControlCRRef
= boost::intrusive_ptr
<RGWMetaSyncShardControlCR
>;
1932 using StackRef
= boost::intrusive_ptr
<RGWCoroutinesStack
>;
1933 using RefPair
= std::pair
<ControlCRRef
, StackRef
>;
1934 map
<int, RefPair
> shard_crs
;
1938 RGWMetaSyncCR(RGWMetaSyncEnv
*_sync_env
, const RGWPeriodHistory::Cursor
&cursor
,
1939 const rgw_meta_sync_status
& _sync_status
, RGWSyncTraceNodeRef
& _tn
)
1940 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
1941 pool(sync_env
->store
->svc()->zone
->get_zone_params().log_pool
),
1942 cursor(cursor
), sync_status(_sync_status
), tn(_tn
) {}
  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      // loop through one period at a time
      tn->log(1, "start");
      for (;;) {
        if (cursor == sync_env->store->svc()->mdlog->get_period_history()->get_current()) {
          next = RGWPeriodHistory::Cursor{};
          if (cursor) {
            ldpp_dout(dpp, 10) << "RGWMetaSyncCR on current period="
                << cursor.get_period().get_id() << dendl;
          } else {
            ldpp_dout(dpp, 10) << "RGWMetaSyncCR with no period" << dendl;
          }
        } else {
          next = cursor;
          next.next();
          ldpp_dout(dpp, 10) << "RGWMetaSyncCR on period="
              << cursor.get_period().get_id() << ", next="
              << next.get_period().get_id() << dendl;
        }

        yield {
          // get the mdlog for the current period (may be empty)
          auto& period_id = sync_status.sync_info.period;
          auto realm_epoch = sync_status.sync_info.realm_epoch;
          auto mdlog = sync_env->store->svc()->mdlog->get_log(period_id);

          tn->log(1, SSTR("realm epoch=" << realm_epoch << " period id=" << period_id));

          // prevent wakeup() from accessing shard_crs while we're spawning them
          std::lock_guard<std::mutex> lock(mutex);

          // sync this period on each shard
          for (const auto& m : sync_status.sync_markers) {
            uint32_t shard_id = m.first;
            auto& marker = m.second;

            std::string period_marker;
            if (next) {
              // read the maximum marker from the next period's sync status
              period_marker = next.get_period().get_sync_status()[shard_id];
              if (period_marker.empty()) {
                // no metadata changes have occurred on this shard, skip it
                ldpp_dout(dpp, 10) << "RGWMetaSyncCR: skipping shard " << shard_id
                    << " with empty period marker" << dendl;
                continue;
              }
            }

            using ShardCR = RGWMetaSyncShardControlCR;
            auto cr = new ShardCR(sync_env, pool, period_id, realm_epoch,
                                  mdlog, shard_id, marker,
                                  std::move(period_marker), tn);
            auto stack = spawn(cr, false);
            shard_crs[shard_id] = RefPair{cr, stack};
          }
        }
        // wait for each shard to complete
        while (ret == 0 && num_spawned() > 0) {
          yield wait_for_child();
          collect(&ret, nullptr);
        }
        drain_all();
        {
          // drop shard cr refs under lock
          std::lock_guard<std::mutex> lock(mutex);
          shard_crs.clear();
        }
        if (ret < 0) {
          return set_cr_error(ret);
        }
        // advance to the next period
        ceph_assert(next);
        cursor = next;

        // write the updated sync info
        sync_status.sync_info.period = cursor.get_period().get_id();
        sync_status.sync_info.realm_epoch = cursor.get_epoch();
        yield call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(dpp, sync_env->async_rados,
                                                                 sync_env->store->svc()->sysobj,
                                                                 rgw_raw_obj(pool, sync_env->status_oid()),
                                                                 sync_status.sync_info));
      }
    }
    return 0;
  }

  void wakeup(int shard_id) {
    std::lock_guard<std::mutex> lock(mutex);
    auto iter = shard_crs.find(shard_id);
    if (iter == shard_crs.end()) {
      return;
    }
    iter->second.first->wakeup();
  }
};

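// RGWMetaSyncCR::wakeup() is typically reached from outside the coroutine
// manager's thread (via RGWRemoteMetaLog::wakeup() when a remote mdlog
// update is observed), which is why shard_crs is only touched under `mutex`
// both here and in the spawn loop above.
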
void RGWRemoteMetaLog::init_sync_env(RGWMetaSyncEnv *env) {
  env->dpp = dpp;
  env->cct = store->ctx();
  env->store = store;
  env->conn = conn;
  env->async_rados = async_rados;
  env->http_manager = &http_manager;
  env->error_logger = error_logger;
  env->sync_tracer = store->getRados()->get_sync_tracer();
}

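// The env struct bundles everything a sync coroutine needs (store, REST
// connection, HTTP manager, error logger, tracer), so the individual
// coroutines can stay independent of RGWRemoteMetaLog itself.
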
int RGWRemoteMetaLog::read_sync_status(const DoutPrefixProvider *dpp, rgw_meta_sync_status *sync_status)
{
  if (store->svc()->zone->is_meta_master()) {
    return 0;
  }
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWCoroutinesManager crs(store->ctx(), store->getRados()->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }
  RGWMetaSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  tn->log(20, "read sync status");
  ret = crs.run(dpp, new RGWReadSyncStatusCoroutine(&sync_env_local, sync_status));
  http_manager.stop();
  return ret;
}

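// Admin-facing paths (e.g. `radosgw-admin metadata sync status`) are
// expected to land here rather than in run_sync(), hence the dedicated
// coroutine manager above.
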
int RGWRemoteMetaLog::init_sync_status(const DoutPrefixProvider *dpp)
{
  if (store->svc()->zone->is_meta_master()) {
    return 0;
  }

  rgw_mdlog_info mdlog_info;
  int r = read_log_info(dpp, &mdlog_info);
  if (r < 0) {
    ldpp_dout(dpp, -1) << "ERROR: failed to fetch master log info (r=" << r << ")" << dendl;
    return r;
  }

  rgw_meta_sync_info sync_info;
  sync_info.num_shards = mdlog_info.num_shards;
  auto cursor = store->svc()->mdlog->get_period_history()->get_current();
  if (cursor) {
    sync_info.period = cursor.get_period().get_id();
    sync_info.realm_epoch = cursor.get_epoch();
  }

  return run(dpp, new RGWInitSyncStatusCoroutine(&sync_env, sync_info));
}

int RGWRemoteMetaLog::store_sync_info(const DoutPrefixProvider *dpp, const rgw_meta_sync_info& sync_info)
{
  tn->log(20, "store sync info");
  return run(dpp, new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(dpp, async_rados, store->svc()->sysobj,
                                                                rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env.status_oid()),
                                                                sync_info));
}

// return a cursor to the period at our sync position
static RGWPeriodHistory::Cursor get_period_at(const DoutPrefixProvider *dpp,
                                              rgw::sal::RadosStore* store,
                                              const rgw_meta_sync_info& info,
                                              optional_yield y)
{
  if (info.period.empty()) {
    // return an empty cursor with error=0
    return RGWPeriodHistory::Cursor{};
  }

  // look for an existing period in our history
  auto cursor = store->svc()->mdlog->get_period_history()->lookup(info.realm_epoch);
  if (cursor) {
    // verify that the period ids match
    auto& existing = cursor.get_period().get_id();
    if (existing != info.period) {
      ldpp_dout(dpp, -1) << "ERROR: sync status period=" << info.period
          << " does not match period=" << existing
          << " in history at realm epoch=" << info.realm_epoch << dendl;
      return RGWPeriodHistory::Cursor{-EEXIST};
    }
    return cursor;
  }

  // read the period from rados or pull it from the master
  RGWPeriod period;
  int r = store->svc()->mdlog->pull_period(dpp, info.period, period, y);
  if (r < 0) {
    ldpp_dout(dpp, -1) << "ERROR: failed to read period id "
        << info.period << ": " << cpp_strerror(r) << dendl;
    return RGWPeriodHistory::Cursor{r};
  }
  // attach the period to our history
  cursor = store->svc()->mdlog->get_period_history()->attach(dpp, std::move(period), y);
  if (!cursor) {
    r = cursor.get_error();
    ldpp_dout(dpp, -1) << "ERROR: failed to read period history back to "
        << info.period << ": " << cpp_strerror(r) << dendl;
  }
  return cursor;
}

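// RGWPeriodHistory::Cursor doubles as an error carrier: a default-constructed
// cursor means "no period" with error=0, while Cursor{-EEXIST} or Cursor{r}
// let run_sync() surface the failure through cursor.get_error().
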
int RGWRemoteMetaLog::run_sync(const DoutPrefixProvider *dpp, optional_yield y)
{
  if (store->svc()->zone->is_meta_master()) {
    return 0;
  }

  int r = 0;

  // get shard count and oldest log period from master
  rgw_mdlog_info mdlog_info;
  for (;;) {
    if (going_down) {
      ldpp_dout(dpp, 1) << __func__ << "(): going down" << dendl;
      return 0;
    }
    r = read_log_info(dpp, &mdlog_info);
    if (r == -EIO || r == -ENOENT) {
      // keep retrying if master isn't alive or hasn't initialized the log
      ldpp_dout(dpp, 10) << __func__ << "(): waiting for master.." << dendl;
      backoff.backoff_sleep();
      continue;
    }
    backoff.reset();
    if (r < 0) {
      ldpp_dout(dpp, -1) << "ERROR: failed to fetch master log info (r=" << r << ")" << dendl;
      return r;
    }
    break;
  }

  rgw_meta_sync_status sync_status;
  do {
    if (going_down) {
      ldpp_dout(dpp, 1) << __func__ << "(): going down" << dendl;
      return 0;
    }
    r = run(dpp, new RGWReadSyncStatusCoroutine(&sync_env, &sync_status));
    if (r < 0 && r != -ENOENT) {
      ldpp_dout(dpp, 0) << "ERROR: failed to fetch sync status r=" << r << dendl;
      return r;
    }

    if (!mdlog_info.period.empty()) {
      // restart sync if the remote has a period, but:
      // a) our status does not, or
      // b) our sync period comes before the remote's oldest log period
      if (sync_status.sync_info.period.empty() ||
          sync_status.sync_info.realm_epoch < mdlog_info.realm_epoch) {
        sync_status.sync_info.state = rgw_meta_sync_info::StateInit;
        string reason;
        if (sync_status.sync_info.period.empty()) {
          reason = "period is empty";
        } else {
          reason = SSTR("sync_info realm epoch is behind: " << sync_status.sync_info.realm_epoch << " < " << mdlog_info.realm_epoch);
        }
        tn->log(1, "initialize sync (reason: " + reason + ")");
        ldpp_dout(dpp, 1) << "epoch=" << sync_status.sync_info.realm_epoch
           << " in sync status comes before remote's oldest mdlog epoch="
           << mdlog_info.realm_epoch << ", restarting sync" << dendl;
      }
    }

    if (sync_status.sync_info.state == rgw_meta_sync_info::StateInit) {
      ldpp_dout(dpp, 20) << __func__ << "(): init" << dendl;
      sync_status.sync_info.num_shards = mdlog_info.num_shards;
      auto cursor = store->svc()->mdlog->get_period_history()->get_current();
      if (cursor) {
        // run full sync, then start incremental from the current period/epoch
        sync_status.sync_info.period = cursor.get_period().get_id();
        sync_status.sync_info.realm_epoch = cursor.get_epoch();
      }
      r = run(dpp, new RGWInitSyncStatusCoroutine(&sync_env, sync_status.sync_info));
      if (r == -EBUSY) {
        backoff.backoff_sleep();
        continue;
      }
      backoff.reset();
      if (r < 0) {
        ldpp_dout(dpp, 0) << "ERROR: failed to init sync status r=" << r << dendl;
        return r;
      }
    }
  } while (sync_status.sync_info.state == rgw_meta_sync_info::StateInit);

  auto num_shards = sync_status.sync_info.num_shards;
  if (num_shards != mdlog_info.num_shards) {
    ldpp_dout(dpp, -1) << "ERROR: can't sync, mismatch between num shards, master num_shards=" << mdlog_info.num_shards << " local num_shards=" << num_shards << dendl;
    return -EINVAL;
  }

  RGWPeriodHistory::Cursor cursor;
  do {
    r = run(dpp, new RGWReadSyncStatusCoroutine(&sync_env, &sync_status));
    if (r < 0 && r != -ENOENT) {
      tn->log(0, SSTR("ERROR: failed to fetch sync status r=" << r));
      return r;
    }

    switch ((rgw_meta_sync_info::SyncState)sync_status.sync_info.state) {
      case rgw_meta_sync_info::StateBuildingFullSyncMaps:
        tn->log(20, "building full sync maps");
        r = run(dpp, new RGWFetchAllMetaCR(&sync_env, num_shards, sync_status.sync_markers, tn));
        if (r == -EBUSY || r == -EIO) {
          backoff.backoff_sleep();
          continue;
        }
        backoff.reset();
        if (r < 0) {
          tn->log(0, SSTR("ERROR: failed to fetch all metadata keys (r=" << r << ")"));
          return r;
        }

        sync_status.sync_info.state = rgw_meta_sync_info::StateSync;
        r = store_sync_info(dpp, sync_status.sync_info);
        if (r < 0) {
          tn->log(0, SSTR("ERROR: failed to update sync status (r=" << r << ")"));
          return r;
        }
        /* fall through */
      case rgw_meta_sync_info::StateSync:
        tn->log(20, "sync");
        // find our position in the period history (if any)
        cursor = get_period_at(dpp, store, sync_status.sync_info, y);
        r = cursor.get_error();
        if (r < 0) {
          return r;
        }
        meta_sync_cr = new RGWMetaSyncCR(&sync_env, cursor, sync_status, tn);
        r = run(dpp, meta_sync_cr);
        if (r < 0) {
          tn->log(0, "ERROR: failed to fetch all metadata keys");
          return r;
        }
        break;
      default:
        tn->log(0, "ERROR: bad sync state!");
        return -EIO;
    }
  } while (!going_down);

  return 0;
}

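// run_sync() above drives a small state machine persisted in
// rgw_meta_sync_info::state:
//
//   StateInit -> StateBuildingFullSyncMaps -> StateSync
//
// StateBuildingFullSyncMaps deliberately falls through into StateSync once
// the full-sync maps are stored, and StateSync loops until going_down.
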
void RGWRemoteMetaLog::wakeup(int shard_id)
{
  if (!meta_sync_cr) {
    return;
  }
  meta_sync_cr->wakeup(shard_id);
}

int RGWCloneMetaLogCoroutine::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    do {
      yield {
        ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": init request" << dendl;
        return state_init();
      }
      yield {
        ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status" << dendl;
        return state_read_shard_status();
      }
      yield {
        ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status complete" << dendl;
        return state_read_shard_status_complete();
      }
      yield {
        ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": sending rest request" << dendl;
        return state_send_rest_request(dpp);
      }
      yield {
        ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": receiving rest response" << dendl;
        return state_receive_rest_response();
      }
      yield {
        ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries" << dendl;
        return state_store_mdlog_entries();
      }
    } while (truncated);

    yield {
      ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries complete" << dendl;
      return state_store_mdlog_entries_complete();
    }
  }

  return 0;
}

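// Each yield above runs one step of the clone pipeline:
//   init -> read shard status -> send REST request -> receive response
//        -> store mdlog entries
// and the do/while repeats the whole sequence while the previous response
// was truncated, resuming from the updated marker.
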
int RGWCloneMetaLogCoroutine::state_init()
{
  data = rgw_mdlog_shard_data();

  return 0;
}

int RGWCloneMetaLogCoroutine::state_read_shard_status()
{
  const bool add_ref = false; // default constructs with refs=1

  completion.reset(new RGWMetadataLogInfoCompletion(
    [this](int ret, const cls_log_header& header) {
      if (ret < 0) {
        if (ret != -ENOENT) {
          ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to read mdlog info with "
              << cpp_strerror(ret) << dendl;
        }
      } else { // decode the response
        shard_info.marker = header.max_marker;
        shard_info.last_update = header.max_time.to_real_time();
      }
      // wake up parent stack
      io_complete();
    }), add_ref);

  int ret = mdlog->get_info_async(sync_env->dpp, shard_id, completion.get());
  if (ret < 0) {
    ldpp_dout(sync_env->dpp, 0) << "ERROR: mdlog->get_info_async() returned ret=" << ret << dendl;
    return set_cr_error(ret);
  }

  return io_block(0);
}

int RGWCloneMetaLogCoroutine::state_read_shard_status_complete()
{
  completion.reset();

  ldpp_dout(sync_env->dpp, 20) << "shard_id=" << shard_id << " marker=" << shard_info.marker << " last_update=" << shard_info.last_update << dendl;

  marker = shard_info.marker;

  return 0;
}

int RGWCloneMetaLogCoroutine::state_send_rest_request(const DoutPrefixProvider *dpp)
{
  RGWRESTConn *conn = sync_env->conn;

  char buf[32];
  snprintf(buf, sizeof(buf), "%d", shard_id);

  char max_entries_buf[32];
  snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", max_entries);

  const char *marker_key = (marker.empty() ? "" : "marker");

  rgw_http_param_pair pairs[] = { { "type", "metadata" },
                                  { "id", buf },
                                  { "period", period.c_str() },
                                  { "max-entries", max_entries_buf },
                                  { marker_key, marker.c_str() },
                                  { NULL, NULL } };

  http_op = new RGWRESTReadResource(conn, "/admin/log", pairs, NULL, sync_env->http_manager);

  init_new_io(http_op);

  int ret = http_op->aio_read(dpp);
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "ERROR: failed to fetch mdlog data" << dendl;
    log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
    http_op->put();
    http_op = NULL;
    return set_cr_error(ret);
  }

  return io_block(0);
}

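// For illustration only: the parameter pairs above produce a request of
// roughly this shape against the master zone's admin API:
//
//   GET /admin/log?type=metadata&id=<shard_id>&period=<period>
//       &max-entries=<max_entries>&marker=<marker>
//
// When no marker is set, marker_key is the empty string and the parameter
// is omitted from the request.
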
int RGWCloneMetaLogCoroutine::state_receive_rest_response()
{
  int ret = http_op->wait(&data, null_yield);
  if (ret < 0) {
    error_stream << "http operation failed: " << http_op->to_str() << " status=" << http_op->get_http_status() << std::endl;
    ldpp_dout(sync_env->dpp, 5) << "failed to wait for op, ret=" << ret << dendl;
    http_op->put();
    http_op = NULL;
    return set_cr_error(ret);
  }
  http_op->put();
  http_op = NULL;

  ldpp_dout(sync_env->dpp, 20) << "remote mdlog, shard_id=" << shard_id << " num of shard entries: " << data.entries.size() << dendl;

  truncated = ((int)data.entries.size() == max_entries);

  if (data.entries.empty()) {
    if (new_marker) {
      *new_marker = marker;
    }
    return set_cr_done();
  }

  if (new_marker) {
    *new_marker = data.entries.back().id;
  }

  return 0;
}

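// Truncation is inferred rather than reported: a response holding exactly
// max_entries entries is assumed to have more behind it, so operate() loops
// and fetches again from the updated marker.
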
int RGWCloneMetaLogCoroutine::state_store_mdlog_entries()
{
  list<cls_log_entry> dest_entries;

  vector<rgw_mdlog_entry>::iterator iter;
  for (iter = data.entries.begin(); iter != data.entries.end(); ++iter) {
    rgw_mdlog_entry& entry = *iter;
    ldpp_dout(sync_env->dpp, 20) << "entry: name=" << entry.name << dendl;

    cls_log_entry dest_entry;
    dest_entry.id = entry.id;
    dest_entry.section = entry.section;
    dest_entry.name = entry.name;
    dest_entry.timestamp = utime_t(entry.timestamp);

    encode(entry.log_data, dest_entry.data);

    dest_entries.push_back(dest_entry);

    marker = entry.id;
  }

  RGWAioCompletionNotifier *cn = stack->create_completion_notifier();

  int ret = mdlog->store_entries_in_shard(sync_env->dpp, dest_entries, shard_id, cn->completion());
  if (ret < 0) {
    cn->put();
    ldpp_dout(sync_env->dpp, 10) << "failed to store md log entries shard_id=" << shard_id << " ret=" << ret << dendl;
    return set_cr_error(ret);
  }
  return io_block(0);
}

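// The store is asynchronous: the completion notifier wakes this coroutine's
// stack once the local mdlog shard write finishes, and io_block(0) parks the
// coroutine until that happens.
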
int RGWCloneMetaLogCoroutine::state_store_mdlog_entries_complete()
{
  return set_cr_done();
}

void rgw_meta_sync_info::decode_json(JSONObj *obj)
{
  string s;
  JSONDecoder::decode_json("status", s, obj);
  if (s == "init") {
    state = StateInit;
  } else if (s == "building-full-sync-maps") {
    state = StateBuildingFullSyncMaps;
  } else if (s == "sync") {
    state = StateSync;
  }
  JSONDecoder::decode_json("num_shards", num_shards, obj);
  JSONDecoder::decode_json("period", period, obj);
  JSONDecoder::decode_json("realm_epoch", realm_epoch, obj);
}

void rgw_meta_sync_info::dump(Formatter *f) const
{
  string s;
  switch ((SyncState)state) {
  case StateInit:
    s = "init";
    break;
  case StateBuildingFullSyncMaps:
    s = "building-full-sync-maps";
    break;
  case StateSync:
    s = "sync";
    break;
  default:
    s = "unknown";
    break;
  }
  encode_json("status", s, f);
  encode_json("num_shards", num_shards, f);
  encode_json("period", period, f);
  encode_json("realm_epoch", realm_epoch, f);
}

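// Illustrative output shape (values are examples only):
//
//   {
//     "status": "sync",
//     "num_shards": 64,
//     "period": "<period id>",
//     "realm_epoch": 2
//   }
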
void rgw_meta_sync_marker::decode_json(JSONObj *obj)
{
  int s;
  JSONDecoder::decode_json("state", s, obj);
  state = s;
  JSONDecoder::decode_json("marker", marker, obj);
  JSONDecoder::decode_json("next_step_marker", next_step_marker, obj);
  JSONDecoder::decode_json("total_entries", total_entries, obj);
  JSONDecoder::decode_json("pos", pos, obj);
  utime_t ut;
  JSONDecoder::decode_json("timestamp", ut, obj);
  timestamp = ut.to_real_time();
  JSONDecoder::decode_json("realm_epoch", realm_epoch, obj);
}

void rgw_meta_sync_marker::dump(Formatter *f) const
{
  encode_json("state", (int)state, f);
  encode_json("marker", marker, f);
  encode_json("next_step_marker", next_step_marker, f);
  encode_json("total_entries", total_entries, f);
  encode_json("pos", pos, f);
  encode_json("timestamp", utime_t(timestamp), f);
  encode_json("realm_epoch", realm_epoch, f);
}

void rgw_meta_sync_status::decode_json(JSONObj *obj)
{
  JSONDecoder::decode_json("info", sync_info, obj);
  JSONDecoder::decode_json("markers", sync_markers, obj);
}

void rgw_meta_sync_status::dump(Formatter *f) const {
  encode_json("info", sync_info, f);
  encode_json("markers", sync_markers, f);
}

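// Illustrative output shape; assuming Ceph's usual key/val JSON encoding
// for maps, sync_markers serializes as an array of { "key", "val" } pairs:
//
//   {
//     "info": { ...rgw_meta_sync_info... },
//     "markers": [ { "key": 0, "val": { ...rgw_meta_sync_marker... } }, ... ]
//   }
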
void rgw_sync_error_info::dump(Formatter *f) const {
  encode_json("source_zone", source_zone, f);
  encode_json("error_code", error_code, f);
  encode_json("message", message, f);
}