// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

#include "common/ceph_json.h"
#include "common/RefCountedObj.h"
#include "common/WorkQueue.h"
#include "common/Throttle.h"
#include "common/errno.h"

#include "rgw_common.h"
#include "rgw_data_sync.h"
#include "rgw_rest_conn.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
#include "rgw_cr_tools.h"
#include "rgw_http_client.h"
#include "rgw_bucket.h"
#include "rgw_bucket_sync.h"
#include "rgw_bucket_sync_cache.h"
#include "rgw_datalog.h"
#include "rgw_metadata.h"
#include "rgw_sync_counters.h"
#include "rgw_sync_error_repo.h"
#include "rgw_sync_module.h"

#include "cls/lock/cls_lock_client.h"
#include "cls/rgw/cls_rgw_client.h"

#include "services/svc_zone.h"
#include "services/svc_sync_modules.h"

#include "include/common_fwd.h"
#include "include/random.h"

#include <boost/asio/yield.hpp>
#include <string_view>

#define dout_subsys ceph_subsys_rgw

#undef dout_prefix
#define dout_prefix (*_dout << "data sync: ")
using namespace std;

static const string datalog_sync_status_oid_prefix = "datalog.sync-status";
static const string datalog_sync_status_shard_prefix = "datalog.sync-status.shard";
static const string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
static const string bucket_full_status_oid_prefix = "bucket.full-sync-status";
static const string bucket_status_oid_prefix = "bucket.sync-status";
static const string object_status_oid_prefix = "bucket.sync-status";
void rgw_datalog_info::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("num_objects", num_shards, obj);
}

void rgw_datalog_entry::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("key", key, obj);
  utime_t ut;
  JSONDecoder::decode_json("timestamp", ut, obj);
  timestamp = ut.to_real_time();
}

void rgw_datalog_shard_data::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("marker", marker, obj);
  JSONDecoder::decode_json("truncated", truncated, obj);
  JSONDecoder::decode_json("entries", entries, obj);
}
// print a bucket shard with [gen]
std::string to_string(const rgw_bucket_shard& bs, std::optional<uint64_t> gen)
{
  constexpr auto digits10 = std::numeric_limits<uint64_t>::digits10;
  constexpr auto reserve = 2 + digits10; // [value]
  auto str = bs.get_key('/', ':', ':', reserve);
  str.append(1, '[');
  str.append(std::to_string(gen.value_or(0)));
  str.append(1, ']');
  return str;
}
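// e.g. a shard of bucket "foo" with instance id "123", shard 4, gen 7 prints
// as something like "foo:123:4[7]"; the layout before the "[gen]" suffix is
// whatever rgw_bucket_shard::get_key() produces.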
class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *env;
  const int num_shards;
  int shard_id{0};

  map<uint32_t, rgw_data_sync_marker>& markers;
  std::vector<RGWObjVersionTracker>& objvs;

  int handle_result(int r) override {
    if (r == -ENOENT) { // ENOENT is not a fatal error
      return 0;
    }
    if (r < 0) {
      ldout(cct, 4) << "failed to read data sync status: "
          << cpp_strerror(r) << dendl;
    }
    return r;
  }
 public:
  RGWReadDataSyncStatusMarkersCR(RGWDataSyncCtx *sc, int num_shards,
                                 map<uint32_t, rgw_data_sync_marker>& markers,
                                 std::vector<RGWObjVersionTracker>& objvs)
    : RGWShardCollectCR(sc->cct, MAX_CONCURRENT_SHARDS),
      sc(sc), env(sc->env), num_shards(num_shards), markers(markers), objvs(objvs)
  {}
  bool spawn_next() override;
};
bool RGWReadDataSyncStatusMarkersCR::spawn_next()
{
  if (shard_id >= num_shards) {
    return false;
  }
  using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
  spawn(new CR(env->dpp, env->driver,
               rgw_raw_obj(env->svc->zone->get_zone_params().log_pool,
                           RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id)),
               &markers[shard_id], true, &objvs[shard_id]),
        false);
  shard_id++;
  return true;
}
class RGWReadDataSyncRecoveringShardsCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *env;

  uint64_t max_entries;
  int num_shards;
  int shard_id{0};

  string marker;
  std::vector<RGWRadosGetOmapKeysCR::ResultPtr>& omapkeys;

  int handle_result(int r) override {
    if (r == -ENOENT) { // ENOENT is not a fatal error
      return 0;
    }
    if (r < 0) {
      ldout(cct, 4) << "failed to list recovering data sync: "
          << cpp_strerror(r) << dendl;
    }
    return r;
  }
 public:
  RGWReadDataSyncRecoveringShardsCR(RGWDataSyncCtx *sc, uint64_t _max_entries, int _num_shards,
                                    std::vector<RGWRadosGetOmapKeysCR::ResultPtr>& omapkeys)
    : RGWShardCollectCR(sc->cct, MAX_CONCURRENT_SHARDS), sc(sc), env(sc->env),
      max_entries(_max_entries), num_shards(_num_shards), omapkeys(omapkeys)
  {}
  bool spawn_next() override;
};
bool RGWReadDataSyncRecoveringShardsCR::spawn_next()
{
  if (shard_id >= num_shards)
    return false;

  string error_oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id) + ".retry";
  auto& shard_keys = omapkeys[shard_id];
  shard_keys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
  spawn(new RGWRadosGetOmapKeysCR(env->driver,
                                  rgw_raw_obj(env->svc->zone->get_zone_params().log_pool, error_oid),
                                  marker, max_entries, shard_keys), false);

  ++shard_id;
  return true;
}
class RGWReadDataSyncStatusCoroutine : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  rgw_data_sync_status *sync_status;
  RGWObjVersionTracker* objv_tracker;
  std::vector<RGWObjVersionTracker>& objvs;

public:
  RGWReadDataSyncStatusCoroutine(RGWDataSyncCtx *_sc,
                                 rgw_data_sync_status *_status,
                                 RGWObjVersionTracker* objv_tracker,
                                 std::vector<RGWObjVersionTracker>& objvs)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(sc->env), sync_status(_status),
      objv_tracker(objv_tracker), objvs(objvs)
  {}

  int operate(const DoutPrefixProvider *dpp) override;
};
int RGWReadDataSyncStatusCoroutine::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    // read sync info
    using ReadInfoCR = RGWSimpleRadosReadCR<rgw_data_sync_info>;
    yield {
      bool empty_on_enoent = false; // fail on ENOENT
      call(new ReadInfoCR(dpp, sync_env->driver,
                          rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool,
                                      RGWDataSyncStatusManager::sync_status_oid(sc->source_zone)),
                          &sync_status->sync_info, empty_on_enoent, objv_tracker));
    }
    if (retcode < 0) {
      ldpp_dout(dpp, 4) << "failed to read sync status info with "
          << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    // read shard markers
    objvs.resize(sync_status->sync_info.num_shards);
    using ReadMarkersCR = RGWReadDataSyncStatusMarkersCR;
    yield call(new ReadMarkersCR(sc, sync_status->sync_info.num_shards,
                                 sync_status->sync_markers, objvs));
    if (retcode < 0) {
      ldpp_dout(dpp, 4) << "failed to read sync status markers with "
          << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    return set_cr_done();
  }
  return 0;
}
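// Reading status is two-phase: first the global rgw_data_sync_info object,
// then one rgw_data_sync_marker per shard (fanned out via the collect CR
// above), so callers get both overall state and per-shard progress.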
class RGWReadRemoteDataLogShardInfoCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;

  RGWRESTReadResource *http_op;

  int shard_id;
  RGWDataChangesLogInfo *shard_info;

public:
  RGWReadRemoteDataLogShardInfoCR(RGWDataSyncCtx *_sc,
                                  int _shard_id, RGWDataChangesLogInfo *_shard_info) : RGWCoroutine(_sc->cct),
                                                      sc(_sc),
                                                      sync_env(_sc->env),
                                                      http_op(NULL),
                                                      shard_id(_shard_id),
                                                      shard_info(_shard_info) {
  }

  ~RGWReadRemoteDataLogShardInfoCR() override {
    if (http_op) {
      http_op->put();
    }
  }

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type" , "data" },
                                        { "id", buf },
                                        { "info" , NULL },
                                        { NULL, NULL } };

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(sc->conn, p, pairs, NULL, sync_env->http_manager);

        init_new_io(http_op);

        int ret = http_op->aio_read(dpp);
        if (ret < 0) {
          ldpp_dout(dpp, 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait(shard_info, null_yield);
        if (ret < 0) {
          return set_cr_error(ret);
        }
        return set_cr_done();
      }
    }
    return 0;
  }
};
struct read_remote_data_log_response {
  string marker;
  bool truncated;
  vector<rgw_data_change_log_entry> entries;

  read_remote_data_log_response() : truncated(false) {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("marker", marker, obj);
    JSONDecoder::decode_json("truncated", truncated, obj);
    JSONDecoder::decode_json("entries", entries, obj);
  }
};
class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;

  RGWRESTReadResource *http_op = nullptr;

  int shard_id;
  const std::string& marker;
  string *pnext_marker;
  vector<rgw_data_change_log_entry> *entries;
  bool *truncated;

  read_remote_data_log_response response;
  std::optional<TOPNSPC::common::PerfGuard> timer;

public:
  RGWReadRemoteDataLogShardCR(RGWDataSyncCtx *_sc, int _shard_id,
                              const std::string& marker, string *pnext_marker,
                              vector<rgw_data_change_log_entry> *_entries,
                              bool *_truncated)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
      shard_id(_shard_id), marker(marker), pnext_marker(pnext_marker),
      entries(_entries), truncated(_truncated) {
  }
  ~RGWReadRemoteDataLogShardCR() override {
    if (http_op) {
      http_op->put();
    }
  }

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type" , "data" },
                                        { "id", buf },
                                        { "marker", marker.c_str() },
                                        { "extra-info", "true" },
                                        { NULL, NULL } };

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(sc->conn, p, pairs, NULL, sync_env->http_manager);

        init_new_io(http_op);

        if (sync_env->counters) {
          timer.emplace(sync_env->counters, sync_counters::l_poll);
        }
        int ret = http_op->aio_read(dpp);
        if (ret < 0) {
          ldpp_dout(dpp, 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          if (sync_env->counters) {
            sync_env->counters->inc(sync_counters::l_poll_err);
          }
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        timer.reset();
        int ret = http_op->wait(&response, null_yield);
        if (ret < 0) {
          if (sync_env->counters && ret != -ENOENT) {
            sync_env->counters->inc(sync_counters::l_poll_err);
          }
          return set_cr_error(ret);
        }
        entries->swap(response.entries);
        *pnext_marker = response.marker;
        *truncated = response.truncated;
        return set_cr_done();
      }
    }
    return 0;
  }
};
class RGWReadRemoteDataLogInfoCR : public RGWShardCollectCR {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;

  int num_shards;
  map<int, RGWDataChangesLogInfo> *datalog_info;

  int shard_id;
#define READ_DATALOG_MAX_CONCURRENT 10

  int handle_result(int r) override {
    if (r == -ENOENT) { // ENOENT is not a fatal error
      return 0;
    }
    if (r < 0) {
      ldout(cct, 4) << "failed to fetch remote datalog info: "
          << cpp_strerror(r) << dendl;
    }
    return r;
  }
public:
  RGWReadRemoteDataLogInfoCR(RGWDataSyncCtx *_sc,
                             int _num_shards,
                             map<int, RGWDataChangesLogInfo> *_datalog_info) : RGWShardCollectCR(_sc->cct, READ_DATALOG_MAX_CONCURRENT),
                                                                 sc(_sc), sync_env(_sc->env), num_shards(_num_shards),
                                                                 datalog_info(_datalog_info), shard_id(0) {}
  bool spawn_next() override;
};

bool RGWReadRemoteDataLogInfoCR::spawn_next() {
  if (shard_id >= num_shards) {
    return false;
  }
  spawn(new RGWReadRemoteDataLogShardInfoCR(sc, shard_id, &(*datalog_info)[shard_id]), false);
  shard_id++;
  return true;
}
class RGWListRemoteDataLogShardCR : public RGWSimpleCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  RGWRESTReadResource *http_op;

  int shard_id;
  string marker;
  uint32_t max_entries;
  rgw_datalog_shard_data *result;

public:
  RGWListRemoteDataLogShardCR(RGWDataSyncCtx *sc, int _shard_id,
                              const string& _marker, uint32_t _max_entries,
                              rgw_datalog_shard_data *_result)
    : RGWSimpleCoroutine(sc->cct), sc(sc), sync_env(sc->env), http_op(NULL),
      shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}

  int send_request(const DoutPrefixProvider *dpp) override {
    RGWRESTConn *conn = sc->conn;

    char buf[32];
    snprintf(buf, sizeof(buf), "%d", shard_id);

    char max_entries_buf[32];
    snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);

    const char *marker_key = (marker.empty() ? "" : "marker");

    rgw_http_param_pair pairs[] = { { "type", "data" },
      { "id", buf },
      { "max-entries", max_entries_buf },
      { marker_key, marker.c_str() },
      { NULL, NULL } };

    string p = "/admin/log/";

    http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
    init_new_io(http_op);

    int ret = http_op->aio_read(dpp);
    if (ret < 0) {
      ldpp_dout(dpp, 0) << "ERROR: failed to read from " << p << dendl;
      log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
      http_op->put();
      return ret;
    }

    return 0;
  }

  int request_complete() override {
    int ret = http_op->wait(result, null_yield);
    http_op->put();
    if (ret < 0 && ret != -ENOENT) {
      ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }
};
class RGWListRemoteDataLogCR : public RGWShardCollectCR {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;

  map<int, string> shards;
  int max_entries_per_shard;
  map<int, rgw_datalog_shard_data> *result;

  map<int, string>::iterator iter;
#define READ_DATALOG_MAX_CONCURRENT 10

  int handle_result(int r) override {
    if (r == -ENOENT) { // ENOENT is not a fatal error
      return 0;
    }
    if (r < 0) {
      ldout(cct, 4) << "failed to list remote datalog: "
          << cpp_strerror(r) << dendl;
    }
    return r;
  }
public:
  RGWListRemoteDataLogCR(RGWDataSyncCtx *_sc,
                         map<int, string>& _shards,
                         int _max_entries_per_shard,
                         map<int, rgw_datalog_shard_data> *_result) : RGWShardCollectCR(_sc->cct, READ_DATALOG_MAX_CONCURRENT),
                                                                      sc(_sc), sync_env(_sc->env), max_entries_per_shard(_max_entries_per_shard),
                                                                      result(_result) {
    shards.swap(_shards);
    iter = shards.begin();
  }
  bool spawn_next() override;
};

bool RGWListRemoteDataLogCR::spawn_next() {
  if (iter == shards.end()) {
    return false;
  }

  spawn(new RGWListRemoteDataLogShardCR(sc, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
  ++iter;
  return true;
}
class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
  static constexpr auto lock_name{ "sync_lock"sv };
  RGWDataSyncCtx* const sc;
  RGWDataSyncEnv* const sync_env{ sc->env };
  const uint32_t num_shards;
  rgw_data_sync_status* const status;
  RGWSyncTraceNodeRef tn;
  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  RGWObjVersionTracker& objv_tracker;
  std::vector<RGWObjVersionTracker>& objvs;

  const rgw_pool& pool{ sync_env->svc->zone->get_zone_params().log_pool };
  const string sync_status_oid{
    RGWDataSyncStatusManager::sync_status_oid(sc->source_zone) };

  map<int, RGWDataChangesLogInfo> shards_info;

public:
  RGWInitDataSyncStatusCoroutine(
    RGWDataSyncCtx* _sc, uint32_t num_shards, uint64_t instance_id,
    const RGWSyncTraceNodeRef& tn_parent, rgw_data_sync_status* status,
    boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr,
    RGWObjVersionTracker& objv_tracker,
    std::vector<RGWObjVersionTracker>& objvs)
    : RGWCoroutine(_sc->cct), sc(_sc), num_shards(num_shards), status(status),
      tn(sync_env->sync_tracer->add_node(tn_parent, "init_data_sync_status")),
      lease_cr(std::move(lease_cr)), objv_tracker(objv_tracker), objvs(objvs) {
    status->sync_info.instance_id = instance_id;
  }

  static auto continuous_lease_cr(RGWDataSyncCtx* const sc,
                                  RGWCoroutine* const caller) {
    auto lock_duration = sc->cct->_conf->rgw_sync_lease_period;
    return new RGWContinuousLeaseCR(
      sc->env->async_rados, sc->env->driver,
      { sc->env->svc->zone->get_zone_params().log_pool,
        RGWDataSyncStatusManager::sync_status_oid(sc->source_zone) },
      string(lock_name), lock_duration, caller, &sc->lcc);
  }
  int operate(const DoutPrefixProvider *dpp) override {
    int ret;
    reenter(this) {
      if (!lease_cr->is_locked()) {
        drain_all();
        return set_cr_error(-ECANCELED);
      }

      using WriteInfoCR = RGWSimpleRadosWriteCR<rgw_data_sync_info>;
      yield call(new WriteInfoCR(dpp, sync_env->driver,
                                 rgw_raw_obj{pool, sync_status_oid},
                                 status->sync_info, &objv_tracker));
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to write sync status info with " << retcode));
        return set_cr_error(retcode);
      }

      // In the original code we reacquired the lock. Since
      // RGWSimpleRadosWriteCR doesn't appear to touch the attributes
      // and cls_version works across it, this should be unnecessary.
      // Putting a note here just in case. If we see ECANCELED where
      // we expect EBUSY, we can revisit this.

      /* fetch current position in logs */
      yield {
        RGWRESTConn *conn = sync_env->svc->zone->get_zone_conn(sc->source_zone);
        if (!conn) {
          tn->log(0, SSTR("ERROR: connection to zone " << sc->source_zone << " does not exist!"));
          return set_cr_error(-EIO);
        }
        for (uint32_t i = 0; i < num_shards; i++) {
          spawn(new RGWReadRemoteDataLogShardInfoCR(sc, i, &shards_info[i]), true);
        }
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          tn->log(0, SSTR("ERROR: failed to read remote data log shards"));
          return set_state(RGWCoroutine_Error);
        }
        yield;
      }
      yield {
        objvs.resize(num_shards);
        for (uint32_t i = 0; i < num_shards; i++) {
          RGWDataChangesLogInfo& info = shards_info[i];
          auto& marker = status->sync_markers[i];
          marker.next_step_marker = info.marker;
          marker.timestamp = info.last_update;
          const auto& oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, i);
          auto& objv = objvs[i];
          objv.generate_new_write_ver(cct);
          using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_data_sync_marker>;
          spawn(new WriteMarkerCR(dpp, sync_env->driver,
                                  rgw_raw_obj{pool, oid}, marker, &objv), true);
        }
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          tn->log(0, SSTR("ERROR: failed to write data sync status markers"));
          return set_state(RGWCoroutine_Error);
        }
        yield;
      }

      status->sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
      yield call(new WriteInfoCR(dpp, sync_env->driver,
                                 rgw_raw_obj{pool, sync_status_oid},
                                 status->sync_info, &objv_tracker));
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to write sync status info with " << retcode));
        return set_cr_error(retcode);
      }
      yield return set_cr_done();
    }
    return 0;
  }
};
RGWRemoteDataLog::RGWRemoteDataLog(const DoutPrefixProvider *dpp,
                                   rgw::sal::RadosStore* driver,
                                   RGWAsyncRadosProcessor *async_rados)
  : RGWCoroutinesManager(driver->ctx(), driver->getRados()->get_cr_registry()),
    dpp(dpp), driver(driver),
    cct(driver->ctx()), cr_registry(driver->getRados()->get_cr_registry()),
    async_rados(async_rados),
    http_manager(driver->ctx(), completion_mgr),
    data_sync_cr(NULL),
    initialized(false)
{
}
int RGWRemoteDataLog::read_log_info(const DoutPrefixProvider *dpp, rgw_datalog_info *log_info)
{
  rgw_http_param_pair pairs[] = { { "type", "data" },
                                  { NULL, NULL } };

  int ret = sc.conn->get_json_resource(dpp, "/admin/log", pairs, null_yield, *log_info);
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "ERROR: failed to fetch datalog info" << dendl;
    return ret;
  }

  ldpp_dout(dpp, 20) << "remote datalog, num_shards=" << log_info->num_shards << dendl;

  return 0;
}
int RGWRemoteDataLog::read_source_log_shards_info(const DoutPrefixProvider *dpp, map<int, RGWDataChangesLogInfo> *shards_info)
{
  rgw_datalog_info log_info;
  int ret = read_log_info(dpp, &log_info);
  if (ret < 0) {
    return ret;
  }

  return run(dpp, new RGWReadRemoteDataLogInfoCR(&sc, log_info.num_shards, shards_info));
}

int RGWRemoteDataLog::read_source_log_shards_next(const DoutPrefixProvider *dpp, map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result)
{
  return run(dpp, new RGWListRemoteDataLogCR(&sc, shard_markers, 1, result));
}
int RGWRemoteDataLog::init(const rgw_zone_id& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger,
                           RGWSyncTraceManager *_sync_tracer, RGWSyncModuleInstanceRef& _sync_module,
                           PerfCounters* counters)
{
  sync_env.init(dpp, cct, driver, driver->svc(), async_rados, &http_manager, _error_logger,
                _sync_tracer, _sync_module, counters);
  sc.init(&sync_env, _conn, _source_zone);

  if (initialized) {
    return 0;
  }

  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }

  tn = sync_env.sync_tracer->add_node(sync_env.sync_tracer->root_node, "data");

  initialized = true;

  return 0;
}
void RGWRemoteDataLog::finish()
{
  stop();
}

int RGWRemoteDataLog::read_sync_status(const DoutPrefixProvider *dpp, rgw_data_sync_status *sync_status)
{
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWObjVersionTracker objv;
  std::vector<RGWObjVersionTracker> shard_objvs;
  RGWCoroutinesManager crs(cct, cr_registry);
  RGWHTTPManager http_manager(cct, crs.get_completion_mgr());
  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;

  RGWDataSyncCtx sc_local = sc;
  sc_local.env = &sync_env_local;

  ret = crs.run(dpp, new RGWReadDataSyncStatusCoroutine(&sc_local, sync_status,
                                                        &objv, shard_objvs));
  http_manager.stop();
  return ret;
}
int RGWRemoteDataLog::read_recovering_shards(const DoutPrefixProvider *dpp, const int num_shards, set<int>& recovering_shards)
{
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWCoroutinesManager crs(cct, cr_registry);
  RGWHTTPManager http_manager(cct, crs.get_completion_mgr());
  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;

  RGWDataSyncCtx sc_local = sc;
  sc_local.env = &sync_env_local;

  std::vector<RGWRadosGetOmapKeysCR::ResultPtr> omapkeys;
  omapkeys.resize(num_shards);
  uint64_t max_entries{1};

  ret = crs.run(dpp, new RGWReadDataSyncRecoveringShardsCR(&sc_local, max_entries, num_shards, omapkeys));
  http_manager.stop();

  if (ret == 0) {
    for (int i = 0; i < num_shards; i++) {
      if (omapkeys[i]->entries.size() != 0) {
        recovering_shards.insert(i);
      }
    }
  }

  return ret;
}
namespace RGWRDL {

class DataSyncInitCR : public RGWCoroutine {
  RGWDataSyncCtx* const sc;
  const uint32_t num_shards;
  uint64_t instance_id;
  const RGWSyncTraceNodeRef& tn;
  rgw_data_sync_status* const sync_status;
  std::vector<RGWObjVersionTracker>& objvs;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;

  RGWObjVersionTracker objv_tracker;

public:
  DataSyncInitCR(RGWDataSyncCtx* sc, uint32_t num_shards, uint64_t instance_id,
                 const RGWSyncTraceNodeRef& tn,
                 rgw_data_sync_status* sync_status,
                 std::vector<RGWObjVersionTracker>& objvs)
    : RGWCoroutine(sc->cct), sc(sc), num_shards(num_shards),
      instance_id(instance_id), tn(tn),
      sync_status(sync_status), objvs(objvs) {}

  ~DataSyncInitCR() override {
    if (lease_cr) {
      lease_cr->abort();
    }
  }

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      lease_cr.reset(
        RGWInitDataSyncStatusCoroutine::continuous_lease_cr(sc, this));

      yield spawn(lease_cr.get(), false);
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          tn->log(5, "ERROR: failed to take data sync status lease");
          set_status("lease lock failed, early abort");
          drain_all();
          return set_cr_error(lease_cr->get_ret_status());
        }
        tn->log(5, "waiting on data sync status lease");
        yield set_sleeping(true);
      }
      tn->log(5, "acquired data sync status lease");
      objv_tracker.generate_new_write_ver(sc->cct);
      yield call(new RGWInitDataSyncStatusCoroutine(sc, num_shards, instance_id,
                                                    tn, sync_status, lease_cr,
                                                    objv_tracker, objvs));
      lease_cr->go_down();
      lease_cr.reset();
      drain_all();
      if (retcode < 0) {
        set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }
};

} // namespace RGWRDL
int RGWRemoteDataLog::init_sync_status(const DoutPrefixProvider *dpp, int num_shards)
{
  rgw_data_sync_status sync_status;
  std::vector<RGWObjVersionTracker> objvs;
  sync_status.sync_info.num_shards = num_shards;

  RGWCoroutinesManager crs(cct, cr_registry);
  RGWHTTPManager http_manager(cct, crs.get_completion_mgr());
  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  auto instance_id = ceph::util::generate_random_number<uint64_t>();
  RGWDataSyncCtx sc_local = sc;
  sc_local.env = &sync_env_local;
  ret = crs.run(dpp, new RGWRDL::DataSyncInitCR(&sc_local, num_shards,
                                                instance_id, tn, &sync_status, objvs));
  http_manager.stop();
  return ret;
}
static string full_data_sync_index_shard_oid(const rgw_zone_id& source_zone, int shard_id)
{
  char buf[datalog_sync_full_sync_index_prefix.size() + 1 + source_zone.id.size() + 1 + 16];
  snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_full_sync_index_prefix.c_str(), source_zone.id.c_str(), shard_id);
  return string(buf);
}
struct read_metadata_list {
  string marker;
  bool truncated;
  list<string> keys;
  int count;

  read_metadata_list() : truncated(false), count(0) {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("marker", marker, obj);
    JSONDecoder::decode_json("truncated", truncated, obj);
    JSONDecoder::decode_json("keys", keys, obj);
    JSONDecoder::decode_json("count", count, obj);
  }
};

struct bucket_instance_meta_info {
  string key;
  obj_version ver;
  utime_t mtime;
  RGWBucketInstanceMetadataObject data;

  bucket_instance_meta_info() {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("key", key, obj);
    JSONDecoder::decode_json("ver", ver, obj);
    JSONDecoder::decode_json("mtime", mtime, obj);
    JSONDecoder::decode_json("data", data, obj);
  }
};
class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  const string instance_key;

  rgw_bucket_index_marker_info *info;

public:
  RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncCtx *_sc,
                                    const rgw_bucket& bucket,
                                    rgw_bucket_index_marker_info *_info)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
      instance_key(bucket.get_key()), info(_info) {}

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      yield {
        rgw_http_param_pair pairs[] = { { "type" , "bucket-index" },
                                        { "bucket-instance", instance_key.c_str() },
                                        { "info" , NULL },
                                        { NULL, NULL } };

        string p = "/admin/log/";
        call(new RGWReadRESTResourceCR<rgw_bucket_index_marker_info>(sync_env->cct, sc->conn, sync_env->http_manager, p, pairs, info));
      }
      if (retcode < 0) {
        return set_cr_error(retcode);
      }

      return set_cr_done();
    }
    return 0;
  }
};
class RGWListBucketIndexesCR : public RGWCoroutine {
  RGWDataSyncCtx* sc;
  RGWDataSyncEnv* sync_env = sc->env;

  rgw::sal::RadosStore* driver = sync_env->driver;

  rgw_data_sync_status* sync_status;
  std::vector<RGWObjVersionTracker>& objvs;

  int req_ret = 0;
  int ret = 0;

  list<string>::iterator iter;

  unique_ptr<RGWShardedOmapCRManager> entries_index;
  string oid_prefix =
    datalog_sync_full_sync_index_prefix + "." + sc->source_zone.id;

  string path = "/admin/metadata/bucket.instance";
  bucket_instance_meta_info meta_info;
  string key;

  bool failed = false;
  bool truncated = false;
  read_metadata_list result;

public:
  RGWListBucketIndexesCR(RGWDataSyncCtx* sc,
                         rgw_data_sync_status* sync_status, std::vector<RGWObjVersionTracker>& objvs)
    : RGWCoroutine(sc->cct), sc(sc), sync_status(sync_status), objvs(objvs) {}
  ~RGWListBucketIndexesCR() override { }

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      entries_index = std::make_unique<RGWShardedOmapCRManager>(
        sync_env->async_rados, driver, this,
        cct->_conf->rgw_data_log_num_shards,
        sync_env->svc->zone->get_zone_params().log_pool,
        oid_prefix);
      yield; // yield so OmapAppendCRs can start

      do {
        yield {
          string entrypoint = "/admin/metadata/bucket.instance"s;

          rgw_http_param_pair pairs[] = {{"max-entries", "1000"},
                                         {"marker", result.marker.c_str()},
                                         {NULL, NULL}};

          call(new RGWReadRESTResourceCR<read_metadata_list>(
                 sync_env->cct, sc->conn, sync_env->http_manager,
                 entrypoint, pairs, &result));
        }
        if (retcode < 0) {
          ldpp_dout(dpp, 0)
            << "ERROR: failed to fetch metadata for section bucket.instance"
            << dendl;
          return set_cr_error(retcode);
        }

        for (iter = result.keys.begin(); iter != result.keys.end(); ++iter) {
          ldpp_dout(dpp, 20) << "list metadata: section=bucket.instance key="
                             << *iter << dendl;
          key = *iter;

          yield {
            rgw_http_param_pair pairs[] = {{"key", key.c_str()},
                                           {NULL, NULL}};

            call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(
                   sync_env->cct, sc->conn, sync_env->http_manager, path, pairs,
                   &meta_info));
          }
          if (retcode < 0) {
            ldpp_dout(dpp, 0) << "ERROR: failed to fetch metadata for key: "
                              << key << dendl;
            return set_cr_error(retcode);
          }
          // Now that bucket full sync is bucket-wide instead of
          // per-shard, we only need to register a single shard of
          // each bucket to guarantee that sync will see everything
          // that happened before data full sync starts. This also
          // means we don't have to care about the bucket's current
          // shard count.
          yield entries_index->append(
            fmt::format("{}:{}", key, 0),
            sync_env->svc->datalog_rados->get_log_shard_id(
              meta_info.data.get_bucket_info().bucket, 0));
        }
        truncated = result.truncated;
      } while (truncated);

      yield {
        if (!entries_index->finish()) {
          failed = true;
        }
      }
      if (!failed) {
        for (auto iter = sync_status->sync_markers.begin();
             iter != sync_status->sync_markers.end();
             ++iter) {
          int shard_id = (int)iter->first;
          rgw_data_sync_marker& marker = iter->second;
          marker.total_entries = entries_index->get_total_entries(shard_id);
          spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(
                  dpp, sync_env->driver,
                  rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool,
                              RGWDataSyncStatusManager::shard_obj_name(
                                sc->source_zone, shard_id)),
                  marker, &objvs[shard_id]),
                true);
        }
      } else {
        yield call(sync_env->error_logger->log_error_cr(
                     dpp, sc->conn->get_remote_id(), "data.init", "",
                     EIO, string("failed to build bucket instances map")));
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          yield call(sync_env->error_logger->log_error_cr(
                       dpp, sc->conn->get_remote_id(), "data.init", "",
                       -ret, string("failed to store sync status: ") +
                       cpp_strerror(-ret)));
          req_ret = ret;
        }
        yield;
      }
      drain_all();
      if (req_ret < 0) {
        yield return set_cr_error(req_ret);
      }
      yield return set_cr_done();
    }
    return 0;
  }
};
#define DATA_SYNC_UPDATE_MARKER_WINDOW 1

class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  string marker_oid;
  rgw_data_sync_marker sync_marker;
  RGWSyncTraceNodeRef tn;
  RGWObjVersionTracker& objv;

public:
  RGWDataSyncShardMarkerTrack(RGWDataSyncCtx *_sc,
                              const string& _marker_oid,
                              const rgw_data_sync_marker& _marker,
                              RGWSyncTraceNodeRef& _tn, RGWObjVersionTracker& objv) : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
                                                                sc(_sc), sync_env(_sc->env),
                                                                marker_oid(_marker_oid),
                                                                sync_marker(_marker),
                                                                tn(_tn), objv(objv) {}

  RGWCoroutine* store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
    sync_marker.marker = new_marker;
    sync_marker.pos = index_pos;
    sync_marker.timestamp = timestamp;

    tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));

    return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->dpp, sync_env->driver,
                                                           rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, marker_oid),
                                                           sync_marker, &objv);
  }

  RGWOrderCallCR *allocate_order_control_cr() override {
    return new RGWLastCallerWinsCR(sync_env->cct);
  }
};
// ostream wrappers to print buckets without copying strings
struct bucket_str {
  const rgw_bucket& b;
  explicit bucket_str(const rgw_bucket& b) : b(b) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_str& rhs) {
  auto& b = rhs.b;
  if (!b.tenant.empty()) {
    out << b.tenant << '/';
  }
  out << b.name;
  if (!b.bucket_id.empty()) {
    out << ':' << b.bucket_id;
  }
  return out;
}

struct bucket_str_noinstance {
  const rgw_bucket& b;
  explicit bucket_str_noinstance(const rgw_bucket& b) : b(b) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_str_noinstance& rhs) {
  auto& b = rhs.b;
  if (!b.tenant.empty()) {
    out << b.tenant << '/';
  }
  out << b.name;
  return out;
}

struct bucket_shard_str {
  const rgw_bucket_shard& bs;
  explicit bucket_shard_str(const rgw_bucket_shard& bs) : bs(bs) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_shard_str& rhs) {
  auto& bs = rhs.bs;
  out << bucket_str{bs.bucket};
  if (bs.shard_id >= 0) {
    out << ':' << bs.shard_id;
  }
  return out;
}
#if FMT_VERSION >= 90000
template <> struct fmt::formatter<bucket_shard_str> : fmt::ostream_formatter {};
#endif
struct all_bucket_info {
  RGWBucketInfo bucket_info;
  map<string, bufferlist> attrs;
};

struct rgw_sync_pipe_info_entity
{
private:
  RGWBucketInfo bucket_info;
  map<string, bufferlist> bucket_attrs;
  bool _has_bucket_info{false};

public:
  rgw_zone_id zone;

  rgw_sync_pipe_info_entity() {}
  rgw_sync_pipe_info_entity(const rgw_sync_bucket_entity& e,
                            std::optional<all_bucket_info>& binfo) {
    if (e.zone) {
      zone = *e.zone;
    }
    if (!e.bucket) {
      return;
    }
    if (!binfo ||
        binfo->bucket_info.bucket != *e.bucket) {
      bucket_info.bucket = *e.bucket;
    } else {
      set_bucket_info(*binfo);
    }
  }

  void update_empty_bucket_info(const std::map<rgw_bucket, all_bucket_info>& buckets_info) {
    if (_has_bucket_info) {
      return;
    }
    if (bucket_info.bucket.name.empty()) {
      return;
    }

    auto iter = buckets_info.find(bucket_info.bucket);
    if (iter == buckets_info.end()) {
      return;
    }

    set_bucket_info(iter->second);
  }

  bool has_bucket_info() const {
    return _has_bucket_info;
  }

  void set_bucket_info(const all_bucket_info& all_info) {
    bucket_info = all_info.bucket_info;
    bucket_attrs = all_info.attrs;
    _has_bucket_info = true;
  }

  const RGWBucketInfo& get_bucket_info() const {
    return bucket_info;
  }

  const rgw_bucket& get_bucket() const {
    return bucket_info.bucket;
  }

  bool operator<(const rgw_sync_pipe_info_entity& e) const {
    if (zone < e.zone) {
      return true;
    }
    if (zone > e.zone) {
      return false;
    }
    return (bucket_info.bucket < e.bucket_info.bucket);
  }
};

std::ostream& operator<<(std::ostream& out, const rgw_sync_pipe_info_entity& e) {
  auto& bucket = e.get_bucket_info().bucket;

  out << e.zone << ":" << bucket.get_key();
  return out;
}
struct rgw_sync_pipe_handler_info {
  RGWBucketSyncFlowManager::pipe_handler handler;
  rgw_sync_pipe_info_entity source;
  rgw_sync_pipe_info_entity target;

  rgw_sync_pipe_handler_info() {}
  rgw_sync_pipe_handler_info(const RGWBucketSyncFlowManager::pipe_handler& _handler,
                             std::optional<all_bucket_info> source_bucket_info,
                             std::optional<all_bucket_info> target_bucket_info) : handler(_handler),
                                                                                  source(handler.source, source_bucket_info),
                                                                                  target(handler.dest, target_bucket_info) {
  }

  bool operator<(const rgw_sync_pipe_handler_info& p) const {
    if (source < p.source) {
      return true;
    }
    if (p.source < source) {
      return false;
    }
    return (target < p.target);
  }

  void update_empty_bucket_info(const std::map<rgw_bucket, all_bucket_info>& buckets_info) {
    source.update_empty_bucket_info(buckets_info);
    target.update_empty_bucket_info(buckets_info);
  }
};

std::ostream& operator<<(std::ostream& out, const rgw_sync_pipe_handler_info& p) {
  out << p.source << ">" << p.target;
  return out;
}
struct rgw_sync_pipe_info_set {
  std::set<rgw_sync_pipe_handler_info> handlers;

  using iterator = std::set<rgw_sync_pipe_handler_info>::iterator;

  void clear() {
    handlers.clear();
  }

  void insert(const RGWBucketSyncFlowManager::pipe_handler& handler,
              std::optional<all_bucket_info>& source_bucket_info,
              std::optional<all_bucket_info>& target_bucket_info) {
    rgw_sync_pipe_handler_info p(handler, source_bucket_info, target_bucket_info);
    handlers.insert(p);
  }

  iterator begin() {
    return handlers.begin();
  }

  iterator end() {
    return handlers.end();
  }

  size_t size() const {
    return handlers.size();
  }

  bool empty() const {
    return handlers.empty();
  }

  void update_empty_bucket_info(const std::map<rgw_bucket, all_bucket_info>& buckets_info) {
    if (buckets_info.empty()) {
      return;
    }

    std::set<rgw_sync_pipe_handler_info> p;

    for (auto pipe : handlers) {
      pipe.update_empty_bucket_info(buckets_info);
      p.insert(pipe);
    }

    handlers = std::move(p);
  }
};
class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;

  rgw_sync_pipe_info_set pipes;
  rgw_sync_pipe_info_set::iterator siter;

  rgw_bucket_sync_pair_info sync_pair;

  RGWSyncTraceNodeRef tn;
  ceph::real_time* progress;
  std::vector<ceph::real_time> shard_progress;
  std::vector<ceph::real_time>::iterator cur_shard_progress;

  RGWRESTConn *conn{nullptr};
  rgw_zone_id last_zone;

  std::optional<uint64_t> gen;
  rgw_bucket_index_marker_info marker_info;
  BucketIndexShardsManager marker_mgr;

public:
  RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
                            boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
                            const rgw_bucket_shard& source_bs,
                            const RGWSyncTraceNodeRef& _tn_parent,
                            std::optional<uint64_t> gen,
                            ceph::real_time* progress);

  int operate(const DoutPrefixProvider *dpp) override;
};
class RGWDataSyncSingleEntryCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  rgw::bucket_sync::Handle state; // cached bucket-shard state
  rgw_data_sync_obligation obligation; // input obligation
  std::optional<rgw_data_sync_obligation> complete; // obligation to complete
  uint32_t obligation_counter = 0;
  RGWDataSyncShardMarkerTrack *marker_tracker;
  rgw_raw_obj error_repo;
  boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
  RGWSyncTraceNodeRef tn;

  ceph::real_time progress;
  int sync_status = 0;
public:
  RGWDataSyncSingleEntryCR(RGWDataSyncCtx *_sc, rgw::bucket_sync::Handle state,
                           rgw_data_sync_obligation _obligation,
                           RGWDataSyncShardMarkerTrack *_marker_tracker,
                           const rgw_raw_obj& error_repo,
                           boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
                           const RGWSyncTraceNodeRef& _tn_parent)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
      state(std::move(state)), obligation(std::move(_obligation)),
      marker_tracker(_marker_tracker), error_repo(error_repo),
      lease_cr(std::move(lease_cr)) {
    set_description() << "data sync single entry (source_zone=" << sc->source_zone << ") " << obligation;
    tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", to_string(obligation.bs, obligation.gen));
  }

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      if (state->obligation) {
        // this is already syncing in another DataSyncSingleEntryCR
        if (state->obligation->timestamp < obligation.timestamp) {
          // cancel existing obligation and overwrite it
          tn->log(10, SSTR("canceling existing obligation " << *state->obligation));
          complete = std::move(*state->obligation);
          *state->obligation = std::move(obligation);
        } else {
          // cancel new obligation
          tn->log(10, SSTR("canceling new obligation " << obligation));
          complete = std::move(obligation);
        }
      } else {
        // start syncing a new obligation
        state->obligation = obligation;
        obligation_counter = state->counter;
        state->counter++;

        // loop until the latest obligation is satisfied, because other callers
        // may update the obligation while we're syncing
        while ((state->obligation->timestamp == ceph::real_time() ||
                state->progress_timestamp < state->obligation->timestamp) &&
               obligation_counter != state->counter) {
          obligation_counter = state->counter;
          progress = ceph::real_time{};

          ldout(cct, 4) << "starting sync on " << bucket_shard_str{state->key.first}
              << ' ' << *state->obligation << " progress timestamp " << state->progress_timestamp
              << " progress " << progress << dendl;
          yield call(new RGWRunBucketSourcesSyncCR(sc, lease_cr,
                                                   state->key.first, tn,
                                                   state->obligation->gen,
                                                   &progress));
          if (retcode < 0) {
            break;
          }
          state->progress_timestamp = std::max(progress, state->progress_timestamp);
        }
        // any new obligations will process themselves
        complete = std::move(*state->obligation);
        state->obligation.reset();

        tn->log(10, SSTR("sync finished on " << bucket_shard_str{state->key.first}
                         << " progress=" << progress << ' ' << complete << " r=" << retcode));
      }
      sync_status = retcode;

      if (sync_status == -ENOENT) {
        // this was added when 'tenant/' was added to datalog entries, because
        // preexisting tenant buckets could never sync and would stay in the
        // error_repo forever
        tn->log(0, SSTR("WARNING: skipping data log entry for missing bucket " << complete->bs));
        sync_status = 0;
      }

      if (sync_status < 0) {
        // write actual sync failures for 'radosgw-admin sync error list'
        if (sync_status != -EBUSY && sync_status != -EAGAIN) {
          yield call(sync_env->error_logger->log_error_cr(dpp, sc->conn->get_remote_id(), "data",
                                                          to_string(complete->bs, complete->gen),
                                                          -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
          if (retcode < 0) {
            tn->log(0, SSTR("ERROR: failed to log sync failure: retcode=" << retcode));
          }
        }
        if (complete->timestamp != ceph::real_time{}) {
          tn->log(10, SSTR("writing " << *complete << " to error repo for retry"));
          yield call(rgw::error_repo::write_cr(sync_env->driver->svc()->rados, error_repo,
                                               rgw::error_repo::encode_key(complete->bs, complete->gen),
                                               complete->timestamp));
          if (retcode < 0) {
            tn->log(0, SSTR("ERROR: failed to log sync failure in error repo: retcode=" << retcode));
          }
        }
      } else if (complete->retry) {
        yield call(rgw::error_repo::remove_cr(sync_env->driver->svc()->rados, error_repo,
                                              rgw::error_repo::encode_key(complete->bs, complete->gen),
                                              complete->timestamp));
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: failed to remove omap key from error repo ("
             << error_repo << " retcode=" << retcode));
        }
      }
      /* FIXME: what to do in case of error */
      if (marker_tracker && !complete->marker.empty()) {
        /* update marker */
        yield call(marker_tracker->finish(complete->marker));
      }
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      if (sync_status == 0) {
        sync_status = retcode;
      }
      if (sync_status < 0) {
        return set_cr_error(sync_status);
      }
      return set_cr_done();
    }
    return 0;
  }
};
rgw_raw_obj datalog_oid_for_error_repo(RGWDataSyncCtx *sc, rgw::sal::RadosStore* driver,
                                       rgw_pool& pool, rgw_bucket_shard& bs) {
  int datalog_shard = driver->svc()->datalog_rados->choose_oid(bs);
  string oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, datalog_shard);
  return rgw_raw_obj(pool, oid + ".retry");
}
class RGWDataIncrementalSyncFullObligationCR: public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  rgw_bucket_shard source_bs;
  rgw_raw_obj error_repo;
  std::string error_marker;
  ceph::real_time timestamp;
  RGWSyncTraceNodeRef tn;
  rgw_bucket_index_marker_info remote_info;
  rgw_pool pool;
  uint32_t sid;
  rgw_bucket_shard bs;
  std::vector<store_gen_shards>::const_iterator each;

public:
  RGWDataIncrementalSyncFullObligationCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs,
                                         const rgw_raw_obj& error_repo, const std::string& _error_marker,
                                         ceph::real_time& _timestamp, RGWSyncTraceNodeRef& _tn)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), source_bs(_source_bs),
      error_repo(error_repo), error_marker(_error_marker), timestamp(_timestamp),
      tn(sync_env->sync_tracer->add_node(_tn, "error_repo", SSTR(bucket_shard_str(source_bs))))
  {
  }

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, source_bs.bucket, &remote_info));
      if (retcode < 0) {
        return set_cr_error(retcode);
      }

      each = remote_info.generations.cbegin();
      for (; each != remote_info.generations.cend(); each++) {
        for (sid = 0; sid < each->num_shards; sid++) {
          bs.bucket = source_bs.bucket;
          bs.shard_id = sid;
          pool = sync_env->svc->zone->get_zone_params().log_pool;
          error_repo = datalog_oid_for_error_repo(sc, sync_env->driver, pool, source_bs);
          tn->log(10, SSTR("writing shard_id " << sid << " of gen " << each->gen << " to error repo for retry"));
          yield_spawn_window(rgw::error_repo::write_cr(sync_env->driver->svc()->rados, error_repo,
                                                       rgw::error_repo::encode_key(bs, each->gen),
                                                       timestamp),
                             sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window),
                             [&](uint64_t stack_id, int ret) {
                               if (ret < 0) {
                                 retcode = ret;
                               }
                               return 0;
                             });
        }
      }
      drain_all_cb([&](uint64_t stack_id, int ret) {
        if (ret < 0) {
          tn->log(10, SSTR("writing to error repo returned error: " << ret));
        }
        return ret;
      });

      // once everything succeeds, remove the full sync obligation from the error repo
      yield call(rgw::error_repo::remove_cr(sync_env->driver->svc()->rados, error_repo,
                                            error_marker, timestamp));
      return set_cr_done();
    }
    return 0;
  }
};
RGWCoroutine* data_sync_single_entry(RGWDataSyncCtx *sc, const rgw_bucket_shard& src,
                                     std::optional<uint64_t> gen,
                                     const std::string marker,
                                     ceph::real_time timestamp,
                                     boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
                                     boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache,
                                     RGWDataSyncShardMarkerTrack* marker_tracker,
                                     rgw_raw_obj error_repo,
                                     RGWSyncTraceNodeRef& tn,
                                     bool retry) {
  auto state = bucket_shard_cache->get(src, gen);
  auto obligation = rgw_data_sync_obligation{src, gen, marker, timestamp, retry};
  return new RGWDataSyncSingleEntryCR(sc, std::move(state), std::move(obligation),
                                      &*marker_tracker, error_repo,
                                      lease_cr.get(), tn);
}
static ceph::real_time timestamp_for_bucket_shard(rgw::sal::RadosStore* driver,
                                                  const rgw_data_sync_status& sync_status,
                                                  const rgw_bucket_shard& bs) {
  int datalog_shard = driver->svc()->datalog_rados->choose_oid(bs);
  auto status = sync_status.sync_markers.find(datalog_shard);
  if (status == sync_status.sync_markers.end()) {
    return ceph::real_clock::zero();
  }
  return status->second.timestamp;
}
class RGWDataFullSyncSingleEntryCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  rgw_pool pool;
  rgw_bucket_shard source_bs;
  const std::string key;
  rgw_data_sync_status sync_status;
  rgw_raw_obj error_repo;
  ceph::real_time timestamp;
  boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache;
  RGWDataSyncShardMarkerTrack* marker_tracker;
  RGWSyncTraceNodeRef tn;
  rgw_bucket_index_marker_info remote_info;
  uint32_t sid;
  std::vector<store_gen_shards>::iterator each;
  RGWCoroutine* shard_cr = nullptr;
  bool first_shard = true;
  bool error_inject = false;

public:
  RGWDataFullSyncSingleEntryCR(RGWDataSyncCtx *_sc, const rgw_pool& _pool, const rgw_bucket_shard& _source_bs,
                               const std::string& _key, const rgw_data_sync_status& sync_status, const rgw_raw_obj& _error_repo,
                               ceph::real_time _timestamp, boost::intrusive_ptr<const RGWContinuousLeaseCR> _lease_cr,
                               boost::intrusive_ptr<rgw::bucket_sync::Cache> _bucket_shard_cache,
                               RGWDataSyncShardMarkerTrack* _marker_tracker,
                               RGWSyncTraceNodeRef& _tn)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), pool(_pool), source_bs(_source_bs), key(_key),
      error_repo(_error_repo), timestamp(_timestamp), lease_cr(std::move(_lease_cr)),
      bucket_shard_cache(_bucket_shard_cache), marker_tracker(_marker_tracker), tn(_tn) {
    error_inject = (sync_env->cct->_conf->rgw_sync_data_full_inject_err_probability > 0);
  }

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      if (error_inject &&
          rand() % 10000 < cct->_conf->rgw_sync_data_full_inject_err_probability * 10000.0) {
        tn->log(0, SSTR("injecting read bilog info error on key=" << key));
        retcode = -ENOENT;
      } else {
        tn->log(0, SSTR("read bilog info key=" << key));
        yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, source_bs.bucket, &remote_info));
      }

      if (retcode < 0) {
        tn->log(10, SSTR("full sync: failed to read remote bucket info. Writing "
                         << source_bs.shard_id << " to error repo for retry"));
        yield call(rgw::error_repo::write_cr(sync_env->driver->svc()->rados, error_repo,
                                             rgw::error_repo::encode_key(source_bs, std::nullopt),
                                             timestamp));
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: failed to log " << source_bs.shard_id << " in error repo: retcode=" << retcode));
        }
        yield call(marker_tracker->finish(key));
        return set_cr_error(retcode);
      }

      //wait to sync the first shard of the oldest generation and then sync all other shards.
      //if any of the operations fail at any time, write them into error repo for later retry.

      each = remote_info.generations.begin();
      for (; each != remote_info.generations.end(); each++) {
        for (sid = 0; sid < each->num_shards; sid++) {
          source_bs.shard_id = sid;
          // use the error repo and sync status timestamp from the datalog shard corresponding to source_bs
          error_repo = datalog_oid_for_error_repo(sc, sync_env->driver, pool, source_bs);
          timestamp = timestamp_for_bucket_shard(sync_env->driver, sync_status, source_bs);
          if (retcode < 0) {
            tn->log(10, SSTR("Write " << source_bs.shard_id << " to error repo for retry"));
            yield_spawn_window(rgw::error_repo::write_cr(sync_env->driver->svc()->rados, error_repo,
                                                         rgw::error_repo::encode_key(source_bs, each->gen),
                                                         timestamp),
                               sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window), std::nullopt);
          } else {
            shard_cr = data_sync_single_entry(sc, source_bs, each->gen, key, timestamp,
                                              lease_cr, bucket_shard_cache, nullptr, error_repo, tn, false);
            tn->log(10, SSTR("full sync: syncing shard_id " << sid << " of gen " << each->gen));
            if (first_shard) {
              yield call(shard_cr);
              first_shard = false;
            } else {
              yield_spawn_window(shard_cr, sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window),
                                 [&](uint64_t stack_id, int ret) {
                                   if (ret < 0) {
                                     retcode = ret;
                                   }
                                   return retcode;
                                 });
            }
          }
        }
      }
      drain_all_cb([&](uint64_t stack_id, int ret) {
        if (ret < 0) {
          retcode = ret;
        }
        return retcode;
      });

      yield call(marker_tracker->finish(key));
      if (retcode < 0) {
        return set_cr_error(retcode);
      }

      return set_cr_done();
    }
    return 0;
  }
};
class RGWDataBaseSyncShardCR : public RGWCoroutine {
protected:
  RGWDataSyncCtx *const sc;
  const rgw_pool& pool;
  const uint32_t shard_id;
  rgw_data_sync_marker& sync_marker;
  RGWSyncTraceNodeRef tn;
  const string& status_oid;
  const rgw_raw_obj& error_repo;
  boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
  const rgw_data_sync_status& sync_status;
  RGWObjVersionTracker& objv;
  boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache;

  std::optional<RGWDataSyncShardMarkerTrack> marker_tracker;
  RGWRadosGetOmapValsCR::ResultPtr omapvals;
  rgw_bucket_shard source_bs;

  int parse_bucket_key(const std::string& key, rgw_bucket_shard& bs) const {
    int ret = rgw_bucket_parse_bucket_key(sc->env->cct, key,
                                          &bs.bucket, &bs.shard_id);
    //for the case of num_shards 0, shard_id gets a value of -1
    //because of the way bucket instance gets parsed in the absence of shard_id delimiter.
    //interpret it as a non-negative value.
    if (ret == 0) {
      if (bs.shard_id < 0) {
        bs.shard_id = 0;
      }
    }
    return ret;
  }

  RGWDataBaseSyncShardCR(
    RGWDataSyncCtx *const _sc, const rgw_pool& pool, const uint32_t shard_id,
    rgw_data_sync_marker& sync_marker, RGWSyncTraceNodeRef tn,
    const string& status_oid, const rgw_raw_obj& error_repo,
    boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
    const rgw_data_sync_status& sync_status,
    RGWObjVersionTracker& objv,
    const boost::intrusive_ptr<rgw::bucket_sync::Cache>& bucket_shard_cache)
    : RGWCoroutine(_sc->cct), sc(_sc), pool(pool), shard_id(shard_id),
      sync_marker(sync_marker), tn(tn), status_oid(status_oid),
      error_repo(error_repo), lease_cr(std::move(lease_cr)),
      sync_status(sync_status), objv(objv),
      bucket_shard_cache(bucket_shard_cache) {}
};
class RGWDataFullSyncShardCR : public RGWDataBaseSyncShardCR {
  static constexpr auto OMAP_GET_MAX_ENTRIES = 100;

  string oid;
  uint64_t total_entries = 0;
  ceph::real_time entry_timestamp;
  std::map<std::string, bufferlist> entries;
  std::map<std::string, bufferlist>::iterator iter;
  string error_marker;

public:
  RGWDataFullSyncShardCR(
    RGWDataSyncCtx *const sc, const rgw_pool& pool, const uint32_t shard_id,
    rgw_data_sync_marker& sync_marker, RGWSyncTraceNodeRef tn,
    const string& status_oid, const rgw_raw_obj& error_repo,
    boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
    const rgw_data_sync_status& sync_status, RGWObjVersionTracker& objv,
    const boost::intrusive_ptr<rgw::bucket_sync::Cache>& bucket_shard_cache)
    : RGWDataBaseSyncShardCR(sc, pool, shard_id, sync_marker, tn,
                             status_oid, error_repo, std::move(lease_cr),
                             sync_status, objv, bucket_shard_cache) {}

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      tn->log(10, "start full sync");
      oid = full_data_sync_index_shard_oid(sc->source_zone, shard_id);
      marker_tracker.emplace(sc, status_oid, sync_marker, tn, objv);
      total_entries = sync_marker.pos;
      entry_timestamp = sync_marker.timestamp; // time when full sync started
      do {
        if (!lease_cr->is_locked()) {
          drain_all();
          tn->log(1, "lease is lost, abort");
          return set_cr_error(-ECANCELED);
        }
        omapvals = std::make_shared<RGWRadosGetOmapValsCR::Result>();
        yield call(new RGWRadosGetOmapValsCR(sc->env->driver,
                                             rgw_raw_obj(pool, oid),
                                             sync_marker.marker,
                                             OMAP_GET_MAX_ENTRIES, omapvals));
        if (retcode < 0) {
          drain_all();
          return set_cr_error(retcode);
        }
        entries = std::move(omapvals->entries);
        if (entries.size() > 0) {
          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
        }
        tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
        iter = entries.begin();
        for (; iter != entries.end(); ++iter) {
          retcode = parse_bucket_key(iter->first, source_bs);
          if (retcode < 0) {
            tn->log(1, SSTR("failed to parse bucket shard: " << iter->first));
            marker_tracker->try_update_high_marker(iter->first, 0,
                                                   entry_timestamp);
            continue;
          }
          tn->log(20, SSTR("full sync: " << iter->first));
          total_entries++;
          if (!marker_tracker->start(iter->first, total_entries,
                                     entry_timestamp)) {
            tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first
                            << ". Duplicate entry?"));
          } else {
            tn->log(10, SSTR("timestamp for " << iter->first << " is :" << entry_timestamp));
            yield_spawn_window(new RGWDataFullSyncSingleEntryCR(
                                 sc, pool, source_bs, iter->first, sync_status,
                                 error_repo, entry_timestamp, lease_cr,
                                 bucket_shard_cache, &*marker_tracker, tn),
                               sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window),
                               std::nullopt);
          }
          sync_marker.marker = iter->first;
        }
      } while (omapvals->more);
      omapvals.reset();

      drain_all();

      tn->unset_flag(RGW_SNS_FLAG_ACTIVE);

      /* update marker to reflect we're done with full sync */
      sync_marker.state = rgw_data_sync_marker::IncrementalSync;
      sync_marker.marker = sync_marker.next_step_marker;
      sync_marker.next_step_marker.clear();
      yield call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(
                   sc->env->dpp, sc->env->driver,
                   rgw_raw_obj(pool, status_oid), sync_marker, &objv));
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to set sync marker: retcode=" << retcode));
        return set_cr_error(retcode);
      }

      // clean up full sync index, ignoring errors
      yield call(new RGWRadosRemoveCR(sc->env->driver, {pool, oid}));

      // transition to incremental sync
      return set_cr_done();
    }
    return 0;
  }
};
1857 class RGWDataIncSyncShardCR
: public RGWDataBaseSyncShardCR
{
1858 static constexpr int max_error_entries
= 10;
1859 static constexpr uint32_t retry_backoff_secs
= 60;
1861 ceph::mutex
& inc_lock
;
1862 bc::flat_set
<rgw_data_notify_entry
>& modified_shards
;
1864 bc::flat_set
<rgw_data_notify_entry
> current_modified
;
1865 decltype(current_modified
)::iterator modified_iter
;
1867 ceph::coarse_real_time error_retry_time
;
1868 string error_marker
;
1869 std::map
<std::string
, bufferlist
> error_entries
;
1870 decltype(error_entries
)::iterator iter
;
1871 ceph::real_time entry_timestamp
;
1872 std::optional
<uint64_t> gen
;
1875 vector
<rgw_data_change_log_entry
> log_entries
;
1876 decltype(log_entries
)::iterator log_iter
;
1877 bool truncated
= false;
1880 utime_t
get_idle_interval() const {
1881 ceph::timespan interval
= std::chrono::seconds(cct
->_conf
->rgw_data_sync_poll_interval
);
1882 if (!ceph::coarse_real_clock::is_zero(error_retry_time
)) {
1883 auto now
= ceph::coarse_real_clock::now();
1884 if (error_retry_time
> now
) {
1885 auto d
= error_retry_time
- now
;
1891 // convert timespan -> time_point -> utime_t
1892 return utime_t(ceph::coarse_real_clock::zero() + interval
);
 public:
  RGWDataIncSyncShardCR(
    RGWDataSyncCtx *const sc, const rgw_pool& pool, const uint32_t shard_id,
    rgw_data_sync_marker& sync_marker, RGWSyncTraceNodeRef tn,
    const string& status_oid, const rgw_raw_obj& error_repo,
    boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
    const rgw_data_sync_status& sync_status, RGWObjVersionTracker& objv,
    const boost::intrusive_ptr<rgw::bucket_sync::Cache>& bucket_shard_cache,
    ceph::mutex& inc_lock,
    bc::flat_set<rgw_data_notify_entry>& modified_shards)
    : RGWDataBaseSyncShardCR(sc, pool, shard_id, sync_marker, tn,
                             status_oid, error_repo, std::move(lease_cr),
                             sync_status, objv, bucket_shard_cache),
      inc_lock(inc_lock), modified_shards(modified_shards) {}

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      tn->log(10, "start incremental sync");
      marker_tracker.emplace(sc, status_oid, sync_marker, tn, objv);
      do {
        if (!lease_cr->is_locked()) {
          drain_all();
          tn->log(1, "lease is lost, abort");
          return set_cr_error(-ECANCELED);
        }
        current_modified.clear();
        {
          std::unique_lock il(inc_lock);
          current_modified.swap(modified_shards);
        }

        if (current_modified.size() > 0) {
          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
        }

        /* process out of band updates */
        for (modified_iter = current_modified.begin();
             modified_iter != current_modified.end();
             ++modified_iter) {
          if (!lease_cr->is_locked()) {
            drain_all();
            yield call(marker_tracker->flush());
            if (retcode < 0) {
              tn->log(0, SSTR("ERROR: data sync marker_tracker.flush() returned retcode=" << retcode));
              return set_cr_error(retcode);
            }
            return set_cr_error(-ECANCELED);
          }
          retcode = parse_bucket_key(modified_iter->key, source_bs);
          if (retcode < 0) {
            tn->log(1, SSTR("failed to parse bucket shard: "
                            << modified_iter->key));
            continue;
          }
          tn->log(20, SSTR("received async update notification: "
                           << modified_iter->key));
          spawn(data_sync_single_entry(sc, source_bs, modified_iter->gen, {},
                                       ceph::real_time{}, lease_cr,
                                       bucket_shard_cache, &*marker_tracker,
                                       error_repo, tn, false), false);
        }
        if (error_retry_time <= ceph::coarse_real_clock::now()) {
          /* process bucket shards that previously failed */
          omapvals = std::make_shared<RGWRadosGetOmapValsCR::Result>();
          yield call(new RGWRadosGetOmapValsCR(sc->env->driver, error_repo,
                                               error_marker, max_error_entries,
                                               omapvals));
          error_entries = std::move(omapvals->entries);
          tn->log(20, SSTR("read error repo, got " << error_entries.size()
                           << " entries"));
          iter = error_entries.begin();
          for (; iter != error_entries.end(); ++iter) {
            if (!lease_cr->is_locked()) {
              drain_all();
              yield call(marker_tracker->flush());
              if (retcode < 0) {
                tn->log(0, SSTR("ERROR: data sync marker_tracker.flush() returned retcode=" << retcode));
                return set_cr_error(retcode);
              }
              return set_cr_error(-ECANCELED);
            }
            error_marker = iter->first;
            entry_timestamp = rgw::error_repo::decode_value(iter->second);
            retcode = rgw::error_repo::decode_key(iter->first, source_bs, gen);
            if (retcode == -EINVAL) {
              // backward compatibility for string keys that don't encode a gen
              retcode = parse_bucket_key(error_marker, source_bs);
            }
            if (retcode < 0) {
              tn->log(1, SSTR("failed to parse bucket shard: " << error_marker));
              spawn(rgw::error_repo::remove_cr(sc->env->driver->svc()->rados,
                                               error_repo, error_marker,
                                               entry_timestamp), false);
              continue;
            }
            tn->log(10, SSTR("gen is " << gen));
            if (!gen) {
              // write all full sync obligations for the bucket to error repo
              spawn(new RGWDataIncrementalSyncFullObligationCR(sc, source_bs,
                      error_repo, error_marker, entry_timestamp, tn), false);
            } else {
              tn->log(20, SSTR("handle error entry key="
                               << to_string(source_bs, gen)
                               << " timestamp=" << entry_timestamp));
              spawn(data_sync_single_entry(sc, source_bs, gen, "",
                                           entry_timestamp, lease_cr,
                                           bucket_shard_cache, &*marker_tracker,
                                           error_repo, tn, true), false);
            }
          }
          if (!omapvals->more) {
            error_retry_time = ceph::coarse_real_clock::now() +
                               make_timespan(retry_backoff_secs);
            error_marker.clear();
          }
        }
        tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker="
                         << sync_marker.marker));
        yield call(new RGWReadRemoteDataLogShardCR(sc, shard_id,
                                                   sync_marker.marker,
                                                   &next_marker, &log_entries,
                                                   &truncated));
        if (retcode < 0 && retcode != -ENOENT) {
          tn->log(0, SSTR("ERROR: failed to read remote data log info: ret="
                          << retcode));
          return set_cr_error(retcode);
        }

        if (log_entries.size() > 0) {
          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
        }

        for (log_iter = log_entries.begin();
             log_iter != log_entries.end();
             ++log_iter) {
          if (!lease_cr->is_locked()) {
            drain_all();
            yield call(marker_tracker->flush());
            if (retcode < 0) {
              tn->log(0, SSTR("ERROR: data sync marker_tracker.flush() returned retcode=" << retcode));
              return set_cr_error(retcode);
            }
            return set_cr_error(-ECANCELED);
          }

          tn->log(20, SSTR("shard_id=" << shard_id
                           << " log_entry: " << log_iter->log_id
                           << ":" << log_iter->log_timestamp
                           << ":" << log_iter->entry.key));
          retcode = parse_bucket_key(log_iter->entry.key, source_bs);
          if (retcode < 0) {
            tn->log(1, SSTR("failed to parse bucket shard: "
                            << log_iter->entry.key));
            marker_tracker->try_update_high_marker(log_iter->log_id, 0,
                                                   log_iter->log_timestamp);
            continue;
          }
          if (!marker_tracker->start(log_iter->log_id, 0,
                                     log_iter->log_timestamp)) {
            tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id
                            << ". Duplicate entry?"));
          } else {
            tn->log(1, SSTR("incremental sync on " << log_iter->entry.key
                            << " shard: " << shard_id
                            << " on gen " << log_iter->entry.gen));
            yield_spawn_window(data_sync_single_entry(sc, source_bs,
                                 log_iter->entry.gen, log_iter->log_id,
                                 log_iter->log_timestamp, lease_cr,
                                 bucket_shard_cache,
                                 &*marker_tracker, error_repo, tn, false),
                               sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window),
                               [&](uint64_t stack_id, int ret) {
                                 if (ret < 0) {
                                   tn->log(10, SSTR("data_sync_single_entry returned error: " << ret));
                                 }
                                 return ret;
                               });
          }
        }
        if (retcode < 0) {
          return set_cr_error(retcode);
        }

        tn->log(20, SSTR("shard_id=" << shard_id
                         << " sync_marker=" << sync_marker.marker
                         << " next_marker=" << next_marker
                         << " truncated=" << truncated));
        if (!next_marker.empty()) {
          sync_marker.marker = next_marker;
        } else if (!log_entries.empty()) {
          sync_marker.marker = log_entries.back().log_id;
        }
        if (!truncated) {
          // we reached the end, wait a while before checking for more
          tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
          yield wait(get_idle_interval());
        }
      } while (true);
    }
    return 0;
  }
};
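
/* Per-shard driver: acquires the shard's sync lease, rereads the shard
 * status object, then dispatches to the full-sync or incremental-sync
 * coroutine according to the persisted state. */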
class RGWDataSyncShardCR : public RGWCoroutine {
  RGWDataSyncCtx *const sc;
  const rgw_pool pool;
  const uint32_t shard_id;
  rgw_data_sync_marker& sync_marker;
  rgw_data_sync_status sync_status;
  const RGWSyncTraceNodeRef tn;
  RGWObjVersionTracker& objv;
  bool *reset_backoff;

  ceph::mutex inc_lock = ceph::make_mutex("RGWDataSyncShardCR::inc_lock");
  ceph::condition_variable inc_cond;

  RGWDataSyncEnv *const sync_env{ sc->env };

  const string status_oid{ RGWDataSyncStatusManager::shard_obj_name(
      sc->source_zone, shard_id) };
  const rgw_raw_obj error_repo{ pool, status_oid + ".retry" };

  // target number of entries to cache before recycling idle ones
  static constexpr size_t target_cache_size = 256;
  boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache{
    rgw::bucket_sync::Cache::create(target_cache_size) };

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;

  bc::flat_set<rgw_data_notify_entry> modified_shards;
 public:
  RGWDataSyncShardCR(RGWDataSyncCtx* const _sc, const rgw_pool& pool,
                     const uint32_t shard_id, rgw_data_sync_marker& marker,
                     const rgw_data_sync_status& sync_status,
                     RGWSyncTraceNodeRef& tn, RGWObjVersionTracker& objv,
                     bool *reset_backoff)
    : RGWCoroutine(_sc->cct), sc(_sc), pool(pool), shard_id(shard_id),
      sync_marker(marker), sync_status(sync_status), tn(tn),
      objv(objv), reset_backoff(reset_backoff) {
    set_description() << "data sync shard source_zone=" << sc->source_zone
                      << " shard_id=" << shard_id;
  }

  ~RGWDataSyncShardCR() override {
    if (lease_cr) {
      lease_cr->abort();
    }
  }

  void append_modified_shards(bc::flat_set<rgw_data_notify_entry>& entries) {
    std::lock_guard l{inc_lock};
    modified_shards.insert(entries.begin(), entries.end());
  }

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      yield init_lease_cr();
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          tn->log(5, "failed to take lease");
          set_status("lease lock failed, early abort");
          drain_all();
          return set_cr_error(lease_cr->get_ret_status());
        }
        set_sleeping(true);
        yield;
      }
      *reset_backoff = true;
      tn->log(10, "took lease");
      /* Reread data sync status to fetch latest marker and objv */
      yield call(new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->dpp, sync_env->driver,
                                                                rgw_raw_obj(pool, status_oid),
                                                                &sync_marker, true, &objv));
      if (retcode < 0) {
        lease_cr->go_down();
        drain_all();
        return set_cr_error(retcode);
      }

      while (true) {
        if (sync_marker.state == rgw_data_sync_marker::FullSync) {
          yield call(new RGWDataFullSyncShardCR(sc, pool, shard_id,
                                                sync_marker, tn,
                                                status_oid, error_repo,
                                                lease_cr, sync_status,
                                                objv, bucket_shard_cache));
          if (retcode < 0) {
            if (retcode != -EBUSY) {
              tn->log(10, SSTR("full sync failed (retcode=" << retcode << ")"));
            }
            lease_cr->go_down();
            drain_all();
            return set_cr_error(retcode);
          }
        } else if (sync_marker.state == rgw_data_sync_marker::IncrementalSync) {
          yield call(new RGWDataIncSyncShardCR(sc, pool, shard_id,
                                               sync_marker, tn,
                                               status_oid, error_repo,
                                               lease_cr, sync_status,
                                               objv, bucket_shard_cache,
                                               inc_lock, modified_shards));
          if (retcode < 0) {
            if (retcode != -EBUSY) {
              tn->log(10, SSTR("incremental sync failed (retcode=" << retcode << ")"));
            }
            lease_cr->go_down();
            drain_all();
            return set_cr_error(retcode);
          }
        } else {
          lease_cr->go_down();
          drain_all();
          return set_cr_error(-EIO);
        }
      }
    }
    return 0;
  }

  void init_lease_cr() {
    set_status("acquiring sync lock");
    uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
    string lock_name = "sync_lock";
    if (lease_cr) {
      lease_cr->abort();
    }
    auto driver = sync_env->driver;
    lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, driver,
                                            rgw_raw_obj(pool, status_oid),
                                            lock_name, lock_duration, this,
                                            &sc->lcc));
    lease_stack.reset(spawn(lease_cr.get(), false));
  }
};
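
/* Backoff wrapper for a single shard: reallocates RGWDataSyncShardCR on
 * failure and forwards out-of-band shard notifications to the running
 * instance. */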
class RGWDataSyncShardControlCR : public RGWBackoffControlCR {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;

  rgw_pool pool;
  uint32_t shard_id;
  rgw_data_sync_marker sync_marker;
  rgw_data_sync_status sync_status;

  RGWSyncTraceNodeRef tn;
  RGWObjVersionTracker& objv;
 public:
  RGWDataSyncShardControlCR(RGWDataSyncCtx *_sc, const rgw_pool& _pool,
                            uint32_t _shard_id, rgw_data_sync_marker& _marker,
                            const rgw_data_sync_status& sync_status,
                            RGWObjVersionTracker& objv,
                            RGWSyncTraceNodeRef& _tn_parent)
    : RGWBackoffControlCR(_sc->cct, false),
      sc(_sc), sync_env(_sc->env),
      pool(_pool),
      shard_id(_shard_id),
      sync_marker(_marker), sync_status(sync_status), objv(objv) {
    tn = sync_env->sync_tracer->add_node(_tn_parent, "shard", std::to_string(shard_id));
  }

  RGWCoroutine *alloc_cr() override {
    return new RGWDataSyncShardCR(sc, pool, shard_id, sync_marker, sync_status, tn, objv, backoff_ptr());
  }

  RGWCoroutine *alloc_finisher_cr() override {
    return new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->dpp, sync_env->driver,
        rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool,
                    RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id)),
        &sync_marker, true, &objv);
  }

  void append_modified_shards(bc::flat_set<rgw_data_notify_entry>& keys) {
    std::lock_guard l{cr_lock()};

    RGWDataSyncShardCR *cr = static_cast<RGWDataSyncShardCR *>(get_cr());
    if (!cr) {
      return;
    }
    cr->append_modified_shards(keys);
  }
};
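
/* Top-level data sync coroutine: reads (or initializes) the overall sync
 * status, builds the full sync maps when needed, then spawns one
 * RGWDataSyncShardControlCR per datalog shard. */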
class RGWDataSyncCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  uint32_t num_shards;

  rgw_data_sync_status sync_status;
  std::vector<RGWObjVersionTracker> objvs;

  ceph::mutex shard_crs_lock =
    ceph::make_mutex("RGWDataSyncCR::shard_crs_lock");
  map<int, RGWDataSyncShardControlCR *> shard_crs;

  bool *reset_backoff;

  RGWSyncTraceNodeRef tn;

  RGWDataSyncModule *data_sync_module{nullptr};

  boost::intrusive_ptr<RGWContinuousLeaseCR> init_lease;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;

  RGWObjVersionTracker obj_version;
 public:
  RGWDataSyncCR(RGWDataSyncCtx *_sc, uint32_t _num_shards,
                RGWSyncTraceNodeRef& _tn, bool *_reset_backoff)
    : RGWCoroutine(_sc->cct),
      sc(_sc), sync_env(_sc->env),
      num_shards(_num_shards),
      reset_backoff(_reset_backoff), tn(_tn) {
  }

  ~RGWDataSyncCR() override {
    for (auto iter : shard_crs) {
      iter.second->put();
    }
    if (init_lease) {
      init_lease->abort();
    }
  }

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      /* read sync status */
      yield call(new RGWReadDataSyncStatusCoroutine(sc, &sync_status,
                                                    &obj_version, objvs));

      data_sync_module = sync_env->sync_module->get_data_handler();

      if (retcode < 0 && retcode != -ENOENT) {
        tn->log(0, SSTR("ERROR: failed to fetch sync status, retcode=" << retcode));
        return set_cr_error(retcode);
      }

      if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state !=
          rgw_data_sync_info::StateSync) {
        init_lease.reset(
          RGWInitDataSyncStatusCoroutine::continuous_lease_cr(sc, this));
        yield lease_stack.reset(spawn(init_lease.get(), false));

        while (!init_lease->is_locked()) {
          if (init_lease->is_done()) {
            tn->log(5, "ERROR: failed to take data sync status lease");
            set_status("lease lock failed, early abort");
            drain_all();
            return set_cr_error(init_lease->get_ret_status());
          }
          tn->log(5, "waiting on data sync status lease");
          yield set_sleeping(true);
        }
        tn->log(5, "acquired data sync status lease");

        // Reread sync status now that we've acquired the lock!
        obj_version.clear();
        yield call(new RGWReadDataSyncStatusCoroutine(sc, &sync_status, &obj_version, objvs));
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: failed to fetch sync status, retcode=" << retcode));
          return set_cr_error(retcode);
        }
      }

      /* state: init status */
      if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
        tn->log(20, SSTR("init"));
        sync_status.sync_info.num_shards = num_shards;
        uint64_t instance_id;
        instance_id = ceph::util::generate_random_number<uint64_t>();
        yield call(new RGWInitDataSyncStatusCoroutine(sc, num_shards, instance_id, tn,
                                                      &sync_status, init_lease, obj_version, objvs));
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: failed to init sync, retcode=" << retcode));
          init_lease->go_down();
          drain_all();
          return set_cr_error(retcode);
        }
        // sets state = StateBuildingFullSyncMaps

        *reset_backoff = true;
      }

      data_sync_module->init(sc, sync_status.sync_info.instance_id);

      if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
        tn->log(10, SSTR("building full sync maps"));
        /* call sync module init here */
        sync_status.sync_info.num_shards = num_shards;
        yield call(data_sync_module->init_sync(dpp, sc));
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: sync module init_sync() failed, retcode=" << retcode));
          return set_cr_error(retcode);
        }

        if (!init_lease->is_locked()) {
          init_lease->go_down();
          drain_all();
          return set_cr_error(-ECANCELED);
        }

        /* state: building full sync maps */
        yield call(new RGWListBucketIndexesCR(sc, &sync_status, objvs));
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: failed to build full sync maps, retcode=" << retcode));
          return set_cr_error(retcode);
        }
        sync_status.sync_info.state = rgw_data_sync_info::StateSync;

        if (!init_lease->is_locked()) {
          init_lease->go_down();
          drain_all();
          return set_cr_error(-ECANCELED);
        }

        /* update new state */
        yield call(set_sync_info_cr());
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: failed to write sync status, retcode=" << retcode));
          return set_cr_error(retcode);
        }

        *reset_backoff = true;
      }

      yield call(data_sync_module->start_sync(dpp, sc));
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to start sync, retcode=" << retcode));
        return set_cr_error(retcode);
      }

      if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) {
        if (init_lease) {
          init_lease->go_down();
          drain_all();
          init_lease.reset();
          lease_stack.reset();
        }

        yield {
          tn->log(10, SSTR("spawning " << num_shards << " shards sync"));
          for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status.sync_markers.begin();
               iter != sync_status.sync_markers.end(); ++iter) {
            RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sc, sync_env->svc->zone->get_zone_params().log_pool,
                                                                          iter->first, iter->second, sync_status, objvs[iter->first], tn);
            cr->get();
            shard_crs_lock.lock();
            shard_crs[iter->first] = cr;
            shard_crs_lock.unlock();
            spawn(cr, true);
          }
        }
      }

      return set_cr_done();
    }
    return 0;
  }

  RGWCoroutine *set_sync_info_cr() {
    return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->dpp, sync_env->driver,
        rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool,
                    RGWDataSyncStatusManager::sync_status_oid(sc->source_zone)),
        sync_status.sync_info, &obj_version);
  }

  void wakeup(int shard_id, bc::flat_set<rgw_data_notify_entry>& entries) {
    std::lock_guard l{shard_crs_lock};
    map<int, RGWDataSyncShardControlCR *>::iterator iter = shard_crs.find(shard_id);
    if (iter == shard_crs.end()) {
      return;
    }
    iter->second->append_modified_shards(entries);
    iter->second->wakeup();
  }
};
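
/* Default data sync handler: replicates object writes, removals and
 * delete markers from the source zone as-is. */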
class RGWDefaultDataSyncModule : public RGWDataSyncModule {
 public:
  RGWDefaultDataSyncModule() {}

  RGWCoroutine *sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc,
                            rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
                            std::optional<uint64_t> versioned_epoch,
                            const rgw_zone_set_entry& source_trace_entry,
                            rgw_zone_set *zones_trace) override;
  RGWCoroutine *remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
  RGWCoroutine *create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
                                     rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
};

class RGWDefaultSyncModuleInstance : public RGWSyncModuleInstance {
  RGWDefaultDataSyncModule data_handler;
 public:
  RGWDefaultSyncModuleInstance() {}
  RGWDataSyncModule *get_data_handler() override {
    return &data_handler;
  }
  bool supports_user_writes() override {
    return true;
  }
};

int RGWDefaultSyncModule::create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance)
{
  instance->reset(new RGWDefaultSyncModuleInstance());
  return 0;
}
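
/* Loads a user's info, attrs and IAM identity so user-mode sync pipes can
 * verify bucket/object permissions before fetching or writing data. */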
class RGWUserPermHandler {
  friend struct Init;
  friend class Bucket;

  RGWDataSyncEnv *sync_env;
  rgw_user uid;

  struct _info {
    RGWUserInfo user_info;
    rgw::IAM::Environment env;
    std::unique_ptr<rgw::auth::Identity> identity;
    RGWAccessControlPolicy user_acl;
  };

  std::shared_ptr<_info> info;

  struct Init;

  std::shared_ptr<Init> init_action;

  struct Init : public RGWGenericAsyncCR::Action {
    RGWDataSyncEnv *sync_env;

    rgw_user uid;
    std::shared_ptr<RGWUserPermHandler::_info> info;

    int ret{0};

    Init(RGWUserPermHandler *handler) : sync_env(handler->sync_env),
                                        uid(handler->uid),
                                        info(handler->info) {}
    int operate() override {
      auto user_ctl = sync_env->driver->getRados()->ctl.user;

      ret = user_ctl->get_info_by_uid(sync_env->dpp, uid, &info->user_info, null_yield);
      if (ret < 0) {
        return ret;
      }

      info->identity = rgw::auth::transform_old_authinfo(sync_env->cct,
                                                         uid,
                                                         RGW_PERM_FULL_CONTROL,
                                                         false, /* system_request? */
                                                         TYPE_RGW);

      map<string, bufferlist> uattrs;

      ret = user_ctl->get_attrs_by_uid(sync_env->dpp, uid, &uattrs, null_yield);
      if (ret == 0) {
        ret = RGWUserPermHandler::policy_from_attrs(sync_env->cct, uattrs, &info->user_acl);
      }
      if (ret == -ENOENT) {
        info->user_acl.create_default(uid, info->user_info.display_name);
      }

      return 0;
    }
  };

 public:
  RGWUserPermHandler(RGWDataSyncEnv *_sync_env,
                     const rgw_user& _uid) : sync_env(_sync_env),
                                             uid(_uid) {}

  RGWCoroutine *init_cr() {
    info = make_shared<_info>();
    init_action = make_shared<Init>(this);

    return new RGWGenericAsyncCR(sync_env->cct,
                                 sync_env->async_rados,
                                 init_action);
  }

  class Bucket {
    RGWDataSyncEnv *sync_env;
    std::shared_ptr<_info> info;
    RGWAccessControlPolicy bucket_acl;
    std::optional<perm_state> ps;
   public:
    Bucket() {}

    int init(RGWUserPermHandler *handler,
             const RGWBucketInfo& bucket_info,
             const map<string, bufferlist>& bucket_attrs);

    bool verify_bucket_permission(int perm);
    bool verify_object_permission(const map<string, bufferlist>& obj_attrs,
                                  int perm);
  };

  static int policy_from_attrs(CephContext *cct,
                               const map<string, bufferlist>& attrs,
                               RGWAccessControlPolicy *acl) {
    auto aiter = attrs.find(RGW_ATTR_ACL);
    if (aiter == attrs.end()) {
      return -ENOENT;
    }
    auto iter = aiter->second.begin();
    try {
      acl->decode(iter);
    } catch (buffer::error& err) {
      ldout(cct, 0) << "ERROR: " << __func__ << "(): could not decode policy, caught buffer::error" << dendl;
      return -EIO;
    }

    return 0;
  }

  int init_bucket(const RGWBucketInfo& bucket_info,
                  const map<string, bufferlist>& bucket_attrs,
                  Bucket *bs) {
    return bs->init(this, bucket_info, bucket_attrs);
  }
};

int RGWUserPermHandler::Bucket::init(RGWUserPermHandler *handler,
                                     const RGWBucketInfo& bucket_info,
                                     const map<string, bufferlist>& bucket_attrs)
{
  sync_env = handler->sync_env;
  info = handler->info;

  int r = RGWUserPermHandler::policy_from_attrs(sync_env->cct, bucket_attrs, &bucket_acl);
  if (r < 0) {
    return r;
  }

  ps.emplace(sync_env->cct,
             info->env,
             info->identity.get(),
             bucket_info,
             info->identity->get_perm_mask(),
             false, /* defer to bucket acls */
             nullptr, /* referer */
             false); /* request_payer */

  return 0;
}

bool RGWUserPermHandler::Bucket::verify_bucket_permission(int perm)
{
  return verify_bucket_permission_no_policy(sync_env->dpp,
                                            &(*ps),
                                            &info->user_acl,
                                            &bucket_acl,
                                            perm);
}

bool RGWUserPermHandler::Bucket::verify_object_permission(const map<string, bufferlist>& obj_attrs,
                                                          int perm)
{
  RGWAccessControlPolicy obj_acl;

  int r = policy_from_attrs(sync_env->cct, obj_attrs, &obj_acl);
  if (r < 0) {
    return false;
  }

  return verify_bucket_permission_no_policy(sync_env->dpp,
                                            &(*ps),
                                            &bucket_acl,
                                            &obj_acl,
                                            perm);
}
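
/* Fetch filter applied while syncing an object: re-resolves the pipe
 * params against the object's tags, enforces ACL translation and
 * user-mode read permission, and applies any dest storage-class
 * override; flags a retry if the params raced with a config change. */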
class RGWFetchObjFilter_Sync : public RGWFetchObjFilter_Default {
  rgw_bucket_sync_pipe sync_pipe;

  std::shared_ptr<RGWUserPermHandler::Bucket> bucket_perms;
  std::optional<rgw_sync_pipe_dest_params> verify_dest_params;

  std::optional<ceph::real_time> mtime;
  std::optional<string> etag;
  std::optional<uint64_t> obj_size;

  std::unique_ptr<rgw::auth::Identity> identity;

  std::shared_ptr<bool> need_retry;

 public:
  RGWFetchObjFilter_Sync(rgw_bucket_sync_pipe& _sync_pipe,
                         std::shared_ptr<RGWUserPermHandler::Bucket>& _bucket_perms,
                         std::optional<rgw_sync_pipe_dest_params>&& _verify_dest_params,
                         std::shared_ptr<bool>& _need_retry) : sync_pipe(_sync_pipe),
                                                               bucket_perms(_bucket_perms),
                                                               verify_dest_params(std::move(_verify_dest_params)),
                                                               need_retry(_need_retry) {
    *need_retry = false;
  }

  int filter(CephContext *cct,
             const rgw_obj_key& source_key,
             const RGWBucketInfo& dest_bucket_info,
             std::optional<rgw_placement_rule> dest_placement_rule,
             const map<string, bufferlist>& obj_attrs,
             std::optional<rgw_user> *poverride_owner,
             const rgw_placement_rule **prule) override;
};

int RGWFetchObjFilter_Sync::filter(CephContext *cct,
                                   const rgw_obj_key& source_key,
                                   const RGWBucketInfo& dest_bucket_info,
                                   std::optional<rgw_placement_rule> dest_placement_rule,
                                   const map<string, bufferlist>& obj_attrs,
                                   std::optional<rgw_user> *poverride_owner,
                                   const rgw_placement_rule **prule)
{
  int abort_err = -ERR_PRECONDITION_FAILED;

  rgw_sync_pipe_params params;

  RGWObjTags obj_tags;

  auto iter = obj_attrs.find(RGW_ATTR_TAGS);
  if (iter != obj_attrs.end()) {
    try {
      auto it = iter->second.cbegin();
      obj_tags.decode(it);
    } catch (buffer::error& err) {
      ldout(cct, 0) << "ERROR: " << __func__ << ": caught buffer::error, couldn't decode TagSet" << dendl;
    }
  }

  if (!sync_pipe.info.handler.find_obj_params(source_key,
                                              obj_tags.get_tags(),
                                              &params)) {
    return abort_err;
  }

  if (verify_dest_params &&
      !(*verify_dest_params == params.dest)) {
    /* raced! original dest params were different, will need to retry */
    ldout(cct, 0) << "WARNING: " << __func__ << ": pipe dest params are different than original params, must have raced with object rewrite, retrying" << dendl;
    *need_retry = true;
    return abort_err;
  }

  std::optional<std::map<string, bufferlist> > new_attrs;

  if (params.dest.acl_translation) {
    rgw_user& acl_translation_owner = params.dest.acl_translation->owner;
    if (!acl_translation_owner.empty()) {
      if (params.mode == rgw_sync_pipe_params::MODE_USER &&
          acl_translation_owner != dest_bucket_info.owner) {
        ldout(cct, 0) << "ERROR: " << __func__ << ": acl translation was requested, but user (" << acl_translation_owner
                      << ") is not dest bucket owner (" << dest_bucket_info.owner << ")" << dendl;
        return -EPERM;
      }
      *poverride_owner = acl_translation_owner;
    }
  }
  if (params.mode == rgw_sync_pipe_params::MODE_USER) {
    if (!bucket_perms->verify_object_permission(obj_attrs, RGW_PERM_READ)) {
      ldout(cct, 0) << "ERROR: " << __func__ << ": permission check failed: user not allowed to fetch object" << dendl;
      return -EPERM;
    }
  }

  if (!dest_placement_rule &&
      params.dest.storage_class) {
    dest_rule.storage_class = *params.dest.storage_class;
    dest_rule.inherit_from(dest_bucket_info.placement_rule);
    dest_placement_rule = dest_rule;
    *prule = &dest_rule;
  }

  return RGWFetchObjFilter_Default::filter(cct,
                                           source_key,
                                           dest_bucket_info,
                                           dest_placement_rule,
                                           obj_attrs,
                                           poverride_owner,
                                           prule);
}
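
/* Fetches a single object through the sync pipe: resolves pipe params
 * (statting the source object when tag-dependent rules apply), verifies
 * user-mode permissions on both buckets, then runs RGWFetchRemoteObjCR,
 * retrying a bounded number of times if the params change mid-fetch. */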
class RGWObjFetchCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  rgw_bucket_sync_pipe& sync_pipe;
  rgw_obj_key key;
  std::optional<rgw_obj_key> dest_key;
  std::optional<uint64_t> versioned_epoch;
  const rgw_zone_set_entry& source_trace_entry;
  rgw_zone_set *zones_trace;

  bool need_more_info{false};
  bool check_change{false};

  ceph::real_time src_mtime;
  uint64_t src_size;
  string src_etag;
  map<string, bufferlist> src_attrs;
  map<string, string> src_headers;

  std::optional<rgw_user> param_user;
  rgw_sync_pipe_params::Mode param_mode;

  std::optional<RGWUserPermHandler> user_perms;
  std::shared_ptr<RGWUserPermHandler::Bucket> source_bucket_perms;
  RGWUserPermHandler::Bucket dest_bucket_perms;

  std::optional<rgw_sync_pipe_dest_params> dest_params;

  int try_num{0};
  std::shared_ptr<bool> need_retry;

 public:
  RGWObjFetchCR(RGWDataSyncCtx *_sc,
                rgw_bucket_sync_pipe& _sync_pipe,
                rgw_obj_key& _key,
                std::optional<rgw_obj_key> _dest_key,
                std::optional<uint64_t> _versioned_epoch,
                const rgw_zone_set_entry& source_trace_entry,
                rgw_zone_set *_zones_trace) : RGWCoroutine(_sc->cct),
                                              sc(_sc), sync_env(_sc->env),
                                              sync_pipe(_sync_pipe),
                                              key(_key),
                                              dest_key(_dest_key),
                                              versioned_epoch(_versioned_epoch),
                                              source_trace_entry(source_trace_entry),
                                              zones_trace(_zones_trace) {
  }

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {

#define MAX_RACE_RETRIES_OBJ_FETCH 10
      for (try_num = 0; try_num < MAX_RACE_RETRIES_OBJ_FETCH; ++try_num) {

        {
          std::optional<rgw_user> param_acl_translation;
          std::optional<string> param_storage_class;

          if (!sync_pipe.info.handler.find_basic_info_without_tags(key,
                                                                   &param_user,
                                                                   &param_acl_translation,
                                                                   &param_storage_class,
                                                                   &param_mode,
                                                                   &need_more_info)) {
            if (!need_more_info) {
              return set_cr_error(-ERR_PRECONDITION_FAILED);
            }
          }
        }

        if (need_more_info) {
          ldout(cct, 20) << "Could not determine exact policy rule for obj=" << key << ", will read source object attributes" << dendl;
          /*
           * we need to fetch info about source object, so that we can determine
           * the correct policy configuration. This can happen if there are multiple
           * policy rules, and some depend on the object tagging */
          yield call(new RGWStatRemoteObjCR(sync_env->async_rados,
                                            sync_env->driver,
                                            sc->source_zone,
                                            sync_pipe.info.source_bs.bucket,
                                            key,
                                            &src_mtime,
                                            &src_size,
                                            &src_etag,
                                            &src_attrs,
                                            &src_headers));
          if (retcode < 0) {
            return set_cr_error(retcode);
          }

          {
            RGWObjTags obj_tags;

            auto iter = src_attrs.find(RGW_ATTR_TAGS);
            if (iter != src_attrs.end()) {
              try {
                auto it = iter->second.cbegin();
                obj_tags.decode(it);
              } catch (buffer::error& err) {
                ldout(cct, 0) << "ERROR: " << __func__ << ": caught buffer::error, couldn't decode TagSet" << dendl;
              }
            }

            rgw_sync_pipe_params params;
            if (!sync_pipe.info.handler.find_obj_params(key,
                                                        obj_tags.get_tags(),
                                                        &params)) {
              return set_cr_error(-ERR_PRECONDITION_FAILED);
            }

            param_user = params.user;
            param_mode = params.mode;

            dest_params = params.dest;
          }
        }

        if (param_mode == rgw_sync_pipe_params::MODE_USER) {
          if (!param_user) {
            ldout(cct, 20) << "ERROR: " << __func__ << ": user level sync but user param not set" << dendl;
            return set_cr_error(-EPERM);
          }
          user_perms.emplace(sync_env, *param_user);

          yield call(user_perms->init_cr());
          if (retcode < 0) {
            ldout(cct, 20) << "ERROR: " << __func__ << ": failed to init user perms manager for uid=" << *param_user << dendl;
            return set_cr_error(retcode);
          }

          /* verify that user is allowed to write at the target bucket */
          int r = user_perms->init_bucket(sync_pipe.dest_bucket_info,
                                          sync_pipe.dest_bucket_attrs,
                                          &dest_bucket_perms);
          if (r < 0) {
            ldout(cct, 20) << "ERROR: " << __func__ << ": failed to init bucket perms manager for uid=" << *param_user << " bucket=" << sync_pipe.dest_bucket_info.bucket.get_key() << dendl;
            return set_cr_error(r);
          }

          if (!dest_bucket_perms.verify_bucket_permission(RGW_PERM_WRITE)) {
            ldout(cct, 0) << "ERROR: " << __func__ << ": permission check failed: user not allowed to write into bucket (bucket=" << sync_pipe.info.dest_bucket.get_key() << ")" << dendl;
            return set_cr_error(-EPERM);
          }

          /* init source bucket permission structure */
          source_bucket_perms = make_shared<RGWUserPermHandler::Bucket>();
          r = user_perms->init_bucket(sync_pipe.source_bucket_info,
                                      sync_pipe.source_bucket_attrs,
                                      source_bucket_perms.get());
          if (r < 0) {
            ldout(cct, 20) << "ERROR: " << __func__ << ": failed to init bucket perms manager for uid=" << *param_user << " bucket=" << sync_pipe.source_bucket_info.bucket.get_key() << dendl;
            return set_cr_error(r);
          }
        }

        yield {
          need_retry = make_shared<bool>();

          auto filter = make_shared<RGWFetchObjFilter_Sync>(sync_pipe,
                                                            source_bucket_perms,
                                                            std::move(dest_params),
                                                            need_retry);

          call(new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->driver, sc->source_zone,
                                       std::nullopt,
                                       sync_pipe.info.source_bs.bucket,
                                       std::nullopt, sync_pipe.dest_bucket_info,
                                       key, dest_key, versioned_epoch,
                                       true,
                                       std::static_pointer_cast<RGWFetchObjFilter>(filter),
                                       source_trace_entry, zones_trace,
                                       sync_env->counters, dpp));
        }
        if (retcode < 0) {
          if (*need_retry) {
            continue;
          }
          return set_cr_error(retcode);
        }

        return set_cr_done();
      }

      ldout(cct, 0) << "ERROR: " << __func__ << ": Too many retries trying to fetch object, possibly a bug: bucket=" << sync_pipe.source_bucket_info.bucket.get_key() << " key=" << key << dendl;

      return set_cr_error(-EIO);
    }
    return 0;
  }
};
RGWCoroutine *RGWDefaultDataSyncModule::sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc,
                                                    rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
                                                    std::optional<uint64_t> versioned_epoch,
                                                    const rgw_zone_set_entry& source_trace_entry,
                                                    rgw_zone_set *zones_trace)
{
  return new RGWObjFetchCR(sc, sync_pipe, key, std::nullopt, versioned_epoch,
                           source_trace_entry, zones_trace);
}

RGWCoroutine *RGWDefaultDataSyncModule::remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
                                                      real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
  auto sync_env = sc->env;
  return new RGWRemoveObjCR(sync_env->dpp, sync_env->async_rados, sync_env->driver, sc->source_zone,
                            sync_pipe.dest_bucket_info, key, versioned, versioned_epoch,
                            NULL, NULL, false, &mtime, zones_trace);
}

RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
                                                             rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
  auto sync_env = sc->env;
  return new RGWRemoveObjCR(sync_env->dpp, sync_env->async_rados, sync_env->driver, sc->source_zone,
                            sync_pipe.dest_bucket_info, key, versioned, versioned_epoch,
                            &owner.id, &owner.display_name, true, &mtime, zones_trace);
}
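
/* Archive data sync handler: forces versioning on the destination bucket
 * and preserves every source version; object removals are not replicated,
 * and deletes only produce delete markers. */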
class RGWArchiveDataSyncModule : public RGWDefaultDataSyncModule {
 public:
  RGWArchiveDataSyncModule() {}

  RGWCoroutine *sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc,
                            rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
                            std::optional<uint64_t> versioned_epoch,
                            const rgw_zone_set_entry& source_trace_entry,
                            rgw_zone_set *zones_trace) override;
  RGWCoroutine *remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
  RGWCoroutine *create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
                                     rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
};

class RGWArchiveSyncModuleInstance : public RGWDefaultSyncModuleInstance {
  RGWArchiveDataSyncModule data_handler;
 public:
  RGWArchiveSyncModuleInstance() {}
  RGWDataSyncModule *get_data_handler() override {
    return &data_handler;
  }
  RGWMetadataHandler *alloc_bucket_meta_handler() override {
    return RGWArchiveBucketMetaHandlerAllocator::alloc();
  }
  RGWBucketInstanceMetadataHandlerBase *alloc_bucket_instance_meta_handler(rgw::sal::Driver* driver) override {
    return RGWArchiveBucketInstanceMetaHandlerAllocator::alloc(driver);
  }
};

int RGWArchiveSyncModule::create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance)
{
  instance->reset(new RGWArchiveSyncModuleInstance());
  return 0;
}
RGWCoroutine *RGWArchiveDataSyncModule::sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc,
                                                    rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
                                                    std::optional<uint64_t> versioned_epoch,
                                                    const rgw_zone_set_entry& source_trace_entry,
                                                    rgw_zone_set *zones_trace)
{
  auto sync_env = sc->env;
  ldout(sc->cct, 5) << "SYNC_ARCHIVE: sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
  if (!sync_pipe.dest_bucket_info.versioned() ||
      (sync_pipe.dest_bucket_info.flags & BUCKET_VERSIONS_SUSPENDED)) {
    ldout(sc->cct, 0) << "SYNC_ARCHIVE: sync_object: enabling object versioning for archive bucket" << dendl;
    sync_pipe.dest_bucket_info.flags = (sync_pipe.dest_bucket_info.flags & ~BUCKET_VERSIONS_SUSPENDED) | BUCKET_VERSIONED;
    int op_ret = sync_env->driver->getRados()->put_bucket_instance_info(sync_pipe.dest_bucket_info, false, real_time(), NULL, sync_env->dpp, null_yield);
    if (op_ret < 0) {
      ldpp_dout(sync_env->dpp, 0) << "SYNC_ARCHIVE: sync_object: error versioning archive bucket" << dendl;
      return NULL;
    }
  }

  std::optional<rgw_obj_key> dest_key;

  if (versioned_epoch.value_or(0) == 0) { /* force version if not set */
    versioned_epoch = 0;
    dest_key = key;
    if (key.instance.empty()) {
      sync_env->driver->getRados()->gen_rand_obj_instance_name(&(*dest_key));
    }
  }

  return new RGWObjFetchCR(sc, sync_pipe, key, dest_key, versioned_epoch,
                           source_trace_entry, zones_trace);
}

RGWCoroutine *RGWArchiveDataSyncModule::remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
                                                      real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
  ldout(sc->cct, 0) << "SYNC_ARCHIVE: remove_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
  return NULL;
}

RGWCoroutine *RGWArchiveDataSyncModule::create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
                                                             rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
  ldout(sc->cct, 0) << "SYNC_ARCHIVE: create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime
                    << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
  auto sync_env = sc->env;
  return new RGWRemoveObjCR(sync_env->dpp, sync_env->async_rados, sync_env->driver, sc->source_zone,
                            sync_pipe.dest_bucket_info, key, versioned, versioned_epoch,
                            &owner.id, &owner.display_name, true, &mtime, zones_trace);
}
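
/* Backoff wrapper around RGWDataSyncCR; exit_on_error is false, so the
 * sync coroutine is restarted after any failure. */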
class RGWDataSyncControlCR : public RGWBackoffControlCR
{
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  uint32_t num_shards;

  RGWSyncTraceNodeRef tn;

  static constexpr bool exit_on_error = false; // retry on all errors
 public:
  RGWDataSyncControlCR(RGWDataSyncCtx *_sc, uint32_t _num_shards,
                       RGWSyncTraceNodeRef& _tn_parent) : RGWBackoffControlCR(_sc->cct, exit_on_error),
                                                          sc(_sc), sync_env(_sc->env), num_shards(_num_shards) {
    tn = sync_env->sync_tracer->add_node(_tn_parent, "sync");
  }

  RGWCoroutine *alloc_cr() override {
    return new RGWDataSyncCR(sc, num_shards, tn, backoff_ptr());
  }

  void wakeup(int shard_id, bc::flat_set<rgw_data_notify_entry>& entries) {
    ceph::mutex& m = cr_lock();

    m.lock();
    RGWDataSyncCR *cr = static_cast<RGWDataSyncCR *>(get_cr());
    if (!cr) {
      m.unlock();
      return;
    }

    cr->get();
    m.unlock();

    cr->wakeup(shard_id, entries);

    cr->put();
  }
};
void RGWRemoteDataLog::wakeup(int shard_id, bc::flat_set<rgw_data_notify_entry>& entries) {
  std::shared_lock rl{lock};
  if (!data_sync_cr) {
    return;
  }
  data_sync_cr->wakeup(shard_id, entries);
}

int RGWRemoteDataLog::run_sync(const DoutPrefixProvider *dpp, int num_shards)
{
  lock.lock();
  data_sync_cr = new RGWDataSyncControlCR(&sc, num_shards, tn);
  data_sync_cr->get(); // run() will drop a ref, so take another
  lock.unlock();

  int r = run(dpp, data_sync_cr);

  lock.lock();
  data_sync_cr->put();
  data_sync_cr = NULL;
  lock.unlock();

  if (r < 0) {
    ldpp_dout(dpp, 0) << "ERROR: failed to run sync" << dendl;
    return r;
  }
  return 0;
}
CephContext *RGWDataSyncStatusManager::get_cct() const
{
  return driver->ctx();
}

int RGWDataSyncStatusManager::init(const DoutPrefixProvider *dpp)
{
  RGWZone* zone_def;

  if (!(zone_def = driver->svc()->zone->find_zone(source_zone))) {
    ldpp_dout(this, 0) << "ERROR: failed to find zone config info for zone=" << source_zone << dendl;
    return -EIO;
  }

  if (!driver->svc()->sync_modules->get_manager()->supports_data_export(zone_def->tier_type)) {
    return -ENOTSUP;
  }

  const RGWZoneParams& zone_params = driver->svc()->zone->get_zone_params();

  if (sync_module == nullptr) {
    sync_module = driver->get_sync_module();
  }

  conn = driver->svc()->zone->get_zone_conn(source_zone);
  if (!conn) {
    ldpp_dout(this, 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
    return -EINVAL;
  }

  error_logger = new RGWSyncErrorLogger(driver, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);

  int r = source_log.init(source_zone, conn, error_logger, driver->getRados()->get_sync_tracer(),
                          sync_module, counters);
  if (r < 0) {
    ldpp_dout(this, 0) << "ERROR: failed to init remote log, r=" << r << dendl;
    finalize();
    return r;
  }

  rgw_datalog_info datalog_info;
  r = source_log.read_log_info(dpp, &datalog_info);
  if (r < 0) {
    ldpp_dout(this, 5) << "ERROR: master.read_log_info() returned r=" << r << dendl;
    finalize();
    return r;
  }

  num_shards = datalog_info.num_shards;

  for (int i = 0; i < num_shards; i++) {
    shard_objs[i] = rgw_raw_obj(zone_params.log_pool, shard_obj_name(source_zone, i));
  }

  return 0;
}

void RGWDataSyncStatusManager::finalize()
{
  delete error_logger;
  error_logger = nullptr;
}

unsigned RGWDataSyncStatusManager::get_subsys() const
{
  return dout_subsys;
}

std::ostream& RGWDataSyncStatusManager::gen_prefix(std::ostream& out) const
{
  auto zone = std::string_view{source_zone.id};
  return out << "data sync zone:" << zone.substr(0, 8) << ' ';
}

string RGWDataSyncStatusManager::sync_status_oid(const rgw_zone_id& source_zone)
{
  char buf[datalog_sync_status_oid_prefix.size() + source_zone.id.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%s", datalog_sync_status_oid_prefix.c_str(), source_zone.id.c_str());

  return string(buf);
}

string RGWDataSyncStatusManager::shard_obj_name(const rgw_zone_id& source_zone, int shard_id)
{
  char buf[datalog_sync_status_shard_prefix.size() + source_zone.id.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_status_shard_prefix.c_str(), source_zone.id.c_str(), shard_id);

  return string(buf);
}
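
/* Status objects above are named "<prefix>.<zone-id>[.<shard-id>]", e.g.
 * "datalog.sync-status.shard.<zone-id>.7" for shard 7 (per the snprintf
 * formats).
 *
 * The following coroutine writes the per-bucket-shard incremental sync
 * marker; when a full sync will run first, the marker is seeded at the
 * current remote bilog position so incremental sync resumes from there. */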
class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;

  const rgw_bucket_sync_pair_info& sync_pair;
  const string sync_status_oid;

  rgw_bucket_shard_sync_info& status;
  RGWObjVersionTracker& objv_tracker;
  const BucketIndexShardsManager& marker_mgr;
  bool exclusive;
 public:
  RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncCtx *_sc,
                                        const rgw_bucket_sync_pair_info& _sync_pair,
                                        rgw_bucket_shard_sync_info& _status,
                                        uint64_t gen,
                                        const BucketIndexShardsManager& _marker_mgr,
                                        RGWObjVersionTracker& objv_tracker,
                                        bool exclusive)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
      sync_pair(_sync_pair),
      sync_status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, _sync_pair, gen)),
      status(_status), objv_tracker(objv_tracker), marker_mgr(_marker_mgr), exclusive(exclusive)
  {}

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      yield {
        rgw_raw_obj obj(sync_env->svc->zone->get_zone_params().log_pool, sync_status_oid);

        // whether or not to do full sync, incremental sync will follow anyway
        if (sync_env->sync_module->should_full_sync()) {
          const auto max_marker = marker_mgr.get(sync_pair.source_bs.shard_id, "");
          status.inc_marker.position = max_marker;
        }
        status.inc_marker.timestamp = ceph::real_clock::now();
        status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;

        map<string, bufferlist> attrs;
        status.encode_all_attrs(attrs);
        call(new RGWSimpleRadosWriteAttrsCR(dpp, sync_env->driver,
                                            obj, attrs, &objv_tracker, exclusive));
      }
      if (retcode < 0) {
        ldout(cct, 20) << "ERROR: init marker position failed. error: " << retcode << dendl;
        return set_cr_error(retcode);
      }
      ldout(cct, 20) << "init marker position: " << status.inc_marker.position <<
        ". written to shard status object: " << sync_status_oid << dendl;
      return set_cr_done();
    }
    return 0;
  }
};
#define BUCKET_SYNC_ATTR_PREFIX RGW_ATTR_PREFIX "bucket-sync."

template <class T>
static bool decode_attr(CephContext *cct, map<string, bufferlist>& attrs, const string& attr_name, T *val)
{
  map<string, bufferlist>::iterator iter = attrs.find(attr_name);
  if (iter == attrs.end()) {
    *val = T();
    return false;
  }

  auto biter = iter->second.cbegin();
  try {
    decode(*val, biter);
  } catch (buffer::error& err) {
    ldout(cct, 0) << "ERROR: failed to decode attribute: " << attr_name << dendl;
    return false;
  }
  return true;
}

void rgw_bucket_shard_sync_info::decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs)
{
  if (!decode_attr(cct, attrs, BUCKET_SYNC_ATTR_PREFIX "state", &state)) {
    decode_attr(cct, attrs, "state", &state);
  }
  if (!decode_attr(cct, attrs, BUCKET_SYNC_ATTR_PREFIX "inc_marker", &inc_marker)) {
    decode_attr(cct, attrs, "inc_marker", &inc_marker);
  }
}

void rgw_bucket_shard_sync_info::encode_all_attrs(map<string, bufferlist>& attrs)
{
  encode_state_attr(attrs);
  inc_marker.encode_attr(attrs);
}

void rgw_bucket_shard_sync_info::encode_state_attr(map<string, bufferlist>& attrs)
{
  using ceph::encode;
  encode(state, attrs[BUCKET_SYNC_ATTR_PREFIX "state"]);
}

void rgw_bucket_shard_full_sync_marker::encode_attr(map<string, bufferlist>& attrs)
{
  using ceph::encode;
  encode(*this, attrs[BUCKET_SYNC_ATTR_PREFIX "full_marker"]);
}

void rgw_bucket_shard_inc_sync_marker::encode_attr(map<string, bufferlist>& attrs)
{
  using ceph::encode;
  encode(*this, attrs[BUCKET_SYNC_ATTR_PREFIX "inc_marker"]);
}
class RGWReadBucketPipeSyncStatusCoroutine : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  string oid;
  rgw_bucket_shard_sync_info *status;
  RGWObjVersionTracker* objv_tracker;
  map<string, bufferlist> attrs;
 public:
  RGWReadBucketPipeSyncStatusCoroutine(RGWDataSyncCtx *_sc,
                                       const rgw_bucket_sync_pair_info& sync_pair,
                                       rgw_bucket_shard_sync_info *_status,
                                       RGWObjVersionTracker* objv_tracker,
                                       uint64_t gen)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
      oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair, gen)),
      status(_status), objv_tracker(objv_tracker)
  {}
  int operate(const DoutPrefixProvider *dpp) override;
};

int RGWReadBucketPipeSyncStatusCoroutine::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    yield call(new RGWSimpleRadosReadAttrsCR(dpp, sync_env->driver,
                                             rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, oid),
                                             &attrs, true, objv_tracker));
    if (retcode == -ENOENT) {
      *status = rgw_bucket_shard_sync_info();
      return set_cr_done();
    }
    if (retcode < 0) {
      ldpp_dout(dpp, 0) << "ERROR: failed to call fetch bucket shard info oid=" << oid << " ret=" << retcode << dendl;
      return set_cr_error(retcode);
    }
    status->decode_from_attrs(sync_env->cct, attrs);
    return set_cr_done();
  }
  return 0;
}
// wrap ReadSyncStatus and set a flag if it's not in incremental
class CheckBucketShardStatusIsIncremental : public RGWReadBucketPipeSyncStatusCoroutine {
  bool* result;
  rgw_bucket_shard_sync_info status;
 public:
  CheckBucketShardStatusIsIncremental(RGWDataSyncCtx* sc,
                                      const rgw_bucket_sync_pair_info& sync_pair,
                                      bool* result)
    : RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &status, nullptr, 0 /*no gen in compat mode*/),
      result(result)
  {}

  int operate(const DoutPrefixProvider *dpp) override {
    int r = RGWReadBucketPipeSyncStatusCoroutine::operate(dpp);
    if (state == RGWCoroutine_Done &&
        status.state != rgw_bucket_shard_sync_info::StateIncrementalSync) {
      *result = false;
    }
    return r;
  }
};

class CheckAllBucketShardStatusIsIncremental : public RGWShardCollectCR {
  // start with 1 shard, and only spawn more if we detect an existing shard.
  // this makes the backward compatibility check far less expensive in the
  // general case where no shards exist
  static constexpr int initial_concurrent_shards = 1;
  static constexpr int max_concurrent_shards = 16;

  RGWDataSyncCtx* sc;
  rgw_bucket_sync_pair_info sync_pair;
  const int num_shards;
  bool* result;
  int shard = 0;
 public:
  CheckAllBucketShardStatusIsIncremental(RGWDataSyncCtx* sc,
                                         const rgw_bucket_sync_pair_info& sync_pair,
                                         int num_shards, bool* result)
    : RGWShardCollectCR(sc->cct, initial_concurrent_shards),
      sc(sc), sync_pair(sync_pair), num_shards(num_shards), result(result)
  {}

  bool spawn_next() override {
    // stop spawning if we saw any errors or non-incremental shards
    if (shard >= num_shards || status < 0 || !*result) {
      return false;
    }
    sync_pair.source_bs.shard_id = shard++;
    spawn(new CheckBucketShardStatusIsIncremental(sc, sync_pair, result), false);
    return true;
  }

  int handle_result(int r) override {
    if (r < 0) {
      ldout(cct, 4) << "failed to read bucket shard status: "
                    << cpp_strerror(r) << dendl;
    } else if (shard == 0) {
      // enable concurrency once the first shard succeeds
      max_concurrent = max_concurrent_shards;
    }
    return r;
  }
};
// wrap InitBucketShardSyncStatus with local storage for 'status' and 'objv'
// and a loop to retry on racing writes
class InitBucketShardStatusCR : public RGWCoroutine {
  RGWDataSyncCtx* sc;
  rgw_bucket_sync_pair_info pair;
  rgw_bucket_shard_sync_info status;
  RGWObjVersionTracker objv;
  const uint64_t gen;
  const BucketIndexShardsManager& marker_mgr;
 public:
  InitBucketShardStatusCR(RGWDataSyncCtx* sc,
                          const rgw_bucket_sync_pair_info& pair,
                          uint64_t gen,
                          const BucketIndexShardsManager& marker_mgr)
    : RGWCoroutine(sc->cct), sc(sc), pair(pair), gen(gen), marker_mgr(marker_mgr)
  {}
  int operate(const DoutPrefixProvider *dpp) {
    reenter(this) {
      // non exclusive create with empty status
      objv.generate_new_write_ver(cct);
      yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, pair, status, gen, marker_mgr, objv, false));
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }
};

class InitBucketShardStatusCollectCR : public RGWShardCollectCR {
  static constexpr int max_concurrent_shards = 16;
  RGWDataSyncCtx* sc;
  rgw_bucket_sync_pair_info sync_pair;
  const uint64_t gen;
  const BucketIndexShardsManager& marker_mgr;

  const int num_shards;
  int shard = 0;

  int handle_result(int r) override {
    if (r < 0) {
      ldout(cct, 4) << "failed to init bucket shard status: "
                    << cpp_strerror(r) << dendl;
    }
    return r;
  }
 public:
  InitBucketShardStatusCollectCR(RGWDataSyncCtx* sc,
                                 const rgw_bucket_sync_pair_info& sync_pair,
                                 uint64_t gen,
                                 const BucketIndexShardsManager& marker_mgr,
                                 int num_shards)
    : RGWShardCollectCR(sc->cct, max_concurrent_shards),
      sc(sc), sync_pair(sync_pair), gen(gen), marker_mgr(marker_mgr), num_shards(num_shards)
  {}

  bool spawn_next() override {
    if (shard >= num_shards || status < 0) { // stop spawning on any errors
      return false;
    }
    sync_pair.source_bs.shard_id = shard++;
    spawn(new InitBucketShardStatusCR(sc, sync_pair, gen, marker_mgr), false);
    return true;
  }
};
class RemoveBucketShardStatusCR : public RGWCoroutine {
  RGWDataSyncCtx* const sc;
  RGWDataSyncEnv* const sync_env;

  rgw_bucket_sync_pair_info sync_pair;
  rgw_raw_obj obj;
  RGWObjVersionTracker objv;

 public:
  RemoveBucketShardStatusCR(RGWDataSyncCtx* sc,
                            const rgw_bucket_sync_pair_info& sync_pair, uint64_t gen)
    : RGWCoroutine(sc->cct), sc(sc), sync_env(sc->env),
      sync_pair(sync_pair),
      obj(sync_env->svc->zone->get_zone_params().log_pool,
          RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair, gen))
  {}

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      yield call(new RGWRadosRemoveCR(sync_env->driver, obj, &objv));
      if (retcode < 0 && retcode != -ENOENT) {
        ldout(cct, 20) << "ERROR: failed to remove bucket shard status for: " << sync_pair <<
          ". with error: " << retcode << dendl;
        return set_cr_error(retcode);
      }
      ldout(cct, 20) << "removed bucket shard status object: " << obj.oid << dendl;
      return set_cr_done();
    }
    return 0;
  }
};

class RemoveBucketShardStatusCollectCR : public RGWShardCollectCR {
  static constexpr int max_concurrent_shards = 16;
  RGWDataSyncCtx* const sc;
  RGWDataSyncEnv* const sync_env;
  rgw_bucket_sync_pair_info sync_pair;
  const uint64_t gen;

  const int num_shards;
  int shard = 0;

  int handle_result(int r) override {
    if (r < 0) {
      ldout(cct, 4) << "failed to remove bucket shard status object: "
                    << cpp_strerror(r) << dendl;
    }
    return r;
  }
 public:
  RemoveBucketShardStatusCollectCR(RGWDataSyncCtx* sc,
                                   const rgw_bucket_sync_pair_info& sync_pair,
                                   uint64_t gen,
                                   int num_shards)
    : RGWShardCollectCR(sc->cct, max_concurrent_shards),
      sc(sc), sync_env(sc->env), sync_pair(sync_pair), gen(gen), num_shards(num_shards)
  {}

  bool spawn_next() override {
    if (shard >= num_shards) {
      return false;
    }
    sync_pair.source_bs.shard_id = shard++;
    spawn(new RemoveBucketShardStatusCR(sc, sync_pair, gen), false);
    return true;
  }
};
class InitBucketFullSyncStatusCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;

  const rgw_bucket_sync_pair_info& sync_pair;
  const rgw_raw_obj& status_obj;
  rgw_bucket_sync_status& status;
  RGWObjVersionTracker& objv;
  const RGWBucketInfo& source_info;
  const bool check_compat;

  const rgw_bucket_index_marker_info& info;
  BucketIndexShardsManager marker_mgr;

  bool all_incremental = true;
  bool no_zero = false;

 public:
  InitBucketFullSyncStatusCR(RGWDataSyncCtx* sc,
                             const rgw_bucket_sync_pair_info& sync_pair,
                             const rgw_raw_obj& status_obj,
                             rgw_bucket_sync_status& status,
                             RGWObjVersionTracker& objv,
                             const RGWBucketInfo& source_info,
                             bool check_compat,
                             const rgw_bucket_index_marker_info& info)
    : RGWCoroutine(sc->cct), sc(sc), sync_env(sc->env),
      sync_pair(sync_pair), status_obj(status_obj),
      status(status), objv(objv), source_info(source_info),
      check_compat(check_compat), info(info)
  {}

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      retcode = marker_mgr.from_string(info.max_marker, -1);
      if (retcode < 0) {
        lderr(cct) << "failed to parse bilog shard markers: "
                   << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }

      status.state = BucketSyncState::Init;

      if (info.oldest_gen == 0) {
        if (check_compat) {
          // use shard count from our log gen=0
          // try to convert existing per-shard incremental status for backward compatibility
          if (source_info.layout.logs.empty() ||
              source_info.layout.logs.front().gen > 0) {
            ldpp_dout(dpp, 20) << "no generation zero when checking compatibility" << dendl;
            no_zero = true;
          } else if (auto& log = source_info.layout.logs.front();
                     log.layout.type != rgw::BucketLogType::InIndex) {
            ldpp_dout(dpp, 20) << "unrecognized log layout type when checking compatibility " << log.layout.type << dendl;
            no_zero = true;
          }

          if (!no_zero) {
            yield {
              const int num_shards0 = rgw::num_shards(
                source_info.layout.logs.front().layout.in_index.layout);
              call(new CheckAllBucketShardStatusIsIncremental(sc, sync_pair,
                                                              num_shards0,
                                                              &all_incremental));
            }
            if (retcode < 0) {
              return set_cr_error(retcode);
            }
            if (all_incremental) {
              // we can use existing status and resume incremental sync
              status.state = BucketSyncState::Incremental;
            }
          } else {
            all_incremental = false;
          }
        }
      }

      if (status.state != BucketSyncState::Incremental) {
        // initialize all shard sync status. this will populate the log marker
        // positions where incremental sync will resume after full sync
        yield {
          const int num_shards = marker_mgr.get().size();
          call(new InitBucketShardStatusCollectCR(sc, sync_pair, info.latest_gen, marker_mgr, num_shards));
        }
        if (retcode < 0) {
          ldout(cct, 20) << "failed to init bucket shard status: "
                         << cpp_strerror(retcode) << dendl;
          return set_cr_error(retcode);
        }

        if (sync_env->sync_module->should_full_sync()) {
          status.state = BucketSyncState::Full;
        } else {
          status.state = BucketSyncState::Incremental;
        }
      }

      status.shards_done_with_gen.resize(marker_mgr.get().size());
      status.incremental_gen = info.latest_gen;

      ldout(cct, 20) << "writing bucket sync status during init. state=" << status.state << ". marker=" << status.full.position << dendl;

      // write bucket sync status
      using CR = RGWSimpleRadosWriteCR<rgw_bucket_sync_status>;
      yield call(new CR(dpp, sync_env->driver,
                        status_obj, status, &objv, false));
      if (retcode < 0) {
        ldout(cct, 20) << "failed to write bucket shard status: "
                       << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }
};
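
/* Status/reporting helpers: enumerate bucket shards that are still
 * pending in the remote datalog or parked for retry in the error repo of
 * a given log shard (used by sync status admin queries). */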
3717 #define OMAP_READ_MAX_ENTRIES 10
3718 class RGWReadRecoveringBucketShardsCoroutine
: public RGWCoroutine
{
3720 RGWDataSyncEnv
*sync_env
;
3721 rgw::sal::RadosStore
* driver
;
3726 set
<string
>& recovering_buckets
;
3730 RGWRadosGetOmapKeysCR::ResultPtr omapkeys
;
3731 set
<string
> error_entries
;
3732 int max_omap_entries
;
3736 RGWReadRecoveringBucketShardsCoroutine(RGWDataSyncCtx
*_sc
, const int _shard_id
,
3737 set
<string
>& _recovering_buckets
, const int _max_entries
)
3738 : RGWCoroutine(_sc
->cct
), sc(_sc
), sync_env(_sc
->env
),
3739 driver(sync_env
->driver
), shard_id(_shard_id
), max_entries(_max_entries
),
3740 recovering_buckets(_recovering_buckets
), max_omap_entries(OMAP_READ_MAX_ENTRIES
)
3742 error_oid
= RGWDataSyncStatusManager::shard_obj_name(sc
->source_zone
, shard_id
) + ".retry";
3745 int operate(const DoutPrefixProvider
*dpp
) override
;
int RGWReadRecoveringBucketShardsCoroutine::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    //read recovering bucket shards
    count = 0;
    do {
      omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
      yield call(new RGWRadosGetOmapKeysCR(driver, rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, error_oid),
                                           marker, max_omap_entries, omapkeys));

      if (retcode == -ENOENT) {
        break;
      }

      if (retcode < 0) {
        ldpp_dout(dpp, 0) << "failed to read recovering bucket shards with "
            << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }

      error_entries = std::move(omapkeys->entries);
      if (error_entries.empty()) {
        break;
      }

      count += error_entries.size();
      marker = *error_entries.rbegin();
      for (const std::string& key : error_entries) {
        rgw_bucket_shard bs;
        std::optional<uint64_t> gen;
        if (int r = rgw::error_repo::decode_key(key, bs, gen); r < 0) {
          // insert the key as-is
          recovering_buckets.insert(std::move(key));
        } else if (gen) {
          recovering_buckets.insert(fmt::format("{}[{}]", bucket_shard_str{bs}, *gen));
        } else {
          recovering_buckets.insert(fmt::format("{}[full]", bucket_shard_str{bs}));
        }
      }
    } while (omapkeys->more && count < max_entries);

    return set_cr_done();
  }

  return 0;
}
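// The loop above pages through the error-repo omap in batches of
// OMAP_READ_MAX_ENTRIES, advancing 'marker' to the last key seen. Keys that
// rgw::error_repo::decode_key() can parse are rendered as
// "<bucket-shard>[<gen>]" for incremental retries or "<bucket-shard>[full]"
// for full-sync retries; keys that fail to parse are reported verbatim.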
class RGWReadPendingBucketShardsCoroutine : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  rgw::sal::RadosStore* driver;

  const int shard_id;
  int max_entries;

  set<string>& pending_buckets;
  string marker;
  string status_oid;

  rgw_data_sync_marker* sync_marker;
  int count;

  std::string next_marker;
  vector<rgw_data_change_log_entry> log_entries;
  bool truncated;

public:
  RGWReadPendingBucketShardsCoroutine(RGWDataSyncCtx *_sc, const int _shard_id,
                                      set<string>& _pending_buckets,
                                      rgw_data_sync_marker* _sync_marker, const int _max_entries)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
      driver(sync_env->driver), shard_id(_shard_id), max_entries(_max_entries),
      pending_buckets(_pending_buckets), sync_marker(_sync_marker)
  {
    status_oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id);
  }

  int operate(const DoutPrefixProvider *dpp) override;
};
int RGWReadPendingBucketShardsCoroutine::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    //read sync status marker
    using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
    yield call(new CR(dpp, sync_env->driver,
                      rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, status_oid),
                      sync_marker));
    if (retcode < 0) {
      ldpp_dout(dpp, 0) << "failed to read sync status marker with "
          << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }

    //read pending bucket shards
    marker = sync_marker->marker;
    count = 0;
    do {
      yield call(new RGWReadRemoteDataLogShardCR(sc, shard_id, marker,
                                                 &next_marker, &log_entries, &truncated));

      if (retcode == -ENOENT) {
        break;
      }

      if (retcode < 0) {
        ldpp_dout(dpp, 0) << "failed to read remote data log info with "
            << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }

      if (log_entries.empty()) {
        break;
      }

      count += log_entries.size();
      for (const auto& entry : log_entries) {
        pending_buckets.insert(entry.entry.key);
      }
    } while (truncated && count < max_entries);

    return set_cr_done();
  }

  return 0;
}
int RGWRemoteDataLog::read_shard_status(const DoutPrefixProvider *dpp, int shard_id,
                                        set<string>& pending_buckets,
                                        set<string>& recovering_buckets,
                                        rgw_data_sync_marker *sync_marker,
                                        const int max_entries)
{
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWCoroutinesManager crs(driver->ctx(), driver->getRados()->get_cr_registry());
  RGWHTTPManager http_manager(driver->ctx(), crs.get_completion_mgr());
  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  RGWDataSyncCtx sc_local = sc;
  sc_local.env = &sync_env_local;
  list<RGWCoroutinesStack *> stacks;
  RGWCoroutinesStack* recovering_stack = new RGWCoroutinesStack(driver->ctx(), &crs);
  recovering_stack->call(new RGWReadRecoveringBucketShardsCoroutine(&sc_local, shard_id, recovering_buckets, max_entries));
  stacks.push_back(recovering_stack);
  RGWCoroutinesStack* pending_stack = new RGWCoroutinesStack(driver->ctx(), &crs);
  pending_stack->call(new RGWReadPendingBucketShardsCoroutine(&sc_local, shard_id, pending_buckets, sync_marker, max_entries));
  stacks.push_back(pending_stack);
  ret = crs.run(dpp, stacks);
  http_manager.stop();
  return ret;
}
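// Note: the two coroutines above run on independent stacks under a dedicated
// RGWCoroutinesManager, so the pending and recovering shard lists are
// collected concurrently; crs.run() returns only after both stacks finish.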
CephContext *RGWBucketPipeSyncStatusManager::get_cct() const
{
  return driver->ctx();
}

void rgw_bucket_entry_owner::decode_json(JSONObj *obj)
{
  JSONDecoder::decode_json("ID", id, obj);
  JSONDecoder::decode_json("DisplayName", display_name, obj);
}
struct bucket_list_entry {
  bool delete_marker;
  rgw_obj_key key;
  bool is_latest;
  real_time mtime;
  string etag;
  uint64_t size;
  string storage_class;
  rgw_bucket_entry_owner owner;
  uint64_t versioned_epoch;
  string rgw_tag;

  bucket_list_entry() : delete_marker(false), is_latest(false), size(0), versioned_epoch(0) {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("IsDeleteMarker", delete_marker, obj);
    JSONDecoder::decode_json("Key", key.name, obj);
    JSONDecoder::decode_json("VersionId", key.instance, obj);
    JSONDecoder::decode_json("IsLatest", is_latest, obj);
    string mtime_str;
    JSONDecoder::decode_json("RgwxMtime", mtime_str, obj);

    struct tm t;
    uint32_t nsec;
    if (parse_iso8601(mtime_str.c_str(), &t, &nsec)) {
      ceph_timespec ts;
      ts.tv_sec = (uint64_t)internal_timegm(&t);
      ts.tv_nsec = nsec;
      mtime = real_clock::from_ceph_timespec(ts);
    }
    JSONDecoder::decode_json("ETag", etag, obj);
    JSONDecoder::decode_json("Size", size, obj);
    JSONDecoder::decode_json("StorageClass", storage_class, obj);
    JSONDecoder::decode_json("Owner", owner, obj);
    JSONDecoder::decode_json("VersionedEpoch", versioned_epoch, obj);
    JSONDecoder::decode_json("RgwxTag", rgw_tag, obj);
    if (key.instance == "null" && !versioned_epoch) {
      key.instance.clear();
    }
  }

  RGWModifyOp get_modify_op() const {
    if (delete_marker) {
      return CLS_RGW_OP_LINK_OLH_DM;
    } else if (!key.instance.empty() && key.instance != "null") {
      return CLS_RGW_OP_LINK_OLH;
    } else {
      return CLS_RGW_OP_ADD;
    }
  }
};
struct bucket_list_result {
  string name;
  string prefix;
  string key_marker;
  string version_id_marker;
  int max_keys;
  bool is_truncated;
  list<bucket_list_entry> entries;

  bucket_list_result() : max_keys(0), is_truncated(false) {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("Name", name, obj);
    JSONDecoder::decode_json("Prefix", prefix, obj);
    JSONDecoder::decode_json("KeyMarker", key_marker, obj);
    JSONDecoder::decode_json("VersionIdMarker", version_id_marker, obj);
    JSONDecoder::decode_json("MaxKeys", max_keys, obj);
    JSONDecoder::decode_json("IsTruncated", is_truncated, obj);
    JSONDecoder::decode_json("Entries", entries, obj);
  }
};
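// A hypothetical (abridged) listing payload that decode_json() above accepts:
//   { "Name": "mybucket", "Prefix": "", "KeyMarker": "", "VersionIdMarker": "",
//     "MaxKeys": 1000, "IsTruncated": true,
//     "Entries": [ { "Key": "obj1", "VersionId": "null", "IsLatest": true,
//                    "RgwxMtime": "2023-01-01T00:00:00.000Z", "Size": 42 } ] }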
class RGWListRemoteBucketCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  const rgw_bucket_shard& bs;
  rgw_obj_key marker_position;

  bucket_list_result *result;

public:
  RGWListRemoteBucketCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& bs,
                        rgw_obj_key& _marker_position, bucket_list_result *_result)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), bs(bs),
      marker_position(_marker_position), result(_result) {}

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      yield {
        rgw_http_param_pair pairs[] = { { "versions" , NULL },
                                        { "format" , "json" },
                                        { "objs-container" , "true" },
                                        { "key-marker" , marker_position.name.c_str() },
                                        { "version-id-marker" , marker_position.instance.c_str() },
                                        { NULL, NULL } };
        string p = string("/") + bs.bucket.get_key(':', 0);
        call(new RGWReadRESTResourceCR<bucket_list_result>(sync_env->cct, sc->conn, sync_env->http_manager, p, pairs, result));
      }
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }
};
struct next_bilog_result {
  uint64_t generation = 0;
  int num_shards = 0;

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("generation", generation, obj);
    JSONDecoder::decode_json("num_shards", num_shards, obj);
  }
};

struct bilog_list_result {
  list<rgw_bi_log_entry> entries;
  bool truncated{false};
  std::optional<next_bilog_result> next_log;

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("entries", entries, obj);
    JSONDecoder::decode_json("truncated", truncated, obj);
    JSONDecoder::decode_json("next_log", next_log, obj);
  }
};
class RGWListBucketIndexLogCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  const string instance_key;
  string marker;

  bilog_list_result *result;
  std::optional<PerfGuard> timer;
  uint64_t generation;
  std::string gen_str = std::to_string(generation);
  uint32_t format_ver{1};

public:
  RGWListBucketIndexLogCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& bs, string& _marker,
                          uint64_t _generation, bilog_list_result *_result)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
      instance_key(bs.get_key()), marker(_marker), result(_result), generation(_generation) {}

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      if (sync_env->counters) {
        timer.emplace(sync_env->counters, sync_counters::l_poll);
      }
      yield {
        rgw_http_param_pair pairs[] = { { "bucket-instance", instance_key.c_str() },
                                        { "format" , "json" },
                                        { "marker" , marker.c_str() },
                                        { "type", "bucket-index" },
                                        { "generation", gen_str.c_str() },
                                        { "format-ver", "2"},
                                        { NULL, NULL } };

        call(new RGWReadRESTResourceCR<bilog_list_result>(sync_env->cct, sc->conn, sync_env->http_manager,
                                                          "/admin/log", pairs, result));
      }
      if (retcode < 0) {
        if (sync_env->counters) {
          sync_env->counters->inc(sync_counters::l_poll_err);
        }
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }
};
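// The coroutine above issues a request of the (illustrative) form
//   GET /admin/log?bucket-instance=<key>&format=json&marker=<pos>
//       &type=bucket-index&generation=<gen>&format-ver=2
// against the source zone's admin API and decodes the response into
// bilog_list_result, bumping the l_poll_err counter on failure.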
#define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10

class RGWBucketFullSyncMarkerTrack : public RGWSyncShardMarkerTrack<rgw_obj_key, rgw_obj_key> {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;

  const rgw_raw_obj& status_obj;
  rgw_bucket_sync_status& sync_status;
  RGWSyncTraceNodeRef tn;
  RGWObjVersionTracker& objv_tracker;

public:
  RGWBucketFullSyncMarkerTrack(RGWDataSyncCtx *_sc,
                               const rgw_raw_obj& status_obj,
                               rgw_bucket_sync_status& sync_status,
                               RGWSyncTraceNodeRef tn,
                               RGWObjVersionTracker& objv_tracker)
    : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
      sc(_sc), sync_env(_sc->env), status_obj(status_obj),
      sync_status(sync_status), tn(std::move(tn)), objv_tracker(objv_tracker)
  {}

  RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override {
    sync_status.full.position = new_marker;
    sync_status.full.count = index_pos;

    tn->log(20, SSTR("updating marker oid=" << status_obj.oid << " marker=" << new_marker));
    return new RGWSimpleRadosWriteCR<rgw_bucket_sync_status>(
        sync_env->dpp, sync_env->driver,
        status_obj, sync_status, &objv_tracker);
  }

  RGWOrderCallCR *allocate_order_control_cr() override {
    return new RGWLastCallerWinsCR(sync_env->cct);
  }
};
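// The marker tracker persists progress through store_marker(): it records the
// highest listing position whose earlier entries have all completed, with
// BUCKET_SYNC_UPDATE_MARKER_WINDOW throttling how often the status object is
// rewritten, so a restarted full sync resumes without skipping unfinished work.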
// write the incremental sync status and update 'stable_timestamp' on success
class RGWWriteBucketShardIncSyncStatus : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  rgw_raw_obj obj;
  rgw_bucket_shard_inc_sync_marker sync_marker;
  ceph::real_time* stable_timestamp;
  RGWObjVersionTracker& objv_tracker;
  std::map<std::string, bufferlist> attrs;

public:
  RGWWriteBucketShardIncSyncStatus(RGWDataSyncEnv *sync_env,
                                   const rgw_raw_obj& obj,
                                   const rgw_bucket_shard_inc_sync_marker& sync_marker,
                                   ceph::real_time* stable_timestamp,
                                   RGWObjVersionTracker& objv_tracker)
    : RGWCoroutine(sync_env->cct), sync_env(sync_env), obj(obj),
      sync_marker(sync_marker), stable_timestamp(stable_timestamp),
      objv_tracker(objv_tracker)
  {}

  int operate(const DoutPrefixProvider *dpp) {
    reenter(this) {
      sync_marker.encode_attr(attrs);

      yield call(new RGWSimpleRadosWriteAttrsCR(sync_env->dpp, sync_env->driver,
                                                obj, attrs, &objv_tracker));
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      if (stable_timestamp) {
        *stable_timestamp = sync_marker.timestamp;
      }
      return set_cr_done();
    }
    return 0;
  }
};
class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, rgw_obj_key> {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;

  rgw_raw_obj obj;
  rgw_bucket_shard_inc_sync_marker sync_marker;

  map<rgw_obj_key, string> key_to_marker;

  struct operation {
    rgw_obj_key key;
    bool is_olh;
  };
  map<string, operation> marker_to_op;
  std::set<std::string> pending_olh; // object names with pending olh operations

  RGWSyncTraceNodeRef tn;
  RGWObjVersionTracker& objv_tracker;
  ceph::real_time* stable_timestamp;

  void handle_finish(const string& marker) override {
    auto iter = marker_to_op.find(marker);
    if (iter == marker_to_op.end()) {
      return;
    }
    auto& op = iter->second;
    key_to_marker.erase(op.key);
    reset_need_retry(op.key);
    if (op.is_olh) {
      pending_olh.erase(op.key.name);
    }
    marker_to_op.erase(iter);
  }

public:
  RGWBucketIncSyncShardMarkerTrack(RGWDataSyncCtx *_sc,
                                   const string& _marker_oid,
                                   const rgw_bucket_shard_inc_sync_marker& _marker,
                                   RGWSyncTraceNodeRef tn,
                                   RGWObjVersionTracker& objv_tracker,
                                   ceph::real_time* stable_timestamp)
    : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
      sc(_sc), sync_env(_sc->env),
      obj(sync_env->svc->zone->get_zone_params().log_pool, _marker_oid),
      sync_marker(_marker), tn(std::move(tn)), objv_tracker(objv_tracker),
      stable_timestamp(stable_timestamp)
  {}

  const rgw_raw_obj& get_obj() const { return obj; }

  RGWCoroutine* store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
    sync_marker.position = new_marker;
    sync_marker.timestamp = timestamp;

    tn->log(20, SSTR("updating marker marker_oid=" << obj.oid << " marker=" << new_marker << " timestamp=" << timestamp));
    return new RGWWriteBucketShardIncSyncStatus(sync_env, obj, sync_marker,
                                                stable_timestamp, objv_tracker);
  }

  /*
   * create index from key -> <op, marker>, and from marker -> key
   * this is useful so that we can ensure that we only have one
   * entry for any key that is used. This is needed when doing
   * incremental sync of data, and we don't want to run multiple
   * concurrent sync operations for the same bucket shard.
   * Also, we should make sure that we don't run concurrent operations on the same key with
   * different ops.
   */
  bool index_key_to_marker(const rgw_obj_key& key, const string& marker, bool is_olh) {
    auto result = key_to_marker.emplace(key, marker);
    if (!result.second) { // exists
      set_need_retry(key);
      return false;
    }
    marker_to_op[marker] = operation{key, is_olh};
    if (is_olh) {
      // prevent other olh ops from starting on this object name
      pending_olh.insert(key.name);
    }
    return true;
  }

  bool can_do_op(const rgw_obj_key& key, bool is_olh) {
    // serialize olh ops on the same object name
    if (is_olh && pending_olh.count(key.name)) {
      tn->log(20, SSTR("sync of " << key << " waiting for pending olh op"));
      return false;
    }
    return (key_to_marker.find(key) == key_to_marker.end());
  }

  RGWOrderCallCR *allocate_order_control_cr() override {
    return new RGWLastCallerWinsCR(sync_env->cct);
  }
};
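// Worked example of the bookkeeping above: while marker m1 for key "a" is in
// flight, index_key_to_marker("a", m2, ...) fails and flags "a" for retry; an
// OLH op on "a" also parks the name in pending_olh, so can_do_op() defers any
// other OLH op on "a" until handle_finish(m1) clears all three indexes.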
static bool ignore_sync_error(int err) {
  // missing objects and access denials are not treated as fatal sync errors
  switch (err) {
    case -ENOENT:
    case -EPERM:
      return true;
    default:
      break;
  }
  return false;
}
template <class T, class K>
class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;

  rgw_bucket_sync_pipe& sync_pipe;
  rgw_bucket_shard& bs;

  rgw_obj_key key;
  bool versioned;
  std::optional<uint64_t> versioned_epoch;
  rgw_bucket_entry_owner owner;
  real_time timestamp;
  RGWModifyOp op;
  RGWPendingState op_state;

  T entry_marker;
  RGWSyncShardMarkerTrack<T, K> *marker_tracker;

  int sync_status;

  stringstream error_ss;

  bool error_injection;

  RGWDataSyncModule *data_sync_module;

  rgw_zone_set_entry source_trace_entry;
  rgw_zone_set zones_trace;

  RGWSyncTraceNodeRef tn;
  std::string zone_name;

public:
  RGWBucketSyncSingleEntryCR(RGWDataSyncCtx *_sc,
                             rgw_bucket_sync_pipe& _sync_pipe,
                             const rgw_obj_key& _key, bool _versioned,
                             std::optional<uint64_t> _versioned_epoch,
                             real_time& _timestamp,
                             const rgw_bucket_entry_owner& _owner,
                             RGWModifyOp _op, RGWPendingState _op_state,
                             const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker, rgw_zone_set& _zones_trace,
                             RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sc->cct),
                                                  sc(_sc), sync_env(_sc->env),
                                                  sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs),
                                                  key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch),
                                                  owner(_owner),
                                                  timestamp(_timestamp), op(_op),
                                                  op_state(_op_state),
                                                  entry_marker(_entry_marker),
                                                  marker_tracker(_marker_tracker),
                                                  sync_status(0) {
    stringstream ss;
    ss << bucket_shard_str{bs} << "/" << key << "[" << versioned_epoch.value_or(0) << "]";
    set_description() << "bucket sync single entry (source_zone=" << sc->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state;

    tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", SSTR(key));

    tn->log(20, SSTR("bucket sync single entry (source_zone=" << sc->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state));
    error_injection = (sync_env->cct->_conf->rgw_sync_data_inject_err_probability > 0);

    data_sync_module = sync_env->sync_module->get_data_handler();

    source_trace_entry.zone = sc->source_zone.id;
    source_trace_entry.location_key = _sync_pipe.info.source_bs.bucket.get_key();

    zones_trace = _zones_trace;
    zones_trace.insert(sync_env->svc->zone->get_zone().id, _sync_pipe.info.dest_bucket.get_key());

    if (sc->env->ostr) {
      RGWZone* z;
      if ((z = sc->env->driver->svc()->zone->find_zone(sc->source_zone))) {
        zone_name = z->name;
      }
    }
  }

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      /* skip entries that are not complete */
      if (op_state != CLS_RGW_STATE_COMPLETE) {
        goto done;
      }
      tn->set_flag(RGW_SNS_FLAG_ACTIVE);
      do {
        yield {
          marker_tracker->reset_need_retry(key);
          if (key.name.empty()) {
            /* shouldn't happen */
            set_status("skipping empty entry");
            tn->log(0, "entry with empty obj name, skipping");
            goto done;
          }
          if (error_injection &&
              rand() % 10000 < cct->_conf->rgw_sync_data_inject_err_probability * 10000.0) {
            tn->log(0, SSTR(": injecting data sync error on key=" << key.name));
            retcode = -EIO;
          } else if (op == CLS_RGW_OP_ADD ||
                     op == CLS_RGW_OP_LINK_OLH) {
            set_status("syncing obj");
            tn->log(5, SSTR("bucket sync: sync obj: " << sc->source_zone << "/" << bs.bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
            if (versioned_epoch) {
              pretty_print(sc->env, "Syncing object s3://{}/{} version {} in sync from zone {}\n",
                           bs.bucket.name, key, *versioned_epoch, zone_name);
            } else {
              pretty_print(sc->env, "Syncing object s3://{}/{} in sync from zone {}\n",
                           bs.bucket.name, key, zone_name);
            }
            call(data_sync_module->sync_object(dpp, sc, sync_pipe, key, versioned_epoch,
                                               source_trace_entry, &zones_trace));
          } else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) {
            set_status("removing obj");
            if (versioned_epoch) {
              pretty_print(sc->env, "Deleting object s3://{}/{} version {} in sync from zone {}\n",
                           bs.bucket.name, key, *versioned_epoch, zone_name);
            } else {
              pretty_print(sc->env, "Deleting object s3://{}/{} in sync from zone {}\n",
                           bs.bucket.name, key, zone_name);
            }
            if (op == CLS_RGW_OP_UNLINK_INSTANCE) {
              versioned = true;
            }
            tn->log(10, SSTR("removing obj: " << sc->source_zone << "/" << bs.bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
            call(data_sync_module->remove_object(dpp, sc, sync_pipe, key, timestamp, versioned, versioned_epoch.value_or(0), &zones_trace));
            // our copy of the object is more recent, continue as if it succeeded
          } else if (op == CLS_RGW_OP_LINK_OLH_DM) {
            set_status("creating delete marker");
            tn->log(10, SSTR("creating delete marker: obj: " << sc->source_zone << "/" << bs.bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
            call(data_sync_module->create_delete_marker(dpp, sc, sync_pipe, key, timestamp, owner, versioned, versioned_epoch.value_or(0), &zones_trace));
          }
          tn->set_resource_name(SSTR(bucket_str_noinstance(bs.bucket) << "/" << key));
        }
        if (retcode == -ERR_PRECONDITION_FAILED) {
          pretty_print(sc->env, "Skipping object s3://{}/{} in sync from zone {}\n",
                       bs.bucket.name, key, zone_name);
          set_status("Skipping object sync: precondition failed (object contains newer change or policy doesn't allow sync)");
          tn->log(0, "Skipping object sync: precondition failed (object contains newer change or policy doesn't allow sync)");
          retcode = 0;
        }
      } while (marker_tracker->need_retry(key));
      {
        tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
        if (retcode >= 0) {
          tn->log(10, "success");
        } else {
          tn->log(10, SSTR("failed, retcode=" << retcode << " (" << cpp_strerror(-retcode) << ")"));
        }
      }

      if (retcode < 0 && retcode != -ENOENT) {
        set_status() << "failed to sync obj; retcode=" << retcode;
        tn->log(0, SSTR("ERROR: failed to sync object: "
            << bucket_shard_str{bs} << "/" << key.name));
        if (!ignore_sync_error(retcode)) {
          error_ss << bucket_shard_str{bs} << "/" << key.name;
          sync_status = retcode;
        }
      }
      if (!error_ss.str().empty()) {
        yield call(sync_env->error_logger->log_error_cr(dpp, sc->conn->get_remote_id(), "data", error_ss.str(), -retcode, string("failed to sync object") + cpp_strerror(-sync_status)));
      }
done:
      if (sync_status == 0) {
        /* update marker */
        set_status() << "calling marker_tracker->finish(" << entry_marker << ")";
        yield call(marker_tracker->finish(entry_marker));
        sync_status = retcode;
      }
      if (sync_status < 0) {
        return set_cr_error(sync_status);
      }
      return set_cr_done();
    }
    return 0;
  }
};
class RGWBucketFullSyncCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  rgw_bucket_sync_pipe& sync_pipe;
  rgw_bucket_sync_status& sync_status;
  rgw_bucket_shard& bs;
  boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
  bucket_list_result list_result;
  list<bucket_list_entry>::iterator entries_iter;
  rgw_obj_key list_marker;
  bucket_list_entry *entry{nullptr};

  int total_entries{0};

  int sync_result{0};

  const rgw_raw_obj& status_obj;
  RGWObjVersionTracker& objv;

  rgw_zone_set zones_trace;

  RGWSyncTraceNodeRef tn;
  RGWBucketFullSyncMarkerTrack marker_tracker;

  struct _prefix_handler {
    RGWBucketSyncFlowManager::pipe_rules_ref rules;
    RGWBucketSyncFlowManager::pipe_rules::prefix_map_t::const_iterator iter;
    std::optional<string> cur_prefix;

    void set_rules(RGWBucketSyncFlowManager::pipe_rules_ref& _rules) {
      rules = _rules;
    }

    bool revalidate_marker(rgw_obj_key *marker) {
      if (cur_prefix &&
          boost::starts_with(marker->name, *cur_prefix)) {
        return true;
      }
      if (!rules) {
        return false;
      }
      iter = rules->prefix_search(marker->name);
      if (iter == rules->prefix_end()) {
        return false;
      }
      cur_prefix = iter->first;
      marker->name = *cur_prefix;
      marker->instance.clear();
      return true;
    }

    bool check_key_handled(const rgw_obj_key& key) {
      if (!rules) {
        return false;
      }
      if (cur_prefix &&
          boost::starts_with(key.name, *cur_prefix)) {
        return true;
      }
      iter = rules->prefix_search(key.name);
      if (iter == rules->prefix_end()) {
        return false;
      }
      cur_prefix = iter->first;
      return boost::starts_with(key.name, iter->first);
    }
  } prefix_handler;

public:
  RGWBucketFullSyncCR(RGWDataSyncCtx *_sc,
                      rgw_bucket_sync_pipe& _sync_pipe,
                      const rgw_raw_obj& status_obj,
                      boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
                      rgw_bucket_sync_status& sync_status,
                      RGWSyncTraceNodeRef tn_parent,
                      RGWObjVersionTracker& objv_tracker)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
      sync_pipe(_sync_pipe), sync_status(sync_status),
      bs(_sync_pipe.info.source_bs),
      lease_cr(std::move(lease_cr)), status_obj(status_obj), objv(objv_tracker),
      tn(sync_env->sync_tracer->add_node(tn_parent, "full_sync",
                                         SSTR(bucket_shard_str{bs}))),
      marker_tracker(sc, status_obj, sync_status, tn, objv_tracker)
  {
    zones_trace.insert(sc->source_zone.id, sync_pipe.info.dest_bucket.get_key());
    prefix_handler.set_rules(sync_pipe.get_rules());
  }

  int operate(const DoutPrefixProvider *dpp) override;
};

int RGWBucketFullSyncCR::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    list_marker = sync_status.full.position;

    total_entries = sync_status.full.count;
    do {
      if (lease_cr && !lease_cr->is_locked()) {
        tn->log(1, "no lease or lease is lost, abort");
        drain_all();
        yield call(marker_tracker.flush());
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: bucket full sync marker_tracker.flush() returned retcode=" << retcode));
          return set_cr_error(retcode);
        }
        return set_cr_error(-ECANCELED);
      }
      set_status("listing remote bucket");
      tn->log(20, "listing bucket for full sync");

      if (!prefix_handler.revalidate_marker(&list_marker)) {
        set_status() << "finished iterating over all available prefixes: last marker=" << list_marker;
        tn->log(20, SSTR("finished iterating over all available prefixes: last marker=" << list_marker));
        break;
      }

      yield call(new RGWListRemoteBucketCR(sc, bs, list_marker, &list_result));
      if (retcode < 0 && retcode != -ENOENT) {
        set_status("failed bucket listing, going down");
        drain_all();
        yield spawn(marker_tracker.flush(), true);
        return set_cr_error(retcode);
      }
      if (list_result.entries.size() > 0) {
        tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
      }
      entries_iter = list_result.entries.begin();
      for (; entries_iter != list_result.entries.end(); ++entries_iter) {
        if (lease_cr && !lease_cr->is_locked()) {
          drain_all();
          yield call(marker_tracker.flush());
          tn->log(1, "no lease or lease is lost, abort");
          if (retcode < 0) {
            tn->log(0, SSTR("ERROR: bucket full sync marker_tracker.flush() returned retcode=" << retcode));
            return set_cr_error(retcode);
          }
          return set_cr_error(-ECANCELED);
        }
        tn->log(20, SSTR("[full sync] syncing object: "
            << bucket_shard_str{bs} << "/" << entries_iter->key));
        entry = &(*entries_iter);
        list_marker = entries_iter->key;
        if (!prefix_handler.check_key_handled(entries_iter->key)) {
          set_status() << "skipping entry due to policy rules: " << entries_iter->key;
          tn->log(20, SSTR("skipping entry due to policy rules: " << entries_iter->key));
          continue;
        }
        total_entries++;
        if (!marker_tracker.start(entry->key, total_entries, real_time())) {
          tn->log(0, SSTR("ERROR: cannot start syncing " << entry->key << ". Duplicate entry?"));
        } else {
          using SyncCR = RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>;
          yield spawn(new SyncCR(sc, sync_pipe, entry->key,
                                 false, /* versioned, only matters for object removal */
                                 entry->versioned_epoch, entry->mtime,
                                 entry->owner, entry->get_modify_op(), CLS_RGW_STATE_COMPLETE,
                                 entry->key, &marker_tracker, zones_trace, tn),
                      false);
        }
        drain_with_cb(sc->lcc.adj_concurrency(cct->_conf->rgw_bucket_sync_spawn_window),
                      [&](uint64_t stack_id, int ret) {
                if (ret < 0) {
                  tn->log(10, "a sync operation returned error");
                  sync_result = ret;
                }
                return 0;
              });
      }
    } while (list_result.is_truncated && sync_result == 0);
    set_status("done iterating over all objects");

    /* wait for all operations to complete */
    drain_all_cb([&](uint64_t stack_id, int ret) {
      if (ret < 0) {
        tn->log(10, "a sync operation returned error");
        sync_result = ret;
      }
      return 0;
    });
    tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
    if (lease_cr && !lease_cr->is_locked()) {
      tn->log(1, "no lease or lease is lost, abort");
      yield call(marker_tracker.flush());
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: bucket full sync marker_tracker.flush() returned retcode=" << retcode));
        return set_cr_error(retcode);
      }
      return set_cr_error(-ECANCELED);
    }
    yield call(marker_tracker.flush());
    if (retcode < 0) {
      tn->log(0, SSTR("ERROR: bucket full sync marker_tracker.flush() returned retcode=" << retcode));
      return set_cr_error(retcode);
    }
    /* update sync state to incremental */
    if (sync_result == 0) {
      sync_status.state = BucketSyncState::Incremental;
      tn->log(5, SSTR("set bucket state=" << sync_status.state));
      yield call(new RGWSimpleRadosWriteCR<rgw_bucket_sync_status>(
              dpp, sync_env->driver, status_obj, sync_status, &objv));
      tn->log(5, SSTR("bucket status objv=" << objv));
    } else {
      tn->log(10, SSTR("backing out with sync_status=" << sync_result));
    }
    if (retcode < 0 && sync_result == 0) { /* actually tried to set incremental state and failed */
      tn->log(0, SSTR("ERROR: failed to set sync state on bucket "
          << bucket_shard_str{bs} << " retcode=" << retcode));
      return set_cr_error(retcode);
    }
    if (sync_result < 0) {
      return set_cr_error(sync_result);
    }
    return set_cr_done();
  }
  return 0;
}
static bool has_olh_epoch(RGWModifyOp op) {
  return op == CLS_RGW_OP_LINK_OLH || op == CLS_RGW_OP_UNLINK_INSTANCE;
}
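// Only OLH link/unlink entries carry a meaningful versioned epoch, so this
// predicate drives two decisions in incremental sync below: squash-map entries
// for OLH ops are never overwritten by plain ops, and OLH ops on the same
// object name are serialized through the marker tracker's pending_olh set.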
class RGWBucketShardIsDoneCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  rgw_bucket_sync_status bucket_status;
  const rgw_raw_obj& bucket_status_obj;
  const int shard_id;
  RGWObjVersionTracker objv_tracker;
  const next_bilog_result& next_log;
  const uint64_t generation;

public:
  RGWBucketShardIsDoneCR(RGWDataSyncCtx *_sc, const rgw_raw_obj& _bucket_status_obj,
                         int _shard_id, const next_bilog_result& _next_log, const uint64_t _gen)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
      bucket_status_obj(_bucket_status_obj),
      shard_id(_shard_id), next_log(_next_log), generation(_gen) {}

  int operate(const DoutPrefixProvider* dpp) override
  {
    reenter(this) {
      do {
        // read bucket sync status
        objv_tracker.clear();
        using ReadCR = RGWSimpleRadosReadCR<rgw_bucket_sync_status>;
        yield call(new ReadCR(dpp, sync_env->driver,
                              bucket_status_obj, &bucket_status, false, &objv_tracker));
        if (retcode < 0) {
          ldpp_dout(dpp, 20) << "failed to read bucket shard status: "
              << cpp_strerror(retcode) << dendl;
          return set_cr_error(retcode);
        }

        if (bucket_status.state != BucketSyncState::Incremental) {
          // exit with success to avoid stale shard being
          // retried in error repo if we lost a race
          ldpp_dout(dpp, 20) << "RGWBucketShardIsDoneCR found sync state = " << bucket_status.state << dendl;
          return set_cr_done();
        }

        if (bucket_status.incremental_gen != generation) {
          // exit with success to avoid stale shard being
          // retried in error repo if we lost a race
          ldpp_dout(dpp, 20) << "RGWBucketShardIsDoneCR expected gen: " << generation
              << ", got: " << bucket_status.incremental_gen << dendl;
          return set_cr_done();
        }

        yield {
          // update bucket_status after a shard is done with current gen
          auto& done = bucket_status.shards_done_with_gen;
          done[shard_id] = true;

          // increment gen if all shards are already done with current gen
          if (std::all_of(done.begin(), done.end(),
                          [] (const bool done){return done; } )) {
            bucket_status.incremental_gen = next_log.generation;
            done.clear();
            done.resize(next_log.num_shards, false);
          }
          ldpp_dout(dpp, 20) << "bucket status incremental gen is " << bucket_status.incremental_gen << dendl;
          using WriteCR = RGWSimpleRadosWriteCR<rgw_bucket_sync_status>;
          call(new WriteCR(dpp, sync_env->driver,
                           bucket_status_obj, bucket_status, &objv_tracker, false));
        }
        if (retcode < 0 && retcode != -ECANCELED) {
          ldpp_dout(dpp, 20) << "failed to write bucket sync status: " << cpp_strerror(retcode) << dendl;
          return set_cr_error(retcode);
        } else if (retcode >= 0) {
          return set_cr_done();
        }
      } while (retcode == -ECANCELED);
    }
    return 0;
  }
};
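// The read-modify-write above relies on RGWObjVersionTracker for optimistic
// concurrency: if another writer bumps the status object's version between
// the read and the write, the write fails with -ECANCELED and the do/while
// loop rereads the status and retries from scratch.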
class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  rgw_bucket_sync_pipe& sync_pipe;
  RGWBucketSyncFlowManager::pipe_rules_ref rules;
  rgw_bucket_shard& bs;
  const rgw_raw_obj& bucket_status_obj;
  boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
  bilog_list_result extended_result;
  list<rgw_bi_log_entry> list_result;
  int next_num_shards;
  uint64_t next_gen;
  bool truncated;

  list<rgw_bi_log_entry>::iterator entries_iter, entries_end;
  map<pair<string, string>, pair<real_time, RGWModifyOp> > squash_map;
  rgw_bucket_shard_sync_info& sync_info;
  uint64_t generation;
  rgw_obj_key key;
  rgw_bi_log_entry *entry{nullptr};
  bool updated_status{false};
  rgw_zone_id zone_id;
  string target_location_key;

  string cur_id;

  int sync_status{0};
  bool syncstopped{false};

  RGWSyncTraceNodeRef tn;
  RGWBucketIncSyncShardMarkerTrack marker_tracker;

public:
  RGWBucketShardIncrementalSyncCR(RGWDataSyncCtx *_sc,
                                  rgw_bucket_sync_pipe& _sync_pipe,
                                  const std::string& shard_status_oid,
                                  const rgw_raw_obj& _bucket_status_obj,
                                  boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
                                  rgw_bucket_shard_sync_info& sync_info,
                                  uint64_t generation,
                                  RGWSyncTraceNodeRef& _tn_parent,
                                  RGWObjVersionTracker& objv_tracker,
                                  ceph::real_time* stable_timestamp)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
      sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs),
      bucket_status_obj(_bucket_status_obj), lease_cr(std::move(lease_cr)),
      sync_info(sync_info), generation(generation), zone_id(sync_env->svc->zone->get_zone().id),
      tn(sync_env->sync_tracer->add_node(_tn_parent, "inc_sync",
                                         SSTR(bucket_shard_str{bs}))),
      marker_tracker(sc, shard_status_oid, sync_info.inc_marker, tn,
                     objv_tracker, stable_timestamp)
  {
    set_description() << "bucket shard incremental sync bucket="
        << bucket_shard_str{bs};
    rules = sync_pipe.get_rules();
    target_location_key = sync_pipe.info.dest_bucket.get_key();
  }

  bool check_key_handled(const rgw_obj_key& key) {
    if (!rules) {
      return false;
    }
    auto iter = rules->prefix_search(key.name);
    if (iter == rules->prefix_end()) {
      return false;
    }
    return boost::starts_with(key.name, iter->first);
  }

  int operate(const DoutPrefixProvider *dpp) override;
};

int RGWBucketShardIncrementalSyncCR::operate(const DoutPrefixProvider *dpp)
{
  int ret;
  reenter(this) {
    do {
      if (lease_cr && !lease_cr->is_locked()) {
        tn->log(1, "no lease or lease is lost, abort");
        drain_all();
        yield call(marker_tracker.flush());
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: incremental sync marker_tracker.flush() returned retcode=" << retcode));
          return set_cr_error(retcode);
        }
        return set_cr_error(-ECANCELED);
      }
      tn->log(20, SSTR("listing bilog for incremental sync; position=" << sync_info.inc_marker.position));
      set_status() << "listing bilog; position=" << sync_info.inc_marker.position;
      yield call(new RGWListBucketIndexLogCR(sc, bs, sync_info.inc_marker.position, generation, &extended_result));
      if (retcode < 0 && retcode != -ENOENT) {
        /* wait for all operations to complete */
        drain_all();
        yield spawn(marker_tracker.flush(), true);
        return set_cr_error(retcode);
      }
      list_result = std::move(extended_result.entries);
      truncated = extended_result.truncated;
      if (extended_result.next_log) {
        next_gen = extended_result.next_log->generation;
        next_num_shards = extended_result.next_log->num_shards;
      }

      squash_map.clear();
      entries_iter = list_result.begin();
      entries_end = list_result.end();
      for (; entries_iter != entries_end; ++entries_iter) {
        auto e = *entries_iter;
        if (e.op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP) {
          ldpp_dout(dpp, 20) << "syncstop at: " << e.timestamp << ". marker: " << e.id << dendl;
          syncstopped = true;
          entries_end = std::next(entries_iter); // stop after this entry
          break;
        }
        if (e.op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
          ldpp_dout(dpp, 20) << "syncstart at: " << e.timestamp << ". marker: " << e.id << dendl;
          continue;
        }
        if (e.op == CLS_RGW_OP_CANCEL) {
          continue;
        }
        if (e.state != CLS_RGW_STATE_COMPLETE) {
          continue;
        }
        if (e.zones_trace.exists(zone_id.id, target_location_key)) {
          continue;
        }
        auto& squash_entry = squash_map[make_pair(e.object, e.instance)];
        // don't squash over olh entries - we need to apply their olh_epoch
        if (has_olh_epoch(squash_entry.second) && !has_olh_epoch(e.op)) {
          continue;
        }
        if (squash_entry.first <= e.timestamp) {
          squash_entry = make_pair<>(e.timestamp, e.op);
        }
      }

      entries_iter = list_result.begin();
      for (; entries_iter != entries_end; ++entries_iter) {
        if (lease_cr && !lease_cr->is_locked()) {
          tn->log(1, "no lease or lease is lost, abort");
          drain_all();
          yield call(marker_tracker.flush());
          if (retcode < 0) {
            tn->log(0, SSTR("ERROR: incremental sync marker_tracker.flush() returned retcode=" << retcode));
            return set_cr_error(retcode);
          }
          return set_cr_error(-ECANCELED);
        }
        entry = &(*entries_iter);
        {
          ssize_t p = entry->id.find('#'); /* entries might have explicit shard info in them, e.g., 6#00000000004.94.3 */
          if (p < 0) {
            cur_id = entry->id;
          } else {
            cur_id = entry->id.substr(p + 1);
          }
        }
        sync_info.inc_marker.position = cur_id;

        if (entry->op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP || entry->op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
          ldpp_dout(dpp, 20) << "detected syncstop or resync on " << entries_iter->timestamp << ", skipping entry" << dendl;
          marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
          continue;
        }

        if (!key.set(rgw_obj_index_key{entry->object, entry->instance})) {
          set_status() << "parse_raw_oid() on " << entry->object << " returned false, skipping entry";
          tn->log(20, SSTR("parse_raw_oid() on " << entry->object << " returned false, skipping entry"));
          marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
          continue;
        }

        tn->log(20, SSTR("parsed entry: id=" << cur_id << " iter->object=" << entry->object << " iter->instance=" << entry->instance << " name=" << key.name << " instance=" << key.instance << " ns=" << key.ns));

        if (!key.ns.empty()) {
          set_status() << "skipping entry in namespace: " << entry->object;
          tn->log(20, SSTR("skipping entry in namespace: " << entry->object));
          marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
          continue;
        }

        if (!check_key_handled(key)) {
          set_status() << "skipping entry due to policy rules: " << entry->object;
          tn->log(20, SSTR("skipping entry due to policy rules: " << entry->object));
          marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
          continue;
        }

        set_status() << "got entry.id=" << cur_id << " key=" << key << " op=" << (int)entry->op;
        if (entry->op == CLS_RGW_OP_CANCEL) {
          set_status() << "canceled operation, skipping";
          tn->log(20, SSTR("skipping object: "
              << bucket_shard_str{bs} << "/" << key << ": canceled operation"));
          marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
          continue;
        }
        if (entry->state != CLS_RGW_STATE_COMPLETE) {
          set_status() << "non-complete operation, skipping";
          tn->log(20, SSTR("skipping object: "
              << bucket_shard_str{bs} << "/" << key << ": non-complete operation"));
          marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
          continue;
        }
        if (entry->zones_trace.exists(zone_id.id, target_location_key)) {
          set_status() << "redundant operation, skipping";
          tn->log(20, SSTR("skipping object: "
              << bucket_shard_str{bs} << "/" << key << ": redundant operation"));
          marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
          continue;
        }
        if (make_pair<>(entry->timestamp, entry->op) != squash_map[make_pair(entry->object, entry->instance)]) {
          set_status() << "squashed operation, skipping";
          tn->log(20, SSTR("skipping object: "
              << bucket_shard_str{bs} << "/" << key << ": squashed operation"));
          marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
          continue;
        }
        tn->set_flag(RGW_SNS_FLAG_ACTIVE);
        tn->log(20, SSTR("syncing object: "
            << bucket_shard_str{bs} << "/" << key));
        updated_status = false;
        while (!marker_tracker.can_do_op(key, has_olh_epoch(entry->op))) {
          if (!updated_status) {
            set_status() << "can't do op, conflicting inflight operation";
            updated_status = true;
          }
          tn->log(5, SSTR("can't do op on key=" << key << " need to wait for conflicting operation to complete"));
          yield wait_for_child();
          bool again = true;
          while (again) {
            again = collect(&ret, nullptr);
            if (ret < 0) {
              tn->log(0, SSTR("ERROR: a child operation returned error (ret=" << ret << ")"));
              sync_status = ret;
              /* we have reported this error */
            }
          }
          if (sync_status != 0)
            break;
        }
        if (sync_status != 0) {
          /* get error, stop */
          break;
        }
        if (!marker_tracker.index_key_to_marker(key, cur_id, has_olh_epoch(entry->op))) {
          set_status() << "can't do op, sync already in progress for object";
          tn->log(20, SSTR("skipping sync of entry: " << cur_id << ":" << key << " sync already in progress for object"));
          marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
          continue;
        }
        set_status() << "start object sync";
        if (!marker_tracker.start(cur_id, 0, entry->timestamp)) {
          tn->log(0, SSTR("ERROR: cannot start syncing " << cur_id << ". Duplicate entry?"));
        } else {
          std::optional<uint64_t> versioned_epoch;
          rgw_bucket_entry_owner owner(entry->owner, entry->owner_display_name);
          if (entry->ver.pool < 0) {
            versioned_epoch = entry->ver.epoch;
          }
          tn->log(20, SSTR("entry->timestamp=" << entry->timestamp));
          using SyncCR = RGWBucketSyncSingleEntryCR<string, rgw_obj_key>;
          yield spawn(new SyncCR(sc, sync_pipe, key,
                                 entry->is_versioned(), versioned_epoch,
                                 entry->timestamp, owner, entry->op, entry->state,
                                 cur_id, &marker_tracker, entry->zones_trace, tn),
                      false);
        }
        drain_with_cb(sc->lcc.adj_concurrency(cct->_conf->rgw_bucket_sync_spawn_window),
                      [&](uint64_t stack_id, int ret) {
                if (ret < 0) {
                  tn->log(10, "a sync operation returned error");
                  sync_status = ret;
                }
                return 0;
              });
      }
    } while (!list_result.empty() && sync_status == 0 && !syncstopped);

    drain_all_cb([&](uint64_t stack_id, int ret) {
      if (ret < 0) {
        tn->log(10, "a sync operation returned error");
        sync_status = ret;
      }
      return 0;
    });
    tn->unset_flag(RGW_SNS_FLAG_ACTIVE);

    if (syncstopped) {
      // transition to StateStopped in RGWSyncBucketShardCR. if sync is
      // still disabled, we'll delete the sync status object. otherwise we'll
      // restart full sync to catch any changes that happened while sync was
      // disabled
      sync_info.state = rgw_bucket_shard_sync_info::StateStopped;
      return set_cr_done();
    }

    yield call(marker_tracker.flush());
    if (retcode < 0) {
      tn->log(0, SSTR("ERROR: incremental sync marker_tracker.flush() returned retcode=" << retcode));
      return set_cr_error(retcode);
    }
    if (sync_status < 0) {
      tn->log(10, SSTR("backing out with sync_status=" << sync_status));
      return set_cr_error(sync_status);
    }

    if (!truncated && extended_result.next_log) {
      yield call(new RGWBucketShardIsDoneCR(sc, bucket_status_obj, bs.shard_id, *extended_result.next_log, generation));
      if (retcode < 0) {
        ldout(cct, 20) << "failed to update bucket sync status: "
            << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
      yield {
        // delete the shard status object
        auto status_obj = sync_env->svc->rados->obj(marker_tracker.get_obj());
        retcode = status_obj.open(dpp);
        if (retcode < 0) {
          return set_cr_error(retcode);
        }
        call(new RGWRadosRemoveOidCR(sync_env->driver, std::move(status_obj)));
      }
      if (retcode < 0) {
        ldpp_dout(dpp, 20) << "failed to remove shard status object: " << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
    }

    return set_cr_done();
  }
  return 0;
}
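// Squashing example for the first pass above: given three complete entries
// for the same (object, instance) at t1 < t2 < t3, only the t3 entry matches
// squash_map in the second pass and is actually synced; the t1/t2 entries
// just advance the high marker. A later non-OLH entry never replaces an OLH
// squash entry, though, since the OLH entry's olh_epoch must still be applied.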
class RGWGetBucketPeersCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  std::optional<rgw_bucket> target_bucket;
  std::optional<rgw_zone_id> source_zone;
  std::optional<rgw_bucket> source_bucket;

  rgw_sync_pipe_info_set *pipes;
  map<rgw_bucket, all_bucket_info> buckets_info;
  map<rgw_bucket, all_bucket_info>::iterator siiter;
  std::optional<all_bucket_info> target_bucket_info;
  std::optional<all_bucket_info> source_bucket_info;

  rgw_sync_pipe_info_set::iterator siter;

  std::shared_ptr<rgw_bucket_get_sync_policy_result> source_policy;
  std::shared_ptr<rgw_bucket_get_sync_policy_result> target_policy;

  RGWSyncTraceNodeRef tn;

  using pipe_const_iter = map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set>::const_iterator;

  static pair<pipe_const_iter, pipe_const_iter> get_pipe_iters(const map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set>& m, std::optional<rgw_zone_id> zone) {
    if (!zone) {
      return { m.begin(), m.end() };
    }

    auto b = m.find(*zone);
    if (b == m.end()) {
      return { b, b };
    }
    return { b, std::next(b) };
  }

  void filter_sources(std::optional<rgw_zone_id> source_zone,
                      std::optional<rgw_bucket> source_bucket,
                      const map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set>& all_sources,
                      rgw_sync_pipe_info_set *result) {
    ldpp_dout(sync_env->dpp, 20) << __func__ << ": source_zone=" << source_zone.value_or(rgw_zone_id("*")).id
        << " source_bucket=" << source_bucket.value_or(rgw_bucket())
        << " all_sources.size()=" << all_sources.size() << dendl;
    auto iters = get_pipe_iters(all_sources, source_zone);
    for (auto i = iters.first; i != iters.second; ++i) {
      for (auto& handler : i->second) {
        if (!handler.specific()) {
          ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe_handler=" << handler << ": skipping" << dendl;
          continue;
        }
        if (source_bucket &&
            !source_bucket->match(*handler.source.bucket)) {
          continue;
        }
        ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe_handler=" << handler << ": adding" << dendl;
        result->insert(handler, source_bucket_info, target_bucket_info);
      }
    }
  }

  void filter_targets(std::optional<rgw_zone_id> target_zone,
                      std::optional<rgw_bucket> target_bucket,
                      const map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set>& all_targets,
                      rgw_sync_pipe_info_set *result) {
    ldpp_dout(sync_env->dpp, 20) << __func__ << ": target_zone=" << target_zone.value_or(rgw_zone_id("*")).id
        << " target_bucket=" << target_bucket.value_or(rgw_bucket())
        << " all_targets.size()=" << all_targets.size() << dendl;
    auto iters = get_pipe_iters(all_targets, target_zone);
    for (auto i = iters.first; i != iters.second; ++i) {
      for (auto& handler : i->second) {
        if (target_bucket &&
            handler.dest.bucket &&
            !target_bucket->match(*handler.dest.bucket)) {
          ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe_handler=" << handler << ": skipping" << dendl;
          continue;
        }
        ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe_handler=" << handler << ": adding" << dendl;
        result->insert(handler, source_bucket_info, target_bucket_info);
      }
    }
  }

  void update_from_target_bucket_policy();
  void update_from_source_bucket_policy();

  struct GetHintTargets : public RGWGenericAsyncCR::Action {
    RGWDataSyncEnv *sync_env;
    rgw_bucket source_bucket;
    std::set<rgw_bucket> targets;

    GetHintTargets(RGWDataSyncEnv *_sync_env,
                   const rgw_bucket& _source_bucket) : sync_env(_sync_env),
                                                       source_bucket(_source_bucket) {}
    int operate() override {
      int r = sync_env->svc->bucket_sync->get_bucket_sync_hints(sync_env->dpp,
                                                                source_bucket,
                                                                nullptr,
                                                                &targets,
                                                                null_yield);
      if (r < 0) {
        ldpp_dout(sync_env->dpp, 0) << "ERROR: " << __func__ << "(): failed to fetch bucket sync hints for bucket=" << source_bucket << dendl;
        return r;
      }

      return 0;
    }
  };

  std::shared_ptr<GetHintTargets> get_hint_targets_action;
  std::set<rgw_bucket>::iterator hiter;

public:
  RGWGetBucketPeersCR(RGWDataSyncEnv *_sync_env,
                      std::optional<rgw_bucket> _target_bucket,
                      std::optional<rgw_zone_id> _source_zone,
                      std::optional<rgw_bucket> _source_bucket,
                      rgw_sync_pipe_info_set *_pipes,
                      const RGWSyncTraceNodeRef& _tn_parent)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      target_bucket(_target_bucket),
      source_zone(_source_zone),
      source_bucket(_source_bucket),
      pipes(_pipes),
      tn(sync_env->sync_tracer->add_node(_tn_parent, "get_bucket_peers",
                                         SSTR( "target=" << target_bucket.value_or(rgw_bucket())
                                             << ":source=" << source_bucket.value_or(rgw_bucket())
                                             << ":source_zone=" << source_zone.value_or(rgw_zone_id("*")).id))) {
  }

  int operate(const DoutPrefixProvider *dpp) override;
};

std::ostream& operator<<(std::ostream& out, std::optional<rgw_bucket_shard>& bs) {
  if (bs) {
    return out << *bs;
  }
  return out << "*";
}
static RGWCoroutine* sync_bucket_shard_cr(RGWDataSyncCtx* sc,
                                          boost::intrusive_ptr<const RGWContinuousLeaseCR> lease,
                                          const rgw_bucket_sync_pair_info& sync_pair,
                                          std::optional<uint64_t> gen,
                                          const RGWSyncTraceNodeRef& tn,
                                          ceph::real_time* progress);

RGWRunBucketSourcesSyncCR::RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
                                                     boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
                                                     const rgw_bucket_shard& source_bs,
                                                     const RGWSyncTraceNodeRef& _tn_parent,
                                                     std::optional<uint64_t> gen,
                                                     ceph::real_time* progress)
  : RGWCoroutine(_sc->env->cct), sc(_sc), sync_env(_sc->env),
    lease_cr(std::move(lease_cr)),
    tn(sync_env->sync_tracer->add_node(
           _tn_parent, "bucket_sync_sources",
           SSTR( "source=" << source_bs << ":source_zone=" << sc->source_zone))),
    progress(progress),
    gen(gen)
{
  sync_pair.source_bs = source_bs;
}

int RGWRunBucketSourcesSyncCR::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    yield call(new RGWGetBucketPeersCR(sync_env, std::nullopt, sc->source_zone,
                                       sync_pair.source_bs.bucket, &pipes, tn));
    if (retcode < 0 && retcode != -ENOENT) {
      tn->log(0, SSTR("ERROR: failed to read sync status for bucket. error: " << retcode));
      return set_cr_error(retcode);
    }

    ldpp_dout(dpp, 20) << __func__ << "(): requested source_bs=" << sync_pair.source_bs << dendl;

    if (pipes.empty()) {
      ldpp_dout(dpp, 20) << __func__ << "(): no relevant sync pipes found" << dendl;
      return set_cr_done();
    }

    shard_progress.resize(pipes.size());
    cur_shard_progress = shard_progress.begin();

    for (siter = pipes.begin(); siter != pipes.end(); ++siter, ++cur_shard_progress) {
      ldpp_dout(dpp, 20) << __func__ << "(): sync pipe=" << *siter << dendl;

      sync_pair.dest_bucket = siter->target.get_bucket();
      sync_pair.handler = siter->handler;

      ldpp_dout(dpp, 20) << __func__ << "(): sync_pair=" << sync_pair << dendl;

      yield_spawn_window(sync_bucket_shard_cr(sc, lease_cr, sync_pair,
                                              gen, tn, &*cur_shard_progress),
                         sc->lcc.adj_concurrency(cct->_conf->rgw_bucket_sync_spawn_window),
                         [&](uint64_t stack_id, int ret) {
                           if (ret < 0) {
                             tn->log(10, SSTR("ERROR: a sync operation returned error: " << ret));
                           }
                           return ret;
                         });
    }
    drain_all_cb([&](uint64_t stack_id, int ret) {
      if (ret < 0) {
        tn->log(10, SSTR("a sync operation returned error: " << ret));
      }
      return ret;
    });
    if (progress) {
      *progress = *std::min_element(shard_progress.begin(), shard_progress.end());
    }
    return set_cr_done();
  }
  return 0;
}
class RGWSyncGetBucketInfoCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  rgw_bucket bucket;
  RGWBucketInfo *pbucket_info;
  map<string, bufferlist> *pattrs;
  RGWMetaSyncEnv meta_sync_env;

  RGWSyncTraceNodeRef tn;

public:
  RGWSyncGetBucketInfoCR(RGWDataSyncEnv *_sync_env,
                         const rgw_bucket& _bucket,
                         RGWBucketInfo *_pbucket_info,
                         map<string, bufferlist> *_pattrs,
                         const RGWSyncTraceNodeRef& _tn_parent)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      bucket(_bucket),
      pbucket_info(_pbucket_info),
      pattrs(_pattrs),
      tn(sync_env->sync_tracer->add_node(_tn_parent, "get_bucket_info",
                                         SSTR(bucket))) {
  }

  int operate(const DoutPrefixProvider *dpp) override;
};

int RGWSyncGetBucketInfoCR::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->driver, bucket, pbucket_info, pattrs, dpp));
    if (retcode == -ENOENT) {
      /* bucket instance info has not been synced in yet, fetch it now */
      yield {
        tn->log(10, SSTR("no local info for bucket:" << ": fetching metadata"));
        string raw_key = string("bucket.instance:") + bucket.get_key();

        meta_sync_env.init(dpp, cct, sync_env->driver, sync_env->svc->zone->get_master_conn(), sync_env->async_rados,
                           sync_env->http_manager, sync_env->error_logger, sync_env->sync_tracer);

        call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key,
                                          string() /* no marker */,
                                          MDLOG_STATUS_COMPLETE,
                                          NULL /* no marker tracker */,
                                          tn));
      }
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to fetch bucket instance info for " << bucket_str{bucket}));
        return set_cr_error(retcode);
      }

      yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->driver, bucket, pbucket_info, pattrs, dpp));
    }
    if (retcode < 0) {
      tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bucket}));
      return set_cr_error(retcode);
    }

    return set_cr_done();
  }

  return 0;
}
void RGWGetBucketPeersCR::update_from_target_bucket_policy()
{
  if (!target_policy ||
      !target_policy->policy_handler ||
      !pipes) {
    return;
  }

  auto handler = target_policy->policy_handler.get();

  filter_sources(source_zone,
                 source_bucket,
                 handler->get_sources(),
                 pipes);

  for (siter = pipes->begin(); siter != pipes->end(); ++siter) {
    if (!siter->source.has_bucket_info()) {
      buckets_info.emplace(siter->source.get_bucket(), all_bucket_info());
    }
    if (!siter->target.has_bucket_info()) {
      buckets_info.emplace(siter->target.get_bucket(), all_bucket_info());
    }
  }
}

void RGWGetBucketPeersCR::update_from_source_bucket_policy()
{
  if (!source_policy ||
      !source_policy->policy_handler ||
      !pipes) {
    return;
  }

  auto handler = source_policy->policy_handler.get();

  filter_targets(sync_env->svc->zone->get_zone().id,
                 target_bucket,
                 handler->get_targets(),
                 pipes);

  for (siter = pipes->begin(); siter != pipes->end(); ++siter) {
    if (!siter->source.has_bucket_info()) {
      buckets_info.emplace(siter->source.get_bucket(), all_bucket_info());
    }
    if (!siter->target.has_bucket_info()) {
      buckets_info.emplace(siter->target.get_bucket(), all_bucket_info());
    }
  }
}
class RGWSyncGetBucketSyncPolicyHandlerCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  rgw_bucket bucket;
  rgw_bucket_get_sync_policy_params get_policy_params;

  std::shared_ptr<rgw_bucket_get_sync_policy_result> policy;

  RGWSyncTraceNodeRef tn;

  int i;

public:
  RGWSyncGetBucketSyncPolicyHandlerCR(RGWDataSyncEnv *_sync_env,
                                      std::optional<rgw_zone_id> zone,
                                      const rgw_bucket& _bucket,
                                      std::shared_ptr<rgw_bucket_get_sync_policy_result>& _policy,
                                      const RGWSyncTraceNodeRef& _tn_parent)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      bucket(_bucket),
      policy(_policy),
      tn(sync_env->sync_tracer->add_node(_tn_parent, "get_sync_policy_handler",
                                         SSTR(bucket))) {
    get_policy_params.zone = zone;
    get_policy_params.bucket = bucket;
  }

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      for (i = 0; i < 2; ++i) {
        yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados,
                                                       sync_env->driver,
                                                       get_policy_params,
                                                       policy,
                                                       dpp));
        if (retcode < 0 &&
            retcode != -ENOENT) {
          return set_cr_error(retcode);
        }

        if (retcode == 0) {
          return set_cr_done();
        }

        /* bucket instance was not found,
         * try to get bucket instance info, can trigger
         * metadata sync of bucket instance
         */
        yield call(new RGWSyncGetBucketInfoCR(sync_env,
                                              bucket,
                                              nullptr,
                                              nullptr,
                                              tn));
        if (retcode < 0) {
          return set_cr_error(retcode);
        }
      }
    }

    return 0;
  }
};
int RGWGetBucketPeersCR::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    if (target_bucket) {
      target_policy = make_shared<rgw_bucket_get_sync_policy_result>();
      yield call(new RGWSyncGetBucketSyncPolicyHandlerCR(sync_env,
                                                         nullopt,
                                                         *target_bucket,
                                                         target_policy,
                                                         tn));
      if (retcode < 0 &&
          retcode != -ENOENT) {
        return set_cr_error(retcode);
      }

      update_from_target_bucket_policy();
    }

    if (source_bucket && source_zone) {
      source_policy = make_shared<rgw_bucket_get_sync_policy_result>();
      yield call(new RGWSyncGetBucketSyncPolicyHandlerCR(sync_env,
                                                         source_zone,
                                                         *source_bucket,
                                                         source_policy,
                                                         tn));
      if (retcode < 0 &&
          retcode != -ENOENT) {
        return set_cr_error(retcode);
      }

      if (source_policy->policy_handler) {
        auto& opt_bucket_info = source_policy->policy_handler->get_bucket_info();
        auto& opt_attrs = source_policy->policy_handler->get_bucket_attrs();
        if (opt_bucket_info && opt_attrs) {
          source_bucket_info.emplace();
          source_bucket_info->bucket_info = *opt_bucket_info;
          source_bucket_info->attrs = *opt_attrs;
        }
      }

      if (!target_bucket) {
        get_hint_targets_action = make_shared<GetHintTargets>(sync_env, *source_bucket);

        yield call(new RGWGenericAsyncCR(cct, sync_env->async_rados,
                                         get_hint_targets_action));
        if (retcode < 0) {
          return set_cr_error(retcode);
        }

        /* hints might have incomplete bucket ids,
         * in which case we need to figure out the current
         * bucket_id
         */
        for (hiter = get_hint_targets_action->targets.begin();
             hiter != get_hint_targets_action->targets.end();
             ++hiter) {
          ldpp_dout(dpp, 20) << "Got sync hint for bucket=" << *source_bucket << ": " << hiter->get_key() << dendl;

          target_policy = make_shared<rgw_bucket_get_sync_policy_result>();
          yield call(new RGWSyncGetBucketSyncPolicyHandlerCR(sync_env,
                                                             nullopt,
                                                             *hiter,
                                                             target_policy,
                                                             tn));
          if (retcode < 0 &&
              retcode != -ENOENT) {
            return set_cr_error(retcode);
          }
          update_from_target_bucket_policy();
        }
      }
    }

    update_from_source_bucket_policy();

    for (siiter = buckets_info.begin(); siiter != buckets_info.end(); ++siiter) {
      if (siiter->second.bucket_info.bucket.name.empty()) {
        yield call(new RGWSyncGetBucketInfoCR(sync_env, siiter->first,
                                              &siiter->second.bucket_info,
                                              &siiter->second.attrs,
                                              tn));
      }
    }

    if (pipes) {
      pipes->update_empty_bucket_info(buckets_info);
    }

    return set_cr_done();
  }

  return 0;
}
class RGWSyncBucketShardCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
  rgw_bucket_sync_pair_info sync_pair;
  rgw_bucket_sync_pipe& sync_pipe;
  bool& bucket_stopped;
  uint64_t generation;
  ceph::real_time* progress;

  const std::string shard_status_oid;
  const rgw_raw_obj bucket_status_obj;
  rgw_bucket_shard_sync_info sync_status;
  RGWObjVersionTracker objv_tracker;

  RGWSyncTraceNodeRef tn;

public:
  RGWSyncBucketShardCR(RGWDataSyncCtx *_sc,
                       boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
                       const rgw_bucket_sync_pair_info& _sync_pair,
                       rgw_bucket_sync_pipe& sync_pipe,
                       bool& bucket_stopped,
                       uint64_t generation,
                       const RGWSyncTraceNodeRef& tn,
                       ceph::real_time* progress)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
      lease_cr(std::move(lease_cr)), sync_pair(_sync_pair),
      sync_pipe(sync_pipe), bucket_stopped(bucket_stopped), generation(generation), progress(progress),
      shard_status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair, generation)),
      bucket_status_obj(sc->env->svc->zone->get_zone_params().log_pool,
                        RGWBucketPipeSyncStatusManager::full_status_oid(sc->source_zone,
                                                                        sync_pair.source_bs.bucket,
                                                                        sync_pair.dest_bucket)),
      tn(tn) {
  }

  int operate(const DoutPrefixProvider *dpp) override;
};
int RGWSyncBucketShardCR::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    objv_tracker.clear();
    yield call(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &sync_status, &objv_tracker, generation));
    if (retcode < 0 && retcode != -ENOENT) {
      tn->log(0, SSTR("ERROR: failed to read sync status for bucket. error: " << retcode));
      return set_cr_error(retcode);
    }

    tn->log(20, SSTR("sync status for source bucket shard: " << sync_status.state));
    sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
    if (progress) {
      *progress = sync_status.inc_marker.timestamp;
    }

    yield call(new RGWBucketShardIncrementalSyncCR(sc, sync_pipe,
                                                   shard_status_oid, bucket_status_obj, lease_cr,
                                                   sync_status, generation, tn,
                                                   objv_tracker, progress));
    if (retcode < 0) {
      tn->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode));
      return set_cr_error(retcode);
    }

    if (sync_status.state == rgw_bucket_shard_sync_info::StateStopped) {
      tn->log(20, SSTR("syncstopped indication for source bucket shard"));
      bucket_stopped = true;
    }

    return set_cr_done();
  }
  return 0;
}
class RGWSyncBucketCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *env;
  boost::intrusive_ptr<const RGWContinuousLeaseCR> data_lease_cr;
  boost::intrusive_ptr<RGWContinuousLeaseCR> bucket_lease_cr;
  rgw_bucket_sync_pair_info sync_pair;
  rgw_bucket_sync_pipe sync_pipe;
  std::optional<uint64_t> gen;
  ceph::real_time* progress;

  const std::string lock_name = "bucket sync";
  const uint32_t lock_duration;
  const rgw_raw_obj status_obj;
  rgw_bucket_sync_status bucket_status;
  bool bucket_stopped = false;
  RGWObjVersionTracker objv;
  bool init_check_compat = false;
  rgw_bucket_index_marker_info info;
  rgw_raw_obj error_repo;
  rgw_bucket_shard source_bs;
  rgw_pool pool;
  uint64_t current_gen = 0;

  RGWSyncTraceNodeRef tn;

public:
  RGWSyncBucketCR(RGWDataSyncCtx *_sc,
                  boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
                  const rgw_bucket_sync_pair_info& _sync_pair,
                  std::optional<uint64_t> gen,
                  const RGWSyncTraceNodeRef& _tn_parent,
                  ceph::real_time* progress)
    : RGWCoroutine(_sc->cct), sc(_sc), env(_sc->env),
      data_lease_cr(std::move(lease_cr)), sync_pair(_sync_pair),
      gen(gen), progress(progress),
      lock_duration(cct->_conf->rgw_sync_lease_period),
      status_obj(env->svc->zone->get_zone_params().log_pool,
                 RGWBucketPipeSyncStatusManager::full_status_oid(sc->source_zone,
                                                                 sync_pair.source_bs.bucket,
                                                                 sync_pair.dest_bucket)),
      tn(env->sync_tracer->add_node(_tn_parent, "bucket",
                                    SSTR(bucket_str{_sync_pair.dest_bucket} << "<-" << bucket_shard_str{_sync_pair.source_bs} ))) {
  }

  int operate(const DoutPrefixProvider *dpp) override;
};

static RGWCoroutine* sync_bucket_shard_cr(RGWDataSyncCtx* sc,
                                          boost::intrusive_ptr<const RGWContinuousLeaseCR> lease,
                                          const rgw_bucket_sync_pair_info& sync_pair,
                                          std::optional<uint64_t> gen,
                                          const RGWSyncTraceNodeRef& tn,
                                          ceph::real_time* progress)
{
  return new RGWSyncBucketCR(sc, std::move(lease), sync_pair,
                             gen, tn, progress);
}
#define RELEASE_LOCK(cr) \
  if (cr) {cr->go_down(); drain_all(); cr.reset();}
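// Note on RELEASE_LOCK: the macro expands to a bare `if` statement (it is not
// wrapped in the usual do { ... } while (0) guard), so it must be used as a
// standalone statement inside a coroutine body where go_down()/drain_all()
// are available, e.g.:
//
//   RELEASE_LOCK(bucket_lease_cr);  // no-op when the lease was never taken
//
// All call sites below use it exactly this way.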
int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    // read source/destination bucket info
    yield call(new RGWSyncGetBucketInfoCR(env, sync_pair.source_bs.bucket, &sync_pipe.source_bucket_info,
                                          &sync_pipe.source_bucket_attrs, tn));
    if (retcode < 0) {
      tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.source_bs.bucket}));
      return set_cr_error(retcode);
    }

    yield call(new RGWSyncGetBucketInfoCR(env, sync_pair.dest_bucket, &sync_pipe.dest_bucket_info,
                                          &sync_pipe.dest_bucket_attrs, tn));
    if (retcode < 0) {
      tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.dest_bucket}));
      return set_cr_error(retcode);
    }

    sync_pipe.info = sync_pair;

    // read bucket sync status
    using ReadCR = RGWSimpleRadosReadCR<rgw_bucket_sync_status>;
    using WriteCR = RGWSimpleRadosWriteCR<rgw_bucket_sync_status>;

    yield call(new ReadCR(dpp, env->driver,
                          status_obj, &bucket_status, false, &objv));
    if (retcode == -ENOENT) {
      // if the full sync status object didn't exist yet, run the backward
      // compatibility logic in InitBucketFullSyncStatusCR below. if it did
      // exist, a `bucket sync init` probably requested its re-initialization,
      // and shouldn't try to resume incremental sync
      init_check_compat = true;

      // use exclusive create to set state=Init
      objv.generate_new_write_ver(cct);
      yield call(new WriteCR(dpp, env->driver, status_obj, bucket_status, &objv, true));
      tn->log(20, "bucket status object does not exist, create a new one");
      if (retcode == -EEXIST) {
        // raced with another create, read its status
        tn->log(20, "raced with another create, read its status");
        objv.clear();
        yield call(new ReadCR(dpp, env->driver,
                              status_obj, &bucket_status, false, &objv));
      }
    }
    if (retcode < 0) {
      tn->log(20, SSTR("ERROR: failed to read bucket status object. error: " << retcode));
      return set_cr_error(retcode);
    }

    do {
      tn->log(20, SSTR("sync status for source bucket: " << bucket_status.state <<
                       ". lease is: " << (bucket_lease_cr ? "taken" : "not taken") <<
                       ". stop indication is: " << bucket_stopped));

      if (bucket_status.state != BucketSyncState::Incremental ||
          bucket_stopped) {
        if (!bucket_lease_cr) {
          bucket_lease_cr.reset(new RGWContinuousLeaseCR(env->async_rados, env->driver, status_obj,
                                                         lock_name, lock_duration, this, &sc->lcc));
          yield spawn(bucket_lease_cr.get(), false);
          while (!bucket_lease_cr->is_locked()) {
            if (bucket_lease_cr->is_done()) {
              tn->log(5, "failed to take lease");
              set_status("lease lock failed, early abort");
              drain_all();
              return set_cr_error(bucket_lease_cr->get_ret_status());
            }
            tn->log(5, "waiting on bucket lease");
            yield set_sleeping(true);
          }
        }

        // if state is Init or Stopped, we query the remote RGW for their state
        yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, sync_pair.source_bs.bucket, &info));
        if (retcode < 0) {
          RELEASE_LOCK(bucket_lease_cr);
          return set_cr_error(retcode);
        }
        if (info.syncstopped) {
          // remote indicates stopped state
          tn->log(20, "remote bilog indicates that sync was stopped");

          // if state was incremental, remove all per-shard status objects
          if (bucket_status.state == BucketSyncState::Incremental) {
            yield {
              const auto num_shards = bucket_status.shards_done_with_gen.size();
              const auto gen = bucket_status.incremental_gen;
              call(new RemoveBucketShardStatusCollectCR(sc, sync_pair, gen, num_shards));
            }
          }

          // check if local state is "stopped"
          objv.clear();
          yield call(new ReadCR(dpp, env->driver,
                                status_obj, &bucket_status, false, &objv));
          if (retcode < 0) {
            tn->log(20, SSTR("ERROR: failed to read status before writing 'stopped'. error: " << retcode));
            RELEASE_LOCK(bucket_lease_cr);
            return set_cr_error(retcode);
          }
          if (bucket_status.state != BucketSyncState::Stopped) {
            // make sure that state is changed to stopped locally
            bucket_status.state = BucketSyncState::Stopped;
            yield call(new WriteCR(dpp, env->driver, status_obj, bucket_status,
                                   &objv, false));
            if (retcode < 0) {
              tn->log(20, SSTR("ERROR: failed to write 'stopped' status. error: " << retcode));
              RELEASE_LOCK(bucket_lease_cr);
              return set_cr_error(retcode);
            }
          }
          RELEASE_LOCK(bucket_lease_cr);
          return set_cr_done();
        }
        if (bucket_stopped) {
          tn->log(20, SSTR("ERROR: switched from 'stop' to 'start' sync while state is: " << bucket_status.state));
          bucket_stopped = false;
          bucket_status.state = BucketSyncState::Init;
        }
      }

      if (bucket_status.state != BucketSyncState::Incremental) {
        // if the state wasn't Incremental, take a bucket-wide lease to prevent
        // different shards from duplicating the init and full sync
        if (!bucket_lease_cr) {
          bucket_lease_cr.reset(new RGWContinuousLeaseCR(env->async_rados, env->driver, status_obj,
                                                         lock_name, lock_duration, this, &sc->lcc));
          yield spawn(bucket_lease_cr.get(), false);
          while (!bucket_lease_cr->is_locked()) {
            if (bucket_lease_cr->is_done()) {
              tn->log(5, "failed to take lease");
              set_status("lease lock failed, early abort");
              drain_all();
              return set_cr_error(bucket_lease_cr->get_ret_status());
            }
            tn->log(5, "waiting on bucket lease");
            yield set_sleeping(true);
          }
        }

        // reread the status after acquiring the lock
        objv.clear();
        yield call(new ReadCR(dpp, env->driver, status_obj,
                              &bucket_status, false, &objv));
        if (retcode < 0) {
          RELEASE_LOCK(bucket_lease_cr);
          tn->log(20, SSTR("ERROR: reading the status after acquiring the lock failed. error: " << retcode));
          return set_cr_error(retcode);
        }
        tn->log(20, SSTR("status after acquiring the lock is: " << bucket_status.state));

        yield call(new InitBucketFullSyncStatusCR(sc, sync_pair, status_obj,
                                                  bucket_status, objv,
                                                  sync_pipe.source_bucket_info,
                                                  init_check_compat, info));
        if (retcode < 0) {
          tn->log(20, SSTR("ERROR: init full sync failed. error: " << retcode));
          RELEASE_LOCK(bucket_lease_cr);
          return set_cr_error(retcode);
        }
      }

      assert(bucket_status.state == BucketSyncState::Incremental ||
             bucket_status.state == BucketSyncState::Full);

      if (bucket_status.state == BucketSyncState::Full) {
        assert(bucket_lease_cr);
        yield call(new RGWBucketFullSyncCR(sc, sync_pipe, status_obj,
                                           bucket_lease_cr, bucket_status,
                                           tn, objv));
        if (retcode < 0) {
          tn->log(20, SSTR("ERROR: full sync failed. error: " << retcode));
          RELEASE_LOCK(bucket_lease_cr);
          return set_cr_error(retcode);
        }
      }

      if (bucket_status.state == BucketSyncState::Incremental) {
        // lease not required for incremental sync
        RELEASE_LOCK(bucket_lease_cr);

        assert(sync_pair.source_bs.shard_id >= 0);
        // if a specific gen was requested, compare that to the sync status
        if (gen) {
          current_gen = bucket_status.incremental_gen;
          source_bs = sync_pair.source_bs;
          if (*gen > current_gen) {
            /* In case the data log entry is missing for previous gen, it may
             * not be marked complete and the sync can get stuck. To avoid it,
             * may be we can add this (shardid, gen) to error repo to force
             * sync and mark that shard as completed.
             */
            pool = sc->env->svc->zone->get_zone_params().log_pool;
            if ((static_cast<std::size_t>(source_bs.shard_id) < bucket_status.shards_done_with_gen.size()) &&
                !bucket_status.shards_done_with_gen[source_bs.shard_id]) {
              // use the error repo and sync status timestamp from the datalog shard corresponding to source_bs
              error_repo = datalog_oid_for_error_repo(sc, sc->env->driver,
                                                      pool, source_bs);
              yield call(rgw::error_repo::write_cr(sc->env->driver->svc()->rados, error_repo,
                                                   rgw::error_repo::encode_key(source_bs, current_gen),
                                                   ceph::real_clock::zero()));
              if (retcode < 0) {
                tn->log(0, SSTR("ERROR: failed to log prev gen entry (bucket=" << source_bs.bucket << ", shard_id=" << source_bs.shard_id << ", gen=" << current_gen << ") in error repo: retcode=" << retcode));
              } else {
                tn->log(20, SSTR("logged prev gen entry (bucket=" << source_bs.bucket << ", shard_id=" << source_bs.shard_id << ", gen=" << current_gen << ") in error repo: retcode=" << retcode));
              }
            }
            retcode = -EAGAIN;
            tn->log(10, SSTR("ERROR: requested sync of future generation "
                             << *gen << " > " << current_gen
                             << ", returning " << retcode << " for later retry"));
            return set_cr_error(retcode);
          } else if (*gen < current_gen) {
            tn->log(10, SSTR("WARNING: requested sync of past generation "
                             << *gen << " < " << current_gen
                             << ", returning success"));
            return set_cr_done();
          }
        }

        if (static_cast<std::size_t>(sync_pair.source_bs.shard_id) >= bucket_status.shards_done_with_gen.size()) {
          tn->log(1, SSTR("bucket shard " << sync_pair.source_bs << " index out of bounds"));
          return set_cr_done(); // return success so we don't retry
        }
        if (bucket_status.shards_done_with_gen[sync_pair.source_bs.shard_id]) {
          tn->log(10, SSTR("bucket shard " << sync_pair.source_bs << " of gen " <<
                           gen << " already synced."));
          return set_cr_done();
        }

        yield call(new RGWSyncBucketShardCR(sc, data_lease_cr, sync_pair,
                                            sync_pipe, bucket_stopped,
                                            bucket_status.incremental_gen, tn, progress));
        if (retcode < 0) {
          tn->log(20, SSTR("ERROR: incremental sync failed. error: " << retcode));
          return set_cr_error(retcode);
        }
      }
      // loop back to previous states unless incremental sync returns normally
    } while (bucket_status.state != BucketSyncState::Incremental || bucket_stopped);

    return set_cr_done();
  }
  return 0;
}
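// State machine implemented by RGWSyncBucketCR::operate(), summarized
// (descriptive comment; the states are the BucketSyncState values used above):
//
//   Init ----(InitBucketFullSyncStatusCR)----> Full or Incremental
//   Full ----(RGWBucketFullSyncCR)-----------> Incremental
//   Incremental --(RGWSyncBucketShardCR)-----> done
//   any state --(remote bilog syncstopped)---> Stopped
//
// The bucket-wide lease is only held through Init/Full; incremental shard
// sync runs without it.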
int RGWBucketPipeSyncStatusManager::do_init(const DoutPrefixProvider *dpp,
                                            std::ostream* ostr)
{
  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(this, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }

  sync_module.reset(new RGWDefaultSyncModuleInstance());
  auto async_rados = driver->svc()->rados->get_async_processor();

  sync_env.init(this, driver->ctx(), driver,
                driver->svc(), async_rados, &http_manager,
                error_logger.get(), driver->getRados()->get_sync_tracer(),
                sync_module, nullptr);

  sync_env.ostr = ostr;

  rgw_sync_pipe_info_set pipes;

  ret = cr_mgr.run(dpp, new RGWGetBucketPeersCR(&sync_env,
                                                dest_bucket,
                                                source_zone,
                                                source_bucket,
                                                &pipes,
                                                sync_env.sync_tracer->root_node));
  if (ret < 0) {
    ldpp_dout(this, 0) << "failed to get bucket source peers info: (ret=" << ret << "): " << cpp_strerror(-ret) << dendl;
    return ret;
  }

  if (pipes.empty()) {
    ldpp_dout(this, 0) << "No peers. This is not a valid multisite configuration." << dendl;
    return -EINVAL;
  }

  for (auto& pipe : pipes) {
    auto& szone = pipe.source.zone;

    auto conn = driver->svc()->zone->get_zone_conn(szone);
    if (!conn) {
      ldpp_dout(this, 0) << "connection object to zone " << szone << " does not exist" << dendl;
      return -EINVAL;
    }

    RGWZone* z;
    if (!(z = driver->svc()->zone->find_zone(szone))) {
      ldpp_dout(this, 0) << "zone " << szone << " does not exist" << dendl;
      return -EINVAL;
    }

    sources.emplace_back(&sync_env, szone, conn,
                         pipe.source.get_bucket_info(),
                         pipe.target.get_bucket(),
                         pipe.handler, z->name);
  }

  return 0;
}
int RGWBucketPipeSyncStatusManager::remote_info(const DoutPrefixProvider *dpp,
                                                source& s,
                                                uint64_t* oldest_gen,
                                                uint64_t* latest_gen,
                                                uint64_t* num_shards)
{
  rgw_bucket_index_marker_info remote_info;
  BucketIndexShardsManager remote_markers;
  auto r = rgw_read_remote_bilog_info(dpp, s.sc.conn, s.info.bucket,
                                      remote_info, remote_markers,
                                      null_yield);
  if (r < 0) {
    ldpp_dout(dpp, 0) << __PRETTY_FUNCTION__ << ":" << __LINE__
                      << " rgw_read_remote_bilog_info: r="
                      << r << dendl;
    return r;
  }

  if (oldest_gen) {
    *oldest_gen = remote_info.oldest_gen;
  }
  if (latest_gen) {
    *latest_gen = remote_info.latest_gen;
  }
  if (num_shards) {
    *num_shards = remote_markers.get().size();
  }

  return 0;
}
tl::expected<std::unique_ptr<RGWBucketPipeSyncStatusManager>, int>
RGWBucketPipeSyncStatusManager::construct(
  const DoutPrefixProvider* dpp,
  rgw::sal::RadosStore* driver,
  std::optional<rgw_zone_id> source_zone,
  std::optional<rgw_bucket> source_bucket,
  const rgw_bucket& dest_bucket,
  std::ostream* ostr)
{
  std::unique_ptr<RGWBucketPipeSyncStatusManager> self{
    new RGWBucketPipeSyncStatusManager(driver, source_zone, source_bucket,
                                       dest_bucket)};
  auto r = self->do_init(dpp, ostr);
  if (r < 0) {
    return tl::unexpected(r);
  }
  return self;
}
int RGWBucketPipeSyncStatusManager::init_sync_status(
  const DoutPrefixProvider *dpp)
{
  // Just running one at a time saves us from buildup/teardown and in
  // practice we only do one zone at a time.
  for (auto& source : sources) {
    list<RGWCoroutinesStack*> stacks;
    RGWCoroutinesStack *stack = new RGWCoroutinesStack(driver->ctx(), &cr_mgr);
    pretty_print(source.sc.env, "Initializing sync state of bucket {} with zone {}.\n",
                 source.info.bucket.name, source.zone_name);
    stack->call(new RGWSimpleRadosWriteCR<rgw_bucket_sync_status>(
                  dpp, source.sc.env->driver,
                  {sync_env.svc->zone->get_zone_params().log_pool,
                   full_status_oid(source.sc.source_zone,
                                   source.info.bucket,
                                   source.dest)},
                  rgw_bucket_sync_status{}));
    stacks.push_back(stack);
    auto r = cr_mgr.run(dpp, stacks);
    if (r < 0) {
      pretty_print(source.sc.env,
                   "Initialization of sync state for bucket {} with zone {} "
                   "failed with error {}\n",
                   source.info.bucket.name, source.zone_name, cpp_strerror(r));
    }
  }
  return 0;
}
tl::expected<std::map<int, rgw_bucket_shard_sync_info>, int>
RGWBucketPipeSyncStatusManager::read_sync_status(
  const DoutPrefixProvider *dpp)
{
  std::map<int, rgw_bucket_shard_sync_info> sync_status;
  list<RGWCoroutinesStack*> stacks;

  auto sz = sources.begin();

  if (source_zone) {
    sz = std::find_if(sources.begin(), sources.end(),
                      [this](const source& s) {
                        return s.sc.source_zone == *source_zone;
                      });
    if (sz == sources.end()) {
      ldpp_dout(this, 0) << "ERROR: failed to find source zone: "
                         << *source_zone << dendl;
      return tl::unexpected(-ENOENT);
    }
  } else {
    ldpp_dout(this, 5) << "No source zone specified, using source zone: "
                       << sz->sc.source_zone << dendl;
    return tl::unexpected(-ENOENT);
  }

  uint64_t num_shards, latest_gen;
  auto ret = remote_info(dpp, *sz, nullptr, &latest_gen, &num_shards);
  if (ret < 0) {
    ldpp_dout(this, 5) << "Unable to get remote info: "
                       << ret << dendl;
    return tl::unexpected(ret);
  }

  auto stack = new RGWCoroutinesStack(driver->ctx(), &cr_mgr);
  std::vector<rgw_bucket_sync_pair_info> pairs(num_shards);
  for (auto shard = 0u; shard < num_shards; ++shard) {
    auto& pair = pairs[shard];
    pair.source_bs.bucket = sz->info.bucket;
    pair.dest_bucket = sz->dest;
    pair.source_bs.shard_id = shard;
    stack->call(new RGWReadBucketPipeSyncStatusCoroutine(
                  &sz->sc, pair, &sync_status[shard],
                  nullptr, latest_gen));
  }
  stacks.push_back(stack);

  ret = cr_mgr.run(dpp, stacks);
  if (ret < 0) {
    ldpp_dout(this, 0) << "ERROR: failed to read sync status for "
                       << bucket_str{dest_bucket} << dendl;
    return tl::unexpected(ret);
  }

  return sync_status;
}
namespace rgw::bucket_sync_run {
// Retry-loop over calls to sync_bucket_shard_cr
class ShardCR : public RGWCoroutine {
  static constexpr auto allowed_retries = 10u;

  RGWDataSyncCtx& sc;
  const rgw_bucket_sync_pair_info& pair;
  const uint64_t gen;
  unsigned retries = 0;

  ceph::real_time prev_progress;
  ceph::real_time progress;

public:
  ShardCR(RGWDataSyncCtx& sc, const rgw_bucket_sync_pair_info& pair,
          const uint64_t gen)
    : RGWCoroutine(sc.cct), sc(sc), pair(pair), gen(gen) {}

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      // Since all errors (except ECANCELED) are considered retryable,
      // retry other errors so long as we're making progress.
      for (retries = 0u, retcode = -EDOM;
           (retries < allowed_retries) && (retcode != 0);
           ++retries) {
        ldpp_dout(dpp, 5) << "ShardCR: syncing bucket shard on: "
                          << "zone=" << sc.source_zone
                          << ", bucket=" << pair.source_bs.bucket.name
                          << ", shard=" << pair.source_bs.shard_id
                          << ", gen=" << gen
                          << dendl;
        yield call(sync_bucket_shard_cr(&sc, nullptr, pair, gen,
                                        sc.env->sync_tracer->root_node,
                                        &progress));

        if (retcode == -ECANCELED) {
          ldpp_dout(dpp, -1) << "ERROR: Got -ECANCELED for "
                             << pair.source_bs << dendl;
          drain_all();
          return set_cr_error(retcode);
        } else if (retcode < 0) {
          ldpp_dout(dpp, 5) << "WARNING: Got error, retcode=" << retcode << " for "
                            << pair.source_bs << " on retry "
                            << retries + 1 << " of " << allowed_retries
                            << " allowed" << dendl;
          // Reset the retry counter if we made any progress
          if (progress != prev_progress) {
            retries = 0;
          }
          prev_progress = progress;
        }
      }
      if (retcode < 0) {
        ldpp_dout(dpp, -1) << "ERROR: Exhausted retries for "
                           << pair.source_bs << " retcode="
                           << retcode << dendl;
        drain_all();
        return set_cr_error(retcode);
      }

      drain_all();
      return set_cr_done();
    }
    return 0;
  }
};
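// The retry pattern above, reduced to a sketch (illustrative only; assumes a
// `sync_once()` callable and a monotonically advancing `progress` timestamp,
// neither of which exists in this file):
//
//   unsigned retries = 0;
//   int r = -EDOM;                     // sentinel: "not attempted yet"
//   while (retries < allowed_retries && r != 0) {
//     r = sync_once(&progress);
//     if (r == -ECANCELED) break;      // hard error, never retried
//     if (r < 0 && progress != prev_progress) {
//       retries = 0;                   // forward progress resets the budget
//     }
//     prev_progress = progress;
//     ++retries;
//   }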
// Loop over calls to ShardCR with limited concurrency
class GenCR : public RGWShardCollectCR {
  static constexpr auto MAX_CONCURRENT_SHARDS = 64;

  RGWDataSyncCtx& sc;
  const uint64_t gen;

  std::vector<rgw_bucket_sync_pair_info> pairs;
  decltype(pairs)::const_iterator iter;

public:
  GenCR(RGWDataSyncCtx& sc, const rgw_bucket& source, const rgw_bucket& dest,
        const uint64_t gen, const uint64_t shards,
        const RGWBucketSyncFlowManager::pipe_handler& handler)
    : RGWShardCollectCR(sc.cct, MAX_CONCURRENT_SHARDS),
      sc(sc), gen(gen) {
    pairs.resize(shards);
    for (auto shard = 0u; shard < shards; ++shard) {
      auto& pair = pairs[shard];
      pair.handler = handler;
      pair.source_bs.bucket = source;
      pair.dest_bucket = dest;
      pair.source_bs.shard_id = shard;
    }
    iter = pairs.cbegin();
    assert(pairs.size() == shards);
  }

  virtual bool spawn_next() override {
    if (iter == pairs.cend()) {
      return false;
    }
    spawn(new ShardCR(sc, *iter, gen), false);
    ++iter;
    return true;
  }

  int handle_result(int r) override {
    if (r < 0) {
      ldpp_dout(sc.env->dpp, 4) << "ERROR: Error syncing shard: "
                                << cpp_strerror(r) << dendl;
    }
    return r;
  }
};
// Read sync status, loop over calls to GenCR
class SourceCR : public RGWCoroutine {
  RGWDataSyncCtx& sc;
  const RGWBucketInfo& info;
  const rgw_bucket& dest;
  const RGWBucketSyncFlowManager::pipe_handler& handler;
  const rgw_raw_obj status_obj{
    sc.env->svc->zone->get_zone_params().log_pool,
    RGWBucketPipeSyncStatusManager::full_status_oid(sc.source_zone, info.bucket,
                                                    dest)};

  BucketSyncState state = BucketSyncState::Incremental;
  uint64_t gen = 0;
  uint64_t num_shards = 0;
  rgw_bucket_sync_status status;
  std::string zone_name;

public:
  SourceCR(RGWDataSyncCtx& sc, const RGWBucketInfo& info,
           const rgw_bucket& dest,
           const RGWBucketSyncFlowManager::pipe_handler& handler,
           const std::string& zone_name)
    : RGWCoroutine(sc.cct), sc(sc), info(info), dest(dest), handler(handler),
      zone_name(zone_name) {}

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      // Get the source's status. In incremental sync, this gives us
      // the generation and shard count that is next needed to be run.
      yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_status>(
                   dpp, sc.env->driver, status_obj, &status));
      if (retcode < 0) {
        ldpp_dout(dpp, -1) << "ERROR: Unable to fetch status for zone="
                           << sc.source_zone << " retcode="
                           << retcode << dendl;
        drain_all();
        return set_cr_error(retcode);
      }

      if (status.state == BucketSyncState::Stopped) {
        pretty_print(sc.env, "Sync of bucket {} from source zone {} is in state Stopped. "
                     "Nothing to do.\n", dest.name, zone_name);
        ldpp_dout(dpp, 5) << "SourceCR: Bucket is in state Stopped, returning."
                          << dendl;
        drain_all();
        return set_cr_done();
      }

      do {
        state = status.state;
        gen = status.incremental_gen;
        num_shards = status.shards_done_with_gen.size();

        ldpp_dout(dpp, 5) << "SourceCR: "
                          << "state=" << state
                          << ", gen=" << gen
                          << ", num_shards=" << num_shards
                          << dendl;

        // Special case to handle full sync. Since full sync no longer
        // uses shards and has no generations, we sync shard zero,
        // though use the current generation so a following
        // incremental sync can carry on.
        if (state != BucketSyncState::Incremental) {
          pretty_print(sc.env, "Beginning full sync of bucket {} from source zone {}.\n",
                       dest.name, zone_name);
          ldpp_dout(dpp, 5) << "SourceCR: Calling GenCR with "
                            << "gen=" << gen
                            << ", num_shards=" << 1
                            << dendl;
          yield call(new GenCR(sc, info.bucket, dest, gen, 1, handler));
        } else {
          pretty_print(sc.env, "Beginning incremental sync of bucket {}, generation {} from source zone {}.\n",
                       dest.name, gen, zone_name);
          ldpp_dout(dpp, 5) << "SourceCR: Calling GenCR with "
                            << "gen=" << gen
                            << ", num_shards=" << num_shards
                            << dendl;
          yield call(new GenCR(sc, info.bucket, dest, gen, num_shards,
                               handler));
        }
        if (retcode < 0) {
          ldpp_dout(dpp, -1) << "ERROR: Giving up syncing from "
                             << sc.source_zone << " retcode="
                             << retcode << dendl;
          drain_all();
          return set_cr_error(retcode);
        }

        pretty_print(sc.env, "Completed.\n");

        yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_status>(
                     dpp, sc.env->driver, status_obj, &status));
        if (retcode < 0) {
          ldpp_dout(dpp, -1) << "ERROR: Unable to fetch status for zone="
                             << sc.source_zone << " retcode="
                             << retcode << dendl;
          drain_all();
          return set_cr_error(retcode);
        }
        // Repeat until we have done an incremental run and the
        // generation remains unchanged.
        ldpp_dout(dpp, 5) << "SourceCR: "
                          << "state=" << state
                          << ", gen=" << gen
                          << ", num_shards=" << num_shards
                          << ", status.state=" << status.state
                          << ", status.incremental_gen=" << status.incremental_gen
                          << ", status.shards_done_with_gen.size()=" << status.shards_done_with_gen.size()
                          << dendl;
      } while (state != BucketSyncState::Incremental ||
               gen != status.incremental_gen);

      drain_all();
      return set_cr_done();
    }
    return 0;
  }
};
} // namespace rgw::bucket_sync_run
int RGWBucketPipeSyncStatusManager::run(const DoutPrefixProvider *dpp)
{
  list<RGWCoroutinesStack*> stacks;
  for (auto& source : sources) {
    auto stack = new RGWCoroutinesStack(driver->ctx(), &cr_mgr);
    stack->call(new rgw::bucket_sync_run::SourceCR(
                  source.sc, source.info, source.dest, source.handler,
                  source.zone_name));
    stacks.push_back(stack);
  }
  auto ret = cr_mgr.run(dpp, stacks);
  if (ret < 0) {
    ldpp_dout(this, 0) << "ERROR: Sync unsuccessful on bucket "
                       << bucket_str{dest_bucket} << dendl;
  }
  return ret;
}
unsigned RGWBucketPipeSyncStatusManager::get_subsys() const
{
  return dout_subsys;
}

std::ostream& RGWBucketPipeSyncStatusManager::gen_prefix(std::ostream& out) const
{
  auto zone = std::string_view{source_zone.value_or(rgw_zone_id("*")).id};
  return out << "bucket sync zone:" << zone.substr(0, 8)
             << " bucket:" << dest_bucket << ' ';
}
string RGWBucketPipeSyncStatusManager::full_status_oid(const rgw_zone_id& source_zone,
                                                       const rgw_bucket& source_bucket,
                                                       const rgw_bucket& dest_bucket)
{
  if (source_bucket == dest_bucket) {
    return bucket_full_status_oid_prefix + "." + source_zone.id + ":"
      + dest_bucket.get_key();
  } else {
    return bucket_full_status_oid_prefix + "." + source_zone.id + ":"
      + dest_bucket.get_key() + ":" + source_bucket.get_key();
  }
}

inline std::string generation_token(uint64_t gen) {
  return (gen == 0) ? "" : (":" + std::to_string(gen));
}
string RGWBucketPipeSyncStatusManager::inc_status_oid(const rgw_zone_id& source_zone,
                                                      const rgw_bucket_sync_pair_info& sync_pair,
                                                      uint64_t gen)
{
  if (sync_pair.source_bs.bucket == sync_pair.dest_bucket) {
    return bucket_status_oid_prefix + "." + source_zone.id + ":" + sync_pair.source_bs.get_key() +
      generation_token(gen);
  } else {
    return bucket_status_oid_prefix + "." + source_zone.id + ":" + sync_pair.dest_bucket.get_key() + ":" + sync_pair.source_bs.get_key() +
      generation_token(gen);
  }
}
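// Resulting object names, for illustration (zone id, bucket keys and gen are
// placeholders):
//
//   full status:        "bucket.full-sync-status.<zone>:<dest_key>[:<source_key>]"
//   incremental status: "bucket.sync-status.<zone>:[<dest_key>:]<source_bs_key>[:<gen>]"
//
// The source suffix is omitted when source and destination are the same
// bucket, and generation_token() omits the ":<gen>" suffix for gen 0, which
// keeps gen-0 names identical to the pre-generation format.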
string RGWBucketPipeSyncStatusManager::obj_status_oid(const rgw_bucket_sync_pipe& sync_pipe,
                                                      const rgw_zone_id& source_zone,
                                                      const rgw_obj& obj)
{
  string prefix = object_status_oid_prefix + "." + source_zone.id + ":" + obj.bucket.get_key();
  if (sync_pipe.source_bucket_info.bucket !=
      sync_pipe.dest_bucket_info.bucket) {
    prefix += string("/") + sync_pipe.dest_bucket_info.bucket.get_key();
  }
  return prefix + ":" + obj.key.name + ":" + obj.key.instance;
}
int rgw_read_remote_bilog_info(const DoutPrefixProvider *dpp,
                               RGWRESTConn* conn,
                               const rgw_bucket& bucket,
                               rgw_bucket_index_marker_info& info,
                               BucketIndexShardsManager& markers,
                               optional_yield y)
{
  const auto instance_key = bucket.get_key();
  const rgw_http_param_pair params[] = {
    { "type" , "bucket-index" },
    { "bucket-instance", instance_key.c_str() },
    { "info" , nullptr },
    { nullptr, nullptr }
  };
  int r = conn->get_json_resource(dpp, "/admin/log/", params, y, info);
  if (r < 0) {
    ldpp_dout(dpp, -1) << "failed to fetch remote log markers: " << cpp_strerror(r) << dendl;
    return r;
  }
  // parse shard markers
  r = markers.from_string(info.max_marker, -1);
  if (r < 0) {
    ldpp_dout(dpp, -1) << "failed to decode remote log markers" << dendl;
    return r;
  }
  return 0;
}
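// For illustration, the request built above amounts to (host and credentials
// come from the RGWRESTConn):
//
//   GET /admin/log/?type=bucket-index&bucket-instance=<bucket_key>&info
//
// The JSON response is decoded into rgw_bucket_index_marker_info, whose
// max_marker string is then split into per-shard markers.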
class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR {
  static constexpr int max_concurrent_shards = 16;
  rgw::sal::RadosStore* const driver;
  RGWDataSyncCtx* const sc;
  RGWDataSyncEnv* const env;
  const uint64_t gen;

  rgw_bucket_sync_pair_info sync_pair;
  using Vector = std::vector<rgw_bucket_shard_sync_info>;
  Vector::iterator i, end;

  int handle_result(int r) override {
    if (r == -ENOENT) { // ENOENT is not a fatal error
      return 0;
    }
    if (r < 0) {
      ldout(cct, 4) << "failed to read bucket shard sync status: "
                    << cpp_strerror(r) << dendl;
    }
    return r;
  }
 public:
  RGWCollectBucketSyncStatusCR(rgw::sal::RadosStore* driver, RGWDataSyncCtx *sc,
                               const rgw_bucket_sync_pair_info& sync_pair,
                               uint64_t gen, Vector *status)
    : RGWShardCollectCR(sc->cct, max_concurrent_shards),
      driver(driver), sc(sc), env(sc->env), gen(gen), sync_pair(sync_pair),
      i(status->begin()), end(status->end())
  {}

  bool spawn_next() override {
    if (i == end) {
      return false;
    }
    spawn(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &*i, nullptr, gen), false);
    ++i;
    ++sync_pair.source_bs.shard_id;
    return true;
  }
};
int rgw_read_bucket_full_sync_status(const DoutPrefixProvider *dpp,
                                     rgw::sal::RadosStore *driver,
                                     const rgw_sync_bucket_pipe& pipe,
                                     rgw_bucket_sync_status *status,
                                     optional_yield y)
{
  auto get_oid = RGWBucketPipeSyncStatusManager::full_status_oid;
  const rgw_raw_obj obj{driver->svc()->zone->get_zone_params().log_pool,
                        get_oid(*pipe.source.zone, *pipe.source.bucket, *pipe.dest.bucket)};

  auto svc = driver->svc()->sysobj;
  auto sysobj = svc->get_obj(obj);
  bufferlist bl;
  int ret = sysobj.rop().read(dpp, &bl, y);
  if (ret < 0) {
    return ret;
  }

  try {
    auto iter = bl.cbegin();
    rgw_bucket_sync_status result;
    decode(result, iter);
    *status = result;
    return 0;
  } catch (const buffer::error& err) {
    lderr(svc->ctx()) << "error decoding " << obj << ": " << err.what() << dendl;
    return -EIO;
  }
}
int rgw_read_bucket_inc_sync_status(const DoutPrefixProvider *dpp,
                                    rgw::sal::RadosStore *driver,
                                    const rgw_sync_bucket_pipe& pipe,
                                    uint64_t gen,
                                    std::vector<rgw_bucket_shard_sync_info> *status)
{
  if (!pipe.source.zone ||
      !pipe.source.bucket ||
      !pipe.dest.zone ||
      !pipe.dest.bucket) {
    return -EINVAL;
  }

  rgw_bucket_sync_pair_info sync_pair;
  sync_pair.source_bs.bucket = *pipe.source.bucket;
  sync_pair.source_bs.shard_id = 0;
  sync_pair.dest_bucket = *pipe.dest.bucket;

  RGWDataSyncEnv env;
  RGWSyncModuleInstanceRef module; // null sync module
  env.init(dpp, driver->ctx(), driver, driver->svc(), driver->svc()->rados->get_async_processor(),
           nullptr, nullptr, nullptr, module, nullptr);

  RGWDataSyncCtx sc;
  sc.init(&env, nullptr, *pipe.source.zone);

  RGWCoroutinesManager crs(driver->ctx(), driver->getRados()->get_cr_registry());
  return crs.run(dpp, new RGWCollectBucketSyncStatusCR(driver, &sc,
                                                       sync_pair, gen,
                                                       status));
}
void rgw_data_sync_info::generate_test_instances(list<rgw_data_sync_info*>& o)
{
  auto info = new rgw_data_sync_info;
  info->state = rgw_data_sync_info::StateBuildingFullSyncMaps;
  info->num_shards = 8;
  o.push_back(info);
  o.push_back(new rgw_data_sync_info);
}

void rgw_data_sync_marker::generate_test_instances(list<rgw_data_sync_marker*>& o)
{
  auto marker = new rgw_data_sync_marker;
  marker->state = rgw_data_sync_marker::IncrementalSync;
  marker->marker = "01234";
  o.push_back(marker);
  o.push_back(new rgw_data_sync_marker);
}

void rgw_data_sync_status::generate_test_instances(list<rgw_data_sync_status*>& o)
{
  o.push_back(new rgw_data_sync_status);
}
void rgw_bucket_shard_full_sync_marker::dump(Formatter *f) const
{
  encode_json("position", position, f);
  encode_json("count", count, f);
}

void rgw_bucket_shard_inc_sync_marker::decode_json(JSONObj *obj)
{
  JSONDecoder::decode_json("position", position, obj);
  JSONDecoder::decode_json("timestamp", timestamp, obj);
}

void rgw_bucket_shard_inc_sync_marker::dump(Formatter *f) const
{
  encode_json("position", position, f);
  encode_json("timestamp", timestamp, f);
}

void rgw_bucket_shard_sync_info::decode_json(JSONObj *obj)
{
  std::string s;
  JSONDecoder::decode_json("status", s, obj);
  if (s == "full-sync") {
    state = StateFullSync;
  } else if (s == "incremental-sync") {
    state = StateIncrementalSync;
  } else if (s == "stopped") {
    state = StateStopped;
  } else {
    state = StateInit;
  }
  JSONDecoder::decode_json("inc_marker", inc_marker, obj);
}

void rgw_bucket_shard_full_sync_marker::decode_json(JSONObj *obj)
{
  JSONDecoder::decode_json("position", position, obj);
  JSONDecoder::decode_json("count", count, obj);
}

void rgw_bucket_shard_sync_info::dump(Formatter *f) const
{
  const char *s{nullptr};
  switch ((SyncState)state) {
  case StateInit:
    s = "init";
    break;
  case StateFullSync:
    s = "full-sync";
    break;
  case StateIncrementalSync:
    s = "incremental-sync";
    break;
  case StateStopped:
    s = "stopped";
    break;
  default:
    s = "unknown";
    break;
  }
  encode_json("status", s, f);
  encode_json("inc_marker", inc_marker, f);
}
void rgw_bucket_full_sync_status::decode_json(JSONObj *obj)
{
  JSONDecoder::decode_json("position", position, obj);
  JSONDecoder::decode_json("count", count, obj);
}

void rgw_bucket_full_sync_status::dump(Formatter *f) const
{
  encode_json("position", position, f);
  encode_json("count", count, f);
}

void encode_json(const char *name, BucketSyncState state, Formatter *f)
{
  switch (state) {
  case BucketSyncState::Init:
    encode_json(name, "init", f);
    break;
  case BucketSyncState::Full:
    encode_json(name, "full-sync", f);
    break;
  case BucketSyncState::Incremental:
    encode_json(name, "incremental-sync", f);
    break;
  case BucketSyncState::Stopped:
    encode_json(name, "stopped", f);
    break;
  default:
    encode_json(name, "unknown", f);
    break;
  }
}

void decode_json_obj(BucketSyncState& state, JSONObj *obj)
{
  std::string s;
  decode_json_obj(s, obj);
  if (s == "full-sync") {
    state = BucketSyncState::Full;
  } else if (s == "incremental-sync") {
    state = BucketSyncState::Incremental;
  } else if (s == "stopped") {
    state = BucketSyncState::Stopped;
  } else {
    state = BucketSyncState::Init;
  }
}
void rgw_bucket_sync_status::decode_json(JSONObj *obj)
{
  JSONDecoder::decode_json("state", state, obj);
  JSONDecoder::decode_json("full", full, obj);
  JSONDecoder::decode_json("incremental_gen", incremental_gen, obj);
}

void rgw_bucket_sync_status::dump(Formatter *f) const
{
  encode_json("state", state, f);
  encode_json("full", full, f);
  encode_json("incremental_gen", incremental_gen, f);
}
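// Example of the JSON produced by rgw_bucket_sync_status::dump() (values are
// placeholders):
//
//   {
//     "state": "incremental-sync",
//     "full": { "position": "...", "count": 0 },
//     "incremental_gen": 1
//   }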
void bilog_status_v2::dump(Formatter *f) const
{
  encode_json("sync_status", sync_status, f);
  encode_json("inc_status", inc_status, f);
}

void bilog_status_v2::decode_json(JSONObj *obj)
{
  JSONDecoder::decode_json("sync_status", sync_status, obj);
  JSONDecoder::decode_json("inc_status", inc_status, obj);
}