// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

#include "common/ceph_json.h"
#include "common/RefCountedObj.h"
#include "common/WorkQueue.h"
#include "common/Throttle.h"
#include "common/errno.h"

#include "rgw_common.h"
#include "rgw_data_sync.h"
#include "rgw_rest_conn.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
#include "rgw_cr_tools.h"
#include "rgw_http_client.h"
#include "rgw_bucket.h"
#include "rgw_bucket_sync.h"
#include "rgw_bucket_sync_cache.h"
#include "rgw_datalog.h"
#include "rgw_metadata.h"
#include "rgw_sync_counters.h"
#include "rgw_sync_error_repo.h"
#include "rgw_sync_module.h"

#include "cls/lock/cls_lock_client.h"
#include "cls/rgw/cls_rgw_client.h"

#include "services/svc_zone.h"
#include "services/svc_sync_modules.h"

#include "include/common_fwd.h"
#include "include/random.h"

#include <boost/asio/yield.hpp>
#include <string_view>
#define dout_subsys ceph_subsys_rgw

#undef dout_prefix
#define dout_prefix (*_dout << "data sync: ")
static string datalog_sync_status_oid_prefix = "datalog.sync-status";
static string datalog_sync_status_shard_prefix = "datalog.sync-status.shard";
static string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
static string bucket_status_oid_prefix = "bucket.sync-status";
static string object_status_oid_prefix = "bucket.sync-status";

void rgw_datalog_info::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("num_objects", num_shards, obj);
}

void rgw_datalog_entry::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("key", key, obj);
  utime_t ut;
  JSONDecoder::decode_json("timestamp", ut, obj);
  timestamp = ut.to_real_time();
}

void rgw_datalog_shard_data::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("marker", marker, obj);
  JSONDecoder::decode_json("truncated", truncated, obj);
  JSONDecoder::decode_json("entries", entries, obj);
}

class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *env;
  const int num_shards;
  int shard_id{0};

  map<uint32_t, rgw_data_sync_marker>& markers;

 public:
  RGWReadDataSyncStatusMarkersCR(RGWDataSyncCtx *sc, int num_shards,
                                 map<uint32_t, rgw_data_sync_marker>& markers)
    : RGWShardCollectCR(sc->cct, MAX_CONCURRENT_SHARDS),
      sc(sc), env(sc->env), num_shards(num_shards), markers(markers)
  {}
  bool spawn_next() override;
};

bool RGWReadDataSyncStatusMarkersCR::spawn_next()
{
  if (shard_id >= num_shards) {
    return false;
  }
  using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
  spawn(new CR(env->dpp, env->async_rados, env->svc->sysobj,
               rgw_raw_obj(env->svc->zone->get_zone_params().log_pool,
                           RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id)),
               &markers[shard_id]),
        false);
  shard_id++;
  return true;
}
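
// RGWShardCollectCR subclasses fan work out across shards: the collect
// coroutine keeps calling spawn_next() until it returns false, holding at
// most MAX_CONCURRENT_SHARDS children in flight. A caller drives it through
// a coroutines manager; a minimal sketch (variable names are illustrative):
//
//   map<uint32_t, rgw_data_sync_marker> markers;
//   RGWCoroutinesManager crs(cct, cr_registry);
//   int r = crs.run(dpp, new RGWReadDataSyncStatusMarkersCR(&sc, num_shards,
//                                                           markers));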

class RGWReadDataSyncRecoveringShardsCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *env;

  uint64_t max_entries;
  int num_shards;
  int shard_id{0};

  string marker;
  std::vector<RGWRadosGetOmapKeysCR::ResultPtr>& omapkeys;

 public:
  RGWReadDataSyncRecoveringShardsCR(RGWDataSyncCtx *sc, uint64_t _max_entries, int _num_shards,
                                    std::vector<RGWRadosGetOmapKeysCR::ResultPtr>& omapkeys)
    : RGWShardCollectCR(sc->cct, MAX_CONCURRENT_SHARDS), sc(sc), env(sc->env),
      max_entries(_max_entries), num_shards(_num_shards), omapkeys(omapkeys)
  {}
  bool spawn_next() override;
};

bool RGWReadDataSyncRecoveringShardsCR::spawn_next()
{
  if (shard_id >= num_shards)
    return false;

  string error_oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id) + ".retry";
  auto& shard_keys = omapkeys[shard_id];
  shard_keys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
  spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->svc->zone->get_zone_params().log_pool, error_oid),
                                  marker, max_entries, shard_keys), false);

  ++shard_id;
  return true;
}

class RGWReadDataSyncStatusCoroutine : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  rgw_data_sync_status *sync_status;

public:
  RGWReadDataSyncStatusCoroutine(RGWDataSyncCtx *_sc,
                                 rgw_data_sync_status *_status)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(sc->env), sync_status(_status)
  {}
  int operate(const DoutPrefixProvider *dpp) override;
};

int RGWReadDataSyncStatusCoroutine::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    // read sync info
    using ReadInfoCR = RGWSimpleRadosReadCR<rgw_data_sync_info>;
    yield {
      bool empty_on_enoent = false; // fail on ENOENT
      call(new ReadInfoCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
                          rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sc->source_zone)),
                          &sync_status->sync_info, empty_on_enoent));
    }
    if (retcode < 0) {
      ldpp_dout(dpp, 4) << "failed to read sync status info with "
          << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    // read shard markers
    using ReadMarkersCR = RGWReadDataSyncStatusMarkersCR;
    yield call(new ReadMarkersCR(sc, sync_status->sync_info.num_shards,
                                 sync_status->sync_markers));
    if (retcode < 0) {
      ldpp_dout(dpp, 4) << "failed to read sync status markers with "
          << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    return set_cr_done();
  }
  return 0;
}
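
// reenter()/yield come from boost::asio's stackless coroutines
// (boost/asio/yield.hpp above): yield suspends the coroutine until the
// call()ed child completes, and retcode then carries the child's result.
// This call/yield/check-retcode shape repeats throughout the file.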

class RGWReadRemoteDataLogShardInfoCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;

  RGWRESTReadResource *http_op;

  int shard_id;
  RGWDataChangesLogInfo *shard_info;

public:
  RGWReadRemoteDataLogShardInfoCR(RGWDataSyncCtx *_sc,
                                  int _shard_id, RGWDataChangesLogInfo *_shard_info) : RGWCoroutine(_sc->cct),
                                                                                       sc(_sc),
                                                                                       sync_env(_sc->env),
                                                                                       http_op(NULL),
                                                                                       shard_id(_shard_id),
                                                                                       shard_info(_shard_info) {
  }

  ~RGWReadRemoteDataLogShardInfoCR() override {
    if (http_op) {
      http_op->put();
    }
  }

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type" , "data" },
                                        { "id", buf },
                                        { "info" , NULL },
                                        { NULL, NULL } };

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(sc->conn, p, pairs, NULL, sync_env->http_manager);

        init_new_io(http_op);

        int ret = http_op->aio_read(dpp);
        if (ret < 0) {
          ldpp_dout(dpp, 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait(shard_info, null_yield);
        if (ret < 0) {
          return set_cr_error(ret);
        }
        return set_cr_done();
      }
    }
    return 0;
  }
};
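
// The REST read is two-phase: aio_read() queues the request on the shared
// RGWHTTPManager, io_block() suspends this coroutine until the response
// arrives, and wait() then decodes the JSON payload into the caller's
// result structure.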

struct read_remote_data_log_response {
  string marker;
  bool truncated;
  list<rgw_data_change_log_entry> entries;

  read_remote_data_log_response() : truncated(false) {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("marker", marker, obj);
    JSONDecoder::decode_json("truncated", truncated, obj);
    JSONDecoder::decode_json("entries", entries, obj);
  }
};
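
// An illustrative response body (field values made up) for the remote
// datalog listing decoded above:
//   { "marker": "1_0000000001.1", "truncated": true, "entries": [ ... ] }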

class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;

  RGWRESTReadResource *http_op = nullptr;

  int shard_id;
  const std::string& marker;
  string *pnext_marker;
  list<rgw_data_change_log_entry> *entries;
  bool *truncated;

  read_remote_data_log_response response;
  std::optional<TOPNSPC::common::PerfGuard> timer;

public:
  RGWReadRemoteDataLogShardCR(RGWDataSyncCtx *_sc, int _shard_id,
                              const std::string& marker, string *pnext_marker,
                              list<rgw_data_change_log_entry> *_entries,
                              bool *_truncated)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
      shard_id(_shard_id), marker(marker), pnext_marker(pnext_marker),
      entries(_entries), truncated(_truncated) {
  }
  ~RGWReadRemoteDataLogShardCR() override {
    if (http_op) {
      http_op->put();
    }
  }

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type" , "data" },
                                        { "id", buf },
                                        { "marker", marker.c_str() },
                                        { "extra-info", "true" },
                                        { NULL, NULL } };

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(sc->conn, p, pairs, NULL, sync_env->http_manager);

        init_new_io(http_op);

        if (sync_env->counters) {
          timer.emplace(sync_env->counters, sync_counters::l_poll);
        }
        int ret = http_op->aio_read(dpp);
        if (ret < 0) {
          ldpp_dout(dpp, 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          if (sync_env->counters) {
            sync_env->counters->inc(sync_counters::l_poll_err);
          }
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        timer.reset();
        int ret = http_op->wait(&response, null_yield);
        if (ret < 0) {
          if (sync_env->counters && ret != -ENOENT) {
            sync_env->counters->inc(sync_counters::l_poll_err);
          }
          return set_cr_error(ret);
        }
        entries->swap(response.entries);
        *pnext_marker = response.marker;
        *truncated = response.truncated;
        return set_cr_done();
      }
    }
    return 0;
  }
};

class RGWReadRemoteDataLogInfoCR : public RGWShardCollectCR {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;

  int num_shards;
  map<int, RGWDataChangesLogInfo> *datalog_info;

  int shard_id;
#define READ_DATALOG_MAX_CONCURRENT 10

public:
  RGWReadRemoteDataLogInfoCR(RGWDataSyncCtx *_sc,
                             int _num_shards,
                             map<int, RGWDataChangesLogInfo> *_datalog_info) : RGWShardCollectCR(_sc->cct, READ_DATALOG_MAX_CONCURRENT),
                                                                               sc(_sc), sync_env(_sc->env), num_shards(_num_shards),
                                                                               datalog_info(_datalog_info), shard_id(0) {}
  bool spawn_next() override;
};

bool RGWReadRemoteDataLogInfoCR::spawn_next() {
  if (shard_id >= num_shards) {
    return false;
  }
  spawn(new RGWReadRemoteDataLogShardInfoCR(sc, shard_id, &(*datalog_info)[shard_id]), false);
  shard_id++;
  return true;
}

class RGWListRemoteDataLogShardCR : public RGWSimpleCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  RGWRESTReadResource *http_op;

  int shard_id;
  string marker;
  uint32_t max_entries;
  rgw_datalog_shard_data *result;

public:
  RGWListRemoteDataLogShardCR(RGWDataSyncCtx *sc, int _shard_id,
                              const string& _marker, uint32_t _max_entries,
                              rgw_datalog_shard_data *_result)
    : RGWSimpleCoroutine(sc->cct), sc(sc), sync_env(sc->env), http_op(NULL),
      shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}

  int send_request(const DoutPrefixProvider *dpp) override {
    RGWRESTConn *conn = sc->conn;

    char buf[32];
    snprintf(buf, sizeof(buf), "%d", shard_id);

    char max_entries_buf[32];
    snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);

    const char *marker_key = (marker.empty() ? "" : "marker");

    rgw_http_param_pair pairs[] = { { "type", "data" },
      { "id", buf },
      { "max-entries", max_entries_buf },
      { marker_key, marker.c_str() },
      { NULL, NULL } };

    string p = "/admin/log/";

    http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
    init_new_io(http_op);

    int ret = http_op->aio_read(dpp);
    if (ret < 0) {
      ldpp_dout(dpp, 0) << "ERROR: failed to read from " << p << dendl;
      log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
      http_op->put();
      return ret;
    }

    return 0;
  }

  int request_complete() override {
    int ret = http_op->wait(result, null_yield);
    http_op->put();
    if (ret < 0 && ret != -ENOENT) {
      ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }
};

class RGWListRemoteDataLogCR : public RGWShardCollectCR {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;

  map<int, string> shards;
  int max_entries_per_shard;
  map<int, rgw_datalog_shard_data> *result;

  map<int, string>::iterator iter;
#define READ_DATALOG_MAX_CONCURRENT 10

public:
  RGWListRemoteDataLogCR(RGWDataSyncCtx *_sc,
                         map<int, string>& _shards,
                         int _max_entries_per_shard,
                         map<int, rgw_datalog_shard_data> *_result) : RGWShardCollectCR(_sc->cct, READ_DATALOG_MAX_CONCURRENT),
                                                                      sc(_sc), sync_env(_sc->env), max_entries_per_shard(_max_entries_per_shard),
                                                                      result(_result) {
    shards.swap(_shards);
    iter = shards.begin();
  }
  bool spawn_next() override;
};

bool RGWListRemoteDataLogCR::spawn_next() {
  if (iter == shards.end()) {
    return false;
  }

  spawn(new RGWListRemoteDataLogShardCR(sc, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
  ++iter;
  return true;
}
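
// read_source_log_shards_next() below drives this with
// max_entries_per_shard=1: given a per-shard marker map, it peeks at each
// listed shard of the remote datalog to see whether anything new is
// available past that marker.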

class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
  static constexpr uint32_t lock_duration = 30;
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  rgw::sal::RadosStore* store;
  const rgw_pool& pool;
  const uint32_t num_shards;

  string sync_status_oid;

  string lock_name;
  string cookie;
  rgw_data_sync_status *status;
  map<int, RGWDataChangesLogInfo> shards_info;

  RGWSyncTraceNodeRef tn;
public:
  RGWInitDataSyncStatusCoroutine(RGWDataSyncCtx *_sc, uint32_t num_shards,
                                 uint64_t instance_id,
                                 RGWSyncTraceNodeRef& _tn_parent,
                                 rgw_data_sync_status *status)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), store(sync_env->store),
      pool(sync_env->svc->zone->get_zone_params().log_pool),
      num_shards(num_shards), status(status),
      tn(sync_env->sync_tracer->add_node(_tn_parent, "init_data_sync_status")) {
    lock_name = "sync_lock";

    status->sync_info.instance_id = instance_id;

#define COOKIE_LEN 16
    char buf[COOKIE_LEN + 1];

    gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
    cookie = buf;

    sync_status_oid = RGWDataSyncStatusManager::sync_status_oid(sc->source_zone);
  }

  int operate(const DoutPrefixProvider *dpp) override {
    int ret;
    reenter(this) {
      using LockCR = RGWSimpleRadosLockCR;
      yield call(new LockCR(sync_env->async_rados, store,
                            rgw_raw_obj{pool, sync_status_oid},
                            lock_name, cookie, lock_duration));
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to take a lock on " << sync_status_oid));
        return set_cr_error(retcode);
      }
      using WriteInfoCR = RGWSimpleRadosWriteCR<rgw_data_sync_info>;
      yield call(new WriteInfoCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
                                 rgw_raw_obj{pool, sync_status_oid},
                                 status->sync_info));
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to write sync status info with " << retcode));
        return set_cr_error(retcode);
      }

      /* take lock again, we just recreated the object */
      yield call(new LockCR(sync_env->async_rados, store,
                            rgw_raw_obj{pool, sync_status_oid},
                            lock_name, cookie, lock_duration));
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to take a lock on " << sync_status_oid));
        return set_cr_error(retcode);
      }
      tn->log(10, "took lease");

      /* fetch current position in logs */
      yield {
        RGWRESTConn *conn = sync_env->svc->zone->get_zone_conn(sc->source_zone);
        if (!conn) {
          tn->log(0, SSTR("ERROR: connection to zone " << sc->source_zone << " does not exist!"));
          return set_cr_error(-EIO);
        }
        for (uint32_t i = 0; i < num_shards; i++) {
          spawn(new RGWReadRemoteDataLogShardInfoCR(sc, i, &shards_info[i]), true);
        }
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          tn->log(0, SSTR("ERROR: failed to read remote data log shards"));
          return set_state(RGWCoroutine_Error);
        }
        yield;
      }
      yield {
        for (uint32_t i = 0; i < num_shards; i++) {
          RGWDataChangesLogInfo& info = shards_info[i];
          auto& marker = status->sync_markers[i];
          marker.next_step_marker = info.marker;
          marker.timestamp = info.last_update;
          const auto& oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, i);
          using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_data_sync_marker>;
          spawn(new WriteMarkerCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
                                  rgw_raw_obj{pool, oid}, marker), true);
        }
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          tn->log(0, SSTR("ERROR: failed to write data sync status markers"));
          return set_state(RGWCoroutine_Error);
        }
        yield;
      }

      status->sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
      yield call(new WriteInfoCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
                                 rgw_raw_obj{pool, sync_status_oid},
                                 status->sync_info));
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to write sync status info with " << retcode));
        return set_cr_error(retcode);
      }
      yield call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, store,
                                            rgw_raw_obj{pool, sync_status_oid},
                                            lock_name, cookie));
      return set_cr_done();
    }
    return 0;
  }
};
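
// Initialization sequence: take the sync lock, write a fresh sync-status
// object, re-take the lock (the write recreated the object), snapshot the
// current position of every remote datalog shard, persist one marker object
// per shard, flip the state to StateBuildingFullSyncMaps, and drop the lock.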

RGWRemoteDataLog::RGWRemoteDataLog(const DoutPrefixProvider *dpp,
                                   rgw::sal::RadosStore* store,
                                   RGWAsyncRadosProcessor *async_rados)
  : RGWCoroutinesManager(store->ctx(), store->getRados()->get_cr_registry()),
    dpp(dpp), store(store),
    cct(store->ctx()), cr_registry(store->getRados()->get_cr_registry()),
    async_rados(async_rados),
    http_manager(store->ctx(), completion_mgr),
    data_sync_cr(NULL),
    initialized(false)
{
}

int RGWRemoteDataLog::read_log_info(const DoutPrefixProvider *dpp, rgw_datalog_info *log_info)
{
  rgw_http_param_pair pairs[] = { { "type", "data" },
                                  { NULL, NULL } };

  int ret = sc.conn->get_json_resource(dpp, "/admin/log", pairs, null_yield, *log_info);
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "ERROR: failed to fetch datalog info" << dendl;
    return ret;
  }

  ldpp_dout(dpp, 20) << "remote datalog, num_shards=" << log_info->num_shards << dendl;

  return 0;
}

int RGWRemoteDataLog::read_source_log_shards_info(const DoutPrefixProvider *dpp, map<int, RGWDataChangesLogInfo> *shards_info)
{
  rgw_datalog_info log_info;
  int ret = read_log_info(dpp, &log_info);
  if (ret < 0) {
    return ret;
  }

  return run(dpp, new RGWReadRemoteDataLogInfoCR(&sc, log_info.num_shards, shards_info));
}

int RGWRemoteDataLog::read_source_log_shards_next(const DoutPrefixProvider *dpp, map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result)
{
  return run(dpp, new RGWListRemoteDataLogCR(&sc, shard_markers, 1, result));
}

int RGWRemoteDataLog::init(const rgw_zone_id& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger,
                           RGWSyncTraceManager *_sync_tracer, RGWSyncModuleInstanceRef& _sync_module,
                           PerfCounters* counters)
{
  sync_env.init(dpp, cct, store, store->svc(), async_rados, &http_manager, _error_logger,
                _sync_tracer, _sync_module, counters);
  sc.init(&sync_env, _conn, _source_zone);

  if (initialized) {
    return 0;
  }

  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }

  tn = sync_env.sync_tracer->add_node(sync_env.sync_tracer->root_node, "data");

  initialized = true;

  return 0;
}

void RGWRemoteDataLog::finish()
{
  stop();
}

int RGWRemoteDataLog::read_sync_status(const DoutPrefixProvider *dpp, rgw_data_sync_status *sync_status)
{
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWCoroutinesManager crs(cct, cr_registry);
  RGWHTTPManager http_manager(cct, crs.get_completion_mgr());
  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;

  RGWDataSyncCtx sc_local = sc;
  sc_local.env = &sync_env_local;

  ret = crs.run(dpp, new RGWReadDataSyncStatusCoroutine(&sc_local, sync_status));
  http_manager.stop();
  return ret;
}

int RGWRemoteDataLog::read_recovering_shards(const DoutPrefixProvider *dpp, const int num_shards, set<int>& recovering_shards)
{
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWCoroutinesManager crs(cct, cr_registry);
  RGWHTTPManager http_manager(cct, crs.get_completion_mgr());
  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;

  RGWDataSyncCtx sc_local = sc;
  sc_local.env = &sync_env_local;

  std::vector<RGWRadosGetOmapKeysCR::ResultPtr> omapkeys;
  omapkeys.resize(num_shards);
  uint64_t max_entries{1};

  ret = crs.run(dpp, new RGWReadDataSyncRecoveringShardsCR(&sc_local, max_entries, num_shards, omapkeys));
  http_manager.stop();

  if (ret == 0) {
    for (int i = 0; i < num_shards; i++) {
      if (omapkeys[i]->entries.size() != 0) {
        recovering_shards.insert(i);
      }
    }
  }

  return ret;
}

int RGWRemoteDataLog::init_sync_status(const DoutPrefixProvider *dpp, int num_shards)
{
  rgw_data_sync_status sync_status;
  sync_status.sync_info.num_shards = num_shards;

  RGWCoroutinesManager crs(cct, cr_registry);
  RGWHTTPManager http_manager(cct, crs.get_completion_mgr());
  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  auto instance_id = ceph::util::generate_random_number<uint64_t>();
  RGWDataSyncCtx sc_local = sc;
  sc_local.env = &sync_env_local;
  ret = crs.run(dpp, new RGWInitDataSyncStatusCoroutine(&sc_local, num_shards, instance_id, tn, &sync_status));
  http_manager.stop();
  return ret;
}

static string full_data_sync_index_shard_oid(const rgw_zone_id& source_zone, int shard_id)
{
  char buf[datalog_sync_full_sync_index_prefix.size() + 1 + source_zone.id.size() + 1 + 16];
  snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_full_sync_index_prefix.c_str(), source_zone.id.c_str(), shard_id);
  return string(buf);
}
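
// Produces oids of the form "data.full-sync.index.<source-zone-id>.<shard-id>",
// e.g. "data.full-sync.index.9fab...77e2.11" (zone id shortened here for
// illustration).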

struct read_metadata_list {
  string marker;
  bool truncated;
  list<string> keys;
  int count;

  read_metadata_list() : truncated(false), count(0) {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("marker", marker, obj);
    JSONDecoder::decode_json("truncated", truncated, obj);
    JSONDecoder::decode_json("keys", keys, obj);
    JSONDecoder::decode_json("count", count, obj);
  }
};

struct bucket_instance_meta_info {
  string key;
  obj_version ver;
  utime_t mtime;
  RGWBucketInstanceMetadataObject data;

  bucket_instance_meta_info() {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("key", key, obj);
    JSONDecoder::decode_json("ver", ver, obj);
    JSONDecoder::decode_json("mtime", mtime, obj);
    JSONDecoder::decode_json("data", data, obj);
  }
};

class RGWListBucketIndexesCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;

  rgw::sal::RadosStore* store;

  rgw_data_sync_status *sync_status;

  int req_ret;
  int ret;

  list<string>::iterator iter;

  RGWShardedOmapCRManager *entries_index;

  string oid_prefix;

  string path;
  bucket_instance_meta_info meta_info;
  string key;
  string s;
  int i;

  bool failed;
  bool truncated;
  read_metadata_list result;

  int num_shards;

public:
  RGWListBucketIndexesCR(RGWDataSyncCtx *_sc,
                         rgw_data_sync_status *_sync_status) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
                                                               store(sync_env->store), sync_status(_sync_status),
                                                               req_ret(0), ret(0), entries_index(NULL), i(0), failed(false), truncated(false) {
    oid_prefix = datalog_sync_full_sync_index_prefix + "." + sc->source_zone.id;
    path = "/admin/metadata/bucket.instance";
    num_shards = sync_status->sync_info.num_shards;
  }

  ~RGWListBucketIndexesCR() override {
    delete entries_index;
  }

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards,
                                                  sync_env->svc->zone->get_zone_params().log_pool,
                                                  oid_prefix);
      yield; // yield so OmapAppendCRs can start

      do {
        yield {
          string entrypoint = string("/admin/metadata/bucket.instance");

          rgw_http_param_pair pairs[] = {{"max-entries", "1000"},
                                         {"marker", result.marker.c_str()},
                                         {NULL, NULL}};

          call(new RGWReadRESTResourceCR<read_metadata_list>(sync_env->cct, sc->conn, sync_env->http_manager,
                                                             entrypoint, pairs, &result));
        }
        if (retcode < 0) {
          ldpp_dout(dpp, 0) << "ERROR: failed to fetch metadata for section bucket.instance" << dendl;
          return set_cr_error(retcode);
        }

        for (iter = result.keys.begin(); iter != result.keys.end(); ++iter) {
          ldpp_dout(dpp, 20) << "list metadata: section=bucket.instance key=" << *iter << dendl;
          key = *iter;

          yield {
            rgw_http_param_pair pairs[] = {{"key", key.c_str()},
                                           {NULL, NULL}};

            call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(sync_env->cct, sc->conn, sync_env->http_manager, path, pairs, &meta_info));
          }

          num_shards = meta_info.data.get_bucket_info().layout.current_index.layout.normal.num_shards;
          if (num_shards > 0) {
            for (i = 0; i < num_shards; i++) {
              char buf[16];
              snprintf(buf, sizeof(buf), ":%d", i);
              s = key + buf;
              yield entries_index->append(s, sync_env->svc->datalog_rados->get_log_shard_id(meta_info.data.get_bucket_info().bucket, i));
            }
          } else {
            yield entries_index->append(key, sync_env->svc->datalog_rados->get_log_shard_id(meta_info.data.get_bucket_info().bucket, -1));
          }
        }
        truncated = result.truncated;
      } while (truncated);

      yield {
        if (!entries_index->finish()) {
          failed = true;
        }
      }
      if (!failed) {
        for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status->sync_markers.begin(); iter != sync_status->sync_markers.end(); ++iter) {
          int shard_id = (int)iter->first;
          rgw_data_sync_marker& marker = iter->second;
          marker.total_entries = entries_index->get_total_entries(shard_id);
          spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(dpp, sync_env->async_rados, sync_env->svc->sysobj,
                                                                rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id)),
                                                                marker),
                true);
        }
      } else {
        yield call(sync_env->error_logger->log_error_cr(dpp, sc->conn->get_remote_id(), "data.init", "",
                                                        EIO, string("failed to build bucket instances map")));
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          yield call(sync_env->error_logger->log_error_cr(dpp, sc->conn->get_remote_id(), "data.init", "",
                                                          -ret, string("failed to store sync status: ") + cpp_strerror(-ret)));
          req_ret = ret;
        }
        yield;
      }
      drain_all();
      if (req_ret < 0) {
        yield return set_cr_error(req_ret);
      }
      yield return set_cr_done();
    }
    return 0;
  }
};
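
// Full-sync map building: page through the remote's bucket.instance metadata
// listing 1000 keys at a time, expand each bucket into its index shards, and
// append every bucket-shard key to the sharded full-sync index omaps. Once
// the index is finished, each shard's sync marker records total_entries for
// progress accounting.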

#define DATA_SYNC_UPDATE_MARKER_WINDOW 1

class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;

  string marker_oid;
  rgw_data_sync_marker sync_marker;
  RGWSyncTraceNodeRef tn;

public:
  RGWDataSyncShardMarkerTrack(RGWDataSyncCtx *_sc,
                              const string& _marker_oid,
                              const rgw_data_sync_marker& _marker,
                              RGWSyncTraceNodeRef& _tn) : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
                                                          sc(_sc), sync_env(_sc->env),
                                                          marker_oid(_marker_oid),
                                                          sync_marker(_marker),
                                                          tn(_tn) {}

  RGWCoroutine* store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
    sync_marker.marker = new_marker;
    sync_marker.pos = index_pos;
    sync_marker.timestamp = timestamp;

    tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));

    return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj,
                                                           rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, marker_oid),
                                                           sync_marker);
  }

  RGWOrderCallCR *allocate_order_control_cr() override {
    return new RGWLastCallerWinsCR(sync_env->cct);
  }
};
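
// DATA_SYNC_UPDATE_MARKER_WINDOW is 1, so (as far as the base class's window
// semantics go) the shard's high marker is persisted essentially each time
// the oldest in-flight entry completes rather than being batched. The
// RGWLastCallerWinsCR order-control coroutine lets a newer pending marker
// write supersede an older queued one instead of serializing them all.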

// ostream wrappers to print buckets without copying strings
struct bucket_str {
  const rgw_bucket& b;
  explicit bucket_str(const rgw_bucket& b) : b(b) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_str& rhs) {
  auto& b = rhs.b;
  if (!b.tenant.empty()) {
    out << b.tenant << '/';
  }
  out << b.name;
  if (!b.bucket_id.empty()) {
    out << ':' << b.bucket_id;
  }
  return out;
}

struct bucket_str_noinstance {
  const rgw_bucket& b;
  explicit bucket_str_noinstance(const rgw_bucket& b) : b(b) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_str_noinstance& rhs) {
  auto& b = rhs.b;
  if (!b.tenant.empty()) {
    out << b.tenant << '/';
  }
  out << b.name;
  return out;
}

struct bucket_shard_str {
  const rgw_bucket_shard& bs;
  explicit bucket_shard_str(const rgw_bucket_shard& bs) : bs(bs) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_shard_str& rhs) {
  auto& bs = rhs.bs;
  out << bucket_str{bs.bucket};
  if (bs.shard_id >= 0) {
    out << ':' << bs.shard_id;
  }
  return out;
}
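
// These wrappers render as "[tenant/]name[:bucket_id][:shard_id]", e.g.
// "acme/photos:9fab...77e2.4133.1:7" (identifiers made up for illustration).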

class RGWRunBucketSyncCoroutine : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
  rgw_bucket_sync_pair_info sync_pair;
  rgw_bucket_sync_pipe sync_pipe;
  rgw_bucket_shard_sync_info sync_status;
  RGWMetaSyncEnv meta_sync_env;
  RGWObjVersionTracker objv_tracker;
  ceph::real_time* progress;

  const std::string status_oid;

  RGWSyncTraceNodeRef tn;

public:
  RGWRunBucketSyncCoroutine(RGWDataSyncCtx *_sc,
                            boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
                            const rgw_bucket_sync_pair_info& _sync_pair,
                            const RGWSyncTraceNodeRef& _tn_parent,
                            ceph::real_time* progress)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
      lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), progress(progress),
      status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, sync_pair)),
      tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket",
                                         SSTR(bucket_shard_str{_sync_pair.dest_bs} << "<-" << bucket_shard_str{_sync_pair.source_bs} ))) {
  }

  int operate(const DoutPrefixProvider *dpp) override;
};

struct all_bucket_info {
  RGWBucketInfo bucket_info;
  map<string, bufferlist> attrs;
};

struct rgw_sync_pipe_info_entity
{
private:
  RGWBucketInfo bucket_info;
  map<string, bufferlist> bucket_attrs;
  bool _has_bucket_info{false};

public:
  rgw_zone_id zone;

  rgw_sync_pipe_info_entity() {}
  rgw_sync_pipe_info_entity(const rgw_sync_bucket_entity& e,
                            std::optional<all_bucket_info>& binfo) {
    if (e.zone) {
      zone = *e.zone;
    }
    if (!e.bucket) {
      return;
    }
    if (!binfo ||
        binfo->bucket_info.bucket != *e.bucket) {
      bucket_info.bucket = *e.bucket;
    } else {
      set_bucket_info(*binfo);
    }
  }

  void update_empty_bucket_info(const std::map<rgw_bucket, all_bucket_info>& buckets_info) {
    if (_has_bucket_info) {
      return;
    }
    if (bucket_info.bucket.name.empty()) {
      return;
    }

    auto iter = buckets_info.find(bucket_info.bucket);
    if (iter == buckets_info.end()) {
      return;
    }

    set_bucket_info(iter->second);
  }

  bool has_bucket_info() const {
    return _has_bucket_info;
  }

  void set_bucket_info(const all_bucket_info& all_info) {
    bucket_info = all_info.bucket_info;
    bucket_attrs = all_info.attrs;
    _has_bucket_info = true;
  }

  const RGWBucketInfo& get_bucket_info() const {
    return bucket_info;
  }

  const rgw_bucket& get_bucket() const {
    return bucket_info.bucket;
  }

  bool operator<(const rgw_sync_pipe_info_entity& e) const {
    if (zone < e.zone) {
      return true;
    }
    if (zone > e.zone) {
      return false;
    }
    return (bucket_info.bucket < e.bucket_info.bucket);
  }
};

std::ostream& operator<<(std::ostream& out, const rgw_sync_pipe_info_entity& e) {
  auto& bucket = e.get_bucket_info().bucket;

  out << e.zone << ":" << bucket.get_key();
  return out;
}

struct rgw_sync_pipe_handler_info {
  RGWBucketSyncFlowManager::pipe_handler handler;
  rgw_sync_pipe_info_entity source;
  rgw_sync_pipe_info_entity target;

  rgw_sync_pipe_handler_info() {}
  rgw_sync_pipe_handler_info(const RGWBucketSyncFlowManager::pipe_handler& _handler,
                             std::optional<all_bucket_info> source_bucket_info,
                             std::optional<all_bucket_info> target_bucket_info) : handler(_handler),
                                                                                  source(handler.source, source_bucket_info),
                                                                                  target(handler.dest, target_bucket_info) {
  }

  bool operator<(const rgw_sync_pipe_handler_info& p) const {
    if (source < p.source) {
      return true;
    }
    if (p.source < source) {
      return false;
    }
    return (target < p.target);
  }

  void update_empty_bucket_info(const std::map<rgw_bucket, all_bucket_info>& buckets_info) {
    source.update_empty_bucket_info(buckets_info);
    target.update_empty_bucket_info(buckets_info);
  }
};

std::ostream& operator<<(std::ostream& out, const rgw_sync_pipe_handler_info& p) {
  out << p.source << ">" << p.target;
  return out;
}

struct rgw_sync_pipe_info_set {
  std::set<rgw_sync_pipe_handler_info> handlers;

  using iterator = std::set<rgw_sync_pipe_handler_info>::iterator;

  void clear() {
    handlers.clear();
  }

  void insert(const RGWBucketSyncFlowManager::pipe_handler& handler,
              std::optional<all_bucket_info>& source_bucket_info,
              std::optional<all_bucket_info>& target_bucket_info) {
    rgw_sync_pipe_handler_info p(handler, source_bucket_info, target_bucket_info);
    handlers.insert(p);
  }

  iterator begin() {
    return handlers.begin();
  }

  iterator end() {
    return handlers.end();
  }

  bool empty() const {
    return handlers.empty();
  }

  void update_empty_bucket_info(const std::map<rgw_bucket, all_bucket_info>& buckets_info) {
    if (buckets_info.empty()) {
      return;
    }

    std::set<rgw_sync_pipe_handler_info> p;

    for (auto pipe : handlers) {
      pipe.update_empty_bucket_info(buckets_info);
      p.insert(pipe);
    }

    handlers = std::move(p);
  }
};

class RGWRunBucketsSyncBySourceCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;

  rgw_bucket_shard source;

  RGWSyncTraceNodeRef tn;

public:
  RGWRunBucketsSyncBySourceCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& _source, const RGWSyncTraceNodeRef& _tn_parent)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), source(_source),
      tn(sync_env->sync_tracer->add_node(_tn_parent, "source",
                                         SSTR(bucket_shard_str{_source} ))) {
  }
  ~RGWRunBucketsSyncBySourceCR() override {
  }

  int operate(const DoutPrefixProvider *dpp) override;
};

class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;

  std::optional<rgw_bucket_shard> target_bs;
  std::optional<rgw_bucket_shard> source_bs;

  std::optional<rgw_bucket> target_bucket;
  std::optional<rgw_bucket> source_bucket;

  rgw_sync_pipe_info_set pipes;
  rgw_sync_pipe_info_set::iterator siter;

  rgw_bucket_sync_pair_info sync_pair;

  RGWSyncTraceNodeRef tn;
  ceph::real_time* progress;
  std::map<uint64_t, ceph::real_time> shard_progress;

  ceph::real_time *cur_progress{nullptr};
  std::optional<ceph::real_time> min_progress;

  RGWRESTConn *conn{nullptr};
  rgw_zone_id last_zone;

  int source_num_shards{0};
  int target_num_shards{0};

public:
  RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
                            boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
                            std::optional<rgw_bucket_shard> _target_bs,
                            std::optional<rgw_bucket_shard> _source_bs,
                            const RGWSyncTraceNodeRef& _tn_parent,
                            ceph::real_time* progress);

  int operate(const DoutPrefixProvider *dpp) override;

  void handle_complete_stack(uint64_t stack_id) {
    auto iter = shard_progress.find(stack_id);
    if (iter == shard_progress.end()) {
      lderr(cct) << "ERROR: RGWRunBucketSourcesSyncCR::handle_complete_stack(): stack_id=" << stack_id << " not found! Likely a bug" << dendl;
      return;
    }
    if (!min_progress) {
      min_progress = iter->second;
    } else {
      if (iter->second < *min_progress) {
        min_progress = iter->second;
      }
    }

    shard_progress.erase(stack_id);
  }
};

class RGWDataSyncSingleEntryCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  rgw::bucket_sync::Handle state; // cached bucket-shard state
  rgw_data_sync_obligation obligation; // input obligation
  std::optional<rgw_data_sync_obligation> complete; // obligation to complete
  uint32_t obligation_counter = 0;
  RGWDataSyncShardMarkerTrack *marker_tracker;
  const rgw_raw_obj& error_repo;
  boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
  RGWSyncTraceNodeRef tn;

  ceph::real_time progress;
  int sync_status = 0;
public:
  RGWDataSyncSingleEntryCR(RGWDataSyncCtx *_sc, rgw::bucket_sync::Handle state,
                           rgw_data_sync_obligation obligation,
                           RGWDataSyncShardMarkerTrack *_marker_tracker,
                           const rgw_raw_obj& error_repo,
                           boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
                           const RGWSyncTraceNodeRef& _tn_parent)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
      state(std::move(state)), obligation(std::move(obligation)),
      marker_tracker(_marker_tracker), error_repo(error_repo),
      lease_cr(std::move(lease_cr)) {
    set_description() << "data sync single entry (source_zone=" << sc->source_zone << ") " << obligation;
    tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", obligation.key);
  }

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      if (state->obligation) {
        // this is already syncing in another DataSyncSingleEntryCR
        if (state->obligation->timestamp < obligation.timestamp) {
          // cancel existing obligation and overwrite it
          tn->log(10, SSTR("canceling existing obligation " << *state->obligation));
          complete = std::move(*state->obligation);
          *state->obligation = std::move(obligation);
          obligation_counter = state->counter;
        } else {
          // cancel new obligation
          tn->log(10, SSTR("canceling new obligation " << obligation));
          complete = std::move(obligation);
        }
      } else {
        // start syncing a new obligation
        state->obligation = obligation;
        obligation_counter = state->counter;
        state->counter++;

        // loop until the latest obligation is satisfied, because other callers
        // may update the obligation while we're syncing
        while ((state->obligation->timestamp == ceph::real_time() ||
                state->progress_timestamp < state->obligation->timestamp) &&
               obligation_counter != state->counter) {
          obligation_counter = state->counter;
          progress = ceph::real_time{};

          ldout(cct, 4) << "starting sync on " << bucket_shard_str{state->key}
              << ' ' << *state->obligation << dendl;
          yield call(new RGWRunBucketSourcesSyncCR(sc, lease_cr,
                                                   std::nullopt, /* target_bs */
                                                   state->key, tn, &progress));
          if (retcode < 0) {
            break;
          }
          state->progress_timestamp = std::max(progress, state->progress_timestamp);
        }
        // any new obligations will process themselves
        complete = std::move(*state->obligation);
        state->obligation.reset();

        tn->log(10, SSTR("sync finished on " << bucket_shard_str{state->key}
                         << " progress=" << progress << ' ' << *complete << " r=" << retcode));
      }
      sync_status = retcode;

      if (sync_status == -ENOENT) {
        // this was added when 'tenant/' was added to datalog entries, because
        // preexisting tenant buckets could never sync and would stay in the
        // error_repo forever
        tn->log(0, SSTR("WARNING: skipping data log entry for missing bucket " << complete->key));
        sync_status = 0;
      }

      if (sync_status < 0) {
        // write actual sync failures for 'radosgw-admin sync error list'
        if (sync_status != -EBUSY && sync_status != -EAGAIN) {
          yield call(sync_env->error_logger->log_error_cr(dpp, sc->conn->get_remote_id(), "data", complete->key,
                                                          -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
          if (retcode < 0) {
            tn->log(0, SSTR("ERROR: failed to log sync failure: retcode=" << retcode));
          }
        }
        if (complete->timestamp != ceph::real_time{}) {
          tn->log(10, SSTR("writing " << *complete << " to error repo for retry"));
          yield call(rgw_error_repo_write_cr(sync_env->store->svc()->rados, error_repo,
                                             complete->key, complete->timestamp));
          if (retcode < 0) {
            tn->log(0, SSTR("ERROR: failed to log sync failure in error repo: retcode=" << retcode));
          }
        }
      } else if (complete->retry) {
        yield call(rgw_error_repo_remove_cr(sync_env->store->svc()->rados, error_repo,
                                            complete->key, complete->timestamp));
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: failed to remove omap key from error repo ("
             << error_repo << " retcode=" << retcode));
        }
      }
      /* FIXME: what to do in case of error? */
      if (marker_tracker && !complete->marker.empty()) {
        /* update marker */
        yield call(marker_tracker->finish(complete->marker));
      }
      if (sync_status == 0) {
        sync_status = retcode;
      }
      if (sync_status < 0) {
        return set_cr_error(sync_status);
      }
      return set_cr_done();
    }
    return 0;
  }
};
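
// Obligation handling deduplicates per-bucket-shard work: if an entry arrives
// for a shard that is already syncing, either the new obligation or the
// in-flight one is canceled (whichever carries the older timestamp), and the
// canceled obligation is still completed above so that its marker and
// error-repo bookkeeping get settled.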

#define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20
#define DATA_SYNC_MAX_ERR_ENTRIES 10

class RGWDataSyncShardCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;

  rgw_pool pool;

  uint32_t shard_id;
  rgw_data_sync_marker& sync_marker;

  RGWRadosGetOmapValsCR::ResultPtr omapvals;
  std::map<std::string, bufferlist> entries;
  std::map<std::string, bufferlist>::iterator iter;

  string oid;

  std::optional<RGWDataSyncShardMarkerTrack> marker_tracker;

  std::string next_marker;
  list<rgw_data_change_log_entry> log_entries;
  list<rgw_data_change_log_entry>::iterator log_iter;
  bool truncated = false;

  ceph::mutex inc_lock = ceph::make_mutex("RGWDataSyncShardCR::inc_lock");
  ceph::condition_variable inc_cond;

  boost::asio::coroutine incremental_cr;
  boost::asio::coroutine full_cr;

  set<string> modified_shards;
  set<string> current_modified;

  set<string>::iterator modified_iter;

  uint64_t total_entries = 0;
  static constexpr int spawn_window = BUCKET_SHARD_SYNC_SPAWN_WINDOW;
  bool *reset_backoff = nullptr;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
  string status_oid;

  rgw_raw_obj error_repo;
  std::map<std::string, bufferlist> error_entries;
  string error_marker;
  ceph::real_time entry_timestamp;
  static constexpr int max_error_entries = DATA_SYNC_MAX_ERR_ENTRIES;

  ceph::coarse_real_time error_retry_time;

#define RETRY_BACKOFF_SECS_MIN 60
#define RETRY_BACKOFF_SECS_DEFAULT 60
#define RETRY_BACKOFF_SECS_MAX 600
  uint32_t retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;

  RGWSyncTraceNodeRef tn;

  rgw_bucket_shard source_bs;

  // target number of entries to cache before recycling idle ones
  static constexpr size_t target_cache_size = 256;
  boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache;

  int parse_bucket_key(const std::string& key, rgw_bucket_shard& bs) const {
    return rgw_bucket_parse_bucket_key(sync_env->cct, key,
                                       &bs.bucket, &bs.shard_id);
  }

  RGWCoroutine* sync_single_entry(const rgw_bucket_shard& src,
                                  const std::string& key,
                                  const std::string& marker,
                                  ceph::real_time timestamp, bool retry) {
    auto state = bucket_shard_cache->get(src);
    auto obligation = rgw_data_sync_obligation{key, marker, timestamp, retry};
    return new RGWDataSyncSingleEntryCR(sc, std::move(state), std::move(obligation),
                                        &*marker_tracker, error_repo,
                                        lease_cr.get(), tn);
  }

public:
  RGWDataSyncShardCR(RGWDataSyncCtx *_sc, rgw_pool& _pool,
                     uint32_t _shard_id, rgw_data_sync_marker& _marker,
                     RGWSyncTraceNodeRef& _tn, bool *_reset_backoff)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
      pool(_pool), shard_id(_shard_id), sync_marker(_marker),
      status_oid(RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id)),
      error_repo(pool, status_oid + ".retry"), tn(_tn),
      bucket_shard_cache(rgw::bucket_sync::Cache::create(target_cache_size))
  {
    set_description() << "data sync shard source_zone=" << sc->source_zone << " shard_id=" << shard_id;
  }

  ~RGWDataSyncShardCR() override {
    if (lease_cr) {
      lease_cr->abort();
    }
  }

  void append_modified_shards(set<string>& keys) {
    std::lock_guard l{inc_lock};
    modified_shards.insert(keys.begin(), keys.end());
  }

  int operate(const DoutPrefixProvider *dpp) override {
    int r;
    while (true) {
      switch (sync_marker.state) {
      case rgw_data_sync_marker::FullSync:
        r = full_sync();
        if (r < 0) {
          if (r != -EBUSY) {
            tn->log(10, SSTR("full sync failed (r=" << r << ")"));
          }
          return set_cr_error(r);
        }
        return 0;
      case rgw_data_sync_marker::IncrementalSync:
        r = incremental_sync();
        if (r < 0) {
          if (r != -EBUSY) {
            tn->log(10, SSTR("incremental sync failed (r=" << r << ")"));
          }
          return set_cr_error(r);
        }
        return 0;
      default:
        return set_cr_error(-EIO);
      }
    }
    return 0;
  }

  void init_lease_cr() {
    set_status("acquiring sync lock");
    uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
    string lock_name = "sync_lock";
    if (lease_cr) {
      lease_cr->abort();
    }
    auto store = sync_env->store;
    lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                            rgw_raw_obj(pool, status_oid),
                                            lock_name, lock_duration, this));
    lease_stack.reset(spawn(lease_cr.get(), false));
  }

  int full_sync() {
#define OMAP_GET_MAX_ENTRIES 100
    int max_entries = OMAP_GET_MAX_ENTRIES;
    reenter(&full_cr) {
      tn->log(10, "start full sync");
      yield init_lease_cr();
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          tn->log(5, "failed to take lease");
          set_status("lease lock failed, early abort");
          drain_all();
          return set_cr_error(lease_cr->get_ret_status());
        }
        set_sleeping(true);
        yield;
      }
      tn->log(10, "took lease");
      oid = full_data_sync_index_shard_oid(sc->source_zone, shard_id);
      marker_tracker.emplace(sc, status_oid, sync_marker, tn);
      total_entries = sync_marker.pos;
      entry_timestamp = sync_marker.timestamp; // time when full sync started
      do {
        if (!lease_cr->is_locked()) {
          lease_cr->go_down();
          drain_all();
          return set_cr_error(-ECANCELED);
        }
        omapvals = std::make_shared<RGWRadosGetOmapValsCR::Result>();
        yield call(new RGWRadosGetOmapValsCR(sync_env->store, rgw_raw_obj(pool, oid),
                                             sync_marker.marker, max_entries, omapvals));
        if (retcode < 0) {
          lease_cr->go_down();
          drain_all();
          return set_cr_error(retcode);
        }
        entries = std::move(omapvals->entries);
        if (entries.size() > 0) {
          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
        }
        tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
        iter = entries.begin();
        for (; iter != entries.end(); ++iter) {
          retcode = parse_bucket_key(iter->first, source_bs);
          if (retcode < 0) {
            tn->log(1, SSTR("failed to parse bucket shard: " << iter->first));
            marker_tracker->try_update_high_marker(iter->first, 0, entry_timestamp);
            continue;
          }
          tn->log(20, SSTR("full sync: " << iter->first));
          total_entries++;
          if (!marker_tracker->start(iter->first, total_entries, entry_timestamp)) {
            tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first << ". Duplicate entry?"));
          } else {
            // fetch remote and write locally
            yield_spawn_window(sync_single_entry(source_bs, iter->first, iter->first,
                                                 entry_timestamp, false),
                               spawn_window, std::nullopt);
          }
          sync_marker.marker = iter->first;
        }
      } while (omapvals->more);
      omapvals.reset();

      drain_all_but_stack(lease_stack.get());

      tn->unset_flag(RGW_SNS_FLAG_ACTIVE);

      yield {
        /* update marker to reflect we're done with full sync */
        sync_marker.state = rgw_data_sync_marker::IncrementalSync;
        sync_marker.marker = sync_marker.next_step_marker;
        sync_marker.next_step_marker.clear();
        call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj,
                                                             rgw_raw_obj(pool, status_oid),
                                                             sync_marker));
      }
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to set sync marker: retcode=" << retcode));
        lease_cr->go_down();
        drain_all();
        return set_cr_error(retcode);
      }
      // clean up full sync index
      yield {
        const auto& pool = sync_env->svc->zone->get_zone_params().log_pool;
        auto oid = full_data_sync_index_shard_oid(sc->source_zone.id, shard_id);
        call(new RGWRadosRemoveCR(sync_env->store, {pool, oid}));
      }
      // keep lease and transition to incremental_sync()
    }
    return 0;
  }

  int incremental_sync() {
    reenter(&incremental_cr) {
      tn->log(10, "start incremental sync");
      if (lease_cr) {
        tn->log(10, "lease already held from full sync");
      } else {
        yield init_lease_cr();
        while (!lease_cr->is_locked()) {
          if (lease_cr->is_done()) {
            tn->log(5, "failed to take lease");
            set_status("lease lock failed, early abort");
            drain_all();
            return set_cr_error(lease_cr->get_ret_status());
          }
          set_sleeping(true);
          yield;
        }
        set_status("lease acquired");
        tn->log(10, "took lease");
      }
      marker_tracker.emplace(sc, status_oid, sync_marker, tn);
      do {
        if (!lease_cr->is_locked()) {
          lease_cr->go_down();
          drain_all();
          return set_cr_error(-ECANCELED);
        }
        current_modified.clear();
        inc_lock.lock();
        current_modified.swap(modified_shards);
        inc_lock.unlock();

        if (current_modified.size() > 0) {
          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
        }
        /* process out of band updates */
        for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
          retcode = parse_bucket_key(*modified_iter, source_bs);
          if (retcode < 0) {
            tn->log(1, SSTR("failed to parse bucket shard: " << *modified_iter));
            continue;
          }
          tn->log(20, SSTR("received async update notification: " << *modified_iter));
          spawn(sync_single_entry(source_bs, *modified_iter, string(),
                                  ceph::real_time{}, false), false);
        }

        if (error_retry_time <= ceph::coarse_real_clock::now()) {
          /* process bucket shards that previously failed */
          omapvals = std::make_shared<RGWRadosGetOmapValsCR::Result>();
          yield call(new RGWRadosGetOmapValsCR(sync_env->store, error_repo,
                                               error_marker, max_error_entries, omapvals));
          error_entries = std::move(omapvals->entries);
          tn->log(20, SSTR("read error repo, got " << error_entries.size() << " entries"));
          iter = error_entries.begin();
          for (; iter != error_entries.end(); ++iter) {
            error_marker = iter->first;
            entry_timestamp = rgw_error_repo_decode_value(iter->second);
            retcode = parse_bucket_key(error_marker, source_bs);
            if (retcode < 0) {
              tn->log(1, SSTR("failed to parse bucket shard: " << error_marker));
              spawn(rgw_error_repo_remove_cr(sync_env->store->svc()->rados, error_repo,
                                             error_marker, entry_timestamp), false);
              continue;
            }
            tn->log(20, SSTR("handle error entry key=" << error_marker << " timestamp=" << entry_timestamp));
            spawn(sync_single_entry(source_bs, error_marker, "",
                                    entry_timestamp, true), false);
          }
          if (!omapvals->more) {
            if (error_marker.empty() && error_entries.empty()) {
              /* the retry repo is empty, we back off a bit before calling it again */
              retry_backoff_secs *= 2;
              if (retry_backoff_secs > RETRY_BACKOFF_SECS_MAX) {
                retry_backoff_secs = RETRY_BACKOFF_SECS_MAX;
              }
            } else {
              retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;
            }
            error_retry_time = ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs);
            error_marker.clear();
          }
        }
        omapvals.reset();

        tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker));
        yield call(new RGWReadRemoteDataLogShardCR(sc, shard_id, sync_marker.marker,
                                                   &next_marker, &log_entries, &truncated));
        if (retcode < 0 && retcode != -ENOENT) {
          tn->log(0, SSTR("ERROR: failed to read remote data log info: ret=" << retcode));
          lease_cr->go_down();
          drain_all();
          return set_cr_error(retcode);
        }

        if (log_entries.size() > 0) {
          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
        }

        for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
          tn->log(20, SSTR("shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key));
          retcode = parse_bucket_key(log_iter->entry.key, source_bs);
          if (retcode < 0) {
            tn->log(1, SSTR("failed to parse bucket shard: " << log_iter->entry.key));
            marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
            continue;
          }
          if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
            tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?"));
          } else {
            yield_spawn_window(sync_single_entry(source_bs, log_iter->entry.key, log_iter->log_id,
                                                 log_iter->log_timestamp, false),
                               spawn_window, std::nullopt);
          }
        }

        tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker
                         << " next_marker=" << next_marker << " truncated=" << truncated));
        if (!next_marker.empty()) {
          sync_marker.marker = next_marker;
        } else if (!log_entries.empty()) {
          sync_marker.marker = log_entries.back().log_id;
        }
        if (!truncated) {
          // we reached the end, wait a while before checking for more
          tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
          yield wait(get_idle_interval());
        }
      } while (true);
    }
    return 0;
  }

  utime_t get_idle_interval() const {
#define INCREMENTAL_INTERVAL 20
    ceph::timespan interval = std::chrono::seconds(INCREMENTAL_INTERVAL);
    if (!ceph::coarse_real_clock::is_zero(error_retry_time)) {
      auto now = ceph::coarse_real_clock::now();
      if (error_retry_time > now) {
        auto d = error_retry_time - now;
        if (interval > d) {
          interval = d;
        }
      }
    }
    // convert timespan -> time_point -> utime_t
    return utime_t(ceph::coarse_real_clock::zero() + interval);
  }
};
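
// Per-shard sync lifecycle: full sync drains the bucket list that
// RGWListBucketIndexesCR wrote into this shard's omap index, then flips the
// persisted marker state to IncrementalSync and deletes the index. From then
// on the shard tails the remote datalog, mixing in out-of-band wakeups
// (modified_shards) and entries from the ".retry" error repo, whose polling
// backs off exponentially (60s up to 600s) while it stays empty.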

class RGWDataSyncShardControlCR : public RGWBackoffControlCR {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;

  rgw_pool pool;

  uint32_t shard_id;
  rgw_data_sync_marker sync_marker;

  RGWSyncTraceNodeRef tn;
public:
  RGWDataSyncShardControlCR(RGWDataSyncCtx *_sc, const rgw_pool& _pool,
                            uint32_t _shard_id, rgw_data_sync_marker& _marker,
                            RGWSyncTraceNodeRef& _tn_parent) : RGWBackoffControlCR(_sc->cct, false),
                                                               sc(_sc), sync_env(_sc->env),
                                                               pool(_pool),
                                                               shard_id(_shard_id),
                                                               sync_marker(_marker) {
    tn = sync_env->sync_tracer->add_node(_tn_parent, "shard", std::to_string(shard_id));
  }

  RGWCoroutine *alloc_cr() override {
    return new RGWDataSyncShardCR(sc, pool, shard_id, sync_marker, tn, backoff_ptr());
  }

  RGWCoroutine *alloc_finisher_cr() override {
    return new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj,
                                                          rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id)),
                                                          &sync_marker);
  }

  void append_modified_shards(set<string>& keys) {
    std::lock_guard l{cr_lock()};

    RGWDataSyncShardCR *cr = static_cast<RGWDataSyncShardCR *>(get_cr());
    if (!cr) {
      return;
    }

    cr->append_modified_shards(keys);
  }
};

class RGWDataSyncCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  uint32_t num_shards;

  rgw_data_sync_status sync_status;

  ceph::mutex shard_crs_lock =
    ceph::make_mutex("RGWDataSyncCR::shard_crs_lock");
  map<int, RGWDataSyncShardControlCR *> shard_crs;

  bool *reset_backoff;

  RGWSyncTraceNodeRef tn;

  RGWDataSyncModule *data_sync_module{nullptr};
public:
  RGWDataSyncCR(RGWDataSyncCtx *_sc, uint32_t _num_shards, RGWSyncTraceNodeRef& _tn, bool *_reset_backoff) : RGWCoroutine(_sc->cct),
                                                                                                             sc(_sc), sync_env(_sc->env),
                                                                                                             num_shards(_num_shards),
                                                                                                             reset_backoff(_reset_backoff), tn(_tn) {
  }

  ~RGWDataSyncCR() override {
    for (auto iter : shard_crs) {
      iter.second->put();
    }
  }

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {

      /* read sync status */
      yield call(new RGWReadDataSyncStatusCoroutine(sc, &sync_status));

      data_sync_module = sync_env->sync_module->get_data_handler();

      if (retcode < 0 && retcode != -ENOENT) {
        tn->log(0, SSTR("ERROR: failed to fetch sync status, retcode=" << retcode));
        return set_cr_error(retcode);
      }

      /* state: init status */
      if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
        tn->log(20, SSTR("init"));
        sync_status.sync_info.num_shards = num_shards;
        uint64_t instance_id;
        instance_id = ceph::util::generate_random_number<uint64_t>();
        yield call(new RGWInitDataSyncStatusCoroutine(sc, num_shards, instance_id, tn, &sync_status));
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: failed to init sync, retcode=" << retcode));
          return set_cr_error(retcode);
        }
        // sets state = StateBuildingFullSyncMaps

        *reset_backoff = true;
      }

      data_sync_module->init(sc, sync_status.sync_info.instance_id);

      if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
        tn->log(10, SSTR("building full sync maps"));
        /* call sync module init here */
        sync_status.sync_info.num_shards = num_shards;
        yield call(data_sync_module->init_sync(dpp, sc));
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: sync module init_sync() failed, retcode=" << retcode));
          return set_cr_error(retcode);
        }

        /* state: building full sync maps */
        yield call(new RGWListBucketIndexesCR(sc, &sync_status));
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: failed to build full sync maps, retcode=" << retcode));
          return set_cr_error(retcode);
        }
        sync_status.sync_info.state = rgw_data_sync_info::StateSync;

        /* update new state */
        yield call(set_sync_info_cr());
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: failed to write sync status, retcode=" << retcode));
          return set_cr_error(retcode);
        }

        *reset_backoff = true;
      }

      yield call(data_sync_module->start_sync(dpp, sc));
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to start sync, retcode=" << retcode));
        return set_cr_error(retcode);
      }

      yield {
        if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) {
          tn->log(10, SSTR("spawning " << num_shards << " shards sync"));
          for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status.sync_markers.begin();
               iter != sync_status.sync_markers.end(); ++iter) {
            RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sc, sync_env->svc->zone->get_zone_params().log_pool,
                                                                          iter->first, iter->second, tn);
            cr->get();
            shard_crs_lock.lock();
            shard_crs[iter->first] = cr;
            shard_crs_lock.unlock();
            spawn(cr, true);
          }
        }
      }

      return set_cr_done();
    }
    return 0;
  }

  RGWCoroutine *set_sync_info_cr() {
    return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj,
                                                         rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sc->source_zone)),
                                                         sync_status.sync_info);
  }

  void wakeup(int shard_id, set<string>& keys) {
    std::lock_guard l{shard_crs_lock};
    map<int, RGWDataSyncShardControlCR *>::iterator iter = shard_crs.find(shard_id);
    if (iter == shard_crs.end()) {
      return;
    }
    iter->second->append_modified_shards(keys);
    iter->second->wakeup();
  }
};
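
// Top-level data sync state machine: StateInit writes the initial status
// objects, StateBuildingFullSyncMaps enumerates bucket instances into the
// full-sync indexes, and StateSync spawns one backoff-controlled shard
// coroutine per datalog shard; wakeup() routes datalog change notifications
// to the right shard coroutine.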
class RGWDefaultDataSyncModule : public RGWDataSyncModule {
public:
  RGWDefaultDataSyncModule() {}

  RGWCoroutine *sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
  RGWCoroutine *remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
  RGWCoroutine *create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
                                     rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
};

class RGWDefaultSyncModuleInstance : public RGWSyncModuleInstance {
  RGWDefaultDataSyncModule data_handler;
public:
  RGWDefaultSyncModuleInstance() {}
  RGWDataSyncModule *get_data_handler() override {
    return &data_handler;
  }
  bool supports_user_writes() override {
    return true;
  }
};

int RGWDefaultSyncModule::create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance)
{
  instance->reset(new RGWDefaultSyncModuleInstance());
  return 0;
}
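/* The default sync module implements plain inter-zone replication; its data
 * handler is the object RGWDataSyncCR::operate() obtains above through
 * sync_env->sync_module->get_data_handler(). */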
class RGWUserPermHandler {
  friend struct Init;
  friend class Bucket;

  RGWDataSyncEnv *sync_env;
  rgw_user uid;

  struct _info {
    RGWUserInfo user_info;
    rgw::IAM::Environment env;
    std::unique_ptr<rgw::auth::Identity> identity;
    RGWAccessControlPolicy user_acl;
  };

  std::shared_ptr<_info> info;

  struct Init;

  std::shared_ptr<Init> init_action;

  struct Init : public RGWGenericAsyncCR::Action {
    RGWDataSyncEnv *sync_env;

    rgw_user uid;
    std::shared_ptr<RGWUserPermHandler::_info> info;

    int ret{0};

    Init(RGWUserPermHandler *handler) : sync_env(handler->sync_env),
                                        uid(handler->uid),
                                        info(handler->info) {}
    int operate() override {
      auto user_ctl = sync_env->store->getRados()->ctl.user;

      ret = user_ctl->get_info_by_uid(sync_env->dpp, uid, &info->user_info, null_yield);
      if (ret < 0) {
        return ret;
      }

      info->identity = rgw::auth::transform_old_authinfo(sync_env->cct,
                                                         uid,
                                                         RGW_PERM_FULL_CONTROL,
                                                         false, /* system_request? */
                                                         TYPE_RGW);

      map<string, bufferlist> uattrs;

      ret = user_ctl->get_attrs_by_uid(sync_env->dpp, uid, &uattrs, null_yield);
      if (ret == 0) {
        ret = RGWUserPermHandler::policy_from_attrs(sync_env->cct, uattrs, &info->user_acl);
      }
      if (ret == -ENOENT) {
        info->user_acl.create_default(uid, info->user_info.display_name);
      }

      return 0;
    }
  };

public:
  RGWUserPermHandler(RGWDataSyncEnv *_sync_env,
                     const rgw_user& _uid) : sync_env(_sync_env),
                                             uid(_uid) {}

  RGWCoroutine *init_cr() {
    info = make_shared<_info>();
    init_action = make_shared<Init>(this);

    return new RGWGenericAsyncCR(sync_env->cct,
                                 sync_env->async_rados,
                                 init_action);
  }

  class Bucket {
    RGWDataSyncEnv *sync_env;
    std::shared_ptr<_info> info;
    RGWAccessControlPolicy bucket_acl;
    std::optional<perm_state> ps;
  public:
    Bucket() {}

    int init(RGWUserPermHandler *handler,
             const RGWBucketInfo& bucket_info,
             const map<string, bufferlist>& bucket_attrs);

    bool verify_bucket_permission(int perm);
    bool verify_object_permission(const map<string, bufferlist>& obj_attrs,
                                  int perm);
  };

  static int policy_from_attrs(CephContext *cct,
                               const map<string, bufferlist>& attrs,
                               RGWAccessControlPolicy *acl) {
    acl->set_ctx(cct);

    auto aiter = attrs.find(RGW_ATTR_ACL);
    if (aiter == attrs.end()) {
      return -ENOENT;
    }
    auto iter = aiter->second.begin();
    try {
      acl->decode(iter);
    } catch (buffer::error& err) {
      ldout(cct, 0) << "ERROR: " << __func__ << "(): could not decode policy, caught buffer::error" << dendl;
      return -EIO;
    }

    return 0;
  }

  int init_bucket(const RGWBucketInfo& bucket_info,
                  const map<string, bufferlist>& bucket_attrs,
                  Bucket *bs) {
    return bs->init(this, bucket_info, bucket_attrs);
  }
};
int RGWUserPermHandler::Bucket::init(RGWUserPermHandler *handler,
                                     const RGWBucketInfo& bucket_info,
                                     const map<string, bufferlist>& bucket_attrs)
{
  sync_env = handler->sync_env;
  info = handler->info;

  int r = RGWUserPermHandler::policy_from_attrs(sync_env->cct, bucket_attrs, &bucket_acl);
  if (r < 0) {
    return r;
  }

  ps.emplace(sync_env->cct,
             info->env,
             info->identity.get(),
             bucket_info,
             info->identity->get_perm_mask(),
             false, /* defer to bucket acls */
             nullptr, /* referer */
             false); /* request_payer */

  return 0;
}

bool RGWUserPermHandler::Bucket::verify_bucket_permission(int perm)
{
  return verify_bucket_permission_no_policy(sync_env->dpp,
                                            &(*ps),
                                            &info->user_acl,
                                            &bucket_acl,
                                            perm);
}

bool RGWUserPermHandler::Bucket::verify_object_permission(const map<string, bufferlist>& obj_attrs,
                                                          int perm)
{
  RGWAccessControlPolicy obj_acl;

  int r = policy_from_attrs(sync_env->cct, obj_attrs, &obj_acl);
  if (r < 0) {
    return false;
  }

  return verify_bucket_permission_no_policy(sync_env->dpp,
                                            &(*ps),
                                            &bucket_acl,
                                            &obj_acl,
                                            perm);
}
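/* Expected call sequence for RGWUserPermHandler (a sketch mirroring its use
 * in RGWObjFetchCR below; all names are from this file):
 *
 *   RGWUserPermHandler perms(sync_env, uid);
 *   yield call(perms.init_cr());          // async: user info, identity, ACL
 *   RGWUserPermHandler::Bucket bucket_perms;
 *   perms.init_bucket(bucket_info, bucket_attrs, &bucket_perms);
 *   if (!bucket_perms.verify_bucket_permission(RGW_PERM_WRITE)) { ... }
 */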
class RGWFetchObjFilter_Sync : public RGWFetchObjFilter_Default {
  rgw_bucket_sync_pipe sync_pipe;

  std::shared_ptr<RGWUserPermHandler::Bucket> bucket_perms;
  std::optional<rgw_sync_pipe_dest_params> verify_dest_params;

  std::optional<ceph::real_time> mtime;
  std::optional<string> etag;
  std::optional<uint64_t> obj_size;

  std::unique_ptr<rgw::auth::Identity> identity;

  std::shared_ptr<bool> need_retry;

public:
  RGWFetchObjFilter_Sync(rgw_bucket_sync_pipe& _sync_pipe,
                         std::shared_ptr<RGWUserPermHandler::Bucket>& _bucket_perms,
                         std::optional<rgw_sync_pipe_dest_params>&& _verify_dest_params,
                         std::shared_ptr<bool>& _need_retry) : sync_pipe(_sync_pipe),
                                                               bucket_perms(_bucket_perms),
                                                               verify_dest_params(std::move(_verify_dest_params)),
                                                               need_retry(_need_retry) {
    *need_retry = false;
  }

  int filter(CephContext *cct,
             const rgw_obj_key& source_key,
             const RGWBucketInfo& dest_bucket_info,
             std::optional<rgw_placement_rule> dest_placement_rule,
             const map<string, bufferlist>& obj_attrs,
             std::optional<rgw_user> *poverride_owner,
             const rgw_placement_rule **prule) override;
};
int RGWFetchObjFilter_Sync::filter(CephContext *cct,
                                   const rgw_obj_key& source_key,
                                   const RGWBucketInfo& dest_bucket_info,
                                   std::optional<rgw_placement_rule> dest_placement_rule,
                                   const map<string, bufferlist>& obj_attrs,
                                   std::optional<rgw_user> *poverride_owner,
                                   const rgw_placement_rule **prule)
{
  int abort_err = -ERR_PRECONDITION_FAILED;

  rgw_sync_pipe_params params;

  RGWObjTags obj_tags;

  auto iter = obj_attrs.find(RGW_ATTR_TAGS);
  if (iter != obj_attrs.end()) {
    try {
      auto it = iter->second.cbegin();
      obj_tags.decode(it);
    } catch (buffer::error &err) {
      ldout(cct, 0) << "ERROR: " << __func__ << ": caught buffer::error couldn't decode TagSet " << dendl;
    }
  }

  if (!sync_pipe.info.handler.find_obj_params(source_key,
                                              obj_tags.get_tags(),
                                              &params)) {
    return abort_err;
  }

  if (verify_dest_params &&
      !(*verify_dest_params == params.dest)) {
    /* raced! original dest params were different, will need to retry */
    ldout(cct, 0) << "WARNING: " << __func__ << ": pipe dest params are different than original params, must have raced with object rewrite, retrying" << dendl;
    *need_retry = true;
    return -ECANCELED;
  }

  std::optional<std::map<string, bufferlist> > new_attrs;

  if (params.dest.acl_translation) {
    rgw_user& acl_translation_owner = params.dest.acl_translation->owner;
    if (!acl_translation_owner.empty()) {
      if (params.mode == rgw_sync_pipe_params::MODE_USER &&
          acl_translation_owner != dest_bucket_info.owner) {
        ldout(cct, 0) << "ERROR: " << __func__ << ": acl translation was requested, but user (" << acl_translation_owner
                      << ") is not dest bucket owner (" << dest_bucket_info.owner << ")" << dendl;
        return -EPERM;
      }
      *poverride_owner = acl_translation_owner;
    }
  }
  if (params.mode == rgw_sync_pipe_params::MODE_USER) {
    if (!bucket_perms->verify_object_permission(obj_attrs, RGW_PERM_READ)) {
      ldout(cct, 0) << "ERROR: " << __func__ << ": permission check failed: user not allowed to fetch object" << dendl;
      return -EPERM;
    }
  }

  if (!dest_placement_rule &&
      params.dest.storage_class) {
    dest_rule.storage_class = *params.dest.storage_class;
    dest_rule.inherit_from(dest_bucket_info.placement_rule);
    dest_placement_rule = dest_rule;
    *prule = &dest_rule;
  }

  return RGWFetchObjFilter_Default::filter(cct,
                                           source_key,
                                           dest_bucket_info,
                                           dest_placement_rule,
                                           obj_attrs,
                                           poverride_owner,
                                           prule);
}
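/* filter() above rejects, in order: no matching pipe rule (precondition
 * failure), a raced change of dest params (flags *need_retry so the caller
 * can restart), an ACL-translation owner that doesn't own the dest bucket,
 * and a failed user read-permission check; only then does it fall through to
 * RGWFetchObjFilter_Default with any storage-class override applied. */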
class RGWObjFetchCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  rgw_bucket_sync_pipe& sync_pipe;
  rgw_obj_key key;
  std::optional<rgw_obj_key> dest_key;
  std::optional<uint64_t> versioned_epoch;
  rgw_zone_set *zones_trace;

  bool need_more_info{false};
  bool check_change{false};

  ceph::real_time src_mtime;
  uint64_t src_size;
  string src_etag;
  map<string, bufferlist> src_attrs;
  map<string, string> src_headers;

  std::optional<rgw_user> param_user;
  rgw_sync_pipe_params::Mode param_mode;

  std::optional<RGWUserPermHandler> user_perms;
  std::shared_ptr<RGWUserPermHandler::Bucket> source_bucket_perms;
  RGWUserPermHandler::Bucket dest_bucket_perms;

  std::optional<rgw_sync_pipe_dest_params> dest_params;

  int try_num{0};
  std::shared_ptr<bool> need_retry;
public:
  RGWObjFetchCR(RGWDataSyncCtx *_sc,
                rgw_bucket_sync_pipe& _sync_pipe,
                const rgw_obj_key& _key,
                std::optional<rgw_obj_key> _dest_key,
                std::optional<uint64_t> _versioned_epoch,
                rgw_zone_set *_zones_trace) : RGWCoroutine(_sc->cct),
                                              sc(_sc), sync_env(_sc->env),
                                              sync_pipe(_sync_pipe),
                                              key(_key),
                                              dest_key(_dest_key),
                                              versioned_epoch(_versioned_epoch),
                                              zones_trace(_zones_trace) {
  }

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {

#define MAX_RACE_RETRIES_OBJ_FETCH 10
      for (try_num = 0; try_num < MAX_RACE_RETRIES_OBJ_FETCH; ++try_num) {

        {
          std::optional<rgw_user> param_acl_translation;
          std::optional<string> param_storage_class;

          if (!sync_pipe.info.handler.find_basic_info_without_tags(key,
                                                                   &param_user,
                                                                   &param_acl_translation,
                                                                   &param_storage_class,
                                                                   &param_mode,
                                                                   &need_more_info)) {
            if (!need_more_info) {
              return set_cr_error(-ERR_PRECONDITION_FAILED);
            }
          }
        }

        if (need_more_info) {
          ldout(cct, 20) << "Could not determine exact policy rule for obj=" << key << ", will read source object attributes" << dendl;
          /*
           * we need to fetch info about source object, so that we can determine
           * the correct policy configuration. This can happen if there are multiple
           * policy rules, and some depend on the object tagging */
          yield call(new RGWStatRemoteObjCR(sync_env->async_rados,
                                            sync_env->store,
                                            sc->source_zone,
                                            sync_pipe.info.source_bs.bucket,
                                            key,
                                            &src_mtime,
                                            &src_size,
                                            &src_etag,
                                            &src_attrs,
                                            &src_headers));
          if (retcode < 0) {
            return set_cr_error(retcode);
          }

          {
            RGWObjTags obj_tags;

            auto iter = src_attrs.find(RGW_ATTR_TAGS);
            if (iter != src_attrs.end()) {
              try {
                auto it = iter->second.cbegin();
                obj_tags.decode(it);
              } catch (buffer::error &err) {
                ldout(cct, 0) << "ERROR: " << __func__ << ": caught buffer::error couldn't decode TagSet " << dendl;
              }
            }

            rgw_sync_pipe_params params;
            if (!sync_pipe.info.handler.find_obj_params(key,
                                                        obj_tags.get_tags(),
                                                        &params)) {
              return set_cr_error(-ERR_PRECONDITION_FAILED);
            }

            param_user = params.user;
            param_mode = params.mode;

            dest_params = params.dest;
          }
        }

        if (param_mode == rgw_sync_pipe_params::MODE_USER) {
          if (!param_user) {
            ldout(cct, 20) << "ERROR: " << __func__ << ": user level sync but user param not set" << dendl;
            return set_cr_error(-EPERM);
          }
          user_perms.emplace(sync_env, *param_user);

          yield call(user_perms->init_cr());
          if (retcode < 0) {
            ldout(cct, 20) << "ERROR: " << __func__ << ": failed to init user perms manager for uid=" << *param_user << dendl;
            return set_cr_error(retcode);
          }

          /* verify that user is allowed to write at the target bucket */
          int r = user_perms->init_bucket(sync_pipe.dest_bucket_info,
                                          sync_pipe.dest_bucket_attrs,
                                          &dest_bucket_perms);
          if (r < 0) {
            ldout(cct, 20) << "ERROR: " << __func__ << ": failed to init bucket perms manager for uid=" << *param_user << " bucket=" << sync_pipe.source_bucket_info.bucket.get_key() << dendl;
            return set_cr_error(retcode);
          }

          if (!dest_bucket_perms.verify_bucket_permission(RGW_PERM_WRITE)) {
            ldout(cct, 0) << "ERROR: " << __func__ << ": permission check failed: user not allowed to write into bucket (bucket=" << sync_pipe.info.dest_bs.bucket.get_key() << ")" << dendl;
            return -EPERM;
          }

          /* init source bucket permission structure */
          source_bucket_perms = make_shared<RGWUserPermHandler::Bucket>();
          r = user_perms->init_bucket(sync_pipe.source_bucket_info,
                                      sync_pipe.source_bucket_attrs,
                                      source_bucket_perms.get());
          if (r < 0) {
            ldout(cct, 20) << "ERROR: " << __func__ << ": failed to init bucket perms manager for uid=" << *param_user << " bucket=" << sync_pipe.source_bucket_info.bucket.get_key() << dendl;
            return set_cr_error(retcode);
          }
        }

        yield {
          if (!need_retry) {
            need_retry = make_shared<bool>();
          }
          auto filter = make_shared<RGWFetchObjFilter_Sync>(sync_pipe,
                                                            source_bucket_perms,
                                                            std::move(dest_params),
                                                            need_retry);

          call(new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone,
                                       nullopt,
                                       sync_pipe.info.source_bs.bucket,
                                       std::nullopt, sync_pipe.dest_bucket_info,
                                       key, dest_key, versioned_epoch,
                                       true,
                                       std::static_pointer_cast<RGWFetchObjFilter>(filter),
                                       zones_trace, sync_env->counters, dpp));
        }
        if (retcode < 0) {
          if (*need_retry) {
            continue;
          }
          return set_cr_error(retcode);
        }

        return set_cr_done();
      }

      ldout(cct, 0) << "ERROR: " << __func__ << ": Too many retries trying to fetch object, possibly a bug: bucket=" << sync_pipe.source_bucket_info.bucket.get_key() << " key=" << key << dendl;

      return set_cr_error(-EIO);
    }
    return 0;
  }
};
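/* RGWObjFetchCR is only constructed through a sync module's sync_object();
 * e.g. the default module immediately below does
 *
 *   return new RGWObjFetchCR(sc, sync_pipe, key, std::nullopt,
 *                            versioned_epoch, zones_trace);
 */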
RGWCoroutine *RGWDefaultDataSyncModule::sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
{
  return new RGWObjFetchCR(sc, sync_pipe, key, std::nullopt, versioned_epoch, zones_trace);
}

RGWCoroutine *RGWDefaultDataSyncModule::remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
                                                      real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
  auto sync_env = sc->env;
  return new RGWRemoveObjCR(sync_env->dpp, sync_env->async_rados, sync_env->store, sc->source_zone,
                            sync_pipe.dest_bucket_info, key, versioned, versioned_epoch,
                            NULL, NULL, false, &mtime, zones_trace);
}

RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
                                                             rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
  auto sync_env = sc->env;
  return new RGWRemoveObjCR(sync_env->dpp, sync_env->async_rados, sync_env->store, sc->source_zone,
                            sync_pipe.dest_bucket_info, key, versioned, versioned_epoch,
                            &owner.id, &owner.display_name, true, &mtime, zones_trace);
}
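/* Both removal paths above reuse RGWRemoveObjCR; judging by the call sites,
 * the delete-marker variant passes the owner identity and true where plain
 * removal passes NULL owner fields and false. */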
class RGWArchiveDataSyncModule : public RGWDefaultDataSyncModule {
public:
  RGWArchiveDataSyncModule() {}

  RGWCoroutine *sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
  RGWCoroutine *remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
  RGWCoroutine *create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
                                     rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
};

class RGWArchiveSyncModuleInstance : public RGWDefaultSyncModuleInstance {
  RGWArchiveDataSyncModule data_handler;
public:
  RGWArchiveSyncModuleInstance() {}
  RGWDataSyncModule *get_data_handler() override {
    return &data_handler;
  }
  RGWMetadataHandler *alloc_bucket_meta_handler() override {
    return RGWArchiveBucketMetaHandlerAllocator::alloc();
  }
  RGWBucketInstanceMetadataHandlerBase *alloc_bucket_instance_meta_handler() override {
    return RGWArchiveBucketInstanceMetaHandlerAllocator::alloc();
  }
};

int RGWArchiveSyncModule::create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance)
{
  instance->reset(new RGWArchiveSyncModuleInstance());
  return 0;
}
RGWCoroutine *RGWArchiveDataSyncModule::sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
{
  auto sync_env = sc->env;
  ldout(sc->cct, 5) << "SYNC_ARCHIVE: sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
  if (!sync_pipe.dest_bucket_info.versioned() ||
      (sync_pipe.dest_bucket_info.flags & BUCKET_VERSIONS_SUSPENDED)) {
    ldout(sc->cct, 0) << "SYNC_ARCHIVE: sync_object: enabling object versioning for archive bucket" << dendl;
    sync_pipe.dest_bucket_info.flags = (sync_pipe.dest_bucket_info.flags & ~BUCKET_VERSIONS_SUSPENDED) | BUCKET_VERSIONED;
    int op_ret = sync_env->store->getRados()->put_bucket_instance_info(sync_pipe.dest_bucket_info, false, real_time(), NULL, sync_env->dpp);
    if (op_ret < 0) {
      ldpp_dout(sync_env->dpp, 0) << "SYNC_ARCHIVE: sync_object: error versioning archive bucket" << dendl;
      return NULL;
    }
  }

  std::optional<rgw_obj_key> dest_key;

  if (versioned_epoch.value_or(0) == 0) { /* force version if not set */
    versioned_epoch = 0;
    dest_key = key;
    if (key.instance.empty()) {
      sync_env->store->getRados()->gen_rand_obj_instance_name(&(*dest_key));
    }
  }

  return new RGWObjFetchCR(sc, sync_pipe, key, dest_key, versioned_epoch, zones_trace);
}
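/* Archive-zone sketch: every synced object lands in a versioned bucket, so
 * sync_object() force-enables versioning on the destination and generates a
 * random version instance for null-versioned keys before delegating to
 * RGWObjFetchCR. */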
RGWCoroutine *RGWArchiveDataSyncModule::remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
                                                      real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
  ldout(sc->cct, 0) << "SYNC_ARCHIVE: remove_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
  return NULL;
}
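/* Returning NULL above means "no coroutine to run": the archive zone logs
 * the removal but deliberately keeps its own copy of the object. */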
RGWCoroutine *RGWArchiveDataSyncModule::create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
                                                             rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
  ldout(sc->cct, 0) << "SYNC_ARCHIVE: create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime
                    << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
  auto sync_env = sc->env;
  return new RGWRemoveObjCR(sync_env->dpp, sync_env->async_rados, sync_env->store, sc->source_zone,
                            sync_pipe.dest_bucket_info, key, versioned, versioned_epoch,
                            &owner.id, &owner.display_name, true, &mtime, zones_trace);
}
class RGWDataSyncControlCR : public RGWBackoffControlCR
{
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  uint32_t num_shards;

  RGWSyncTraceNodeRef tn;

  static constexpr bool exit_on_error = false; // retry on all errors
public:
  RGWDataSyncControlCR(RGWDataSyncCtx *_sc, uint32_t _num_shards,
                       RGWSyncTraceNodeRef& _tn_parent) : RGWBackoffControlCR(_sc->cct, exit_on_error),
                                                          sc(_sc), sync_env(_sc->env), num_shards(_num_shards) {
    tn = sync_env->sync_tracer->add_node(_tn_parent, "sync");
  }

  RGWCoroutine *alloc_cr() override {
    return new RGWDataSyncCR(sc, num_shards, tn, backoff_ptr());
  }

  void wakeup(int shard_id, set<string>& keys) {
    ceph::mutex& m = cr_lock();

    m.lock();
    RGWDataSyncCR *cr = static_cast<RGWDataSyncCR *>(get_cr());
    if (!cr) {
      m.unlock();
      return;
    }

    cr->get();
    m.unlock();

    if (cr) {
      tn->log(20, SSTR("notify shard=" << shard_id << " keys=" << keys));
      cr->wakeup(shard_id, keys);
    }

    cr->put();
  }
};
void RGWRemoteDataLog::wakeup(int shard_id, set<string>& keys) {
  std::shared_lock rl{lock};
  if (!data_sync_cr) {
    return;
  }
  data_sync_cr->wakeup(shard_id, keys);
}

int RGWRemoteDataLog::run_sync(const DoutPrefixProvider *dpp, int num_shards)
{
  lock.lock();
  data_sync_cr = new RGWDataSyncControlCR(&sc, num_shards, tn);
  data_sync_cr->get(); // run() will drop a ref, so take another
  lock.unlock();

  int r = run(dpp, data_sync_cr);

  lock.lock();
  data_sync_cr->put();
  data_sync_cr = NULL;
  lock.unlock();

  if (r < 0) {
    ldpp_dout(dpp, 0) << "ERROR: failed to run sync" << dendl;
    return r;
  }
  return 0;
}
CephContext *RGWDataSyncStatusManager::get_cct() const
{
  return store->ctx();
}

int RGWDataSyncStatusManager::init(const DoutPrefixProvider *dpp)
{
  RGWZone *zone_def;

  if (!store->svc()->zone->find_zone(source_zone, &zone_def)) {
    ldpp_dout(this, 0) << "ERROR: failed to find zone config info for zone=" << source_zone << dendl;
    return -EIO;
  }

  if (!store->svc()->sync_modules->get_manager()->supports_data_export(zone_def->tier_type)) {
    return -ENOTSUP;
  }

  const RGWZoneParams& zone_params = store->svc()->zone->get_zone_params();

  if (sync_module == nullptr) {
    sync_module = store->get_sync_module();
  }

  conn = store->svc()->zone->get_zone_conn(source_zone);
  if (!conn) {
    ldpp_dout(this, 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
    return -EINVAL;
  }

  error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);

  int r = source_log.init(source_zone, conn, error_logger, store->getRados()->get_sync_tracer(),
                          sync_module, counters);
  if (r < 0) {
    ldpp_dout(this, 0) << "ERROR: failed to init remote log, r=" << r << dendl;
    finalize();
    return r;
  }

  rgw_datalog_info datalog_info;
  r = source_log.read_log_info(dpp, &datalog_info);
  if (r < 0) {
    ldpp_dout(this, 5) << "ERROR: master.read_log_info() returned r=" << r << dendl;
    finalize();
    return r;
  }

  num_shards = datalog_info.num_shards;

  for (int i = 0; i < num_shards; i++) {
    shard_objs[i] = rgw_raw_obj(zone_params.log_pool, shard_obj_name(source_zone, i));
  }

  return 0;
}
void RGWDataSyncStatusManager::finalize()
{
  delete error_logger;
  error_logger = nullptr;
}

unsigned RGWDataSyncStatusManager::get_subsys() const
{
  return dout_subsys;
}

std::ostream& RGWDataSyncStatusManager::gen_prefix(std::ostream& out) const
{
  auto zone = std::string_view{source_zone.id};
  return out << "data sync zone:" << zone.substr(0, 8) << ' ';
}

string RGWDataSyncStatusManager::sync_status_oid(const rgw_zone_id& source_zone)
{
  char buf[datalog_sync_status_oid_prefix.size() + source_zone.id.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%s", datalog_sync_status_oid_prefix.c_str(), source_zone.id.c_str());

  return string(buf);
}

string RGWDataSyncStatusManager::shard_obj_name(const rgw_zone_id& source_zone, int shard_id)
{
  char buf[datalog_sync_status_shard_prefix.size() + source_zone.id.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_status_shard_prefix.c_str(), source_zone.id.c_str(), shard_id);

  return string(buf);
}
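/* Resulting oid shapes, read off the snprintf formats above:
 *   <datalog_sync_status_oid_prefix>.<source_zone.id>
 *   <datalog_sync_status_shard_prefix>.<source_zone.id>.<shard_id>
 */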
class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  const string instance_key;

  rgw_bucket_index_marker_info *info;

public:
  RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncCtx *_sc,
                                    const rgw_bucket_shard& bs,
                                    rgw_bucket_index_marker_info *_info)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
      instance_key(bs.get_key()), info(_info) {}

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      yield {
        rgw_http_param_pair pairs[] = { { "type" , "bucket-index" },
                                        { "bucket-instance", instance_key.c_str() },
                                        { "info" , NULL },
                                        { NULL, NULL } };

        string p = "/admin/log/";
        call(new RGWReadRESTResourceCR<rgw_bucket_index_marker_info>(sync_env->cct, sc->conn, sync_env->http_manager, p, pairs, info));
      }
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }
};
class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;

  const rgw_bucket_sync_pair_info& sync_pair;
  const string sync_status_oid;

  rgw_bucket_shard_sync_info& status;
  RGWObjVersionTracker& objv_tracker;
  rgw_bucket_index_marker_info info;
public:
  RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncCtx *_sc,
                                        const rgw_bucket_sync_pair_info& _sync_pair,
                                        rgw_bucket_shard_sync_info& _status,
                                        RGWObjVersionTracker& objv_tracker)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
      sync_pair(_sync_pair),
      sync_status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, _sync_pair)),
      status(_status), objv_tracker(objv_tracker)
  {}

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      /* fetch current position in logs */
      yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, sync_pair.source_bs, &info));
      if (retcode < 0 && retcode != -ENOENT) {
        ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl;
        return set_cr_error(retcode);
      }
      yield {
        auto store = sync_env->store;
        rgw_raw_obj obj(sync_env->svc->zone->get_zone_params().log_pool, sync_status_oid);
        const bool stopped = status.state == rgw_bucket_shard_sync_info::StateStopped;
        bool write_status = false;

        if (info.syncstopped) {
          if (stopped && !sync_env->sync_module->should_full_sync()) {
            // preserve our current incremental marker position
            write_status = true;
          }
        } else {
          // whether or not to do full sync, incremental sync will follow anyway
          if (sync_env->sync_module->should_full_sync()) {
            status.state = rgw_bucket_shard_sync_info::StateFullSync;
            status.inc_marker.position = info.max_marker;
          } else {
            // clear the marker position unless we're resuming from SYNCSTOP
            if (!stopped) {
              status.inc_marker.position = "";
            }
            status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
          }
          write_status = true;
          status.inc_marker.timestamp = ceph::real_clock::now();
        }

        if (write_status) {
          map<string, bufferlist> attrs;
          status.encode_all_attrs(attrs);
          call(new RGWSimpleRadosWriteAttrsCR(dpp, sync_env->async_rados, sync_env->svc->sysobj, obj, attrs, &objv_tracker));
        } else {
          call(new RGWRadosRemoveCR(store, obj, &objv_tracker));
        }
      }
      if (info.syncstopped) {
        retcode = -ENOENT;
      }
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }
};
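/* Init summary: if the remote reports syncstopped, the local status object
 * is removed (or, when resuming from StateStopped without a forced full
 * sync, the incremental marker is preserved); otherwise the shard is pointed
 * either at full sync or straight at incremental sync from the remote's
 * max_marker. */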
RGWRemoteBucketManager::RGWRemoteBucketManager(const DoutPrefixProvider *_dpp,
                                               RGWDataSyncEnv *_sync_env,
                                               const rgw_zone_id& _source_zone,
                                               RGWRESTConn *_conn,
                                               const RGWBucketInfo& source_bucket_info,
                                               const rgw_bucket& dest_bucket) : dpp(_dpp), sync_env(_sync_env)
{
  conn = _conn;
  source_zone = _source_zone;

  int num_shards = (source_bucket_info.layout.current_index.layout.normal.num_shards <= 0 ?
                    1 : source_bucket_info.layout.current_index.layout.normal.num_shards);

  sync_pairs.resize(num_shards);

  int cur_shard = std::min<int>(source_bucket_info.layout.current_index.layout.normal.num_shards, 0);

  for (int i = 0; i < num_shards; ++i, ++cur_shard) {
    auto& sync_pair = sync_pairs[i];

    sync_pair.source_bs.bucket = source_bucket_info.bucket;
    sync_pair.dest_bs.bucket = dest_bucket;

    sync_pair.source_bs.shard_id = (source_bucket_info.layout.current_index.layout.normal.num_shards > 0 ? cur_shard : -1);

    if (dest_bucket == source_bucket_info.bucket) {
      sync_pair.dest_bs.shard_id = sync_pair.source_bs.shard_id;
    } else {
      sync_pair.dest_bs.shard_id = -1;
    }
  }

  sc.init(sync_env, conn, source_zone);
}

RGWCoroutine *RGWRemoteBucketManager::init_sync_status_cr(int num, RGWObjVersionTracker& objv_tracker)
{
  if ((size_t)num >= sync_pairs.size()) {
    return nullptr;
  }
  return new RGWInitBucketShardSyncStatusCoroutine(&sc, sync_pairs[num], init_status, objv_tracker);
}
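/* Pairing rule, as implemented above: a non-sharded source yields a single
 * pair with shard_id -1; a same-bucket destination (the archive case) keeps
 * the source shard id, while any other destination collapses to -1. */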
#define BUCKET_SYNC_ATTR_PREFIX RGW_ATTR_PREFIX "bucket-sync."

template <class T>
static bool decode_attr(CephContext *cct, map<string, bufferlist>& attrs, const string& attr_name, T *val)
{
  map<string, bufferlist>::iterator iter = attrs.find(attr_name);
  if (iter == attrs.end()) {
    *val = T();
    return false;
  }

  auto biter = iter->second.cbegin();
  try {
    decode(*val, biter);
  } catch (buffer::error& err) {
    ldout(cct, 0) << "ERROR: failed to decode attribute: " << attr_name << dendl;
    return false;
  }
  return true;
}

void rgw_bucket_shard_sync_info::decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs)
{
  if (!decode_attr(cct, attrs, BUCKET_SYNC_ATTR_PREFIX "state", &state)) {
    decode_attr(cct, attrs, "state", &state);
  }
  if (!decode_attr(cct, attrs, BUCKET_SYNC_ATTR_PREFIX "full_marker", &full_marker)) {
    decode_attr(cct, attrs, "full_marker", &full_marker);
  }
  if (!decode_attr(cct, attrs, BUCKET_SYNC_ATTR_PREFIX "inc_marker", &inc_marker)) {
    decode_attr(cct, attrs, "inc_marker", &inc_marker);
  }
}

void rgw_bucket_shard_sync_info::encode_all_attrs(map<string, bufferlist>& attrs)
{
  encode_state_attr(attrs);
  full_marker.encode_attr(attrs);
  inc_marker.encode_attr(attrs);
}

void rgw_bucket_shard_sync_info::encode_state_attr(map<string, bufferlist>& attrs)
{
  using ceph::encode;
  encode(state, attrs[BUCKET_SYNC_ATTR_PREFIX "state"]);
}

void rgw_bucket_shard_full_sync_marker::encode_attr(map<string, bufferlist>& attrs)
{
  using ceph::encode;
  encode(*this, attrs[BUCKET_SYNC_ATTR_PREFIX "full_marker"]);
}

void rgw_bucket_shard_inc_sync_marker::encode_attr(map<string, bufferlist>& attrs)
{
  using ceph::encode;
  encode(*this, attrs[BUCKET_SYNC_ATTR_PREFIX "inc_marker"]);
}
class RGWReadBucketPipeSyncStatusCoroutine : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  string oid;
  rgw_bucket_shard_sync_info *status;
  RGWObjVersionTracker* objv_tracker;
  map<string, bufferlist> attrs;
public:
  RGWReadBucketPipeSyncStatusCoroutine(RGWDataSyncCtx *_sc,
                                       const rgw_bucket_sync_pair_info& sync_pair,
                                       rgw_bucket_shard_sync_info *_status,
                                       RGWObjVersionTracker* objv_tracker)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
      oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, sync_pair)),
      status(_status), objv_tracker(objv_tracker)
  {}
  int operate(const DoutPrefixProvider *dpp) override;
};

int RGWReadBucketPipeSyncStatusCoroutine::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    yield call(new RGWSimpleRadosReadAttrsCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
                                             rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, oid),
                                             &attrs, true, objv_tracker));
    if (retcode == -ENOENT) {
      *status = rgw_bucket_shard_sync_info();
      return set_cr_done();
    }
    if (retcode < 0) {
      ldpp_dout(dpp, 0) << "ERROR: failed to call fetch bucket shard info oid=" << oid << " ret=" << retcode << dendl;
      return set_cr_error(retcode);
    }
    status->decode_from_attrs(sync_env->cct, attrs);
    return set_cr_done();
  }
  return 0;
}
#define OMAP_READ_MAX_ENTRIES 10
class RGWReadRecoveringBucketShardsCoroutine : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  rgw::sal::RadosStore* store;

  const int shard_id;
  int max_entries;

  set<string>& recovering_buckets;
  string marker;
  string error_oid;

  RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
  set<string> error_entries;
  int max_omap_entries;
  int count;

public:
  RGWReadRecoveringBucketShardsCoroutine(RGWDataSyncCtx *_sc, const int _shard_id,
                                         set<string>& _recovering_buckets, const int _max_entries)
  : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
    store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
    recovering_buckets(_recovering_buckets), max_omap_entries(OMAP_READ_MAX_ENTRIES)
  {
    error_oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id) + ".retry";
  }

  int operate(const DoutPrefixProvider *dpp) override;
};

int RGWReadRecoveringBucketShardsCoroutine::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    //read recovering bucket shards
    count = 0;
    do {
      omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
      yield call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, error_oid),
                                           marker, max_omap_entries, omapkeys));

      if (retcode == -ENOENT) {
        break;
      }

      if (retcode < 0) {
        ldpp_dout(dpp, 0) << "failed to read recovering bucket shards with "
                          << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }

      error_entries = std::move(omapkeys->entries);
      if (error_entries.empty()) {
        break;
      }

      count += error_entries.size();
      marker = *error_entries.rbegin();
      recovering_buckets.insert(std::make_move_iterator(error_entries.begin()),
                                std::make_move_iterator(error_entries.end()));
    } while (omapkeys->more && count < max_entries);

    return set_cr_done();
  }

  return 0;
}
class RGWReadPendingBucketShardsCoroutine : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  rgw::sal::RadosStore* store;

  const int shard_id;
  int max_entries;

  set<string>& pending_buckets;
  string marker;
  string status_oid;

  rgw_data_sync_marker* sync_marker;
  int count;

  std::string next_marker;
  list<rgw_data_change_log_entry> log_entries;
  bool truncated;

public:
  RGWReadPendingBucketShardsCoroutine(RGWDataSyncCtx *_sc, const int _shard_id,
                                      set<string>& _pending_buckets,
                                      rgw_data_sync_marker* _sync_marker, const int _max_entries)
  : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
    store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
    pending_buckets(_pending_buckets), sync_marker(_sync_marker)
  {
    status_oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id);
  }

  int operate(const DoutPrefixProvider *dpp) override;
};

int RGWReadPendingBucketShardsCoroutine::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    //read sync status marker
    using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
    yield call(new CR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
                      rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, status_oid),
                      sync_marker));
    if (retcode < 0) {
      ldpp_dout(dpp, 0) << "failed to read sync status marker with "
                        << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }

    //read pending bucket shards
    marker = sync_marker->marker;
    count = 0;
    do {
      yield call(new RGWReadRemoteDataLogShardCR(sc, shard_id, marker,
                                                 &next_marker, &log_entries, &truncated));

      if (retcode == -ENOENT) {
        break;
      }

      if (retcode < 0) {
        ldpp_dout(dpp, 0) << "failed to read remote data log info with "
                          << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }

      if (log_entries.empty()) {
        break;
      }

      count += log_entries.size();
      for (const auto& entry : log_entries) {
        pending_buckets.insert(entry.entry.key);
      }
    } while (truncated && count < max_entries);

    return set_cr_done();
  }

  return 0;
}
int RGWRemoteDataLog::read_shard_status(const DoutPrefixProvider *dpp, int shard_id, set<string>& pending_buckets, set<string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries)
{
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWCoroutinesManager crs(store->ctx(), store->getRados()->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  RGWDataSyncCtx sc_local = sc;
  sc_local.env = &sync_env_local;
  list<RGWCoroutinesStack *> stacks;
  RGWCoroutinesStack* recovering_stack = new RGWCoroutinesStack(store->ctx(), &crs);
  recovering_stack->call(new RGWReadRecoveringBucketShardsCoroutine(&sc_local, shard_id, recovering_buckets, max_entries));
  stacks.push_back(recovering_stack);
  RGWCoroutinesStack* pending_stack = new RGWCoroutinesStack(store->ctx(), &crs);
  pending_stack->call(new RGWReadPendingBucketShardsCoroutine(&sc_local, shard_id, pending_buckets, sync_marker, max_entries));
  stacks.push_back(pending_stack);
  ret = crs.run(dpp, stacks);
  http_manager.stop();
  return ret;
}
RGWCoroutine *RGWRemoteBucketManager::read_sync_status_cr(int num, rgw_bucket_shard_sync_info *sync_status)
{
  if ((size_t)num >= sync_pairs.size()) {
    return nullptr;
  }
  return new RGWReadBucketPipeSyncStatusCoroutine(&sc, sync_pairs[num], sync_status, nullptr);
}

RGWBucketPipeSyncStatusManager::RGWBucketPipeSyncStatusManager(rgw::sal::RadosStore* _store,
                                                               std::optional<rgw_zone_id> _source_zone,
                                                               std::optional<rgw_bucket> _source_bucket,
                                                               const rgw_bucket& _dest_bucket) : store(_store),
                                                                                                 cr_mgr(_store->ctx(), _store->getRados()->get_cr_registry()),
                                                                                                 http_manager(store->ctx(), cr_mgr.get_completion_mgr()),
                                                                                                 source_zone(_source_zone), source_bucket(_source_bucket),
                                                                                                 conn(NULL), error_logger(NULL),
                                                                                                 dest_bucket(_dest_bucket),
                                                                                                 num_shards(0)
{
}

RGWBucketPipeSyncStatusManager::~RGWBucketPipeSyncStatusManager()
{
  for (vector<RGWRemoteBucketManager *>::iterator iter = source_mgrs.begin(); iter != source_mgrs.end(); ++iter) {
    delete *iter;
  }
  delete error_logger;
}

CephContext *RGWBucketPipeSyncStatusManager::get_cct() const
{
  return store->ctx();
}
void rgw_bucket_entry_owner::decode_json(JSONObj *obj)
{
  JSONDecoder::decode_json("ID", id, obj);
  JSONDecoder::decode_json("DisplayName", display_name, obj);
}

struct bucket_list_entry {
  bool delete_marker;
  rgw_obj_key key;
  bool is_latest;
  real_time mtime;
  string etag;
  uint64_t size;
  string storage_class;
  rgw_bucket_entry_owner owner;
  uint64_t versioned_epoch;
  string rgw_tag;

  bucket_list_entry() : delete_marker(false), is_latest(false), size(0), versioned_epoch(0) {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("IsDeleteMarker", delete_marker, obj);
    JSONDecoder::decode_json("Key", key.name, obj);
    JSONDecoder::decode_json("VersionId", key.instance, obj);
    JSONDecoder::decode_json("IsLatest", is_latest, obj);
    string mtime_str;
    JSONDecoder::decode_json("RgwxMtime", mtime_str, obj);

    struct tm t;
    uint32_t nsec;
    if (parse_iso8601(mtime_str.c_str(), &t, &nsec)) {
      ceph_timespec ts;
      ts.tv_sec = (uint64_t)internal_timegm(&t);
      ts.tv_nsec = nsec;
      mtime = real_clock::from_ceph_timespec(ts);
    }
    JSONDecoder::decode_json("ETag", etag, obj);
    JSONDecoder::decode_json("Size", size, obj);
    JSONDecoder::decode_json("StorageClass", storage_class, obj);
    JSONDecoder::decode_json("Owner", owner, obj);
    JSONDecoder::decode_json("VersionedEpoch", versioned_epoch, obj);
    JSONDecoder::decode_json("RgwxTag", rgw_tag, obj);
    if (key.instance == "null" && !versioned_epoch) {
      key.instance.clear();
    }
  }

  RGWModifyOp get_modify_op() const {
    if (delete_marker) {
      return CLS_RGW_OP_LINK_OLH_DM;
    } else if (!key.instance.empty() && key.instance != "null") {
      return CLS_RGW_OP_LINK_OLH;
    } else {
      return CLS_RGW_OP_ADD;
    }
  }
};

struct bucket_list_result {
  string name;
  string prefix;
  string key_marker;
  string version_id_marker;
  int max_keys;
  bool is_truncated;
  list<bucket_list_entry> entries;

  bucket_list_result() : max_keys(0), is_truncated(false) {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("Name", name, obj);
    JSONDecoder::decode_json("Prefix", prefix, obj);
    JSONDecoder::decode_json("KeyMarker", key_marker, obj);
    JSONDecoder::decode_json("VersionIdMarker", version_id_marker, obj);
    JSONDecoder::decode_json("MaxKeys", max_keys, obj);
    JSONDecoder::decode_json("IsTruncated", is_truncated, obj);
    JSONDecoder::decode_json("Entries", entries, obj);
  }
};
class RGWListBucketShardCR: public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  const rgw_bucket_shard& bs;
  const string instance_key;
  rgw_obj_key marker_position;

  bucket_list_result *result;

public:
  RGWListBucketShardCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& bs,
                       rgw_obj_key& _marker_position, bucket_list_result *_result)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), bs(bs),
      instance_key(bs.get_key()), marker_position(_marker_position),
      result(_result) {}

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      yield {
        rgw_http_param_pair pairs[] = { { "rgwx-bucket-instance", instance_key.c_str() },
                                        { "versions" , NULL },
                                        { "format" , "json" },
                                        { "objs-container" , "true" },
                                        { "key-marker" , marker_position.name.c_str() },
                                        { "version-id-marker" , marker_position.instance.c_str() },
                                        { NULL, NULL } };
        // don't include tenant in the url, it's already part of instance_key
        string p = string("/") + bs.bucket.name;
        call(new RGWReadRESTResourceCR<bucket_list_result>(sync_env->cct, sc->conn, sync_env->http_manager, p, pairs, result));
      }
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }
};
class RGWListBucketIndexLogCR: public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  const string instance_key;
  string marker;

  list<rgw_bi_log_entry> *result;
  std::optional<PerfGuard> timer;

public:
  RGWListBucketIndexLogCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& bs,
                          string& _marker, list<rgw_bi_log_entry> *_result)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
      instance_key(bs.get_key()), marker(_marker), result(_result) {}

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      if (sync_env->counters) {
        timer.emplace(sync_env->counters, sync_counters::l_poll);
      }
      yield {
        rgw_http_param_pair pairs[] = { { "bucket-instance", instance_key.c_str() },
                                        { "format" , "json" },
                                        { "marker" , marker.c_str() },
                                        { "type", "bucket-index" },
                                        { NULL, NULL } };

        call(new RGWReadRESTResourceCR<list<rgw_bi_log_entry> >(sync_env->cct, sc->conn, sync_env->http_manager, "/admin/log", pairs, result));
      }
      timer.reset();
      if (retcode < 0) {
        if (sync_env->counters) {
          sync_env->counters->inc(sync_counters::l_poll_err);
        }
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }
};
#define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10

class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<rgw_obj_key, rgw_obj_key> {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;

  string marker_oid;
  rgw_bucket_shard_full_sync_marker sync_marker;
  RGWSyncTraceNodeRef tn;
  RGWObjVersionTracker& objv_tracker;

public:
  RGWBucketFullSyncShardMarkerTrack(RGWDataSyncCtx *_sc,
                                    const string& _marker_oid,
                                    const rgw_bucket_shard_full_sync_marker& _marker,
                                    RGWSyncTraceNodeRef tn,
                                    RGWObjVersionTracker& objv_tracker)
    : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
      sc(_sc), sync_env(_sc->env), marker_oid(_marker_oid),
      sync_marker(_marker), tn(std::move(tn)), objv_tracker(objv_tracker)
  {}

  RGWCoroutine* store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override {
    sync_marker.position = new_marker;
    sync_marker.count = index_pos;

    map<string, bufferlist> attrs;
    sync_marker.encode_attr(attrs);

    tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
    return new RGWSimpleRadosWriteAttrsCR(sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj,
                                          rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, marker_oid),
                                          attrs, &objv_tracker);
  }

  RGWOrderCallCR *allocate_order_control_cr() override {
    return new RGWLastCallerWinsCR(sync_env->cct);
  }
};
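/* BUCKET_SYNC_UPDATE_MARKER_WINDOW bounds how eagerly progress is persisted:
 * the tracker coalesces completions and only advances the stored position
 * past contiguous finished entries, so one status write covers many synced
 * objects instead of one write per object. */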
// write the incremental sync status and update 'stable_timestamp' on success
class RGWWriteBucketShardIncSyncStatus : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  rgw_raw_obj obj;
  rgw_bucket_shard_inc_sync_marker sync_marker;
  ceph::real_time* stable_timestamp;
  RGWObjVersionTracker& objv_tracker;
  std::map<std::string, bufferlist> attrs;
public:
  RGWWriteBucketShardIncSyncStatus(RGWDataSyncEnv *sync_env,
                                   const rgw_raw_obj& obj,
                                   const rgw_bucket_shard_inc_sync_marker& sync_marker,
                                   ceph::real_time* stable_timestamp,
                                   RGWObjVersionTracker& objv_tracker)
    : RGWCoroutine(sync_env->cct), sync_env(sync_env), obj(obj),
      sync_marker(sync_marker), stable_timestamp(stable_timestamp),
      objv_tracker(objv_tracker)
  {}
  int operate(const DoutPrefixProvider *dpp) {
    reenter(this) {
      sync_marker.encode_attr(attrs);

      yield call(new RGWSimpleRadosWriteAttrsCR(sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj,
                                                obj, attrs, &objv_tracker));
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      if (stable_timestamp) {
        *stable_timestamp = sync_marker.timestamp;
      }
      return set_cr_done();
    }
    return 0;
  }
};
class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, rgw_obj_key> {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;

  rgw_raw_obj obj;
  rgw_bucket_shard_inc_sync_marker sync_marker;

  map<rgw_obj_key, string> key_to_marker;

  struct operation {
    rgw_obj_key key;
    bool is_olh;
  };
  map<string, operation> marker_to_op;
  std::set<std::string> pending_olh; // object names with pending olh operations

  RGWSyncTraceNodeRef tn;
  RGWObjVersionTracker& objv_tracker;
  ceph::real_time* stable_timestamp;

  void handle_finish(const string& marker) override {
    auto iter = marker_to_op.find(marker);
    if (iter == marker_to_op.end()) {
      return;
    }
    auto& op = iter->second;
    key_to_marker.erase(op.key);
    reset_need_retry(op.key);
    if (op.is_olh) {
      pending_olh.erase(op.key.name);
    }
    marker_to_op.erase(iter);
  }

public:
  RGWBucketIncSyncShardMarkerTrack(RGWDataSyncCtx *_sc,
                                   const string& _marker_oid,
                                   const rgw_bucket_shard_inc_sync_marker& _marker,
                                   RGWSyncTraceNodeRef tn,
                                   RGWObjVersionTracker& objv_tracker,
                                   ceph::real_time* stable_timestamp)
    : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
      sc(_sc), sync_env(_sc->env),
      obj(sync_env->svc->zone->get_zone_params().log_pool, _marker_oid),
      sync_marker(_marker), tn(std::move(tn)), objv_tracker(objv_tracker),
      stable_timestamp(stable_timestamp)
  {}

  RGWCoroutine* store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
    sync_marker.position = new_marker;
    sync_marker.timestamp = timestamp;

    tn->log(20, SSTR("updating marker marker_oid=" << obj.oid << " marker=" << new_marker << " timestamp=" << timestamp));
    return new RGWWriteBucketShardIncSyncStatus(sync_env, obj, sync_marker,
                                                stable_timestamp, objv_tracker);
  }

  /*
   * create index from key -> <op, marker>, and from marker -> key
   * this is useful so that we can ensure that we only have one
   * entry for any key that is used. This is needed when doing
   * incremental sync of data, and we don't want to run multiple
   * concurrent sync operations for the same bucket shard.
   * Also, we should make sure that we don't run concurrent operations on the same key with
   * different ops.
   */
  bool index_key_to_marker(const rgw_obj_key& key, const string& marker, bool is_olh) {
    auto result = key_to_marker.emplace(key, marker);
    if (!result.second) { // exists
      set_need_retry(key);
      return false;
    }
    marker_to_op[marker] = operation{key, is_olh};
    if (is_olh) {
      // prevent other olh ops from starting on this object name
      pending_olh.insert(key.name);
    }
    return true;
  }

  bool can_do_op(const rgw_obj_key& key, bool is_olh) {
    // serialize olh ops on the same object name
    if (is_olh && pending_olh.count(key.name)) {
      tn->log(20, SSTR("sync of " << key << " waiting for pending olh op"));
      return false;
    }
    return (key_to_marker.find(key) == key_to_marker.end());
  }

  RGWOrderCallCR *allocate_order_control_cr() override {
    return new RGWLastCallerWinsCR(sync_env->cct);
  }
};
static bool ignore_sync_error(int err) {
  switch (err) {
    case -ENOENT:
    case -EPERM:
      return true;
    default:
      break;
  }
  return false;
}
template <class T, class K>
class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;

  rgw_bucket_sync_pipe& sync_pipe;
  rgw_bucket_shard& bs;

  rgw_obj_key key;
  bool versioned;
  std::optional<uint64_t> versioned_epoch;
  rgw_bucket_entry_owner owner;
  real_time timestamp;
  RGWModifyOp op;
  RGWPendingState op_state;

  T entry_marker;
  RGWSyncShardMarkerTrack<T, K> *marker_tracker;

  int sync_status;

  stringstream error_ss;

  bool error_injection;

  RGWDataSyncModule *data_sync_module;

  rgw_zone_set zones_trace;

  RGWSyncTraceNodeRef tn;
public:
  RGWBucketSyncSingleEntryCR(RGWDataSyncCtx *_sc,
                             rgw_bucket_sync_pipe& _sync_pipe,
                             const rgw_obj_key& _key, bool _versioned,
                             std::optional<uint64_t> _versioned_epoch,
                             real_time& _timestamp,
                             const rgw_bucket_entry_owner& _owner,
                             RGWModifyOp _op, RGWPendingState _op_state,
                             const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker, rgw_zone_set& _zones_trace,
                             RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sc->cct),
                                                                sc(_sc), sync_env(_sc->env),
                                                                sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs),
                                                                key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch),
                                                                owner(_owner),
                                                                timestamp(_timestamp), op(_op),
                                                                op_state(_op_state),
                                                                entry_marker(_entry_marker),
                                                                marker_tracker(_marker_tracker),
                                                                sync_status(0) {
    stringstream ss;
    ss << bucket_shard_str{bs} << "/" << key << "[" << versioned_epoch.value_or(0) << "]";
    set_description() << "bucket sync single entry (source_zone=" << sc->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state;
    set_status("init");

    tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", SSTR(key));

    tn->log(20, SSTR("bucket sync single entry (source_zone=" << sc->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state));
    error_injection = (sync_env->cct->_conf->rgw_sync_data_inject_err_probability > 0);

    data_sync_module = sync_env->sync_module->get_data_handler();

    zones_trace = _zones_trace;
    zones_trace.insert(sync_env->svc->zone->get_zone().id, _sync_pipe.info.dest_bs.get_key());
  }

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      /* skip entries that are not complete */
      if (op_state != CLS_RGW_STATE_COMPLETE) {
        goto done;
      }
      tn->set_flag(RGW_SNS_FLAG_ACTIVE);
      do {
        yield {
          marker_tracker->reset_need_retry(key);
          if (key.name.empty()) {
            /* shouldn't happen */
            set_status("skipping empty entry");
            tn->log(0, "entry with empty obj name, skipping");
            goto done;
          }
          if (error_injection &&
              rand() % 10000 < cct->_conf->rgw_sync_data_inject_err_probability * 10000.0) {
            tn->log(0, SSTR(": injecting data sync error on key=" << key.name));
            retcode = -EIO;
          } else if (op == CLS_RGW_OP_ADD ||
                     op == CLS_RGW_OP_LINK_OLH) {
            set_status("syncing obj");
            tn->log(5, SSTR("bucket sync: sync obj: " << sc->source_zone << "/" << bs.bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
            call(data_sync_module->sync_object(dpp, sc, sync_pipe, key, versioned_epoch, &zones_trace));
          } else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) {
            set_status("removing obj");
            if (op == CLS_RGW_OP_UNLINK_INSTANCE) {
              versioned = true;
            }
            tn->log(10, SSTR("removing obj: " << sc->source_zone << "/" << bs.bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
            call(data_sync_module->remove_object(dpp, sc, sync_pipe, key, timestamp, versioned, versioned_epoch.value_or(0), &zones_trace));
            // our copy of the object is more recent, continue as if it succeeded
          } else if (op == CLS_RGW_OP_LINK_OLH_DM) {
            set_status("creating delete marker");
            tn->log(10, SSTR("creating delete marker: obj: " << sc->source_zone << "/" << bs.bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
            call(data_sync_module->create_delete_marker(dpp, sc, sync_pipe, key, timestamp, owner, versioned, versioned_epoch.value_or(0), &zones_trace));
          }
          tn->set_resource_name(SSTR(bucket_str_noinstance(bs.bucket) << "/" << key));
        }
        if (retcode == -ERR_PRECONDITION_FAILED) {
          set_status("Skipping object sync: precondition failed (object contains newer change or policy doesn't allow sync)");
          tn->log(0, "Skipping object sync: precondition failed (object contains newer change or policy doesn't allow sync)");
          retcode = 0;
        }
      } while (marker_tracker->need_retry(key));
      {
        tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
        if (retcode >= 0) {
          tn->log(10, "success");
        } else {
          tn->log(10, SSTR("failed, retcode=" << retcode << " (" << cpp_strerror(-retcode) << ")"));
        }
      }

      if (retcode < 0 && retcode != -ENOENT) {
        set_status() << "failed to sync obj; retcode=" << retcode;
        tn->log(0, SSTR("ERROR: failed to sync object: "
                        << bucket_shard_str{bs} << "/" << key.name));
        if (!ignore_sync_error(retcode)) {
          error_ss << bucket_shard_str{bs} << "/" << key.name;
          sync_status = retcode;
        }
      }
      if (!error_ss.str().empty()) {
        yield call(sync_env->error_logger->log_error_cr(dpp, sc->conn->get_remote_id(), "data", error_ss.str(), -retcode, string("failed to sync object") + cpp_strerror(-sync_status)));
      }
done:
      if (sync_status == 0) {
        /* update sync marker */
        set_status() << "calling marker_tracker->finish(" << entry_marker << ")";
        yield call(marker_tracker->finish(entry_marker));
        sync_status = retcode;
      }
      if (sync_status < 0) {
        return set_cr_error(sync_status);
      }
      return set_cr_done();
    }
    return 0;
  }
};
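/* Op dispatch above: CLS_RGW_OP_ADD / LINK_OLH -> sync_object(),
 * CLS_RGW_OP_DEL / UNLINK_INSTANCE -> remove_object(), and
 * CLS_RGW_OP_LINK_OLH_DM -> create_delete_marker(). An
 * -ERR_PRECONDITION_FAILED result is treated as a clean skip (newer change
 * already present, or policy forbids the sync), not as a sync failure. */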
3661 #define BUCKET_SYNC_SPAWN_WINDOW 20
3663 class RGWBucketShardFullSyncCR
: public RGWCoroutine
{
3665 RGWDataSyncEnv
*sync_env
;
3666 rgw_bucket_sync_pipe
& sync_pipe
;
3667 rgw_bucket_shard
& bs
;
3668 boost::intrusive_ptr
<const RGWContinuousLeaseCR
> lease_cr
;
3669 bucket_list_result list_result
;
3670 list
<bucket_list_entry
>::iterator entries_iter
;
3671 rgw_bucket_shard_sync_info
& sync_info
;
3672 rgw_obj_key list_marker
;
3673 bucket_list_entry
*entry
{nullptr};
3675 int total_entries
{0};
3679 const string
& status_oid
;
3681 rgw_zone_set zones_trace
;
3683 RGWSyncTraceNodeRef tn
;
3684 RGWBucketFullSyncShardMarkerTrack marker_tracker
;
3686 struct _prefix_handler
{
3687 RGWBucketSyncFlowManager::pipe_rules_ref rules
;
3688 RGWBucketSyncFlowManager::pipe_rules::prefix_map_t::const_iterator iter
;
3689 std::optional
<string
> cur_prefix
;
3691 void set_rules(RGWBucketSyncFlowManager::pipe_rules_ref
& _rules
) {
3695 bool revalidate_marker(rgw_obj_key
*marker
) {
3697 boost::starts_with(marker
->name
, *cur_prefix
)) {
3703 iter
= rules
->prefix_search(marker
->name
);
3704 if (iter
== rules
->prefix_end()) {
3707 cur_prefix
= iter
->first
;
3708 marker
->name
= *cur_prefix
;
3709 marker
->instance
.clear();
3713 bool check_key_handled(const rgw_obj_key
& key
) {
3718 boost::starts_with(key
.name
, *cur_prefix
)) {
3721 iter
= rules
->prefix_search(key
.name
);
3722 if (iter
== rules
->prefix_end()) {
3725 cur_prefix
= iter
->first
;
3726 return boost::starts_with(key
.name
, iter
->first
);
3731 RGWBucketShardFullSyncCR(RGWDataSyncCtx
*_sc
,
3732 rgw_bucket_sync_pipe
& _sync_pipe
,
3733 const std::string
& status_oid
,
3734 boost::intrusive_ptr
<const RGWContinuousLeaseCR
> lease_cr
,
3735 rgw_bucket_shard_sync_info
& sync_info
,
3736 RGWSyncTraceNodeRef tn_parent
,
3737 RGWObjVersionTracker
& objv_tracker
)
3738 : RGWCoroutine(_sc
->cct
), sc(_sc
), sync_env(_sc
->env
),
3739 sync_pipe(_sync_pipe
), bs(_sync_pipe
.info
.source_bs
),
3740 lease_cr(std::move(lease_cr
)), sync_info(sync_info
),
3741 status_oid(status_oid
),
3742 tn(sync_env
->sync_tracer
->add_node(tn_parent
, "full_sync",
3743 SSTR(bucket_shard_str
{bs
}))),
3744 marker_tracker(sc
, status_oid
, sync_info
.full_marker
, tn
, objv_tracker
)
3746 zones_trace
.insert(sc
->source_zone
.id
, sync_pipe
.info
.dest_bs
.bucket
.get_key());
3747 prefix_handler
.set_rules(sync_pipe
.get_rules());
3750 int operate(const DoutPrefixProvider
*dpp
) override
;
int RGWBucketShardFullSyncCR::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    list_marker = sync_info.full_marker.position;

    total_entries = sync_info.full_marker.count;
    do {
      if (lease_cr && !lease_cr->is_locked()) {
        drain_all();
        return set_cr_error(-ECANCELED);
      }
      set_status("listing remote bucket");
      tn->log(20, "listing bucket for full sync");

      if (!prefix_handler.revalidate_marker(&list_marker)) {
        set_status() << "finished iterating over all available prefixes: last marker=" << list_marker;
        tn->log(20, SSTR("finished iterating over all available prefixes: last marker=" << list_marker));
        break;
      }

      yield call(new RGWListBucketShardCR(sc, bs, list_marker,
                                          &list_result));
      if (retcode < 0 && retcode != -ENOENT) {
        set_status("failed bucket listing, going down");
        drain_all();
        return set_cr_error(retcode);
      }
      if (list_result.entries.size() > 0) {
        tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
      }
      entries_iter = list_result.entries.begin();
      for (; entries_iter != list_result.entries.end(); ++entries_iter) {
        if (lease_cr && !lease_cr->is_locked()) {
          drain_all();
          return set_cr_error(-ECANCELED);
        }
        tn->log(20, SSTR("[full sync] syncing object: "
                         << bucket_shard_str{bs} << "/" << entries_iter->key));
        entry = &(*entries_iter);
        list_marker = entries_iter->key;
        if (!prefix_handler.check_key_handled(entries_iter->key)) {
          set_status() << "skipping entry due to policy rules: " << entries_iter->key;
          tn->log(20, SSTR("skipping entry due to policy rules: " << entries_iter->key));
          continue;
        }
        total_entries++;
        if (!marker_tracker.start(entry->key, total_entries, real_time())) {
          tn->log(0, SSTR("ERROR: cannot start syncing " << entry->key << ". Duplicate entry?"));
        } else {
          using SyncCR = RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>;
          yield spawn(new SyncCR(sc, sync_pipe, entry->key,
                                 false, /* versioned, only matters for object removal */
                                 entry->versioned_epoch, entry->mtime,
                                 entry->owner, entry->get_modify_op(), CLS_RGW_STATE_COMPLETE,
                                 entry->key, &marker_tracker, zones_trace, tn),
                      false);
        }
        drain_with_cb(BUCKET_SYNC_SPAWN_WINDOW,
                      [&](uint64_t stack_id, int ret) {
                        if (ret < 0) {
                          tn->log(10, "a sync operation returned error");
                          sync_status = ret;
                        }
                        return 0;
                      });
      }
    } while (list_result.is_truncated && sync_status == 0);
    set_status("done iterating over all objects");

    /* wait for all operations to complete */
    drain_all_cb([&](uint64_t stack_id, int ret) {
      if (ret < 0) {
        tn->log(10, "a sync operation returned error");
        sync_status = ret;
      }
      return 0;
    });
    tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
    if (lease_cr && !lease_cr->is_locked()) {
      return set_cr_error(-ECANCELED);
    }
    /* update sync state to incremental */
    if (sync_status == 0) {
      yield {
        sync_info.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
        map<string, bufferlist> attrs;
        sync_info.encode_state_attr(attrs);
        call(new RGWSimpleRadosWriteAttrsCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
                                            rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, status_oid),
                                            attrs));
      }
    } else {
      tn->log(10, SSTR("backing out with sync_status=" << sync_status));
    }
    if (retcode < 0 && sync_status == 0) { /* actually tried to set incremental state and failed */
      tn->log(0, SSTR("ERROR: failed to set sync state on bucket "
                      << bucket_shard_str{bs} << " retcode=" << retcode));
      return set_cr_error(retcode);
    }
    if (sync_status < 0) {
      return set_cr_error(sync_status);
    }
    return set_cr_done();
  }
  return 0;
}
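// OLH (object logical head) bilog entries carry an olh_epoch that must be
// replayed on the destination, so the squashing logic below and the marker
// tracker's conflict checks treat them specially instead of collapsing them
// like plain object writes.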
static bool has_olh_epoch(RGWModifyOp op) {
  return op == CLS_RGW_OP_LINK_OLH || op == CLS_RGW_OP_UNLINK_INSTANCE;
}
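// Incremental sync: consumes the bucket index log (bilog) from the last
// committed position, squashing multiple entries for the same object so that
// only the newest complete operation per (object, instance) is replayed.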
class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  rgw_bucket_sync_pipe& sync_pipe;
  RGWBucketSyncFlowManager::pipe_rules_ref rules;
  rgw_bucket_shard& bs;
  boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
  list<rgw_bi_log_entry> list_result;
  list<rgw_bi_log_entry>::iterator entries_iter, entries_end;
  map<pair<string, string>, pair<real_time, RGWModifyOp> > squash_map;
  rgw_bucket_shard_sync_info& sync_info;
  rgw_obj_key key;
  rgw_bi_log_entry *entry{nullptr};
  bool updated_status{false};
  rgw_zone_id zone_id;
  string target_location_key;

  string cur_id;

  int sync_status{0};
  bool syncstopped{false};

  RGWSyncTraceNodeRef tn;
  RGWBucketIncSyncShardMarkerTrack marker_tracker;

public:
  RGWBucketShardIncrementalSyncCR(RGWDataSyncCtx *_sc,
                                  rgw_bucket_sync_pipe& _sync_pipe,
                                  const std::string& status_oid,
                                  boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
                                  rgw_bucket_shard_sync_info& sync_info,
                                  RGWSyncTraceNodeRef& _tn_parent,
                                  RGWObjVersionTracker& objv_tracker,
                                  ceph::real_time* stable_timestamp)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
      sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs),
      lease_cr(std::move(lease_cr)), sync_info(sync_info),
      zone_id(sync_env->svc->zone->get_zone().id),
      tn(sync_env->sync_tracer->add_node(_tn_parent, "inc_sync",
                                         SSTR(bucket_shard_str{bs}))),
      marker_tracker(sc, status_oid, sync_info.inc_marker, tn,
                     objv_tracker, stable_timestamp)
  {
    set_description() << "bucket shard incremental sync bucket="
                      << bucket_shard_str{bs};
    rules = sync_pipe.get_rules();
    target_location_key = sync_pipe.info.dest_bs.bucket.get_key();
  }

  bool check_key_handled(const rgw_obj_key& key) {
    if (!rules) {
      return false;
    }
    auto iter = rules->prefix_search(key.name);
    if (iter == rules->prefix_end()) {
      return false;
    }
    return boost::starts_with(key.name, iter->first);
  }

  int operate(const DoutPrefixProvider *dpp) override;
};
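// The loop below makes two passes over each bilog listing: the first pass
// builds squash_map, recording the newest relevant op per (object, instance)
// and detecting SYNCSTOP; the second pass replays only the entries that
// survived squashing, using the marker tracker to serialize conflicting ops.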
int RGWBucketShardIncrementalSyncCR::operate(const DoutPrefixProvider *dpp)
{
  int ret;
  reenter(this) {
    do {
      if (lease_cr && !lease_cr->is_locked()) {
        drain_all();
        tn->log(0, "ERROR: lease is not taken, abort");
        return set_cr_error(-ECANCELED);
      }
      tn->log(20, SSTR("listing bilog for incremental sync; position=" << sync_info.inc_marker.position));
      set_status() << "listing bilog; position=" << sync_info.inc_marker.position;
      yield call(new RGWListBucketIndexLogCR(sc, bs, sync_info.inc_marker.position,
                                             &list_result));
      if (retcode < 0 && retcode != -ENOENT) {
        /* wait for all operations to complete */
        drain_all();
        return set_cr_error(retcode);
      }
      squash_map.clear();
      entries_iter = list_result.begin();
      entries_end = list_result.end();
      for (; entries_iter != entries_end; ++entries_iter) {
        auto e = *entries_iter;
        if (e.op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP) {
          ldpp_dout(dpp, 20) << "syncstop on " << e.timestamp << dendl;
          syncstopped = true;
          entries_end = std::next(entries_iter); // stop after this entry
          break;
        }
        if (e.op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
          continue;
        }
        if (e.op == CLS_RGW_OP_CANCEL) {
          continue;
        }
        if (e.state != CLS_RGW_STATE_COMPLETE) {
          continue;
        }
        if (e.zones_trace.exists(zone_id.id, target_location_key)) {
          continue;
        }
        auto& squash_entry = squash_map[make_pair(e.object, e.instance)];
        // don't squash over olh entries - we need to apply their olh_epoch
        if (has_olh_epoch(squash_entry.second) && !has_olh_epoch(e.op)) {
          continue;
        }
        if (squash_entry.first <= e.timestamp) {
          squash_entry = make_pair<>(e.timestamp, e.op);
        }
      }

      entries_iter = list_result.begin();
      for (; entries_iter != entries_end; ++entries_iter) {
        if (lease_cr && !lease_cr->is_locked()) {
          drain_all();
          return set_cr_error(-ECANCELED);
        }
        entry = &(*entries_iter);
        {
          ssize_t p = entry->id.find('#'); /* entries might have explicit shard info in them, e.g., 6#00000000004.94.3 */
          if (p < 0) {
            cur_id = entry->id;
          } else {
            cur_id = entry->id.substr(p + 1);
          }
        }
        sync_info.inc_marker.position = cur_id;

        if (entry->op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP || entry->op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
          ldpp_dout(dpp, 20) << "detected syncstop or resync on " << entries_iter->timestamp << ", skipping entry" << dendl;
          marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
          continue;
        }

        if (!key.set(rgw_obj_index_key{entry->object, entry->instance})) {
          set_status() << "parse_raw_oid() on " << entry->object << " returned false, skipping entry";
          tn->log(20, SSTR("parse_raw_oid() on " << entry->object << " returned false, skipping entry"));
          marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
          continue;
        }

        tn->log(20, SSTR("parsed entry: id=" << cur_id << " iter->object=" << entry->object << " iter->instance=" << entry->instance << " name=" << key.name << " instance=" << key.instance << " ns=" << key.ns));

        if (!key.ns.empty()) {
          set_status() << "skipping entry in namespace: " << entry->object;
          tn->log(20, SSTR("skipping entry in namespace: " << entry->object));
          marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
          continue;
        }

        if (!check_key_handled(key)) {
          set_status() << "skipping entry due to policy rules: " << entry->object;
          tn->log(20, SSTR("skipping entry due to policy rules: " << entry->object));
          marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
          continue;
        }

        set_status() << "got entry.id=" << cur_id << " key=" << key << " op=" << (int)entry->op;
        if (entry->op == CLS_RGW_OP_CANCEL) {
          set_status() << "canceled operation, skipping";
          tn->log(20, SSTR("skipping object: "
                           << bucket_shard_str{bs} << "/" << key << ": canceled operation"));
          marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
          continue;
        }
        if (entry->state != CLS_RGW_STATE_COMPLETE) {
          set_status() << "non-complete operation, skipping";
          tn->log(20, SSTR("skipping object: "
                           << bucket_shard_str{bs} << "/" << key << ": non-complete operation"));
          marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
          continue;
        }
        if (entry->zones_trace.exists(zone_id.id, target_location_key)) {
          set_status() << "redundant operation, skipping";
          tn->log(20, SSTR("skipping object: "
                           << bucket_shard_str{bs} << "/" << key << ": redundant operation"));
          marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
          continue;
        }
        if (make_pair<>(entry->timestamp, entry->op) != squash_map[make_pair(entry->object, entry->instance)]) {
          set_status() << "squashed operation, skipping";
          tn->log(20, SSTR("skipping object: "
                           << bucket_shard_str{bs} << "/" << key << ": squashed operation"));
          marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
          continue;
        }
        tn->set_flag(RGW_SNS_FLAG_ACTIVE);
        tn->log(20, SSTR("syncing object: "
                         << bucket_shard_str{bs} << "/" << key));
        updated_status = false;
        while (!marker_tracker.can_do_op(key, has_olh_epoch(entry->op))) {
          if (!updated_status) {
            set_status() << "can't do op, conflicting inflight operation";
            updated_status = true;
          }
          tn->log(5, SSTR("can't do op on key=" << key << " need to wait for conflicting operation to complete"));
          yield wait_for_child();
          bool again = true;
          while (again) {
            again = collect(&ret, nullptr);
            if (ret < 0) {
              tn->log(0, SSTR("ERROR: a child operation returned error (ret=" << ret << ")"));
              sync_status = ret;
              /* we have reported this error */
            }
          }
          if (sync_status != 0)
            break;
        }
        if (sync_status != 0) {
          /* get error, stop */
          break;
        }
        if (!marker_tracker.index_key_to_marker(key, cur_id, has_olh_epoch(entry->op))) {
          set_status() << "can't do op, sync already in progress for object";
          tn->log(20, SSTR("skipping sync of entry: " << cur_id << ":" << key << " sync already in progress for object"));
          marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
          continue;
        }
        set_status() << "start object sync";
        if (!marker_tracker.start(cur_id, 0, entry->timestamp)) {
          tn->log(0, SSTR("ERROR: cannot start syncing " << cur_id << ". Duplicate entry?"));
        } else {
          std::optional<uint64_t> versioned_epoch;
          rgw_bucket_entry_owner owner(entry->owner, entry->owner_display_name);
          if (entry->ver.pool < 0) {
            versioned_epoch = entry->ver.epoch;
          }
          tn->log(20, SSTR("entry->timestamp=" << entry->timestamp));
          using SyncCR = RGWBucketSyncSingleEntryCR<string, rgw_obj_key>;
          spawn(new SyncCR(sc, sync_pipe, key,
                           entry->is_versioned(), versioned_epoch,
                           entry->timestamp, owner, entry->op, entry->state,
                           cur_id, &marker_tracker, entry->zones_trace, tn),
                false);
        }
        drain_with_cb(BUCKET_SYNC_SPAWN_WINDOW,
                      [&](uint64_t stack_id, int ret) {
                        if (ret < 0) {
                          tn->log(10, "a sync operation returned error");
                          sync_status = ret;
                        }
                        return 0;
                      });
      }
    } while (!list_result.empty() && sync_status == 0 && !syncstopped);

    drain_all_cb([&](uint64_t stack_id, int ret) {
      if (ret < 0) {
        tn->log(10, "a sync operation returned error");
        sync_status = ret;
      }
      return 0;
    });
    tn->unset_flag(RGW_SNS_FLAG_ACTIVE);

    if (syncstopped) {
      // transition to StateStopped in RGWRunBucketSyncCoroutine. if sync is
      // still disabled, we'll delete the sync status object. otherwise we'll
      // restart full sync to catch any changes that happened while sync was
      // disabled
      sync_info.state = rgw_bucket_shard_sync_info::StateStopped;
      return set_cr_done();
    }

    yield call(marker_tracker.flush());
    if (retcode < 0) {
      tn->log(0, SSTR("ERROR: marker_tracker.flush() returned retcode=" << retcode));
      return set_cr_error(retcode);
    }
    if (sync_status < 0) {
      tn->log(10, SSTR("backing out with sync_status=" << sync_status));
      return set_cr_error(sync_status);
    }
    return set_cr_done();
  }
  return 0;
}
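// Resolves the set of sync pipes (peers) that connect an optional source
// zone/bucket to an optional target bucket, consulting both buckets' sync
// policies plus sync hints for buckets that reference this source indirectly.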
class RGWGetBucketPeersCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  std::optional<rgw_bucket> target_bucket;
  std::optional<rgw_zone_id> source_zone;
  std::optional<rgw_bucket> source_bucket;

  rgw_sync_pipe_info_set *pipes;
  map<rgw_bucket, all_bucket_info> buckets_info;
  map<rgw_bucket, all_bucket_info>::iterator siiter;
  std::optional<all_bucket_info> target_bucket_info;
  std::optional<all_bucket_info> source_bucket_info;

  rgw_sync_pipe_info_set::iterator siter;

  std::shared_ptr<rgw_bucket_get_sync_policy_result> source_policy;
  std::shared_ptr<rgw_bucket_get_sync_policy_result> target_policy;

  RGWSyncTraceNodeRef tn;

  using pipe_const_iter = map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set>::const_iterator;

  static pair<pipe_const_iter, pipe_const_iter> get_pipe_iters(const map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set>& m, std::optional<rgw_zone_id> zone) {
    if (!zone) {
      return { m.begin(), m.end() };
    }

    auto b = m.find(*zone);
    if (b == m.end()) {
      return { b, b };
    }
    return { b, std::next(b) };
  }

  void filter_sources(std::optional<rgw_zone_id> source_zone,
                      std::optional<rgw_bucket> source_bucket,
                      const map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set>& all_sources,
                      rgw_sync_pipe_info_set *result) {
    ldpp_dout(sync_env->dpp, 20) << __func__ << ": source_zone=" << source_zone.value_or(rgw_zone_id("*")).id
                                 << " source_bucket=" << source_bucket.value_or(rgw_bucket())
                                 << " all_sources.size()=" << all_sources.size() << dendl;
    auto iters = get_pipe_iters(all_sources, source_zone);
    for (auto i = iters.first; i != iters.second; ++i) {
      for (auto& handler : i->second) {
        if (!handler.specific()) {
          ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe_handler=" << handler << ": skipping" << dendl;
          continue;
        }
        if (source_bucket &&
            !source_bucket->match(*handler.source.bucket)) {
          ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe_handler=" << handler << ": skipping" << dendl;
          continue;
        }
        ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe_handler=" << handler << ": adding" << dendl;
        result->insert(handler, source_bucket_info, target_bucket_info);
      }
    }
  }

  void filter_targets(std::optional<rgw_zone_id> target_zone,
                      std::optional<rgw_bucket> target_bucket,
                      const map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set>& all_targets,
                      rgw_sync_pipe_info_set *result) {
    ldpp_dout(sync_env->dpp, 20) << __func__ << ": target_zone=" << target_zone.value_or(rgw_zone_id("*")).id
                                 << " target_bucket=" << target_bucket.value_or(rgw_bucket())
                                 << " all_targets.size()=" << all_targets.size() << dendl;
    auto iters = get_pipe_iters(all_targets, target_zone);
    for (auto i = iters.first; i != iters.second; ++i) {
      for (auto& handler : i->second) {
        if (target_bucket &&
            handler.dest.bucket &&
            !target_bucket->match(*handler.dest.bucket)) {
          ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe_handler=" << handler << ": skipping" << dendl;
          continue;
        }
        ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe_handler=" << handler << ": adding" << dendl;
        result->insert(handler, source_bucket_info, target_bucket_info);
      }
    }
  }

  void update_from_target_bucket_policy();
  void update_from_source_bucket_policy();

  struct GetHintTargets : public RGWGenericAsyncCR::Action {
    RGWDataSyncEnv *sync_env;
    rgw_bucket source_bucket;
    std::set<rgw_bucket> targets;

    GetHintTargets(RGWDataSyncEnv *_sync_env,
                   const rgw_bucket& _source_bucket) : sync_env(_sync_env),
                                                       source_bucket(_source_bucket) {}
    int operate() override {
      int r = sync_env->svc->bucket_sync->get_bucket_sync_hints(sync_env->dpp,
                                                                source_bucket,
                                                                nullptr,
                                                                &targets,
                                                                null_yield);
      if (r < 0) {
        ldpp_dout(sync_env->dpp, 0) << "ERROR: " << __func__ << "(): failed to fetch bucket sync hints for bucket=" << source_bucket << dendl;
        return r;
      }

      return 0;
    }
  };

  std::shared_ptr<GetHintTargets> get_hint_targets_action;
  std::set<rgw_bucket>::iterator hiter;

public:
  RGWGetBucketPeersCR(RGWDataSyncEnv *_sync_env,
                      std::optional<rgw_bucket> _target_bucket,
                      std::optional<rgw_zone_id> _source_zone,
                      std::optional<rgw_bucket> _source_bucket,
                      rgw_sync_pipe_info_set *_pipes,
                      const RGWSyncTraceNodeRef& _tn_parent)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      target_bucket(_target_bucket),
      source_zone(_source_zone),
      source_bucket(_source_bucket),
      pipes(_pipes),
      tn(sync_env->sync_tracer->add_node(_tn_parent, "get_bucket_peers",
                                         SSTR( "target=" << target_bucket.value_or(rgw_bucket())
                                               << ":source=" << source_bucket.value_or(rgw_bucket())
                                               << ":source_zone=" << source_zone.value_or(rgw_zone_id("*")).id))) {
  }

  int operate(const DoutPrefixProvider *dpp) override;
};
std::ostream& operator<<(std::ostream& out, std::optional<rgw_bucket_shard>& bs) {
  if (bs) {
    return out << *bs;
  }
  return out << "*";
}
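// RGWRunBucketSourcesSyncCR resolves the relevant pipes via
// RGWGetBucketPeersCR, then fans out one RGWRunBucketSyncCoroutine per
// source shard of every pipe, windowed by BUCKET_SYNC_SPAWN_WINDOW.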
RGWRunBucketSourcesSyncCR::RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
                                                     boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
                                                     std::optional<rgw_bucket_shard> _target_bs,
                                                     std::optional<rgw_bucket_shard> _source_bs,
                                                     const RGWSyncTraceNodeRef& _tn_parent,
                                                     ceph::real_time* progress)
  : RGWCoroutine(_sc->env->cct), sc(_sc), sync_env(_sc->env),
    lease_cr(std::move(lease_cr)), target_bs(_target_bs), source_bs(_source_bs),
    tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_sync_sources",
                                       SSTR( "target=" << target_bucket.value_or(rgw_bucket()) << ":source_bucket=" << source_bucket.value_or(rgw_bucket()) << ":source_zone=" << sc->source_zone))),
    progress(progress)
{
  if (target_bs) {
    target_bucket = target_bs->bucket;
  }

  if (source_bs) {
    source_bucket = source_bs->bucket;
  }
}
int RGWRunBucketSourcesSyncCR::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    yield call(new RGWGetBucketPeersCR(sync_env, target_bucket, sc->source_zone, source_bucket, &pipes, tn));
    if (retcode < 0 && retcode != -ENOENT) {
      tn->log(0, "ERROR: failed to read sync status for bucket");
      return set_cr_error(retcode);
    }

    ldpp_dout(dpp, 20) << __func__ << "(): requested source_bs=" << source_bs << " target_bs=" << target_bs << dendl;

    if (pipes.empty()) {
      ldpp_dout(dpp, 20) << __func__ << "(): no relevant sync pipes found" << dendl;
      return set_cr_done();
    }

    for (siter = pipes.begin(); siter != pipes.end(); ++siter) {
      {
        ldpp_dout(dpp, 20) << __func__ << "(): sync pipe=" << *siter << dendl;

        source_num_shards = siter->source.get_bucket_info().layout.current_index.layout.normal.num_shards;
        target_num_shards = siter->target.get_bucket_info().layout.current_index.layout.normal.num_shards;
        if (source_bs) {
          sync_pair.source_bs = *source_bs;
        } else {
          sync_pair.source_bs.bucket = siter->source.get_bucket();
        }
        sync_pair.dest_bs.bucket = siter->target.get_bucket();

        sync_pair.handler = siter->handler;

        if (sync_pair.source_bs.shard_id >= 0) {
          num_shards = 1;
          cur_shard = sync_pair.source_bs.shard_id;
        } else {
          num_shards = std::max<int>(1, source_num_shards);
          cur_shard = std::min<int>(0, source_num_shards);
        }
      }

      ldpp_dout(dpp, 20) << __func__ << "(): num shards=" << num_shards << " cur_shard=" << cur_shard << dendl;

      for (; num_shards > 0; --num_shards, ++cur_shard) {
        /*
         * use a negative shard_id for backward compatibility,
         * this affects the crafted status oid
         */
        sync_pair.source_bs.shard_id = (source_num_shards > 0 ? cur_shard : -1);
        if (source_num_shards == target_num_shards) {
          sync_pair.dest_bs.shard_id = sync_pair.source_bs.shard_id;
        } else {
          sync_pair.dest_bs.shard_id = -1;
        }

        ldpp_dout(dpp, 20) << __func__ << "(): sync_pair=" << sync_pair << dendl;

        cur_progress = (progress ? &shard_progress[prealloc_stack_id()] : nullptr);

        yield_spawn_window(new RGWRunBucketSyncCoroutine(sc, lease_cr, sync_pair, tn,
                                                         cur_progress),
                           BUCKET_SYNC_SPAWN_WINDOW,
                           [&](uint64_t stack_id, int ret) {
                             handle_complete_stack(stack_id);
                             if (ret < 0) {
                               tn->log(10, "a sync operation returned error");
                             }
                             return ret;
                           });
      }
    }
    drain_all_cb([&](uint64_t stack_id, int ret) {
      handle_complete_stack(stack_id);
      if (ret < 0) {
        tn->log(10, "a sync operation returned error");
      }
      return ret;
    });
    if (progress && min_progress) {
      *progress = *min_progress;
    }
    return set_cr_done();
  }
  return 0;
}
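// Fetches bucket instance info locally; when the instance hasn't been synced
// to this zone yet, it pulls the metadata entry from the master zone first
// and then retries the local read.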
class RGWSyncGetBucketInfoCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  rgw_bucket bucket;
  RGWBucketInfo *pbucket_info;
  map<string, bufferlist> *pattrs;
  RGWMetaSyncEnv meta_sync_env;

  RGWSyncTraceNodeRef tn;

public:
  RGWSyncGetBucketInfoCR(RGWDataSyncEnv *_sync_env,
                         const rgw_bucket& _bucket,
                         RGWBucketInfo *_pbucket_info,
                         map<string, bufferlist> *_pattrs,
                         const RGWSyncTraceNodeRef& _tn_parent)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      bucket(_bucket),
      pbucket_info(_pbucket_info),
      pattrs(_pattrs),
      tn(sync_env->sync_tracer->add_node(_tn_parent, "get_bucket_info",
                                         SSTR(bucket))) {
  }

  int operate(const DoutPrefixProvider *dpp) override;
};
int RGWSyncGetBucketInfoCR::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bucket, pbucket_info, pattrs, dpp));
    if (retcode == -ENOENT) {
      /* bucket instance info has not been synced in yet, fetch it now */
      yield {
        tn->log(10, SSTR("no local info for bucket " << bucket_str{bucket} << ": fetching metadata"));
        string raw_key = string("bucket.instance:") + bucket.get_key();

        meta_sync_env.init(dpp, cct, sync_env->store, sync_env->svc->zone->get_master_conn(), sync_env->async_rados,
                           sync_env->http_manager, sync_env->error_logger, sync_env->sync_tracer);

        call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key,
                                          string() /* no marker */,
                                          MDLOG_STATUS_COMPLETE,
                                          NULL /* no marker tracker */,
                                          tn));
      }
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to fetch bucket instance info for " << bucket_str{bucket}));
        return set_cr_error(retcode);
      }

      yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bucket, pbucket_info, pattrs, dpp));
    }
    if (retcode < 0) {
      tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bucket}));
      return set_cr_error(retcode);
    }

    return set_cr_done();
  }
  return 0;
}
void RGWGetBucketPeersCR::update_from_target_bucket_policy()
{
  if (!target_policy ||
      !target_policy->policy_handler ||
      !pipes) {
    return;
  }

  auto handler = target_policy->policy_handler.get();

  filter_sources(source_zone,
                 source_bucket,
                 handler->get_sources(),
                 pipes);

  for (siter = pipes->begin(); siter != pipes->end(); ++siter) {
    if (!siter->source.has_bucket_info()) {
      buckets_info.emplace(siter->source.get_bucket(), all_bucket_info());
    }
    if (!siter->target.has_bucket_info()) {
      buckets_info.emplace(siter->target.get_bucket(), all_bucket_info());
    }
  }
}
void RGWGetBucketPeersCR::update_from_source_bucket_policy()
{
  if (!source_policy ||
      !source_policy->policy_handler ||
      !pipes) {
    return;
  }

  auto handler = source_policy->policy_handler.get();

  filter_targets(sync_env->svc->zone->get_zone().id,
                 target_bucket,
                 handler->get_targets(),
                 pipes);

  for (siter = pipes->begin(); siter != pipes->end(); ++siter) {
    if (!siter->source.has_bucket_info()) {
      buckets_info.emplace(siter->source.get_bucket(), all_bucket_info());
    }
    if (!siter->target.has_bucket_info()) {
      buckets_info.emplace(siter->target.get_bucket(), all_bucket_info());
    }
  }
}
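// Reads a bucket's sync policy handler, retrying once after triggering
// metadata sync of the bucket instance if it isn't known locally yet.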
class RGWSyncGetBucketSyncPolicyHandlerCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  rgw_bucket bucket;
  rgw_bucket_get_sync_policy_params get_policy_params;

  std::shared_ptr<rgw_bucket_get_sync_policy_result> policy;

  RGWSyncTraceNodeRef tn;

  int i;

public:
  RGWSyncGetBucketSyncPolicyHandlerCR(RGWDataSyncEnv *_sync_env,
                                      std::optional<rgw_zone_id> zone,
                                      const rgw_bucket& _bucket,
                                      std::shared_ptr<rgw_bucket_get_sync_policy_result>& _policy,
                                      const RGWSyncTraceNodeRef& _tn_parent)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      bucket(_bucket),
      policy(_policy),
      tn(sync_env->sync_tracer->add_node(_tn_parent, "get_sync_policy_handler",
                                         SSTR(bucket))) {
    get_policy_params.zone = zone;
    get_policy_params.bucket = bucket;
  }

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      for (i = 0; i < 2; ++i) {
        yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados,
                                                       sync_env->store,
                                                       get_policy_params,
                                                       policy,
                                                       dpp));
        if (retcode < 0 &&
            retcode != -ENOENT) {
          return set_cr_error(retcode);
        }

        if (retcode == 0) {
          return set_cr_done();
        }

        /* bucket instance was not found,
         * try to get bucket instance info, can trigger
         * metadata sync of bucket instance
         */
        yield call(new RGWSyncGetBucketInfoCR(sync_env,
                                              bucket,
                                              nullptr,
                                              nullptr,
                                              tn));
        if (retcode < 0) {
          return set_cr_error(retcode);
        }
      }
    }

    return 0;
  }
};
int RGWGetBucketPeersCR::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    if (target_bucket) {
      target_policy = make_shared<rgw_bucket_get_sync_policy_result>();
      yield call(new RGWSyncGetBucketSyncPolicyHandlerCR(sync_env,
                                                         nullopt,
                                                         *target_bucket,
                                                         target_policy,
                                                         tn));
      if (retcode < 0 &&
          retcode != -ENOENT) {
        return set_cr_error(retcode);
      }

      update_from_target_bucket_policy();
    }

    if (source_bucket && source_zone) {
      source_policy = make_shared<rgw_bucket_get_sync_policy_result>();
      yield call(new RGWSyncGetBucketSyncPolicyHandlerCR(sync_env,
                                                         source_zone,
                                                         *source_bucket,
                                                         source_policy,
                                                         tn));
      if (retcode < 0 &&
          retcode != -ENOENT) {
        return set_cr_error(retcode);
      }

      if (source_policy->policy_handler) {
        auto& opt_bucket_info = source_policy->policy_handler->get_bucket_info();
        auto& opt_attrs = source_policy->policy_handler->get_bucket_attrs();
        if (opt_bucket_info && opt_attrs) {
          source_bucket_info.emplace();
          source_bucket_info->bucket_info = *opt_bucket_info;
          source_bucket_info->attrs = *opt_attrs;
        }
      }

      if (!target_bucket) {
        get_hint_targets_action = make_shared<GetHintTargets>(sync_env, *source_bucket);

        yield call(new RGWGenericAsyncCR(cct, sync_env->async_rados,
                                         get_hint_targets_action));
        if (retcode < 0) {
          return set_cr_error(retcode);
        }

        /* hints might have incomplete bucket ids,
         * in which case we need to figure out the current
         * bucket instance
         */
        for (hiter = get_hint_targets_action->targets.begin();
             hiter != get_hint_targets_action->targets.end();
             ++hiter) {
          ldpp_dout(dpp, 20) << "Got sync hint for bucket=" << *source_bucket << ": " << hiter->get_key() << dendl;

          target_policy = make_shared<rgw_bucket_get_sync_policy_result>();
          yield call(new RGWSyncGetBucketSyncPolicyHandlerCR(sync_env,
                                                             nullopt,
                                                             *hiter,
                                                             target_policy,
                                                             tn));
          if (retcode < 0 &&
              retcode != -ENOENT) {
            return set_cr_error(retcode);
          }
          update_from_target_bucket_policy();
        }
      }
    }

    update_from_source_bucket_policy();

    for (siiter = buckets_info.begin(); siiter != buckets_info.end(); ++siiter) {
      if (siiter->second.bucket_info.bucket.name.empty()) {
        yield call(new RGWSyncGetBucketInfoCR(sync_env, siiter->first,
                                              &siiter->second.bucket_info,
                                              &siiter->second.attrs,
                                              tn));
      }
    }

    if (pipes) {
      pipes->update_empty_bucket_info(buckets_info);
    }

    return set_cr_done();
  }
  return 0;
}
int RGWRunBucketsSyncBySourceCR::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    return set_cr_done();
  }
  return 0;
}
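// Drives a single bucket shard pair through its sync state machine:
// (re)initialize status when Init/Stopped, run full sync, then incremental
// sync; loops back whenever a stage leaves the state at something other than
// incremental sync (e.g. after incremental sync observes SYNCSTOP and sets
// StateStopped).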
int RGWRunBucketSyncCoroutine::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    yield call(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &sync_status, &objv_tracker));
    if (retcode < 0 && retcode != -ENOENT) {
      tn->log(0, "ERROR: failed to read sync status for bucket");
      drain_all();
      return set_cr_error(retcode);
    }

    tn->log(20, SSTR("sync status for source bucket: " << sync_status.state));

    yield call(new RGWSyncGetBucketInfoCR(sync_env, sync_pair.source_bs.bucket, &sync_pipe.source_bucket_info,
                                          &sync_pipe.source_bucket_attrs, tn));
    if (retcode < 0) {
      tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.source_bs.bucket}));
      drain_all();
      return set_cr_error(retcode);
    }

    yield call(new RGWSyncGetBucketInfoCR(sync_env, sync_pair.dest_bs.bucket, &sync_pipe.dest_bucket_info,
                                          &sync_pipe.dest_bucket_attrs, tn));
    if (retcode < 0) {
      tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.dest_bs.bucket}));
      drain_all();
      return set_cr_error(retcode);
    }

    sync_pipe.info = sync_pair;

    do {
      if (sync_status.state == rgw_bucket_shard_sync_info::StateInit ||
          sync_status.state == rgw_bucket_shard_sync_info::StateStopped) {
        yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, sync_pair, sync_status, objv_tracker));
        if (retcode == -ENOENT) {
          tn->log(0, "bucket sync disabled");
          drain_all();
          return set_cr_done();
        }
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: init sync on bucket failed, retcode=" << retcode));
          drain_all();
          return set_cr_error(retcode);
        }
      }
      if (progress) {
        *progress = sync_status.inc_marker.timestamp;
      }

      if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
        yield call(new RGWBucketShardFullSyncCR(sc, sync_pipe,
                                                status_oid, lease_cr,
                                                sync_status, tn, objv_tracker));
        if (retcode < 0) {
          tn->log(5, SSTR("full sync on bucket failed, retcode=" << retcode));
          drain_all();
          return set_cr_error(retcode);
        }
      }

      if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
        yield call(new RGWBucketShardIncrementalSyncCR(sc, sync_pipe,
                                                       status_oid, lease_cr,
                                                       sync_status, tn,
                                                       objv_tracker, progress));
        if (retcode < 0) {
          tn->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode));
          drain_all();
          return set_cr_error(retcode);
        }
      }
      // loop back to previous states unless incremental sync returns normally
    } while (sync_status.state != rgw_bucket_shard_sync_info::StateIncrementalSync);

    drain_all();
    return set_cr_done();
  }

  return 0;
}
RGWCoroutine *RGWRemoteBucketManager::run_sync_cr(int num)
{
  if ((size_t)num >= sync_pairs.size()) {
    return nullptr;
  }

  return new RGWRunBucketSyncCoroutine(&sc, nullptr, sync_pairs[num], sync_env->sync_tracer->root_node, nullptr);
}
int RGWBucketPipeSyncStatusManager::init(const DoutPrefixProvider *dpp)
{
  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(this, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }

  error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);

  sync_module.reset(new RGWDefaultSyncModuleInstance());
  auto async_rados = store->svc()->rados->get_async_processor();

  sync_env.init(this, store->ctx(), store,
                store->svc(), async_rados, &http_manager,
                error_logger, store->getRados()->get_sync_tracer(),
                sync_module, nullptr);

  rgw_sync_pipe_info_set pipes;

  ret = cr_mgr.run(dpp, new RGWGetBucketPeersCR(&sync_env,
                                                dest_bucket,
                                                source_zone,
                                                source_bucket,
                                                &pipes,
                                                sync_env.sync_tracer->root_node));
  if (ret < 0) {
    ldpp_dout(this, 0) << "failed to get bucket source peers info: (ret=" << ret << "): " << cpp_strerror(-ret) << dendl;
    return ret;
  }

  rgw_zone_id last_zone;

  for (auto& pipe : pipes) {
    auto& szone = pipe.source.zone;

    if (last_zone != szone) {
      conn = store->svc()->zone->get_zone_conn(szone);
      if (!conn) {
        ldpp_dout(this, 0) << "connection object to zone " << szone << " does not exist" << dendl;
        return -EINVAL;
      }
      last_zone = szone;
    }

    source_mgrs.push_back(new RGWRemoteBucketManager(this, &sync_env,
                                                     szone, conn,
                                                     pipe.source.get_bucket_info(),
                                                     pipe.target.get_bucket()));
  }

  return 0;
}
int RGWBucketPipeSyncStatusManager::init_sync_status(const DoutPrefixProvider *dpp)
{
  list<RGWCoroutinesStack *> stacks;
  // pass an empty objv tracker to each so that the version gets incremented
  std::list<RGWObjVersionTracker> objvs;

  for (auto& mgr : source_mgrs) {
    RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);

    for (int i = 0; i < mgr->num_pipes(); ++i) {
      objvs.emplace_back();
      stack->call(mgr->init_sync_status_cr(i, objvs.back()));
    }

    stacks.push_back(stack);
  }

  return cr_mgr.run(dpp, stacks);
}
int RGWBucketPipeSyncStatusManager::read_sync_status(const DoutPrefixProvider *dpp)
{
  list<RGWCoroutinesStack *> stacks;

  for (auto& mgr : source_mgrs) {
    RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
    for (int i = 0; i < mgr->num_pipes(); ++i) {
      stack->call(mgr->read_sync_status_cr(i, &sync_status[i]));
    }

    stacks.push_back(stack);
  }

  int ret = cr_mgr.run(dpp, stacks);
  if (ret < 0) {
    ldpp_dout(this, 0) << "ERROR: failed to read sync status for "
                       << bucket_str{dest_bucket} << dendl;
  }

  return ret;
}
int RGWBucketPipeSyncStatusManager::run(const DoutPrefixProvider *dpp)
{
  list<RGWCoroutinesStack *> stacks;

  for (auto& mgr : source_mgrs) {
    RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
    for (int i = 0; i < mgr->num_pipes(); ++i) {
      stack->call(mgr->run_sync_cr(i));
    }

    stacks.push_back(stack);
  }

  int ret = cr_mgr.run(dpp, stacks);
  if (ret < 0) {
    ldpp_dout(this, 0) << "ERROR: failed to run sync for "
                       << bucket_str{dest_bucket} << dendl;
  }

  return ret;
}
unsigned RGWBucketPipeSyncStatusManager::get_subsys() const
{
  return dout_subsys;
}
std::ostream& RGWBucketPipeSyncStatusManager::gen_prefix(std::ostream& out) const
{
  auto zone = std::string_view{source_zone.value_or(rgw_zone_id("*")).id};
  return out << "bucket sync zone:" << zone.substr(0, 8)
             << " bucket:" << dest_bucket << ' ';
}
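// Status object naming, sketched for illustration (not normative):
//   <bucket_status_oid_prefix>.<source_zone_id>:<dest_bs_key>[:<source_bs_key>]
// The source shard key suffix is only appended when the source and
// destination shards differ.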
string RGWBucketPipeSyncStatusManager::status_oid(const rgw_zone_id& source_zone,
                                                  const rgw_bucket_sync_pair_info& sync_pair)
{
  if (sync_pair.source_bs == sync_pair.dest_bs) {
    return bucket_status_oid_prefix + "." + source_zone.id + ":" + sync_pair.dest_bs.get_key();
  }
  return bucket_status_oid_prefix + "." + source_zone.id + ":" + sync_pair.dest_bs.get_key() + ":" + sync_pair.source_bs.get_key();
}
string RGWBucketPipeSyncStatusManager::obj_status_oid(const rgw_bucket_sync_pipe& sync_pipe,
                                                      const rgw_zone_id& source_zone,
                                                      const rgw::sal::Object* obj)
{
  string prefix = object_status_oid_prefix + "." + source_zone.id + ":" + obj->get_bucket()->get_key().get_key();
  if (sync_pipe.source_bucket_info.bucket !=
      sync_pipe.dest_bucket_info.bucket) {
    prefix += string("/") + sync_pipe.dest_bucket_info.bucket.get_key();
  }
  return prefix + ":" + obj->get_name() + ":" + obj->get_instance();
}
int rgw_read_remote_bilog_info(const DoutPrefixProvider *dpp,
                               RGWRESTConn* conn,
                               const rgw_bucket& bucket,
                               BucketIndexShardsManager& markers,
                               optional_yield y)
{
  const auto instance_key = bucket.get_key();
  const rgw_http_param_pair params[] = {
    { "type" , "bucket-index" },
    { "bucket-instance", instance_key.c_str() },
    { "info" , nullptr },
    { nullptr, nullptr }
  };
  rgw_bucket_index_marker_info result;
  int r = conn->get_json_resource(dpp, "/admin/log/", params, y, result);
  if (r < 0) {
    ldpp_dout(dpp, -1) << "failed to fetch remote log markers: " << cpp_strerror(r) << dendl;
    return r;
  }
  r = markers.from_string(result.max_marker, -1);
  if (r < 0) {
    lderr(conn->get_ctx()) << "failed to decode remote log markers" << dendl;
    return r;
  }
  return 0;
}
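// Reads per-shard bucket sync status for every source shard in parallel,
// bounded by max_concurrent_shards, pairing destination shards one-to-one
// when the source and destination shard counts match.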
class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR {
  static constexpr int max_concurrent_shards = 16;
  rgw::sal::RadosStore* const store;
  RGWDataSyncCtx *const sc;
  RGWDataSyncEnv *const env;
  RGWBucketInfo source_bucket_info;
  RGWBucketInfo dest_bucket_info;
  rgw_bucket_shard source_bs;
  rgw_bucket_shard dest_bs;

  rgw_bucket_sync_pair_info sync_pair;

  bool shard_to_shard_sync;

  using Vector = std::vector<rgw_bucket_shard_sync_info>;
  Vector::iterator i, end;

 public:
  RGWCollectBucketSyncStatusCR(rgw::sal::RadosStore* store, RGWDataSyncCtx *sc,
                               const RGWBucketInfo& source_bucket_info,
                               const RGWBucketInfo& dest_bucket_info,
                               Vector *status)
    : RGWShardCollectCR(sc->cct, max_concurrent_shards),
      store(store), sc(sc), env(sc->env),
      source_bucket_info(source_bucket_info),
      dest_bucket_info(dest_bucket_info),
      i(status->begin()), end(status->end())
  {
    shard_to_shard_sync = (source_bucket_info.layout.current_index.layout.normal.num_shards == dest_bucket_info.layout.current_index.layout.normal.num_shards);

    source_bs = rgw_bucket_shard(source_bucket_info.bucket, source_bucket_info.layout.current_index.layout.normal.num_shards > 0 ? 0 : -1);
    dest_bs = rgw_bucket_shard(dest_bucket_info.bucket, dest_bucket_info.layout.current_index.layout.normal.num_shards > 0 ? 0 : -1);

    status->resize(std::max<size_t>(1, source_bucket_info.layout.current_index.layout.normal.num_shards));

    i = status->begin();
    end = status->end();
  }

  bool spawn_next() override {
    if (i == end) {
      return false;
    }
    sync_pair.source_bs = source_bs;
    sync_pair.dest_bs = dest_bs;
    spawn(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &*i, nullptr), false);
    ++i;
    ++source_bs.shard_id;
    if (shard_to_shard_sync) {
      dest_bs.shard_id = source_bs.shard_id;
    }
    return true;
  }
};
int rgw_bucket_sync_status(const DoutPrefixProvider *dpp,
                           rgw::sal::RadosStore* store,
                           const rgw_sync_bucket_pipe& pipe,
                           const RGWBucketInfo& dest_bucket_info,
                           const RGWBucketInfo *psource_bucket_info,
                           std::vector<rgw_bucket_shard_sync_info> *status)
{
  if (!pipe.source.zone ||
      !pipe.source.bucket ||
      !pipe.dest.zone ||
      !pipe.dest.bucket) {
    return -EINVAL;
  }

  if (*pipe.dest.bucket !=
      dest_bucket_info.bucket) {
    return -EINVAL;
  }

  const rgw_bucket& source_bucket = *pipe.source.bucket;

  RGWBucketInfo source_bucket_info;

  if (!psource_bucket_info) {
    auto& bucket_ctl = store->getRados()->ctl.bucket;

    int ret = bucket_ctl->read_bucket_info(source_bucket, &source_bucket_info, null_yield, dpp);
    if (ret < 0) {
      ldpp_dout(dpp, 0) << "ERROR: failed to get bucket instance info: bucket=" << source_bucket << ": " << cpp_strerror(-ret) << dendl;
      return ret;
    }

    psource_bucket_info = &source_bucket_info;
  }

  RGWDataSyncEnv env;
  RGWSyncModuleInstanceRef module; // null sync module
  env.init(dpp, store->ctx(), store, store->svc(), store->svc()->rados->get_async_processor(),
           nullptr, nullptr, nullptr, module, nullptr);

  RGWDataSyncCtx sc;
  sc.init(&env, nullptr, *pipe.source.zone);

  RGWCoroutinesManager crs(store->ctx(), store->getRados()->get_cr_registry());
  return crs.run(dpp, new RGWCollectBucketSyncStatusCR(store, &sc,
                                                       *psource_bucket_info,
                                                       dest_bucket_info,
                                                       status));
}
void rgw_data_sync_info::generate_test_instances(list<rgw_data_sync_info*>& o)
{
  auto info = new rgw_data_sync_info;
  info->state = rgw_data_sync_info::StateBuildingFullSyncMaps;
  info->num_shards = 8;
  o.push_back(info);
  o.push_back(new rgw_data_sync_info);
}

void rgw_data_sync_marker::generate_test_instances(list<rgw_data_sync_marker*>& o)
{
  auto marker = new rgw_data_sync_marker;
  marker->state = rgw_data_sync_marker::IncrementalSync;
  marker->marker = "01234";
  marker->pos = 5;
  o.push_back(marker);
  o.push_back(new rgw_data_sync_marker);
}

void rgw_data_sync_status::generate_test_instances(list<rgw_data_sync_status*>& o)
{
  o.push_back(new rgw_data_sync_status);
}
void rgw_bucket_shard_full_sync_marker::decode_json(JSONObj *obj)
{
  JSONDecoder::decode_json("position", position, obj);
  JSONDecoder::decode_json("count", count, obj);
}

void rgw_bucket_shard_full_sync_marker::dump(Formatter *f) const
{
  encode_json("position", position, f);
  encode_json("count", count, f);
}

void rgw_bucket_shard_inc_sync_marker::decode_json(JSONObj *obj)
{
  JSONDecoder::decode_json("position", position, obj);
  JSONDecoder::decode_json("timestamp", timestamp, obj);
}

void rgw_bucket_shard_inc_sync_marker::dump(Formatter *f) const
{
  encode_json("position", position, f);
  encode_json("timestamp", timestamp, f);
}

void rgw_bucket_shard_sync_info::decode_json(JSONObj *obj)
{
  std::string s;
  JSONDecoder::decode_json("status", s, obj);
  if (s == "full-sync") {
    state = StateFullSync;
  } else if (s == "incremental-sync") {
    state = StateIncrementalSync;
  } else if (s == "stopped") {
    state = StateStopped;
  } else {
    state = StateInit;
  }
  JSONDecoder::decode_json("full_marker", full_marker, obj);
  JSONDecoder::decode_json("inc_marker", inc_marker, obj);
}

void rgw_bucket_shard_sync_info::dump(Formatter *f) const
{
  const char *s{nullptr};
  switch ((SyncState)state) {
  case StateInit:
    s = "init";
    break;
  case StateFullSync:
    s = "full-sync";
    break;
  case StateIncrementalSync:
    s = "incremental-sync";
    break;
  case StateStopped:
    s = "stopped";
    break;
  default:
    s = "unknown";
    break;
  }
  encode_json("status", s, f);
  encode_json("full_marker", full_marker, f);
  encode_json("inc_marker", inc_marker, f);
}