1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include <boost/optional.hpp>
6 #include "common/ceph_json.h"
7 #include "common/RWLock.h"
8 #include "common/RefCountedObj.h"
9 #include "common/WorkQueue.h"
10 #include "common/Throttle.h"
11 #include "common/admin_socket.h"
12 #include "common/errno.h"
14 #include "rgw_common.h"
15 #include "rgw_rados.h"
17 #include "rgw_metadata.h"
18 #include "rgw_rest_conn.h"
19 #include "rgw_tools.h"
20 #include "rgw_cr_rados.h"
21 #include "rgw_cr_rest.h"
22 #include "rgw_http_client.h"
24 #include "cls/lock/cls_lock_client.h"
26 #include <boost/asio/yield.hpp>
28 #define dout_subsys ceph_subsys_rgw
31 #define dout_prefix (*_dout << "meta sync: ")
// RADOS object names used to persist metadata sync state: the overall sync
// status object, the per-shard status prefix, and the full-sync index prefix.
33 static string mdlog_sync_status_oid
= "mdlog.sync-status";
34 static string mdlog_sync_status_shard_prefix
= "mdlog.sync-status.shard";
35 static string mdlog_sync_full_sync_index_prefix
= "meta.full-sync.index";
// RGWSyncErrorLogger: pre-computes one error-log object name per shard so
// errors can be spread across num_shards RADOS timelog objects.
// NOTE(review): this extraction is missing interior lines (closing braces
// etc., per the embedded original line numbers); code left byte-identical.
37 RGWSyncErrorLogger::RGWSyncErrorLogger(RGWRados
*_store
, const string
&oid_prefix
, int _num_shards
) : store(_store
), num_shards(_num_shards
) {
38 for (int i
= 0; i
< num_shards
; i
++) {
39 oids
.push_back(get_shard_oid(oid_prefix
, i
));
// Builds "<oid_prefix>.<shard_id>" into a stack buffer via snprintf.
42 string
RGWSyncErrorLogger::get_shard_oid(const string
& oid_prefix
, int shard_id
) {
43 char buf
[oid_prefix
.size() + 16];
44 snprintf(buf
, sizeof(buf
), "%s.%d", oid_prefix
.c_str(), shard_id
);
// Returns a coroutine that appends an rgw_sync_error_info entry to a timelog
// shard chosen round-robin via an incrementing counter mod num_shards.
48 RGWCoroutine
*RGWSyncErrorLogger::log_error_cr(const string
& source_zone
, const string
& section
, const string
& name
, uint32_t error_code
, const string
& message
) {
51 rgw_sync_error_info
info(source_zone
, error_code
, message
)
54 store
->time_log_prepare_entry(entry
, real_clock::now(), section
, name
, bl
);
56 uint32_t shard_id
= ++counter
% num_shards
;
59 return new RGWRadosTimelogAddCR(store
, oids
[shard_id
], entry
);
// RGWSyncBackoff: exponential backoff helper. update_wait_time() doubles
// cur_wait (left shift) and clamps it at max_secs; backoff() makes the given
// coroutine wait cur_wait seconds. Interior lines are missing from this view.
62 void RGWSyncBackoff::update_wait_time()
67 cur_wait
= (cur_wait
<< 1);
69 if (cur_wait
>= max_secs
) {
74 void RGWSyncBackoff::backoff_sleep()
80 void RGWSyncBackoff::backoff(RGWCoroutine
*op
)
83 op
->wait(utime_t(cur_wait
, 0));
// Retries a wrapped coroutine until it succeeds; treats -EBUSY/-EAGAIN as
// transient (backs off and retries), any other error is fatal. After success
// it optionally runs a finisher coroutine from alloc_finisher_cr().
86 int RGWBackoffControlCR::operate() {
88 // retry the operation until it succeeds
91 Mutex::Locker
l(lock
);
97 Mutex::Locker
l(lock
);
104 if (retcode
!= -EBUSY
&& retcode
!= -EAGAIN
) {
105 ldout(cct
, 0) << "ERROR: RGWBackoffControlCR called coroutine returned " << retcode
<< dendl
;
107 return set_cr_error(retcode
);
113 yield backoff
.backoff(this);
116 // run an optional finisher
117 yield
call(alloc_finisher_cr());
119 ldout(cct
, 0) << "ERROR: call to finisher_cr() failed: retcode=" << retcode
<< dendl
;
120 return set_cr_error(retcode
);
122 return set_cr_done();
// JSON decoders for remote mdlog REST responses. Note "num_objects" in the
// wire format maps to the local num_shards field.
127 void rgw_mdlog_info::decode_json(JSONObj
*obj
) {
128 JSONDecoder::decode_json("num_objects", num_shards
, obj
);
129 JSONDecoder::decode_json("period", period
, obj
);
130 JSONDecoder::decode_json("realm_epoch", realm_epoch
, obj
);
// Decodes one mdlog entry; "timestamp" is decoded into a utime_t and then
// converted to real_time for the timestamp member.
133 void rgw_mdlog_entry::decode_json(JSONObj
*obj
) {
134 JSONDecoder::decode_json("id", id
, obj
);
135 JSONDecoder::decode_json("section", section
, obj
);
136 JSONDecoder::decode_json("name", name
, obj
);
138 JSONDecoder::decode_json("timestamp", ut
, obj
);
139 timestamp
= ut
.to_real_time();
140 JSONDecoder::decode_json("data", log_data
, obj
);
// Decodes a shard listing: resume marker, truncation flag, and entries.
143 void rgw_mdlog_shard_data::decode_json(JSONObj
*obj
) {
144 JSONDecoder::decode_json("marker", marker
, obj
);
145 JSONDecoder::decode_json("truncated", truncated
, obj
);
146 JSONDecoder::decode_json("entries", entries
, obj
);
// Generic fan-out driver: keeps spawning child coroutines via spawn_next()
// while capping concurrency at max_concurrent, then drains the remainder.
// Child failures other than -ENOENT are logged (and, presumably, recorded in
// `status` — the lines doing so are missing from this view).
149 int RGWShardCollectCR::operate() {
151 while (spawn_next()) {
154 while (current_running
>= max_concurrent
) {
156 yield
wait_for_child();
157 if (collect_next(&child_ret
)) {
159 if (child_ret
< 0 && child_ret
!= -ENOENT
) {
160 ldout(cct
, 10) << __func__
<< ": failed to fetch log status, ret=" << child_ret
<< dendl
;
166 while (current_running
> 0) {
168 yield
wait_for_child();
169 if (collect_next(&child_ret
)) {
171 if (child_ret
< 0 && child_ret
!= -ENOENT
) {
172 ldout(cct
, 10) << __func__
<< ": failed to fetch log status, ret=" << child_ret
<< dendl
;
178 return set_cr_error(status
);
180 return set_cr_done();
// Fans out RGWReadRemoteMDLogShardInfoCR children (at most
// READ_MDLOG_MAX_CONCURRENT at a time) to collect per-shard mdlog info for
// one period into *mdlog_info.
185 class RGWReadRemoteMDLogInfoCR
: public RGWShardCollectCR
{
186 RGWMetaSyncEnv
*sync_env
;
188 const std::string
& period
;
190 map
<int, RGWMetadataLogInfo
> *mdlog_info
;
193 #define READ_MDLOG_MAX_CONCURRENT 10
196 RGWReadRemoteMDLogInfoCR(RGWMetaSyncEnv
*_sync_env
,
197 const std::string
& period
, int _num_shards
,
198 map
<int, RGWMetadataLogInfo
> *_mdlog_info
) : RGWShardCollectCR(_sync_env
->cct
, READ_MDLOG_MAX_CONCURRENT
),
200 period(period
), num_shards(_num_shards
),
201 mdlog_info(_mdlog_info
), shard_id(0) {}
202 bool spawn_next() override
;
// Fans out per-shard mdlog listings: takes ownership of the shard->marker map
// (swap in the ctor body) and iterates it, producing per-shard
// rgw_mdlog_shard_data into *result.
// NOTE(review): READ_MDLOG_MAX_CONCURRENT is re-#defined here (also defined
// above) — harmless only if the values match; worth deduplicating upstream.
205 class RGWListRemoteMDLogCR
: public RGWShardCollectCR
{
206 RGWMetaSyncEnv
*sync_env
;
208 const std::string
& period
;
209 map
<int, string
> shards
;
210 int max_entries_per_shard
;
211 map
<int, rgw_mdlog_shard_data
> *result
;
213 map
<int, string
>::iterator iter
;
214 #define READ_MDLOG_MAX_CONCURRENT 10
217 RGWListRemoteMDLogCR(RGWMetaSyncEnv
*_sync_env
,
218 const std::string
& period
, map
<int, string
>& _shards
,
219 int _max_entries_per_shard
,
220 map
<int, rgw_mdlog_shard_data
> *_result
) : RGWShardCollectCR(_sync_env
->cct
, READ_MDLOG_MAX_CONCURRENT
),
221 sync_env(_sync_env
), period(period
),
222 max_entries_per_shard(_max_entries_per_shard
),
224 shards
.swap(_shards
);
225 iter
= shards
.begin();
227 bool spawn_next() override
;
// RGWRemoteMetaLog: queries the metadata master zone's mdlog over REST.
// read_log_info() fetches /admin/log?type=metadata into *log_info.
230 RGWRemoteMetaLog::~RGWRemoteMetaLog()
235 int RGWRemoteMetaLog::read_log_info(rgw_mdlog_info
*log_info
)
237 rgw_http_param_pair pairs
[] = { { "type", "metadata" },
240 int ret
= conn
->get_json_resource("/admin/log", pairs
, *log_info
);
242 ldout(store
->ctx(), 0) << "ERROR: failed to fetch mdlog info" << dendl
;
246 ldout(store
->ctx(), 20) << "remote mdlog, num_shards=" << log_info
->num_shards
<< dendl
;
// No-op (presumably) on the meta master itself; otherwise reads the remote
// log info and fans out per-shard info reads via RGWReadRemoteMDLogInfoCR.
251 int RGWRemoteMetaLog::read_master_log_shards_info(const string
&master_period
, map
<int, RGWMetadataLogInfo
> *shards_info
)
253 if (store
->is_meta_master()) {
257 rgw_mdlog_info log_info
;
258 int ret
= read_log_info(&log_info
);
263 return run(new RGWReadRemoteMDLogInfoCR(&sync_env
, master_period
, log_info
.num_shards
, shards_info
));
// Lists one entry per shard (max_entries_per_shard hard-coded to 1 here)
// from the master's mdlog, starting at the supplied per-shard markers.
266 int RGWRemoteMetaLog::read_master_log_shards_next(const string
& period
, map
<int, string
> shard_markers
, map
<int, rgw_mdlog_shard_data
> *result
)
268 if (store
->is_meta_master()) {
272 return run(new RGWListRemoteMDLogCR(&sync_env
, period
, shard_markers
, 1, result
));
// Wires up the REST connection to the master zone, starts the threaded HTTP
// manager, allocates the sharded error logger, and initializes sync_env.
// NOTE(review): error_logger is allocated with raw new; ownership/teardown is
// not visible in this view — confirm it is released in finish()/dtor.
275 int RGWRemoteMetaLog::init()
277 conn
= store
->rest_master_conn
;
279 int ret
= http_manager
.set_threaded();
281 ldout(store
->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret
<< dendl
;
285 error_logger
= new RGWSyncErrorLogger(store
, RGW_SYNC_ERROR_LOG_SHARD_PREFIX
, ERROR_LOGGER_SHARDS
);
287 init_sync_env(&sync_env
);
292 void RGWRemoteMetaLog::finish()
// Max mdlog entries fetched per clone iteration (used by the clone coroutine).
298 #define CLONE_MAX_ENTRIES 100
// Sync-status manager setup: refuses to run on the meta master, requires a
// REST connection to the master zone, opens the log pool ioctx, initializes
// the remote master log, reads current sync status (ENOENT tolerated), and
// precomputes per-shard status object names and clone markers.
300 int RGWMetaSyncStatusManager::init()
302 if (store
->is_meta_master()) {
306 if (!store
->rest_master_conn
) {
307 lderr(store
->ctx()) << "no REST connection to master zone" << dendl
;
311 int r
= rgw_init_ioctx(store
->get_rados_handle(), store
->get_zone_params().log_pool
, ioctx
, true);
313 lderr(store
->ctx()) << "ERROR: failed to open log pool (" << store
->get_zone_params().log_pool
<< " ret=" << r
<< dendl
;
317 r
= master_log
.init();
319 lderr(store
->ctx()) << "ERROR: failed to init remote log, r=" << r
<< dendl
;
323 RGWMetaSyncEnv
& sync_env
= master_log
.get_sync_env();
325 rgw_meta_sync_status sync_status
;
326 r
= read_sync_status(&sync_status
);
327 if (r
< 0 && r
!= -ENOENT
) {
328 lderr(store
->ctx()) << "ERROR: failed to read sync status, r=" << r
<< dendl
;
332 int num_shards
= sync_status
.sync_info
.num_shards
;
334 for (int i
= 0; i
< num_shards
; i
++) {
335 shard_objs
[i
] = rgw_raw_obj(store
->get_zone_params().log_pool
, sync_env
.shard_obj_name(i
));
338 RWLock::WLocker
wl(ts_to_shard_lock
);
339 for (int i
= 0; i
< num_shards
; i
++) {
340 clone_markers
.push_back(string());
// Environment shared by all metadata-sync coroutines: stores the async RADOS
// processor, HTTP manager and error logger (cct/store/conn assignments are in
// lines missing from this view).
349 void RGWMetaSyncEnv::init(CephContext
*_cct
, RGWRados
*_store
, RGWRESTConn
*_conn
,
350 RGWAsyncRadosProcessor
*_async_rados
, RGWHTTPManager
*_http_manager
,
351 RGWSyncErrorLogger
*_error_logger
) {
355 async_rados
= _async_rados
;
356 http_manager
= _http_manager
;
357 error_logger
= _error_logger
;
// Name of the overall sync status object ("mdlog.sync-status").
360 string
RGWMetaSyncEnv::status_oid()
362 return mdlog_sync_status_oid
;
// Name of a per-shard status object: "<shard prefix>.<shard_id>".
365 string
RGWMetaSyncEnv::shard_obj_name(int shard_id
)
367 char buf
[mdlog_sync_status_shard_prefix
.size() + 16];
368 snprintf(buf
, sizeof(buf
), "%s.%d", mdlog_sync_status_shard_prefix
.c_str(), shard_id
);
// Async RADOS request that lists entries from one local mdlog shard:
// init_list_entries / list_entries / complete_list_entries, writing results
// through the caller-supplied marker/entries/truncated out-pointers.
373 class RGWAsyncReadMDLogEntries
: public RGWAsyncRadosRequest
{
375 RGWMetadataLog
*mdlog
;
379 list
<cls_log_entry
> *entries
;
383 int _send_request() override
{
389 mdlog
->init_list_entries(shard_id
, from_time
, end_time
, *marker
, &handle
);
391 int ret
= mdlog
->list_entries(handle
, max_entries
, *entries
, marker
, truncated
);
393 mdlog
->complete_list_entries(handle
);
398 RGWAsyncReadMDLogEntries(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
,
399 RGWMetadataLog
* mdlog
, int _shard_id
,
400 string
* _marker
, int _max_entries
,
401 list
<cls_log_entry
> *_entries
, bool *_truncated
)
402 : RGWAsyncRadosRequest(caller
, cn
), store(_store
), mdlog(mdlog
),
403 shard_id(_shard_id
), marker(_marker
), max_entries(_max_entries
),
404 entries(_entries
), truncated(_truncated
) {}
// Coroutine wrapper around RGWAsyncReadMDLogEntries: queues the async request
// in send_request() and propagates its status in request_complete().
407 class RGWReadMDLogEntriesCR
: public RGWSimpleCoroutine
{
408 RGWMetaSyncEnv
*sync_env
;
409 RGWMetadataLog
*const mdlog
;
414 list
<cls_log_entry
> *entries
;
417 RGWAsyncReadMDLogEntries
*req
{nullptr};
420 RGWReadMDLogEntriesCR(RGWMetaSyncEnv
*_sync_env
, RGWMetadataLog
* mdlog
,
421 int _shard_id
, string
*_marker
, int _max_entries
,
422 list
<cls_log_entry
> *_entries
, bool *_truncated
)
423 : RGWSimpleCoroutine(_sync_env
->cct
), sync_env(_sync_env
), mdlog(mdlog
),
424 shard_id(_shard_id
), pmarker(_marker
), max_entries(_max_entries
),
425 entries(_entries
), truncated(_truncated
) {}
427 ~RGWReadMDLogEntriesCR() override
{
// Note the async request reads/advances the local `marker` copy, not
// `pmarker` directly.
433 int send_request() override
{
435 req
= new RGWAsyncReadMDLogEntries(this, stack
->create_completion_notifier(),
436 sync_env
->store
, mdlog
, shard_id
, &marker
,
437 max_entries
, entries
, truncated
);
438 sync_env
->async_rados
->queue(req
);
442 int request_complete() override
{
443 int ret
= req
->get_ret_status();
444 if (ret
>= 0 && !entries
->empty()) {
447 return req
->get_ret_status();
// Reads one remote mdlog shard's info from the master zone via an async REST
// GET on /admin/log/ (type=metadata, period, shard id), decoding the JSON
// response into *shard_info on completion.
452 class RGWReadRemoteMDLogShardInfoCR
: public RGWCoroutine
{
454 RGWRESTReadResource
*http_op
;
456 const std::string
& period
;
458 RGWMetadataLogInfo
*shard_info
;
461 RGWReadRemoteMDLogShardInfoCR(RGWMetaSyncEnv
*env
, const std::string
& period
,
462 int _shard_id
, RGWMetadataLogInfo
*_shard_info
)
463 : RGWCoroutine(env
->store
->ctx()), env(env
), http_op(NULL
),
464 period(period
), shard_id(_shard_id
), shard_info(_shard_info
) {}
466 int operate() override
{
467 auto store
= env
->store
;
468 RGWRESTConn
*conn
= store
->rest_master_conn
;
472 snprintf(buf
, sizeof(buf
), "%d", shard_id
);
473 rgw_http_param_pair pairs
[] = { { "type" , "metadata" },
475 { "period", period
.c_str() },
479 string p
= "/admin/log/";
481 http_op
= new RGWRESTReadResource(conn
, p
, pairs
, NULL
,
484 http_op
->set_user_info((void *)stack
);
486 int ret
= http_op
->aio_read();
488 ldout(store
->ctx(), 0) << "ERROR: failed to read from " << p
<< dendl
;
489 log_error() << "failed to send http operation: " << http_op
->to_str() << " ret=" << ret
<< std::endl
;
491 return set_cr_error(ret
);
497 int ret
= http_op
->wait(shard_info
);
500 return set_cr_error(ret
);
502 return set_cr_done();
// Lists entries of one remote mdlog shard: builds the query string
// (type/period/max-entries, plus "marker" only when non-empty — an empty
// marker_key yields an unused empty param), issues the async REST read, and
// decodes the result in request_complete(); -ENOENT is not treated as an
// error worth logging.
509 class RGWListRemoteMDLogShardCR
: public RGWSimpleCoroutine
{
510 RGWMetaSyncEnv
*sync_env
;
511 RGWRESTReadResource
*http_op
;
513 const std::string
& period
;
516 uint32_t max_entries
;
517 rgw_mdlog_shard_data
*result
;
520 RGWListRemoteMDLogShardCR(RGWMetaSyncEnv
*env
, const std::string
& period
,
521 int _shard_id
, const string
& _marker
, uint32_t _max_entries
,
522 rgw_mdlog_shard_data
*_result
)
523 : RGWSimpleCoroutine(env
->store
->ctx()), sync_env(env
), http_op(NULL
),
524 period(period
), shard_id(_shard_id
), marker(_marker
), max_entries(_max_entries
), result(_result
) {}
526 int send_request() override
{
527 RGWRESTConn
*conn
= sync_env
->conn
;
528 RGWRados
*store
= sync_env
->store
;
531 snprintf(buf
, sizeof(buf
), "%d", shard_id
);
533 char max_entries_buf
[32];
534 snprintf(max_entries_buf
, sizeof(max_entries_buf
), "%d", (int)max_entries
);
536 const char *marker_key
= (marker
.empty() ? "" : "marker");
538 rgw_http_param_pair pairs
[] = { { "type", "metadata" },
540 { "period", period
.c_str() },
541 { "max-entries", max_entries_buf
},
542 { marker_key
, marker
.c_str() },
545 string p
= "/admin/log/";
547 http_op
= new RGWRESTReadResource(conn
, p
, pairs
, NULL
, sync_env
->http_manager
);
548 http_op
->set_user_info((void *)stack
);
550 int ret
= http_op
->aio_read();
552 ldout(store
->ctx(), 0) << "ERROR: failed to read from " << p
<< dendl
;
553 log_error() << "failed to send http operation: " << http_op
->to_str() << " ret=" << ret
<< std::endl
;
561 int request_complete() override
{
562 int ret
= http_op
->wait(result
);
564 if (ret
< 0 && ret
!= -ENOENT
) {
565 ldout(sync_env
->store
->ctx(), 0) << "ERROR: failed to list remote mdlog shard, ret=" << ret
<< dendl
;
// spawn_next() hooks for the two fan-out collectors above: the info reader
// walks shard ids 0..num_shards-1, the lister walks the shard->marker map.
572 bool RGWReadRemoteMDLogInfoCR::spawn_next() {
573 if (shard_id
>= num_shards
) {
576 spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env
, period
, shard_id
, &(*mdlog_info
)[shard_id
]), false);
581 bool RGWListRemoteMDLogCR::spawn_next() {
582 if (iter
== shards
.end()) {
586 spawn(new RGWListRemoteMDLogShardCR(sync_env
, period
, iter
->first
, iter
->second
, max_entries_per_shard
, &(*result
)[iter
->first
]), false);
// Initializes metadata sync status: takes the "sync_lock" lease on the status
// object, writes the initial rgw_meta_sync_info, reads each remote shard's
// current mdlog position, writes one rgw_meta_sync_marker per shard seeded
// from that position, flips state to StateBuildingFullSyncMaps, then drops
// the lease and collects children. The lease coroutine keeps running on its
// own stack (drain_all_but_stack) so the lock stays held during the writes.
591 class RGWInitSyncStatusCoroutine
: public RGWCoroutine
{
592 RGWMetaSyncEnv
*sync_env
;
594 rgw_meta_sync_info status
;
595 vector
<RGWMetadataLogInfo
> shards_info
;
596 boost::intrusive_ptr
<RGWContinuousLeaseCR
> lease_cr
;
597 boost::intrusive_ptr
<RGWCoroutinesStack
> lease_stack
;
599 RGWInitSyncStatusCoroutine(RGWMetaSyncEnv
*_sync_env
,
600 const rgw_meta_sync_info
&status
)
601 : RGWCoroutine(_sync_env
->store
->ctx()), sync_env(_sync_env
),
602 status(status
), shards_info(status
.num_shards
),
603 lease_cr(nullptr), lease_stack(nullptr) {}
605 ~RGWInitSyncStatusCoroutine() override
{
611 int operate() override
{
615 set_status("acquiring sync lock");
616 uint32_t lock_duration
= cct
->_conf
->rgw_sync_lease_period
;
617 string lock_name
= "sync_lock";
618 RGWRados
*store
= sync_env
->store
;
619 lease_cr
.reset(new RGWContinuousLeaseCR(sync_env
->async_rados
, store
,
620 rgw_raw_obj(store
->get_zone_params().log_pool
, sync_env
->status_oid()),
621 lock_name
, lock_duration
, this));
622 lease_stack
.reset(spawn(lease_cr
.get(), false));
624 while (!lease_cr
->is_locked()) {
625 if (lease_cr
->is_done()) {
626 ldout(cct
, 5) << "lease cr failed, done early " << dendl
;
627 set_status("lease lock failed, early abort");
628 return set_cr_error(lease_cr
->get_ret_status());
634 set_status("writing sync status");
635 RGWRados
*store
= sync_env
->store
;
636 call(new RGWSimpleRadosWriteCR
<rgw_meta_sync_info
>(sync_env
->async_rados
, store
,
637 rgw_raw_obj(store
->get_zone_params().log_pool
, sync_env
->status_oid()),
642 set_status("failed to write sync status");
643 ldout(cct
, 0) << "ERROR: failed to write sync status, retcode=" << retcode
<< dendl
;
644 yield lease_cr
->go_down();
645 return set_cr_error(retcode
);
647 /* fetch current position in logs */
648 set_status("fetching remote log position");
650 for (int i
= 0; i
< (int)status
.num_shards
; i
++) {
651 spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env
, status
.period
, i
,
652 &shards_info
[i
]), false);
656 drain_all_but_stack(lease_stack
.get()); /* the lease cr still needs to run */
659 set_status("updating sync status");
660 for (int i
= 0; i
< (int)status
.num_shards
; i
++) {
661 rgw_meta_sync_marker marker
;
662 RGWMetadataLogInfo
& info
= shards_info
[i
];
663 marker
.next_step_marker
= info
.marker
;
664 marker
.timestamp
= info
.last_update
;
665 RGWRados
*store
= sync_env
->store
;
666 spawn(new RGWSimpleRadosWriteCR
<rgw_meta_sync_marker
>(sync_env
->async_rados
,
668 rgw_raw_obj(store
->get_zone_params().log_pool
, sync_env
->shard_obj_name(i
)),
673 set_status("changing sync state: build full sync maps");
674 status
.state
= rgw_meta_sync_info::StateBuildingFullSyncMaps
;
675 RGWRados
*store
= sync_env
->store
;
676 call(new RGWSimpleRadosWriteCR
<rgw_meta_sync_info
>(sync_env
->async_rados
, store
,
677 rgw_raw_obj(store
->get_zone_params().log_pool
, sync_env
->status_oid()),
680 set_status("drop lock lease");
681 yield lease_cr
->go_down();
682 while (collect(&ret
, NULL
)) {
684 return set_cr_error(ret
);
689 return set_cr_done();
// Reads all per-shard sync markers (up to MAX_CONCURRENT_SHARDS in flight)
// from their status objects into the caller's markers map.
695 class RGWReadSyncStatusMarkersCR
: public RGWShardCollectCR
{
696 static constexpr int MAX_CONCURRENT_SHARDS
= 16;
699 const int num_shards
;
701 map
<uint32_t, rgw_meta_sync_marker
>& markers
;
704 RGWReadSyncStatusMarkersCR(RGWMetaSyncEnv
*env
, int num_shards
,
705 map
<uint32_t, rgw_meta_sync_marker
>& markers
)
706 : RGWShardCollectCR(env
->cct
, MAX_CONCURRENT_SHARDS
),
707 env(env
), num_shards(num_shards
), markers(markers
)
709 bool spawn_next() override
;
// Spawns one RGWSimpleRadosReadCR per shard until shard_id reaches
// num_shards.
712 bool RGWReadSyncStatusMarkersCR::spawn_next()
714 if (shard_id
>= num_shards
) {
717 using CR
= RGWSimpleRadosReadCR
<rgw_meta_sync_marker
>;
718 rgw_raw_obj obj
{env
->store
->get_zone_params().log_pool
,
719 env
->shard_obj_name(shard_id
)};
720 spawn(new CR(env
->async_rados
, env
->store
, obj
, &markers
[shard_id
]), false);
// Reads the full sync status: first the global rgw_meta_sync_info (ENOENT is
// a hard failure here — empty_on_enoent=false), then all per-shard markers.
725 class RGWReadSyncStatusCoroutine
: public RGWCoroutine
{
726 RGWMetaSyncEnv
*sync_env
;
727 rgw_meta_sync_status
*sync_status
;
730 RGWReadSyncStatusCoroutine(RGWMetaSyncEnv
*_sync_env
,
731 rgw_meta_sync_status
*_status
)
732 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
), sync_status(_status
)
734 int operate() override
;
737 int RGWReadSyncStatusCoroutine::operate()
741 using ReadInfoCR
= RGWSimpleRadosReadCR
<rgw_meta_sync_info
>;
743 bool empty_on_enoent
= false; // fail on ENOENT
744 rgw_raw_obj obj
{sync_env
->store
->get_zone_params().log_pool
,
745 sync_env
->status_oid()};
746 call(new ReadInfoCR(sync_env
->async_rados
, sync_env
->store
, obj
,
747 &sync_status
->sync_info
, empty_on_enoent
));
750 ldout(sync_env
->cct
, 4) << "failed to read sync status info with "
751 << cpp_strerror(retcode
) << dendl
;
752 return set_cr_error(retcode
);
754 // read shard markers
755 using ReadMarkersCR
= RGWReadSyncStatusMarkersCR
;
756 yield
call(new ReadMarkersCR(sync_env
, sync_status
->sync_info
.num_shards
,
757 sync_status
->sync_markers
));
759 ldout(sync_env
->cct
, 4) << "failed to read sync status markers with "
760 << cpp_strerror(retcode
) << dendl
;
761 return set_cr_error(retcode
);
763 return set_cr_done();
// Full-sync index builder: under the "sync_lock" lease it lists every
// metadata section from the master (/admin/metadata), reorders sections so
// user, bucket.instance, bucket sync first, then lists each section's keys
// and appends "section:key" strings into a sharded omap index
// (RGWShardedOmapCRManager). Finally it records per-shard total_entries in
// the sync markers. Uses -EIO for append/finish failure and -EBUSY for a
// lost lease (per the tail error paths below).
768 class RGWFetchAllMetaCR
: public RGWCoroutine
{
769 RGWMetaSyncEnv
*sync_env
;
776 list
<string
> sections
;
777 list
<string
>::iterator sections_iter
;
779 list
<string
>::iterator iter
;
781 std::unique_ptr
<RGWShardedOmapCRManager
> entries_index
;
783 boost::intrusive_ptr
<RGWContinuousLeaseCR
> lease_cr
;
784 boost::intrusive_ptr
<RGWCoroutinesStack
> lease_stack
;
788 map
<uint32_t, rgw_meta_sync_marker
>& markers
;
791 RGWFetchAllMetaCR(RGWMetaSyncEnv
*_sync_env
, int _num_shards
,
792 map
<uint32_t, rgw_meta_sync_marker
>& _markers
) : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
793 num_shards(_num_shards
),
794 ret_status(0), lease_cr(nullptr), lease_stack(nullptr),
795 lost_lock(false), failed(false), markers(_markers
) {
798 ~RGWFetchAllMetaCR() override
{
// Moves `name` from all_sections to the back of `sections` if present.
801 void append_section_from_set(set
<string
>& all_sections
, const string
& name
) {
802 set
<string
>::iterator iter
= all_sections
.find(name
);
803 if (iter
!= all_sections
.end()) {
804 sections
.emplace_back(std::move(*iter
));
805 all_sections
.erase(iter
);
809 * meta sync should go in the following order: user, bucket.instance, bucket
810 * then whatever other sections exist (if any)
812 void rearrange_sections() {
813 set
<string
> all_sections
;
814 std::move(sections
.begin(), sections
.end(),
815 std::inserter(all_sections
, all_sections
.end()));
818 append_section_from_set(all_sections
, "user");
819 append_section_from_set(all_sections
, "bucket.instance");
820 append_section_from_set(all_sections
, "bucket");
822 std::move(all_sections
.begin(), all_sections
.end(),
823 std::back_inserter(sections
));
826 int operate() override
{
827 RGWRESTConn
*conn
= sync_env
->conn
;
831 set_status(string("acquiring lock (") + sync_env
->status_oid() + ")");
832 uint32_t lock_duration
= cct
->_conf
->rgw_sync_lease_period
;
833 string lock_name
= "sync_lock";
834 lease_cr
.reset(new RGWContinuousLeaseCR(sync_env
->async_rados
,
836 rgw_raw_obj(sync_env
->store
->get_zone_params().log_pool
, sync_env
->status_oid()),
837 lock_name
, lock_duration
, this));
838 lease_stack
.reset(spawn(lease_cr
.get(), false));
840 while (!lease_cr
->is_locked()) {
841 if (lease_cr
->is_done()) {
842 ldout(cct
, 5) << "lease cr failed, done early " << dendl
;
843 set_status("failed acquiring lock");
844 return set_cr_error(lease_cr
->get_ret_status());
849 entries_index
.reset(new RGWShardedOmapCRManager(sync_env
->async_rados
, sync_env
->store
, this, num_shards
,
850 sync_env
->store
->get_zone_params().log_pool
,
851 mdlog_sync_full_sync_index_prefix
));
853 call(new RGWReadRESTResourceCR
<list
<string
> >(cct
, conn
, sync_env
->http_manager
,
854 "/admin/metadata", NULL
, &sections
));
856 if (get_ret_status() < 0) {
857 ldout(cct
, 0) << "ERROR: failed to fetch metadata sections" << dendl
;
858 yield entries_index
->finish();
859 yield lease_cr
->go_down();
861 return set_cr_error(get_ret_status());
863 rearrange_sections();
864 sections_iter
= sections
.begin();
865 for (; sections_iter
!= sections
.end(); ++sections_iter
) {
867 string entrypoint
= string("/admin/metadata/") + *sections_iter
;
868 /* FIXME: need a better scaling solution here, requires streaming output */
869 call(new RGWReadRESTResourceCR
<list
<string
> >(cct
, conn
, sync_env
->http_manager
,
870 entrypoint
, NULL
, &result
));
872 if (get_ret_status() < 0) {
873 ldout(cct
, 0) << "ERROR: failed to fetch metadata section: " << *sections_iter
<< dendl
;
874 yield entries_index
->finish();
875 yield lease_cr
->go_down();
877 return set_cr_error(get_ret_status());
879 iter
= result
.begin();
880 for (; iter
!= result
.end(); ++iter
) {
881 if (!lease_cr
->is_locked()) {
885 yield
; // allow entries_index consumer to make progress
887 ldout(cct
, 20) << "list metadata: section=" << *sections_iter
<< " key=" << *iter
<< dendl
;
888 string s
= *sections_iter
+ ":" + *iter
;
890 RGWRados
*store
= sync_env
->store
;
891 int ret
= store
->meta_mgr
->get_log_shard_id(*sections_iter
, *iter
, &shard_id
);
893 ldout(cct
, 0) << "ERROR: could not determine shard id for " << *sections_iter
<< ":" << *iter
<< dendl
;
897 if (!entries_index
->append(s
, shard_id
)) {
903 if (!entries_index
->finish()) {
908 for (map
<uint32_t, rgw_meta_sync_marker
>::iterator iter
= markers
.begin(); iter
!= markers
.end(); ++iter
) {
909 int shard_id
= (int)iter
->first
;
910 rgw_meta_sync_marker
& marker
= iter
->second
;
911 marker
.total_entries
= entries_index
->get_total_entries(shard_id
);
912 spawn(new RGWSimpleRadosWriteCR
<rgw_meta_sync_marker
>(sync_env
->async_rados
, sync_env
->store
,
913 rgw_raw_obj(sync_env
->store
->get_zone_params().log_pool
, sync_env
->shard_obj_name(shard_id
)),
918 drain_all_but_stack(lease_stack
.get()); /* the lease cr still needs to run */
920 yield lease_cr
->go_down();
923 while (collect(&ret
, NULL
)) {
925 return set_cr_error(ret
);
931 yield
return set_cr_error(-EIO
);
934 yield
return set_cr_error(-EBUSY
);
937 if (ret_status
< 0) {
938 yield
return set_cr_error(ret_status
);
941 yield
return set_cr_done();
// Builds "<full-sync index prefix>.<shard_id>", the omap object holding one
// shard of the full-sync key index.
947 static string
full_sync_index_shard_oid(int shard_id
)
949 char buf
[mdlog_sync_full_sync_index_prefix
.size() + 16];
950 snprintf(buf
, sizeof(buf
), "%s.%d", mdlog_sync_full_sync_index_prefix
.c_str(), shard_id
);
// Fetches one metadata entry from the master zone: async REST GET on
// /admin/metadata/<section>/<key>?key=<key>, returning the raw response body
// in the caller's bufferlist via wait_bl().
954 class RGWReadRemoteMetadataCR
: public RGWCoroutine
{
955 RGWMetaSyncEnv
*sync_env
;
957 RGWRESTReadResource
*http_op
;
965 RGWReadRemoteMetadataCR(RGWMetaSyncEnv
*_sync_env
,
966 const string
& _section
, const string
& _key
, bufferlist
*_pbl
) : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
973 int operate() override
{
974 RGWRESTConn
*conn
= sync_env
->conn
;
977 rgw_http_param_pair pairs
[] = { { "key" , key
.c_str()},
980 string p
= string("/admin/metadata/") + section
+ "/" + key
;
982 http_op
= new RGWRESTReadResource(conn
, p
, pairs
, NULL
, sync_env
->http_manager
);
984 http_op
->set_user_info((void *)stack
);
986 int ret
= http_op
->aio_read();
988 ldout(sync_env
->cct
, 0) << "ERROR: failed to fetch mdlog data" << dendl
;
989 log_error() << "failed to send http operation: " << http_op
->to_str() << " ret=" << ret
<< std::endl
;
991 return set_cr_error(ret
);
997 int ret
= http_op
->wait_bl(pbl
);
1000 return set_cr_error(ret
);
1002 return set_cr_done();
// Async request that stores one raw metadata entry locally via the metadata
// manager, using APPLY_ALWAYS (no version/mtime checks on apply).
1009 class RGWAsyncMetaStoreEntry
: public RGWAsyncRadosRequest
{
1014 int _send_request() override
{
1015 int ret
= store
->meta_mgr
->put(raw_key
, bl
, RGWMetadataHandler::APPLY_ALWAYS
);
1017 ldout(store
->ctx(), 0) << "ERROR: can't store key: " << raw_key
<< " ret=" << ret
<< dendl
;
1023 RGWAsyncMetaStoreEntry(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
,
1024 const string
& _raw_key
,
1025 bufferlist
& _bl
) : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
1026 raw_key(_raw_key
), bl(_bl
) {}
// Coroutine wrapper around RGWAsyncMetaStoreEntry: queues the async store in
// send_request() and reports its status in request_complete().
1030 class RGWMetaStoreEntryCR
: public RGWSimpleCoroutine
{
1031 RGWMetaSyncEnv
*sync_env
;
1035 RGWAsyncMetaStoreEntry
*req
;
1038 RGWMetaStoreEntryCR(RGWMetaSyncEnv
*_sync_env
,
1039 const string
& _raw_key
,
1040 bufferlist
& _bl
) : RGWSimpleCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
1041 raw_key(_raw_key
), bl(_bl
), req(NULL
) {
1044 ~RGWMetaStoreEntryCR() override
{
1050 int send_request() override
{
1051 req
= new RGWAsyncMetaStoreEntry(this, stack
->create_completion_notifier(),
1052 sync_env
->store
, raw_key
, bl
);
1053 sync_env
->async_rados
->queue(req
);
1057 int request_complete() override
{
1058 return req
->get_ret_status();
// Async request that removes one raw metadata entry from the local zone via
// the metadata manager.
1062 class RGWAsyncMetaRemoveEntry
: public RGWAsyncRadosRequest
{
1066 int _send_request() override
{
1067 int ret
= store
->meta_mgr
->remove(raw_key
);
1069 ldout(store
->ctx(), 0) << "ERROR: can't remove key: " << raw_key
<< " ret=" << ret
<< dendl
;
1075 RGWAsyncMetaRemoveEntry(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
,
1076 const string
& _raw_key
) : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
1077 raw_key(_raw_key
) {}
// Coroutine wrapper around RGWAsyncMetaRemoveEntry; mirrors
// RGWMetaStoreEntryCR but for deletion. request_complete() captures the
// status into `r` (handling of special cases, e.g. ENOENT, is in lines
// missing from this view).
1081 class RGWMetaRemoveEntryCR
: public RGWSimpleCoroutine
{
1082 RGWMetaSyncEnv
*sync_env
;
1085 RGWAsyncMetaRemoveEntry
*req
;
1088 RGWMetaRemoveEntryCR(RGWMetaSyncEnv
*_sync_env
,
1089 const string
& _raw_key
) : RGWSimpleCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
1090 raw_key(_raw_key
), req(NULL
) {
1093 ~RGWMetaRemoveEntryCR() override
{
1099 int send_request() override
{
1100 req
= new RGWAsyncMetaRemoveEntry(this, stack
->create_completion_notifier(),
1101 sync_env
->store
, raw_key
);
1102 sync_env
->async_rados
->queue(req
);
1106 int request_complete() override
{
1107 int r
= req
->get_ret_status();
// Marker updates are batched: only every META_SYNC_UPDATE_MARKER_WINDOW
// completions triggers a RADOS write (windowing is in the base class).
1115 #define META_SYNC_UPDATE_MARKER_WINDOW 10
// Tracks sync progress for one shard; store_marker() updates the in-memory
// rgw_meta_sync_marker (marker, pos when nonzero, timestamp when nonzero)
// and returns a write coroutine persisting it to marker_oid.
1117 class RGWMetaSyncShardMarkerTrack
: public RGWSyncShardMarkerTrack
<string
, string
> {
1118 RGWMetaSyncEnv
*sync_env
;
1121 rgw_meta_sync_marker sync_marker
;
1125 RGWMetaSyncShardMarkerTrack(RGWMetaSyncEnv
*_sync_env
,
1126 const string
& _marker_oid
,
1127 const rgw_meta_sync_marker
& _marker
) : RGWSyncShardMarkerTrack(META_SYNC_UPDATE_MARKER_WINDOW
),
1128 sync_env(_sync_env
),
1129 marker_oid(_marker_oid
),
1130 sync_marker(_marker
) {}
1132 RGWCoroutine
*store_marker(const string
& new_marker
, uint64_t index_pos
, const real_time
& timestamp
) override
{
1133 sync_marker
.marker
= new_marker
;
1134 if (index_pos
> 0) {
1135 sync_marker
.pos
= index_pos
;
1138 if (!real_clock::is_zero(timestamp
)) {
1139 sync_marker
.timestamp
= timestamp
;
1142 ldout(sync_env
->cct
, 20) << __func__
<< "(): updating marker marker_oid=" << marker_oid
<< " marker=" << new_marker
<< " realm_epoch=" << sync_marker
.realm_epoch
<< dendl
;
1143 RGWRados
*store
= sync_env
->store
;
1144 return new RGWSimpleRadosWriteCR
<rgw_meta_sync_marker
>(sync_env
->async_rados
,
1146 rgw_raw_obj(store
->get_zone_params().log_pool
, marker_oid
),
// Syncs a single "section:key" metadata entry: optional fault injection,
// skip (and finish the marker) for non-complete mdlog ops, then fetch the
// remote entry with up to NUM_TRANSIENT_ERROR_RETRIES retries on
// -EAGAIN/-ECANCELED; -ENOENT from the fetch leads to a local remove instead
// of a store. Persistent fetch failures are recorded through the sync error
// logger before returning the error. Finally the marker tracker is notified.
1151 int RGWMetaSyncSingleEntryCR::operate() {
1153 #define NUM_TRANSIENT_ERROR_RETRIES 10
1155 if (error_injection
&&
1156 rand() % 10000 < cct
->_conf
->rgw_sync_meta_inject_err_probability
* 10000.0) {
1157 ldout(sync_env
->cct
, 0) << __FILE__
<< ":" << __LINE__
<< ": injecting meta sync error on key=" << raw_key
<< dendl
;
1158 return set_cr_error(-EIO
);
1161 if (op_status
!= MDLOG_STATUS_COMPLETE
) {
1162 ldout(sync_env
->cct
, 20) << "skipping pending operation" << dendl
;
1163 yield
call(marker_tracker
->finish(entry_marker
));
1165 return set_cr_error(retcode
);
1167 return set_cr_done();
1169 for (tries
= 0; tries
< NUM_TRANSIENT_ERROR_RETRIES
; tries
++) {
1171 pos
= raw_key
.find(':');
1172 section
= raw_key
.substr(0, pos
);
1173 key
= raw_key
.substr(pos
+ 1);
1174 ldout(sync_env
->cct
, 20) << "fetching remote metadata: " << section
<< ":" << key
<< (tries
== 0 ? "" : " (retry)") << dendl
;
1175 call(new RGWReadRemoteMetadataCR(sync_env
, section
, key
, &md_bl
));
1178 sync_status
= retcode
;
1180 if (sync_status
== -ENOENT
) {
1181 /* FIXME: do we need to remove the entry from the local zone? */
1185 if ((sync_status
== -EAGAIN
|| sync_status
== -ECANCELED
) && (tries
< NUM_TRANSIENT_ERROR_RETRIES
- 1)) {
1186 ldout(sync_env
->cct
, 20) << *this << ": failed to fetch remote metadata: " << section
<< ":" << key
<< ", will retry" << dendl
;
1190 if (sync_status
< 0) {
1191 ldout(sync_env
->cct
, 10) << *this << ": failed to send read remote metadata entry: section=" << section
<< " key=" << key
<< " status=" << sync_status
<< dendl
;
1192 log_error() << "failed to send read remote metadata entry: section=" << section
<< " key=" << key
<< " status=" << sync_status
<< std::endl
;
1193 yield
call(sync_env
->error_logger
->log_error_cr(sync_env
->conn
->get_remote_id(), section
, key
, -sync_status
,
1194 string("failed to read remote metadata entry: ") + cpp_strerror(-sync_status
)));
1195 return set_cr_error(sync_status
);
1202 for (tries
= 0; tries
< NUM_TRANSIENT_ERROR_RETRIES
; tries
++) {
1203 if (sync_status
!= -ENOENT
) {
1204 yield
call(new RGWMetaStoreEntryCR(sync_env
, raw_key
, md_bl
));
1206 yield
call(new RGWMetaRemoveEntryCR(sync_env
, raw_key
));
1208 if ((retcode
== -EAGAIN
|| retcode
== -ECANCELED
) && (tries
< NUM_TRANSIENT_ERROR_RETRIES
- 1)) {
1209 ldout(sync_env
->cct
, 20) << *this << ": failed to store metadata: " << section
<< ":" << key
<< ", got retcode=" << retcode
<< dendl
;
1215 sync_status
= retcode
;
1217 if (sync_status
== 0 && marker_tracker
) {
1219 yield
call(marker_tracker
->finish(entry_marker
));
1220 sync_status
= retcode
;
1222 if (sync_status
< 0) {
1223 return set_cr_error(sync_status
);
1225 return set_cr_done();
// State machine that clones up to CLONE_MAX_ENTRIES remote mdlog entries per
// pass into the local mdlog for one shard/period, advancing *new_marker
// (seeded with the starting marker in the ctor). The destructor cancels any
// outstanding shard-info completion.
1230 class RGWCloneMetaLogCoroutine
: public RGWCoroutine
{
1231 RGWMetaSyncEnv
*sync_env
;
1232 RGWMetadataLog
*mdlog
;
1234 const std::string
& period
;
1237 bool truncated
= false;
1240 int max_entries
= CLONE_MAX_ENTRIES
;
1242 RGWRESTReadResource
*http_op
= nullptr;
1243 boost::intrusive_ptr
<RGWMetadataLogInfoCompletion
> completion
;
1245 RGWMetadataLogInfo shard_info
;
1246 rgw_mdlog_shard_data data
;
1249 RGWCloneMetaLogCoroutine(RGWMetaSyncEnv
*_sync_env
, RGWMetadataLog
* mdlog
,
1250 const std::string
& period
, int _id
,
1251 const string
& _marker
, string
*_new_marker
)
1252 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
), mdlog(mdlog
),
1253 period(period
), shard_id(_id
), marker(_marker
), new_marker(_new_marker
) {
1255 *new_marker
= marker
;
1258 ~RGWCloneMetaLogCoroutine() override
{
1263 completion
->cancel();
1267 int operate() override
;
// Per-state handlers driven by operate(): read remote shard status, issue
// the REST listing request, receive the response, store entries locally.
1270 int state_read_shard_status();
1271 int state_read_shard_status_complete();
1272 int state_send_rest_request();
1273 int state_receive_rest_response();
1274 int state_store_mdlog_entries();
1275 int state_store_mdlog_entries_complete();
// Coroutine that syncs a single metadata-log shard for one period.
// Two states (driven by sync_marker.state): FullSync lists the omap keys of
// the full-sync index object and spawns one RGWMetaSyncSingleEntryCR per key;
// IncrementalSync tails the mdlog shard, cloning remote entries as needed.
// Both states hold a RGWContinuousLeaseCR lock on the shard status object.
// collect_children() reaps finished child stacks and advances
// sync_marker.marker using the stack_to_pos / pos_to_prev bookkeeping.
1278 class RGWMetaSyncShardCR
: public RGWCoroutine
{
1279 RGWMetaSyncEnv
*sync_env
;
1281 const rgw_pool
& pool
;
1282 const std::string
& period
; //< currently syncing period id
1283 const epoch_t realm_epoch
; //< realm_epoch of period
1284 RGWMetadataLog
* mdlog
; //< log of syncing period
// shared with the controlling CR; persisted via WriteMarkerCR below
1286 rgw_meta_sync_marker
& sync_marker
;
1287 boost::optional
<rgw_meta_sync_marker
> temp_marker
; //< for pending updates
1290 const std::string
& period_marker
; //< max marker stored in next period
// omap key batch fetched during full sync
1292 map
<string
, bufferlist
> entries
;
1293 map
<string
, bufferlist
>::iterator iter
;
1297 RGWMetaSyncShardMarkerTrack
*marker_tracker
= nullptr;
1299 list
<cls_log_entry
> log_entries
;
1300 list
<cls_log_entry
>::iterator log_iter
;
1301 bool truncated
= false;
1303 string mdlog_marker
;
1305 rgw_mdlog_entry mdlog_entry
;
// separate reentry points for the two sub-state machines
1310 boost::asio::coroutine incremental_cr
;
1311 boost::asio::coroutine full_cr
;
1313 boost::intrusive_ptr
<RGWContinuousLeaseCR
> lease_cr
;
1314 boost::intrusive_ptr
<RGWCoroutinesStack
> lease_stack
;
1316 bool lost_lock
= false;
1318 bool *reset_backoff
;
1320 // hold a reference to the cr stack while it's in the map
1321 using StackRef
= boost::intrusive_ptr
<RGWCoroutinesStack
>;
// child stack -> the entry position it is syncing
1322 map
<StackRef
, string
> stack_to_pos
;
// entry position -> position that preceded it (for marker adjustment)
1323 map
<string
, string
> pos_to_prev
;
1325 bool can_adjust_marker
= false;
1326 bool done_with_period
= false;
1328 int total_entries
= 0;
1331 RGWMetaSyncShardCR(RGWMetaSyncEnv
*_sync_env
, const rgw_pool
& _pool
,
1332 const std::string
& period
, epoch_t realm_epoch
,
1333 RGWMetadataLog
* mdlog
, uint32_t _shard_id
,
1334 rgw_meta_sync_marker
& _marker
,
1335 const std::string
& period_marker
, bool *_reset_backoff
)
1336 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
), pool(_pool
),
1337 period(period
), realm_epoch(realm_epoch
), mdlog(mdlog
),
1338 shard_id(_shard_id
), sync_marker(_marker
),
1339 period_marker(period_marker
), inc_lock("RGWMetaSyncShardCR::inc_lock"),
1340 reset_backoff(_reset_backoff
) {
1341 *reset_backoff
= false;
1344 ~RGWMetaSyncShardCR() override
{
1345 delete marker_tracker
;
// replaces any previous tracker; owns the pointer
1351 void set_marker_tracker(RGWMetaSyncShardMarkerTrack
*mt
) {
1352 delete marker_tracker
;
1353 marker_tracker
= mt
;
// dispatch into full or incremental sync based on the persisted state
1356 int operate() override
{
1359 switch (sync_marker
.state
) {
1360 case rgw_meta_sync_marker::FullSync
:
1363 ldout(sync_env
->cct
, 10) << "sync: full_sync: shard_id=" << shard_id
<< " r=" << r
<< dendl
;
1364 return set_cr_error(r
);
1367 case rgw_meta_sync_marker::IncrementalSync
:
1368 r
= incremental_sync();
1370 ldout(sync_env
->cct
, 10) << "sync: incremental_sync: shard_id=" << shard_id
<< " r=" << r
<< dendl
;
1371 return set_cr_error(r
);
// reap completed children; advance sync_marker only across a contiguous
// prefix of completed positions (pos_to_prev ordering)
1380 void collect_children()
1383 RGWCoroutinesStack
*child
;
1384 while (collect_next(&child_ret
, &child
)) {
1385 auto iter
= stack_to_pos
.find(child
);
1386 if (iter
== stack_to_pos
.end()) {
1387 /* some other stack that we don't care about */
1391 string
& pos
= iter
->second
;
1393 if (child_ret
< 0) {
1394 ldout(sync_env
->cct
, 0) << *this << ": child operation stack=" << child
<< " entry=" << pos
<< " returned " << child_ret
<< dendl
;
1397 map
<string
, string
>::iterator prev_iter
= pos_to_prev
.find(pos
);
1398 assert(prev_iter
!= pos_to_prev
.end());
1401 * we should get -EAGAIN for transient errors, for which we want to retry, so we don't
1402 * update the marker and abort. We'll get called again for these. Permanent errors will be
1403 * handled by marking the entry at the error log shard, so that we retry on it separately
1405 if (child_ret
== -EAGAIN
) {
1406 can_adjust_marker
= false;
1409 if (pos_to_prev
.size() == 1) {
1410 if (can_adjust_marker
) {
1411 sync_marker
.marker
= pos
;
1413 pos_to_prev
.erase(prev_iter
);
1415 assert(pos_to_prev
.size() > 1);
1416 pos_to_prev
.erase(prev_iter
);
1417 prev_iter
= pos_to_prev
.begin();
1418 if (can_adjust_marker
) {
1419 sync_marker
.marker
= prev_iter
->second
;
1423 ldout(sync_env
->cct
, 4) << *this << ": adjusting marker pos=" << sync_marker
.marker
<< dendl
;
1424 stack_to_pos
.erase(iter
);
// --- full-sync state body ---
// NOTE(review): the enclosing full_sync() signature line is missing from
// this excerpt; what follows is its interior.
1429 #define OMAP_GET_MAX_ENTRIES 100
1430 int max_entries
= OMAP_GET_MAX_ENTRIES
;
1432 set_status("full_sync");
1433 oid
= full_sync_index_shard_oid(shard_id
);
1434 can_adjust_marker
= true;
// take the shard lease before touching the full-sync index
1437 uint32_t lock_duration
= cct
->_conf
->rgw_sync_lease_period
;
1438 string lock_name
= "sync_lock";
1439 RGWRados
*store
= sync_env
->store
;
1440 lease_cr
.reset(new RGWContinuousLeaseCR(sync_env
->async_rados
, store
,
1441 rgw_raw_obj(pool
, sync_env
->shard_obj_name(shard_id
)),
1442 lock_name
, lock_duration
, this));
1443 lease_stack
.reset(spawn(lease_cr
.get(), false));
1446 while (!lease_cr
->is_locked()) {
1447 if (lease_cr
->is_done()) {
1448 ldout(cct
, 5) << "lease cr failed, done early " << dendl
;
1450 return lease_cr
->get_ret_status();
1456 /* lock succeeded, a retry now should avoid previous backoff status */
1457 *reset_backoff
= true;
1459 /* prepare marker tracker */
1460 set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env
,
1461 sync_env
->shard_obj_name(shard_id
),
1464 marker
= sync_marker
.marker
;
1466 total_entries
= sync_marker
.pos
;
// batch loop: list index keys from `marker`, spawn one sync CR per key
1470 if (!lease_cr
->is_locked()) {
1474 yield
call(new RGWRadosGetOmapKeysCR(sync_env
->store
, rgw_raw_obj(pool
, oid
),
1475 marker
, &entries
, max_entries
));
1477 ldout(sync_env
->cct
, 0) << "ERROR: " << __func__
<< "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode
<< dendl
;
1478 yield lease_cr
->go_down();
1482 iter
= entries
.begin();
1483 for (; iter
!= entries
.end(); ++iter
) {
1484 ldout(sync_env
->cct
, 20) << __func__
<< ": full sync: " << iter
->first
<< dendl
;
1486 if (!marker_tracker
->start(iter
->first
, total_entries
, real_time())) {
1487 ldout(sync_env
->cct
, 0) << "ERROR: cannot start syncing " << iter
->first
<< ". Duplicate entry?" << dendl
;
1489 // fetch remote and write locally
1491 RGWCoroutinesStack
*stack
= spawn(new RGWMetaSyncSingleEntryCR(sync_env
, iter
->first
, iter
->first
, MDLOG_STATUS_COMPLETE
, marker_tracker
), false);
1492 // stack_to_pos holds a reference to the stack
1493 stack_to_pos
[stack
] = iter
->first
;
1494 pos_to_prev
[iter
->first
] = marker
;
1497 marker
= iter
->first
;
1500 } while ((int)entries
.size() == max_entries
&& can_adjust_marker
);
// drain children (the remaining spawned stack is the lease)
1502 while (num_spawned() > 1) {
1503 yield
wait_for_child();
1508 /* update marker to reflect we're done with full sync */
1509 if (can_adjust_marker
) {
1510 // apply updates to a temporary marker, or operate() will send us
1511 // to incremental_sync() after we yield
1512 temp_marker
= sync_marker
;
1513 temp_marker
->state
= rgw_meta_sync_marker::IncrementalSync
;
1514 temp_marker
->marker
= std::move(temp_marker
->next_step_marker
);
1515 temp_marker
->next_step_marker
.clear();
1516 temp_marker
->realm_epoch
= realm_epoch
;
1517 ldout(sync_env
->cct
, 4) << *this << ": saving marker pos=" << temp_marker
->marker
<< " realm_epoch=" << realm_epoch
<< dendl
;
1519 using WriteMarkerCR
= RGWSimpleRadosWriteCR
<rgw_meta_sync_marker
>;
1520 yield
call(new WriteMarkerCR(sync_env
->async_rados
, sync_env
->store
,
1521 rgw_raw_obj(pool
, sync_env
->shard_obj_name(shard_id
)),
1526 ldout(sync_env
->cct
, 0) << "ERROR: failed to set sync marker: retcode=" << retcode
<< dendl
;
1527 yield lease_cr
->go_down();
1534 * if we reached here, it means that lost_lock is true, otherwise the state
1535 * change in the previous block will prevent us from reaching here
1538 yield lease_cr
->go_down();
1544 if (!can_adjust_marker
) {
1552 // apply the sync marker update
1553 assert(temp_marker
);
1554 sync_marker
= std::move(*temp_marker
);
1555 temp_marker
= boost::none
;
1556 // must not yield after this point!
// --- incremental-sync state ---
// tails the local mdlog shard; clones from the remote when at the tip,
// and stops at period_marker when the next period takes over.
1562 int incremental_sync() {
1563 reenter(&incremental_cr
) {
1564 set_status("incremental_sync");
1565 can_adjust_marker
= true;
1567 if (!lease_cr
) { /* could have had a lease_cr lock from previous state */
1569 uint32_t lock_duration
= cct
->_conf
->rgw_sync_lease_period
;
1570 string lock_name
= "sync_lock";
1571 RGWRados
*store
= sync_env
->store
;
1572 lease_cr
.reset( new RGWContinuousLeaseCR(sync_env
->async_rados
, store
,
1573 rgw_raw_obj(pool
, sync_env
->shard_obj_name(shard_id
)),
1574 lock_name
, lock_duration
, this));
1575 lease_stack
.reset(spawn(lease_cr
.get(), false));
1578 while (!lease_cr
->is_locked()) {
1579 if (lease_cr
->is_done()) {
1580 ldout(cct
, 5) << "lease cr failed, done early " << dendl
;
1582 return lease_cr
->get_ret_status();
1588 // if the period has advanced, we can't use the existing marker
1589 if (sync_marker
.realm_epoch
< realm_epoch
) {
1590 ldout(sync_env
->cct
, 4) << "clearing marker=" << sync_marker
.marker
1591 << " from old realm_epoch=" << sync_marker
.realm_epoch
1592 << " (now " << realm_epoch
<< ')' << dendl
;
1593 sync_marker
.realm_epoch
= realm_epoch
;
1594 sync_marker
.marker
.clear();
1596 mdlog_marker
= sync_marker
.marker
;
1597 set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env
,
1598 sync_env
->shard_obj_name(shard_id
),
1602 * mdlog_marker: the remote sync marker positiion
1603 * sync_marker: the local sync marker position
1604 * max_marker: the max mdlog position that we fetched
1605 * marker: the current position we try to sync
1606 * period_marker: the last marker before the next period begins (optional)
1608 marker
= max_marker
= sync_marker
.marker
;
// main tail loop
1611 if (!lease_cr
->is_locked()) {
1615 #define INCREMENTAL_MAX_ENTRIES 100
1616 ldout(sync_env
->cct
, 20) << __func__
<< ":" << __LINE__
<< ": shard_id=" << shard_id
<< " mdlog_marker=" << mdlog_marker
<< " sync_marker.marker=" << sync_marker
.marker
<< " period_marker=" << period_marker
<< dendl
;
1617 if (!period_marker
.empty() && period_marker
<= mdlog_marker
) {
1618 ldout(cct
, 10) << "mdlog_marker past period_marker=" << period_marker
<< dendl
;
1619 done_with_period
= true;
1622 if (mdlog_marker
<= max_marker
) {
1623 /* we're at the tip, try to bring more entries */
1624 ldout(sync_env
->cct
, 20) << __func__
<< ":" << __LINE__
<< ": shard_id=" << shard_id
<< " syncing mdlog for shard_id=" << shard_id
<< dendl
;
1625 yield
call(new RGWCloneMetaLogCoroutine(sync_env
, mdlog
,
1627 mdlog_marker
, &mdlog_marker
));
1630 ldout(sync_env
->cct
, 10) << *this << ": failed to fetch more log entries, retcode=" << retcode
<< dendl
;
1631 yield lease_cr
->go_down();
1633 *reset_backoff
= false; // back off and try again later
1636 *reset_backoff
= true; /* if we got to this point, all systems function */
1637 ldout(sync_env
->cct
, 20) << __func__
<< ":" << __LINE__
<< ": shard_id=" << shard_id
<< " mdlog_marker=" << mdlog_marker
<< " sync_marker.marker=" << sync_marker
.marker
<< dendl
;
1638 if (mdlog_marker
> max_marker
) {
1639 marker
= max_marker
;
1640 yield
call(new RGWReadMDLogEntriesCR(sync_env
, mdlog
, shard_id
,
1641 &max_marker
, INCREMENTAL_MAX_ENTRIES
,
1642 &log_entries
, &truncated
));
1644 ldout(sync_env
->cct
, 10) << *this << ": failed to list mdlog entries, retcode=" << retcode
<< dendl
;
1645 yield lease_cr
->go_down();
1647 *reset_backoff
= false; // back off and try again later
1650 for (log_iter
= log_entries
.begin(); log_iter
!= log_entries
.end() && !done_with_period
; ++log_iter
) {
1651 if (!period_marker
.empty() && period_marker
<= log_iter
->id
) {
1652 done_with_period
= true;
1653 if (period_marker
< log_iter
->id
) {
1654 ldout(cct
, 10) << "found key=" << log_iter
->id
1655 << " past period_marker=" << period_marker
<< dendl
;
1658 ldout(cct
, 10) << "found key at period_marker=" << period_marker
<< dendl
;
1659 // sync this entry, then return control to RGWMetaSyncCR
1661 if (!mdlog_entry
.convert_from(*log_iter
)) {
1662 ldout(sync_env
->cct
, 0) << __func__
<< ":" << __LINE__
<< ": ERROR: failed to convert mdlog entry, shard_id=" << shard_id
<< " log_entry: " << log_iter
->id
<< ":" << log_iter
->section
<< ":" << log_iter
->name
<< ":" << log_iter
->timestamp
<< " ... skipping entry" << dendl
;
1665 ldout(sync_env
->cct
, 20) << __func__
<< ":" << __LINE__
<< ": shard_id=" << shard_id
<< " log_entry: " << log_iter
->id
<< ":" << log_iter
->section
<< ":" << log_iter
->name
<< ":" << log_iter
->timestamp
<< dendl
;
1666 if (!marker_tracker
->start(log_iter
->id
, 0, log_iter
->timestamp
.to_real_time())) {
1667 ldout(sync_env
->cct
, 0) << "ERROR: cannot start syncing " << log_iter
->id
<< ". Duplicate entry?" << dendl
;
1669 raw_key
= log_iter
->section
+ ":" + log_iter
->name
;
1671 RGWCoroutinesStack
*stack
= spawn(new RGWMetaSyncSingleEntryCR(sync_env
, raw_key
, log_iter
->id
, mdlog_entry
.log_data
.status
, marker_tracker
), false);
1673 // stack_to_pos holds a reference to the stack
1674 stack_to_pos
[stack
] = log_iter
->id
;
1675 pos_to_prev
[log_iter
->id
] = marker
;
1678 marker
= log_iter
->id
;
1682 ldout(sync_env
->cct
, 20) << __func__
<< ":" << __LINE__
<< ": shard_id=" << shard_id
<< " mdlog_marker=" << mdlog_marker
<< " max_marker=" << max_marker
<< " sync_marker.marker=" << sync_marker
.marker
<< " period_marker=" << period_marker
<< dendl
;
1683 if (done_with_period
) {
1684 // return control to RGWMetaSyncCR and advance to the next period
1685 ldout(sync_env
->cct
, 10) << *this << ": done with period" << dendl
;
1688 if (mdlog_marker
== max_marker
&& can_adjust_marker
) {
// idle at the tip: poll again after a fixed interval
1689 #define INCREMENTAL_INTERVAL 20
1690 yield
wait(utime_t(INCREMENTAL_INTERVAL
, 0));
1692 } while (can_adjust_marker
);
1694 while (num_spawned() > 1) {
1695 yield
wait_for_child();
1699 yield lease_cr
->go_down();
1707 if (!can_adjust_marker
) {
1711 return set_cr_done();
// Backoff/retry wrapper around RGWMetaSyncShardCR for one shard.
// RGWBackoffControlCR re-allocates the shard CR via alloc_cr() after
// failures; exit_on_error=false means every error is retried.
// alloc_finisher_cr() re-reads the persisted shard marker between attempts.
1718 class RGWMetaSyncShardControlCR
: public RGWBackoffControlCR
1720 RGWMetaSyncEnv
*sync_env
;
1722 const rgw_pool
& pool
;
1723 const std::string
& period
;
1724 epoch_t realm_epoch
;
1725 RGWMetadataLog
* mdlog
;
// owned copies (unlike the references above) so retries see stable state
1727 rgw_meta_sync_marker sync_marker
;
1728 const std::string period_marker
;
1730 static constexpr bool exit_on_error
= false; // retry on all errors
1732 RGWMetaSyncShardControlCR(RGWMetaSyncEnv
*_sync_env
, const rgw_pool
& _pool
,
1733 const std::string
& period
, epoch_t realm_epoch
,
1734 RGWMetadataLog
* mdlog
, uint32_t _shard_id
,
1735 const rgw_meta_sync_marker
& _marker
,
1736 std::string
&& period_marker
)
1737 : RGWBackoffControlCR(_sync_env
->cct
, exit_on_error
), sync_env(_sync_env
),
1738 pool(_pool
), period(period
), realm_epoch(realm_epoch
), mdlog(mdlog
),
1739 shard_id(_shard_id
), sync_marker(_marker
),
1740 period_marker(std::move(period_marker
)) {}
// fresh shard sync attempt; backoff_ptr() lets the shard CR reset backoff
1742 RGWCoroutine
*alloc_cr() override
{
1743 return new RGWMetaSyncShardCR(sync_env
, pool
, period
, realm_epoch
, mdlog
,
1744 shard_id
, sync_marker
, period_marker
, backoff_ptr());
1747 RGWCoroutine
*alloc_finisher_cr() override
{
1748 RGWRados
*store
= sync_env
->store
;
1749 return new RGWSimpleRadosReadCR
<rgw_meta_sync_marker
>(sync_env
->async_rados
, store
,
1750 rgw_raw_obj(pool
, sync_env
->shard_obj_name(shard_id
)),
// Top-level metadata sync coroutine: syncs one period at a time.  For each
// period it spawns a RGWMetaSyncShardControlCR per shard, waits for them,
// then advances the cursor and persists the updated sync_info.  wakeup()
// routes external notifications to the matching shard CR under `mutex`.
1755 class RGWMetaSyncCR
: public RGWCoroutine
{
1756 RGWMetaSyncEnv
*sync_env
;
1757 const rgw_pool
& pool
;
1758 RGWPeriodHistory::Cursor cursor
; //< sync position in period history
1759 RGWPeriodHistory::Cursor next
; //< next period in history
1760 rgw_meta_sync_status sync_status
;
1762 std::mutex mutex
; //< protect access to shard_crs
1764 // TODO: it should be enough to hold a reference on the stack only, as calling
1765 // RGWCoroutinesStack::wakeup() doesn't refer to the RGWCoroutine if it has
1766 // already completed
1767 using ControlCRRef
= boost::intrusive_ptr
<RGWMetaSyncShardControlCR
>;
1768 using StackRef
= boost::intrusive_ptr
<RGWCoroutinesStack
>;
1769 using RefPair
= std::pair
<ControlCRRef
, StackRef
>;
1770 map
<int, RefPair
> shard_crs
;
1774 RGWMetaSyncCR(RGWMetaSyncEnv
*_sync_env
, RGWPeriodHistory::Cursor cursor
,
1775 const rgw_meta_sync_status
& _sync_status
)
1776 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
1777 pool(sync_env
->store
->get_zone_params().log_pool
),
1778 cursor(cursor
), sync_status(_sync_status
) {}
1780 int operate() override
{
1782 // loop through one period at a time
// on the current period there is no "next"; otherwise `next` bounds
// how far this period's shards should sync (period_marker below)
1784 if (cursor
== sync_env
->store
->period_history
->get_current()) {
1785 next
= RGWPeriodHistory::Cursor
{};
1787 ldout(cct
, 10) << "RGWMetaSyncCR on current period="
1788 << cursor
.get_period().get_id() << dendl
;
1790 ldout(cct
, 10) << "RGWMetaSyncCR with no period" << dendl
;
1795 ldout(cct
, 10) << "RGWMetaSyncCR on period="
1796 << cursor
.get_period().get_id() << ", next="
1797 << next
.get_period().get_id() << dendl
;
1801 // get the mdlog for the current period (may be empty)
1802 auto& period_id
= sync_status
.sync_info
.period
;
1803 auto realm_epoch
= sync_status
.sync_info
.realm_epoch
;
1804 auto mdlog
= sync_env
->store
->meta_mgr
->get_log(period_id
);
1806 // prevent wakeup() from accessing shard_crs while we're spawning them
1807 std::lock_guard
<std::mutex
> lock(mutex
);
1809 // sync this period on each shard
1810 for (const auto& m
: sync_status
.sync_markers
) {
1811 uint32_t shard_id
= m
.first
;
1812 auto& marker
= m
.second
;
1814 std::string period_marker
;
1816 // read the maximum marker from the next period's sync status
1817 period_marker
= next
.get_period().get_sync_status()[shard_id
];
1818 if (period_marker
.empty()) {
1819 // no metadata changes have occurred on this shard, skip it
1820 ldout(cct
, 10) << "RGWMetaSyncCR: skipping shard " << shard_id
1821 << " with empty period marker" << dendl
;
1826 using ShardCR
= RGWMetaSyncShardControlCR
;
1827 auto cr
= new ShardCR(sync_env
, pool
, period_id
, realm_epoch
,
1828 mdlog
, shard_id
, marker
,
1829 std::move(period_marker
));
1830 auto stack
= spawn(cr
, false);
1831 shard_crs
[shard_id
] = RefPair
{cr
, stack
};
1834 // wait for each shard to complete
1835 while (ret
== 0 && num_spawned() > 0) {
1836 yield
wait_for_child();
1837 collect(&ret
, nullptr);
1841 // drop shard cr refs under lock
1842 std::lock_guard
<std::mutex
> lock(mutex
);
1846 return set_cr_error(ret
);
1848 // advance to the next period
1852 // write the updated sync info
1853 sync_status
.sync_info
.period
= cursor
.get_period().get_id();
1854 sync_status
.sync_info
.realm_epoch
= cursor
.get_epoch();
1855 yield
call(new RGWSimpleRadosWriteCR
<rgw_meta_sync_info
>(sync_env
->async_rados
,
1857 rgw_raw_obj(pool
, sync_env
->status_oid()),
1858 sync_status
.sync_info
));
// notify the shard CR for shard_id that new log entries may be available
1864 void wakeup(int shard_id
) {
1865 std::lock_guard
<std::mutex
> lock(mutex
);
1866 auto iter
= shard_crs
.find(shard_id
);
1867 if (iter
== shard_crs
.end()) {
1870 iter
->second
.first
->wakeup();
// Populate a RGWMetaSyncEnv from this RGWRemoteMetaLog's members.
// NOTE(review): this excerpt shows only the cct/async_rados/http_manager/
// error_logger assignments; lines 1876-1877 are missing from the extraction
// and presumably set further env fields — verify against the full file.
1874 void RGWRemoteMetaLog::init_sync_env(RGWMetaSyncEnv
*env
) {
1875 env
->cct
= store
->ctx();
1878 env
->async_rados
= async_rados
;
1879 env
->http_manager
= &http_manager
;
1880 env
->error_logger
= error_logger
;
// Read the metadata sync status into *sync_status.  No-op path when this
// zone is the metadata master (there is nothing to sync from).  Runs a
// dedicated coroutine manager + HTTP manager so it cannot collide with the
// long-running run_sync() machinery.
1883 int RGWRemoteMetaLog::read_sync_status(rgw_meta_sync_status
*sync_status
)
1885 if (store
->is_meta_master()) {
1888 // cannot run concurrently with run_sync(), so run in a separate manager
1889 RGWCoroutinesManager
crs(store
->ctx(), store
->get_cr_registry());
1890 RGWHTTPManager
http_manager(store
->ctx(), crs
.get_completion_mgr());
1891 int ret
= http_manager
.set_threaded();
1893 ldout(store
->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret
<< dendl
;
// copy the shared env but point it at the local HTTP manager
1896 RGWMetaSyncEnv sync_env_local
= sync_env
;
1897 sync_env_local
.http_manager
= &http_manager
;
1898 ret
= crs
.run(new RGWReadSyncStatusCoroutine(&sync_env_local
, sync_status
));
1899 http_manager
.stop();
// Initialize the sync status objects: fetch the master's mdlog info for the
// shard count, seed rgw_meta_sync_info with the current period/epoch, and
// run the init coroutine.  No-op when this zone is the metadata master.
1903 int RGWRemoteMetaLog::init_sync_status()
1905 if (store
->is_meta_master()) {
1909 rgw_mdlog_info mdlog_info
;
1910 int r
= read_log_info(&mdlog_info
);
1912 lderr(store
->ctx()) << "ERROR: fail to fetch master log info (r=" << r
<< ")" << dendl
;
1916 rgw_meta_sync_info sync_info
;
1917 sync_info
.num_shards
= mdlog_info
.num_shards
;
1918 auto cursor
= store
->period_history
->get_current();
1920 sync_info
.period
= cursor
.get_period().get_id();
1921 sync_info
.realm_epoch
= cursor
.get_epoch();
1924 return run(new RGWInitSyncStatusCoroutine(&sync_env
, sync_info
));
// Persist the given sync_info to the status object in the log pool.
1927 int RGWRemoteMetaLog::store_sync_info(const rgw_meta_sync_info
& sync_info
)
1929 return run(new RGWSimpleRadosWriteCR
<rgw_meta_sync_info
>(async_rados
, store
,
1930 rgw_raw_obj(store
->get_zone_params().log_pool
, sync_env
.status_oid()),
1934 // return a cursor to the period at our sync position
// Resolves info.realm_epoch against the local period history; if the period
// is not present locally it is pulled from the master and attached.  Errors
// are reported through the returned Cursor's error code.
1935 static RGWPeriodHistory::Cursor
get_period_at(RGWRados
* store
,
1936 const rgw_meta_sync_info
& info
)
1938 if (info
.period
.empty()) {
1939 // return an empty cursor with error=0
1940 return RGWPeriodHistory::Cursor
{};
1943 // look for an existing period in our history
1944 auto cursor
= store
->period_history
->lookup(info
.realm_epoch
);
1946 // verify that the period ids match
1947 auto& existing
= cursor
.get_period().get_id();
1948 if (existing
!= info
.period
) {
1949 lderr(store
->ctx()) << "ERROR: sync status period=" << info
.period
1950 << " does not match period=" << existing
1951 << " in history at realm epoch=" << info
.realm_epoch
<< dendl
;
1952 return RGWPeriodHistory::Cursor
{-EEXIST
};
1957 // read the period from rados or pull it from the master
1959 int r
= store
->period_puller
->pull(info
.period
, period
);
1961 lderr(store
->ctx()) << "ERROR: failed to read period id "
1962 << info
.period
<< ": " << cpp_strerror(r
) << dendl
;
1963 return RGWPeriodHistory::Cursor
{r
};
1965 // attach the period to our history
1966 cursor
= store
->period_history
->attach(std::move(period
));
1968 r
= cursor
.get_error();
1969 lderr(store
->ctx()) << "ERROR: failed to read period history back to "
1970 << info
.period
<< ": " << cpp_strerror(r
) << dendl
;
// Main metadata sync loop, driven until going_down:
//   1. fetch master mdlog info (retry with backoff while master is down),
//   2. read/initialize sync status (StateInit),
//   3. StateBuildingFullSyncMaps: fetch all metadata keys,
//   4. StateSync: run RGWMetaSyncCR from the period at our sync position.
// No-op when this zone is the metadata master.
1975 int RGWRemoteMetaLog::run_sync()
1977 if (store
->is_meta_master()) {
1983 // get shard count and oldest log period from master
1984 rgw_mdlog_info mdlog_info
;
1987 ldout(store
->ctx(), 1) << __func__
<< "(): going down" << dendl
;
1990 r
= read_log_info(&mdlog_info
);
1991 if (r
== -EIO
|| r
== -ENOENT
) {
1992 // keep retrying if master isn't alive or hasn't initialized the log
1993 ldout(store
->ctx(), 10) << __func__
<< "(): waiting for master.." << dendl
;
1994 backoff
.backoff_sleep();
1999 lderr(store
->ctx()) << "ERROR: fail to fetch master log info (r=" << r
<< ")" << dendl
;
2005 rgw_meta_sync_status sync_status
;
2008 ldout(store
->ctx(), 1) << __func__
<< "(): going down" << dendl
;
2011 r
= run(new RGWReadSyncStatusCoroutine(&sync_env
, &sync_status
));
2012 if (r
< 0 && r
!= -ENOENT
) {
2013 ldout(store
->ctx(), 0) << "ERROR: failed to fetch sync status r=" << r
<< dendl
;
2017 if (!mdlog_info
.period
.empty()) {
2018 // restart sync if the remote has a period, but:
2019 // a) our status does not, or
2020 // b) our sync period comes before the remote's oldest log period
2021 if (sync_status
.sync_info
.period
.empty() ||
2022 sync_status
.sync_info
.realm_epoch
< mdlog_info
.realm_epoch
) {
2023 sync_status
.sync_info
.state
= rgw_meta_sync_info::StateInit
;
2024 ldout(store
->ctx(), 1) << "epoch=" << sync_status
.sync_info
.realm_epoch
2025 << " in sync status comes before remote's oldest mdlog epoch="
2026 << mdlog_info
.realm_epoch
<< ", restarting sync" << dendl
;
2030 if (sync_status
.sync_info
.state
== rgw_meta_sync_info::StateInit
) {
2031 ldout(store
->ctx(), 20) << __func__
<< "(): init" << dendl
;
2032 sync_status
.sync_info
.num_shards
= mdlog_info
.num_shards
;
2033 auto cursor
= store
->period_history
->get_current();
2035 // run full sync, then start incremental from the current period/epoch
2036 sync_status
.sync_info
.period
= cursor
.get_period().get_id();
2037 sync_status
.sync_info
.realm_epoch
= cursor
.get_epoch();
2039 r
= run(new RGWInitSyncStatusCoroutine(&sync_env
, sync_status
.sync_info
));
// transient init failure: back off and retry the init loop
2041 backoff
.backoff_sleep();
2046 ldout(store
->ctx(), 0) << "ERROR: failed to init sync status r=" << r
<< dendl
;
2050 } while (sync_status
.sync_info
.state
== rgw_meta_sync_info::StateInit
);
// shard count is fixed once initialized; a mismatch is unrecoverable here
2052 auto num_shards
= sync_status
.sync_info
.num_shards
;
2053 if (num_shards
!= mdlog_info
.num_shards
) {
2054 lderr(store
->ctx()) << "ERROR: can't sync, mismatch between num shards, master num_shards=" << mdlog_info
.num_shards
<< " local num_shards=" << num_shards
<< dendl
;
2058 RGWPeriodHistory::Cursor cursor
;
2060 r
= run(new RGWReadSyncStatusCoroutine(&sync_env
, &sync_status
));
2061 if (r
< 0 && r
!= -ENOENT
) {
2062 ldout(store
->ctx(), 0) << "ERROR: failed to fetch sync status r=" << r
<< dendl
;
2066 switch ((rgw_meta_sync_info::SyncState
)sync_status
.sync_info
.state
) {
2067 case rgw_meta_sync_info::StateBuildingFullSyncMaps
:
2068 ldout(store
->ctx(), 20) << __func__
<< "(): building full sync maps" << dendl
;
2069 r
= run(new RGWFetchAllMetaCR(&sync_env
, num_shards
, sync_status
.sync_markers
));
2070 if (r
== -EBUSY
|| r
== -EAGAIN
) {
2071 backoff
.backoff_sleep();
2076 ldout(store
->ctx(), 0) << "ERROR: failed to fetch all metadata keys" << dendl
;
// maps built: advance persisted state to StateSync
2080 sync_status
.sync_info
.state
= rgw_meta_sync_info::StateSync
;
2081 r
= store_sync_info(sync_status
.sync_info
);
2083 ldout(store
->ctx(), 0) << "ERROR: failed to update sync status" << dendl
;
2087 case rgw_meta_sync_info::StateSync
:
2088 ldout(store
->ctx(), 20) << __func__
<< "(): sync" << dendl
;
2089 // find our position in the period history (if any)
2090 cursor
= get_period_at(store
, sync_status
.sync_info
);
2091 r
= cursor
.get_error();
2095 meta_sync_cr
= new RGWMetaSyncCR(&sync_env
, cursor
, sync_status
);
2096 r
= run(meta_sync_cr
);
2098 ldout(store
->ctx(), 0) << "ERROR: failed to fetch all metadata keys" << dendl
;
2103 ldout(store
->ctx(), 0) << "ERROR: bad sync state!" << dendl
;
2106 } while (!going_down
);
// Forward a shard wakeup to the running RGWMetaSyncCR, if any.
2111 void RGWRemoteMetaLog::wakeup(int shard_id
)
2113 if (!meta_sync_cr
) {
2116 meta_sync_cr
->wakeup(shard_id
);
// State machine driver for cloning one batch of remote mdlog entries:
// init -> read shard status -> send REST request -> receive response ->
// store entries locally, repeating while the response was truncated.
2119 int RGWCloneMetaLogCoroutine::operate()
2124 ldout(cct
, 20) << __func__
<< ": shard_id=" << shard_id
<< ": init request" << dendl
;
2125 return state_init();
2128 ldout(cct
, 20) << __func__
<< ": shard_id=" << shard_id
<< ": reading shard status" << dendl
;
2129 return state_read_shard_status();
2132 ldout(cct
, 20) << __func__
<< ": shard_id=" << shard_id
<< ": reading shard status complete" << dendl
;
2133 return state_read_shard_status_complete();
2136 ldout(cct
, 20) << __func__
<< ": shard_id=" << shard_id
<< ": sending rest request" << dendl
;
2137 return state_send_rest_request();
2140 ldout(cct
, 20) << __func__
<< ": shard_id=" << shard_id
<< ": receiving rest response" << dendl
;
2141 return state_receive_rest_response();
2144 ldout(cct
, 20) << __func__
<< ": shard_id=" << shard_id
<< ": storing mdlog entries" << dendl
;
2145 return state_store_mdlog_entries();
2147 } while (truncated
);
2149 ldout(cct
, 20) << __func__
<< ": shard_id=" << shard_id
<< ": storing mdlog entries complete" << dendl
;
2150 return state_store_mdlog_entries_complete();
// Reset the response buffer before starting a new clone cycle.
2157 int RGWCloneMetaLogCoroutine::state_init()
2159 data
= rgw_mdlog_shard_data();
// Kick off an async read of the local mdlog shard header.  The completion
// lambda records the shard's max marker/time into shard_info and wakes the
// parent coroutine stack when done.
2164 int RGWCloneMetaLogCoroutine::state_read_shard_status()
2166 const bool add_ref
= false; // default constructs with refs=1
2168 completion
.reset(new RGWMetadataLogInfoCompletion(
2169 [this](int ret
, const cls_log_header
& header
) {
2171 ldout(cct
, 1) << "ERROR: failed to read mdlog info with "
2172 << cpp_strerror(ret
) << dendl
;
2174 shard_info
.marker
= header
.max_marker
;
2175 shard_info
.last_update
= header
.max_time
.to_real_time();
2177 // wake up parent stack
2178 stack
->get_completion_mgr()->complete(nullptr, stack
);
2181 int ret
= mdlog
->get_info_async(shard_id
, completion
.get());
2183 ldout(cct
, 0) << "ERROR: mdlog->get_info_async() returned ret=" << ret
<< dendl
;
2184 return set_cr_error(ret
);
// Async shard-status read finished: log it and adopt the local shard's max
// marker as the position to clone from.
2190 int RGWCloneMetaLogCoroutine::state_read_shard_status_complete()
2194 ldout(cct
, 20) << "shard_id=" << shard_id
<< " marker=" << shard_info
.marker
<< " last_update=" << shard_info
.last_update
<< dendl
;
2196 marker
= shard_info
.marker
;
// Build the query parameters (type/period/max-entries/marker) and fire an
// async GET on the master's /admin/log endpoint.  The "marker" key is
// blanked out when there is no marker so the pair is effectively omitted.
2201 int RGWCloneMetaLogCoroutine::state_send_rest_request()
2203 RGWRESTConn
*conn
= sync_env
->conn
;
2206 snprintf(buf
, sizeof(buf
), "%d", shard_id
);
2208 char max_entries_buf
[32];
2209 snprintf(max_entries_buf
, sizeof(max_entries_buf
), "%d", max_entries
);
2211 const char *marker_key
= (marker
.empty() ? "" : "marker");
2213 rgw_http_param_pair pairs
[] = { { "type", "metadata" },
2215 { "period", period
.c_str() },
2216 { "max-entries", max_entries_buf
},
2217 { marker_key
, marker
.c_str() },
2220 http_op
= new RGWRESTReadResource(conn
, "/admin/log", pairs
, NULL
, sync_env
->http_manager
);
// associate the op with this stack so the completion can wake it
2222 http_op
->set_user_info((void *)stack
);
2224 int ret
= http_op
->aio_read();
2226 ldout(cct
, 0) << "ERROR: failed to fetch mdlog data" << dendl
;
2227 log_error() << "failed to send http operation: " << http_op
->to_str() << " ret=" << ret
<< std::endl
;
// Wait for the REST response and decode it into `data`.  A full batch
// (entries == max_entries) marks the result truncated so operate() loops;
// *new_marker is advanced to the last returned entry id (or kept at the
// request marker when the batch is empty, which also ends the clone).
2236 int RGWCloneMetaLogCoroutine::state_receive_rest_response()
2238 int ret
= http_op
->wait(&data
);
2240 error_stream
<< "http operation failed: " << http_op
->to_str() << " status=" << http_op
->get_http_status() << std::endl
;
2241 ldout(cct
, 5) << "failed to wait for op, ret=" << ret
<< dendl
;
2244 return set_cr_error(ret
);
2249 ldout(cct
, 20) << "remote mdlog, shard_id=" << shard_id
<< " num of shard entries: " << data
.entries
.size() << dendl
;
2251 truncated
= ((int)data
.entries
.size() == max_entries
);
2253 if (data
.entries
.empty()) {
2255 *new_marker
= marker
;
2257 return set_cr_done();
2261 *new_marker
= data
.entries
.back().id
;
// Convert the fetched rgw_mdlog_entry batch into cls_log_entry records and
// store them asynchronously in the local mdlog shard; the completion
// notifier wakes this coroutine's stack when the write finishes.
2268 int RGWCloneMetaLogCoroutine::state_store_mdlog_entries()
2270 list
<cls_log_entry
> dest_entries
;
2272 vector
<rgw_mdlog_entry
>::iterator iter
;
2273 for (iter
= data
.entries
.begin(); iter
!= data
.entries
.end(); ++iter
) {
2274 rgw_mdlog_entry
& entry
= *iter
;
2275 ldout(cct
, 20) << "entry: name=" << entry
.name
<< dendl
;
2277 cls_log_entry dest_entry
;
2278 dest_entry
.id
= entry
.id
;
2279 dest_entry
.section
= entry
.section
;
2280 dest_entry
.name
= entry
.name
;
2281 dest_entry
.timestamp
= utime_t(entry
.timestamp
);
// log_data is serialized into the cls entry payload
2283 ::encode(entry
.log_data
, dest_entry
.data
);
2285 dest_entries
.push_back(dest_entry
);
2290 RGWAioCompletionNotifier
*cn
= stack
->create_completion_notifier();
2292 int ret
= mdlog
->store_entries_in_shard(dest_entries
, shard_id
, cn
->completion());
2295 ldout(cct
, 10) << "failed to store md log entries shard_id=" << shard_id
<< " ret=" << ret
<< dendl
;
2296 return set_cr_error(ret
);
// Terminal state: the store completed and the coroutine is done.
2301 int RGWCloneMetaLogCoroutine::state_store_mdlog_entries_complete()
2303 return set_cr_done();
2307 // TODO: move into rgw_sync_trim.cc
2309 #define dout_prefix (*_dout << "meta trim: ")
2311 /// purge all log shards for the given mdlog
// RGWShardCollectCR subclass: spawn_next() is called repeatedly (up to
// max_concurrent in flight) and removes one shard object per call until
// all num_shards shard oids have been deleted.
2312 class PurgeLogShardsCR
: public RGWShardCollectCR
{
2313 RGWRados
*const store
;
2314 const RGWMetadataLog
* mdlog
;
2315 const int num_shards
;
2319 static constexpr int max_concurrent
= 16;
2322 PurgeLogShardsCR(RGWRados
*store
, const RGWMetadataLog
* mdlog
,
2323 const rgw_pool
& pool
, int num_shards
)
2324 : RGWShardCollectCR(store
->ctx(), max_concurrent
),
2325 store(store
), mdlog(mdlog
), num_shards(num_shards
), obj(pool
, "")
// emit the removal CR for the next shard; stops once i reaches num_shards
2328 bool spawn_next() override
{
2329 if (i
== num_shards
) {
2332 mdlog
->get_shard_oid(i
++, obj
.oid
);
2333 spawn(new RGWRadosRemoveCR(store
, obj
), false);
2338 using Cursor
= RGWPeriodHistory::Cursor
;
2340 /// purge mdlogs from the oldest up to (but not including) the given realm_epoch
// operate() (defined below) reads the oldest log period, purges its shards
// via PurgeLogShardsCR, and trims the mdlog period history, updating
// *last_trim_epoch as it advances.
2341 class PurgePeriodLogsCR
: public RGWCoroutine
{
2342 RGWRados
*const store
;
2343 RGWMetadataManager
*const metadata
;
// version guard for the mdlog-history object read/trim cycle
2344 RGWObjVersionTracker objv
;
2346 epoch_t realm_epoch
;
2347 epoch_t
*last_trim_epoch
; //< update last trim on success
2350 PurgePeriodLogsCR(RGWRados
*store
, epoch_t realm_epoch
, epoch_t
*last_trim
)
2351 : RGWCoroutine(store
->ctx()), store(store
), metadata(store
->meta_mgr
),
2352 realm_epoch(realm_epoch
), last_trim_epoch(last_trim
)
2358 int PurgePeriodLogsCR::operate()
2361 // read our current oldest log period
2362 yield
call(metadata
->read_oldest_log_period_cr(&cursor
, &objv
));
2364 return set_cr_error(retcode
);
2367 ldout(cct
, 20) << "oldest log realm_epoch=" << cursor
.get_epoch()
2368 << " period=" << cursor
.get_period().get_id() << dendl
;
2370 // trim -up to- the given realm_epoch
2371 while (cursor
.get_epoch() < realm_epoch
) {
2372 ldout(cct
, 4) << "purging log shards for realm_epoch=" << cursor
.get_epoch()
2373 << " period=" << cursor
.get_period().get_id() << dendl
;
2375 const auto mdlog
= metadata
->get_log(cursor
.get_period().get_id());
2376 const auto& pool
= store
->get_zone_params().log_pool
;
2377 auto num_shards
= cct
->_conf
->rgw_md_log_max_shards
;
2378 call(new PurgeLogShardsCR(store
, mdlog
, pool
, num_shards
));
2381 ldout(cct
, 1) << "failed to remove log shards: "
2382 << cpp_strerror(retcode
) << dendl
;
2383 return set_cr_error(retcode
);
2385 ldout(cct
, 10) << "removed log shards for realm_epoch=" << cursor
.get_epoch()
2386 << " period=" << cursor
.get_period().get_id() << dendl
;
2388 // update our mdlog history
2389 yield
call(metadata
->trim_log_period_cr(cursor
, &objv
));
2390 if (retcode
== -ENOENT
) {
2391 // must have raced to update mdlog history. return success and allow the
2392 // winner to continue purging
2393 ldout(cct
, 10) << "already removed log shards for realm_epoch=" << cursor
.get_epoch()
2394 << " period=" << cursor
.get_period().get_id() << dendl
;
2395 return set_cr_done();
2396 } else if (retcode
< 0) {
2397 ldout(cct
, 1) << "failed to remove log shards for realm_epoch="
2398 << cursor
.get_epoch() << " period=" << cursor
.get_period().get_id()
2399 << " with: " << cpp_strerror(retcode
) << dendl
;
2400 return set_cr_error(retcode
);
2403 if (*last_trim_epoch
< cursor
.get_epoch()) {
2404 *last_trim_epoch
= cursor
.get_epoch();
2407 assert(cursor
.has_next()); // get_current() should always come after
2410 return set_cr_done();
2417 using connection_map
= std::map
<std::string
, std::unique_ptr
<RGWRESTConn
>>;
2419 /// construct a RGWRESTConn for each zone in the realm
2420 template <typename Zonegroups
>
2421 connection_map
make_peer_connections(RGWRados
*store
,
2422 const Zonegroups
& zonegroups
)
2424 connection_map connections
;
2425 for (auto& g
: zonegroups
) {
2426 for (auto& z
: g
.second
.zones
) {
2427 std::unique_ptr
<RGWRESTConn
> conn
{
2428 new RGWRESTConn(store
->ctx(), store
, z
.first
, z
.second
.endpoints
)};
2429 connections
.emplace(z
.first
, std::move(conn
));
2435 /// return the marker that it's safe to trim up to
2436 const std::string
& get_stable_marker(const rgw_meta_sync_marker
& m
)
2438 return m
.state
== m
.FullSync
? m
.next_step_marker
: m
.marker
;
2441 /// comparison operator for take_min_status()
2442 bool operator<(const rgw_meta_sync_marker
& lhs
, const rgw_meta_sync_marker
& rhs
)
2444 // sort by stable marker
2445 return get_stable_marker(lhs
) < get_stable_marker(rhs
);
2448 /// populate the status with the minimum stable marker of each shard for any
2449 /// peer whose realm_epoch matches the minimum realm_epoch in the input
2450 template <typename Iter
>
2451 int take_min_status(CephContext
*cct
, Iter first
, Iter last
,
2452 rgw_meta_sync_status
*status
)
2454 if (first
== last
) {
2457 const size_t num_shards
= cct
->_conf
->rgw_md_log_max_shards
;
2459 status
->sync_info
.realm_epoch
= std::numeric_limits
<epoch_t
>::max();
2460 for (auto p
= first
; p
!= last
; ++p
) {
2461 // validate peer's shard count
2462 if (p
->sync_markers
.size() != num_shards
) {
2463 ldout(cct
, 1) << "take_min_status got peer status with "
2464 << p
->sync_markers
.size() << " shards, expected "
2465 << num_shards
<< dendl
;
2468 if (p
->sync_info
.realm_epoch
< status
->sync_info
.realm_epoch
) {
2469 // earlier epoch, take its entire status
2470 *status
= std::move(*p
);
2471 } else if (p
->sync_info
.realm_epoch
== status
->sync_info
.realm_epoch
) {
2472 // same epoch, take any earlier markers
2473 auto m
= status
->sync_markers
.begin();
2474 for (auto& shard
: p
->sync_markers
) {
2475 if (shard
.second
< m
->second
) {
2476 m
->second
= std::move(shard
.second
);
2486 RGWRados
*const store
;
2487 RGWHTTPManager
*const http
;
2489 const std::string
& zone
;
2490 Cursor current
; //< cursor to current period
2491 epoch_t last_trim_epoch
{0}; //< epoch of last mdlog that was purged
2493 TrimEnv(RGWRados
*store
, RGWHTTPManager
*http
, int num_shards
)
2494 : store(store
), http(http
), num_shards(num_shards
),
2495 zone(store
->get_zone_params().get_id()),
2496 current(store
->period_history
->get_current())
2500 struct MasterTrimEnv
: public TrimEnv
{
2501 connection_map connections
; //< peer connections
2502 std::vector
<rgw_meta_sync_status
> peer_status
; //< sync status for each peer
2503 /// last trim marker for each shard, only applies to current period's mdlog
2504 std::vector
<std::string
> last_trim_markers
;
2506 MasterTrimEnv(RGWRados
*store
, RGWHTTPManager
*http
, int num_shards
)
2507 : TrimEnv(store
, http
, num_shards
),
2508 last_trim_markers(num_shards
)
2510 auto& period
= current
.get_period();
2511 connections
= make_peer_connections(store
, period
.get_map().zonegroups
);
2512 connections
.erase(zone
);
2513 peer_status
.resize(connections
.size());
2517 struct PeerTrimEnv
: public TrimEnv
{
2518 /// last trim timestamp for each shard, only applies to current period's mdlog
2519 std::vector
<ceph::real_time
> last_trim_timestamps
;
2521 PeerTrimEnv(RGWRados
*store
, RGWHTTPManager
*http
, int num_shards
)
2522 : TrimEnv(store
, http
, num_shards
),
2523 last_trim_timestamps(num_shards
)
2526 void set_num_shards(int num_shards
) {
2527 this->num_shards
= num_shards
;
2528 last_trim_timestamps
.resize(num_shards
);
2532 } // anonymous namespace
2535 /// spawn a trim cr for each shard that needs it, while limiting the number
2536 /// of concurrent shards
2537 class MetaMasterTrimShardCollectCR
: public RGWShardCollectCR
{
2539 static constexpr int MAX_CONCURRENT_SHARDS
= 16;
2542 RGWMetadataLog
*mdlog
;
2545 const rgw_meta_sync_status
& sync_status
;
2548 MetaMasterTrimShardCollectCR(MasterTrimEnv
& env
, RGWMetadataLog
*mdlog
,
2549 const rgw_meta_sync_status
& sync_status
)
2550 : RGWShardCollectCR(env
.store
->ctx(), MAX_CONCURRENT_SHARDS
),
2551 env(env
), mdlog(mdlog
), sync_status(sync_status
)
2554 bool spawn_next() override
;
2557 bool MetaMasterTrimShardCollectCR::spawn_next()
2559 while (shard_id
< env
.num_shards
) {
2560 auto m
= sync_status
.sync_markers
.find(shard_id
);
2561 if (m
== sync_status
.sync_markers
.end()) {
2565 auto& stable
= get_stable_marker(m
->second
);
2566 auto& last_trim
= env
.last_trim_markers
[shard_id
];
2568 if (stable
<= last_trim
) {
2570 ldout(cct
, 20) << "skipping log shard " << shard_id
2571 << " at marker=" << stable
2572 << " last_trim=" << last_trim
2573 << " realm_epoch=" << sync_status
.sync_info
.realm_epoch
<< dendl
;
2578 mdlog
->get_shard_oid(shard_id
, oid
);
2580 ldout(cct
, 10) << "trimming log shard " << shard_id
2581 << " at marker=" << stable
2582 << " last_trim=" << last_trim
2583 << " realm_epoch=" << sync_status
.sync_info
.realm_epoch
<< dendl
;
2584 spawn(new RGWSyncLogTrimCR(env
.store
, oid
, stable
, &last_trim
), false);
2591 /// spawn rest requests to read each peer's sync status
2592 class MetaMasterStatusCollectCR
: public RGWShardCollectCR
{
2593 static constexpr int MAX_CONCURRENT_SHARDS
= 16;
2596 connection_map::iterator c
;
2597 std::vector
<rgw_meta_sync_status
>::iterator s
;
2599 MetaMasterStatusCollectCR(MasterTrimEnv
& env
)
2600 : RGWShardCollectCR(env
.store
->ctx(), MAX_CONCURRENT_SHARDS
),
2601 env(env
), c(env
.connections
.begin()), s(env
.peer_status
.begin())
2604 bool spawn_next() override
{
2605 if (c
== env
.connections
.end()) {
2608 static rgw_http_param_pair params
[] = {
2609 { "type", "metadata" },
2610 { "status", nullptr },
2611 { nullptr, nullptr }
2614 ldout(cct
, 20) << "query sync status from " << c
->first
<< dendl
;
2615 auto conn
= c
->second
.get();
2616 using StatusCR
= RGWReadRESTResourceCR
<rgw_meta_sync_status
>;
2617 spawn(new StatusCR(cct
, conn
, env
.http
, "/admin/log/", params
, &*s
),
2625 class MetaMasterTrimCR
: public RGWCoroutine
{
2627 rgw_meta_sync_status min_status
; //< minimum sync status of all peers
2631 MetaMasterTrimCR(MasterTrimEnv
& env
)
2632 : RGWCoroutine(env
.store
->ctx()), env(env
)
2638 int MetaMasterTrimCR::operate()
// Trim pass on the metadata master: query every peer's mdlog sync status,
// take the minimum stable position, purge mdlogs of fully-consumed prior
// periods, then trim the current period's shards up to the minimum markers.
// NOTE(review): this text is extraction-garbled; the reenter(this){...}
// wrapper, braces, and several `if (retcode < 0)`-style guard lines between
// the numbered statements were dropped by the extractor — confirm the exact
// control flow against upstream rgw_sync.cc before relying on it.
2641 // TODO: detect this and fail before we spawn the trim thread?
// nothing to compare against without peers; trimming would be unsafe
2642 if (env
.connections
.empty()) {
2643 ldout(cct
, 4) << "no peers, exiting" << dendl
;
2644 return set_cr_done();
2647 ldout(cct
, 10) << "fetching sync status for zone " << env
.zone
<< dendl
;
2648 // query mdlog sync status from peers
2649 yield
call(new MetaMasterStatusCollectCR(env
));
2651 // must get a successful reply from all peers to consider trimming
// NOTE(review): the guard line that tests `ret` before the error return
// below was dropped; how `ret` is assigned from the status collection is not
// visible here — verify against the original source.
2653 ldout(cct
, 4) << "failed to fetch sync status from all peers" << dendl
;
2654 return set_cr_error(ret
);
2657 // determine the minimum epoch and markers
2658 ret
= take_min_status(env
.store
->ctx(), env
.peer_status
.begin(),
2659 env
.peer_status
.end(), &min_status
);
// error path below fires when take_min_status rejects the peer input
2661 ldout(cct
, 4) << "failed to calculate min sync status from peers" << dendl
;
2662 return set_cr_error(ret
);
2665 auto store
= env
.store
;
2666 auto epoch
= min_status
.sync_info
.realm_epoch
;
2667 ldout(cct
, 4) << "realm epoch min=" << epoch
2668 << " current=" << env
.current
.get_epoch()<< dendl
;
// epochs below (min - 1) are fully consumed by every peer: purge them
2669 if (epoch
> env
.last_trim_epoch
+ 1) {
2670 // delete any prior mdlog periods
2671 spawn(new PurgePeriodLogsCR(store
, epoch
, &env
.last_trim_epoch
), true);
2673 ldout(cct
, 10) << "mdlogs already purged up to realm_epoch "
2674 << env
.last_trim_epoch
<< dendl
;
2677 // if realm_epoch == current, trim mdlog based on markers
2678 if (epoch
== env
.current
.get_epoch()) {
2679 auto mdlog
= store
->meta_mgr
->get_log(env
.current
.get_period().get_id());
2680 spawn(new MetaMasterTrimShardCollectCR(env
, mdlog
, min_status
), true);
2683 // ignore any errors during purge/trim because we want to hold the lock open
2684 return set_cr_done();
2690 /// read the first entry of the master's mdlog shard and trim to that position
2691 class MetaPeerTrimShardCR
: public RGWCoroutine
{
2692 RGWMetaSyncEnv
& env
;
2693 RGWMetadataLog
*mdlog
;
2694 const std::string
& period_id
;
2696 RGWMetadataLogInfo info
;
2697 ceph::real_time stable
; //< safe timestamp to trim, according to master
2698 ceph::real_time
*last_trim
; //< last trimmed timestamp, updated on trim
2699 rgw_mdlog_shard_data result
; //< result from master's mdlog listing
2702 MetaPeerTrimShardCR(RGWMetaSyncEnv
& env
, RGWMetadataLog
*mdlog
,
2703 const std::string
& period_id
, int shard_id
,
2704 ceph::real_time
*last_trim
)
2705 : RGWCoroutine(env
.store
->ctx()), env(env
), mdlog(mdlog
),
2706 period_id(period_id
), shard_id(shard_id
), last_trim(last_trim
)
2709 int operate() override
;
2712 int MetaPeerTrimShardCR::operate()
2715 // query master's first mdlog entry for this shard
2716 yield
call(new RGWListRemoteMDLogShardCR(&env
, period_id
, shard_id
,
2719 ldout(cct
, 5) << "failed to read first entry from master's mdlog shard "
2720 << shard_id
<< " for period " << period_id
2721 << ": " << cpp_strerror(retcode
) << dendl
;
2722 return set_cr_error(retcode
);
2724 if (result
.entries
.empty()) {
2725 // if there are no mdlog entries, we don't have a timestamp to compare. we
2726 // can't just trim everything, because there could be racing updates since
2727 // this empty reply. query the mdlog shard info to read its max timestamp,
2728 // then retry the listing to make sure it's still empty before trimming to
2730 ldout(cct
, 10) << "empty master mdlog shard " << shard_id
2731 << ", reading last timestamp from shard info" << dendl
;
2732 // read the mdlog shard info for the last timestamp
2733 using ShardInfoCR
= RGWReadRemoteMDLogShardInfoCR
;
2734 yield
call(new ShardInfoCR(&env
, period_id
, shard_id
, &info
));
2736 ldout(cct
, 5) << "failed to read info from master's mdlog shard "
2737 << shard_id
<< " for period " << period_id
2738 << ": " << cpp_strerror(retcode
) << dendl
;
2739 return set_cr_error(retcode
);
2741 if (ceph::real_clock::is_zero(info
.last_update
)) {
2742 return set_cr_done(); // nothing to trim
2744 ldout(cct
, 10) << "got mdlog shard info with last update="
2745 << info
.last_update
<< dendl
;
2746 // re-read the master's first mdlog entry to make sure it hasn't changed
2747 yield
call(new RGWListRemoteMDLogShardCR(&env
, period_id
, shard_id
,
2750 ldout(cct
, 5) << "failed to read first entry from master's mdlog shard "
2751 << shard_id
<< " for period " << period_id
2752 << ": " << cpp_strerror(retcode
) << dendl
;
2753 return set_cr_error(retcode
);
2755 // if the mdlog is still empty, trim to max marker
2756 if (result
.entries
.empty()) {
2757 stable
= info
.last_update
;
2759 stable
= result
.entries
.front().timestamp
;
2761 // can only trim -up to- master's first timestamp, so subtract a second.
2762 // (this is why we use timestamps instead of markers for the peers)
2763 stable
-= std::chrono::seconds(1);
2766 stable
= result
.entries
.front().timestamp
;
2767 stable
-= std::chrono::seconds(1);
2770 if (stable
<= *last_trim
) {
2771 ldout(cct
, 10) << "skipping log shard " << shard_id
2772 << " at timestamp=" << stable
2773 << " last_trim=" << *last_trim
<< dendl
;
2774 return set_cr_done();
2777 ldout(cct
, 10) << "trimming log shard " << shard_id
2778 << " at timestamp=" << stable
2779 << " last_trim=" << *last_trim
<< dendl
;
2782 mdlog
->get_shard_oid(shard_id
, oid
);
2783 call(new RGWRadosTimelogTrimCR(env
.store
, oid
, real_time
{}, stable
, "", ""));
2785 if (retcode
< 0 && retcode
!= -ENODATA
) {
2786 ldout(cct
, 1) << "failed to trim mdlog shard " << shard_id
2787 << ": " << cpp_strerror(retcode
) << dendl
;
2788 return set_cr_error(retcode
);
2790 *last_trim
= stable
;
2791 return set_cr_done();
2796 class MetaPeerTrimShardCollectCR
: public RGWShardCollectCR
{
2797 static constexpr int MAX_CONCURRENT_SHARDS
= 16;
2800 RGWMetadataLog
*mdlog
;
2801 const std::string
& period_id
;
2802 RGWMetaSyncEnv meta_env
; //< for RGWListRemoteMDLogShardCR
2806 MetaPeerTrimShardCollectCR(PeerTrimEnv
& env
, RGWMetadataLog
*mdlog
)
2807 : RGWShardCollectCR(env
.store
->ctx(), MAX_CONCURRENT_SHARDS
),
2808 env(env
), mdlog(mdlog
), period_id(env
.current
.get_period().get_id())
2810 meta_env
.init(cct
, env
.store
, env
.store
->rest_master_conn
,
2811 env
.store
->get_async_rados(), env
.http
, nullptr);
2814 bool spawn_next() override
;
2817 bool MetaPeerTrimShardCollectCR::spawn_next()
2819 if (shard_id
>= env
.num_shards
) {
2822 auto& last_trim
= env
.last_trim_timestamps
[shard_id
];
2823 spawn(new MetaPeerTrimShardCR(meta_env
, mdlog
, period_id
, shard_id
, &last_trim
),
2829 class MetaPeerTrimCR
: public RGWCoroutine
{
2831 rgw_mdlog_info mdlog_info
; //< master's mdlog info
2834 MetaPeerTrimCR(PeerTrimEnv
& env
) : RGWCoroutine(env
.store
->ctx()), env(env
) {}
2839 int MetaPeerTrimCR::operate()
2842 ldout(cct
, 10) << "fetching master mdlog info" << dendl
;
2844 // query mdlog_info from master for oldest_log_period
2845 rgw_http_param_pair params
[] = {
2846 { "type", "metadata" },
2847 { nullptr, nullptr }
2850 using LogInfoCR
= RGWReadRESTResourceCR
<rgw_mdlog_info
>;
2851 call(new LogInfoCR(cct
, env
.store
->rest_master_conn
, env
.http
,
2852 "/admin/log/", params
, &mdlog_info
));
2855 ldout(cct
, 4) << "failed to read mdlog info from master" << dendl
;
2856 return set_cr_error(retcode
);
2858 // use master's shard count instead
2859 env
.set_num_shards(mdlog_info
.num_shards
);
2861 if (mdlog_info
.realm_epoch
> env
.last_trim_epoch
+ 1) {
2862 // delete any prior mdlog periods
2863 yield
call(new PurgePeriodLogsCR(env
.store
, mdlog_info
.realm_epoch
,
2864 &env
.last_trim_epoch
));
2866 ldout(cct
, 10) << "mdlogs already purged through realm_epoch "
2867 << env
.last_trim_epoch
<< dendl
;
2870 // if realm_epoch == current, trim mdlog based on master's markers
2871 if (mdlog_info
.realm_epoch
== env
.current
.get_epoch()) {
2873 auto meta_mgr
= env
.store
->meta_mgr
;
2874 auto mdlog
= meta_mgr
->get_log(env
.current
.get_period().get_id());
2875 call(new MetaPeerTrimShardCollectCR(env
, mdlog
));
2876 // ignore any errors during purge/trim because we want to hold the lock open
2879 return set_cr_done();
2884 class MetaTrimPollCR
: public RGWCoroutine
{
2885 RGWRados
*const store
;
2886 const utime_t interval
; //< polling interval
2887 const rgw_raw_obj obj
;
2888 const std::string name
{"meta_trim"}; //< lock name
2889 const std::string cookie
;
2892 /// allocate the coroutine to run within the lease
2893 virtual RGWCoroutine
* alloc_cr() = 0;
2896 MetaTrimPollCR(RGWRados
*store
, utime_t interval
)
2897 : RGWCoroutine(store
->ctx()), store(store
), interval(interval
),
2898 obj(store
->get_zone_params().log_pool
, RGWMetadataLogHistory::oid
),
2899 cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct
))
2905 int MetaTrimPollCR::operate()
2909 set_status("sleeping");
2912 // prevent others from trimming for our entire wait interval
2913 set_status("acquiring trim lock");
2914 yield
call(new RGWSimpleRadosLockCR(store
->get_async_rados(), store
,
2915 obj
, name
, cookie
, interval
.sec()));
2917 ldout(cct
, 4) << "failed to lock: " << cpp_strerror(retcode
) << dendl
;
2921 set_status("trimming");
2922 yield
call(alloc_cr());
2925 // on errors, unlock so other gateways can try
2926 set_status("unlocking");
2927 yield
call(new RGWSimpleRadosUnlockCR(store
->get_async_rados(), store
,
2928 obj
, name
, cookie
));
2935 class MetaMasterTrimPollCR
: public MetaTrimPollCR
{
2936 MasterTrimEnv env
; //< trim state to share between calls
2937 RGWCoroutine
* alloc_cr() override
{
2938 return new MetaMasterTrimCR(env
);
2941 MetaMasterTrimPollCR(RGWRados
*store
, RGWHTTPManager
*http
,
2942 int num_shards
, utime_t interval
)
2943 : MetaTrimPollCR(store
, interval
),
2944 env(store
, http
, num_shards
)
2948 class MetaPeerTrimPollCR
: public MetaTrimPollCR
{
2949 PeerTrimEnv env
; //< trim state to share between calls
2950 RGWCoroutine
* alloc_cr() override
{
2951 return new MetaPeerTrimCR(env
);
2954 MetaPeerTrimPollCR(RGWRados
*store
, RGWHTTPManager
*http
,
2955 int num_shards
, utime_t interval
)
2956 : MetaTrimPollCR(store
, interval
),
2957 env(store
, http
, num_shards
)
2961 RGWCoroutine
* create_meta_log_trim_cr(RGWRados
*store
, RGWHTTPManager
*http
,
2962 int num_shards
, utime_t interval
)
2964 if (store
->is_meta_master()) {
2965 return new MetaMasterTrimPollCR(store
, http
, num_shards
, interval
);
2967 return new MetaPeerTrimPollCR(store
, http
, num_shards
, interval
);
2971 struct MetaMasterAdminTrimCR
: private MasterTrimEnv
, public MetaMasterTrimCR
{
2972 MetaMasterAdminTrimCR(RGWRados
*store
, RGWHTTPManager
*http
, int num_shards
)
2973 : MasterTrimEnv(store
, http
, num_shards
),
2974 MetaMasterTrimCR(*static_cast<MasterTrimEnv
*>(this))
2978 struct MetaPeerAdminTrimCR
: private PeerTrimEnv
, public MetaPeerTrimCR
{
2979 MetaPeerAdminTrimCR(RGWRados
*store
, RGWHTTPManager
*http
, int num_shards
)
2980 : PeerTrimEnv(store
, http
, num_shards
),
2981 MetaPeerTrimCR(*static_cast<PeerTrimEnv
*>(this))
2985 RGWCoroutine
* create_admin_meta_log_trim_cr(RGWRados
*store
,
2986 RGWHTTPManager
*http
,
2989 if (store
->is_meta_master()) {
2990 return new MetaMasterAdminTrimCR(store
, http
, num_shards
);
2992 return new MetaPeerAdminTrimCR(store
, http
, num_shards
);