1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include <boost/optional.hpp>
6 #include "common/ceph_json.h"
7 #include "common/RWLock.h"
8 #include "common/RefCountedObj.h"
9 #include "common/WorkQueue.h"
10 #include "common/Throttle.h"
11 #include "common/admin_socket.h"
12 #include "common/errno.h"
14 #include "rgw_common.h"
15 #include "rgw_rados.h"
18 #include "rgw_metadata.h"
19 #include "rgw_rest_conn.h"
20 #include "rgw_tools.h"
21 #include "rgw_cr_rados.h"
22 #include "rgw_cr_rest.h"
23 #include "rgw_http_client.h"
24 #include "rgw_sync_trace.h"
26 #include "cls/lock/cls_lock_client.h"
28 #include "services/svc_zone.h"
30 #include <boost/asio/yield.hpp>
32 #define dout_subsys ceph_subsys_rgw
35 #define dout_prefix (*_dout << "meta sync: ")
// Rados object names used to persist metadata-sync state:
// overall status object, per-shard status marker prefix, and the
// full-sync index prefix (sharded omap of keys to sync).
static string mdlog_sync_status_oid = "mdlog.sync-status";
static string mdlog_sync_status_shard_prefix = "mdlog.sync-status.shard";
static string mdlog_sync_full_sync_index_prefix = "meta.full-sync.index";
// Pre-computes the per-shard error-log object names; errors are spread
// across num_shards rados objects to avoid contention on a single log.
RGWSyncErrorLogger::RGWSyncErrorLogger(RGWRados *_store, const string &oid_prefix, int _num_shards) : store(_store), num_shards(_num_shards) {
  for (int i = 0; i < num_shards; i++) {
    oids.push_back(get_shard_oid(oid_prefix, i));
  } // NOTE(review): closing braces reconstructed; elided in this excerpt
}
// Builds "<prefix>.<shard_id>" — the rados object name for one error-log shard.
string RGWSyncErrorLogger::get_shard_oid(const string& oid_prefix, int shard_id) {
  char buf[oid_prefix.size() + 16]; // 16 covers '.', max int digits and NUL
  snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), shard_id);
  /* NOTE(review): return statement elided in this excerpt */
}
// Builds a coroutine that appends one error record to a shard of the
// error log; the shard is chosen round-robin via the counter member.
RGWCoroutine *RGWSyncErrorLogger::log_error_cr(const string& source_zone, const string& section, const string& name, uint32_t error_code, const string& message) {
  rgw_sync_error_info info(source_zone, error_code, message);
  /* NOTE(review): declarations of `entry`/`bl` and the encode of `info`
   * into `bl` are elided in this excerpt */
  store->time_log_prepare_entry(entry, real_clock::now(), section, name, bl);
  uint32_t shard_id = ++counter % num_shards; // round-robin shard selection
  return new RGWRadosTimelogAddCR(store, oids[shard_id], entry);
}
// Exponential backoff helper: the wait time doubles on every failure.
void RGWSyncBackoff::update_wait_time()
{
  cur_wait = (cur_wait << 1); // double the wait
  if (cur_wait >= max_secs) {
    /* clamp to max_secs — elided in this excerpt */
  }
}

void RGWSyncBackoff::backoff_sleep()
{
  /* body elided in this excerpt */
}

// Coroutine-friendly backoff: park the calling cr instead of blocking.
void RGWSyncBackoff::backoff(RGWCoroutine *op)
{
  /* NOTE(review): wait-time update preceding this call elided */
  op->wait(utime_t(cur_wait, 0));
}
// Runs a wrapped coroutine, retrying with exponential backoff on the
// transient errors -EBUSY/-EAGAIN; any other error aborts the cr.
int RGWBackoffControlCR::operate() {
  /* NOTE(review): reenter()/loop framing is partially elided in this
   * excerpt; only the statements visible in the source are kept. */
  // retry the operation until it succeeds
  {
    Mutex::Locker l(lock); // protect the wrapped-cr pointer while spawning
    /* spawn of the wrapped cr elided */
  }
  {
    Mutex::Locker l(lock); // protect the wrapped-cr pointer while clearing
    /* clear of the wrapped cr elided */
  }
  if (retcode != -EBUSY && retcode != -EAGAIN) {
    ldout(cct, 0) << "ERROR: RGWBackoffControlCR called coroutine returned " << retcode << dendl;
    /* exit-on-error handling elided */
    return set_cr_error(retcode);
  }
  yield backoff.backoff(this); // transient error: wait before retrying
  // run an optional finisher
  yield call(alloc_finisher_cr());
  /* retcode guard for the finisher elided */
  ldout(cct, 0) << "ERROR: call to finisher_cr() failed: retcode=" << retcode << dendl;
  return set_cr_error(retcode);
  return set_cr_done();
}
// Decode the remote mdlog description; "num_objects" is the shard count.
void rgw_mdlog_info::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("num_objects", num_shards, obj);
  JSONDecoder::decode_json("period", period, obj);
  JSONDecoder::decode_json("realm_epoch", realm_epoch, obj);
}
// Decode one mdlog entry; the JSON timestamp is parsed into a utime_t
// and converted to real_time for the member.
void rgw_mdlog_entry::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("id", id, obj);
  JSONDecoder::decode_json("section", section, obj);
  JSONDecoder::decode_json("name", name, obj);
  /* NOTE(review): declaration of `ut` elided in this excerpt */
  JSONDecoder::decode_json("timestamp", ut, obj);
  timestamp = ut.to_real_time(); // convert to real_time
  JSONDecoder::decode_json("data", log_data, obj);
}
// Decode one page of a remote mdlog shard listing.
void rgw_mdlog_shard_data::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("marker", marker, obj);
  JSONDecoder::decode_json("truncated", truncated, obj);
  JSONDecoder::decode_json("entries", entries, obj);
}
// Spawns children (one per shard) up to max_concurrent in flight,
// collecting results as they finish; a child error other than -ENOENT
// is logged and recorded as the overall status.
int RGWShardCollectCR::operate() {
  /* NOTE(review): reenter()/yield framing and the current_running
   * bookkeeping are partially elided in this excerpt */
  while (spawn_next()) {
    while (current_running >= max_concurrent) {
      yield wait_for_child();
      if (collect_next(&child_ret)) {
        if (child_ret < 0 && child_ret != -ENOENT) { // ENOENT is benign
          ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
          /* status update elided */
        }
      }
    }
  }
  // drain the remaining children
  while (current_running > 0) {
    yield wait_for_child();
    if (collect_next(&child_ret)) {
      if (child_ret < 0 && child_ret != -ENOENT) {
        ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
        /* status update elided */
      }
    }
  }
  /* guard on accumulated status elided */
  return set_cr_error(status);
  return set_cr_done();
}
// Collects RGWMetadataLogInfo for every shard of the remote mdlog for
// one period, with bounded concurrency (via RGWShardCollectCR).
class RGWReadRemoteMDLogInfoCR : public RGWShardCollectCR {
  RGWMetaSyncEnv *sync_env;
  const std::string& period;
  map<int, RGWMetadataLogInfo> *mdlog_info; // results keyed by shard id
  /* NOTE(review): num_shards/shard_id member declarations and the
   * access specifier are elided in this excerpt */
#define READ_MDLOG_MAX_CONCURRENT 10
  RGWReadRemoteMDLogInfoCR(RGWMetaSyncEnv *_sync_env,
                           const std::string& period, int _num_shards,
                           map<int, RGWMetadataLogInfo> *_mdlog_info)
    : RGWShardCollectCR(_sync_env->cct, READ_MDLOG_MAX_CONCURRENT),
      /* NOTE(review): sync_env initializer elided in this excerpt */
      period(period), num_shards(_num_shards),
      mdlog_info(_mdlog_info), shard_id(0) {}
  bool spawn_next() override;
};
// Lists entries from a set of remote mdlog shards (each starting at its
// own marker), with bounded concurrency.
class RGWListRemoteMDLogCR : public RGWShardCollectCR {
  RGWMetaSyncEnv *sync_env;
  const std::string& period;
  map<int, string> shards;            // shard_id -> list marker
  int max_entries_per_shard;
  map<int, rgw_mdlog_shard_data> *result; // results keyed by shard id
  map<int, string>::iterator iter;    // next shard to spawn
  // NOTE(review): READ_MDLOG_MAX_CONCURRENT is redefined here (same
  // value as above) — a benign duplicate #define in the original.
#define READ_MDLOG_MAX_CONCURRENT 10
  RGWListRemoteMDLogCR(RGWMetaSyncEnv *_sync_env,
                       const std::string& period, map<int, string>& _shards,
                       int _max_entries_per_shard,
                       map<int, rgw_mdlog_shard_data> *_result)
    : RGWShardCollectCR(_sync_env->cct, READ_MDLOG_MAX_CONCURRENT),
      sync_env(_sync_env), period(period),
      max_entries_per_shard(_max_entries_per_shard),
      /* NOTE(review): result initializer elided in this excerpt */
  {
    shards.swap(_shards); // take ownership of the caller's marker map
    iter = shards.begin();
  }
  bool spawn_next() override;
};
// Destructor — body elided in this excerpt.
RGWRemoteMetaLog::~RGWRemoteMetaLog()
// Queries the master's /admin/log endpoint for mdlog metadata
// (shard count, current period, realm epoch) into *log_info.
int RGWRemoteMetaLog::read_log_info(rgw_mdlog_info *log_info)
{
  rgw_http_param_pair pairs[] = { { "type", "metadata" },
                                  /* terminating pair elided in this excerpt */ };
  int ret = conn->get_json_resource("/admin/log", pairs, *log_info);
  /* error guard elided */
  ldpp_dout(dpp, 0) << "ERROR: failed to fetch mdlog info" << dendl;
  ldpp_dout(dpp, 20) << "remote mdlog, num_shards=" << log_info->num_shards << dendl;
  /* return elided */
}
// Reads position info for every master mdlog shard of the given period.
// Short-circuits (handling elided) when this zone is the metadata master.
int RGWRemoteMetaLog::read_master_log_shards_info(const string &master_period, map<int, RGWMetadataLogInfo> *shards_info)
{
  if (store->svc.zone->is_meta_master()) {
    /* early return elided in this excerpt */
  }
  rgw_mdlog_info log_info;
  int ret = read_log_info(&log_info); // learn the remote shard count first
  /* error guard elided */
  return run(new RGWReadRemoteMDLogInfoCR(&sync_env, master_period, log_info.num_shards, shards_info));
}
// Lists the next batch (1 entry per shard) from the master mdlog,
// starting each shard at the marker supplied in shard_markers.
int RGWRemoteMetaLog::read_master_log_shards_next(const string& period, map<int, string> shard_markers, map<int, rgw_mdlog_shard_data> *result)
{
  if (store->svc.zone->is_meta_master()) {
    /* early return elided in this excerpt */
  }
  return run(new RGWListRemoteMDLogCR(&sync_env, period, shard_markers, 1, result));
}
// One-time setup: master REST connection, http manager, sharded error
// logger, the shared sync environment and the root trace node.
int RGWRemoteMetaLog::init()
{
  conn = store->svc.zone->get_master_conn();
  int ret = http_manager.start();
  /* error guard elided */
  ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
  error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
  init_sync_env(&sync_env);
  tn = sync_env.sync_tracer->add_node(sync_env.sync_tracer->root_node, "meta");
  /* return elided */
}
// Shuts down the remote-log machinery — body elided in this excerpt.
void RGWRemoteMetaLog::finish()
304 #define CLONE_MAX_ENTRIES 100
// Initializes the sync-status manager: verifies a master connection
// exists, opens the log pool, initializes the remote log, reads the
// current sync status, and precomputes the per-shard status objects.
int RGWMetaSyncStatusManager::init()
{
  if (store->svc.zone->is_meta_master()) {
    /* nothing to sync on the master — early return elided */
  }
  if (!store->svc.zone->get_master_conn()) {
    lderr(store->ctx()) << "no REST connection to master zone" << dendl;
    /* error return elided */
  }
  int r = rgw_init_ioctx(store->get_rados_handle(), store->svc.zone->get_zone_params().log_pool, ioctx, true);
  /* error guard elided */
  lderr(store->ctx()) << "ERROR: failed to open log pool (" << store->svc.zone->get_zone_params().log_pool << " ret=" << r << dendl;
  r = master_log.init();
  /* error guard elided */
  lderr(store->ctx()) << "ERROR: failed to init remote log, r=" << r << dendl;
  RGWMetaSyncEnv& sync_env = master_log.get_sync_env();
  rgw_meta_sync_status sync_status;
  r = read_sync_status(&sync_status);
  if (r < 0 && r != -ENOENT) { // ENOENT just means no status written yet
    lderr(store->ctx()) << "ERROR: failed to read sync status, r=" << r << dendl;
    /* error return elided */
  }
  int num_shards = sync_status.sync_info.num_shards;
  for (int i = 0; i < num_shards; i++) {
    shard_objs[i] = rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, sync_env.shard_obj_name(i));
  }
  RWLock::WLocker wl(ts_to_shard_lock); // guard the clone-marker setup
  for (int i = 0; i < num_shards; i++) {
    clone_markers.push_back(string());
    /* ts_to_shard bookkeeping elided in this excerpt */
  }
  /* return elided */
}
// Log subsystem id for this manager — body elided in this excerpt.
unsigned RGWMetaSyncStatusManager::get_subsys() const

// Prefix prepended to every log line this manager emits.
std::ostream& RGWMetaSyncStatusManager::gen_prefix(std::ostream& out) const
{
  return out << "meta sync: ";
}
// Wires up the shared sync environment used by all meta-sync coroutines.
// NOTE(review): assignments for dpp/cct/store/conn are elided in this
// excerpt; only the tail of the function is visible.
void RGWMetaSyncEnv::init(const DoutPrefixProvider *_dpp, CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
                          RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
                          RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer) {
  async_rados = _async_rados;
  http_manager = _http_manager;
  error_logger = _error_logger;
  sync_tracer = _sync_tracer;
}
// Object name holding the overall metadata sync status.
string RGWMetaSyncEnv::status_oid()
{
  return mdlog_sync_status_oid;
}
// "<shard-prefix>.<shard_id>" — per-shard sync-marker object name.
string RGWMetaSyncEnv::shard_obj_name(int shard_id)
{
  char buf[mdlog_sync_status_shard_prefix.size() + 16]; // 16: '.', digits, NUL
  snprintf(buf, sizeof(buf), "%s.%d", mdlog_sync_status_shard_prefix.c_str(), shard_id);
  /* NOTE(review): return statement elided in this excerpt */
}
// Async request that lists entries from one local mdlog shard on the
// RGWAsyncRadosProcessor thread pool (blocking rados I/O off the cr thread).
class RGWAsyncReadMDLogEntries : public RGWAsyncRadosRequest {
  RGWMetadataLog *mdlog;
  /* NOTE(review): store/shard_id/marker/max_entries member declarations
   * and access specifiers are elided in this excerpt */
  list<cls_log_entry> *entries; // output list (caller-owned)

  int _send_request() override {
    /* local time-range setup elided */
    mdlog->init_list_entries(shard_id, from_time, end_time, *marker, &handle);
    int ret = mdlog->list_entries(handle, max_entries, *entries, marker, truncated);
    mdlog->complete_list_entries(handle); // always release the list handle
    /* return elided */
  }

  RGWAsyncReadMDLogEntries(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
                           RGWMetadataLog* mdlog, int _shard_id,
                           string* _marker, int _max_entries,
                           list<cls_log_entry> *_entries, bool *_truncated)
    : RGWAsyncRadosRequest(caller, cn), store(_store), mdlog(mdlog),
      shard_id(_shard_id), marker(_marker), max_entries(_max_entries),
      entries(_entries), truncated(_truncated) {}
};
// Coroutine wrapper around RGWAsyncReadMDLogEntries: queues the async
// read and surfaces its status/entries back into the coroutine world.
class RGWReadMDLogEntriesCR : public RGWSimpleCoroutine {
  RGWMetaSyncEnv *sync_env;
  RGWMetadataLog *const mdlog;
  /* NOTE(review): shard_id/pmarker/marker/max_entries/truncated member
   * declarations elided in this excerpt */
  list<cls_log_entry> *entries;
  RGWAsyncReadMDLogEntries *req{nullptr};

  RGWReadMDLogEntriesCR(RGWMetaSyncEnv *_sync_env, RGWMetadataLog* mdlog,
                        int _shard_id, string* _marker, int _max_entries,
                        list<cls_log_entry> *_entries, bool *_truncated)
    : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env), mdlog(mdlog),
      shard_id(_shard_id), pmarker(_marker), max_entries(_max_entries),
      entries(_entries), truncated(_truncated) {}

  ~RGWReadMDLogEntriesCR() override {
    /* req cleanup elided in this excerpt */
  }

  int send_request() override {
    /* local marker copy-in elided */
    req = new RGWAsyncReadMDLogEntries(this, stack->create_completion_notifier(),
                                       sync_env->store, mdlog, shard_id, &marker,
                                       max_entries, entries, truncated);
    sync_env->async_rados->queue(req); // runs on the async-rados pool
    /* return elided */
  }

  int request_complete() override {
    int ret = req->get_ret_status();
    if (ret >= 0 && !entries->empty()) {
      /* copy the advanced marker back to the caller — elided */
    }
    return req->get_ret_status();
  }
};
// Reads one remote mdlog shard's info over HTTP from the master zone.
class RGWReadRemoteMDLogShardInfoCR : public RGWCoroutine {
  /* NOTE(review): env and shard_id member declarations elided */
  RGWRESTReadResource *http_op;
  const std::string& period;
  RGWMetadataLogInfo *shard_info; // output (caller-owned)

  RGWReadRemoteMDLogShardInfoCR(RGWMetaSyncEnv *env, const std::string& period,
                                int _shard_id, RGWMetadataLogInfo *_shard_info)
    : RGWCoroutine(env->store->ctx()), env(env), http_op(NULL),
      period(period), shard_id(_shard_id), shard_info(_shard_info) {}

  int operate() override {
    auto store = env->store;
    RGWRESTConn *conn = store->svc.zone->get_master_conn();
    /* reenter()/yield framing and the buf declaration elided */
    snprintf(buf, sizeof(buf), "%d", shard_id);
    rgw_http_param_pair pairs[] = { { "type" , "metadata" },
                                    /* shard-id pair elided */
                                    { "period", period.c_str() },
                                    /* remaining pairs + terminator elided */ };
    string p = "/admin/log/";
    http_op = new RGWRESTReadResource(conn, p, pairs, NULL,
                                      /* http manager argument elided */);
    init_new_io(http_op);
    int ret = http_op->aio_read();
    /* error guard elided */
    ldpp_dout(env->dpp, 0) << "ERROR: failed to read from " << p << dendl;
    log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
    /* http_op release elided */
    return set_cr_error(ret);
    /* yield boundary elided; original re-declares ret in a new scope */
    int ret = http_op->wait(shard_info);
    /* cleanup + error guard elided */
    return set_cr_error(ret);
    return set_cr_done();
  }
};
// Lists up to max_entries from one remote mdlog shard starting at `marker`.
class RGWListRemoteMDLogShardCR : public RGWSimpleCoroutine {
  RGWMetaSyncEnv *sync_env;
  RGWRESTReadResource *http_op;
  const std::string& period;
  /* NOTE(review): shard_id/marker member declarations elided */
  uint32_t max_entries;
  rgw_mdlog_shard_data *result; // output (caller-owned)

  RGWListRemoteMDLogShardCR(RGWMetaSyncEnv *env, const std::string& period,
                            int _shard_id, const string& _marker, uint32_t _max_entries,
                            rgw_mdlog_shard_data *_result)
    : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL),
      period(period), shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}

  int send_request() override {
    RGWRESTConn *conn = sync_env->conn;
    /* buf declaration elided */
    snprintf(buf, sizeof(buf), "%d", shard_id);
    char max_entries_buf[32];
    snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);
    // only send a "marker" param when a marker was actually supplied
    const char *marker_key = (marker.empty() ? "" : "marker");
    rgw_http_param_pair pairs[] = { { "type", "metadata" },
                                    /* shard-id pair elided */
                                    { "period", period.c_str() },
                                    { "max-entries", max_entries_buf },
                                    { marker_key, marker.c_str() },
                                    /* terminator elided */ };
    string p = "/admin/log/";
    http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
    init_new_io(http_op);
    int ret = http_op->aio_read();
    /* error guard elided */
    ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to read from " << p << dendl;
    log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
    /* cleanup/returns elided */
  }

  int request_complete() override {
    int ret = http_op->wait(result);
    /* http_op release elided */
    if (ret < 0 && ret != -ENOENT) { // an empty/absent shard is not an error
      ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to list remote mdlog shard, ret=" << ret << dendl;
    }
    /* return elided */
  }
};
// Spawn one shard-info fetch per call until every shard is covered;
// results land in (*mdlog_info)[shard_id].
bool RGWReadRemoteMDLogInfoCR::spawn_next() {
  if (shard_id >= num_shards) {
    /* return false; elided in this excerpt */
  }
  spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env, period, shard_id, &(*mdlog_info)[shard_id]), false);
  /* shard_id increment and return true elided */
}
// Spawn one shard-list fetch per shards-map entry; each shard starts
// listing at its own saved marker.
bool RGWListRemoteMDLogCR::spawn_next() {
  if (iter == shards.end()) {
    /* return false; elided in this excerpt */
  }
  spawn(new RGWListRemoteMDLogShardCR(sync_env, period, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
  /* iterator advance and return true elided */
}
// Bootstraps the sync-status objects: takes the sync lease, writes the
// global status object, snapshots each remote shard's current position,
// writes the per-shard markers, then flips the state to
// StateBuildingFullSyncMaps and releases the lease.
class RGWInitSyncStatusCoroutine : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;
  rgw_meta_sync_info status;
  vector<RGWMetadataLogInfo> shards_info; // remote positions, one per shard
  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;

  RGWInitSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
                             const rgw_meta_sync_info &status)
    : RGWCoroutine(_sync_env->store->ctx()), sync_env(_sync_env),
      status(status), shards_info(status.num_shards),
      lease_cr(nullptr), lease_stack(nullptr) {}

  ~RGWInitSyncStatusCoroutine() override {
    /* lease abort elided in this excerpt */
  }

  int operate() override {
    /* NOTE(review): reenter()/yield framing is elided throughout */
    set_status("acquiring sync lock");
    uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
    string lock_name = "sync_lock";
    RGWRados *store = sync_env->store;
    // the lease prevents concurrent initialization by other gateways
    lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                            rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, sync_env->status_oid()),
                                            lock_name, lock_duration, this));
    lease_stack.reset(spawn(lease_cr.get(), false));
    while (!lease_cr->is_locked()) {
      if (lease_cr->is_done()) {
        ldpp_dout(sync_env->dpp, 5) << "lease cr failed, done early " << dendl;
        set_status("lease lock failed, early abort");
        return set_cr_error(lease_cr->get_ret_status());
      }
      /* yield while waiting for the lease — elided */
    }
    set_status("writing sync status");
    RGWRados *store = sync_env->store; // original re-declares in a new scope
    call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados, store->svc.sysobj,
                                                       rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, sync_env->status_oid()),
                                                       /* status payload elided */));
    /* retcode guard elided */
    set_status("failed to write sync status");
    ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
    yield lease_cr->go_down();
    return set_cr_error(retcode);
    /* fetch current position in logs */
    set_status("fetching remote log position");
    for (int i = 0; i < (int)status.num_shards; i++) {
      spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env, status.period, i,
                                              &shards_info[i]), false);
    }
    drain_all_but_stack(lease_stack.get()); /* the lease cr still needs to run */
    set_status("updating sync status");
    for (int i = 0; i < (int)status.num_shards; i++) {
      rgw_meta_sync_marker marker;
      RGWMetadataLogInfo& info = shards_info[i];
      marker.next_step_marker = info.marker; // incremental sync starts here
      marker.timestamp = info.last_update;
      RGWRados *store = sync_env->store;
      spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados,
                                                            /* sysobj svc arg elided */
                                                            rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, sync_env->shard_obj_name(i)),
                                                            /* marker payload elided */));
    }
    set_status("changing sync state: build full sync maps");
    status.state = rgw_meta_sync_info::StateBuildingFullSyncMaps;
    RGWRados *store = sync_env->store;
    call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados, store->svc.sysobj,
                                                       rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, sync_env->status_oid()),
                                                       /* status payload elided */));
    set_status("drop lock lease");
    yield lease_cr->go_down();
    while (collect(&ret, NULL)) { // reap all spawned writers
      /* error check elided */
      return set_cr_error(ret);
      /* yield elided */
    }
    /* drain_all elided */
    return set_cr_done();
  }
};
// Reads the per-shard sync markers, with up to 16 reads in flight.
class RGWReadSyncStatusMarkersCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;
  /* NOTE(review): env and shard_id member declarations elided */
  const int num_shards;
  map<uint32_t, rgw_meta_sync_marker>& markers; // output, keyed by shard

  RGWReadSyncStatusMarkersCR(RGWMetaSyncEnv *env, int num_shards,
                             map<uint32_t, rgw_meta_sync_marker>& markers)
    : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS),
      env(env), num_shards(num_shards), markers(markers)
  {} /* ctor body braces reconstructed; elided in this excerpt */
  bool spawn_next() override;
};
// One RGWSimpleRadosReadCR per shard; the result lands in markers[shard_id].
bool RGWReadSyncStatusMarkersCR::spawn_next()
{
  if (shard_id >= num_shards) {
    /* return false; elided in this excerpt */
  }
  using CR = RGWSimpleRadosReadCR<rgw_meta_sync_marker>;
  rgw_raw_obj obj{env->store->svc.zone->get_zone_params().log_pool,
                  env->shard_obj_name(shard_id)};
  spawn(new CR(env->async_rados, env->store->svc.sysobj, obj, &markers[shard_id]), false);
  /* shard_id increment and return true elided */
}
// Reads the global sync info plus all per-shard markers into *sync_status.
class RGWReadSyncStatusCoroutine : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;
  rgw_meta_sync_status *sync_status; // output (caller-owned)

  RGWReadSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
                             rgw_meta_sync_status *_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sync_status(_status)
  {} /* ctor body braces reconstructed; elided in this excerpt */
  int operate() override;
};
// Two phases: read the global sync info object, then fan out to read
// every shard's marker. Either failure aborts the coroutine.
int RGWReadSyncStatusCoroutine::operate()
{
  /* reenter()/yield framing elided */
  using ReadInfoCR = RGWSimpleRadosReadCR<rgw_meta_sync_info>;
  bool empty_on_enoent = false; // fail on ENOENT
  rgw_raw_obj obj{sync_env->store->svc.zone->get_zone_params().log_pool,
                  sync_env->status_oid()};
  call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc.sysobj, obj,
                      &sync_status->sync_info, empty_on_enoent));
  /* retcode guard elided */
  ldpp_dout(sync_env->dpp, 4) << "failed to read sync status info with "
                              << cpp_strerror(retcode) << dendl;
  return set_cr_error(retcode);
  // read shard markers
  using ReadMarkersCR = RGWReadSyncStatusMarkersCR;
  yield call(new ReadMarkersCR(sync_env, sync_status->sync_info.num_shards,
                               sync_status->sync_markers));
  /* retcode guard elided */
  ldpp_dout(sync_env->dpp, 4) << "failed to read sync status markers with "
                              << cpp_strerror(retcode) << dendl;
  return set_cr_error(retcode);
  return set_cr_done();
}
// Full-sync bootstrap: under the sync lease, lists every metadata key on
// the master (section by section, paged) and appends "section:key"
// strings into the sharded omap "full sync index"; finally records the
// per-shard entry totals into the sync markers.
class RGWFetchAllMetaCR : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;
  /* NOTE(review): num_shards/ret_status/lost_lock/failed/shard_id and
   * similar member declarations are elided in this excerpt */
  list<string> sections;                  // metadata section names
  list<string>::iterator sections_iter;

  // JSON shape of GET /admin/metadata/<section> responses
  struct meta_list_result {
    /* keys/marker/count members elided */
    bool truncated{false};                // more pages available

    void decode_json(JSONObj *obj) {
      JSONDecoder::decode_json("keys", keys, obj);
      JSONDecoder::decode_json("marker", marker, obj);
      JSONDecoder::decode_json("count", count, obj);
      JSONDecoder::decode_json("truncated", truncated, obj);
    }
  } /* member name (result) elided */;
  list<string>::iterator iter;
  std::unique_ptr<RGWShardedOmapCRManager> entries_index; // full-sync index writer
  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
  map<uint32_t, rgw_meta_sync_marker>& markers;
  RGWSyncTraceNodeRef tn;

  RGWFetchAllMetaCR(RGWMetaSyncEnv *_sync_env, int _num_shards,
                    map<uint32_t, rgw_meta_sync_marker>& _markers,
                    RGWSyncTraceNodeRef& _tn_parent)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      num_shards(_num_shards),
      ret_status(0), lease_cr(nullptr), lease_stack(nullptr),
      lost_lock(false), failed(false), markers(_markers) {
    tn = sync_env->sync_tracer->add_node(_tn_parent, "fetch_all_meta");
  }

  ~RGWFetchAllMetaCR() override {
  }

  // Moves `name` (if present) from the unordered set into `sections`,
  // enforcing the required sync ordering below.
  void append_section_from_set(set<string>& all_sections, const string& name) {
    set<string>::iterator iter = all_sections.find(name);
    if (iter != all_sections.end()) {
      sections.emplace_back(std::move(*iter));
      all_sections.erase(iter);
    }
  }

  /*
   * meta sync should go in the following order: user, bucket.instance, bucket
   * then whatever other sections exist (if any)
   */
  void rearrange_sections() {
    set<string> all_sections;
    std::move(sections.begin(), sections.end(),
              std::inserter(all_sections, all_sections.end()));
    /* sections clear elided */
    append_section_from_set(all_sections, "user");
    append_section_from_set(all_sections, "bucket.instance");
    append_section_from_set(all_sections, "bucket");
    std::move(all_sections.begin(), all_sections.end(),
              std::back_inserter(sections));
  }

  int operate() override {
    RGWRESTConn *conn = sync_env->conn;
    /* NOTE(review): reenter()/yield framing elided throughout */
    set_status(string("acquiring lock (") + sync_env->status_oid() + ")");
    uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
    string lock_name = "sync_lock";
    lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados,
                                            /* store argument elided */
                                            rgw_raw_obj(sync_env->store->svc.zone->get_zone_params().log_pool, sync_env->status_oid()),
                                            lock_name, lock_duration, this));
    lease_stack.reset(spawn(lease_cr.get(), false));
    while (!lease_cr->is_locked()) {
      if (lease_cr->is_done()) {
        ldpp_dout(sync_env->dpp, 5) << "lease cr failed, done early " << dendl;
        set_status("failed acquiring lock");
        return set_cr_error(lease_cr->get_ret_status());
      }
      /* yield while waiting for the lease — elided */
    }
    entries_index.reset(new RGWShardedOmapCRManager(sync_env->async_rados, sync_env->store, this, num_shards,
                                                    sync_env->store->svc.zone->get_zone_params().log_pool,
                                                    mdlog_sync_full_sync_index_prefix));
    // fetch the list of metadata sections from the master
    call(new RGWReadRESTResourceCR<list<string> >(cct, conn, sync_env->http_manager,
                                                  "/admin/metadata", NULL, &sections));
    if (get_ret_status() < 0) {
      ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to fetch metadata sections" << dendl;
      yield entries_index->finish();
      yield lease_cr->go_down();
      /* drain_all elided */
      return set_cr_error(get_ret_status());
    }
    rearrange_sections();
    sections_iter = sections.begin();
    for (; sections_iter != sections.end(); ++sections_iter) {
      do { /* do-loop opener reconstructed; pages through one section */
#define META_FULL_SYNC_CHUNK_SIZE "1000"
        string entrypoint = string("/admin/metadata/") + *sections_iter;
        rgw_http_param_pair pairs[] = { { "max-entries", META_FULL_SYNC_CHUNK_SIZE },
                                        { "marker", result.marker.c_str() },
                                        /* terminator elided */ };
        call(new RGWReadRESTResourceCR<meta_list_result>(cct, conn, sync_env->http_manager,
                                                         entrypoint, pairs, &result));
        ret_status = get_ret_status();
        if (ret_status == -ENOENT) { // a section may legitimately be empty
          set_retcode(0); /* reset coroutine status so that we don't return it */
          /* break elided */
        }
        if (ret_status < 0) {
          tn->log(0, SSTR("ERROR: failed to fetch metadata section: " << *sections_iter));
          yield entries_index->finish();
          yield lease_cr->go_down();
          /* drain_all elided */
          return set_cr_error(ret_status);
        }
        iter = result.keys.begin();
        for (; iter != result.keys.end(); ++iter) {
          if (!lease_cr->is_locked()) {
            /* lost_lock handling elided */
          }
          yield; // allow entries_index consumer to make progress
          tn->log(20, SSTR("list metadata: section=" << *sections_iter << " key=" << *iter));
          string s = *sections_iter + ":" + *iter; // index entry format
          /* shard_id declaration elided */
          RGWRados *store = sync_env->store;
          int ret = store->meta_mgr->get_log_shard_id(*sections_iter, *iter, &shard_id);
          /* error guard elided */
          tn->log(0, SSTR("ERROR: could not determine shard id for " << *sections_iter << ":" << *iter));
          /* ret_status/return handling elided */
          if (!entries_index->append(s, shard_id)) {
            /* failed-flag handling elided */
          }
        }
      } while (result.truncated);
    }
    if (!entries_index->finish()) {
      /* failed-flag handling elided */
    }
    // record per-shard totals so sync progress can be reported
    for (map<uint32_t, rgw_meta_sync_marker>::iterator iter = markers.begin(); iter != markers.end(); ++iter) {
      int shard_id = (int)iter->first;
      rgw_meta_sync_marker& marker = iter->second;
      marker.total_entries = entries_index->get_total_entries(shard_id);
      spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados, sync_env->store->svc.sysobj,
                                                            rgw_raw_obj(sync_env->store->svc.zone->get_zone_params().log_pool, sync_env->shard_obj_name(shard_id)),
                                                            /* marker payload elided */));
    }
    drain_all_but_stack(lease_stack.get()); /* the lease cr still needs to run */
    yield lease_cr->go_down();
    /* drain_all elided */
    while (collect(&ret, NULL)) {
      /* error check elided */
      return set_cr_error(ret);
    }
    // failure summary (guards elided): failed -> EIO, lost_lock -> EBUSY
    yield return set_cr_error(-EIO);
    yield return set_cr_error(-EBUSY);
    if (ret_status < 0) {
      yield return set_cr_error(ret_status);
    }
    yield return set_cr_done();
  }
};
// "<full-sync-index-prefix>.<shard_id>" — full-sync index object name.
static string full_sync_index_shard_oid(int shard_id)
{
  char buf[mdlog_sync_full_sync_index_prefix.size() + 16]; // 16: '.', digits, NUL
  snprintf(buf, sizeof(buf), "%s.%d", mdlog_sync_full_sync_index_prefix.c_str(), shard_id);
  /* NOTE(review): return statement elided in this excerpt */
}
// Fetches one metadata entry (section/key) from the master into *pbl.
class RGWReadRemoteMetadataCR : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;
  RGWRESTReadResource *http_op;
  /* NOTE(review): section/key/pbl member declarations elided */
  RGWSyncTraceNodeRef tn;

  RGWReadRemoteMetadataCR(RGWMetaSyncEnv *_sync_env,
                          const string& _section, const string& _key, bufferlist *_pbl,
                          const RGWSyncTraceNodeRef& _tn_parent)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      /* http_op/section/key/pbl initializers elided */ {
    tn = sync_env->sync_tracer->add_node(_tn_parent, "read_remote_meta",
                                         section + ":" + key);
  }

  int operate() override {
    RGWRESTConn *conn = sync_env->conn;
    /* reenter()/yield framing elided */
    rgw_http_param_pair pairs[] = { { "key" , key.c_str()},
                                    /* terminator elided */ };
    string p = string("/admin/metadata/") + section + "/" + key;
    http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
    init_new_io(http_op);
    int ret = http_op->aio_read();
    /* error guard elided */
    ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to fetch mdlog data" << dendl;
    log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
    /* http_op release elided */
    return set_cr_error(ret);
    /* yield boundary elided; original re-declares ret in a new scope */
    int ret = http_op->wait(pbl);
    /* cleanup + error guard elided */
    return set_cr_error(ret);
    return set_cr_done();
  }
};
// Async task: store one raw metadata entry via the metadata manager,
// run on the async-rados thread pool.
class RGWAsyncMetaStoreEntry : public RGWAsyncRadosRequest {
  /* NOTE(review): store/raw_key/bl member declarations elided */
  int _send_request() override {
    // APPLY_ALWAYS: apply regardless of version ordering
    int ret = store->meta_mgr->put(raw_key, bl, RGWMetadataHandler::APPLY_ALWAYS);
    /* error guard elided */
    ldout(store->ctx(), 0) << "ERROR: can't store key: " << raw_key << " ret=" << ret << dendl;
    /* return elided */
  }

  RGWAsyncMetaStoreEntry(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
                         const string& _raw_key,
                         bufferlist& _bl)
    : RGWAsyncRadosRequest(caller, cn), store(_store),
      raw_key(_raw_key), bl(_bl) {}
};
// Coroutine wrapper that queues RGWAsyncMetaStoreEntry and reports its
// completion status.
class RGWMetaStoreEntryCR : public RGWSimpleCoroutine {
  RGWMetaSyncEnv *sync_env;
  /* NOTE(review): raw_key/bl member declarations elided */
  RGWAsyncMetaStoreEntry *req;

  RGWMetaStoreEntryCR(RGWMetaSyncEnv *_sync_env,
                      const string& _raw_key,
                      bufferlist& _bl)
    : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env),
      raw_key(_raw_key), bl(_bl), req(NULL) {
  }

  ~RGWMetaStoreEntryCR() override {
    /* req cleanup elided in this excerpt */
  }

  int send_request() override {
    req = new RGWAsyncMetaStoreEntry(this, stack->create_completion_notifier(),
                                     sync_env->store, raw_key, bl);
    sync_env->async_rados->queue(req); // runs on the async-rados pool
    /* return elided */
  }

  int request_complete() override {
    return req->get_ret_status();
  }
};
// Async task: remove one raw metadata entry via the metadata manager,
// run on the async-rados thread pool.
class RGWAsyncMetaRemoveEntry : public RGWAsyncRadosRequest {
  /* NOTE(review): store/raw_key member declarations elided */
  int _send_request() override {
    int ret = store->meta_mgr->remove(raw_key);
    /* error guard elided */
    ldout(store->ctx(), 0) << "ERROR: can't remove key: " << raw_key << " ret=" << ret << dendl;
    /* return elided */
  }

  RGWAsyncMetaRemoveEntry(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
                          const string& _raw_key)
    : RGWAsyncRadosRequest(caller, cn), store(_store),
      raw_key(_raw_key) {}
};
// Coroutine wrapper that queues RGWAsyncMetaRemoveEntry and reports its
// completion status.
class RGWMetaRemoveEntryCR : public RGWSimpleCoroutine {
  RGWMetaSyncEnv *sync_env;
  /* NOTE(review): raw_key member declaration elided */
  RGWAsyncMetaRemoveEntry *req;

  RGWMetaRemoveEntryCR(RGWMetaSyncEnv *_sync_env,
                       const string& _raw_key)
    : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env),
      raw_key(_raw_key), req(NULL) {
  }

  ~RGWMetaRemoveEntryCR() override {
    /* req cleanup elided in this excerpt */
  }

  int send_request() override {
    req = new RGWAsyncMetaRemoveEntry(this, stack->create_completion_notifier(),
                                      sync_env->store, raw_key);
    sync_env->async_rados->queue(req); // runs on the async-rados pool
    /* return elided */
  }

  int request_complete() override {
    int r = req->get_ret_status();
    /* NOTE(review): special-casing of the result (e.g. tolerating
     * ENOENT) and the return are elided in this excerpt */
  }
};
1167 #define META_SYNC_UPDATE_MARKER_WINDOW 10
// Order-control coroutine where only the most recently submitted cr is
// actually run — earlier queued calls are superseded.
int RGWLastCallerWinsCR::operate() {
  RGWCoroutine *call_cr;
  /* reenter()/loop framing and cr selection elided */
  yield call(call_cr);
  /* cr might have been modified at this point */
  return set_cr_done();
}
// Tracks the high-water sync marker for one shard; persists it via a
// write cr, batched every META_SYNC_UPDATE_MARKER_WINDOW entries by the
// base class.
class RGWMetaSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
  RGWMetaSyncEnv *sync_env;
  /* NOTE(review): marker_oid member declaration elided */
  rgw_meta_sync_marker sync_marker; // local copy that gets persisted
  RGWSyncTraceNodeRef tn;

  RGWMetaSyncShardMarkerTrack(RGWMetaSyncEnv *_sync_env,
                              const string& _marker_oid,
                              const rgw_meta_sync_marker& _marker,
                              RGWSyncTraceNodeRef& _tn)
    : RGWSyncShardMarkerTrack(META_SYNC_UPDATE_MARKER_WINDOW),
      sync_env(_sync_env),
      marker_oid(_marker_oid),
      sync_marker(_marker),
      /* NOTE(review): tn initializer elided in this excerpt */

  // Builds the cr that writes the updated marker; zero timestamps and
  // index positions are ignored rather than overwriting saved values.
  RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
    sync_marker.marker = new_marker;
    if (index_pos > 0) {
      sync_marker.pos = index_pos;
    }
    if (!real_clock::is_zero(timestamp)) {
      sync_marker.timestamp = timestamp;
    }
    ldpp_dout(sync_env->dpp, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << " realm_epoch=" << sync_marker.realm_epoch << dendl;
    tn->log(20, SSTR("new marker=" << new_marker));
    RGWRados *store = sync_env->store;
    return new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados,
                                                           /* sysobj svc argument elided */
                                                           rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, marker_oid),
                                                           /* sync_marker payload elided */);
  }

  // Later marker writes fully supersede earlier ones, so only the last
  // caller's write needs to run.
  RGWOrderCallCR *allocate_order_control_cr() override {
    return new RGWLastCallerWinsCR(sync_env->cct);
  }
};
// Sets up the cr that syncs one metadata entry (fetch from master, then
// apply locally); wires up marker tracking, error injection and tracing.
RGWMetaSyncSingleEntryCR::RGWMetaSyncSingleEntryCR(RGWMetaSyncEnv *_sync_env,
                                                   const string& _raw_key, const string& _entry_marker,
                                                   const RGWMDLogStatus& _op_status,
                                                   RGWMetaSyncShardMarkerTrack *_marker_tracker, const RGWSyncTraceNodeRef& _tn_parent)
  : RGWCoroutine(_sync_env->cct),
    sync_env(_sync_env),
    raw_key(_raw_key), entry_marker(_entry_marker),
    op_status(_op_status),
    pos(0), sync_status(0),
    marker_tracker(_marker_tracker), tries(0) {
  // error injection is enabled whenever the probability conf is non-zero
  error_injection = (sync_env->cct->_conf->rgw_sync_meta_inject_err_probability > 0);
  tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", raw_key);
}
// State machine for syncing one metadata entry.  Runs as a boost::asio
// stackless coroutine: every `yield` suspends until the called sub-coroutine
// completes and its result is available in `retcode`.
int RGWMetaSyncSingleEntryCR::operate() {
  reenter(this) {
#define NUM_TRANSIENT_ERROR_RETRIES 10

    // optional fault injection for testing (see rgw_sync_meta_inject_err_probability)
    if (error_injection &&
        rand() % 10000 < cct->_conf->rgw_sync_meta_inject_err_probability * 10000.0) {
      ldpp_dout(sync_env->dpp, 0) << __FILE__ << ":" << __LINE__ << ": injecting meta sync error on key=" << raw_key << dendl;
      return set_cr_error(-EIO);
    }

    // an incomplete mdlog operation cannot be applied yet; just mark the
    // position as handled so the shard marker can advance
    if (op_status != MDLOG_STATUS_COMPLETE) {
      tn->log(20, "skipping pending operation");
      yield call(marker_tracker->finish(entry_marker));
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    tn->set_flag(RGW_SNS_FLAG_ACTIVE);
    // phase 1: fetch the entry from the master, retrying transient errors
    for (tries = 0; tries < NUM_TRANSIENT_ERROR_RETRIES; tries++) {
      yield {
        // raw_key is "section:key"
        pos = raw_key.find(':');
        section = raw_key.substr(0, pos);
        key = raw_key.substr(pos + 1);
        tn->log(10, SSTR("fetching remote metadata entry" << (tries == 0 ? "" : " (retry)")));
        call(new RGWReadRemoteMetadataCR(sync_env, section, key, &md_bl, tn));
      }

      sync_status = retcode;

      if (sync_status == -ENOENT) {
        /* FIXME: do we need to remove the entry from the local zone? */
        break;
      }

      // -EAGAIN/-ECANCELED are transient; retry until the attempt budget runs out
      if ((sync_status == -EAGAIN || sync_status == -ECANCELED) && (tries < NUM_TRANSIENT_ERROR_RETRIES - 1)) {
        ldpp_dout(sync_env->dpp, 20) << *this << ": failed to fetch remote metadata: " << section << ":" << key << ", will retry" << dendl;
        continue;
      }

      if (sync_status < 0) {
        // permanent failure: record it in the error log shard so it can be
        // retried separately, then fail this entry
        tn->log(10, SSTR("failed to send read remote metadata entry: section=" << section << " key=" << key << " status=" << sync_status));
        log_error() << "failed to send read remote metadata entry: section=" << section << " key=" << key << " status=" << sync_status << std::endl;
        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), section, key, -sync_status,
                                                        string("failed to read remote metadata entry: ") + cpp_strerror(-sync_status)));
        return set_cr_error(sync_status);
      }

      break;
    }

    // phase 2: apply locally — store the fetched entry, or remove it if the
    // master returned -ENOENT; again retry transient errors
    retcode = 0;
    for (tries = 0; tries < NUM_TRANSIENT_ERROR_RETRIES; tries++) {
      if (sync_status != -ENOENT) {
        tn->log(10, SSTR("storing local metadata entry"));
        yield call(new RGWMetaStoreEntryCR(sync_env, raw_key, md_bl));
      } else {
        tn->log(10, SSTR("removing local metadata entry"));
        yield call(new RGWMetaRemoveEntryCR(sync_env, raw_key));
      }
      if ((retcode == -EAGAIN || retcode == -ECANCELED) && (tries < NUM_TRANSIENT_ERROR_RETRIES - 1)) {
        ldpp_dout(sync_env->dpp, 20) << *this << ": failed to store metadata: " << section << ":" << key << ", got retcode=" << retcode << dendl;
        continue;
      }
      break;
    }

    sync_status = retcode;

    if (sync_status == 0 && marker_tracker) {
      /* update marker */
      yield call(marker_tracker->finish(entry_marker));
      sync_status = retcode;
    }
    if (sync_status < 0) {
      tn->log(10, SSTR("failed, status=" << sync_status));
      return set_cr_error(sync_status);
    }
    tn->log(10, "success");
    return set_cr_done();
  }
  return 0;
}
// Coroutine that clones one shard of the master zone's metadata log into the
// local mdlog: read the remote shard status, fetch entries over REST starting
// at `marker`, and store them locally, reporting the new position through
// *new_marker.  The per-state logic lives in the state_*() methods driven by
// operate().
class RGWCloneMetaLogCoroutine : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;
  RGWMetadataLog *mdlog;

  const std::string& period;  // period id whose log is being cloned
  int shard_id;               // NOTE(review): members below the period line were
  string marker;              // dropped by extraction; reconstructed from the
  bool truncated = false;     // constructor's init list — confirm against upstream
  string *new_marker;

  int max_entries = CLONE_MAX_ENTRIES;

  RGWRESTReadResource *http_op = nullptr;
  boost::intrusive_ptr<RGWMetadataLogInfoCompletion> completion;

  RGWMetadataLogInfo shard_info;
  rgw_mdlog_shard_data data;

public:
  RGWCloneMetaLogCoroutine(RGWMetaSyncEnv *_sync_env, RGWMetadataLog* mdlog,
                           const std::string& period, int _id,
                           const string& _marker, string *_new_marker)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), mdlog(mdlog),
      period(period), shard_id(_id), marker(_marker), new_marker(_new_marker) {
    if (new_marker) {
      // start the output marker at the input position
      *new_marker = marker;
    }
  }
  ~RGWCloneMetaLogCoroutine() override {
    if (http_op) {
      http_op->put();
    }
    if (completion) {
      // detach the async mdlog-info callback so it can't fire after destruction
      completion->cancel();
    }
  }

  int operate() override;

  int state_init();
  int state_read_shard_status();
  int state_read_shard_status_complete();
  int state_send_rest_request();
  int state_receive_rest_response();
  int state_store_mdlog_entries();
  int state_store_mdlog_entries_complete();
};
// Coroutine that syncs a single metadata log shard.  Runs in one of two modes
// depending on the persisted sync marker state:
//  - full_sync(): enumerate the full-sync omap index and sync every key;
//  - incremental_sync(): tail the mdlog from the stored marker.
// Both modes take a continuous lease on the shard's status object so only one
// gateway works on the shard at a time.
class RGWMetaSyncShardCR : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;

  const rgw_pool& pool;
  const std::string& period; //< currently syncing period id
  const epoch_t realm_epoch; //< realm_epoch of period
  RGWMetadataLog* mdlog; //< log of syncing period
  uint32_t shard_id;
  rgw_meta_sync_marker& sync_marker;
  boost::optional<rgw_meta_sync_marker> temp_marker; //< for pending updates
  string marker;      // current position being synced
  string max_marker;  // max mdlog position fetched so far
  const std::string& period_marker; //< max marker stored in next period

  RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
  std::set<std::string> entries;
  std::set<std::string>::iterator iter;

  string oid;  // full-sync index shard object name

  RGWMetaSyncShardMarkerTrack *marker_tracker = nullptr;

  list<cls_log_entry> log_entries;
  list<cls_log_entry>::iterator log_iter;
  bool truncated = false;

  string mdlog_marker;  // remote mdlog position
  string raw_key;
  rgw_mdlog_entry mdlog_entry;

  Mutex inc_lock;  // NOTE(review): lock/cond members reconstructed from the
  Cond inc_cond;   // ctor init list — confirm against upstream

  boost::asio::coroutine incremental_cr;
  boost::asio::coroutine full_cr;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;

  bool lost_lock = false;

  bool *reset_backoff;

  // hold a reference to the cr stack while it's in the map
  using StackRef = boost::intrusive_ptr<RGWCoroutinesStack>;
  map<StackRef, string> stack_to_pos;
  map<string, string> pos_to_prev;

  bool can_adjust_marker = false;
  bool done_with_period = false;

  int total_entries = 0;

  RGWSyncTraceNodeRef tn;
public:
  RGWMetaSyncShardCR(RGWMetaSyncEnv *_sync_env, const rgw_pool& _pool,
                     const std::string& period, epoch_t realm_epoch,
                     RGWMetadataLog* mdlog, uint32_t _shard_id,
                     rgw_meta_sync_marker& _marker,
                     const std::string& period_marker, bool *_reset_backoff,
                     RGWSyncTraceNodeRef& _tn)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), pool(_pool),
      period(period), realm_epoch(realm_epoch), mdlog(mdlog),
      shard_id(_shard_id), sync_marker(_marker),
      period_marker(period_marker), inc_lock("RGWMetaSyncShardCR::inc_lock"),
      reset_backoff(_reset_backoff), tn(_tn) {
    *reset_backoff = false;
  }

  ~RGWMetaSyncShardCR() override {
    delete marker_tracker;
    if (lease_cr) {
      lease_cr->abort();
    }
  }

  // replaces any previous tracker; owns the pointer
  void set_marker_tracker(RGWMetaSyncShardMarkerTrack *mt) {
    delete marker_tracker;
    marker_tracker = mt;
  }

  int operate() override {
    int r;
    while (true) {
      switch (sync_marker.state) {
      case rgw_meta_sync_marker::FullSync:
        r = full_sync();
        if (r < 0) {
          ldpp_dout(sync_env->dpp, 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl;
          return set_cr_error(r);
        }
        return 0;
      case rgw_meta_sync_marker::IncrementalSync:
        r = incremental_sync();
        if (r < 0) {
          ldpp_dout(sync_env->dpp, 10) << "sync: incremental_sync: shard_id=" << shard_id << " r=" << r << dendl;
          return set_cr_error(r);
        }
        return 0;
      }
    }
    /* unreachable */
    return 0;
  }

  // Reap finished child entry-sync stacks and advance sync_marker.marker to
  // the lowest position that is still fully synced.
  void collect_children()
  {
    int child_ret;
    RGWCoroutinesStack *child;
    while (collect_next(&child_ret, &child)) {
      auto iter = stack_to_pos.find(child);
      if (iter == stack_to_pos.end()) {
        /* some other stack that we don't care about */
        continue;
      }

      string& pos = iter->second;

      if (child_ret < 0) {
        ldpp_dout(sync_env->dpp, 0) << *this << ": child operation stack=" << child << " entry=" << pos << " returned " << child_ret << dendl;
      }

      map<string, string>::iterator prev_iter = pos_to_prev.find(pos);
      ceph_assert(prev_iter != pos_to_prev.end());

      /*
       * we should get -EAGAIN for transient errors, for which we want to retry, so we don't
       * update the marker and abort. We'll get called again for these. Permanent errors will be
       * handled by marking the entry at the error log shard, so that we retry on it separately
       */
      if (child_ret == -EAGAIN) {
        can_adjust_marker = false;
      }

      if (pos_to_prev.size() == 1) {
        if (can_adjust_marker) {
          sync_marker.marker = pos;
        }
        pos_to_prev.erase(prev_iter);
      } else {
        ceph_assert(pos_to_prev.size() > 1);
        pos_to_prev.erase(prev_iter);
        prev_iter = pos_to_prev.begin();
        if (can_adjust_marker) {
          sync_marker.marker = prev_iter->second;
        }
      }

      ldpp_dout(sync_env->dpp, 4) << *this << ": adjusting marker pos=" << sync_marker.marker << dendl;
      stack_to_pos.erase(iter);
    }
  }

  // Full sync: walk the shard's full-sync omap index and spawn an entry-sync
  // coroutine per key, then transition the persisted marker to IncrementalSync.
  int full_sync() {
#define OMAP_GET_MAX_ENTRIES 100
    int max_entries = OMAP_GET_MAX_ENTRIES;
    reenter(&full_cr) {
      set_status("full_sync");
      tn->log(10, "start full sync");
      oid = full_sync_index_shard_oid(shard_id);
      can_adjust_marker = true;
      /* grab lock */
      yield {
        uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
        string lock_name = "sync_lock";
        RGWRados *store = sync_env->store;
        lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                                rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
                                                lock_name, lock_duration, this));
        lease_stack.reset(spawn(lease_cr.get(), false));
        lost_lock = false;
      }
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          drain_all();
          tn->log(5, "failed to take lease");
          return lease_cr->get_ret_status();
        }
        set_sleeping(true);
        yield;
      }
      tn->log(10, "took lease");

      /* lock succeeded, a retry now should avoid previous backoff status */
      *reset_backoff = true;

      /* prepare marker tracker */
      set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env,
                                                         sync_env->shard_obj_name(shard_id),
                                                         sync_marker, tn));

      marker = sync_marker.marker;

      total_entries = sync_marker.pos;

      /* sync! */
      do {
        if (!lease_cr->is_locked()) {
          tn->log(10, "lost lease");
          lost_lock = true;
          break;
        }
        omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
        yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
                                             marker, max_entries, omapkeys));
        if (retcode < 0) {
          ldpp_dout(sync_env->dpp, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
          tn->log(0, SSTR("ERROR: failed to list omap keys, status=" << retcode));
          yield lease_cr->go_down();
          drain_all();
          return retcode;
        }
        entries = std::move(omapkeys->entries);
        tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
        if (entries.size() > 0) {
          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
        }
        iter = entries.begin();
        for (; iter != entries.end(); ++iter) {
          marker = *iter;
          tn->log(20, SSTR("full sync: " << marker));
          total_entries++;
          if (!marker_tracker->start(marker, total_entries, real_time())) {
            tn->log(0, SSTR("ERROR: cannot start syncing " << marker << ". Duplicate entry?"));
          } else {
            // fetch remote and write locally
            yield {
              RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, marker, marker, MDLOG_STATUS_COMPLETE, marker_tracker, tn), false);
              // stack_to_pos holds a reference to the stack
              stack_to_pos[stack] = marker;
              pos_to_prev[marker] = marker;
            }
          }
        }
        collect_children();
      } while (omapkeys->more && can_adjust_marker);

      tn->unset_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */

      while (num_spawned() > 1) {
        yield wait_for_child();
        collect_children();
      }

      if (!lost_lock) {
        /* update marker to reflect we're done with full sync */
        if (can_adjust_marker) {
          // apply updates to a temporary marker, or operate() will send us
          // to incremental_sync() after we yield
          temp_marker = sync_marker;
          temp_marker->state = rgw_meta_sync_marker::IncrementalSync;
          temp_marker->marker = std::move(temp_marker->next_step_marker);
          temp_marker->next_step_marker.clear();
          temp_marker->realm_epoch = realm_epoch;
          ldpp_dout(sync_env->dpp, 4) << *this << ": saving marker pos=" << temp_marker->marker << " realm_epoch=" << realm_epoch << dendl;

          using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_meta_sync_marker>;
          yield call(new WriteMarkerCR(sync_env->async_rados, sync_env->store->svc.sysobj,
                                       rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
                                       *temp_marker));
        }

        if (retcode < 0) {
          ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
          yield lease_cr->go_down();
          drain_all();
          return retcode;
        }
      }

      /*
       * if we reached here, it means that lost_lock is true, otherwise the state
       * change in the previous block will prevent us from reaching here
       */

      yield lease_cr->go_down();

      lease_cr.reset();

      drain_all();

      if (!can_adjust_marker) {
        return -EAGAIN;
      }

      if (lost_lock) {
        return -EBUSY;
      }

      tn->log(10, "full sync complete");

      // apply the sync marker update
      ceph_assert(temp_marker);
      sync_marker = std::move(*temp_marker);
      temp_marker = boost::none;
      // must not yield after this point!
    }
    return 0;
  }

  // Incremental sync: repeatedly clone the remote mdlog shard, read new local
  // entries, and spawn an entry-sync coroutine per mdlog entry until the
  // period boundary (period_marker) is reached or the lease is lost.
  int incremental_sync() {
    reenter(&incremental_cr) {
      set_status("incremental_sync");
      tn->log(10, "start incremental sync");
      can_adjust_marker = true;
      /* grab lock */
      if (!lease_cr) { /* could have had a lease_cr lock from previous state */
        yield {
          uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
          string lock_name = "sync_lock";
          RGWRados *store = sync_env->store;
          lease_cr.reset( new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                                   rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
                                                   lock_name, lock_duration, this));
          lease_stack.reset(spawn(lease_cr.get(), false));
          lost_lock = false;
        }
        while (!lease_cr->is_locked()) {
          if (lease_cr->is_done()) {
            drain_all();
            tn->log(10, "failed to take lease");
            return lease_cr->get_ret_status();
          }
          set_sleeping(true);
          yield;
        }
      }
      tn->log(10, "took lease");
      // if the period has advanced, we can't use the existing marker
      if (sync_marker.realm_epoch < realm_epoch) {
        ldpp_dout(sync_env->dpp, 4) << "clearing marker=" << sync_marker.marker
            << " from old realm_epoch=" << sync_marker.realm_epoch
            << " (now " << realm_epoch << ')' << dendl;
        sync_marker.realm_epoch = realm_epoch;
        sync_marker.marker.clear();
      }
      mdlog_marker = sync_marker.marker;
      set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env,
                                                         sync_env->shard_obj_name(shard_id),
                                                         sync_marker, tn));

      /*
       * mdlog_marker: the remote sync marker positiion
       * sync_marker: the local sync marker position
       * max_marker: the max mdlog position that we fetched
       * marker: the current position we try to sync
       * period_marker: the last marker before the next period begins (optional)
       */
      marker = max_marker = sync_marker.marker;
      /* inc sync */
      do {
        if (!lease_cr->is_locked()) {
          lost_lock = true;
          tn->log(10, "lost lease");
          break;
        }
#define INCREMENTAL_MAX_ENTRIES 100
        ldpp_dout(sync_env->dpp, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << dendl;
        if (!period_marker.empty() && period_marker <= mdlog_marker) {
          tn->log(10, SSTR("finished syncing current period: mdlog_marker=" << mdlog_marker << " sync_marker=" << sync_marker.marker << " period_marker=" << period_marker));
          done_with_period = true;
          break;
        }
        if (mdlog_marker <= max_marker) {
          /* we're at the tip, try to bring more entries */
          ldpp_dout(sync_env->dpp, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " syncing mdlog for shard_id=" << shard_id << dendl;
          yield call(new RGWCloneMetaLogCoroutine(sync_env, mdlog,
                                                  period, shard_id,
                                                  mdlog_marker, &mdlog_marker));
        }
        if (retcode < 0) {
          tn->log(10, SSTR(*this << ": failed to fetch more log entries, retcode=" << retcode));
          yield lease_cr->go_down();
          drain_all();
          *reset_backoff = false; // back off and try again later
          return retcode;
        }
        *reset_backoff = true; /* if we got to this point, all systems function */
        if (mdlog_marker > max_marker) {
          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
          tn->log(20, SSTR("mdlog_marker=" << mdlog_marker << " sync_marker=" << sync_marker.marker));
          marker = max_marker;
          yield call(new RGWReadMDLogEntriesCR(sync_env, mdlog, shard_id,
                                               &max_marker, INCREMENTAL_MAX_ENTRIES,
                                               &log_entries, &truncated));
          if (retcode < 0) {
            tn->log(10, SSTR("failed to list mdlog entries, retcode=" << retcode));
            yield lease_cr->go_down();
            drain_all();
            *reset_backoff = false; // back off and try again later
            return retcode;
          }
          for (log_iter = log_entries.begin(); log_iter != log_entries.end() && !done_with_period; ++log_iter) {
            if (!period_marker.empty() && period_marker <= log_iter->id) {
              done_with_period = true;
              if (period_marker < log_iter->id) {
                tn->log(10, SSTR("found key=" << log_iter->id
                    << " past period_marker=" << period_marker));
                break;
              }
              ldpp_dout(sync_env->dpp, 10) << "found key at period_marker=" << period_marker << dendl;
              // sync this entry, then return control to RGWMetaSyncCR
            }
            if (!mdlog_entry.convert_from(*log_iter)) {
              tn->log(0, SSTR("ERROR: failed to convert mdlog entry, shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << " ... skipping entry"));
              continue;
            }
            tn->log(20, SSTR("log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp));
            if (!marker_tracker->start(log_iter->id, 0, log_iter->timestamp.to_real_time())) {
              ldpp_dout(sync_env->dpp, 0) << "ERROR: cannot start syncing " << log_iter->id << ". Duplicate entry?" << dendl;
            } else {
              raw_key = log_iter->section + ":" + log_iter->name;
              yield {
                RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, mdlog_entry.log_data.status, marker_tracker, tn), false);
                ceph_assert(stack);
                // stack_to_pos holds a reference to the stack
                stack_to_pos[stack] = log_iter->id;
                pos_to_prev[log_iter->id] = marker;
              }
            }
            marker = log_iter->id;
          }
        }
        collect_children();
        ldpp_dout(sync_env->dpp, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " max_marker=" << max_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << dendl;
        if (done_with_period) {
          // return control to RGWMetaSyncCR and advance to the next period
          tn->log(10, SSTR(*this << ": done with period"));
          break;
        }
        if (mdlog_marker == max_marker && can_adjust_marker) {
          tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
#define INCREMENTAL_INTERVAL 20
          yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
        }
      } while (can_adjust_marker);

      tn->unset_flag(RGW_SNS_FLAG_ACTIVE);

      while (num_spawned() > 1) {
        yield wait_for_child();
        collect_children();
      }

      yield lease_cr->go_down();

      drain_all();

      if (lost_lock) {
        return -EBUSY;
      }

      if (!can_adjust_marker) {
        return -EAGAIN;
      }

      return set_cr_done();
    }
    return 0;
  }
};
// Backoff wrapper around RGWMetaSyncShardCR: re-runs the shard sync coroutine
// with exponential backoff on failure, and re-reads the persisted shard marker
// (alloc_finisher_cr) between attempts.
class RGWMetaSyncShardControlCR : public RGWBackoffControlCR
{
  RGWMetaSyncEnv *sync_env;

  const rgw_pool& pool;
  const std::string& period;
  epoch_t realm_epoch;
  RGWMetadataLog* mdlog;
  uint32_t shard_id;
  rgw_meta_sync_marker sync_marker;
  const std::string period_marker;

  RGWSyncTraceNodeRef tn;

  static constexpr bool exit_on_error = false; // retry on all errors
public:
  RGWMetaSyncShardControlCR(RGWMetaSyncEnv *_sync_env, const rgw_pool& _pool,
                            const std::string& period, epoch_t realm_epoch,
                            RGWMetadataLog* mdlog, uint32_t _shard_id,
                            const rgw_meta_sync_marker& _marker,
                            std::string&& period_marker,
                            RGWSyncTraceNodeRef& _tn_parent)
    : RGWBackoffControlCR(_sync_env->cct, exit_on_error), sync_env(_sync_env),
      pool(_pool), period(period), realm_epoch(realm_epoch), mdlog(mdlog),
      shard_id(_shard_id), sync_marker(_marker),
      period_marker(std::move(period_marker)) {
    tn = sync_env->sync_tracer->add_node(_tn_parent, "shard",
                                         std::to_string(shard_id));
  }

  // the coroutine to (re)run on each backoff attempt
  RGWCoroutine *alloc_cr() override {
    return new RGWMetaSyncShardCR(sync_env, pool, period, realm_epoch, mdlog,
                                  shard_id, sync_marker, period_marker, backoff_ptr(), tn);
  }

  // refresh our copy of the shard marker from its status object before retrying
  RGWCoroutine *alloc_finisher_cr() override {
    RGWRados *store = sync_env->store;
    return new RGWSimpleRadosReadCR<rgw_meta_sync_marker>(sync_env->async_rados, store->svc.sysobj,
                                                          rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
                                                          &sync_marker);
  }
};
// Top-level metadata sync coroutine: walks the period history one period at a
// time, spawning one RGWMetaSyncShardControlCR per shard for the current
// period, then advances the persisted sync position to the next period.
class RGWMetaSyncCR : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;
  const rgw_pool& pool;
  RGWPeriodHistory::Cursor cursor; //< sync position in period history
  RGWPeriodHistory::Cursor next; //< next period in history
  rgw_meta_sync_status sync_status;
  RGWSyncTraceNodeRef tn;

  std::mutex mutex; //< protect access to shard_crs
  int ret{0};

  // TODO: it should be enough to hold a reference on the stack only, as calling
  // RGWCoroutinesStack::wakeup() doesn't refer to the RGWCoroutine if it has
  // already completed
  using ControlCRRef = boost::intrusive_ptr<RGWMetaSyncShardControlCR>;
  using StackRef = boost::intrusive_ptr<RGWCoroutinesStack>;
  using RefPair = std::pair<ControlCRRef, StackRef>;
  map<int, RefPair> shard_crs;

public:
  RGWMetaSyncCR(RGWMetaSyncEnv *_sync_env, const RGWPeriodHistory::Cursor &cursor,
                const rgw_meta_sync_status& _sync_status, RGWSyncTraceNodeRef& _tn)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      pool(sync_env->store->svc.zone->get_zone_params().log_pool),
      cursor(cursor), sync_status(_sync_status), tn(_tn) {}

  int operate() override {
    reenter(this) {
      // loop through one period at a time
      tn->log(1, "start");
      for (;;) {
        if (cursor == sync_env->store->period_history->get_current()) {
          next = RGWPeriodHistory::Cursor{};
          if (cursor) {
            ldpp_dout(sync_env->dpp, 10) << "RGWMetaSyncCR on current period="
                << cursor.get_period().get_id() << dendl;
          } else {
            ldpp_dout(sync_env->dpp, 10) << "RGWMetaSyncCR with no period" << dendl;
          }
        } else {
          next = cursor;
          next.next();
          ldpp_dout(sync_env->dpp, 10) << "RGWMetaSyncCR on period="
              << cursor.get_period().get_id() << ", next="
              << next.get_period().get_id() << dendl;
        }

        yield {
          // get the mdlog for the current period (may be empty)
          auto& period_id = sync_status.sync_info.period;
          auto realm_epoch = sync_status.sync_info.realm_epoch;
          auto mdlog = sync_env->store->meta_mgr->get_log(period_id);

          tn->log(1, SSTR("realm epoch=" << realm_epoch << " period id=" << period_id));

          // prevent wakeup() from accessing shard_crs while we're spawning them
          std::lock_guard<std::mutex> lock(mutex);

          // sync this period on each shard
          for (const auto& m : sync_status.sync_markers) {
            uint32_t shard_id = m.first;
            auto& marker = m.second;

            std::string period_marker;
            if (next) {
              // read the maximum marker from the next period's sync status
              period_marker = next.get_period().get_sync_status()[shard_id];
              if (period_marker.empty()) {
                // no metadata changes have occurred on this shard, skip it
                ldpp_dout(sync_env->dpp, 10) << "RGWMetaSyncCR: skipping shard " << shard_id
                    << " with empty period marker" << dendl;
                continue;
              }
            }

            using ShardCR = RGWMetaSyncShardControlCR;
            auto cr = new ShardCR(sync_env, pool, period_id, realm_epoch,
                                  mdlog, shard_id, marker,
                                  std::move(period_marker), tn);
            auto stack = spawn(cr, false);
            shard_crs[shard_id] = RefPair{cr, stack};
          }
        }
        // wait for each shard to complete
        while (ret == 0 && num_spawned() > 0) {
          yield wait_for_child();
          collect(&ret, nullptr);
        }
        drain_all();
        {
          // drop shard cr refs under lock
          std::lock_guard<std::mutex> lock(mutex);
          shard_crs.clear();
        }
        if (ret < 0) {
          return set_cr_error(ret);
        }
        // advance to the next period
        ceph_assert(next);
        cursor = next;

        // write the updated sync info
        sync_status.sync_info.period = cursor.get_period().get_id();
        sync_status.sync_info.realm_epoch = cursor.get_epoch();
        yield call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados,
                                                                 sync_env->store->svc.sysobj,
                                                                 rgw_raw_obj(pool, sync_env->status_oid()),
                                                                 sync_status.sync_info));
      }
    }
    return 0;
  }

  // Called from outside the coroutine manager to nudge one shard's control CR.
  void wakeup(int shard_id) {
    std::lock_guard<std::mutex> lock(mutex);
    auto iter = shard_crs.find(shard_id);
    if (iter == shard_crs.end()) {
      return;
    }
    iter->second.first->wakeup();
  }
};
// Populate a sync environment with this log's store, connection, and helper
// objects so sync coroutines can run against it.
void RGWRemoteMetaLog::init_sync_env(RGWMetaSyncEnv *env) {
  env->dpp = dpp;              // NOTE(review): dpp/store/conn lines were dropped
  env->cct = store->ctx();     // by extraction; reconstructed — confirm upstream
  env->store = store;
  env->conn = conn;
  env->async_rados = async_rados;
  env->http_manager = &http_manager;
  env->error_logger = error_logger;
  env->sync_tracer = store->get_sync_tracer();
}
// Read the persisted metadata sync status.  No-op on the meta master (it has
// nothing to sync from).  Uses a private coroutine manager so it can run
// concurrently with run_sync().
int RGWRemoteMetaLog::read_sync_status(rgw_meta_sync_status *sync_status)
{
  if (store->svc.zone->is_meta_master()) {
    return 0;
  }
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }
  // point a local copy of the env at the private http manager
  RGWMetaSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  tn->log(20, "read sync status");
  ret = crs.run(new RGWReadSyncStatusCoroutine(&sync_env_local, sync_status));
  http_manager.stop();
  return ret;
}
// Initialize the metadata sync status objects from the master's mdlog info
// (shard count) and our current period, then persist them.
int RGWRemoteMetaLog::init_sync_status()
{
  if (store->svc.zone->is_meta_master()) {
    return 0;
  }

  rgw_mdlog_info mdlog_info;
  int r = read_log_info(&mdlog_info);
  if (r < 0) {
    lderr(store->ctx()) << "ERROR: fail to fetch master log info (r=" << r << ")" << dendl;
    return r;
  }

  rgw_meta_sync_info sync_info;
  sync_info.num_shards = mdlog_info.num_shards;
  auto cursor = store->period_history->get_current();
  if (cursor) {
    // start sync at the current period/epoch
    sync_info.period = cursor.get_period().get_id();
    sync_info.realm_epoch = cursor.get_epoch();
  }

  return run(new RGWInitSyncStatusCoroutine(&sync_env, sync_info));
}
2060 int RGWRemoteMetaLog::store_sync_info(const rgw_meta_sync_info
& sync_info
)
2062 tn
->log(20, "store sync info");
2063 return run(new RGWSimpleRadosWriteCR
<rgw_meta_sync_info
>(async_rados
, store
->svc
.sysobj
,
2064 rgw_raw_obj(store
->svc
.zone
->get_zone_params().log_pool
, sync_env
.status_oid()),
// return a cursor to the period at our sync position
// Resolves the period recorded in `info` against the local period history,
// pulling it from the master if it isn't present; errors are reported through
// the returned cursor's error code.
static RGWPeriodHistory::Cursor get_period_at(RGWRados* store,
                                              const rgw_meta_sync_info& info)
{
  if (info.period.empty()) {
    // return an empty cursor with error=0
    return RGWPeriodHistory::Cursor{};
  }

  // look for an existing period in our history
  auto cursor = store->period_history->lookup(info.realm_epoch);
  if (cursor) {
    // verify that the period ids match
    auto& existing = cursor.get_period().get_id();
    if (existing != info.period) {
      lderr(store->ctx()) << "ERROR: sync status period=" << info.period
          << " does not match period=" << existing
          << " in history at realm epoch=" << info.realm_epoch << dendl;
      return RGWPeriodHistory::Cursor{-EEXIST};
    }
    return cursor;
  }

  // read the period from rados or pull it from the master
  RGWPeriod period;
  int r = store->period_puller->pull(info.period, period);
  if (r < 0) {
    lderr(store->ctx()) << "ERROR: failed to read period id "
        << info.period << ": " << cpp_strerror(r) << dendl;
    return RGWPeriodHistory::Cursor{r};
  }
  // attach the period to our history
  cursor = store->period_history->attach(std::move(period));
  if (!cursor) {
    r = cursor.get_error();
    lderr(store->ctx()) << "ERROR: failed to read period history back to "
        << info.period << ": " << cpp_strerror(r) << dendl;
  }
  return cursor;
}
// Main metadata sync driver: waits for the master's mdlog info, initializes
// sync status if needed, then loops through the sync state machine
// (Init -> BuildingFullSyncMaps -> Sync) until shutdown.
int RGWRemoteMetaLog::run_sync()
{
  if (store->svc.zone->is_meta_master()) {
    return 0;
  }

  int r = 0;

  // get shard count and oldest log period from master
  rgw_mdlog_info mdlog_info;
  for (;;) {
    if (going_down) {
      ldpp_dout(dpp, 1) << __func__ << "(): going down" << dendl;
      return 0;
    }
    r = read_log_info(&mdlog_info);
    if (r == -EIO || r == -ENOENT) {
      // keep retrying if master isn't alive or hasn't initialized the log
      ldpp_dout(dpp, 10) << __func__ << "(): waiting for master.." << dendl;
      backoff.backoff_sleep();
      continue;
    }
    backoff.reset();
    if (r < 0) {
      lderr(store->ctx()) << "ERROR: fail to fetch master log info (r=" << r << ")" << dendl;
      return r;
    }
    break;
  }

  rgw_meta_sync_status sync_status;
  do {
    if (going_down) {
      ldpp_dout(dpp, 1) << __func__ << "(): going down" << dendl;
      return 0;
    }
    r = run(new RGWReadSyncStatusCoroutine(&sync_env, &sync_status));
    if (r < 0 && r != -ENOENT) {
      ldpp_dout(dpp, 0) << "ERROR: failed to fetch sync status r=" << r << dendl;
      return r;
    }

    if (!mdlog_info.period.empty()) {
      // restart sync if the remote has a period, but:
      // a) our status does not, or
      // b) our sync period comes before the remote's oldest log period
      if (sync_status.sync_info.period.empty() ||
          sync_status.sync_info.realm_epoch < mdlog_info.realm_epoch) {
        sync_status.sync_info.state = rgw_meta_sync_info::StateInit;
        string reason;
        if (sync_status.sync_info.period.empty()) {
          reason = "period is empty";
        } else {
          reason = SSTR("sync_info realm epoch is behind: " << sync_status.sync_info.realm_epoch << " < " << mdlog_info.realm_epoch);
        }
        tn->log(1, "initialize sync (reason: " + reason + ")");
        ldpp_dout(dpp, 1) << "epoch=" << sync_status.sync_info.realm_epoch
           << " in sync status comes before remote's oldest mdlog epoch="
           << mdlog_info.realm_epoch << ", restarting sync" << dendl;
      }
    }

    if (sync_status.sync_info.state == rgw_meta_sync_info::StateInit) {
      ldpp_dout(dpp, 20) << __func__ << "(): init" << dendl;
      sync_status.sync_info.num_shards = mdlog_info.num_shards;
      auto cursor = store->period_history->get_current();
      if (cursor) {
        // run full sync, then start incremental from the current period/epoch
        sync_status.sync_info.period = cursor.get_period().get_id();
        sync_status.sync_info.realm_epoch = cursor.get_epoch();
      }
      r = run(new RGWInitSyncStatusCoroutine(&sync_env, sync_status.sync_info));
      if (r == -EBUSY) {
        // someone else is initializing; back off and retry
        backoff.backoff_sleep();
        continue;
      }
      backoff.reset();
      if (r < 0) {
        ldpp_dout(dpp, 0) << "ERROR: failed to init sync status r=" << r << dendl;
        return r;
      }
    }
  } while (sync_status.sync_info.state == rgw_meta_sync_info::StateInit);

  auto num_shards = sync_status.sync_info.num_shards;
  if (num_shards != mdlog_info.num_shards) {
    lderr(store->ctx()) << "ERROR: can't sync, mismatch between num shards, master num_shards=" << mdlog_info.num_shards << " local num_shards=" << num_shards << dendl;
    return -EINVAL;
  }

  RGWPeriodHistory::Cursor cursor;
  do {
    r = run(new RGWReadSyncStatusCoroutine(&sync_env, &sync_status));
    if (r < 0 && r != -ENOENT) {
      tn->log(0, SSTR("ERROR: failed to fetch sync status r=" << r));
      return r;
    }

    switch ((rgw_meta_sync_info::SyncState)sync_status.sync_info.state) {
      case rgw_meta_sync_info::StateBuildingFullSyncMaps:
        tn->log(20, "building full sync maps");
        r = run(new RGWFetchAllMetaCR(&sync_env, num_shards, sync_status.sync_markers, tn));
        if (r == -EBUSY || r == -EAGAIN) {
          backoff.backoff_sleep();
          continue;
        }
        backoff.reset();
        if (r < 0) {
          tn->log(0, SSTR("ERROR: failed to fetch all metadata keys (r=" << r << ")"));
          return r;
        }

        sync_status.sync_info.state = rgw_meta_sync_info::StateSync;
        r = store_sync_info(sync_status.sync_info);
        if (r < 0) {
          tn->log(0, SSTR("ERROR: failed to update sync status (r=" << r << ")"));
          return r;
        }
        /* fall through */
      case rgw_meta_sync_info::StateSync:
        tn->log(20, "sync");
        // find our position in the period history (if any)
        cursor = get_period_at(store, sync_status.sync_info);
        r = cursor.get_error();
        if (r < 0) {
          return r;
        }
        meta_sync_cr = new RGWMetaSyncCR(&sync_env, cursor, sync_status, tn);
        r = run(meta_sync_cr);
        if (r < 0) {
          tn->log(0, "ERROR: failed to fetch all metadata keys");
          return r;
        }
        break;
      default:
        tn->log(0, "ERROR: bad sync state!");
        return -EIO;
    }
  } while (!going_down);

  return 0;
}
2252 void RGWRemoteMetaLog::wakeup(int shard_id
)
2254 if (!meta_sync_cr
) {
2257 meta_sync_cr
->wakeup(shard_id
);
// Drives the clone state machine: init -> read remote shard status ->
// REST fetch -> store entries, looping while the remote listing is truncated.
// Each state handler returns control to the coroutine manager via `yield`.
int RGWCloneMetaLogCoroutine::operate()
{
  reenter(this) {
    do {
      yield {
        ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": init request" << dendl;
        return state_init();
      }
      yield {
        ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status" << dendl;
        return state_read_shard_status();
      }
      yield {
        ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status complete" << dendl;
        return state_read_shard_status_complete();
      }
      yield {
        ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": sending rest request" << dendl;
        return state_send_rest_request();
      }
      yield {
        ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": receiving rest response" << dendl;
        return state_receive_rest_response();
      }
      yield {
        ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries" << dendl;
        return state_store_mdlog_entries();
      }
    } while (truncated);
    yield {
      ldpp_dout(sync_env->dpp, 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries complete" << dendl;
      return state_store_mdlog_entries_complete();
    }
  }

  return 0;
}
// Reset the response buffer before a new fetch round.
int RGWCloneMetaLogCoroutine::state_init()
{
  data = rgw_mdlog_shard_data();

  return 0;
}
// Kick off an async read of the local mdlog shard header; the completion
// callback records the shard's max marker/time and wakes the parent stack.
int RGWCloneMetaLogCoroutine::state_read_shard_status()
{
  const bool add_ref = false; // default constructs with refs=1

  completion.reset(new RGWMetadataLogInfoCompletion(
    [this](int ret, const cls_log_header& header) {
      if (ret < 0) {
        if (ret != -ENOENT) {
          ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to read mdlog info with "
              << cpp_strerror(ret) << dendl;
        }
      } else {
        shard_info.marker = header.max_marker;
        shard_info.last_update = header.max_time.to_real_time();
      }
      // wake up parent stack
      io_complete();
    }), add_ref);

  int ret = mdlog->get_info_async(shard_id, completion.get());
  if (ret < 0) {
    ldpp_dout(sync_env->dpp, 0) << "ERROR: mdlog->get_info_async() returned ret=" << ret << dendl;
    return set_cr_error(ret);
  }

  // suspend until the completion fires
  return io_block(0);
}
2333 int RGWCloneMetaLogCoroutine::state_read_shard_status_complete()
2337 ldpp_dout(sync_env
->dpp
, 20) << "shard_id=" << shard_id
<< " marker=" << shard_info
.marker
<< " last_update=" << shard_info
.last_update
<< dendl
;
2339 marker
= shard_info
.marker
;
2344 int RGWCloneMetaLogCoroutine::state_send_rest_request()
2346 RGWRESTConn
*conn
= sync_env
->conn
;
2349 snprintf(buf
, sizeof(buf
), "%d", shard_id
);
2351 char max_entries_buf
[32];
2352 snprintf(max_entries_buf
, sizeof(max_entries_buf
), "%d", max_entries
);
2354 const char *marker_key
= (marker
.empty() ? "" : "marker");
2356 rgw_http_param_pair pairs
[] = { { "type", "metadata" },
2358 { "period", period
.c_str() },
2359 { "max-entries", max_entries_buf
},
2360 { marker_key
, marker
.c_str() },
2363 http_op
= new RGWRESTReadResource(conn
, "/admin/log", pairs
, NULL
, sync_env
->http_manager
);
2365 init_new_io(http_op
);
2367 int ret
= http_op
->aio_read();
2369 ldpp_dout(sync_env
->dpp
, 0) << "ERROR: failed to fetch mdlog data" << dendl
;
2370 log_error() << "failed to send http operation: " << http_op
->to_str() << " ret=" << ret
<< std::endl
;
2373 return set_cr_error(ret
);
2379 int RGWCloneMetaLogCoroutine::state_receive_rest_response()
2381 int ret
= http_op
->wait(&data
);
2383 error_stream
<< "http operation failed: " << http_op
->to_str() << " status=" << http_op
->get_http_status() << std::endl
;
2384 ldpp_dout(sync_env
->dpp
, 5) << "failed to wait for op, ret=" << ret
<< dendl
;
2387 return set_cr_error(ret
);
2392 ldpp_dout(sync_env
->dpp
, 20) << "remote mdlog, shard_id=" << shard_id
<< " num of shard entries: " << data
.entries
.size() << dendl
;
2394 truncated
= ((int)data
.entries
.size() == max_entries
);
2396 if (data
.entries
.empty()) {
2398 *new_marker
= marker
;
2400 return set_cr_done();
2404 *new_marker
= data
.entries
.back().id
;
2411 int RGWCloneMetaLogCoroutine::state_store_mdlog_entries()
2413 list
<cls_log_entry
> dest_entries
;
2415 vector
<rgw_mdlog_entry
>::iterator iter
;
2416 for (iter
= data
.entries
.begin(); iter
!= data
.entries
.end(); ++iter
) {
2417 rgw_mdlog_entry
& entry
= *iter
;
2418 ldpp_dout(sync_env
->dpp
, 20) << "entry: name=" << entry
.name
<< dendl
;
2420 cls_log_entry dest_entry
;
2421 dest_entry
.id
= entry
.id
;
2422 dest_entry
.section
= entry
.section
;
2423 dest_entry
.name
= entry
.name
;
2424 dest_entry
.timestamp
= utime_t(entry
.timestamp
);
2426 encode(entry
.log_data
, dest_entry
.data
);
2428 dest_entries
.push_back(dest_entry
);
2433 RGWAioCompletionNotifier
*cn
= stack
->create_completion_notifier();
2435 int ret
= mdlog
->store_entries_in_shard(dest_entries
, shard_id
, cn
->completion());
2438 ldpp_dout(sync_env
->dpp
, 10) << "failed to store md log entries shard_id=" << shard_id
<< " ret=" << ret
<< dendl
;
2439 return set_cr_error(ret
);
2444 int RGWCloneMetaLogCoroutine::state_store_mdlog_entries_complete()
2446 return set_cr_done();
2450 // TODO: move into rgw_sync_trim.cc
2452 #define dout_prefix (*_dout << "meta trim: ")
2454 /// purge all log shards for the given mdlog
2455 class PurgeLogShardsCR
: public RGWShardCollectCR
{
2456 RGWRados
*const store
;
2457 const RGWMetadataLog
* mdlog
;
2458 const int num_shards
;
2462 static constexpr int max_concurrent
= 16;
2465 PurgeLogShardsCR(RGWRados
*store
, const RGWMetadataLog
* mdlog
,
2466 const rgw_pool
& pool
, int num_shards
)
2467 : RGWShardCollectCR(store
->ctx(), max_concurrent
),
2468 store(store
), mdlog(mdlog
), num_shards(num_shards
), obj(pool
, "")
2471 bool spawn_next() override
{
2472 if (i
== num_shards
) {
2475 mdlog
->get_shard_oid(i
++, obj
.oid
);
2476 spawn(new RGWRadosRemoveCR(store
, obj
), false);
2481 using Cursor
= RGWPeriodHistory::Cursor
;
2483 /// purge mdlogs from the oldest up to (but not including) the given realm_epoch
2484 class PurgePeriodLogsCR
: public RGWCoroutine
{
2485 RGWRados
*const store
;
2486 RGWMetadataManager
*const metadata
;
2487 RGWObjVersionTracker objv
;
2489 epoch_t realm_epoch
;
2490 epoch_t
*last_trim_epoch
; //< update last trim on success
2493 PurgePeriodLogsCR(RGWRados
*store
, epoch_t realm_epoch
, epoch_t
*last_trim
)
2494 : RGWCoroutine(store
->ctx()), store(store
), metadata(store
->meta_mgr
),
2495 realm_epoch(realm_epoch
), last_trim_epoch(last_trim
)
2498 int operate() override
;
2501 int PurgePeriodLogsCR::operate()
2504 // read our current oldest log period
2505 yield
call(metadata
->read_oldest_log_period_cr(&cursor
, &objv
));
2507 return set_cr_error(retcode
);
2509 ceph_assert(cursor
);
2510 ldout(cct
, 20) << "oldest log realm_epoch=" << cursor
.get_epoch()
2511 << " period=" << cursor
.get_period().get_id() << dendl
;
2513 // trim -up to- the given realm_epoch
2514 while (cursor
.get_epoch() < realm_epoch
) {
2515 ldout(cct
, 4) << "purging log shards for realm_epoch=" << cursor
.get_epoch()
2516 << " period=" << cursor
.get_period().get_id() << dendl
;
2518 const auto mdlog
= metadata
->get_log(cursor
.get_period().get_id());
2519 const auto& pool
= store
->svc
.zone
->get_zone_params().log_pool
;
2520 auto num_shards
= cct
->_conf
->rgw_md_log_max_shards
;
2521 call(new PurgeLogShardsCR(store
, mdlog
, pool
, num_shards
));
2524 ldout(cct
, 1) << "failed to remove log shards: "
2525 << cpp_strerror(retcode
) << dendl
;
2526 return set_cr_error(retcode
);
2528 ldout(cct
, 10) << "removed log shards for realm_epoch=" << cursor
.get_epoch()
2529 << " period=" << cursor
.get_period().get_id() << dendl
;
2531 // update our mdlog history
2532 yield
call(metadata
->trim_log_period_cr(cursor
, &objv
));
2533 if (retcode
== -ENOENT
) {
2534 // must have raced to update mdlog history. return success and allow the
2535 // winner to continue purging
2536 ldout(cct
, 10) << "already removed log shards for realm_epoch=" << cursor
.get_epoch()
2537 << " period=" << cursor
.get_period().get_id() << dendl
;
2538 return set_cr_done();
2539 } else if (retcode
< 0) {
2540 ldout(cct
, 1) << "failed to remove log shards for realm_epoch="
2541 << cursor
.get_epoch() << " period=" << cursor
.get_period().get_id()
2542 << " with: " << cpp_strerror(retcode
) << dendl
;
2543 return set_cr_error(retcode
);
2546 if (*last_trim_epoch
< cursor
.get_epoch()) {
2547 *last_trim_epoch
= cursor
.get_epoch();
2550 ceph_assert(cursor
.has_next()); // get_current() should always come after
2553 return set_cr_done();
2560 using connection_map
= std::map
<std::string
, std::unique_ptr
<RGWRESTConn
>>;
2562 /// construct a RGWRESTConn for each zone in the realm
2563 template <typename Zonegroups
>
2564 connection_map
make_peer_connections(RGWRados
*store
,
2565 const Zonegroups
& zonegroups
)
2567 connection_map connections
;
2568 for (auto& g
: zonegroups
) {
2569 for (auto& z
: g
.second
.zones
) {
2570 std::unique_ptr
<RGWRESTConn
> conn
{
2571 new RGWRESTConn(store
->ctx(), store
->svc
.zone
, z
.first
, z
.second
.endpoints
)};
2572 connections
.emplace(z
.first
, std::move(conn
));
2578 /// return the marker that it's safe to trim up to
2579 const std::string
& get_stable_marker(const rgw_meta_sync_marker
& m
)
2581 return m
.state
== m
.FullSync
? m
.next_step_marker
: m
.marker
;
2584 /// comparison operator for take_min_status()
2585 bool operator<(const rgw_meta_sync_marker
& lhs
, const rgw_meta_sync_marker
& rhs
)
2587 // sort by stable marker
2588 return get_stable_marker(lhs
) < get_stable_marker(rhs
);
2591 /// populate the status with the minimum stable marker of each shard for any
2592 /// peer whose realm_epoch matches the minimum realm_epoch in the input
2593 template <typename Iter
>
2594 int take_min_status(CephContext
*cct
, Iter first
, Iter last
,
2595 rgw_meta_sync_status
*status
)
2597 if (first
== last
) {
2600 const size_t num_shards
= cct
->_conf
->rgw_md_log_max_shards
;
2602 status
->sync_info
.realm_epoch
= std::numeric_limits
<epoch_t
>::max();
2603 for (auto p
= first
; p
!= last
; ++p
) {
2604 // validate peer's shard count
2605 if (p
->sync_markers
.size() != num_shards
) {
2606 ldout(cct
, 1) << "take_min_status got peer status with "
2607 << p
->sync_markers
.size() << " shards, expected "
2608 << num_shards
<< dendl
;
2611 if (p
->sync_info
.realm_epoch
< status
->sync_info
.realm_epoch
) {
2612 // earlier epoch, take its entire status
2613 *status
= std::move(*p
);
2614 } else if (p
->sync_info
.realm_epoch
== status
->sync_info
.realm_epoch
) {
2615 // same epoch, take any earlier markers
2616 auto m
= status
->sync_markers
.begin();
2617 for (auto& shard
: p
->sync_markers
) {
2618 if (shard
.second
< m
->second
) {
2619 m
->second
= std::move(shard
.second
);
2629 const DoutPrefixProvider
*dpp
;
2630 RGWRados
*const store
;
2631 RGWHTTPManager
*const http
;
2633 const std::string
& zone
;
2634 Cursor current
; //< cursor to current period
2635 epoch_t last_trim_epoch
{0}; //< epoch of last mdlog that was purged
2637 TrimEnv(const DoutPrefixProvider
*dpp
, RGWRados
*store
, RGWHTTPManager
*http
, int num_shards
)
2638 : dpp(dpp
), store(store
), http(http
), num_shards(num_shards
),
2639 zone(store
->svc
.zone
->get_zone_params().get_id()),
2640 current(store
->period_history
->get_current())
2644 struct MasterTrimEnv
: public TrimEnv
{
2645 connection_map connections
; //< peer connections
2646 std::vector
<rgw_meta_sync_status
> peer_status
; //< sync status for each peer
2647 /// last trim marker for each shard, only applies to current period's mdlog
2648 std::vector
<std::string
> last_trim_markers
;
2650 MasterTrimEnv(const DoutPrefixProvider
*dpp
, RGWRados
*store
, RGWHTTPManager
*http
, int num_shards
)
2651 : TrimEnv(dpp
, store
, http
, num_shards
),
2652 last_trim_markers(num_shards
)
2654 auto& period
= current
.get_period();
2655 connections
= make_peer_connections(store
, period
.get_map().zonegroups
);
2656 connections
.erase(zone
);
2657 peer_status
.resize(connections
.size());
2661 struct PeerTrimEnv
: public TrimEnv
{
2662 /// last trim timestamp for each shard, only applies to current period's mdlog
2663 std::vector
<ceph::real_time
> last_trim_timestamps
;
2665 PeerTrimEnv(const DoutPrefixProvider
*dpp
, RGWRados
*store
, RGWHTTPManager
*http
, int num_shards
)
2666 : TrimEnv(dpp
, store
, http
, num_shards
),
2667 last_trim_timestamps(num_shards
)
2670 void set_num_shards(int num_shards
) {
2671 this->num_shards
= num_shards
;
2672 last_trim_timestamps
.resize(num_shards
);
2676 } // anonymous namespace
2679 /// spawn a trim cr for each shard that needs it, while limiting the number
2680 /// of concurrent shards
2681 class MetaMasterTrimShardCollectCR
: public RGWShardCollectCR
{
2683 static constexpr int MAX_CONCURRENT_SHARDS
= 16;
2686 RGWMetadataLog
*mdlog
;
2689 const rgw_meta_sync_status
& sync_status
;
2692 MetaMasterTrimShardCollectCR(MasterTrimEnv
& env
, RGWMetadataLog
*mdlog
,
2693 const rgw_meta_sync_status
& sync_status
)
2694 : RGWShardCollectCR(env
.store
->ctx(), MAX_CONCURRENT_SHARDS
),
2695 env(env
), mdlog(mdlog
), sync_status(sync_status
)
2698 bool spawn_next() override
;
2701 bool MetaMasterTrimShardCollectCR::spawn_next()
2703 while (shard_id
< env
.num_shards
) {
2704 auto m
= sync_status
.sync_markers
.find(shard_id
);
2705 if (m
== sync_status
.sync_markers
.end()) {
2709 auto& stable
= get_stable_marker(m
->second
);
2710 auto& last_trim
= env
.last_trim_markers
[shard_id
];
2712 if (stable
<= last_trim
) {
2714 ldout(cct
, 20) << "skipping log shard " << shard_id
2715 << " at marker=" << stable
2716 << " last_trim=" << last_trim
2717 << " realm_epoch=" << sync_status
.sync_info
.realm_epoch
<< dendl
;
2722 mdlog
->get_shard_oid(shard_id
, oid
);
2724 ldout(cct
, 10) << "trimming log shard " << shard_id
2725 << " at marker=" << stable
2726 << " last_trim=" << last_trim
2727 << " realm_epoch=" << sync_status
.sync_info
.realm_epoch
<< dendl
;
2728 spawn(new RGWSyncLogTrimCR(env
.store
, oid
, stable
, &last_trim
), false);
2735 /// spawn rest requests to read each peer's sync status
2736 class MetaMasterStatusCollectCR
: public RGWShardCollectCR
{
2737 static constexpr int MAX_CONCURRENT_SHARDS
= 16;
2740 connection_map::iterator c
;
2741 std::vector
<rgw_meta_sync_status
>::iterator s
;
2743 explicit MetaMasterStatusCollectCR(MasterTrimEnv
& env
)
2744 : RGWShardCollectCR(env
.store
->ctx(), MAX_CONCURRENT_SHARDS
),
2745 env(env
), c(env
.connections
.begin()), s(env
.peer_status
.begin())
2748 bool spawn_next() override
{
2749 if (c
== env
.connections
.end()) {
2752 static rgw_http_param_pair params
[] = {
2753 { "type", "metadata" },
2754 { "status", nullptr },
2755 { nullptr, nullptr }
2758 ldout(cct
, 20) << "query sync status from " << c
->first
<< dendl
;
2759 auto conn
= c
->second
.get();
2760 using StatusCR
= RGWReadRESTResourceCR
<rgw_meta_sync_status
>;
2761 spawn(new StatusCR(cct
, conn
, env
.http
, "/admin/log/", params
, &*s
),
2769 class MetaMasterTrimCR
: public RGWCoroutine
{
2771 rgw_meta_sync_status min_status
; //< minimum sync status of all peers
2775 explicit MetaMasterTrimCR(MasterTrimEnv
& env
)
2776 : RGWCoroutine(env
.store
->ctx()), env(env
)
2779 int operate() override
;
2782 int MetaMasterTrimCR::operate()
2785 // TODO: detect this and fail before we spawn the trim thread?
2786 if (env
.connections
.empty()) {
2787 ldout(cct
, 4) << "no peers, exiting" << dendl
;
2788 return set_cr_done();
2791 ldout(cct
, 10) << "fetching sync status for zone " << env
.zone
<< dendl
;
2792 // query mdlog sync status from peers
2793 yield
call(new MetaMasterStatusCollectCR(env
));
2795 // must get a successful reply from all peers to consider trimming
2797 ldout(cct
, 4) << "failed to fetch sync status from all peers" << dendl
;
2798 return set_cr_error(ret
);
2801 // determine the minimum epoch and markers
2802 ret
= take_min_status(env
.store
->ctx(), env
.peer_status
.begin(),
2803 env
.peer_status
.end(), &min_status
);
2805 ldout(cct
, 4) << "failed to calculate min sync status from peers" << dendl
;
2806 return set_cr_error(ret
);
2809 auto store
= env
.store
;
2810 auto epoch
= min_status
.sync_info
.realm_epoch
;
2811 ldout(cct
, 4) << "realm epoch min=" << epoch
2812 << " current=" << env
.current
.get_epoch()<< dendl
;
2813 if (epoch
> env
.last_trim_epoch
+ 1) {
2814 // delete any prior mdlog periods
2815 spawn(new PurgePeriodLogsCR(store
, epoch
, &env
.last_trim_epoch
), true);
2817 ldout(cct
, 10) << "mdlogs already purged up to realm_epoch "
2818 << env
.last_trim_epoch
<< dendl
;
2821 // if realm_epoch == current, trim mdlog based on markers
2822 if (epoch
== env
.current
.get_epoch()) {
2823 auto mdlog
= store
->meta_mgr
->get_log(env
.current
.get_period().get_id());
2824 spawn(new MetaMasterTrimShardCollectCR(env
, mdlog
, min_status
), true);
2827 // ignore any errors during purge/trim because we want to hold the lock open
2828 return set_cr_done();
2834 /// read the first entry of the master's mdlog shard and trim to that position
2835 class MetaPeerTrimShardCR
: public RGWCoroutine
{
2836 RGWMetaSyncEnv
& env
;
2837 RGWMetadataLog
*mdlog
;
2838 const std::string
& period_id
;
2840 RGWMetadataLogInfo info
;
2841 ceph::real_time stable
; //< safe timestamp to trim, according to master
2842 ceph::real_time
*last_trim
; //< last trimmed timestamp, updated on trim
2843 rgw_mdlog_shard_data result
; //< result from master's mdlog listing
2846 MetaPeerTrimShardCR(RGWMetaSyncEnv
& env
, RGWMetadataLog
*mdlog
,
2847 const std::string
& period_id
, int shard_id
,
2848 ceph::real_time
*last_trim
)
2849 : RGWCoroutine(env
.store
->ctx()), env(env
), mdlog(mdlog
),
2850 period_id(period_id
), shard_id(shard_id
), last_trim(last_trim
)
2853 int operate() override
;
2856 int MetaPeerTrimShardCR::operate()
2859 // query master's first mdlog entry for this shard
2860 yield
call(new RGWListRemoteMDLogShardCR(&env
, period_id
, shard_id
,
2863 ldpp_dout(env
.dpp
, 5) << "failed to read first entry from master's mdlog shard "
2864 << shard_id
<< " for period " << period_id
2865 << ": " << cpp_strerror(retcode
) << dendl
;
2866 return set_cr_error(retcode
);
2868 if (result
.entries
.empty()) {
2869 // if there are no mdlog entries, we don't have a timestamp to compare. we
2870 // can't just trim everything, because there could be racing updates since
2871 // this empty reply. query the mdlog shard info to read its max timestamp,
2872 // then retry the listing to make sure it's still empty before trimming to
2874 ldpp_dout(env
.dpp
, 10) << "empty master mdlog shard " << shard_id
2875 << ", reading last timestamp from shard info" << dendl
;
2876 // read the mdlog shard info for the last timestamp
2877 using ShardInfoCR
= RGWReadRemoteMDLogShardInfoCR
;
2878 yield
call(new ShardInfoCR(&env
, period_id
, shard_id
, &info
));
2880 ldpp_dout(env
.dpp
, 5) << "failed to read info from master's mdlog shard "
2881 << shard_id
<< " for period " << period_id
2882 << ": " << cpp_strerror(retcode
) << dendl
;
2883 return set_cr_error(retcode
);
2885 if (ceph::real_clock::is_zero(info
.last_update
)) {
2886 return set_cr_done(); // nothing to trim
2888 ldpp_dout(env
.dpp
, 10) << "got mdlog shard info with last update="
2889 << info
.last_update
<< dendl
;
2890 // re-read the master's first mdlog entry to make sure it hasn't changed
2891 yield
call(new RGWListRemoteMDLogShardCR(&env
, period_id
, shard_id
,
2894 ldpp_dout(env
.dpp
, 5) << "failed to read first entry from master's mdlog shard "
2895 << shard_id
<< " for period " << period_id
2896 << ": " << cpp_strerror(retcode
) << dendl
;
2897 return set_cr_error(retcode
);
2899 // if the mdlog is still empty, trim to max marker
2900 if (result
.entries
.empty()) {
2901 stable
= info
.last_update
;
2903 stable
= result
.entries
.front().timestamp
;
2905 // can only trim -up to- master's first timestamp, so subtract a second.
2906 // (this is why we use timestamps instead of markers for the peers)
2907 stable
-= std::chrono::seconds(1);
2910 stable
= result
.entries
.front().timestamp
;
2911 stable
-= std::chrono::seconds(1);
2914 if (stable
<= *last_trim
) {
2915 ldpp_dout(env
.dpp
, 10) << "skipping log shard " << shard_id
2916 << " at timestamp=" << stable
2917 << " last_trim=" << *last_trim
<< dendl
;
2918 return set_cr_done();
2921 ldpp_dout(env
.dpp
, 10) << "trimming log shard " << shard_id
2922 << " at timestamp=" << stable
2923 << " last_trim=" << *last_trim
<< dendl
;
2926 mdlog
->get_shard_oid(shard_id
, oid
);
2927 call(new RGWRadosTimelogTrimCR(env
.store
, oid
, real_time
{}, stable
, "", ""));
2929 if (retcode
< 0 && retcode
!= -ENODATA
) {
2930 ldpp_dout(env
.dpp
, 1) << "failed to trim mdlog shard " << shard_id
2931 << ": " << cpp_strerror(retcode
) << dendl
;
2932 return set_cr_error(retcode
);
2934 *last_trim
= stable
;
2935 return set_cr_done();
2940 class MetaPeerTrimShardCollectCR
: public RGWShardCollectCR
{
2941 static constexpr int MAX_CONCURRENT_SHARDS
= 16;
2944 RGWMetadataLog
*mdlog
;
2945 const std::string
& period_id
;
2946 RGWMetaSyncEnv meta_env
; //< for RGWListRemoteMDLogShardCR
2950 MetaPeerTrimShardCollectCR(PeerTrimEnv
& env
, RGWMetadataLog
*mdlog
)
2951 : RGWShardCollectCR(env
.store
->ctx(), MAX_CONCURRENT_SHARDS
),
2952 env(env
), mdlog(mdlog
), period_id(env
.current
.get_period().get_id())
2954 meta_env
.init(env
.dpp
, cct
, env
.store
, env
.store
->svc
.zone
->get_master_conn(),
2955 env
.store
->get_async_rados(), env
.http
, nullptr,
2956 env
.store
->get_sync_tracer());
2959 bool spawn_next() override
;
2962 bool MetaPeerTrimShardCollectCR::spawn_next()
2964 if (shard_id
>= env
.num_shards
) {
2967 auto& last_trim
= env
.last_trim_timestamps
[shard_id
];
2968 spawn(new MetaPeerTrimShardCR(meta_env
, mdlog
, period_id
, shard_id
, &last_trim
),
2974 class MetaPeerTrimCR
: public RGWCoroutine
{
2976 rgw_mdlog_info mdlog_info
; //< master's mdlog info
2979 explicit MetaPeerTrimCR(PeerTrimEnv
& env
) : RGWCoroutine(env
.store
->ctx()), env(env
) {}
2981 int operate() override
;
2984 int MetaPeerTrimCR::operate()
2987 ldout(cct
, 10) << "fetching master mdlog info" << dendl
;
2989 // query mdlog_info from master for oldest_log_period
2990 rgw_http_param_pair params
[] = {
2991 { "type", "metadata" },
2992 { nullptr, nullptr }
2995 using LogInfoCR
= RGWReadRESTResourceCR
<rgw_mdlog_info
>;
2996 call(new LogInfoCR(cct
, env
.store
->svc
.zone
->get_master_conn(), env
.http
,
2997 "/admin/log/", params
, &mdlog_info
));
3000 ldout(cct
, 4) << "failed to read mdlog info from master" << dendl
;
3001 return set_cr_error(retcode
);
3003 // use master's shard count instead
3004 env
.set_num_shards(mdlog_info
.num_shards
);
3006 if (mdlog_info
.realm_epoch
> env
.last_trim_epoch
+ 1) {
3007 // delete any prior mdlog periods
3008 yield
call(new PurgePeriodLogsCR(env
.store
, mdlog_info
.realm_epoch
,
3009 &env
.last_trim_epoch
));
3011 ldout(cct
, 10) << "mdlogs already purged through realm_epoch "
3012 << env
.last_trim_epoch
<< dendl
;
3015 // if realm_epoch == current, trim mdlog based on master's markers
3016 if (mdlog_info
.realm_epoch
== env
.current
.get_epoch()) {
3018 auto meta_mgr
= env
.store
->meta_mgr
;
3019 auto mdlog
= meta_mgr
->get_log(env
.current
.get_period().get_id());
3020 call(new MetaPeerTrimShardCollectCR(env
, mdlog
));
3021 // ignore any errors during purge/trim because we want to hold the lock open
3024 return set_cr_done();
3029 class MetaTrimPollCR
: public RGWCoroutine
{
3030 RGWRados
*const store
;
3031 const utime_t interval
; //< polling interval
3032 const rgw_raw_obj obj
;
3033 const std::string name
{"meta_trim"}; //< lock name
3034 const std::string cookie
;
3037 /// allocate the coroutine to run within the lease
3038 virtual RGWCoroutine
* alloc_cr() = 0;
3041 MetaTrimPollCR(RGWRados
*store
, utime_t interval
)
3042 : RGWCoroutine(store
->ctx()), store(store
), interval(interval
),
3043 obj(store
->svc
.zone
->get_zone_params().log_pool
, RGWMetadataLogHistory::oid
),
3044 cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct
))
3047 int operate() override
;
3050 int MetaTrimPollCR::operate()
3054 set_status("sleeping");
3057 // prevent others from trimming for our entire wait interval
3058 set_status("acquiring trim lock");
3059 yield
call(new RGWSimpleRadosLockCR(store
->get_async_rados(), store
,
3060 obj
, name
, cookie
, interval
.sec()));
3062 ldout(cct
, 4) << "failed to lock: " << cpp_strerror(retcode
) << dendl
;
3066 set_status("trimming");
3067 yield
call(alloc_cr());
3070 // on errors, unlock so other gateways can try
3071 set_status("unlocking");
3072 yield
call(new RGWSimpleRadosUnlockCR(store
->get_async_rados(), store
,
3073 obj
, name
, cookie
));
3080 class MetaMasterTrimPollCR
: public MetaTrimPollCR
{
3081 MasterTrimEnv env
; //< trim state to share between calls
3082 RGWCoroutine
* alloc_cr() override
{
3083 return new MetaMasterTrimCR(env
);
3086 MetaMasterTrimPollCR(const DoutPrefixProvider
*dpp
, RGWRados
*store
, RGWHTTPManager
*http
,
3087 int num_shards
, utime_t interval
)
3088 : MetaTrimPollCR(store
, interval
),
3089 env(dpp
, store
, http
, num_shards
)
3093 class MetaPeerTrimPollCR
: public MetaTrimPollCR
{
3094 PeerTrimEnv env
; //< trim state to share between calls
3095 RGWCoroutine
* alloc_cr() override
{
3096 return new MetaPeerTrimCR(env
);
3099 MetaPeerTrimPollCR(const DoutPrefixProvider
*dpp
, RGWRados
*store
, RGWHTTPManager
*http
,
3100 int num_shards
, utime_t interval
)
3101 : MetaTrimPollCR(store
, interval
),
3102 env(dpp
, store
, http
, num_shards
)
3106 RGWCoroutine
* create_meta_log_trim_cr(const DoutPrefixProvider
*dpp
, RGWRados
*store
, RGWHTTPManager
*http
,
3107 int num_shards
, utime_t interval
)
3109 if (store
->svc
.zone
->is_meta_master()) {
3110 return new MetaMasterTrimPollCR(dpp
, store
, http
, num_shards
, interval
);
3112 return new MetaPeerTrimPollCR(dpp
, store
, http
, num_shards
, interval
);
3116 struct MetaMasterAdminTrimCR
: private MasterTrimEnv
, public MetaMasterTrimCR
{
3117 MetaMasterAdminTrimCR(const DoutPrefixProvider
*dpp
, RGWRados
*store
, RGWHTTPManager
*http
, int num_shards
)
3118 : MasterTrimEnv(dpp
, store
, http
, num_shards
),
3119 MetaMasterTrimCR(*static_cast<MasterTrimEnv
*>(this))
3123 struct MetaPeerAdminTrimCR
: private PeerTrimEnv
, public MetaPeerTrimCR
{
3124 MetaPeerAdminTrimCR(const DoutPrefixProvider
*dpp
, RGWRados
*store
, RGWHTTPManager
*http
, int num_shards
)
3125 : PeerTrimEnv(dpp
, store
, http
, num_shards
),
3126 MetaPeerTrimCR(*static_cast<PeerTrimEnv
*>(this))
3130 RGWCoroutine
* create_admin_meta_log_trim_cr(const DoutPrefixProvider
*dpp
, RGWRados
*store
,
3131 RGWHTTPManager
*http
,
3134 if (store
->svc
.zone
->is_meta_master()) {
3135 return new MetaMasterAdminTrimCR(dpp
, store
, http
, num_shards
);
3137 return new MetaPeerAdminTrimCR(dpp
, store
, http
, num_shards
);