1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include <boost/optional.hpp>
#include <string> // std::to_string, used by the snprintf/VLA cleanups below
6 #include "common/ceph_json.h"
7 #include "common/RWLock.h"
8 #include "common/RefCountedObj.h"
9 #include "common/WorkQueue.h"
10 #include "common/Throttle.h"
11 #include "common/admin_socket.h"
12 #include "common/errno.h"
14 #include "rgw_common.h"
15 #include "rgw_rados.h"
17 #include "rgw_metadata.h"
18 #include "rgw_rest_conn.h"
19 #include "rgw_tools.h"
20 #include "rgw_cr_rados.h"
21 #include "rgw_cr_rest.h"
22 #include "rgw_http_client.h"
24 #include "cls/lock/cls_lock_client.h"
26 #include <boost/asio/yield.hpp>
28 #define dout_subsys ceph_subsys_rgw
31 #define dout_prefix (*_dout << "meta sync: ")
// Names/prefixes of the RADOS objects used to persist metadata sync state.
// NOTE(review): the stray leading numerals on lines throughout this file
// look like line-number residue from whatever produced this copy — confirm
// against the upstream source before building.
33 static string mdlog_sync_status_oid
= "mdlog.sync-status";
// Per-shard sync-status objects are named "<prefix>.<shard_id>" (see
// RGWMetaSyncEnv::shard_obj_name below).
34 static string mdlog_sync_status_shard_prefix
= "mdlog.sync-status.shard";
// Prefix for the sharded omap objects that index keys during full sync.
35 static string mdlog_sync_full_sync_index_prefix
= "meta.full-sync.index";
// Error logger for sync failures: spreads error-log entries across
// num_shards timelog objects. Pre-computes the per-shard object names once
// at construction so log_error_cr() only has to index into 'oids'.
37 RGWSyncErrorLogger::RGWSyncErrorLogger(RGWRados
*_store
, const string
&oid_prefix
, int _num_shards
) : store(_store
), num_shards(_num_shards
) {
38 for (int i
= 0; i
< num_shards
; i
++) {
39 oids
.push_back(get_shard_oid(oid_prefix
, i
));
// NOTE(review): the loop/function closing braces fall on lines missing
// from this fragment of the file.
42 string
RGWSyncErrorLogger::get_shard_oid(const string
& oid_prefix
, int shard_id
) {
43 char buf
[oid_prefix
.size() + 16];
44 snprintf(buf
, sizeof(buf
), "%s.%d", oid_prefix
.c_str(), shard_id
);
// Build a coroutine that appends one error record to the sync error log.
// Entries are distributed round-robin across the pre-computed shard oids.
48 RGWCoroutine
*RGWSyncErrorLogger::log_error_cr(const string
& source_zone
, const string
& section
, const string
& name
, uint32_t error_code
, const string
& message
) {
51 rgw_sync_error_info
info(source_zone
, error_code
, message
);
// 'entry' and 'bl' are declared on lines missing from this fragment;
// presumably 'info' is encoded into 'bl' before this call — TODO confirm.
54 store
->time_log_prepare_entry(entry
, real_clock::now(), section
, name
, bl
);
// Round-robin shard selection; 'counter' is presumably an atomic member
// (declared elsewhere) — confirm before relying on thread-safety here.
56 uint32_t shard_id
= ++counter
% num_shards
;
59 return new RGWRadosTimelogAddCR(store
, oids
[shard_id
], entry
);
// Exponential backoff: double the current wait each failure, clamping at
// max_secs (the clamp assignment falls on lines missing from this fragment).
62 void RGWSyncBackoff::update_wait_time()
67 cur_wait
= (cur_wait
&lt;&lt; 1);
69 if (cur_wait
>= max_secs
) {
// Synchronous variant: sleep for the current backoff interval
// (body not visible in this fragment).
74 void RGWSyncBackoff::backoff_sleep()
// Coroutine variant: suspend 'op' for cur_wait seconds instead of blocking
// the thread.
80 void RGWSyncBackoff::backoff(RGWCoroutine
*op
)
83 op
->wait(utime_t(cur_wait
, 0));
// Wrapper coroutine that keeps re-running an inner coroutine with
// exponential backoff. -EBUSY and -EAGAIN are treated as transient and
// retried; any other error aborts. On success an optional finisher
// coroutine runs.
86 int RGWBackoffControlCR::operate() {
88 // retry the operation until it succeeds
// Lock scopes below presumably guard the shared cr pointer — the
// surrounding statements fall on lines missing from this fragment.
91 Mutex::Locker
l(lock
);
97 Mutex::Locker
l(lock
);
104 if (retcode
!= -EBUSY
&amp;&amp; retcode
!= -EAGAIN
) {
105 ldout(cct
, 0) &lt;&lt; "ERROR: RGWBackoffControlCR called coroutine returned " &lt;&lt; retcode
&lt;&lt; dendl
;
107 return set_cr_error(retcode
);
// Transient failure: wait with exponential backoff before retrying.
113 yield backoff
.backoff(this);
116 // run an optional finisher
117 yield
call(alloc_finisher_cr());
119 ldout(cct
, 0) &lt;&lt; "ERROR: call to finisher_cr() failed: retcode=" &lt;&lt; retcode
&lt;&lt; dendl
;
120 return set_cr_error(retcode
);
122 return set_cr_done();
// Parse the remote mdlog info JSON reply. Note: the remote key
// "num_objects" populates our num_shards field.
127 void rgw_mdlog_info::decode_json(JSONObj
*obj
) {
128 JSONDecoder::decode_json("num_objects", num_shards
, obj
);
129 JSONDecoder::decode_json("period", period
, obj
);
130 JSONDecoder::decode_json("realm_epoch", realm_epoch
, obj
);
// Parse one remote mdlog entry from JSON.
133 void rgw_mdlog_entry::decode_json(JSONObj
*obj
) {
134 JSONDecoder::decode_json("id", id
, obj
);
135 JSONDecoder::decode_json("section", section
, obj
);
136 JSONDecoder::decode_json("name", name
, obj
);
// 'ut' is declared on a line missing from this fragment (presumably a
// utime_t temporary) and converted to real_time below — TODO confirm.
138 JSONDecoder::decode_json("timestamp", ut
, obj
);
139 timestamp
= ut
.to_real_time();
140 JSONDecoder::decode_json("data", log_data
, obj
);
// Parse one remote mdlog shard listing: pagination marker, truncation
// flag, and the entries themselves.
143 void rgw_mdlog_shard_data::decode_json(JSONObj
*obj
) {
144 JSONDecoder::decode_json("marker", marker
, obj
);
145 JSONDecoder::decode_json("truncated", truncated
, obj
);
146 JSONDecoder::decode_json("entries", entries
, obj
);
// Generic fan-out driver: keeps spawning child coroutines via
// spawn_next() while throttling to max_concurrent, then drains the
// remainder. Child failures other than -ENOENT are logged (and presumably
// recorded in 'status' on lines missing from this fragment).
149 int RGWShardCollectCR::operate() {
151 while (spawn_next()) {
// Throttle: block until a child finishes before spawning more.
154 while (current_running
>= max_concurrent
) {
156 yield
wait_for_child();
157 if (collect_next(&amp;child_ret
)) {
159 if (child_ret
&lt; 0 &amp;&amp; child_ret
!= -ENOENT
) {
160 ldout(cct
, 10) &lt;&lt; __func__
&lt;&lt; ": failed to fetch log status, ret=" &lt;&lt; child_ret
&lt;&lt; dendl
;
// Drain phase: wait for all remaining children.
166 while (current_running
> 0) {
168 yield
wait_for_child();
169 if (collect_next(&amp;child_ret
)) {
171 if (child_ret
&lt; 0 &amp;&amp; child_ret
!= -ENOENT
) {
172 ldout(cct
, 10) &lt;&lt; __func__
&lt;&lt; ": failed to fetch log status, ret=" &lt;&lt; child_ret
&lt;&lt; dendl
;
178 return set_cr_error(status
);
180 return set_cr_done();
// Fan-out coroutine that reads RGWMetadataLogInfo for every remote mdlog
// shard (spawn_next() defined later in this file), at most
// READ_MDLOG_MAX_CONCURRENT in flight.
185 class RGWReadRemoteMDLogInfoCR
: public RGWShardCollectCR
{
186 RGWMetaSyncEnv
*sync_env
;
188 const std::string
&amp; period
;
190 map
&lt;int, RGWMetadataLogInfo
> *mdlog_info
;
193 #define READ_MDLOG_MAX_CONCURRENT 10
196 RGWReadRemoteMDLogInfoCR(RGWMetaSyncEnv
*_sync_env
,
197 const std::string
&amp; period
, int _num_shards
,
// NOTE(review): the 'sync_env' member initializer presumably falls on a
// line missing from this fragment — confirm it is initialized upstream.
198 map
&lt;int, RGWMetadataLogInfo
> *_mdlog_info
) : RGWShardCollectCR(_sync_env
->cct
, READ_MDLOG_MAX_CONCURRENT
),
200 period(period
), num_shards(_num_shards
),
201 mdlog_info(_mdlog_info
), shard_id(0) {}
202 bool spawn_next() override
;
// Fan-out coroutine that lists entries from a set of remote mdlog shards,
// one child per (shard, marker) pair, results keyed by shard id.
// NOTE: READ_MDLOG_MAX_CONCURRENT is re-#defined here with the identical
// value — legal, but redundant with the definition above.
205 class RGWListRemoteMDLogCR
: public RGWShardCollectCR
{
206 RGWMetaSyncEnv
*sync_env
;
208 const std::string
&amp; period
;
// Input markers per shard; swapped in (not copied) by the constructor.
209 map
&lt;int, string
> shards
;
210 int max_entries_per_shard
;
211 map
&lt;int, rgw_mdlog_shard_data
> *result
;
213 map
&lt;int, string
>::iterator iter
;
214 #define READ_MDLOG_MAX_CONCURRENT 10
217 RGWListRemoteMDLogCR(RGWMetaSyncEnv
*_sync_env
,
218 const std::string
&amp; period
, map
&lt;int, string
>&amp; _shards
,
219 int _max_entries_per_shard
,
220 map
&lt;int, rgw_mdlog_shard_data
> *_result
) : RGWShardCollectCR(_sync_env
->cct
, READ_MDLOG_MAX_CONCURRENT
),
221 sync_env(_sync_env
), period(period
),
222 max_entries_per_shard(_max_entries_per_shard
),
// Take ownership of the caller's marker map without copying.
224 shards
.swap(_shards
);
225 iter
= shards
.begin();
227 bool spawn_next() override
;
// Destructor; body not visible in this fragment.
230 RGWRemoteMetaLog::~RGWRemoteMetaLog()
// Fetch remote mdlog metadata (shard count, period, realm epoch) from the
// master zone's /admin/log endpoint.
235 int RGWRemoteMetaLog::read_log_info(rgw_mdlog_info
*log_info
)
237 rgw_http_param_pair pairs
[] = { { "type", "metadata" },
240 int ret
= conn
->get_json_resource("/admin/log", pairs
, *log_info
);
242 ldout(store
->ctx(), 0) &lt;&lt; "ERROR: failed to fetch mdlog info" &lt;&lt; dendl
;
246 ldout(store
->ctx(), 20) &lt;&lt; "remote mdlog, num_shards=" &lt;&lt; log_info
->num_shards
&lt;&lt; dendl
;
// Read per-shard log info from the master zone for the given period.
// No-op on the meta master itself (the early return falls on a line
// missing from this fragment).
251 int RGWRemoteMetaLog::read_master_log_shards_info(const string
&amp;master_period
, map
&lt;int, RGWMetadataLogInfo
> *shards_info
)
253 if (store
->is_meta_master()) {
257 rgw_mdlog_info log_info
;
258 int ret
= read_log_info(&amp;log_info
);
263 return run(new RGWReadRemoteMDLogInfoCR(&amp;sync_env
, master_period
, log_info
.num_shards
, shards_info
));
// List at most one entry per shard past the supplied markers (the literal
// 1 below is max_entries_per_shard) — used to probe whether shards have
// more data. No-op on the meta master.
266 int RGWRemoteMetaLog::read_master_log_shards_next(const string
&amp; period
, map
&lt;int, string
> shard_markers
, map
&lt;int, rgw_mdlog_shard_data
> *result
)
268 if (store
->is_meta_master()) {
272 return run(new RGWListRemoteMDLogCR(&amp;sync_env
, period
, shard_markers
, 1, result
));
// One-time setup: connect to the master zone, start the threaded HTTP
// manager, create the error logger, and populate the sync environment.
275 int RGWRemoteMetaLog::init()
277 conn
= store
->rest_master_conn
;
279 int ret
= http_manager
.set_threaded();
281 ldout(store
->ctx(), 0) &lt;&lt; "failed in http_manager.set_threaded() ret=" &lt;&lt; ret
&lt;&lt; dendl
;
// Raw new; presumably released in finish()/destructor — TODO confirm.
285 error_logger
= new RGWSyncErrorLogger(store
, RGW_SYNC_ERROR_LOG_SHARD_PREFIX
, ERROR_LOGGER_SHARDS
);
287 init_sync_env(&amp;sync_env
);
// Teardown counterpart to init(); body not visible in this fragment.
292 void RGWRemoteMetaLog::finish()
298 #define CLONE_MAX_ENTRIES 100
// Initialize the sync status manager: requires a REST connection to the
// master zone, opens the log pool, initializes the remote log, reads the
// persisted sync status, and pre-computes per-shard status object names.
// No-op on the meta master (early return on a line missing here).
300 int RGWMetaSyncStatusManager::init()
302 if (store
->is_meta_master()) {
306 if (!store
->rest_master_conn
) {
307 lderr(store
->ctx()) &lt;&lt; "no REST connection to master zone" &lt;&lt; dendl
;
311 int r
= rgw_init_ioctx(store
->get_rados_handle(), store
->get_zone_params().log_pool
, ioctx
, true);
313 lderr(store
->ctx()) &lt;&lt; "ERROR: failed to open log pool (" &lt;&lt; store
->get_zone_params().log_pool
&lt;&lt; " ret=" &lt;&lt; r
&lt;&lt; dendl
;
317 r
= master_log
.init();
319 lderr(store
->ctx()) &lt;&lt; "ERROR: failed to init remote log, r=" &lt;&lt; r
&lt;&lt; dendl
;
323 RGWMetaSyncEnv
&amp; sync_env
= master_log
.get_sync_env();
325 rgw_meta_sync_status sync_status
;
// -ENOENT is tolerated: no status object yet means a fresh sync.
326 r
= read_sync_status(&amp;sync_status
);
327 if (r
&lt; 0 &amp;&amp; r
!= -ENOENT
) {
328 lderr(store
->ctx()) &lt;&lt; "ERROR: failed to read sync status, r=" &lt;&lt; r
&lt;&lt; dendl
;
332 int num_shards
= sync_status
.sync_info
.num_shards
;
334 for (int i
= 0; i
&lt; num_shards
; i
++) {
335 shard_objs
[i
] = rgw_raw_obj(store
->get_zone_params().log_pool
, sync_env
.shard_obj_name(i
));
// Write lock guards clone_markers initialization.
338 RWLock::WLocker
wl(ts_to_shard_lock
);
339 for (int i
= 0; i
&lt; num_shards
; i
++) {
340 clone_markers
.push_back(string());
// Populate the shared sync environment with its collaborators. The
// cct/store/conn assignments presumably fall on lines missing from this
// fragment.
349 void RGWMetaSyncEnv::init(CephContext
*_cct
, RGWRados
*_store
, RGWRESTConn
*_conn
,
350 RGWAsyncRadosProcessor
*_async_rados
, RGWHTTPManager
*_http_manager
,
351 RGWSyncErrorLogger
*_error_logger
) {
355 async_rados
= _async_rados
;
356 http_manager
= _http_manager
;
357 error_logger
= _error_logger
;
// Name of the RADOS object holding the overall metadata sync status.
360 string
RGWMetaSyncEnv::status_oid()
362 return mdlog_sync_status_oid
;
365 string
RGWMetaSyncEnv::shard_obj_name(int shard_id
)
367 char buf
[mdlog_sync_status_shard_prefix
.size() + 16];
368 snprintf(buf
, sizeof(buf
), "%s.%d", mdlog_sync_status_shard_prefix
.c_str(), shard_id
);
// Blocking request (run on the async-rados thread pool) that lists one
// mdlog shard: init_list_entries -> list_entries -> complete_list_entries.
// Results land in *entries; *marker and *truncated report pagination state.
373 class RGWAsyncReadMDLogEntries
: public RGWAsyncRadosRequest
{
375 RGWMetadataLog
*mdlog
;
379 list
&lt;cls_log_entry
> *entries
;
383 int _send_request() override
{
// from_time/end_time/handle are declared on lines missing from this
// fragment — presumably default (unbounded) time range.
389 mdlog
->init_list_entries(shard_id
, from_time
, end_time
, *marker
, &amp;handle
);
391 int ret
= mdlog
->list_entries(handle
, max_entries
, *entries
, marker
, truncated
);
393 mdlog
->complete_list_entries(handle
);
398 RGWAsyncReadMDLogEntries(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
,
399 RGWMetadataLog
* mdlog
, int _shard_id
,
400 string
* _marker
, int _max_entries
,
401 list
&lt;cls_log_entry
> *_entries
, bool *_truncated
)
402 : RGWAsyncRadosRequest(caller
, cn
), store(_store
), mdlog(mdlog
),
403 shard_id(_shard_id
), marker(_marker
), max_entries(_max_entries
),
404 entries(_entries
), truncated(_truncated
) {}
// Coroutine wrapper around RGWAsyncReadMDLogEntries: queues the request on
// the async-rados processor and reports its completion status.
407 class RGWReadMDLogEntriesCR
: public RGWSimpleCoroutine
{
408 RGWMetaSyncEnv
*sync_env
;
409 RGWMetadataLog
*const mdlog
;
414 list
&lt;cls_log_entry
> *entries
;
417 RGWAsyncReadMDLogEntries
*req
{nullptr};
420 RGWReadMDLogEntriesCR(RGWMetaSyncEnv
*_sync_env
, RGWMetadataLog
* mdlog
,
421 int _shard_id
, string
*_marker
, int _max_entries
,
422 list
&lt;cls_log_entry
> *_entries
, bool *_truncated
)
423 : RGWSimpleCoroutine(_sync_env
->cct
), sync_env(_sync_env
), mdlog(mdlog
),
424 shard_id(_shard_id
), pmarker(_marker
), max_entries(_max_entries
),
425 entries(_entries
), truncated(_truncated
) {}
// Destructor presumably finishes/releases 'req' (body lines missing here).
427 ~RGWReadMDLogEntriesCR() override
{
433 int send_request() override
{
435 req
= new RGWAsyncReadMDLogEntries(this, stack
->create_completion_notifier(),
436 sync_env
->store
, mdlog
, shard_id
, &amp;marker
,
437 max_entries
, entries
, truncated
);
438 sync_env
->async_rados
->queue(req
);
442 int request_complete() override
{
// NOTE(review): get_ret_status() is fetched twice (here and in the
// return below); the second call could simply reuse 'ret'.
443 int ret
= req
->get_ret_status();
444 if (ret
>= 0 &amp;&amp; !entries
->empty()) {
447 return req
->get_ret_status();
// Coroutine that fetches RGWMetadataLogInfo for a single remote mdlog
// shard via an async REST GET on /admin/log, then waits for the reply.
452 class RGWReadRemoteMDLogShardInfoCR
: public RGWCoroutine
{
454 RGWRESTReadResource
*http_op
;
456 const std::string
&amp; period
;
458 RGWMetadataLogInfo
*shard_info
;
461 RGWReadRemoteMDLogShardInfoCR(RGWMetaSyncEnv
*env
, const std::string
&amp; period
,
462 int _shard_id
, RGWMetadataLogInfo
*_shard_info
)
463 : RGWCoroutine(env
->store
->ctx()), env(env
), http_op(NULL
),
464 period(period
), shard_id(_shard_id
), shard_info(_shard_info
) {}
466 int operate() override
{
467 auto store
= env
->store
;
468 RGWRESTConn
*conn
= store
->rest_master_conn
;
// 'buf' is declared on a line missing from this fragment.
472 snprintf(buf
, sizeof(buf
), "%d", shard_id
);
473 rgw_http_param_pair pairs
[] = { { "type" , "metadata" },
475 { "period", period
.c_str() },
479 string p
= "/admin/log/";
481 http_op
= new RGWRESTReadResource(conn
, p
, pairs
, NULL
,
// Tag the op with the stack so the HTTP manager can wake this coroutine.
484 http_op
->set_user_info((void *)stack
);
486 int ret
= http_op
->aio_read();
488 ldout(store
->ctx(), 0) &lt;&lt; "ERROR: failed to read from " &lt;&lt; p
&lt;&lt; dendl
;
489 log_error() &lt;&lt; "failed to send http operation: " &lt;&lt; http_op
->to_str() &lt;&lt; " ret=" &lt;&lt; ret
&lt;&lt; std::endl
;
491 return set_cr_error(ret
);
497 int ret
= http_op
->wait(shard_info
);
500 return set_cr_error(ret
);
502 return set_cr_done();
// Coroutine that lists up to max_entries of one remote mdlog shard past
// 'marker' via /admin/log; -ENOENT from the remote is treated as a
// tolerable condition in request_complete().
509 class RGWListRemoteMDLogShardCR
: public RGWSimpleCoroutine
{
510 RGWMetaSyncEnv
*sync_env
;
511 RGWRESTReadResource
*http_op
;
513 const std::string
&amp; period
;
516 uint32_t max_entries
;
517 rgw_mdlog_shard_data
*result
;
520 RGWListRemoteMDLogShardCR(RGWMetaSyncEnv
*env
, const std::string
&amp; period
,
521 int _shard_id
, const string
&amp; _marker
, uint32_t _max_entries
,
522 rgw_mdlog_shard_data
*_result
)
523 : RGWSimpleCoroutine(env
->store
->ctx()), sync_env(env
), http_op(NULL
),
524 period(period
), shard_id(_shard_id
), marker(_marker
), max_entries(_max_entries
), result(_result
) {}
526 int send_request() override
{
527 RGWRESTConn
*conn
= sync_env
->conn
;
528 RGWRados
*store
= sync_env
->store
;
// 'buf' is declared on a line missing from this fragment.
531 snprintf(buf
, sizeof(buf
), "%d", shard_id
);
533 char max_entries_buf
[32];
534 snprintf(max_entries_buf
, sizeof(max_entries_buf
), "%d", (int)max_entries
);
// Use an empty param key to omit "marker" entirely when not resuming.
536 const char *marker_key
= (marker
.empty() ? "" : "marker");
538 rgw_http_param_pair pairs
[] = { { "type", "metadata" },
540 { "period", period
.c_str() },
541 { "max-entries", max_entries_buf
},
542 { marker_key
, marker
.c_str() },
545 string p
= "/admin/log/";
547 http_op
= new RGWRESTReadResource(conn
, p
, pairs
, NULL
, sync_env
->http_manager
);
548 http_op
->set_user_info((void *)stack
);
550 int ret
= http_op
->aio_read();
552 ldout(store
->ctx(), 0) &lt;&lt; "ERROR: failed to read from " &lt;&lt; p
&lt;&lt; dendl
;
553 log_error() &lt;&lt; "failed to send http operation: " &lt;&lt; http_op
->to_str() &lt;&lt; " ret=" &lt;&lt; ret
&lt;&lt; std::endl
;
561 int request_complete() override
{
562 int ret
= http_op
->wait(result
);
564 if (ret
&lt; 0 &amp;&amp; ret
!= -ENOENT
) {
565 ldout(sync_env
->store
->ctx(), 0) &lt;&lt; "ERROR: failed to list remote mdlog shard, ret=" &lt;&lt; ret
&lt;&lt; dendl
;
// Spawn one shard-info fetch per call until all shards are covered; the
// shard_id increment presumably falls on a line missing from this fragment.
572 bool RGWReadRemoteMDLogInfoCR::spawn_next() {
573 if (shard_id
>= num_shards
) {
576 spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env
, period
, shard_id
, &amp;(*mdlog_info
)[shard_id
]), false);
// Spawn one shard-listing child per (shard, marker) pair; the iterator
// advance presumably falls on a line missing from this fragment.
581 bool RGWListRemoteMDLogCR::spawn_next() {
582 if (iter
== shards
.end()) {
586 spawn(new RGWListRemoteMDLogShardCR(sync_env
, period
, iter
->first
, iter
->second
, max_entries_per_shard
, &amp;(*result
)[iter
->first
]), false);
// Initializes a fresh metadata sync: takes the sync lease, writes the
// initial status object, fetches each remote shard's current position,
// writes per-shard markers, flips state to StateBuildingFullSyncMaps,
// and finally drops the lease.
591 class RGWInitSyncStatusCoroutine
: public RGWCoroutine
{
592 RGWMetaSyncEnv
*sync_env
;
594 rgw_meta_sync_info status
;
595 vector
&lt;RGWMetadataLogInfo
> shards_info
;
// Lease keeps the status objects locked for the duration of init.
596 boost::intrusive_ptr
&lt;RGWContinuousLeaseCR
> lease_cr
;
597 boost::intrusive_ptr
&lt;RGWCoroutinesStack
> lease_stack
;
599 RGWInitSyncStatusCoroutine(RGWMetaSyncEnv
*_sync_env
,
600 const rgw_meta_sync_info
&amp;status
)
601 : RGWCoroutine(_sync_env
->store
->ctx()), sync_env(_sync_env
),
602 status(status
), shards_info(status
.num_shards
),
603 lease_cr(nullptr), lease_stack(nullptr) {}
605 ~RGWInitSyncStatusCoroutine() override
{
611 int operate() override
{
615 set_status("acquiring sync lock");
616 uint32_t lock_duration
= cct
->_conf
->rgw_sync_lease_period
;
617 string lock_name
= "sync_lock";
618 RGWRados
*store
= sync_env
->store
;
619 lease_cr
.reset(new RGWContinuousLeaseCR(sync_env
->async_rados
, store
,
620 rgw_raw_obj(store
->get_zone_params().log_pool
, sync_env
->status_oid()),
621 lock_name
, lock_duration
, this));
622 lease_stack
.reset(spawn(lease_cr
.get(), false));
// Busy-wait (with yields on missing lines, presumably) for the lease.
624 while (!lease_cr
->is_locked()) {
625 if (lease_cr
->is_done()) {
626 ldout(cct
, 5) &lt;&lt; "lease cr failed, done early " &lt;&lt; dendl
;
627 set_status("lease lock failed, early abort");
628 return set_cr_error(lease_cr
->get_ret_status());
634 set_status("writing sync status");
635 RGWRados
*store
= sync_env
->store
;
636 call(new RGWSimpleRadosWriteCR
&lt;rgw_meta_sync_info
>(sync_env
->async_rados
, store
,
637 rgw_raw_obj(store
->get_zone_params().log_pool
, sync_env
->status_oid()),
642 set_status("failed to write sync status");
643 ldout(cct
, 0) &lt;&lt; "ERROR: failed to write sync status, retcode=" &lt;&lt; retcode
&lt;&lt; dendl
;
// Drop the lease before bailing out.
644 yield lease_cr
->go_down();
645 return set_cr_error(retcode
);
647 /* fetch current position in logs */
648 set_status("fetching remote log position");
650 for (int i
= 0; i
&lt; (int)status
.num_shards
; i
++) {
651 spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env
, status
.period
, i
,
652 &amp;shards_info
[i
]), false);
656 drain_all_but_stack(lease_stack
.get()); /* the lease cr still needs to run */
659 set_status("updating sync status");
660 for (int i
= 0; i
&lt; (int)status
.num_shards
; i
++) {
661 rgw_meta_sync_marker marker
;
662 RGWMetadataLogInfo
&amp; info
= shards_info
[i
];
// Incremental sync will resume from the remote shard's current marker.
663 marker
.next_step_marker
= info
.marker
;
664 marker
.timestamp
= info
.last_update
;
665 RGWRados
*store
= sync_env
->store
;
666 spawn(new RGWSimpleRadosWriteCR
&lt;rgw_meta_sync_marker
>(sync_env
->async_rados
,
668 rgw_raw_obj(store
->get_zone_params().log_pool
, sync_env
->shard_obj_name(i
)),
673 set_status("changing sync state: build full sync maps");
674 status
.state
= rgw_meta_sync_info::StateBuildingFullSyncMaps
;
675 RGWRados
*store
= sync_env
->store
;
676 call(new RGWSimpleRadosWriteCR
&lt;rgw_meta_sync_info
>(sync_env
->async_rados
, store
,
677 rgw_raw_obj(store
->get_zone_params().log_pool
, sync_env
->status_oid()),
680 set_status("drop lock lease");
681 yield lease_cr
->go_down();
// Collect results of all spawned children; fail on the first error.
682 while (collect(&amp;ret
, NULL
)) {
684 return set_cr_error(ret
);
689 return set_cr_done();
// Fan-out coroutine that reads every per-shard sync marker object into
// 'markers', at most MAX_CONCURRENT_SHARDS reads in flight.
695 class RGWReadSyncStatusMarkersCR
: public RGWShardCollectCR
{
696 static constexpr int MAX_CONCURRENT_SHARDS
= 16;
699 const int num_shards
;
701 map
&lt;uint32_t, rgw_meta_sync_marker
>&amp; markers
;
704 RGWReadSyncStatusMarkersCR(RGWMetaSyncEnv
*env
, int num_shards
,
705 map
&lt;uint32_t, rgw_meta_sync_marker
>&amp; markers
)
706 : RGWShardCollectCR(env
->cct
, MAX_CONCURRENT_SHARDS
),
707 env(env
), num_shards(num_shards
), markers(markers
)
709 bool spawn_next() override
;
// Spawn one marker read per call until all shards are covered; the
// shard_id increment presumably falls on a line missing from this fragment.
712 bool RGWReadSyncStatusMarkersCR::spawn_next()
714 if (shard_id
>= num_shards
) {
717 using CR
= RGWSimpleRadosReadCR
&lt;rgw_meta_sync_marker
>;
718 rgw_raw_obj obj
{env
->store
->get_zone_params().log_pool
,
719 env
->shard_obj_name(shard_id
)};
720 spawn(new CR(env
->async_rados
, env
->store
, obj
, &amp;markers
[shard_id
]), false);
// Reads the whole persisted sync status: the global info object plus all
// per-shard markers (see operate() below).
725 class RGWReadSyncStatusCoroutine
: public RGWCoroutine
{
726 RGWMetaSyncEnv
*sync_env
;
727 rgw_meta_sync_status
*sync_status
;
730 RGWReadSyncStatusCoroutine(RGWMetaSyncEnv
*_sync_env
,
731 rgw_meta_sync_status
*_status
)
732 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
), sync_status(_status
)
734 int operate() override
;
// Two-step read: global sync_info object first (ENOENT is a hard failure
// here — empty_on_enoent is false), then all per-shard markers.
737 int RGWReadSyncStatusCoroutine::operate()
741 using ReadInfoCR
= RGWSimpleRadosReadCR
&lt;rgw_meta_sync_info
>;
743 bool empty_on_enoent
= false; // fail on ENOENT
744 rgw_raw_obj obj
{sync_env
->store
->get_zone_params().log_pool
,
745 sync_env
->status_oid()};
746 call(new ReadInfoCR(sync_env
->async_rados
, sync_env
->store
, obj
,
747 &amp;sync_status
->sync_info
, empty_on_enoent
));
750 ldout(sync_env
->cct
, 4) &lt;&lt; "failed to read sync status info with "
751 &lt;&lt; cpp_strerror(retcode
) &lt;&lt; dendl
;
752 return set_cr_error(retcode
);
754 // read shard markers
755 using ReadMarkersCR
= RGWReadSyncStatusMarkersCR
;
756 yield
call(new ReadMarkersCR(sync_env
, sync_status
->sync_info
.num_shards
,
757 sync_status
->sync_markers
));
759 ldout(sync_env
->cct
, 4) &lt;&lt; "failed to read sync status markers with "
760 &lt;&lt; cpp_strerror(retcode
) &lt;&lt; dendl
;
761 return set_cr_error(retcode
);
763 return set_cr_done();
// Full-sync bootstrap: under the sync lease, lists every metadata section
// from the master zone (users first, then bucket instances, then buckets),
// pages through each section's keys, and appends "section:key" strings
// into a sharded omap index for later per-shard processing. Finally
// records per-shard total_entries in the sync markers.
768 class RGWFetchAllMetaCR
: public RGWCoroutine
{
769 RGWMetaSyncEnv
*sync_env
;
776 list
&lt;string
> sections
;
777 list
&lt;string
>::iterator sections_iter
;
// JSON shape of one /admin/metadata listing page.
779 struct meta_list_result
{
783 bool truncated
{false};
785 void decode_json(JSONObj
*obj
) {
786 JSONDecoder::decode_json("keys", keys
, obj
);
787 JSONDecoder::decode_json("marker", marker
, obj
);
788 JSONDecoder::decode_json("count", count
, obj
);
789 JSONDecoder::decode_json("truncated", truncated
, obj
);
792 list
&lt;string
>::iterator iter
;
794 std::unique_ptr
&lt;RGWShardedOmapCRManager
> entries_index
;
796 boost::intrusive_ptr
&lt;RGWContinuousLeaseCR
> lease_cr
;
797 boost::intrusive_ptr
&lt;RGWCoroutinesStack
> lease_stack
;
803 map
&lt;uint32_t, rgw_meta_sync_marker
>&amp; markers
;
806 RGWFetchAllMetaCR(RGWMetaSyncEnv
*_sync_env
, int _num_shards
,
807 map
&lt;uint32_t, rgw_meta_sync_marker
>&amp; _markers
) : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
808 num_shards(_num_shards
),
809 ret_status(0), lease_cr(nullptr), lease_stack(nullptr),
810 lost_lock(false), failed(false), markers(_markers
) {
813 ~RGWFetchAllMetaCR() override
{
// Move 'name' from all_sections to the ordered 'sections' list if present.
816 void append_section_from_set(set
&lt;string
>&amp; all_sections
, const string
&amp; name
) {
817 set
&lt;string
>::iterator iter
= all_sections
.find(name
);
818 if (iter
!= all_sections
.end()) {
819 sections
.emplace_back(std::move(*iter
));
820 all_sections
.erase(iter
);
824 * meta sync should go in the following order: user, bucket.instance, bucket
825 * then whatever other sections exist (if any)
827 void rearrange_sections() {
828 set
&lt;string
> all_sections
;
829 std::move(sections
.begin(), sections
.end(),
830 std::inserter(all_sections
, all_sections
.end()));
833 append_section_from_set(all_sections
, "user");
834 append_section_from_set(all_sections
, "bucket.instance");
835 append_section_from_set(all_sections
, "bucket");
837 std::move(all_sections
.begin(), all_sections
.end(),
838 std::back_inserter(sections
));
841 int operate() override
{
842 RGWRESTConn
*conn
= sync_env
->conn
;
846 set_status(string("acquiring lock (") + sync_env
->status_oid() + ")");
847 uint32_t lock_duration
= cct
->_conf
->rgw_sync_lease_period
;
848 string lock_name
= "sync_lock";
849 lease_cr
.reset(new RGWContinuousLeaseCR(sync_env
->async_rados
,
851 rgw_raw_obj(sync_env
->store
->get_zone_params().log_pool
, sync_env
->status_oid()),
852 lock_name
, lock_duration
, this));
853 lease_stack
.reset(spawn(lease_cr
.get(), false));
855 while (!lease_cr
->is_locked()) {
856 if (lease_cr
->is_done()) {
857 ldout(cct
, 5) &lt;&lt; "lease cr failed, done early " &lt;&lt; dendl
;
858 set_status("failed acquiring lock");
859 return set_cr_error(lease_cr
->get_ret_status());
864 entries_index
.reset(new RGWShardedOmapCRManager(sync_env
->async_rados
, sync_env
->store
, this, num_shards
,
865 sync_env
->store
->get_zone_params().log_pool
,
866 mdlog_sync_full_sync_index_prefix
));
868 call(new RGWReadRESTResourceCR
&lt;list
&lt;string
> >(cct
, conn
, sync_env
->http_manager
,
// NOTE(review): "§ions" below is a mis-encoding of "&sections"
// (the HTML entity &sect; swallowed the prefix) — restore before building.
869 "/admin/metadata", NULL
, §ions
));
871 if (get_ret_status() &lt; 0) {
872 ldout(cct
, 0) &lt;&lt; "ERROR: failed to fetch metadata sections" &lt;&lt; dendl
;
873 yield entries_index
->finish();
874 yield lease_cr
->go_down();
876 return set_cr_error(get_ret_status());
878 rearrange_sections();
879 sections_iter
= sections
.begin();
880 for (; sections_iter
!= sections
.end(); ++sections_iter
) {
883 #define META_FULL_SYNC_CHUNK_SIZE "1000"
884 string entrypoint
= string("/admin/metadata/") + *sections_iter
;
885 rgw_http_param_pair pairs
[] = { { "max-entries", META_FULL_SYNC_CHUNK_SIZE
},
886 { "marker", result
.marker
.c_str() },
889 call(new RGWReadRESTResourceCR
&lt;meta_list_result
>(cct
, conn
, sync_env
->http_manager
,
890 entrypoint
, pairs
, &amp;result
));
892 if (get_ret_status() &lt; 0) {
893 ldout(cct
, 0) &lt;&lt; "ERROR: failed to fetch metadata section: " &lt;&lt; *sections_iter
&lt;&lt; dendl
;
894 yield entries_index
->finish();
895 yield lease_cr
->go_down();
897 return set_cr_error(get_ret_status());
899 iter
= result
.keys
.begin();
900 for (; iter
!= result
.keys
.end(); ++iter
) {
// Abort (lost_lock handling on missing lines) if the lease lapsed.
901 if (!lease_cr
->is_locked()) {
905 yield
; // allow entries_index consumer to make progress
907 ldout(cct
, 20) &lt;&lt; "list metadata: section=" &lt;&lt; *sections_iter
&lt;&lt; " key=" &lt;&lt; *iter
&lt;&lt; dendl
;
// Index keys are "section:key".
908 string s
= *sections_iter
+ ":" + *iter
;
910 RGWRados
*store
= sync_env
->store
;
911 int ret
= store
->meta_mgr
->get_log_shard_id(*sections_iter
, *iter
, &amp;shard_id
);
913 ldout(cct
, 0) &lt;&lt; "ERROR: could not determine shard id for " &lt;&lt; *sections_iter
&lt;&lt; ":" &lt;&lt; *iter
&lt;&lt; dendl
;
917 if (!entries_index
->append(s
, shard_id
)) {
// Keep paging the current section until the listing is not truncated.
921 } while (result
.truncated
);
924 if (!entries_index
->finish()) {
929 for (map
&lt;uint32_t, rgw_meta_sync_marker
>::iterator iter
= markers
.begin(); iter
!= markers
.end(); ++iter
) {
930 int shard_id
= (int)iter
->first
;
931 rgw_meta_sync_marker
&amp; marker
= iter
->second
;
932 marker
.total_entries
= entries_index
->get_total_entries(shard_id
);
933 spawn(new RGWSimpleRadosWriteCR
&lt;rgw_meta_sync_marker
>(sync_env
->async_rados
, sync_env
->store
,
934 rgw_raw_obj(sync_env
->store
->get_zone_params().log_pool
, sync_env
->shard_obj_name(shard_id
)),
939 drain_all_but_stack(lease_stack
.get()); /* the lease cr still needs to run */
941 yield lease_cr
->go_down();
944 while (collect(&amp;ret
, NULL
)) {
946 return set_cr_error(ret
);
// Error epilogue: omap-append failure -> EIO; lost lease -> EBUSY.
952 yield
return set_cr_error(-EIO
);
955 yield
return set_cr_error(-EBUSY
);
958 if (ret_status
&lt; 0) {
959 yield
return set_cr_error(ret_status
);
962 yield
return set_cr_done();
968 static string
full_sync_index_shard_oid(int shard_id
)
970 char buf
[mdlog_sync_full_sync_index_prefix
.size() + 16];
971 snprintf(buf
, sizeof(buf
), "%s.%d", mdlog_sync_full_sync_index_prefix
.c_str(), shard_id
);
// Coroutine that fetches one metadata entry (raw bufferlist) from the
// master zone via GET /admin/metadata/<section>/<key>.
975 class RGWReadRemoteMetadataCR
: public RGWCoroutine
{
976 RGWMetaSyncEnv
*sync_env
;
978 RGWRESTReadResource
*http_op
;
986 RGWReadRemoteMetadataCR(RGWMetaSyncEnv
*_sync_env
,
987 const string
&amp; _section
, const string
&amp; _key
, bufferlist
*_pbl
) : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
994 int operate() override
{
995 RGWRESTConn
*conn
= sync_env
->conn
;
998 rgw_http_param_pair pairs
[] = { { "key" , key
.c_str()},
1001 string p
= string("/admin/metadata/") + section
+ "/" + key
;
1003 http_op
= new RGWRESTReadResource(conn
, p
, pairs
, NULL
, sync_env
->http_manager
);
1005 http_op
->set_user_info((void *)stack
);
1007 int ret
= http_op
->aio_read();
1009 ldout(sync_env
->cct
, 0) &lt;&lt; "ERROR: failed to fetch mdlog data" &lt;&lt; dendl
;
1010 log_error() &lt;&lt; "failed to send http operation: " &lt;&lt; http_op
->to_str() &lt;&lt; " ret=" &lt;&lt; ret
&lt;&lt; std::endl
;
1012 return set_cr_error(ret
);
// Wait for the raw reply body into *pbl.
1018 int ret
= http_op
->wait_bl(pbl
);
1021 return set_cr_error(ret
);
1023 return set_cr_done();
// Blocking request (async-rados thread pool) that applies one metadata
// entry locally via the metadata manager, unconditionally
// (APPLY_ALWAYS: no version check).
1030 class RGWAsyncMetaStoreEntry
: public RGWAsyncRadosRequest
{
1035 int _send_request() override
{
1036 int ret
= store
->meta_mgr
->put(raw_key
, bl
, RGWMetadataHandler::APPLY_ALWAYS
);
1038 ldout(store
->ctx(), 0) &lt;&lt; "ERROR: can't store key: " &lt;&lt; raw_key
&lt;&lt; " ret=" &lt;&lt; ret
&lt;&lt; dendl
;
1044 RGWAsyncMetaStoreEntry(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
,
1045 const string
&amp; _raw_key
,
1046 bufferlist
&amp; _bl
) : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
1047 raw_key(_raw_key
), bl(_bl
) {}
// Coroutine wrapper around RGWAsyncMetaStoreEntry: queues the store
// request and reports its completion status.
1051 class RGWMetaStoreEntryCR
: public RGWSimpleCoroutine
{
1052 RGWMetaSyncEnv
*sync_env
;
1056 RGWAsyncMetaStoreEntry
*req
;
1059 RGWMetaStoreEntryCR(RGWMetaSyncEnv
*_sync_env
,
1060 const string
&amp; _raw_key
,
1061 bufferlist
&amp; _bl
) : RGWSimpleCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
1062 raw_key(_raw_key
), bl(_bl
), req(NULL
) {
// Destructor presumably finishes/releases 'req' (body lines missing here).
1065 ~RGWMetaStoreEntryCR() override
{
1071 int send_request() override
{
1072 req
= new RGWAsyncMetaStoreEntry(this, stack
->create_completion_notifier(),
1073 sync_env
->store
, raw_key
, bl
);
1074 sync_env
->async_rados
->queue(req
);
1078 int request_complete() override
{
1079 return req
->get_ret_status();
// Blocking request (async-rados thread pool) that removes one metadata
// entry locally via the metadata manager.
1083 class RGWAsyncMetaRemoveEntry
: public RGWAsyncRadosRequest
{
1087 int _send_request() override
{
1088 int ret
= store
->meta_mgr
->remove(raw_key
);
1090 ldout(store
->ctx(), 0) &lt;&lt; "ERROR: can't remove key: " &lt;&lt; raw_key
&lt;&lt; " ret=" &lt;&lt; ret
&lt;&lt; dendl
;
1096 RGWAsyncMetaRemoveEntry(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
,
1097 const string
&amp; _raw_key
) : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
1098 raw_key(_raw_key
) {}
// Coroutine wrapper around RGWAsyncMetaRemoveEntry: queues the remove
// request; request_complete() presumably normalizes -ENOENT on lines
// missing from this fragment.
1102 class RGWMetaRemoveEntryCR
: public RGWSimpleCoroutine
{
1103 RGWMetaSyncEnv
*sync_env
;
1106 RGWAsyncMetaRemoveEntry
*req
;
1109 RGWMetaRemoveEntryCR(RGWMetaSyncEnv
*_sync_env
,
1110 const string
&amp; _raw_key
) : RGWSimpleCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
1111 raw_key(_raw_key
), req(NULL
) {
1114 ~RGWMetaRemoveEntryCR() override
{
1120 int send_request() override
{
1121 req
= new RGWAsyncMetaRemoveEntry(this, stack
->create_completion_notifier(),
1122 sync_env
->store
, raw_key
);
1123 sync_env
->async_rados
->queue(req
);
1127 int request_complete() override
{
1128 int r
= req
->get_ret_status();
1136 #define META_SYNC_UPDATE_MARKER_WINDOW 10
// Tracks the high-water marker for one sync shard; flushes the persisted
// marker every META_SYNC_UPDATE_MARKER_WINDOW completions by writing the
// rgw_meta_sync_marker object for this shard.
1138 class RGWMetaSyncShardMarkerTrack
: public RGWSyncShardMarkerTrack
&lt;string
, string
> {
1139 RGWMetaSyncEnv
*sync_env
;
1142 rgw_meta_sync_marker sync_marker
;
1146 RGWMetaSyncShardMarkerTrack(RGWMetaSyncEnv
*_sync_env
,
1147 const string
&amp; _marker_oid
,
1148 const rgw_meta_sync_marker
&amp; _marker
) : RGWSyncShardMarkerTrack(META_SYNC_UPDATE_MARKER_WINDOW
),
1149 sync_env(_sync_env
),
1150 marker_oid(_marker_oid
),
1151 sync_marker(_marker
) {}
1153 RGWCoroutine
*store_marker(const string
&amp; new_marker
, uint64_t index_pos
, const real_time
&amp; timestamp
) override
{
1154 sync_marker
.marker
= new_marker
;
// Only meaningful during full sync (position in the index); zero means
// "leave unchanged".
1155 if (index_pos
> 0) {
1156 sync_marker
.pos
= index_pos
;
1159 if (!real_clock::is_zero(timestamp
)) {
1160 sync_marker
.timestamp
= timestamp
;
1163 ldout(sync_env
->cct
, 20) &lt;&lt; __func__
&lt;&lt; "(): updating marker marker_oid=" &lt;&lt; marker_oid
&lt;&lt; " marker=" &lt;&lt; new_marker
&lt;&lt; " realm_epoch=" &lt;&lt; sync_marker
.realm_epoch
&lt;&lt; dendl
;
1164 RGWRados
*store
= sync_env
->store
;
1165 return new RGWSimpleRadosWriteCR
&lt;rgw_meta_sync_marker
>(sync_env
->async_rados
,
1167 rgw_raw_obj(store
->get_zone_params().log_pool
, marker_oid
),
// Sync a single metadata entry ("section:key"): fetch it from the master
// zone, then store (or, on remote -ENOENT, remove) it locally. Transient
// errors (-EAGAIN/-ECANCELED) are retried up to
// NUM_TRANSIENT_ERROR_RETRIES times in each phase; permanent fetch
// failures are recorded in the sync error log.
1172 int RGWMetaSyncSingleEntryCR::operate() {
1174 #define NUM_TRANSIENT_ERROR_RETRIES 10
// Optional fault injection for testing, gated by a config probability.
1176 if (error_injection
&amp;&amp;
1177 rand() % 10000 &lt; cct
->_conf
->rgw_sync_meta_inject_err_probability
* 10000.0) {
1178 ldout(sync_env
->cct
, 0) &lt;&lt; __FILE__
&lt;&lt; ":" &lt;&lt; __LINE__
&lt;&lt; ": injecting meta sync error on key=" &lt;&lt; raw_key
&lt;&lt; dendl
;
1179 return set_cr_error(-EIO
);
// Incomplete mdlog operations are skipped, but still advance the marker.
1182 if (op_status
!= MDLOG_STATUS_COMPLETE
) {
1183 ldout(sync_env
->cct
, 20) &lt;&lt; "skipping pending operation" &lt;&lt; dendl
;
1184 yield
call(marker_tracker
->finish(entry_marker
));
1186 return set_cr_error(retcode
);
1188 return set_cr_done();
// Phase 1: fetch the remote entry, with transient-error retries.
1190 for (tries
= 0; tries
&lt; NUM_TRANSIENT_ERROR_RETRIES
; tries
++) {
// raw_key is "section:key"; split on the first ':'.
1192 pos
= raw_key
.find(':');
1193 section
= raw_key
.substr(0, pos
);
1194 key
= raw_key
.substr(pos
+ 1);
1195 ldout(sync_env
->cct
, 20) &lt;&lt; "fetching remote metadata: " &lt;&lt; section
&lt;&lt; ":" &lt;&lt; key
&lt;&lt; (tries
== 0 ? "" : " (retry)") &lt;&lt; dendl
;
1196 call(new RGWReadRemoteMetadataCR(sync_env
, section
, key
, &amp;md_bl
));
1199 sync_status
= retcode
;
1201 if (sync_status
== -ENOENT
) {
1202 /* FIXME: do we need to remove the entry from the local zone? */
1206 if ((sync_status
== -EAGAIN
|| sync_status
== -ECANCELED
) &amp;&amp; (tries
&lt; NUM_TRANSIENT_ERROR_RETRIES
- 1)) {
1207 ldout(sync_env
->cct
, 20) &lt;&lt; *this &lt;&lt; ": failed to fetch remote metadata: " &lt;&lt; section
&lt;&lt; ":" &lt;&lt; key
&lt;&lt; ", will retry" &lt;&lt; dendl
;
1211 if (sync_status
&lt; 0) {
1212 ldout(sync_env
->cct
, 10) &lt;&lt; *this &lt;&lt; ": failed to send read remote metadata entry: section=" &lt;&lt; section
&lt;&lt; " key=" &lt;&lt; key
&lt;&lt; " status=" &lt;&lt; sync_status
&lt;&lt; dendl
;
1213 log_error() &lt;&lt; "failed to send read remote metadata entry: section=" &lt;&lt; section
&lt;&lt; " key=" &lt;&lt; key
&lt;&lt; " status=" &lt;&lt; sync_status
&lt;&lt; std::endl
;
// Permanent fetch failure: record it in the sync error log, then bail.
1214 yield
call(sync_env
->error_logger
->log_error_cr(sync_env
->conn
->get_remote_id(), section
, key
, -sync_status
,
1215 string("failed to read remote metadata entry: ") + cpp_strerror(-sync_status
)));
1216 return set_cr_error(sync_status
);
// Phase 2: apply locally — store the fetched blob, or remove on -ENOENT.
1223 for (tries
= 0; tries
&lt; NUM_TRANSIENT_ERROR_RETRIES
; tries
++) {
1224 if (sync_status
!= -ENOENT
) {
1225 yield
call(new RGWMetaStoreEntryCR(sync_env
, raw_key
, md_bl
));
1227 yield
call(new RGWMetaRemoveEntryCR(sync_env
, raw_key
));
1229 if ((retcode
== -EAGAIN
|| retcode
== -ECANCELED
) &amp;&amp; (tries
&lt; NUM_TRANSIENT_ERROR_RETRIES
- 1)) {
1230 ldout(sync_env
->cct
, 20) &lt;&lt; *this &lt;&lt; ": failed to store metadata: " &lt;&lt; section
&lt;&lt; ":" &lt;&lt; key
&lt;&lt; ", got retcode=" &lt;&lt; retcode
&lt;&lt; dendl
;
1236 sync_status
= retcode
;
// Advance the shard marker only on success.
1238 if (sync_status
== 0 &amp;&amp; marker_tracker
) {
1240 yield
call(marker_tracker
->finish(entry_marker
));
1241 sync_status
= retcode
;
1243 if (sync_status
&lt; 0) {
1244 return set_cr_error(sync_status
);
1246 return set_cr_done();
// State-machine coroutine that clones up to CLONE_MAX_ENTRIES of one
// remote mdlog shard into the local mdlog, reporting the new position via
// *new_marker (the per-state handlers are declared below and defined
// elsewhere in the file).
1251 class RGWCloneMetaLogCoroutine
: public RGWCoroutine
{
1252 RGWMetaSyncEnv
*sync_env
;
1253 RGWMetadataLog
*mdlog
;
1255 const std::string
&amp; period
;
1258 bool truncated
= false;
1261 int max_entries
= CLONE_MAX_ENTRIES
;
1263 RGWRESTReadResource
*http_op
= nullptr;
1264 boost::intrusive_ptr
&lt;RGWMetadataLogInfoCompletion
> completion
;
1266 RGWMetadataLogInfo shard_info
;
1267 rgw_mdlog_shard_data data
;
1270 RGWCloneMetaLogCoroutine(RGWMetaSyncEnv
*_sync_env
, RGWMetadataLog
* mdlog
,
1271 const std::string
&amp; period
, int _id
,
1272 const string
&amp; _marker
, string
*_new_marker
)
1273 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
), mdlog(mdlog
),
1274 period(period
), shard_id(_id
), marker(_marker
), new_marker(_new_marker
) {
// Start out reporting the input position so callers see no progress
// until entries are actually cloned.
1276 *new_marker
= marker
;
1279 ~RGWCloneMetaLogCoroutine() override
{
// Cancel an in-flight shard-info completion (guard presumably on a
// line missing from this fragment).
1284 completion
->cancel();
1288 int operate() override
;
// One handler per state of the clone state machine.
1291 int state_read_shard_status();
1292 int state_read_shard_status_complete();
1293 int state_send_rest_request();
1294 int state_receive_rest_response();
1295 int state_store_mdlog_entries();
1296 int state_store_mdlog_entries_complete();
1299 class RGWMetaSyncShardCR
: public RGWCoroutine
{
1300 RGWMetaSyncEnv
*sync_env
;
1302 const rgw_pool
& pool
;
1303 const std::string
& period
; //< currently syncing period id
1304 const epoch_t realm_epoch
; //< realm_epoch of period
1305 RGWMetadataLog
* mdlog
; //< log of syncing period
1307 rgw_meta_sync_marker
& sync_marker
;
1308 boost::optional
<rgw_meta_sync_marker
> temp_marker
; //< for pending updates
1311 const std::string
& period_marker
; //< max marker stored in next period
1313 std::set
<std::string
> entries
;
1314 std::set
<std::string
>::iterator iter
;
1318 RGWMetaSyncShardMarkerTrack
*marker_tracker
= nullptr;
1320 list
<cls_log_entry
> log_entries
;
1321 list
<cls_log_entry
>::iterator log_iter
;
1322 bool truncated
= false;
1324 string mdlog_marker
;
1326 rgw_mdlog_entry mdlog_entry
;
1331 boost::asio::coroutine incremental_cr
;
1332 boost::asio::coroutine full_cr
;
1334 boost::intrusive_ptr
<RGWContinuousLeaseCR
> lease_cr
;
1335 boost::intrusive_ptr
<RGWCoroutinesStack
> lease_stack
;
1337 bool lost_lock
= false;
1339 bool *reset_backoff
;
1341 // hold a reference to the cr stack while it's in the map
1342 using StackRef
= boost::intrusive_ptr
<RGWCoroutinesStack
>;
1343 map
<StackRef
, string
> stack_to_pos
;
1344 map
<string
, string
> pos_to_prev
;
1346 bool can_adjust_marker
= false;
1347 bool done_with_period
= false;
1349 int total_entries
= 0;
1352 RGWMetaSyncShardCR(RGWMetaSyncEnv
*_sync_env
, const rgw_pool
& _pool
,
1353 const std::string
& period
, epoch_t realm_epoch
,
1354 RGWMetadataLog
* mdlog
, uint32_t _shard_id
,
1355 rgw_meta_sync_marker
& _marker
,
1356 const std::string
& period_marker
, bool *_reset_backoff
)
1357 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
), pool(_pool
),
1358 period(period
), realm_epoch(realm_epoch
), mdlog(mdlog
),
1359 shard_id(_shard_id
), sync_marker(_marker
),
1360 period_marker(period_marker
), inc_lock("RGWMetaSyncShardCR::inc_lock"),
1361 reset_backoff(_reset_backoff
) {
1362 *reset_backoff
= false;
1365 ~RGWMetaSyncShardCR() override
{
1366 delete marker_tracker
;
1372 void set_marker_tracker(RGWMetaSyncShardMarkerTrack
*mt
) {
1373 delete marker_tracker
;
1374 marker_tracker
= mt
;
1377 int operate() override
{
1380 switch (sync_marker
.state
) {
1381 case rgw_meta_sync_marker::FullSync
:
1384 ldout(sync_env
->cct
, 10) << "sync: full_sync: shard_id=" << shard_id
<< " r=" << r
<< dendl
;
1385 return set_cr_error(r
);
1388 case rgw_meta_sync_marker::IncrementalSync
:
1389 r
= incremental_sync();
1391 ldout(sync_env
->cct
, 10) << "sync: incremental_sync: shard_id=" << shard_id
<< " r=" << r
<< dendl
;
1392 return set_cr_error(r
);
1401 void collect_children()
1404 RGWCoroutinesStack
*child
;
1405 while (collect_next(&child_ret
, &child
)) {
1406 auto iter
= stack_to_pos
.find(child
);
1407 if (iter
== stack_to_pos
.end()) {
1408 /* some other stack that we don't care about */
1412 string
& pos
= iter
->second
;
1414 if (child_ret
< 0) {
1415 ldout(sync_env
->cct
, 0) << *this << ": child operation stack=" << child
<< " entry=" << pos
<< " returned " << child_ret
<< dendl
;
1418 map
<string
, string
>::iterator prev_iter
= pos_to_prev
.find(pos
);
1419 assert(prev_iter
!= pos_to_prev
.end());
1422 * we should get -EAGAIN for transient errors, for which we want to retry, so we don't
1423 * update the marker and abort. We'll get called again for these. Permanent errors will be
1424 * handled by marking the entry at the error log shard, so that we retry on it separately
1426 if (child_ret
== -EAGAIN
) {
1427 can_adjust_marker
= false;
1430 if (pos_to_prev
.size() == 1) {
1431 if (can_adjust_marker
) {
1432 sync_marker
.marker
= pos
;
1434 pos_to_prev
.erase(prev_iter
);
1436 assert(pos_to_prev
.size() > 1);
1437 pos_to_prev
.erase(prev_iter
);
1438 prev_iter
= pos_to_prev
.begin();
1439 if (can_adjust_marker
) {
1440 sync_marker
.marker
= prev_iter
->second
;
1444 ldout(sync_env
->cct
, 4) << *this << ": adjusting marker pos=" << sync_marker
.marker
<< dendl
;
1445 stack_to_pos
.erase(iter
);
1450 #define OMAP_GET_MAX_ENTRIES 100
1451 int max_entries
= OMAP_GET_MAX_ENTRIES
;
1453 set_status("full_sync");
1454 oid
= full_sync_index_shard_oid(shard_id
);
1455 can_adjust_marker
= true;
1458 uint32_t lock_duration
= cct
->_conf
->rgw_sync_lease_period
;
1459 string lock_name
= "sync_lock";
1460 RGWRados
*store
= sync_env
->store
;
1461 lease_cr
.reset(new RGWContinuousLeaseCR(sync_env
->async_rados
, store
,
1462 rgw_raw_obj(pool
, sync_env
->shard_obj_name(shard_id
)),
1463 lock_name
, lock_duration
, this));
1464 lease_stack
.reset(spawn(lease_cr
.get(), false));
1467 while (!lease_cr
->is_locked()) {
1468 if (lease_cr
->is_done()) {
1469 ldout(cct
, 5) << "lease cr failed, done early " << dendl
;
1471 return lease_cr
->get_ret_status();
1477 /* lock succeeded, a retry now should avoid previous backoff status */
1478 *reset_backoff
= true;
1480 /* prepare marker tracker */
1481 set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env
,
1482 sync_env
->shard_obj_name(shard_id
),
1485 marker
= sync_marker
.marker
;
1487 total_entries
= sync_marker
.pos
;
1491 if (!lease_cr
->is_locked()) {
1495 yield
call(new RGWRadosGetOmapKeysCR(sync_env
->store
, rgw_raw_obj(pool
, oid
),
1496 marker
, &entries
, max_entries
));
1498 ldout(sync_env
->cct
, 0) << "ERROR: " << __func__
<< "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode
<< dendl
;
1499 yield lease_cr
->go_down();
1503 iter
= entries
.begin();
1504 for (; iter
!= entries
.end(); ++iter
) {
1506 ldout(sync_env
->cct
, 20) << __func__
<< ": full sync: " << marker
<< dendl
;
1508 if (!marker_tracker
->start(marker
, total_entries
, real_time())) {
1509 ldout(sync_env
->cct
, 0) << "ERROR: cannot start syncing " << marker
<< ". Duplicate entry?" << dendl
;
1511 // fetch remote and write locally
1513 RGWCoroutinesStack
*stack
= spawn(new RGWMetaSyncSingleEntryCR(sync_env
, marker
, marker
, MDLOG_STATUS_COMPLETE
, marker_tracker
), false);
1514 // stack_to_pos holds a reference to the stack
1515 stack_to_pos
[stack
] = marker
;
1516 pos_to_prev
[marker
] = marker
;
1521 } while ((int)entries
.size() == max_entries
&& can_adjust_marker
);
1523 while (num_spawned() > 1) {
1524 yield
wait_for_child();
1529 /* update marker to reflect we're done with full sync */
1530 if (can_adjust_marker
) {
1531 // apply updates to a temporary marker, or operate() will send us
1532 // to incremental_sync() after we yield
1533 temp_marker
= sync_marker
;
1534 temp_marker
->state
= rgw_meta_sync_marker::IncrementalSync
;
1535 temp_marker
->marker
= std::move(temp_marker
->next_step_marker
);
1536 temp_marker
->next_step_marker
.clear();
1537 temp_marker
->realm_epoch
= realm_epoch
;
1538 ldout(sync_env
->cct
, 4) << *this << ": saving marker pos=" << temp_marker
->marker
<< " realm_epoch=" << realm_epoch
<< dendl
;
1540 using WriteMarkerCR
= RGWSimpleRadosWriteCR
<rgw_meta_sync_marker
>;
1541 yield
call(new WriteMarkerCR(sync_env
->async_rados
, sync_env
->store
,
1542 rgw_raw_obj(pool
, sync_env
->shard_obj_name(shard_id
)),
1547 ldout(sync_env
->cct
, 0) << "ERROR: failed to set sync marker: retcode=" << retcode
<< dendl
;
1548 yield lease_cr
->go_down();
1555 * if we reached here, it means that lost_lock is true, otherwise the state
1556 * change in the previous block will prevent us from reaching here
1559 yield lease_cr
->go_down();
1565 if (!can_adjust_marker
) {
1573 // apply the sync marker update
1574 assert(temp_marker
);
1575 sync_marker
= std::move(*temp_marker
);
1576 temp_marker
= boost::none
;
1577 // must not yield after this point!
1583 int incremental_sync() {
1584 reenter(&incremental_cr
) {
1585 set_status("incremental_sync");
1586 can_adjust_marker
= true;
1588 if (!lease_cr
) { /* could have had a lease_cr lock from previous state */
1590 uint32_t lock_duration
= cct
->_conf
->rgw_sync_lease_period
;
1591 string lock_name
= "sync_lock";
1592 RGWRados
*store
= sync_env
->store
;
1593 lease_cr
.reset( new RGWContinuousLeaseCR(sync_env
->async_rados
, store
,
1594 rgw_raw_obj(pool
, sync_env
->shard_obj_name(shard_id
)),
1595 lock_name
, lock_duration
, this));
1596 lease_stack
.reset(spawn(lease_cr
.get(), false));
1599 while (!lease_cr
->is_locked()) {
1600 if (lease_cr
->is_done()) {
1601 ldout(cct
, 5) << "lease cr failed, done early " << dendl
;
1603 return lease_cr
->get_ret_status();
1609 // if the period has advanced, we can't use the existing marker
1610 if (sync_marker
.realm_epoch
< realm_epoch
) {
1611 ldout(sync_env
->cct
, 4) << "clearing marker=" << sync_marker
.marker
1612 << " from old realm_epoch=" << sync_marker
.realm_epoch
1613 << " (now " << realm_epoch
<< ')' << dendl
;
1614 sync_marker
.realm_epoch
= realm_epoch
;
1615 sync_marker
.marker
.clear();
1617 mdlog_marker
= sync_marker
.marker
;
1618 set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env
,
1619 sync_env
->shard_obj_name(shard_id
),
1623 * mdlog_marker: the remote sync marker positiion
1624 * sync_marker: the local sync marker position
1625 * max_marker: the max mdlog position that we fetched
1626 * marker: the current position we try to sync
1627 * period_marker: the last marker before the next period begins (optional)
1629 marker
= max_marker
= sync_marker
.marker
;
1632 if (!lease_cr
->is_locked()) {
1636 #define INCREMENTAL_MAX_ENTRIES 100
1637 ldout(sync_env
->cct
, 20) << __func__
<< ":" << __LINE__
<< ": shard_id=" << shard_id
<< " mdlog_marker=" << mdlog_marker
<< " sync_marker.marker=" << sync_marker
.marker
<< " period_marker=" << period_marker
<< dendl
;
1638 if (!period_marker
.empty() && period_marker
<= mdlog_marker
) {
1639 ldout(cct
, 10) << "mdlog_marker past period_marker=" << period_marker
<< dendl
;
1640 done_with_period
= true;
1643 if (mdlog_marker
<= max_marker
) {
1644 /* we're at the tip, try to bring more entries */
1645 ldout(sync_env
->cct
, 20) << __func__
<< ":" << __LINE__
<< ": shard_id=" << shard_id
<< " syncing mdlog for shard_id=" << shard_id
<< dendl
;
1646 yield
call(new RGWCloneMetaLogCoroutine(sync_env
, mdlog
,
1648 mdlog_marker
, &mdlog_marker
));
1651 ldout(sync_env
->cct
, 10) << *this << ": failed to fetch more log entries, retcode=" << retcode
<< dendl
;
1652 yield lease_cr
->go_down();
1654 *reset_backoff
= false; // back off and try again later
1657 *reset_backoff
= true; /* if we got to this point, all systems function */
1658 ldout(sync_env
->cct
, 20) << __func__
<< ":" << __LINE__
<< ": shard_id=" << shard_id
<< " mdlog_marker=" << mdlog_marker
<< " sync_marker.marker=" << sync_marker
.marker
<< dendl
;
1659 if (mdlog_marker
> max_marker
) {
1660 marker
= max_marker
;
1661 yield
call(new RGWReadMDLogEntriesCR(sync_env
, mdlog
, shard_id
,
1662 &max_marker
, INCREMENTAL_MAX_ENTRIES
,
1663 &log_entries
, &truncated
));
1665 ldout(sync_env
->cct
, 10) << *this << ": failed to list mdlog entries, retcode=" << retcode
<< dendl
;
1666 yield lease_cr
->go_down();
1668 *reset_backoff
= false; // back off and try again later
1671 for (log_iter
= log_entries
.begin(); log_iter
!= log_entries
.end() && !done_with_period
; ++log_iter
) {
1672 if (!period_marker
.empty() && period_marker
<= log_iter
->id
) {
1673 done_with_period
= true;
1674 if (period_marker
< log_iter
->id
) {
1675 ldout(cct
, 10) << "found key=" << log_iter
->id
1676 << " past period_marker=" << period_marker
<< dendl
;
1679 ldout(cct
, 10) << "found key at period_marker=" << period_marker
<< dendl
;
1680 // sync this entry, then return control to RGWMetaSyncCR
1682 if (!mdlog_entry
.convert_from(*log_iter
)) {
1683 ldout(sync_env
->cct
, 0) << __func__
<< ":" << __LINE__
<< ": ERROR: failed to convert mdlog entry, shard_id=" << shard_id
<< " log_entry: " << log_iter
->id
<< ":" << log_iter
->section
<< ":" << log_iter
->name
<< ":" << log_iter
->timestamp
<< " ... skipping entry" << dendl
;
1686 ldout(sync_env
->cct
, 20) << __func__
<< ":" << __LINE__
<< ": shard_id=" << shard_id
<< " log_entry: " << log_iter
->id
<< ":" << log_iter
->section
<< ":" << log_iter
->name
<< ":" << log_iter
->timestamp
<< dendl
;
1687 if (!marker_tracker
->start(log_iter
->id
, 0, log_iter
->timestamp
.to_real_time())) {
1688 ldout(sync_env
->cct
, 0) << "ERROR: cannot start syncing " << log_iter
->id
<< ". Duplicate entry?" << dendl
;
1690 raw_key
= log_iter
->section
+ ":" + log_iter
->name
;
1692 RGWCoroutinesStack
*stack
= spawn(new RGWMetaSyncSingleEntryCR(sync_env
, raw_key
, log_iter
->id
, mdlog_entry
.log_data
.status
, marker_tracker
), false);
1694 // stack_to_pos holds a reference to the stack
1695 stack_to_pos
[stack
] = log_iter
->id
;
1696 pos_to_prev
[log_iter
->id
] = marker
;
1699 marker
= log_iter
->id
;
1703 ldout(sync_env
->cct
, 20) << __func__
<< ":" << __LINE__
<< ": shard_id=" << shard_id
<< " mdlog_marker=" << mdlog_marker
<< " max_marker=" << max_marker
<< " sync_marker.marker=" << sync_marker
.marker
<< " period_marker=" << period_marker
<< dendl
;
1704 if (done_with_period
) {
1705 // return control to RGWMetaSyncCR and advance to the next period
1706 ldout(sync_env
->cct
, 10) << *this << ": done with period" << dendl
;
1709 if (mdlog_marker
== max_marker
&& can_adjust_marker
) {
1710 #define INCREMENTAL_INTERVAL 20
1711 yield
wait(utime_t(INCREMENTAL_INTERVAL
, 0));
1713 } while (can_adjust_marker
);
1715 while (num_spawned() > 1) {
1716 yield
wait_for_child();
1720 yield lease_cr
->go_down();
1728 if (!can_adjust_marker
) {
1732 return set_cr_done();
1739 class RGWMetaSyncShardControlCR
: public RGWBackoffControlCR
1741 RGWMetaSyncEnv
*sync_env
;
1743 const rgw_pool
& pool
;
1744 const std::string
& period
;
1745 epoch_t realm_epoch
;
1746 RGWMetadataLog
* mdlog
;
1748 rgw_meta_sync_marker sync_marker
;
1749 const std::string period_marker
;
1751 static constexpr bool exit_on_error
= false; // retry on all errors
1753 RGWMetaSyncShardControlCR(RGWMetaSyncEnv
*_sync_env
, const rgw_pool
& _pool
,
1754 const std::string
& period
, epoch_t realm_epoch
,
1755 RGWMetadataLog
* mdlog
, uint32_t _shard_id
,
1756 const rgw_meta_sync_marker
& _marker
,
1757 std::string
&& period_marker
)
1758 : RGWBackoffControlCR(_sync_env
->cct
, exit_on_error
), sync_env(_sync_env
),
1759 pool(_pool
), period(period
), realm_epoch(realm_epoch
), mdlog(mdlog
),
1760 shard_id(_shard_id
), sync_marker(_marker
),
1761 period_marker(std::move(period_marker
)) {}
1763 RGWCoroutine
*alloc_cr() override
{
1764 return new RGWMetaSyncShardCR(sync_env
, pool
, period
, realm_epoch
, mdlog
,
1765 shard_id
, sync_marker
, period_marker
, backoff_ptr());
1768 RGWCoroutine
*alloc_finisher_cr() override
{
1769 RGWRados
*store
= sync_env
->store
;
1770 return new RGWSimpleRadosReadCR
<rgw_meta_sync_marker
>(sync_env
->async_rados
, store
,
1771 rgw_raw_obj(pool
, sync_env
->shard_obj_name(shard_id
)),
1776 class RGWMetaSyncCR
: public RGWCoroutine
{
1777 RGWMetaSyncEnv
*sync_env
;
1778 const rgw_pool
& pool
;
1779 RGWPeriodHistory::Cursor cursor
; //< sync position in period history
1780 RGWPeriodHistory::Cursor next
; //< next period in history
1781 rgw_meta_sync_status sync_status
;
1783 std::mutex mutex
; //< protect access to shard_crs
1785 // TODO: it should be enough to hold a reference on the stack only, as calling
1786 // RGWCoroutinesStack::wakeup() doesn't refer to the RGWCoroutine if it has
1787 // already completed
1788 using ControlCRRef
= boost::intrusive_ptr
<RGWMetaSyncShardControlCR
>;
1789 using StackRef
= boost::intrusive_ptr
<RGWCoroutinesStack
>;
1790 using RefPair
= std::pair
<ControlCRRef
, StackRef
>;
1791 map
<int, RefPair
> shard_crs
;
1795 RGWMetaSyncCR(RGWMetaSyncEnv
*_sync_env
, RGWPeriodHistory::Cursor cursor
,
1796 const rgw_meta_sync_status
& _sync_status
)
1797 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
1798 pool(sync_env
->store
->get_zone_params().log_pool
),
1799 cursor(cursor
), sync_status(_sync_status
) {}
1801 int operate() override
{
1803 // loop through one period at a time
1805 if (cursor
== sync_env
->store
->period_history
->get_current()) {
1806 next
= RGWPeriodHistory::Cursor
{};
1808 ldout(cct
, 10) << "RGWMetaSyncCR on current period="
1809 << cursor
.get_period().get_id() << dendl
;
1811 ldout(cct
, 10) << "RGWMetaSyncCR with no period" << dendl
;
1816 ldout(cct
, 10) << "RGWMetaSyncCR on period="
1817 << cursor
.get_period().get_id() << ", next="
1818 << next
.get_period().get_id() << dendl
;
1822 // get the mdlog for the current period (may be empty)
1823 auto& period_id
= sync_status
.sync_info
.period
;
1824 auto realm_epoch
= sync_status
.sync_info
.realm_epoch
;
1825 auto mdlog
= sync_env
->store
->meta_mgr
->get_log(period_id
);
1827 // prevent wakeup() from accessing shard_crs while we're spawning them
1828 std::lock_guard
<std::mutex
> lock(mutex
);
1830 // sync this period on each shard
1831 for (const auto& m
: sync_status
.sync_markers
) {
1832 uint32_t shard_id
= m
.first
;
1833 auto& marker
= m
.second
;
1835 std::string period_marker
;
1837 // read the maximum marker from the next period's sync status
1838 period_marker
= next
.get_period().get_sync_status()[shard_id
];
1839 if (period_marker
.empty()) {
1840 // no metadata changes have occurred on this shard, skip it
1841 ldout(cct
, 10) << "RGWMetaSyncCR: skipping shard " << shard_id
1842 << " with empty period marker" << dendl
;
1847 using ShardCR
= RGWMetaSyncShardControlCR
;
1848 auto cr
= new ShardCR(sync_env
, pool
, period_id
, realm_epoch
,
1849 mdlog
, shard_id
, marker
,
1850 std::move(period_marker
));
1851 auto stack
= spawn(cr
, false);
1852 shard_crs
[shard_id
] = RefPair
{cr
, stack
};
1855 // wait for each shard to complete
1856 while (ret
== 0 && num_spawned() > 0) {
1857 yield
wait_for_child();
1858 collect(&ret
, nullptr);
1862 // drop shard cr refs under lock
1863 std::lock_guard
<std::mutex
> lock(mutex
);
1867 return set_cr_error(ret
);
1869 // advance to the next period
1873 // write the updated sync info
1874 sync_status
.sync_info
.period
= cursor
.get_period().get_id();
1875 sync_status
.sync_info
.realm_epoch
= cursor
.get_epoch();
1876 yield
call(new RGWSimpleRadosWriteCR
<rgw_meta_sync_info
>(sync_env
->async_rados
,
1878 rgw_raw_obj(pool
, sync_env
->status_oid()),
1879 sync_status
.sync_info
));
1885 void wakeup(int shard_id
) {
1886 std::lock_guard
<std::mutex
> lock(mutex
);
1887 auto iter
= shard_crs
.find(shard_id
);
1888 if (iter
== shard_crs
.end()) {
1891 iter
->second
.first
->wakeup();
1895 void RGWRemoteMetaLog::init_sync_env(RGWMetaSyncEnv
*env
) {
1896 env
->cct
= store
->ctx();
1899 env
->async_rados
= async_rados
;
1900 env
->http_manager
= &http_manager
;
1901 env
->error_logger
= error_logger
;
1904 int RGWRemoteMetaLog::read_sync_status(rgw_meta_sync_status
*sync_status
)
1906 if (store
->is_meta_master()) {
1909 // cannot run concurrently with run_sync(), so run in a separate manager
1910 RGWCoroutinesManager
crs(store
->ctx(), store
->get_cr_registry());
1911 RGWHTTPManager
http_manager(store
->ctx(), crs
.get_completion_mgr());
1912 int ret
= http_manager
.set_threaded();
1914 ldout(store
->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret
<< dendl
;
1917 RGWMetaSyncEnv sync_env_local
= sync_env
;
1918 sync_env_local
.http_manager
= &http_manager
;
1919 ret
= crs
.run(new RGWReadSyncStatusCoroutine(&sync_env_local
, sync_status
));
1920 http_manager
.stop();
1924 int RGWRemoteMetaLog::init_sync_status()
1926 if (store
->is_meta_master()) {
1930 rgw_mdlog_info mdlog_info
;
1931 int r
= read_log_info(&mdlog_info
);
1933 lderr(store
->ctx()) << "ERROR: fail to fetch master log info (r=" << r
<< ")" << dendl
;
1937 rgw_meta_sync_info sync_info
;
1938 sync_info
.num_shards
= mdlog_info
.num_shards
;
1939 auto cursor
= store
->period_history
->get_current();
1941 sync_info
.period
= cursor
.get_period().get_id();
1942 sync_info
.realm_epoch
= cursor
.get_epoch();
1945 return run(new RGWInitSyncStatusCoroutine(&sync_env
, sync_info
));
1948 int RGWRemoteMetaLog::store_sync_info(const rgw_meta_sync_info
& sync_info
)
1950 return run(new RGWSimpleRadosWriteCR
<rgw_meta_sync_info
>(async_rados
, store
,
1951 rgw_raw_obj(store
->get_zone_params().log_pool
, sync_env
.status_oid()),
1955 // return a cursor to the period at our sync position
1956 static RGWPeriodHistory::Cursor
get_period_at(RGWRados
* store
,
1957 const rgw_meta_sync_info
& info
)
1959 if (info
.period
.empty()) {
1960 // return an empty cursor with error=0
1961 return RGWPeriodHistory::Cursor
{};
1964 // look for an existing period in our history
1965 auto cursor
= store
->period_history
->lookup(info
.realm_epoch
);
1967 // verify that the period ids match
1968 auto& existing
= cursor
.get_period().get_id();
1969 if (existing
!= info
.period
) {
1970 lderr(store
->ctx()) << "ERROR: sync status period=" << info
.period
1971 << " does not match period=" << existing
1972 << " in history at realm epoch=" << info
.realm_epoch
<< dendl
;
1973 return RGWPeriodHistory::Cursor
{-EEXIST
};
1978 // read the period from rados or pull it from the master
1980 int r
= store
->period_puller
->pull(info
.period
, period
);
1982 lderr(store
->ctx()) << "ERROR: failed to read period id "
1983 << info
.period
<< ": " << cpp_strerror(r
) << dendl
;
1984 return RGWPeriodHistory::Cursor
{r
};
1986 // attach the period to our history
1987 cursor
= store
->period_history
->attach(std::move(period
));
1989 r
= cursor
.get_error();
1990 lderr(store
->ctx()) << "ERROR: failed to read period history back to "
1991 << info
.period
<< ": " << cpp_strerror(r
) << dendl
;
1996 int RGWRemoteMetaLog::run_sync()
1998 if (store
->is_meta_master()) {
2004 // get shard count and oldest log period from master
2005 rgw_mdlog_info mdlog_info
;
2008 ldout(store
->ctx(), 1) << __func__
<< "(): going down" << dendl
;
2011 r
= read_log_info(&mdlog_info
);
2012 if (r
== -EIO
|| r
== -ENOENT
) {
2013 // keep retrying if master isn't alive or hasn't initialized the log
2014 ldout(store
->ctx(), 10) << __func__
<< "(): waiting for master.." << dendl
;
2015 backoff
.backoff_sleep();
2020 lderr(store
->ctx()) << "ERROR: fail to fetch master log info (r=" << r
<< ")" << dendl
;
2026 rgw_meta_sync_status sync_status
;
2029 ldout(store
->ctx(), 1) << __func__
<< "(): going down" << dendl
;
2032 r
= run(new RGWReadSyncStatusCoroutine(&sync_env
, &sync_status
));
2033 if (r
< 0 && r
!= -ENOENT
) {
2034 ldout(store
->ctx(), 0) << "ERROR: failed to fetch sync status r=" << r
<< dendl
;
2038 if (!mdlog_info
.period
.empty()) {
2039 // restart sync if the remote has a period, but:
2040 // a) our status does not, or
2041 // b) our sync period comes before the remote's oldest log period
2042 if (sync_status
.sync_info
.period
.empty() ||
2043 sync_status
.sync_info
.realm_epoch
< mdlog_info
.realm_epoch
) {
2044 sync_status
.sync_info
.state
= rgw_meta_sync_info::StateInit
;
2045 ldout(store
->ctx(), 1) << "epoch=" << sync_status
.sync_info
.realm_epoch
2046 << " in sync status comes before remote's oldest mdlog epoch="
2047 << mdlog_info
.realm_epoch
<< ", restarting sync" << dendl
;
2051 if (sync_status
.sync_info
.state
== rgw_meta_sync_info::StateInit
) {
2052 ldout(store
->ctx(), 20) << __func__
<< "(): init" << dendl
;
2053 sync_status
.sync_info
.num_shards
= mdlog_info
.num_shards
;
2054 auto cursor
= store
->period_history
->get_current();
2056 // run full sync, then start incremental from the current period/epoch
2057 sync_status
.sync_info
.period
= cursor
.get_period().get_id();
2058 sync_status
.sync_info
.realm_epoch
= cursor
.get_epoch();
2060 r
= run(new RGWInitSyncStatusCoroutine(&sync_env
, sync_status
.sync_info
));
2062 backoff
.backoff_sleep();
2067 ldout(store
->ctx(), 0) << "ERROR: failed to init sync status r=" << r
<< dendl
;
2071 } while (sync_status
.sync_info
.state
== rgw_meta_sync_info::StateInit
);
2073 auto num_shards
= sync_status
.sync_info
.num_shards
;
2074 if (num_shards
!= mdlog_info
.num_shards
) {
2075 lderr(store
->ctx()) << "ERROR: can't sync, mismatch between num shards, master num_shards=" << mdlog_info
.num_shards
<< " local num_shards=" << num_shards
<< dendl
;
2079 RGWPeriodHistory::Cursor cursor
;
2081 r
= run(new RGWReadSyncStatusCoroutine(&sync_env
, &sync_status
));
2082 if (r
< 0 && r
!= -ENOENT
) {
2083 ldout(store
->ctx(), 0) << "ERROR: failed to fetch sync status r=" << r
<< dendl
;
2087 switch ((rgw_meta_sync_info::SyncState
)sync_status
.sync_info
.state
) {
2088 case rgw_meta_sync_info::StateBuildingFullSyncMaps
:
2089 ldout(store
->ctx(), 20) << __func__
<< "(): building full sync maps" << dendl
;
2090 r
= run(new RGWFetchAllMetaCR(&sync_env
, num_shards
, sync_status
.sync_markers
));
2091 if (r
== -EBUSY
|| r
== -EAGAIN
) {
2092 backoff
.backoff_sleep();
2097 ldout(store
->ctx(), 0) << "ERROR: failed to fetch all metadata keys" << dendl
;
2101 sync_status
.sync_info
.state
= rgw_meta_sync_info::StateSync
;
2102 r
= store_sync_info(sync_status
.sync_info
);
2104 ldout(store
->ctx(), 0) << "ERROR: failed to update sync status" << dendl
;
2108 case rgw_meta_sync_info::StateSync
:
2109 ldout(store
->ctx(), 20) << __func__
<< "(): sync" << dendl
;
2110 // find our position in the period history (if any)
2111 cursor
= get_period_at(store
, sync_status
.sync_info
);
2112 r
= cursor
.get_error();
2116 meta_sync_cr
= new RGWMetaSyncCR(&sync_env
, cursor
, sync_status
);
2117 r
= run(meta_sync_cr
);
2119 ldout(store
->ctx(), 0) << "ERROR: failed to fetch all metadata keys" << dendl
;
2124 ldout(store
->ctx(), 0) << "ERROR: bad sync state!" << dendl
;
2127 } while (!going_down
);
2132 void RGWRemoteMetaLog::wakeup(int shard_id
)
2134 if (!meta_sync_cr
) {
2137 meta_sync_cr
->wakeup(shard_id
);
2140 int RGWCloneMetaLogCoroutine::operate()
2145 ldout(cct
, 20) << __func__
<< ": shard_id=" << shard_id
<< ": init request" << dendl
;
2146 return state_init();
2149 ldout(cct
, 20) << __func__
<< ": shard_id=" << shard_id
<< ": reading shard status" << dendl
;
2150 return state_read_shard_status();
2153 ldout(cct
, 20) << __func__
<< ": shard_id=" << shard_id
<< ": reading shard status complete" << dendl
;
2154 return state_read_shard_status_complete();
2157 ldout(cct
, 20) << __func__
<< ": shard_id=" << shard_id
<< ": sending rest request" << dendl
;
2158 return state_send_rest_request();
2161 ldout(cct
, 20) << __func__
<< ": shard_id=" << shard_id
<< ": receiving rest response" << dendl
;
2162 return state_receive_rest_response();
2165 ldout(cct
, 20) << __func__
<< ": shard_id=" << shard_id
<< ": storing mdlog entries" << dendl
;
2166 return state_store_mdlog_entries();
2168 } while (truncated
);
2170 ldout(cct
, 20) << __func__
<< ": shard_id=" << shard_id
<< ": storing mdlog entries complete" << dendl
;
2171 return state_store_mdlog_entries_complete();
2178 int RGWCloneMetaLogCoroutine::state_init()
2180 data
= rgw_mdlog_shard_data();
2185 int RGWCloneMetaLogCoroutine::state_read_shard_status()
2187 const bool add_ref
= false; // default constructs with refs=1
2189 completion
.reset(new RGWMetadataLogInfoCompletion(
2190 [this](int ret
, const cls_log_header
& header
) {
2192 ldout(cct
, 1) << "ERROR: failed to read mdlog info with "
2193 << cpp_strerror(ret
) << dendl
;
2195 shard_info
.marker
= header
.max_marker
;
2196 shard_info
.last_update
= header
.max_time
.to_real_time();
2198 // wake up parent stack
2199 stack
->get_completion_mgr()->complete(nullptr, stack
);
2202 int ret
= mdlog
->get_info_async(shard_id
, completion
.get());
2204 ldout(cct
, 0) << "ERROR: mdlog->get_info_async() returned ret=" << ret
<< dendl
;
2205 return set_cr_error(ret
);
2211 int RGWCloneMetaLogCoroutine::state_read_shard_status_complete()
2215 ldout(cct
, 20) << "shard_id=" << shard_id
<< " marker=" << shard_info
.marker
<< " last_update=" << shard_info
.last_update
<< dendl
;
2217 marker
= shard_info
.marker
;
2222 int RGWCloneMetaLogCoroutine::state_send_rest_request()
2224 RGWRESTConn
*conn
= sync_env
->conn
;
2227 snprintf(buf
, sizeof(buf
), "%d", shard_id
);
2229 char max_entries_buf
[32];
2230 snprintf(max_entries_buf
, sizeof(max_entries_buf
), "%d", max_entries
);
2232 const char *marker_key
= (marker
.empty() ? "" : "marker");
2234 rgw_http_param_pair pairs
[] = { { "type", "metadata" },
2236 { "period", period
.c_str() },
2237 { "max-entries", max_entries_buf
},
2238 { marker_key
, marker
.c_str() },
2241 http_op
= new RGWRESTReadResource(conn
, "/admin/log", pairs
, NULL
, sync_env
->http_manager
);
2243 http_op
->set_user_info((void *)stack
);
2245 int ret
= http_op
->aio_read();
2247 ldout(cct
, 0) << "ERROR: failed to fetch mdlog data" << dendl
;
2248 log_error() << "failed to send http operation: " << http_op
->to_str() << " ret=" << ret
<< std::endl
;
2257 int RGWCloneMetaLogCoroutine::state_receive_rest_response()
2259 int ret
= http_op
->wait(&data
);
2261 error_stream
<< "http operation failed: " << http_op
->to_str() << " status=" << http_op
->get_http_status() << std::endl
;
2262 ldout(cct
, 5) << "failed to wait for op, ret=" << ret
<< dendl
;
2265 return set_cr_error(ret
);
2270 ldout(cct
, 20) << "remote mdlog, shard_id=" << shard_id
<< " num of shard entries: " << data
.entries
.size() << dendl
;
2272 truncated
= ((int)data
.entries
.size() == max_entries
);
2274 if (data
.entries
.empty()) {
2276 *new_marker
= marker
;
2278 return set_cr_done();
2282 *new_marker
= data
.entries
.back().id
;
2289 int RGWCloneMetaLogCoroutine::state_store_mdlog_entries()
2291 list
<cls_log_entry
> dest_entries
;
2293 vector
<rgw_mdlog_entry
>::iterator iter
;
2294 for (iter
= data
.entries
.begin(); iter
!= data
.entries
.end(); ++iter
) {
2295 rgw_mdlog_entry
& entry
= *iter
;
2296 ldout(cct
, 20) << "entry: name=" << entry
.name
<< dendl
;
2298 cls_log_entry dest_entry
;
2299 dest_entry
.id
= entry
.id
;
2300 dest_entry
.section
= entry
.section
;
2301 dest_entry
.name
= entry
.name
;
2302 dest_entry
.timestamp
= utime_t(entry
.timestamp
);
2304 ::encode(entry
.log_data
, dest_entry
.data
);
2306 dest_entries
.push_back(dest_entry
);
2311 RGWAioCompletionNotifier
*cn
= stack
->create_completion_notifier();
2313 int ret
= mdlog
->store_entries_in_shard(dest_entries
, shard_id
, cn
->completion());
2316 ldout(cct
, 10) << "failed to store md log entries shard_id=" << shard_id
<< " ret=" << ret
<< dendl
;
2317 return set_cr_error(ret
);
2322 int RGWCloneMetaLogCoroutine::state_store_mdlog_entries_complete()
2324 return set_cr_done();
2328 // TODO: move into rgw_sync_trim.cc
2330 #define dout_prefix (*_dout << "meta trim: ")
2332 /// purge all log shards for the given mdlog
// Spawns up to max_concurrent RGWRadosRemoveCR children, one per shard oid.
// NOTE(review): member declarations for the shard counter `i` and the
// rgw_raw_obj `obj` (original lines 2337-2338), the public: label, and the
// spawn_next() returns are missing from this garbled view -- confirm against
// the upstream file.
2333 class PurgeLogShardsCR
: public RGWShardCollectCR
{
2334 RGWRados
*const store
;
2335 const RGWMetadataLog
* mdlog
;
2336 const int num_shards
;
// bound on simultaneously outstanding shard-removal coroutines
2340 static constexpr int max_concurrent
= 16;
2343 PurgeLogShardsCR(RGWRados
*store
, const RGWMetadataLog
* mdlog
,
2344 const rgw_pool
& pool
, int num_shards
)
2345 : RGWShardCollectCR(store
->ctx(), max_concurrent
),
2346 store(store
), mdlog(mdlog
), num_shards(num_shards
), obj(pool
, "")
// spawn one removal per shard until all num_shards are issued
2349 bool spawn_next() override
{
2350 if (i
== num_shards
) {
2353 mdlog
->get_shard_oid(i
++, obj
.oid
);
2354 spawn(new RGWRadosRemoveCR(store
, obj
), false);
2359 using Cursor
= RGWPeriodHistory::Cursor
;
2361 /// purge mdlogs from the oldest up to (but not including) the given realm_epoch
// NOTE(review): a Cursor member (used as `cursor` in operate()), the public:
// label, and the operate() declaration are missing from this garbled view.
2362 class PurgePeriodLogsCR
: public RGWCoroutine
{
2363 RGWRados
*const store
;
2364 RGWMetadataManager
*const metadata
;
// version tracker guards racing updates to the mdlog-history object
2365 RGWObjVersionTracker objv
;
2367 epoch_t realm_epoch
;
2368 epoch_t
*last_trim_epoch
; //< update last trim on success
2371 PurgePeriodLogsCR(RGWRados
*store
, epoch_t realm_epoch
, epoch_t
*last_trim
)
2372 : RGWCoroutine(store
->ctx()), store(store
), metadata(store
->meta_mgr
),
2373 realm_epoch(realm_epoch
), last_trim_epoch(last_trim
)
// Walk the mdlog period history from the oldest period forward, deleting all
// log shards of each period whose realm_epoch precedes the target epoch, then
// advancing the stored history. Stops at realm_epoch (exclusive).
// NOTE(review): the reenter() wrapper, several `if (retcode < 0)` headers,
// and closing braces were dropped by the text mangling.
2379 int PurgePeriodLogsCR::operate()
2382 // read our current oldest log period
2383 yield
call(metadata
->read_oldest_log_period_cr(&cursor
, &objv
));
2385 return set_cr_error(retcode
);
2388 ldout(cct
, 20) << "oldest log realm_epoch=" << cursor
.get_epoch()
2389 << " period=" << cursor
.get_period().get_id() << dendl
;
2391 // trim -up to- the given realm_epoch
2392 while (cursor
.get_epoch() < realm_epoch
) {
2393 ldout(cct
, 4) << "purging log shards for realm_epoch=" << cursor
.get_epoch()
2394 << " period=" << cursor
.get_period().get_id() << dendl
;
// remove every shard object of this period's mdlog
2396 const auto mdlog
= metadata
->get_log(cursor
.get_period().get_id());
2397 const auto& pool
= store
->get_zone_params().log_pool
;
2398 auto num_shards
= cct
->_conf
->rgw_md_log_max_shards
;
2399 call(new PurgeLogShardsCR(store
, mdlog
, pool
, num_shards
));
2402 ldout(cct
, 1) << "failed to remove log shards: "
2403 << cpp_strerror(retcode
) << dendl
;
2404 return set_cr_error(retcode
);
2406 ldout(cct
, 10) << "removed log shards for realm_epoch=" << cursor
.get_epoch()
2407 << " period=" << cursor
.get_period().get_id() << dendl
;
2409 // update our mdlog history
2410 yield
call(metadata
->trim_log_period_cr(cursor
, &objv
));
2411 if (retcode
== -ENOENT
) {
2412 // must have raced to update mdlog history. return success and allow the
2413 // winner to continue purging
2414 ldout(cct
, 10) << "already removed log shards for realm_epoch=" << cursor
.get_epoch()
2415 << " period=" << cursor
.get_period().get_id() << dendl
;
2416 return set_cr_done();
2417 } else if (retcode
< 0) {
2418 ldout(cct
, 1) << "failed to remove log shards for realm_epoch="
2419 << cursor
.get_epoch() << " period=" << cursor
.get_period().get_id()
2420 << " with: " << cpp_strerror(retcode
) << dendl
;
2421 return set_cr_error(retcode
);
// remember the highest epoch we successfully purged through
2424 if (*last_trim_epoch
< cursor
.get_epoch()) {
2425 *last_trim_epoch
= cursor
.get_epoch();
2428 assert(cursor
.has_next()); // get_current() should always come after
2431 return set_cr_done();
2438 using connection_map
= std::map
<std::string
, std::unique_ptr
<RGWRESTConn
>>;
2440 /// construct a RGWRESTConn for each zone in the realm
// Builds a map of zone-id -> owned REST connection over every zone of every
// zonegroup. The caller erases its own zone from the result (see
// MasterTrimEnv). NOTE(review): the trailing `return connections;` and
// closing braces are missing from this garbled view -- confirm upstream.
2441 template <typename Zonegroups
>
2442 connection_map
make_peer_connections(RGWRados
*store
,
2443 const Zonegroups
& zonegroups
)
2445 connection_map connections
;
2446 for (auto& g
: zonegroups
) {
2447 for (auto& z
: g
.second
.zones
) {
// z.first is the zone id, z.second.endpoints its REST endpoints
2448 std::unique_ptr
<RGWRESTConn
> conn
{
2449 new RGWRESTConn(store
->ctx(), store
, z
.first
, z
.second
.endpoints
)};
2450 connections
.emplace(z
.first
, std::move(conn
));
2456 /// return the marker that it's safe to trim up to
2457 const std::string
& get_stable_marker(const rgw_meta_sync_marker
& m
)
2459 return m
.state
== m
.FullSync
? m
.next_step_marker
: m
.marker
;
2462 /// comparison operator for take_min_status()
2463 bool operator<(const rgw_meta_sync_marker
& lhs
, const rgw_meta_sync_marker
& rhs
)
2465 // sort by stable marker
2466 return get_stable_marker(lhs
) < get_stable_marker(rhs
);
2469 /// populate the status with the minimum stable marker of each shard for any
2470 /// peer whose realm_epoch matches the minimum realm_epoch in the input
// Returns 0 on success; error returns (presumably -EINVAL for an empty range
// or a shard-count mismatch -- the return statements were dropped by the
// mangling, TODO confirm) otherwise. Also missing from view: the per-shard
// iterator advance in the inner loop and the closing braces.
2471 template <typename Iter
>
2472 int take_min_status(CephContext
*cct
, Iter first
, Iter last
,
2473 rgw_meta_sync_status
*status
)
2475 if (first
== last
) {
2478 const size_t num_shards
= cct
->_conf
->rgw_md_log_max_shards
;
// start with the sentinel max epoch so any real peer epoch replaces it
2480 status
->sync_info
.realm_epoch
= std::numeric_limits
<epoch_t
>::max();
2481 for (auto p
= first
; p
!= last
; ++p
) {
2482 // validate peer's shard count
2483 if (p
->sync_markers
.size() != num_shards
) {
2484 ldout(cct
, 1) << "take_min_status got peer status with "
2485 << p
->sync_markers
.size() << " shards, expected "
2486 << num_shards
<< dendl
;
2489 if (p
->sync_info
.realm_epoch
< status
->sync_info
.realm_epoch
) {
2490 // earlier epoch, take its entire status
2491 *status
= std::move(*p
);
2492 } else if (p
->sync_info
.realm_epoch
== status
->sync_info
.realm_epoch
) {
2493 // same epoch, take any earlier markers
// uses operator<(rgw_meta_sync_marker), i.e. compares stable markers
2494 auto m
= status
->sync_markers
.begin();
2495 for (auto& shard
: p
->sync_markers
) {
2496 if (shard
.second
< m
->second
) {
2497 m
->second
= std::move(shard
.second
);
// Shared state for the trim coroutines below (master and peer variants).
// NOTE(review): the enclosing struct header (presumably `struct TrimEnv {`
// inside the anonymous namespace) and the declaration of the `num_shards`
// member initialized by the ctor are not visible in this garbled chunk --
// confirm against the upstream file.
2507 RGWRados
*const store
;
2508 RGWHTTPManager
*const http
;
// this zone's own id; used to exclude ourselves from peer connections
2510 const std::string
& zone
;
2511 Cursor current
; //< cursor to current period
2512 epoch_t last_trim_epoch
{0}; //< epoch of last mdlog that was purged
2514 TrimEnv(RGWRados
*store
, RGWHTTPManager
*http
, int num_shards
)
2515 : store(store
), http(http
), num_shards(num_shards
),
2516 zone(store
->get_zone_params().get_id()),
2517 current(store
->period_history
->get_current())
// Trim state for the metadata master: tracks REST connections to every peer
// zone and the latest sync status / trim markers gathered from them.
2521 struct MasterTrimEnv
: public TrimEnv
{
2522 connection_map connections
; //< peer connections
2523 std::vector
<rgw_meta_sync_status
> peer_status
; //< sync status for each peer
2524 /// last trim marker for each shard, only applies to current period's mdlog
2525 std::vector
<std::string
> last_trim_markers
;
2527 MasterTrimEnv(RGWRados
*store
, RGWHTTPManager
*http
, int num_shards
)
2528 : TrimEnv(store
, http
, num_shards
),
2529 last_trim_markers(num_shards
)
// connect to every zone in the current period's map, minus ourselves
2531 auto& period
= current
.get_period();
2532 connections
= make_peer_connections(store
, period
.get_map().zonegroups
);
2533 connections
.erase(zone
);
2534 peer_status
.resize(connections
.size());
// Trim state for a non-master zone: peers trim by timestamp, following the
// master's mdlog, so they only need per-shard trim timestamps.
2538 struct PeerTrimEnv
: public TrimEnv
{
2539 /// last trim timestamp for each shard, only applies to current period's mdlog
2540 std::vector
<ceph::real_time
> last_trim_timestamps
;
2542 PeerTrimEnv(RGWRados
*store
, RGWHTTPManager
*http
, int num_shards
)
2543 : TrimEnv(store
, http
, num_shards
),
2544 last_trim_timestamps(num_shards
)
// called once the master's authoritative shard count is known
2547 void set_num_shards(int num_shards
) {
2548 this->num_shards
= num_shards
;
2549 last_trim_timestamps
.resize(num_shards
);
2553 } // anonymous namespace
2556 /// spawn a trim cr for each shard that needs it, while limiting the number
2557 /// of concurrent shards
// NOTE(review): the shard_id counter member, env reference member, and
// public: label are not visible in this garbled view.
2558 class MetaMasterTrimShardCollectCR
: public RGWShardCollectCR
{
2560 static constexpr int MAX_CONCURRENT_SHARDS
= 16;
2563 RGWMetadataLog
*mdlog
;
// minimum sync status across all peers, computed by take_min_status()
2566 const rgw_meta_sync_status
& sync_status
;
2569 MetaMasterTrimShardCollectCR(MasterTrimEnv
& env
, RGWMetadataLog
*mdlog
,
2570 const rgw_meta_sync_status
& sync_status
)
2571 : RGWShardCollectCR(env
.store
->ctx(), MAX_CONCURRENT_SHARDS
),
2572 env(env
), mdlog(mdlog
), sync_status(sync_status
)
2575 bool spawn_next() override
;
// For each shard, trim the local mdlog up to the minimum stable marker of
// all peers; skip shards whose stable marker has not advanced past the last
// trim. NOTE(review): shard_id advancement, `continue`/`return` statements,
// the local `oid` declaration, and closing braces were dropped by the
// mangling.
2578 bool MetaMasterTrimShardCollectCR::spawn_next()
2580 while (shard_id
< env
.num_shards
) {
2581 auto m
= sync_status
.sync_markers
.find(shard_id
);
2582 if (m
== sync_status
.sync_markers
.end()) {
// stable = safe-to-trim marker for this shard (min across peers)
2586 auto& stable
= get_stable_marker(m
->second
);
2587 auto& last_trim
= env
.last_trim_markers
[shard_id
];
2589 if (stable
<= last_trim
) {
2591 ldout(cct
, 20) << "skipping log shard " << shard_id
2592 << " at marker=" << stable
2593 << " last_trim=" << last_trim
2594 << " realm_epoch=" << sync_status
.sync_info
.realm_epoch
<< dendl
;
2599 mdlog
->get_shard_oid(shard_id
, oid
);
2601 ldout(cct
, 10) << "trimming log shard " << shard_id
2602 << " at marker=" << stable
2603 << " last_trim=" << last_trim
2604 << " realm_epoch=" << sync_status
.sync_info
.realm_epoch
<< dendl
;
// on success RGWSyncLogTrimCR updates last_trim in place
2605 spawn(new RGWSyncLogTrimCR(env
.store
, oid
, stable
, &last_trim
), false);
2612 /// spawn rest requests to read each peer's sync status
// Walks env.connections and env.peer_status in lockstep, issuing one
// GET /admin/log/?type=metadata&status request per peer.
// NOTE(review): env member, public: label, iterator advancement, and
// spawn_next() returns are missing from this garbled view.
2613 class MetaMasterStatusCollectCR
: public RGWShardCollectCR
{
2614 static constexpr int MAX_CONCURRENT_SHARDS
= 16;
2617 connection_map::iterator c
;
2618 std::vector
<rgw_meta_sync_status
>::iterator s
;
2620 MetaMasterStatusCollectCR(MasterTrimEnv
& env
)
2621 : RGWShardCollectCR(env
.store
->ctx(), MAX_CONCURRENT_SHARDS
),
2622 env(env
), c(env
.connections
.begin()), s(env
.peer_status
.begin())
2625 bool spawn_next() override
{
2626 if (c
== env
.connections
.end()) {
// query string: ?type=metadata&status
2629 static rgw_http_param_pair params
[] = {
2630 { "type", "metadata" },
2631 { "status", nullptr },
2632 { nullptr, nullptr }
2635 ldout(cct
, 20) << "query sync status from " << c
->first
<< dendl
;
2636 auto conn
= c
->second
.get();
2637 using StatusCR
= RGWReadRESTResourceCR
<rgw_meta_sync_status
>;
// decode the peer's reply directly into its slot of env.peer_status
2638 spawn(new StatusCR(cct
, conn
, env
.http
, "/admin/log/", params
, &*s
),
// Master-side trim driver: collects peer sync status, reduces it to a
// minimum, then purges old periods and trims current-period shards.
// NOTE(review): env member, ret member, public: label and operate()
// declaration are not visible in this garbled view.
2646 class MetaMasterTrimCR
: public RGWCoroutine
{
2648 rgw_meta_sync_status min_status
; //< minimum sync status of all peers
2652 MetaMasterTrimCR(MasterTrimEnv
& env
)
2653 : RGWCoroutine(env
.store
->ctx()), env(env
)
// Master trim pass: fetch every peer's metadata sync status, compute the
// minimum safe position, purge mdlog periods older than the minimum realm
// epoch, and (if the minimum epoch is the current one) trim the current
// period's shards to the minimum stable markers.
// NOTE(review): reenter() wrapper, `if (ret < 0)` headers, and closing
// braces were dropped by the mangling.
2659 int MetaMasterTrimCR::operate()
2662 // TODO: detect this and fail before we spawn the trim thread?
2663 if (env
.connections
.empty()) {
2664 ldout(cct
, 4) << "no peers, exiting" << dendl
;
2665 return set_cr_done();
2668 ldout(cct
, 10) << "fetching sync status for zone " << env
.zone
<< dendl
;
2669 // query mdlog sync status from peers
2670 yield
call(new MetaMasterStatusCollectCR(env
));
2672 // must get a successful reply from all peers to consider trimming
2674 ldout(cct
, 4) << "failed to fetch sync status from all peers" << dendl
;
2675 return set_cr_error(ret
);
2678 // determine the minimum epoch and markers
2679 ret
= take_min_status(env
.store
->ctx(), env
.peer_status
.begin(),
2680 env
.peer_status
.end(), &min_status
);
2682 ldout(cct
, 4) << "failed to calculate min sync status from peers" << dendl
;
2683 return set_cr_error(ret
);
2686 auto store
= env
.store
;
2687 auto epoch
= min_status
.sync_info
.realm_epoch
;
2688 ldout(cct
, 4) << "realm epoch min=" << epoch
2689 << " current=" << env
.current
.get_epoch()<< dendl
;
// only periods strictly older than (epoch - 1) can still be purged
2690 if (epoch
> env
.last_trim_epoch
+ 1) {
2691 // delete any prior mdlog periods
2692 spawn(new PurgePeriodLogsCR(store
, epoch
, &env
.last_trim_epoch
), true);
2694 ldout(cct
, 10) << "mdlogs already purged up to realm_epoch "
2695 << env
.last_trim_epoch
<< dendl
;
2698 // if realm_epoch == current, trim mdlog based on markers
2699 if (epoch
== env
.current
.get_epoch()) {
2700 auto mdlog
= store
->meta_mgr
->get_log(env
.current
.get_period().get_id());
2701 spawn(new MetaMasterTrimShardCollectCR(env
, mdlog
, min_status
), true);
2704 // ignore any errors during purge/trim because we want to hold the lock open
2705 return set_cr_done();
2711 /// read the first entry of the master's mdlog shard and trim to that position
// Peer zones cannot compare markers with the master, so they trim the local
// shard by timestamp instead (see operate() below).
// NOTE(review): the shard_id member and public: label are missing from this
// garbled view.
2712 class MetaPeerTrimShardCR
: public RGWCoroutine
{
2713 RGWMetaSyncEnv
& env
;
2714 RGWMetadataLog
*mdlog
;
2715 const std::string
& period_id
;
2717 RGWMetadataLogInfo info
;
2718 ceph::real_time stable
; //< safe timestamp to trim, according to master
2719 ceph::real_time
*last_trim
; //< last trimmed timestamp, updated on trim
2720 rgw_mdlog_shard_data result
; //< result from master's mdlog listing
2723 MetaPeerTrimShardCR(RGWMetaSyncEnv
& env
, RGWMetadataLog
*mdlog
,
2724 const std::string
& period_id
, int shard_id
,
2725 ceph::real_time
*last_trim
)
2726 : RGWCoroutine(env
.store
->ctx()), env(env
), mdlog(mdlog
),
2727 period_id(period_id
), shard_id(shard_id
), last_trim(last_trim
)
2730 int operate() override
;
// Determine a stable trim timestamp from the master's mdlog shard and trim
// the local shard up to it. If the master's listing is empty, fall back to
// the shard info's last_update timestamp, re-checking the listing to guard
// against racing writes.
// NOTE(review): reenter() wrapper, listing-call arguments (marker/max-entries),
// several retcode-check headers, else-branch headers and closing braces were
// dropped by the mangling.
2733 int MetaPeerTrimShardCR::operate()
2736 // query master's first mdlog entry for this shard
2737 yield
call(new RGWListRemoteMDLogShardCR(&env
, period_id
, shard_id
,
2740 ldout(cct
, 5) << "failed to read first entry from master's mdlog shard "
2741 << shard_id
<< " for period " << period_id
2742 << ": " << cpp_strerror(retcode
) << dendl
;
2743 return set_cr_error(retcode
);
2745 if (result
.entries
.empty()) {
2746 // if there are no mdlog entries, we don't have a timestamp to compare. we
2747 // can't just trim everything, because there could be racing updates since
2748 // this empty reply. query the mdlog shard info to read its max timestamp,
2749 // then retry the listing to make sure it's still empty before trimming to
2751 ldout(cct
, 10) << "empty master mdlog shard " << shard_id
2752 << ", reading last timestamp from shard info" << dendl
;
2753 // read the mdlog shard info for the last timestamp
2754 using ShardInfoCR
= RGWReadRemoteMDLogShardInfoCR
;
2755 yield
call(new ShardInfoCR(&env
, period_id
, shard_id
, &info
));
2757 ldout(cct
, 5) << "failed to read info from master's mdlog shard "
2758 << shard_id
<< " for period " << period_id
2759 << ": " << cpp_strerror(retcode
) << dendl
;
2760 return set_cr_error(retcode
);
// a zero last_update means the shard has never been written
2762 if (ceph::real_clock::is_zero(info
.last_update
)) {
2763 return set_cr_done(); // nothing to trim
2765 ldout(cct
, 10) << "got mdlog shard info with last update="
2766 << info
.last_update
<< dendl
;
2767 // re-read the master's first mdlog entry to make sure it hasn't changed
2768 yield
call(new RGWListRemoteMDLogShardCR(&env
, period_id
, shard_id
,
2771 ldout(cct
, 5) << "failed to read first entry from master's mdlog shard "
2772 << shard_id
<< " for period " << period_id
2773 << ": " << cpp_strerror(retcode
) << dendl
;
2774 return set_cr_error(retcode
);
2776 // if the mdlog is still empty, trim to max marker
2777 if (result
.entries
.empty()) {
2778 stable
= info
.last_update
;
2780 stable
= result
.entries
.front().timestamp
;
2782 // can only trim -up to- master's first timestamp, so subtract a second.
2783 // (this is why we use timestamps instead of markers for the peers)
2784 stable
-= std::chrono::seconds(1);
// non-empty first listing: same "first timestamp minus one second" rule
2787 stable
= result
.entries
.front().timestamp
;
2788 stable
-= std::chrono::seconds(1);
2791 if (stable
<= *last_trim
) {
2792 ldout(cct
, 10) << "skipping log shard " << shard_id
2793 << " at timestamp=" << stable
2794 << " last_trim=" << *last_trim
<< dendl
;
2795 return set_cr_done();
2798 ldout(cct
, 10) << "trimming log shard " << shard_id
2799 << " at timestamp=" << stable
2800 << " last_trim=" << *last_trim
<< dendl
;
2803 mdlog
->get_shard_oid(shard_id
, oid
);
// trim [epoch .. stable] of the local shard object
2804 call(new RGWRadosTimelogTrimCR(env
.store
, oid
, real_time
{}, stable
, "", ""));
// -ENODATA just means the range was already empty; not an error
2806 if (retcode
< 0 && retcode
!= -ENODATA
) {
2807 ldout(cct
, 1) << "failed to trim mdlog shard " << shard_id
2808 << ": " << cpp_strerror(retcode
) << dendl
;
2809 return set_cr_error(retcode
);
2811 *last_trim
= stable
;
2812 return set_cr_done();
// Spawns a MetaPeerTrimShardCR per shard, bounded by MAX_CONCURRENT_SHARDS.
// NOTE(review): env member, shard_id counter, and public: label are missing
// from this garbled view.
2817 class MetaPeerTrimShardCollectCR
: public RGWShardCollectCR
{
2818 static constexpr int MAX_CONCURRENT_SHARDS
= 16;
2821 RGWMetadataLog
*mdlog
;
2822 const std::string
& period_id
;
2823 RGWMetaSyncEnv meta_env
; //< for RGWListRemoteMDLogShardCR
2827 MetaPeerTrimShardCollectCR(PeerTrimEnv
& env
, RGWMetadataLog
*mdlog
)
2828 : RGWShardCollectCR(env
.store
->ctx(), MAX_CONCURRENT_SHARDS
),
2829 env(env
), mdlog(mdlog
), period_id(env
.current
.get_period().get_id())
// sync env targets the master connection for the remote mdlog reads
2831 meta_env
.init(cct
, env
.store
, env
.store
->rest_master_conn
,
2832 env
.store
->get_async_rados(), env
.http
, nullptr);
2835 bool spawn_next() override
;
// Spawn one per-shard trim coroutine per call until all shards are issued.
// NOTE(review): shard_id advancement, the spawn's `false` argument and the
// return statements were dropped by the mangling.
2838 bool MetaPeerTrimShardCollectCR::spawn_next()
2840 if (shard_id
>= env
.num_shards
) {
2843 auto& last_trim
= env
.last_trim_timestamps
[shard_id
];
2844 spawn(new MetaPeerTrimShardCR(meta_env
, mdlog
, period_id
, shard_id
, &last_trim
),
// Peer-side trim driver: queries the master's mdlog info, purges old
// periods, then trims current-period shards by timestamp.
// NOTE(review): env member, public: label and operate() declaration are
// missing from this garbled view.
2850 class MetaPeerTrimCR
: public RGWCoroutine
{
2852 rgw_mdlog_info mdlog_info
; //< master's mdlog info
2855 MetaPeerTrimCR(PeerTrimEnv
& env
) : RGWCoroutine(env
.store
->ctx()), env(env
) {}
// Peer trim pass: read the master's mdlog info (for shard count and oldest
// realm epoch), purge any mdlog periods the master has dropped, then trim
// the current period's shards against the master's positions.
// NOTE(review): reenter() wrapper, yield markers, retcode-check headers and
// closing braces were dropped by the mangling.
2860 int MetaPeerTrimCR::operate()
2863 ldout(cct
, 10) << "fetching master mdlog info" << dendl
;
2865 // query mdlog_info from master for oldest_log_period
2866 rgw_http_param_pair params
[] = {
2867 { "type", "metadata" },
2868 { nullptr, nullptr }
2871 using LogInfoCR
= RGWReadRESTResourceCR
<rgw_mdlog_info
>;
2872 call(new LogInfoCR(cct
, env
.store
->rest_master_conn
, env
.http
,
2873 "/admin/log/", params
, &mdlog_info
));
2876 ldout(cct
, 4) << "failed to read mdlog info from master" << dendl
;
2877 return set_cr_error(retcode
);
2879 // use master's shard count instead
2880 env
.set_num_shards(mdlog_info
.num_shards
);
// purge everything older than the master's oldest period
2882 if (mdlog_info
.realm_epoch
> env
.last_trim_epoch
+ 1) {
2883 // delete any prior mdlog periods
2884 yield
call(new PurgePeriodLogsCR(env
.store
, mdlog_info
.realm_epoch
,
2885 &env
.last_trim_epoch
));
2887 ldout(cct
, 10) << "mdlogs already purged through realm_epoch "
2888 << env
.last_trim_epoch
<< dendl
;
2891 // if realm_epoch == current, trim mdlog based on master's markers
2892 if (mdlog_info
.realm_epoch
== env
.current
.get_epoch()) {
2894 auto meta_mgr
= env
.store
->meta_mgr
;
2895 auto mdlog
= meta_mgr
->get_log(env
.current
.get_period().get_id());
2896 call(new MetaPeerTrimShardCollectCR(env
, mdlog
));
2897 // ignore any errors during purge/trim because we want to hold the lock open
2900 return set_cr_done();
// Polling loop base: periodically takes an exclusive rados lock on the
// mdlog-history object, runs a trim coroutine (alloc_cr()) under it, and
// unlocks on error so other gateways can take over.
// NOTE(review): public: label and operate() declaration are missing from
// this garbled view.
2905 class MetaTrimPollCR
: public RGWCoroutine
{
2906 RGWRados
*const store
;
2907 const utime_t interval
; //< polling interval
// lock target: the mdlog-history object in the log pool
2908 const rgw_raw_obj obj
;
2909 const std::string name
{"meta_trim"}; //< lock name
2910 const std::string cookie
;
2913 /// allocate the coroutine to run within the lease
2914 virtual RGWCoroutine
* alloc_cr() = 0;
2917 MetaTrimPollCR(RGWRados
*store
, utime_t interval
)
2918 : RGWCoroutine(store
->ctx()), store(store
), interval(interval
),
2919 obj(store
->get_zone_params().log_pool
, RGWMetadataLogHistory::oid
),
// random cookie distinguishes this gateway's lock from others'
2920 cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct
))
// Poll loop: sleep for the interval, take the trim lock for the full
// interval (so at most one gateway trims per cycle), run the trim, and
// unlock on failure. NOTE(review): reenter()/for(;;) wrappers, the wait
// call, retcode checks and closing braces were dropped by the mangling.
2926 int MetaTrimPollCR::operate()
2930 set_status("sleeping");
2933 // prevent others from trimming for our entire wait interval
2934 set_status("acquiring trim lock");
2935 yield
call(new RGWSimpleRadosLockCR(store
->get_async_rados(), store
,
2936 obj
, name
, cookie
, interval
.sec()))
;
2938 ldout(cct
, 4) << "failed to lock: " << cpp_strerror(retcode
) << dendl
;
2942 set_status("trimming");
2943 yield
call(alloc_cr());
2946 // on errors, unlock so other gateways can try
2947 set_status("unlocking");
2948 yield
call(new RGWSimpleRadosUnlockCR(store
->get_async_rados(), store
,
2949 obj
, name
, cookie
));
// Master variant of the polling trim loop: each lease runs MetaMasterTrimCR
// over a MasterTrimEnv that persists across cycles.
2956 class MetaMasterTrimPollCR
: public MetaTrimPollCR
{
2957 MasterTrimEnv env
; //< trim state to share between calls
2958 RGWCoroutine
* alloc_cr() override
{
2959 return new MetaMasterTrimCR(env
);
2962 MetaMasterTrimPollCR(RGWRados
*store
, RGWHTTPManager
*http
,
2963 int num_shards
, utime_t interval
)
2964 : MetaTrimPollCR(store
, interval
),
2965 env(store
, http
, num_shards
)
// Peer variant of the polling trim loop: each lease runs MetaPeerTrimCR
// over a PeerTrimEnv that persists across cycles.
2969 class MetaPeerTrimPollCR
: public MetaTrimPollCR
{
2970 PeerTrimEnv env
; //< trim state to share between calls
2971 RGWCoroutine
* alloc_cr() override
{
2972 return new MetaPeerTrimCR(env
);
2975 MetaPeerTrimPollCR(RGWRados
*store
, RGWHTTPManager
*http
,
2976 int num_shards
, utime_t interval
)
2977 : MetaTrimPollCR(store
, interval
),
2978 env(store
, http
, num_shards
)
2982 RGWCoroutine
* create_meta_log_trim_cr(RGWRados
*store
, RGWHTTPManager
*http
,
2983 int num_shards
, utime_t interval
)
2985 if (store
->is_meta_master()) {
2986 return new MetaMasterTrimPollCR(store
, http
, num_shards
, interval
);
2988 return new MetaPeerTrimPollCR(store
, http
, num_shards
, interval
);
// One-shot (radosgw-admin) master trim: owns its own MasterTrimEnv via
// private inheritance so a single object bundles state + coroutine.
2992 struct MetaMasterAdminTrimCR
: private MasterTrimEnv
, public MetaMasterTrimCR
{
2993 MetaMasterAdminTrimCR(RGWRados
*store
, RGWHTTPManager
*http
, int num_shards
)
2994 : MasterTrimEnv(store
, http
, num_shards
),
// the env base is fully constructed before MetaMasterTrimCR references it
2995 MetaMasterTrimCR(*static_cast<MasterTrimEnv
*>(this))
// One-shot (radosgw-admin) peer trim: owns its own PeerTrimEnv via private
// inheritance, mirroring MetaMasterAdminTrimCR.
2999 struct MetaPeerAdminTrimCR
: private PeerTrimEnv
, public MetaPeerTrimCR
{
3000 MetaPeerAdminTrimCR(RGWRados
*store
, RGWHTTPManager
*http
, int num_shards
)
3001 : PeerTrimEnv(store
, http
, num_shards
),
// the env base is fully constructed before MetaPeerTrimCR references it
3002 MetaPeerTrimCR(*static_cast<PeerTrimEnv
*>(this))
// Factory for the one-shot admin trim coroutine; picks the master or peer
// variant by this zone's role. NOTE(review): the `int num_shards)` parameter
// line and opening brace (original lines 3008-3009) were dropped by the
// mangling -- confirm the full signature upstream.
3006 RGWCoroutine
* create_admin_meta_log_trim_cr(RGWRados
*store
,
3007 RGWHTTPManager
*http
,
3010 if (store
->is_meta_master()) {
3011 return new MetaMasterAdminTrimCR(store
, http
, num_shards
);
3013 return new MetaPeerAdminTrimCR(store
, http
, num_shards
);