1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include <boost/utility/string_ref.hpp>
6 #include "common/ceph_json.h"
7 #include "common/RWLock.h"
8 #include "common/RefCountedObj.h"
9 #include "common/WorkQueue.h"
10 #include "common/Throttle.h"
11 #include "common/errno.h"
13 #include "rgw_common.h"
14 #include "rgw_rados.h"
17 #include "rgw_data_sync.h"
18 #include "rgw_rest_conn.h"
19 #include "rgw_cr_rados.h"
20 #include "rgw_cr_rest.h"
21 #include "rgw_http_client.h"
22 #include "rgw_bucket.h"
23 #include "rgw_metadata.h"
24 #include "rgw_sync_counters.h"
25 #include "rgw_sync_module.h"
26 #include "rgw_sync_log_trim.h"
28 #include "cls/lock/cls_lock_client.h"
30 #include "services/svc_zone.h"
31 #include "services/svc_sync_modules.h"
33 #include "include/random.h"
35 #include <boost/asio/yield.hpp>
37 #define dout_subsys ceph_subsys_rgw
40 #define dout_prefix (*_dout << "data sync: ")
// RADOS object-name prefixes under which data sync persists its state
// in the zone's log pool.
// Overall data-sync status object for a source zone.
42 static string datalog_sync_status_oid_prefix
= "datalog.sync-status";
// Per-shard sync-status objects (one per datalog shard).
43 static string datalog_sync_status_shard_prefix
= "datalog.sync-status.shard";
// Omap index objects built during full sync to enumerate buckets.
44 static string datalog_sync_full_sync_index_prefix
= "data.full-sync.index";
// Per-bucket-shard sync status objects.
45 static string bucket_status_oid_prefix
= "bucket.sync-status";
// NOTE(review): deliberately shares the "bucket.sync-status" value with
// bucket_status_oid_prefix — matches upstream; confirm before changing.
46 static string object_status_oid_prefix
= "bucket.sync-status";
// Decode a remote datalog "info" JSON response. Only the shard count is
// extracted; the wire field is named "num_objects" but is stored into
// num_shards.
// NOTE(review): the function's closing brace (original listing line 51)
// is missing from this chunk — the fragment below is incomplete.
49 void rgw_datalog_info::decode_json(JSONObj
*obj
) {
50 JSONDecoder::decode_json("num_objects", num_shards
, obj
);
// Decode one datalog entry from JSON: its key, plus a "timestamp" field
// that is first decoded into a utime_t ("ut") and then converted to a
// real_time for the timestamp member.
// NOTE(review): the declaration of ut (original listing line 55) and the
// closing brace are missing from this chunk — fragment is incomplete.
53 void rgw_datalog_entry::decode_json(JSONObj
*obj
) {
54 JSONDecoder::decode_json("key", key
, obj
);
56 JSONDecoder::decode_json("timestamp", ut
, obj
);
// Convert the decoded utime_t to ceph::real_time.
57 timestamp
= ut
.to_real_time();
// Decode one shard's datalog listing from JSON: the continuation marker,
// a truncation flag (more entries remain remotely), and the entry list.
// NOTE(review): the closing brace (original listing lines 64-65) is
// missing from this chunk — the fragment below is incomplete.
60 void rgw_datalog_shard_data::decode_json(JSONObj
*obj
) {
61 JSONDecoder::decode_json("marker", marker
, obj
);
62 JSONDecoder::decode_json("truncated", truncated
, obj
);
63 JSONDecoder::decode_json("entries", entries
, obj
);
66 class RGWReadDataSyncStatusMarkersCR
: public RGWShardCollectCR
{
67 static constexpr int MAX_CONCURRENT_SHARDS
= 16;
73 map
<uint32_t, rgw_data_sync_marker
>& markers
;
76 RGWReadDataSyncStatusMarkersCR(RGWDataSyncEnv
*env
, int num_shards
,
77 map
<uint32_t, rgw_data_sync_marker
>& markers
)
78 : RGWShardCollectCR(env
->cct
, MAX_CONCURRENT_SHARDS
),
79 env(env
), num_shards(num_shards
), markers(markers
)
81 bool spawn_next() override
;
84 bool RGWReadDataSyncStatusMarkersCR::spawn_next()
86 if (shard_id
>= num_shards
) {
89 using CR
= RGWSimpleRadosReadCR
<rgw_data_sync_marker
>;
90 spawn(new CR(env
->async_rados
, env
->store
->svc
.sysobj
,
91 rgw_raw_obj(env
->store
->svc
.zone
->get_zone_params().log_pool
, RGWDataSyncStatusManager::shard_obj_name(env
->source_zone
, shard_id
)),
98 class RGWReadDataSyncRecoveringShardsCR
: public RGWShardCollectCR
{
99 static constexpr int MAX_CONCURRENT_SHARDS
= 16;
103 uint64_t max_entries
;
108 std::vector
<RGWRadosGetOmapKeysCR::ResultPtr
>& omapkeys
;
111 RGWReadDataSyncRecoveringShardsCR(RGWDataSyncEnv
*env
, uint64_t _max_entries
, int _num_shards
,
112 std::vector
<RGWRadosGetOmapKeysCR::ResultPtr
>& omapkeys
)
113 : RGWShardCollectCR(env
->cct
, MAX_CONCURRENT_SHARDS
), env(env
),
114 max_entries(_max_entries
), num_shards(_num_shards
), omapkeys(omapkeys
)
116 bool spawn_next() override
;
119 bool RGWReadDataSyncRecoveringShardsCR::spawn_next()
121 if (shard_id
>= num_shards
)
124 string error_oid
= RGWDataSyncStatusManager::shard_obj_name(env
->source_zone
, shard_id
) + ".retry";
125 auto& shard_keys
= omapkeys
[shard_id
];
126 shard_keys
= std::make_shared
<RGWRadosGetOmapKeysCR::Result
>();
127 spawn(new RGWRadosGetOmapKeysCR(env
->store
, rgw_raw_obj(env
->store
->svc
.zone
->get_zone_params().log_pool
, error_oid
),
128 marker
, max_entries
, shard_keys
), false);
134 class RGWReadDataSyncStatusCoroutine
: public RGWCoroutine
{
135 RGWDataSyncEnv
*sync_env
;
136 rgw_data_sync_status
*sync_status
;
139 RGWReadDataSyncStatusCoroutine(RGWDataSyncEnv
*_sync_env
,
140 rgw_data_sync_status
*_status
)
141 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
), sync_status(_status
)
143 int operate() override
;
146 int RGWReadDataSyncStatusCoroutine::operate()
150 using ReadInfoCR
= RGWSimpleRadosReadCR
<rgw_data_sync_info
>;
152 bool empty_on_enoent
= false; // fail on ENOENT
153 call(new ReadInfoCR(sync_env
->async_rados
, sync_env
->store
->svc
.sysobj
,
154 rgw_raw_obj(sync_env
->store
->svc
.zone
->get_zone_params().log_pool
, RGWDataSyncStatusManager::sync_status_oid(sync_env
->source_zone
)),
155 &sync_status
->sync_info
, empty_on_enoent
));
158 ldout(sync_env
->cct
, 4) << "failed to read sync status info with "
159 << cpp_strerror(retcode
) << dendl
;
160 return set_cr_error(retcode
);
162 // read shard markers
163 using ReadMarkersCR
= RGWReadDataSyncStatusMarkersCR
;
164 yield
call(new ReadMarkersCR(sync_env
, sync_status
->sync_info
.num_shards
,
165 sync_status
->sync_markers
));
167 ldout(sync_env
->cct
, 4) << "failed to read sync status markers with "
168 << cpp_strerror(retcode
) << dendl
;
169 return set_cr_error(retcode
);
171 return set_cr_done();
176 class RGWReadRemoteDataLogShardInfoCR
: public RGWCoroutine
{
177 RGWDataSyncEnv
*sync_env
;
179 RGWRESTReadResource
*http_op
;
182 RGWDataChangesLogInfo
*shard_info
;
185 RGWReadRemoteDataLogShardInfoCR(RGWDataSyncEnv
*_sync_env
,
186 int _shard_id
, RGWDataChangesLogInfo
*_shard_info
) : RGWCoroutine(_sync_env
->cct
),
190 shard_info(_shard_info
) {
193 ~RGWReadRemoteDataLogShardInfoCR() override
{
199 int operate() override
{
203 snprintf(buf
, sizeof(buf
), "%d", shard_id
);
204 rgw_http_param_pair pairs
[] = { { "type" , "data" },
209 string p
= "/admin/log/";
211 http_op
= new RGWRESTReadResource(sync_env
->conn
, p
, pairs
, NULL
, sync_env
->http_manager
);
213 init_new_io(http_op
);
215 int ret
= http_op
->aio_read();
217 ldout(sync_env
->cct
, 0) << "ERROR: failed to read from " << p
<< dendl
;
218 log_error() << "failed to send http operation: " << http_op
->to_str() << " ret=" << ret
<< std::endl
;
219 return set_cr_error(ret
);
225 int ret
= http_op
->wait(shard_info
);
227 return set_cr_error(ret
);
229 return set_cr_done();
236 struct read_remote_data_log_response
{
239 list
<rgw_data_change_log_entry
> entries
;
241 read_remote_data_log_response() : truncated(false) {}
243 void decode_json(JSONObj
*obj
) {
244 JSONDecoder::decode_json("marker", marker
, obj
);
245 JSONDecoder::decode_json("truncated", truncated
, obj
);
246 JSONDecoder::decode_json("entries", entries
, obj
);
250 class RGWReadRemoteDataLogShardCR
: public RGWCoroutine
{
251 RGWDataSyncEnv
*sync_env
;
253 RGWRESTReadResource
*http_op
= nullptr;
256 const std::string
& marker
;
257 string
*pnext_marker
;
258 list
<rgw_data_change_log_entry
> *entries
;
261 read_remote_data_log_response response
;
262 std::optional
<PerfGuard
> timer
;
265 RGWReadRemoteDataLogShardCR(RGWDataSyncEnv
*_sync_env
, int _shard_id
,
266 const std::string
& marker
, string
*pnext_marker
,
267 list
<rgw_data_change_log_entry
> *_entries
,
269 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
270 shard_id(_shard_id
), marker(marker
), pnext_marker(pnext_marker
),
271 entries(_entries
), truncated(_truncated
) {
273 ~RGWReadRemoteDataLogShardCR() override
{
279 int operate() override
{
283 snprintf(buf
, sizeof(buf
), "%d", shard_id
);
284 rgw_http_param_pair pairs
[] = { { "type" , "data" },
286 { "marker", marker
.c_str() },
287 { "extra-info", "true" },
290 string p
= "/admin/log/";
292 http_op
= new RGWRESTReadResource(sync_env
->conn
, p
, pairs
, NULL
, sync_env
->http_manager
);
294 init_new_io(http_op
);
296 if (sync_env
->counters
) {
297 timer
.emplace(sync_env
->counters
, sync_counters::l_poll
);
299 int ret
= http_op
->aio_read();
301 ldout(sync_env
->cct
, 0) << "ERROR: failed to read from " << p
<< dendl
;
302 log_error() << "failed to send http operation: " << http_op
->to_str() << " ret=" << ret
<< std::endl
;
303 if (sync_env
->counters
) {
304 sync_env
->counters
->inc(sync_counters::l_poll_err
);
306 return set_cr_error(ret
);
313 int ret
= http_op
->wait(&response
);
315 if (sync_env
->counters
&& ret
!= -ENOENT
) {
316 sync_env
->counters
->inc(sync_counters::l_poll_err
);
318 return set_cr_error(ret
);
321 entries
->swap(response
.entries
);
322 *pnext_marker
= response
.marker
;
323 *truncated
= response
.truncated
;
324 return set_cr_done();
331 class RGWReadRemoteDataLogInfoCR
: public RGWShardCollectCR
{
332 RGWDataSyncEnv
*sync_env
;
335 map
<int, RGWDataChangesLogInfo
> *datalog_info
;
338 #define READ_DATALOG_MAX_CONCURRENT 10
341 RGWReadRemoteDataLogInfoCR(RGWDataSyncEnv
*_sync_env
,
343 map
<int, RGWDataChangesLogInfo
> *_datalog_info
) : RGWShardCollectCR(_sync_env
->cct
, READ_DATALOG_MAX_CONCURRENT
),
344 sync_env(_sync_env
), num_shards(_num_shards
),
345 datalog_info(_datalog_info
), shard_id(0) {}
346 bool spawn_next() override
;
349 bool RGWReadRemoteDataLogInfoCR::spawn_next() {
350 if (shard_id
>= num_shards
) {
353 spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env
, shard_id
, &(*datalog_info
)[shard_id
]), false);
358 class RGWListRemoteDataLogShardCR
: public RGWSimpleCoroutine
{
359 RGWDataSyncEnv
*sync_env
;
360 RGWRESTReadResource
*http_op
;
364 uint32_t max_entries
;
365 rgw_datalog_shard_data
*result
;
368 RGWListRemoteDataLogShardCR(RGWDataSyncEnv
*env
, int _shard_id
,
369 const string
& _marker
, uint32_t _max_entries
,
370 rgw_datalog_shard_data
*_result
)
371 : RGWSimpleCoroutine(env
->store
->ctx()), sync_env(env
), http_op(NULL
),
372 shard_id(_shard_id
), marker(_marker
), max_entries(_max_entries
), result(_result
) {}
374 int send_request() override
{
375 RGWRESTConn
*conn
= sync_env
->conn
;
376 RGWRados
*store
= sync_env
->store
;
379 snprintf(buf
, sizeof(buf
), "%d", shard_id
);
381 char max_entries_buf
[32];
382 snprintf(max_entries_buf
, sizeof(max_entries_buf
), "%d", (int)max_entries
);
384 const char *marker_key
= (marker
.empty() ? "" : "marker");
386 rgw_http_param_pair pairs
[] = { { "type", "data" },
388 { "max-entries", max_entries_buf
},
389 { marker_key
, marker
.c_str() },
392 string p
= "/admin/log/";
394 http_op
= new RGWRESTReadResource(conn
, p
, pairs
, NULL
, sync_env
->http_manager
);
395 init_new_io(http_op
);
397 int ret
= http_op
->aio_read();
399 ldout(store
->ctx(), 0) << "ERROR: failed to read from " << p
<< dendl
;
400 log_error() << "failed to send http operation: " << http_op
->to_str() << " ret=" << ret
<< std::endl
;
408 int request_complete() override
{
409 int ret
= http_op
->wait(result
);
411 if (ret
< 0 && ret
!= -ENOENT
) {
412 ldout(sync_env
->store
->ctx(), 0) << "ERROR: failed to list remote datalog shard, ret=" << ret
<< dendl
;
419 class RGWListRemoteDataLogCR
: public RGWShardCollectCR
{
420 RGWDataSyncEnv
*sync_env
;
422 map
<int, string
> shards
;
423 int max_entries_per_shard
;
424 map
<int, rgw_datalog_shard_data
> *result
;
426 map
<int, string
>::iterator iter
;
427 #define READ_DATALOG_MAX_CONCURRENT 10
430 RGWListRemoteDataLogCR(RGWDataSyncEnv
*_sync_env
,
431 map
<int, string
>& _shards
,
432 int _max_entries_per_shard
,
433 map
<int, rgw_datalog_shard_data
> *_result
) : RGWShardCollectCR(_sync_env
->cct
, READ_DATALOG_MAX_CONCURRENT
),
434 sync_env(_sync_env
), max_entries_per_shard(_max_entries_per_shard
),
436 shards
.swap(_shards
);
437 iter
= shards
.begin();
439 bool spawn_next() override
;
442 bool RGWListRemoteDataLogCR::spawn_next() {
443 if (iter
== shards
.end()) {
447 spawn(new RGWListRemoteDataLogShardCR(sync_env
, iter
->first
, iter
->second
, max_entries_per_shard
, &(*result
)[iter
->first
]), false);
452 class RGWInitDataSyncStatusCoroutine
: public RGWCoroutine
{
453 static constexpr uint32_t lock_duration
= 30;
454 RGWDataSyncEnv
*sync_env
;
456 const rgw_pool
& pool
;
457 const uint32_t num_shards
;
459 string sync_status_oid
;
463 rgw_data_sync_status
*status
;
464 map
<int, RGWDataChangesLogInfo
> shards_info
;
466 RGWSyncTraceNodeRef tn
;
468 RGWInitDataSyncStatusCoroutine(RGWDataSyncEnv
*_sync_env
, uint32_t num_shards
,
469 uint64_t instance_id
,
470 RGWSyncTraceNodeRef
& _tn_parent
,
471 rgw_data_sync_status
*status
)
472 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
), store(sync_env
->store
),
473 pool(store
->svc
.zone
->get_zone_params().log_pool
),
474 num_shards(num_shards
), status(status
),
475 tn(sync_env
->sync_tracer
->add_node(_tn_parent
, "init_data_sync_status")) {
476 lock_name
= "sync_lock";
478 status
->sync_info
.instance_id
= instance_id
;
480 #define COOKIE_LEN 16
481 char buf
[COOKIE_LEN
+ 1];
483 gen_rand_alphanumeric(cct
, buf
, sizeof(buf
) - 1);
486 sync_status_oid
= RGWDataSyncStatusManager::sync_status_oid(sync_env
->source_zone
);
490 int operate() override
{
493 using LockCR
= RGWSimpleRadosLockCR
;
494 yield
call(new LockCR(sync_env
->async_rados
, store
,
495 rgw_raw_obj
{pool
, sync_status_oid
},
496 lock_name
, cookie
, lock_duration
));
498 tn
->log(0, SSTR("ERROR: failed to take a lock on " << sync_status_oid
));
499 return set_cr_error(retcode
);
501 using WriteInfoCR
= RGWSimpleRadosWriteCR
<rgw_data_sync_info
>;
502 yield
call(new WriteInfoCR(sync_env
->async_rados
, store
->svc
.sysobj
,
503 rgw_raw_obj
{pool
, sync_status_oid
},
506 tn
->log(0, SSTR("ERROR: failed to write sync status info with " << retcode
));
507 return set_cr_error(retcode
);
510 /* take lock again, we just recreated the object */
511 yield
call(new LockCR(sync_env
->async_rados
, store
,
512 rgw_raw_obj
{pool
, sync_status_oid
},
513 lock_name
, cookie
, lock_duration
));
515 tn
->log(0, SSTR("ERROR: failed to take a lock on " << sync_status_oid
));
516 return set_cr_error(retcode
);
519 tn
->log(10, "took lease");
521 /* fetch current position in logs */
523 RGWRESTConn
*conn
= store
->svc
.zone
->get_zone_conn_by_id(sync_env
->source_zone
);
525 tn
->log(0, SSTR("ERROR: connection to zone " << sync_env
->source_zone
<< " does not exist!"));
526 return set_cr_error(-EIO
);
528 for (uint32_t i
= 0; i
< num_shards
; i
++) {
529 spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env
, i
, &shards_info
[i
]), true);
532 while (collect(&ret
, NULL
)) {
534 tn
->log(0, SSTR("ERROR: failed to read remote data log shards"));
535 return set_state(RGWCoroutine_Error
);
540 for (uint32_t i
= 0; i
< num_shards
; i
++) {
541 RGWDataChangesLogInfo
& info
= shards_info
[i
];
542 auto& marker
= status
->sync_markers
[i
];
543 marker
.next_step_marker
= info
.marker
;
544 marker
.timestamp
= info
.last_update
;
545 const auto& oid
= RGWDataSyncStatusManager::shard_obj_name(sync_env
->source_zone
, i
);
546 using WriteMarkerCR
= RGWSimpleRadosWriteCR
<rgw_data_sync_marker
>;
547 spawn(new WriteMarkerCR(sync_env
->async_rados
, store
->svc
.sysobj
,
548 rgw_raw_obj
{pool
, oid
}, marker
), true);
551 while (collect(&ret
, NULL
)) {
553 tn
->log(0, SSTR("ERROR: failed to write data sync status markers"));
554 return set_state(RGWCoroutine_Error
);
559 status
->sync_info
.state
= rgw_data_sync_info::StateBuildingFullSyncMaps
;
560 yield
call(new WriteInfoCR(sync_env
->async_rados
, store
->svc
.sysobj
,
561 rgw_raw_obj
{pool
, sync_status_oid
},
564 tn
->log(0, SSTR("ERROR: failed to write sync status info with " << retcode
));
565 return set_cr_error(retcode
);
567 yield
call(new RGWSimpleRadosUnlockCR(sync_env
->async_rados
, store
,
568 rgw_raw_obj
{pool
, sync_status_oid
},
570 return set_cr_done();
576 int RGWRemoteDataLog::read_log_info(rgw_datalog_info
*log_info
)
578 rgw_http_param_pair pairs
[] = { { "type", "data" },
581 int ret
= sync_env
.conn
->get_json_resource("/admin/log", pairs
, *log_info
);
583 ldpp_dout(dpp
, 0) << "ERROR: failed to fetch datalog info" << dendl
;
587 ldpp_dout(dpp
, 20) << "remote datalog, num_shards=" << log_info
->num_shards
<< dendl
;
592 int RGWRemoteDataLog::read_source_log_shards_info(map
<int, RGWDataChangesLogInfo
> *shards_info
)
594 rgw_datalog_info log_info
;
595 int ret
= read_log_info(&log_info
);
600 return run(new RGWReadRemoteDataLogInfoCR(&sync_env
, log_info
.num_shards
, shards_info
));
603 int RGWRemoteDataLog::read_source_log_shards_next(map
<int, string
> shard_markers
, map
<int, rgw_datalog_shard_data
> *result
)
605 return run(new RGWListRemoteDataLogCR(&sync_env
, shard_markers
, 1, result
));
608 int RGWRemoteDataLog::init(const string
& _source_zone
, RGWRESTConn
*_conn
, RGWSyncErrorLogger
*_error_logger
,
609 RGWSyncTraceManager
*_sync_tracer
, RGWSyncModuleInstanceRef
& _sync_module
,
610 PerfCounters
* counters
)
612 sync_env
.init(dpp
, store
->ctx(), store
, _conn
, async_rados
, &http_manager
, _error_logger
,
613 _sync_tracer
, _source_zone
, _sync_module
, counters
);
619 int ret
= http_manager
.start();
621 ldpp_dout(dpp
, 0) << "failed in http_manager.start() ret=" << ret
<< dendl
;
625 tn
= sync_env
.sync_tracer
->add_node(sync_env
.sync_tracer
->root_node
, "data");
632 void RGWRemoteDataLog::finish()
637 int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status
*sync_status
)
639 // cannot run concurrently with run_sync(), so run in a separate manager
640 RGWCoroutinesManager
crs(store
->ctx(), store
->get_cr_registry());
641 RGWHTTPManager
http_manager(store
->ctx(), crs
.get_completion_mgr());
642 int ret
= http_manager
.start();
644 ldpp_dout(dpp
, 0) << "failed in http_manager.start() ret=" << ret
<< dendl
;
647 RGWDataSyncEnv sync_env_local
= sync_env
;
648 sync_env_local
.http_manager
= &http_manager
;
649 ret
= crs
.run(new RGWReadDataSyncStatusCoroutine(&sync_env_local
, sync_status
));
654 int RGWRemoteDataLog::read_recovering_shards(const int num_shards
, set
<int>& recovering_shards
)
656 // cannot run concurrently with run_sync(), so run in a separate manager
657 RGWCoroutinesManager
crs(store
->ctx(), store
->get_cr_registry());
658 RGWHTTPManager
http_manager(store
->ctx(), crs
.get_completion_mgr());
659 int ret
= http_manager
.start();
661 ldpp_dout(dpp
, 0) << "failed in http_manager.start() ret=" << ret
<< dendl
;
664 RGWDataSyncEnv sync_env_local
= sync_env
;
665 sync_env_local
.http_manager
= &http_manager
;
666 std::vector
<RGWRadosGetOmapKeysCR::ResultPtr
> omapkeys
;
667 omapkeys
.resize(num_shards
);
668 uint64_t max_entries
{1};
669 ret
= crs
.run(new RGWReadDataSyncRecoveringShardsCR(&sync_env_local
, max_entries
, num_shards
, omapkeys
));
673 for (int i
= 0; i
< num_shards
; i
++) {
674 if (omapkeys
[i
]->entries
.size() != 0) {
675 recovering_shards
.insert(i
);
683 int RGWRemoteDataLog::init_sync_status(int num_shards
)
685 rgw_data_sync_status sync_status
;
686 sync_status
.sync_info
.num_shards
= num_shards
;
688 RGWCoroutinesManager
crs(store
->ctx(), store
->get_cr_registry());
689 RGWHTTPManager
http_manager(store
->ctx(), crs
.get_completion_mgr());
690 int ret
= http_manager
.start();
692 ldpp_dout(dpp
, 0) << "failed in http_manager.start() ret=" << ret
<< dendl
;
695 RGWDataSyncEnv sync_env_local
= sync_env
;
696 sync_env_local
.http_manager
= &http_manager
;
697 auto instance_id
= ceph::util::generate_random_number
<uint64_t>();
698 ret
= crs
.run(new RGWInitDataSyncStatusCoroutine(&sync_env_local
, num_shards
, instance_id
, tn
, &sync_status
));
703 static string
full_data_sync_index_shard_oid(const string
& source_zone
, int shard_id
)
705 char buf
[datalog_sync_full_sync_index_prefix
.size() + 1 + source_zone
.size() + 1 + 16];
706 snprintf(buf
, sizeof(buf
), "%s.%s.%d", datalog_sync_full_sync_index_prefix
.c_str(), source_zone
.c_str(), shard_id
);
710 struct read_metadata_list
{
716 read_metadata_list() : truncated(false), count(0) {}
718 void decode_json(JSONObj
*obj
) {
719 JSONDecoder::decode_json("marker", marker
, obj
);
720 JSONDecoder::decode_json("truncated", truncated
, obj
);
721 JSONDecoder::decode_json("keys", keys
, obj
);
722 JSONDecoder::decode_json("count", count
, obj
);
726 struct bucket_instance_meta_info
{
730 RGWBucketInstanceMetadataObject data
;
732 bucket_instance_meta_info() {}
734 void decode_json(JSONObj
*obj
) {
735 JSONDecoder::decode_json("key", key
, obj
);
736 JSONDecoder::decode_json("ver", ver
, obj
);
737 JSONDecoder::decode_json("mtime", mtime
, obj
);
738 JSONDecoder::decode_json("data", data
, obj
);
742 class RGWListBucketIndexesCR
: public RGWCoroutine
{
743 RGWDataSyncEnv
*sync_env
;
747 rgw_data_sync_status
*sync_status
;
753 list
<string
>::iterator iter
;
755 RGWShardedOmapCRManager
*entries_index
;
760 bucket_instance_meta_info meta_info
;
767 read_metadata_list result
;
770 RGWListBucketIndexesCR(RGWDataSyncEnv
*_sync_env
,
771 rgw_data_sync_status
*_sync_status
) : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
772 store(sync_env
->store
), sync_status(_sync_status
),
773 req_ret(0), ret(0), entries_index(NULL
), i(0), failed(false), truncated(false) {
774 oid_prefix
= datalog_sync_full_sync_index_prefix
+ "." + sync_env
->source_zone
;
775 path
= "/admin/metadata/bucket.instance";
776 num_shards
= sync_status
->sync_info
.num_shards
;
778 ~RGWListBucketIndexesCR() override
{
779 delete entries_index
;
782 int operate() override
{
784 entries_index
= new RGWShardedOmapCRManager(sync_env
->async_rados
, store
, this, num_shards
,
785 store
->svc
.zone
->get_zone_params().log_pool
,
787 yield
; // yield so OmapAppendCRs can start
791 string entrypoint
= string("/admin/metadata/bucket.instance");
793 rgw_http_param_pair pairs
[] = {{"max-entries", "1000"},
794 {"marker", result
.marker
.c_str()},
797 call(new RGWReadRESTResourceCR
<read_metadata_list
>(store
->ctx(), sync_env
->conn
, sync_env
->http_manager
,
798 entrypoint
, pairs
, &result
));
801 ldout(sync_env
->cct
, 0) << "ERROR: failed to fetch metadata for section bucket.instance" << dendl
;
802 return set_cr_error(retcode
);
805 for (iter
= result
.keys
.begin(); iter
!= result
.keys
.end(); ++iter
) {
806 ldout(sync_env
->cct
, 20) << "list metadata: section=bucket.instance key=" << *iter
<< dendl
;
810 rgw_http_param_pair pairs
[] = {{"key", key
.c_str()},
813 call(new RGWReadRESTResourceCR
<bucket_instance_meta_info
>(store
->ctx(), sync_env
->conn
, sync_env
->http_manager
, path
, pairs
, &meta_info
));
816 num_shards
= meta_info
.data
.get_bucket_info().num_shards
;
817 if (num_shards
> 0) {
818 for (i
= 0; i
< num_shards
; i
++) {
820 snprintf(buf
, sizeof(buf
), ":%d", i
);
822 yield entries_index
->append(s
, store
->data_log
->get_log_shard_id(meta_info
.data
.get_bucket_info().bucket
, i
));
825 yield entries_index
->append(key
, store
->data_log
->get_log_shard_id(meta_info
.data
.get_bucket_info().bucket
, -1));
828 truncated
= result
.truncated
;
832 if (!entries_index
->finish()) {
837 for (map
<uint32_t, rgw_data_sync_marker
>::iterator iter
= sync_status
->sync_markers
.begin(); iter
!= sync_status
->sync_markers
.end(); ++iter
) {
838 int shard_id
= (int)iter
->first
;
839 rgw_data_sync_marker
& marker
= iter
->second
;
840 marker
.total_entries
= entries_index
->get_total_entries(shard_id
);
841 spawn(new RGWSimpleRadosWriteCR
<rgw_data_sync_marker
>(sync_env
->async_rados
, store
->svc
.sysobj
,
842 rgw_raw_obj(store
->svc
.zone
->get_zone_params().log_pool
, RGWDataSyncStatusManager::shard_obj_name(sync_env
->source_zone
, shard_id
)),
847 yield
call(sync_env
->error_logger
->log_error_cr(sync_env
->conn
->get_remote_id(), "data.init", "",
848 EIO
, string("failed to build bucket instances map")));
850 while (collect(&ret
, NULL
)) {
852 yield
call(sync_env
->error_logger
->log_error_cr(sync_env
->conn
->get_remote_id(), "data.init", "",
853 -ret
, string("failed to store sync status: ") + cpp_strerror(-ret
)));
861 yield
return set_cr_error(req_ret
);
863 yield
return set_cr_done();
869 #define DATA_SYNC_UPDATE_MARKER_WINDOW 1
871 class RGWDataSyncShardMarkerTrack
: public RGWSyncShardMarkerTrack
<string
, string
> {
872 RGWDataSyncEnv
*sync_env
;
875 rgw_data_sync_marker sync_marker
;
877 map
<string
, string
> key_to_marker
;
878 map
<string
, string
> marker_to_key
;
880 void handle_finish(const string
& marker
) override
{
881 map
<string
, string
>::iterator iter
= marker_to_key
.find(marker
);
882 if (iter
== marker_to_key
.end()) {
885 key_to_marker
.erase(iter
->second
);
886 reset_need_retry(iter
->second
);
887 marker_to_key
.erase(iter
);
890 RGWSyncTraceNodeRef tn
;
893 RGWDataSyncShardMarkerTrack(RGWDataSyncEnv
*_sync_env
,
894 const string
& _marker_oid
,
895 const rgw_data_sync_marker
& _marker
,
896 RGWSyncTraceNodeRef
& _tn
) : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW
),
898 marker_oid(_marker_oid
),
899 sync_marker(_marker
),
902 RGWCoroutine
*store_marker(const string
& new_marker
, uint64_t index_pos
, const real_time
& timestamp
) override
{
903 sync_marker
.marker
= new_marker
;
904 sync_marker
.pos
= index_pos
;
905 sync_marker
.timestamp
= timestamp
;
907 tn
->log(20, SSTR("updating marker marker_oid=" << marker_oid
<< " marker=" << new_marker
));
908 RGWRados
*store
= sync_env
->store
;
910 return new RGWSimpleRadosWriteCR
<rgw_data_sync_marker
>(sync_env
->async_rados
, store
->svc
.sysobj
,
911 rgw_raw_obj(store
->svc
.zone
->get_zone_params().log_pool
, marker_oid
),
916 * create index from key -> marker, and from marker -> key
917 * this is useful so that we can insure that we only have one
918 * entry for any key that is used. This is needed when doing
919 * incremenatl sync of data, and we don't want to run multiple
920 * concurrent sync operations for the same bucket shard
922 bool index_key_to_marker(const string
& key
, const string
& marker
) {
923 if (key_to_marker
.find(key
) != key_to_marker
.end()) {
927 key_to_marker
[key
] = marker
;
928 marker_to_key
[marker
] = key
;
932 RGWOrderCallCR
*allocate_order_control_cr() override
{
933 return new RGWLastCallerWinsCR(sync_env
->cct
);
937 // ostream wrappers to print buckets without copying strings
940 explicit bucket_str(const rgw_bucket
& b
) : b(b
) {}
942 std::ostream
& operator<<(std::ostream
& out
, const bucket_str
& rhs
) {
944 if (!b
.tenant
.empty()) {
945 out
<< b
.tenant
<< '/';
948 if (!b
.bucket_id
.empty()) {
949 out
<< ':' << b
.bucket_id
;
954 struct bucket_str_noinstance
{
956 explicit bucket_str_noinstance(const rgw_bucket
& b
) : b(b
) {}
958 std::ostream
& operator<<(std::ostream
& out
, const bucket_str_noinstance
& rhs
) {
960 if (!b
.tenant
.empty()) {
961 out
<< b
.tenant
<< '/';
967 struct bucket_shard_str
{
968 const rgw_bucket_shard
& bs
;
969 explicit bucket_shard_str(const rgw_bucket_shard
& bs
) : bs(bs
) {}
971 std::ostream
& operator<<(std::ostream
& out
, const bucket_shard_str
& rhs
) {
973 out
<< bucket_str
{bs
.bucket
};
974 if (bs
.shard_id
>= 0) {
975 out
<< ':' << bs
.shard_id
;
980 class RGWRunBucketSyncCoroutine
: public RGWCoroutine
{
981 RGWDataSyncEnv
*sync_env
;
983 RGWBucketInfo bucket_info
;
984 rgw_bucket_shard_sync_info sync_status
;
985 RGWMetaSyncEnv meta_sync_env
;
987 const std::string status_oid
;
989 boost::intrusive_ptr
<RGWContinuousLeaseCR
> lease_cr
;
990 boost::intrusive_ptr
<RGWCoroutinesStack
> lease_stack
;
992 RGWSyncTraceNodeRef tn
;
995 RGWRunBucketSyncCoroutine(RGWDataSyncEnv
*_sync_env
, const rgw_bucket_shard
& bs
, const RGWSyncTraceNodeRef
& _tn_parent
)
996 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
), bs(bs
),
997 status_oid(RGWBucketSyncStatusManager::status_oid(sync_env
->source_zone
, bs
)),
998 tn(sync_env
->sync_tracer
->add_node(_tn_parent
, "bucket",
999 SSTR(bucket_shard_str
{bs
}))) {
1001 ~RGWRunBucketSyncCoroutine() override
{
1007 int operate() override
;
1010 class RGWDataSyncSingleEntryCR
: public RGWCoroutine
{
1011 RGWDataSyncEnv
*sync_env
;
1014 string entry_marker
;
1016 rgw_bucket_shard bs
;
1022 RGWDataSyncShardMarkerTrack
*marker_tracker
;
1024 boost::intrusive_ptr
<RGWOmapAppend
> error_repo
;
1025 bool remove_from_repo
;
1029 RGWSyncTraceNodeRef tn
;
1031 RGWDataSyncSingleEntryCR(RGWDataSyncEnv
*_sync_env
,
1032 const string
& _raw_key
, const string
& _entry_marker
, RGWDataSyncShardMarkerTrack
*_marker_tracker
,
1033 RGWOmapAppend
*_error_repo
, bool _remove_from_repo
, const RGWSyncTraceNodeRef
& _tn_parent
) : RGWCoroutine(_sync_env
->cct
),
1034 sync_env(_sync_env
),
1035 raw_key(_raw_key
), entry_marker(_entry_marker
),
1037 marker_tracker(_marker_tracker
),
1038 error_repo(_error_repo
), remove_from_repo(_remove_from_repo
) {
1039 set_description() << "data sync single entry (source_zone=" << sync_env
->source_zone
<< ") key=" <<_raw_key
<< " entry=" << entry_marker
;
1040 tn
= sync_env
->sync_tracer
->add_node(_tn_parent
, "entry", raw_key
);
1043 int operate() override
{
1047 int ret
= rgw_bucket_parse_bucket_key(sync_env
->cct
, raw_key
,
1048 &bs
.bucket
, &bs
.shard_id
);
1050 return set_cr_error(-EIO
);
1052 if (marker_tracker
) {
1053 marker_tracker
->reset_need_retry(raw_key
);
1055 tn
->log(0, SSTR("triggering sync of bucket/shard " << bucket_shard_str
{bs
}));
1056 call(new RGWRunBucketSyncCoroutine(sync_env
, bs
, tn
));
1058 } while (marker_tracker
&& marker_tracker
->need_retry(raw_key
));
1060 sync_status
= retcode
;
1062 if (sync_status
== -ENOENT
) {
1063 // this was added when 'tenant/' was added to datalog entries, because
1064 // preexisting tenant buckets could never sync and would stay in the
1065 // error_repo forever
1066 tn
->log(0, SSTR("WARNING: skipping data log entry for missing bucket " << raw_key
));
1070 if (sync_status
< 0) {
1071 // write actual sync failures for 'radosgw-admin sync error list'
1072 if (sync_status
!= -EBUSY
&& sync_status
!= -EAGAIN
) {
1073 yield
call(sync_env
->error_logger
->log_error_cr(sync_env
->conn
->get_remote_id(), "data", raw_key
,
1074 -sync_status
, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status
)));
1076 tn
->log(0, SSTR("ERROR: failed to log sync failure: retcode=" << retcode
));
1079 if (error_repo
&& !error_repo
->append(raw_key
)) {
1080 tn
->log(0, SSTR("ERROR: failed to log sync failure in error repo: retcode=" << retcode
));
1082 } else if (error_repo
&& remove_from_repo
) {
1084 yield
call(new RGWRadosRemoveOmapKeysCR(sync_env
->store
, error_repo
->get_obj(), keys
));
1086 tn
->log(0, SSTR("ERROR: failed to remove omap key from error repo ("
1087 << error_repo
->get_obj() << " retcode=" << retcode
));
1090 /* FIXME: what do do in case of error */
1091 if (marker_tracker
&& !entry_marker
.empty()) {
1093 yield
call(marker_tracker
->finish(entry_marker
));
1095 if (sync_status
== 0) {
1096 sync_status
= retcode
;
1098 if (sync_status
< 0) {
1099 return set_cr_error(sync_status
);
1101 return set_cr_done();
1107 #define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20
1108 #define DATA_SYNC_MAX_ERR_ENTRIES 10
1110 class RGWDataSyncShardCR
: public RGWCoroutine
{
1111 RGWDataSyncEnv
*sync_env
;
1116 rgw_data_sync_marker sync_marker
;
1118 RGWRadosGetOmapKeysCR::ResultPtr omapkeys
;
1119 std::set
<std::string
> entries
;
1120 std::set
<std::string
>::iterator iter
;
1124 RGWDataSyncShardMarkerTrack
*marker_tracker
;
1126 std::string next_marker
;
1127 list
<rgw_data_change_log_entry
> log_entries
;
1128 list
<rgw_data_change_log_entry
>::iterator log_iter
;
1134 boost::asio::coroutine incremental_cr
;
1135 boost::asio::coroutine full_cr
;
1138 set
<string
> modified_shards
;
1139 set
<string
> current_modified
;
1141 set
<string
>::iterator modified_iter
;
1147 bool *reset_backoff
;
1149 set
<string
> spawned_keys
;
1151 boost::intrusive_ptr
<RGWContinuousLeaseCR
> lease_cr
;
1152 boost::intrusive_ptr
<RGWCoroutinesStack
> lease_stack
;
1157 RGWOmapAppend
*error_repo
;
1158 std::set
<std::string
> error_entries
;
1159 string error_marker
;
1160 int max_error_entries
;
1162 ceph::coarse_real_time error_retry_time
;
1164 #define RETRY_BACKOFF_SECS_MIN 60
1165 #define RETRY_BACKOFF_SECS_DEFAULT 60
1166 #define RETRY_BACKOFF_SECS_MAX 600
1167 uint32_t retry_backoff_secs
;
1169 RGWSyncTraceNodeRef tn
;
1171 RGWDataSyncShardCR(RGWDataSyncEnv
*_sync_env
,
1173 uint32_t _shard_id
, const rgw_data_sync_marker
& _marker
,
1174 RGWSyncTraceNodeRef
& _tn
,
1175 bool *_reset_backoff
) : RGWCoroutine(_sync_env
->cct
),
1176 sync_env(_sync_env
),
1178 shard_id(_shard_id
),
1179 sync_marker(_marker
),
1180 marker_tracker(NULL
), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
1181 total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW
), reset_backoff(NULL
),
1182 lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES
),
1183 retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT
), tn(_tn
) {
1184 set_description() << "data sync shard source_zone=" << sync_env
->source_zone
<< " shard_id=" << shard_id
;
1185 status_oid
= RGWDataSyncStatusManager::shard_obj_name(sync_env
->source_zone
, shard_id
);
1186 error_oid
= status_oid
+ ".retry";
1189 ~RGWDataSyncShardCR() override
{
1190 delete marker_tracker
;
1199 void append_modified_shards(set
<string
>& keys
) {
1200 Mutex::Locker
l(inc_lock
);
1201 modified_shards
.insert(keys
.begin(), keys
.end());
1204 void set_marker_tracker(RGWDataSyncShardMarkerTrack
*mt
) {
1205 delete marker_tracker
;
1206 marker_tracker
= mt
;
1209 int operate() override
{
1212 switch (sync_marker
.state
) {
1213 case rgw_data_sync_marker::FullSync
:
1217 tn
->log(10, SSTR("full sync failed (r=" << r
<< ")"));
1219 return set_cr_error(r
);
1222 case rgw_data_sync_marker::IncrementalSync
:
1223 r
= incremental_sync();
1226 tn
->log(10, SSTR("incremental sync failed (r=" << r
<< ")"));
1228 return set_cr_error(r
);
1232 return set_cr_error(-EIO
);
1238 void init_lease_cr() {
1239 set_status("acquiring sync lock");
1240 uint32_t lock_duration
= cct
->_conf
->rgw_sync_lease_period
;
1241 string lock_name
= "sync_lock";
1245 RGWRados
*store
= sync_env
->store
;
1246 lease_cr
.reset(new RGWContinuousLeaseCR(sync_env
->async_rados
, store
,
1247 rgw_raw_obj(store
->svc
.zone
->get_zone_params().log_pool
, status_oid
),
1248 lock_name
, lock_duration
, this));
1249 lease_stack
.reset(spawn(lease_cr
.get(), false));
1253 #define OMAP_GET_MAX_ENTRIES 100
1254 int max_entries
= OMAP_GET_MAX_ENTRIES
;
1256 tn
->log(10, "start full sync");
1257 yield
init_lease_cr();
1258 while (!lease_cr
->is_locked()) {
1259 if (lease_cr
->is_done()) {
1260 tn
->log(5, "failed to take lease");
1261 set_status("lease lock failed, early abort");
1263 return set_cr_error(lease_cr
->get_ret_status());
1268 tn
->log(10, "took lease");
1269 oid
= full_data_sync_index_shard_oid(sync_env
->source_zone
, shard_id
);
1270 set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env
, status_oid
, sync_marker
, tn
));
1271 total_entries
= sync_marker
.pos
;
1273 if (!lease_cr
->is_locked()) {
1274 stop_spawned_services();
1276 return set_cr_error(-ECANCELED
);
1278 omapkeys
= std::make_shared
<RGWRadosGetOmapKeysCR::Result
>();
1279 yield
call(new RGWRadosGetOmapKeysCR(sync_env
->store
, rgw_raw_obj(pool
, oid
),
1280 sync_marker
.marker
, max_entries
, omapkeys
));
1282 tn
->log(0, SSTR("ERROR: RGWRadosGetOmapKeysCR() returned ret=" << retcode
));
1283 lease_cr
->go_down();
1285 return set_cr_error(retcode
);
1287 entries
= std::move(omapkeys
->entries
);
1288 if (entries
.size() > 0) {
1289 tn
->set_flag(RGW_SNS_FLAG_ACTIVE
); /* actually have entries to sync */
1291 tn
->log(20, SSTR("retrieved " << entries
.size() << " entries to sync"));
1292 iter
= entries
.begin();
1293 for (; iter
!= entries
.end(); ++iter
) {
1294 tn
->log(20, SSTR("full sync: " << *iter
));
1296 if (!marker_tracker
->start(*iter
, total_entries
, real_time())) {
1297 tn
->log(0, SSTR("ERROR: cannot start syncing " << *iter
<< ". Duplicate entry?"));
1299 // fetch remote and write locally
1300 yield
spawn(new RGWDataSyncSingleEntryCR(sync_env
, *iter
, *iter
, marker_tracker
, error_repo
, false, tn
), false);
1302 sync_marker
.marker
= *iter
;
1304 while ((int)num_spawned() > spawn_window
) {
1305 set_status() << "num_spawned() > spawn_window";
1306 yield
wait_for_child();
1308 while (collect(&ret
, lease_stack
.get())) {
1310 tn
->log(10, "a sync operation returned error");
1315 } while (omapkeys
->more
);
1318 drain_all_but_stack(lease_stack
.get());
1320 tn
->unset_flag(RGW_SNS_FLAG_ACTIVE
);
1323 /* update marker to reflect we're done with full sync */
1324 sync_marker
.state
= rgw_data_sync_marker::IncrementalSync
;
1325 sync_marker
.marker
= sync_marker
.next_step_marker
;
1326 sync_marker
.next_step_marker
.clear();
1327 RGWRados
*store
= sync_env
->store
;
1328 call(new RGWSimpleRadosWriteCR
<rgw_data_sync_marker
>(sync_env
->async_rados
, store
->svc
.sysobj
,
1329 rgw_raw_obj(store
->svc
.zone
->get_zone_params().log_pool
, status_oid
),
1333 tn
->log(0, SSTR("ERROR: failed to set sync marker: retcode=" << retcode
));
1334 lease_cr
->go_down();
1336 return set_cr_error(retcode
);
1338 // keep lease and transition to incremental_sync()
1343 int incremental_sync() {
1344 reenter(&incremental_cr
) {
1345 tn
->log(10, "start incremental sync");
1347 tn
->log(10, "lease already held from full sync");
1349 yield
init_lease_cr();
1350 while (!lease_cr
->is_locked()) {
1351 if (lease_cr
->is_done()) {
1352 tn
->log(5, "failed to take lease");
1353 set_status("lease lock failed, early abort");
1355 return set_cr_error(lease_cr
->get_ret_status());
1360 set_status("lease acquired");
1361 tn
->log(10, "took lease");
1363 error_repo
= new RGWOmapAppend(sync_env
->async_rados
, sync_env
->store
,
1364 rgw_raw_obj(pool
, error_oid
),
1367 spawn(error_repo
, false);
1368 set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env
, status_oid
, sync_marker
, tn
));
1370 if (!lease_cr
->is_locked()) {
1371 stop_spawned_services();
1373 return set_cr_error(-ECANCELED
);
1375 current_modified
.clear();
1377 current_modified
.swap(modified_shards
);
1380 if (current_modified
.size() > 0) {
1381 tn
->set_flag(RGW_SNS_FLAG_ACTIVE
); /* actually have entries to sync */
1383 /* process out of band updates */
1384 for (modified_iter
= current_modified
.begin(); modified_iter
!= current_modified
.end(); ++modified_iter
) {
1386 tn
->log(20, SSTR("received async update notification: " << *modified_iter
));
1387 spawn(new RGWDataSyncSingleEntryCR(sync_env
, *modified_iter
, string(), marker_tracker
, error_repo
, false, tn
), false);
1391 if (error_retry_time
<= ceph::coarse_real_clock::now()) {
1392 /* process bucket shards that previously failed */
1393 omapkeys
= std::make_shared
<RGWRadosGetOmapKeysCR::Result
>();
1394 yield
call(new RGWRadosGetOmapKeysCR(sync_env
->store
, rgw_raw_obj(pool
, error_oid
),
1395 error_marker
, max_error_entries
, omapkeys
));
1396 error_entries
= std::move(omapkeys
->entries
);
1397 tn
->log(20, SSTR("read error repo, got " << error_entries
.size() << " entries"));
1398 iter
= error_entries
.begin();
1399 for (; iter
!= error_entries
.end(); ++iter
) {
1400 error_marker
= *iter
;
1401 tn
->log(20, SSTR("handle error entry: " << error_marker
));
1402 spawn(new RGWDataSyncSingleEntryCR(sync_env
, error_marker
, error_marker
, nullptr /* no marker tracker */, error_repo
, true, tn
), false);
1404 if (!omapkeys
->more
) {
1405 if (error_marker
.empty() && error_entries
.empty()) {
1406 /* the retry repo is empty, we back off a bit before calling it again */
1407 retry_backoff_secs
*= 2;
1408 if (retry_backoff_secs
> RETRY_BACKOFF_SECS_MAX
) {
1409 retry_backoff_secs
= RETRY_BACKOFF_SECS_MAX
;
1412 retry_backoff_secs
= RETRY_BACKOFF_SECS_DEFAULT
;
1414 error_retry_time
= ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs
);
1415 error_marker
.clear();
1420 #define INCREMENTAL_MAX_ENTRIES 100
1421 tn
->log(20, SSTR("shard_id=" << shard_id
<< " sync_marker=" << sync_marker
.marker
));
1422 spawned_keys
.clear();
1423 yield
call(new RGWReadRemoteDataLogShardCR(sync_env
, shard_id
, sync_marker
.marker
,
1424 &next_marker
, &log_entries
, &truncated
));
1425 if (retcode
< 0 && retcode
!= -ENOENT
) {
1426 tn
->log(0, SSTR("ERROR: failed to read remote data log info: ret=" << retcode
));
1427 stop_spawned_services();
1429 return set_cr_error(retcode
);
1432 if (log_entries
.size() > 0) {
1433 tn
->set_flag(RGW_SNS_FLAG_ACTIVE
); /* actually have entries to sync */
1436 for (log_iter
= log_entries
.begin(); log_iter
!= log_entries
.end(); ++log_iter
) {
1437 tn
->log(20, SSTR("shard_id=" << shard_id
<< " log_entry: " << log_iter
->log_id
<< ":" << log_iter
->log_timestamp
<< ":" << log_iter
->entry
.key
));
1438 if (!marker_tracker
->index_key_to_marker(log_iter
->entry
.key
, log_iter
->log_id
)) {
1439 tn
->log(20, SSTR("skipping sync of entry: " << log_iter
->log_id
<< ":" << log_iter
->entry
.key
<< " sync already in progress for bucket shard"));
1440 marker_tracker
->try_update_high_marker(log_iter
->log_id
, 0, log_iter
->log_timestamp
);
1443 if (!marker_tracker
->start(log_iter
->log_id
, 0, log_iter
->log_timestamp
)) {
1444 tn
->log(0, SSTR("ERROR: cannot start syncing " << log_iter
->log_id
<< ". Duplicate entry?"));
1447 * don't spawn the same key more than once. We can do that as long as we don't yield
1449 if (spawned_keys
.find(log_iter
->entry
.key
) == spawned_keys
.end()) {
1450 spawned_keys
.insert(log_iter
->entry
.key
);
1451 spawn(new RGWDataSyncSingleEntryCR(sync_env
, log_iter
->entry
.key
, log_iter
->log_id
, marker_tracker
, error_repo
, false, tn
), false);
1453 stop_spawned_services();
1455 return set_cr_error(retcode
);
1460 while ((int)num_spawned() > spawn_window
) {
1461 set_status() << "num_spawned() > spawn_window";
1462 yield
wait_for_child();
1464 while (collect(&ret
, lease_stack
.get())) {
1466 tn
->log(10, "a sync operation returned error");
1467 /* we have reported this error */
1469 /* not waiting for child here */
1473 tn
->log(20, SSTR("shard_id=" << shard_id
<< " sync_marker=" << sync_marker
.marker
1474 << " next_marker=" << next_marker
<< " truncated=" << truncated
));
1475 if (!next_marker
.empty()) {
1476 sync_marker
.marker
= next_marker
;
1477 } else if (!log_entries
.empty()) {
1478 sync_marker
.marker
= log_entries
.back().log_id
;
1481 // we reached the end, wait a while before checking for more
1482 tn
->unset_flag(RGW_SNS_FLAG_ACTIVE
);
1483 yield
wait(get_idle_interval());
1490 utime_t
get_idle_interval() const {
1491 #define INCREMENTAL_INTERVAL 20
1492 ceph::timespan interval
= std::chrono::seconds(INCREMENTAL_INTERVAL
);
1493 if (!ceph::coarse_real_clock::is_zero(error_retry_time
)) {
1494 auto now
= ceph::coarse_real_clock::now();
1495 if (error_retry_time
> now
) {
1496 auto d
= error_retry_time
- now
;
1502 // convert timespan -> time_point -> utime_t
1503 return utime_t(ceph::coarse_real_clock::zero() + interval
);
1506 void stop_spawned_services() {
1507 lease_cr
->go_down();
1509 error_repo
->finish();
1516 class RGWDataSyncShardControlCR
: public RGWBackoffControlCR
{
1517 RGWDataSyncEnv
*sync_env
;
1522 rgw_data_sync_marker sync_marker
;
1524 RGWSyncTraceNodeRef tn
;
1526 RGWDataSyncShardControlCR(RGWDataSyncEnv
*_sync_env
, const rgw_pool
& _pool
,
1527 uint32_t _shard_id
, rgw_data_sync_marker
& _marker
,
1528 RGWSyncTraceNodeRef
& _tn_parent
) : RGWBackoffControlCR(_sync_env
->cct
, false),
1529 sync_env(_sync_env
),
1531 shard_id(_shard_id
),
1532 sync_marker(_marker
) {
1533 tn
= sync_env
->sync_tracer
->add_node(_tn_parent
, "shard", std::to_string(shard_id
));
1536 RGWCoroutine
*alloc_cr() override
{
1537 return new RGWDataSyncShardCR(sync_env
, pool
, shard_id
, sync_marker
, tn
, backoff_ptr());
1540 RGWCoroutine
*alloc_finisher_cr() override
{
1541 RGWRados
*store
= sync_env
->store
;
1542 return new RGWSimpleRadosReadCR
<rgw_data_sync_marker
>(sync_env
->async_rados
, store
->svc
.sysobj
,
1543 rgw_raw_obj(store
->svc
.zone
->get_zone_params().log_pool
, RGWDataSyncStatusManager::shard_obj_name(sync_env
->source_zone
, shard_id
)),
1547 void append_modified_shards(set
<string
>& keys
) {
1548 Mutex::Locker
l(cr_lock());
1550 RGWDataSyncShardCR
*cr
= static_cast<RGWDataSyncShardCR
*>(get_cr());
1555 cr
->append_modified_shards(keys
);
1559 class RGWDataSyncCR
: public RGWCoroutine
{
1560 RGWDataSyncEnv
*sync_env
;
1561 uint32_t num_shards
;
1563 rgw_data_sync_status sync_status
;
1565 RGWDataSyncShardMarkerTrack
*marker_tracker
;
1567 Mutex shard_crs_lock
;
1568 map
<int, RGWDataSyncShardControlCR
*> shard_crs
;
1570 bool *reset_backoff
;
1572 RGWSyncTraceNodeRef tn
;
1574 RGWDataSyncModule
*data_sync_module
{nullptr};
1576 RGWDataSyncCR(RGWDataSyncEnv
*_sync_env
, uint32_t _num_shards
, RGWSyncTraceNodeRef
& _tn
, bool *_reset_backoff
) : RGWCoroutine(_sync_env
->cct
),
1577 sync_env(_sync_env
),
1578 num_shards(_num_shards
),
1579 marker_tracker(NULL
),
1580 shard_crs_lock("RGWDataSyncCR::shard_crs_lock"),
1581 reset_backoff(_reset_backoff
), tn(_tn
) {
1585 ~RGWDataSyncCR() override
{
1586 for (auto iter
: shard_crs
) {
1591 int operate() override
{
1594 /* read sync status */
1595 yield
call(new RGWReadDataSyncStatusCoroutine(sync_env
, &sync_status
));
1597 data_sync_module
= sync_env
->sync_module
->get_data_handler();
1599 if (retcode
< 0 && retcode
!= -ENOENT
) {
1600 tn
->log(0, SSTR("ERROR: failed to fetch sync status, retcode=" << retcode
));
1601 return set_cr_error(retcode
);
1604 /* state: init status */
1605 if ((rgw_data_sync_info::SyncState
)sync_status
.sync_info
.state
== rgw_data_sync_info::StateInit
) {
1606 tn
->log(20, SSTR("init"));
1607 sync_status
.sync_info
.num_shards
= num_shards
;
1608 uint64_t instance_id
;
1609 instance_id
= ceph::util::generate_random_number
<uint64_t>();
1610 yield
call(new RGWInitDataSyncStatusCoroutine(sync_env
, num_shards
, instance_id
, tn
, &sync_status
));
1612 tn
->log(0, SSTR("ERROR: failed to init sync, retcode=" << retcode
));
1613 return set_cr_error(retcode
);
1615 // sets state = StateBuildingFullSyncMaps
1617 *reset_backoff
= true;
1620 data_sync_module
->init(sync_env
, sync_status
.sync_info
.instance_id
);
1622 if ((rgw_data_sync_info::SyncState
)sync_status
.sync_info
.state
== rgw_data_sync_info::StateBuildingFullSyncMaps
) {
1623 tn
->log(10, SSTR("building full sync maps"));
1624 /* call sync module init here */
1625 sync_status
.sync_info
.num_shards
= num_shards
;
1626 yield
call(data_sync_module
->init_sync(sync_env
));
1628 tn
->log(0, SSTR("ERROR: sync module init_sync() failed, retcode=" << retcode
));
1629 return set_cr_error(retcode
);
1631 /* state: building full sync maps */
1632 yield
call(new RGWListBucketIndexesCR(sync_env
, &sync_status
));
1634 tn
->log(0, SSTR("ERROR: failed to build full sync maps, retcode=" << retcode
));
1635 return set_cr_error(retcode
);
1637 sync_status
.sync_info
.state
= rgw_data_sync_info::StateSync
;
1639 /* update new state */
1640 yield
call(set_sync_info_cr());
1642 tn
->log(0, SSTR("ERROR: failed to write sync status, retcode=" << retcode
));
1643 return set_cr_error(retcode
);
1646 *reset_backoff
= true;
1649 yield
call(data_sync_module
->start_sync(sync_env
));
1652 if ((rgw_data_sync_info::SyncState
)sync_status
.sync_info
.state
== rgw_data_sync_info::StateSync
) {
1653 tn
->log(10, SSTR("spawning " << num_shards
<< " shards sync"));
1654 for (map
<uint32_t, rgw_data_sync_marker
>::iterator iter
= sync_status
.sync_markers
.begin();
1655 iter
!= sync_status
.sync_markers
.end(); ++iter
) {
1656 RGWDataSyncShardControlCR
*cr
= new RGWDataSyncShardControlCR(sync_env
, sync_env
->store
->svc
.zone
->get_zone_params().log_pool
,
1657 iter
->first
, iter
->second
, tn
);
1659 shard_crs_lock
.Lock();
1660 shard_crs
[iter
->first
] = cr
;
1661 shard_crs_lock
.Unlock();
1667 return set_cr_done();
1672 RGWCoroutine
*set_sync_info_cr() {
1673 RGWRados
*store
= sync_env
->store
;
1674 return new RGWSimpleRadosWriteCR
<rgw_data_sync_info
>(sync_env
->async_rados
, store
->svc
.sysobj
,
1675 rgw_raw_obj(store
->svc
.zone
->get_zone_params().log_pool
, RGWDataSyncStatusManager::sync_status_oid(sync_env
->source_zone
)),
1676 sync_status
.sync_info
);
1679 void wakeup(int shard_id
, set
<string
>& keys
) {
1680 Mutex::Locker
l(shard_crs_lock
);
1681 map
<int, RGWDataSyncShardControlCR
*>::iterator iter
= shard_crs
.find(shard_id
);
1682 if (iter
== shard_crs
.end()) {
1685 iter
->second
->append_modified_shards(keys
);
1686 iter
->second
->wakeup();
1690 class RGWDefaultDataSyncModule
: public RGWDataSyncModule
{
1692 RGWDefaultDataSyncModule() {}
1694 RGWCoroutine
*sync_object(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, std::optional
<uint64_t> versioned_epoch
, rgw_zone_set
*zones_trace
) override
;
1695 RGWCoroutine
*remove_object(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, real_time
& mtime
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
) override
;
1696 RGWCoroutine
*create_delete_marker(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, real_time
& mtime
,
1697 rgw_bucket_entry_owner
& owner
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
) override
;
1700 class RGWDefaultSyncModuleInstance
: public RGWSyncModuleInstance
{
1701 RGWDefaultDataSyncModule data_handler
;
1703 RGWDefaultSyncModuleInstance() {}
1704 RGWDataSyncModule
*get_data_handler() override
{
1705 return &data_handler
;
1707 bool supports_user_writes() override
{
// Factory hook for the default (mirror) sync module: builds a
// RGWDefaultSyncModuleInstance. The JSONFormattable config is accepted but
// not read here.
// NOTE(review): the opening brace and return statement are elided in this
// extract — presumably returns 0 on success; confirm upstream.
1712 int RGWDefaultSyncModule::create_instance(CephContext
*cct
, const JSONFormattable
& config
, RGWSyncModuleInstanceRef
*instance
)
1714 instance
->reset(new RGWDefaultSyncModuleInstance());
// Default (mirror) data sync: return a coroutine that fetches the remote
// object from source_zone and writes it locally. versioned_epoch is
// forwarded for version ordering, zones_trace prevents sync loops, and
// sync_env->counters feeds perf accounting.
// NOTE(review): one intermediate argument line is elided in this extract.
1718 RGWCoroutine
*RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, std::optional
<uint64_t> versioned_epoch
, rgw_zone_set
*zones_trace
)
1720 return new RGWFetchRemoteObjCR(sync_env
->async_rados
, sync_env
->store
, sync_env
->source_zone
, bucket_info
,
1722 key
, std::nullopt
, versioned_epoch
,
1723 true, zones_trace
, sync_env
->counters
);
// Default (mirror) delete: replicate the remote removal locally via
// RGWRemoveObjCR. The two NULLs pass no owner identity, and the `false`
// flag is the one that create_delete_marker() below passes as `true` —
// presumably selecting plain-delete vs delete-marker semantics; confirm
// against RGWRemoveObjCR.
1726 RGWCoroutine
*RGWDefaultDataSyncModule::remove_object(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
,
1727 real_time
& mtime
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
)
1729 return new RGWRemoveObjCR(sync_env
->async_rados
, sync_env
->store
, sync_env
->source_zone
,
1730 bucket_info
, key
, versioned
, versioned_epoch
,
1731 NULL
, NULL
, false, &mtime
, zones_trace
);
// Default (mirror) delete-marker replication for versioned buckets: same
// RGWRemoveObjCR as remove_object(), but the owner identity is forwarded
// and the marker flag is `true` so the local zone records a delete marker
// attributed to the same owner as on the source.
1734 RGWCoroutine
*RGWDefaultDataSyncModule::create_delete_marker(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, real_time
& mtime
,
1735 rgw_bucket_entry_owner
& owner
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
)
1737 return new RGWRemoveObjCR(sync_env
->async_rados
, sync_env
->store
, sync_env
->source_zone
,
1738 bucket_info
, key
, versioned
, versioned_epoch
,
1739 &owner
.id
, &owner
.display_name
, true, &mtime
, zones_trace
);
1742 class RGWArchiveDataSyncModule
: public RGWDefaultDataSyncModule
{
1744 RGWArchiveDataSyncModule() {}
1746 RGWCoroutine
*sync_object(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, std::optional
<uint64_t> versioned_epoch
, rgw_zone_set
*zones_trace
) override
;
1747 RGWCoroutine
*remove_object(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, real_time
& mtime
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
) override
;
1748 RGWCoroutine
*create_delete_marker(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, real_time
& mtime
,
1749 rgw_bucket_entry_owner
& owner
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
) override
;
1752 class RGWArchiveSyncModuleInstance
: public RGWDefaultSyncModuleInstance
{
1753 RGWArchiveDataSyncModule data_handler
;
1755 RGWArchiveSyncModuleInstance() {}
1756 RGWDataSyncModule
*get_data_handler() override
{
1757 return &data_handler
;
1759 RGWMetadataHandler
*alloc_bucket_meta_handler() override
{
1760 return RGWArchiveBucketMetaHandlerAllocator::alloc();
1762 RGWMetadataHandler
*alloc_bucket_instance_meta_handler() override
{
1763 return RGWArchiveBucketInstanceMetaHandlerAllocator::alloc();
// Factory hook for the archive sync module (keeps every object version,
// ignores deletes). Mirrors RGWDefaultSyncModule::create_instance().
// NOTE(review): opening brace and return statement are elided here —
// presumably returns 0 on success; confirm upstream.
1767 int RGWArchiveSyncModule::create_instance(CephContext
*cct
, const JSONFormattable
& config
, RGWSyncModuleInstanceRef
*instance
)
1769 instance
->reset(new RGWArchiveSyncModuleInstance());
// Archive-zone sync: objects must always be preserved, so before fetching
// the remote object the local bucket is forced to versioned mode and a
// versioned destination key is chosen.
// NOTE(review): several lines are elided in this extract (the retcode
// check after put_bucket_instance_info, the `dest_key = key` assignment,
// and closing braces) — consult the upstream file before editing.
1773 RGWCoroutine
*RGWArchiveDataSyncModule::sync_object(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, std::optional
<uint64_t> versioned_epoch
, rgw_zone_set
*zones_trace
)
1775 ldout(sync_env
->cct
, 5) << "SYNC_ARCHIVE: sync_object: b=" << bucket_info
.bucket
<< " k=" << key
<< " versioned_epoch=" << versioned_epoch
.value_or(0) << dendl
;
// If the archive bucket is unversioned (or versioning was suspended),
// turn versioning on and persist the updated bucket instance info.
1776 if (!bucket_info
.versioned() ||
1777 (bucket_info
.flags
& BUCKET_VERSIONS_SUSPENDED
)) {
1778 ldout(sync_env
->cct
, 0) << "SYNC_ARCHIVE: sync_object: enabling object versioning for archive bucket" << dendl
;
1779 bucket_info
.flags
= (bucket_info
.flags
& ~BUCKET_VERSIONS_SUSPENDED
) | BUCKET_VERSIONED
;
1780 int op_ret
= sync_env
->store
->put_bucket_instance_info(bucket_info
, false, real_time(), NULL
);
1782 ldout(sync_env
->cct
, 0) << "SYNC_ARCHIVE: sync_object: error versioning archive bucket" << dendl
;
// Pick a destination key: when the source key carries no version
// instance, generate a random instance name so the archive copy is
// stored as its own version.
1787 std::optional
<rgw_obj_key
> dest_key
;
1789 if (versioned_epoch
.value_or(0) == 0) { /* force version if not set */
1790 versioned_epoch
= 0;
1792 if (key
.instance
.empty()) {
1793 sync_env
->store
->gen_rand_obj_instance_name(&(*dest_key
));
// Fetch the remote object into the archive bucket; no perf counters are
// passed here (nullptr), unlike the default module.
1797 return new RGWFetchRemoteObjCR(sync_env
->async_rados
, sync_env
->store
, sync_env
->source_zone
,
1798 bucket_info
, std::nullopt
,
1799 key
, dest_key
, versioned_epoch
,
1800 true, zones_trace
, nullptr);
// Archive-zone delete handling: removals on the source are deliberately
// NOT replicated — the archive keeps every object. This only logs the
// event; the (elided) return presumably yields no coroutine (NULL),
// making the operation a no-op.
1803 RGWCoroutine
*RGWArchiveDataSyncModule::remove_object(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
,
1804 real_time
& mtime
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
)
1806 ldout(sync_env
->cct
, 0) << "SYNC_ARCHIVE: remove_object: b=" << bucket_info
.bucket
<< " k=" << key
<< " versioned_epoch=" << versioned_epoch
<< dendl
;
// Archive-zone delete markers ARE replicated (unlike real deletes above):
// a delete marker hides the object without destroying any version, which
// is compatible with the archive's keep-everything policy. Owner identity
// is forwarded so the marker is attributed as on the source.
1810 RGWCoroutine
*RGWArchiveDataSyncModule::create_delete_marker(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, real_time
& mtime
,
1811 rgw_bucket_entry_owner
& owner
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
)
1813 ldout(sync_env
->cct
, 0) << "SYNC_ARCHIVE: create_delete_marker: b=" << bucket_info
.bucket
<< " k=" << key
<< " mtime=" << mtime
1814 << " versioned=" << versioned
<< " versioned_epoch=" << versioned_epoch
<< dendl
;
1815 return new RGWRemoveObjCR(sync_env
->async_rados
, sync_env
->store
, sync_env
->source_zone
,
1816 bucket_info
, key
, versioned
, versioned_epoch
,
1817 &owner
.id
, &owner
.display_name
, true, &mtime
, zones_trace
);
1820 class RGWDataSyncControlCR
: public RGWBackoffControlCR
1822 RGWDataSyncEnv
*sync_env
;
1823 uint32_t num_shards
;
1825 RGWSyncTraceNodeRef tn
;
1827 static constexpr bool exit_on_error
= false; // retry on all errors
1829 RGWDataSyncControlCR(RGWDataSyncEnv
*_sync_env
, uint32_t _num_shards
,
1830 RGWSyncTraceNodeRef
& _tn_parent
) : RGWBackoffControlCR(_sync_env
->cct
, exit_on_error
),
1831 sync_env(_sync_env
), num_shards(_num_shards
) {
1832 tn
= sync_env
->sync_tracer
->add_node(_tn_parent
, "sync");
1835 RGWCoroutine
*alloc_cr() override
{
1836 return new RGWDataSyncCR(sync_env
, num_shards
, tn
, backoff_ptr());
1839 void wakeup(int shard_id
, set
<string
>& keys
) {
1840 Mutex
& m
= cr_lock();
1843 RGWDataSyncCR
*cr
= static_cast<RGWDataSyncCR
*>(get_cr());
1853 tn
->log(20, SSTR("notify shard=" << shard_id
<< " keys=" << keys
));
1854 cr
->wakeup(shard_id
, keys
);
// Forward a "these datalog shards changed" notification to the running
// data sync control CR. Takes the rwlock for read so data_sync_cr cannot
// be torn down underneath us; if sync is not currently running
// (data_sync_cr is null) the notification is dropped (the elided branch
// presumably just returns).
1861 void RGWRemoteDataLog::wakeup(int shard_id
, set
<string
>& keys
) {
1862 RWLock::RLocker
rl(lock
);
1863 if (!data_sync_cr
) {
1866 data_sync_cr
->wakeup(shard_id
, keys
);
// Blocking entry point: run the data sync control coroutine for all
// shards until it exits. The CR is refcounted and run() drops one ref,
// so an extra get() keeps data_sync_cr valid for concurrent wakeup()
// calls; it is put() and nulled on the way out, and a failure from run()
// is logged.
// NOTE(review): the locking around the data_sync_cr assignment/reset and
// the early-return on r < 0 are elided in this extract.
1869 int RGWRemoteDataLog::run_sync(int num_shards
)
1872 data_sync_cr
= new RGWDataSyncControlCR(&sync_env
, num_shards
, tn
);
1873 data_sync_cr
->get(); // run() will drop a ref, so take another
1876 int r
= run(data_sync_cr
);
1879 data_sync_cr
->put();
1880 data_sync_cr
= NULL
;
1884 ldpp_dout(dpp
, 0) << "ERROR: failed to run sync" << dendl
;
1890 int RGWDataSyncStatusManager::init()
1894 if (!store
->svc
.zone
->find_zone_by_id(source_zone
, &zone_def
)) {
1895 ldpp_dout(this, 0) << "ERROR: failed to find zone config info for zone=" << source_zone
<< dendl
;
1899 if (!store
->svc
.sync_modules
->get_manager()->supports_data_export(zone_def
->tier_type
)) {
1903 const RGWZoneParams
& zone_params
= store
->svc
.zone
->get_zone_params();
1905 if (sync_module
== nullptr) {
1906 sync_module
= store
->get_sync_module();
1909 conn
= store
->svc
.zone
->get_zone_conn_by_id(source_zone
);
1911 ldpp_dout(this, 0) << "connection object to zone " << source_zone
<< " does not exist" << dendl
;
1915 error_logger
= new RGWSyncErrorLogger(store
, RGW_SYNC_ERROR_LOG_SHARD_PREFIX
, ERROR_LOGGER_SHARDS
);
1917 int r
= source_log
.init(source_zone
, conn
, error_logger
, store
->get_sync_tracer(),
1918 sync_module
, counters
);
1920 ldpp_dout(this, 0) << "ERROR: failed to init remote log, r=" << r
<< dendl
;
1925 rgw_datalog_info datalog_info
;
1926 r
= source_log
.read_log_info(&datalog_info
);
1928 ldpp_dout(this, 5) << "ERROR: master.read_log_info() returned r=" << r
<< dendl
;
1933 num_shards
= datalog_info
.num_shards
;
1935 for (int i
= 0; i
< num_shards
; i
++) {
1936 shard_objs
[i
] = rgw_raw_obj(zone_params
.log_pool
, shard_obj_name(source_zone
, i
));
// Tear down state built by init(): free the error logger and reset the
// pointer so finalize() is safe to call more than once.
1942 void RGWDataSyncStatusManager::finalize()
1944 delete error_logger
;
1945 error_logger
= nullptr;
1948 unsigned RGWDataSyncStatusManager::get_subsys() const
1953 std::ostream
& RGWDataSyncStatusManager::gen_prefix(std::ostream
& out
) const
1955 auto zone
= std::string_view
{source_zone
};
1956 return out
<< "data sync zone:" << zone
.substr(0, 8) << ' ';
1959 string
RGWDataSyncStatusManager::sync_status_oid(const string
& source_zone
)
1961 char buf
[datalog_sync_status_oid_prefix
.size() + source_zone
.size() + 16];
1962 snprintf(buf
, sizeof(buf
), "%s.%s", datalog_sync_status_oid_prefix
.c_str(), source_zone
.c_str());
1967 string
RGWDataSyncStatusManager::shard_obj_name(const string
& source_zone
, int shard_id
)
1969 char buf
[datalog_sync_status_shard_prefix
.size() + source_zone
.size() + 16];
1970 snprintf(buf
, sizeof(buf
), "%s.%s.%d", datalog_sync_status_shard_prefix
.c_str(), source_zone
.c_str(), shard_id
);
1975 int RGWRemoteBucketLog::init(const string
& _source_zone
, RGWRESTConn
*_conn
,
1976 const rgw_bucket
& bucket
, int shard_id
,
1977 RGWSyncErrorLogger
*_error_logger
,
1978 RGWSyncTraceManager
*_sync_tracer
,
1979 RGWSyncModuleInstanceRef
& _sync_module
)
1982 source_zone
= _source_zone
;
1984 bs
.shard_id
= shard_id
;
1986 sync_env
.init(dpp
, store
->ctx(), store
, conn
, async_rados
, http_manager
,
1987 _error_logger
, _sync_tracer
, source_zone
, _sync_module
, nullptr);
1992 class RGWReadRemoteBucketIndexLogInfoCR
: public RGWCoroutine
{
1993 RGWDataSyncEnv
*sync_env
;
1994 const string instance_key
;
1996 rgw_bucket_index_marker_info
*info
;
1999 RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncEnv
*_sync_env
,
2000 const rgw_bucket_shard
& bs
,
2001 rgw_bucket_index_marker_info
*_info
)
2002 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
2003 instance_key(bs
.get_key()), info(_info
) {}
2005 int operate() override
{
2008 rgw_http_param_pair pairs
[] = { { "type" , "bucket-index" },
2009 { "bucket-instance", instance_key
.c_str() },
2013 string p
= "/admin/log/";
2014 call(new RGWReadRESTResourceCR
<rgw_bucket_index_marker_info
>(sync_env
->cct
, sync_env
->conn
, sync_env
->http_manager
, p
, pairs
, info
));
2017 return set_cr_error(retcode
);
2019 return set_cr_done();
2025 class RGWInitBucketShardSyncStatusCoroutine
: public RGWCoroutine
{
2026 RGWDataSyncEnv
*sync_env
;
2028 rgw_bucket_shard bs
;
2029 const string sync_status_oid
;
2031 rgw_bucket_shard_sync_info
& status
;
2033 rgw_bucket_index_marker_info info
;
2035 RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncEnv
*_sync_env
,
2036 const rgw_bucket_shard
& bs
,
2037 rgw_bucket_shard_sync_info
& _status
)
2038 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
), bs(bs
),
2039 sync_status_oid(RGWBucketSyncStatusManager::status_oid(sync_env
->source_zone
, bs
)),
2043 int operate() override
{
2045 /* fetch current position in logs */
2046 yield
call(new RGWReadRemoteBucketIndexLogInfoCR(sync_env
, bs
, &info
));
2047 if (retcode
< 0 && retcode
!= -ENOENT
) {
2048 ldout(cct
, 0) << "ERROR: failed to fetch bucket index status" << dendl
;
2049 return set_cr_error(retcode
);
2052 auto store
= sync_env
->store
;
2053 rgw_raw_obj
obj(store
->svc
.zone
->get_zone_params().log_pool
, sync_status_oid
);
2055 if (info
.syncstopped
) {
2056 call(new RGWRadosRemoveCR(store
, obj
));
2058 status
.state
= rgw_bucket_shard_sync_info::StateFullSync
;
2059 status
.inc_marker
.position
= info
.max_marker
;
2060 map
<string
, bufferlist
> attrs
;
2061 status
.encode_all_attrs(attrs
);
2062 call(new RGWSimpleRadosWriteAttrsCR(sync_env
->async_rados
, store
->svc
.sysobj
, obj
, attrs
));
2065 if (info
.syncstopped
) {
2069 return set_cr_error(retcode
);
2071 return set_cr_done();
2077 RGWCoroutine
*RGWRemoteBucketLog::init_sync_status_cr()
2079 return new RGWInitBucketShardSyncStatusCoroutine(&sync_env
, bs
, init_status
);
2082 #define BUCKET_SYNC_ATTR_PREFIX RGW_ATTR_PREFIX "bucket-sync."
// Decode one sync-status attribute (attr_name) from the xattr map into
// *val using the ceph buffer decoder. Callers (decode_from_attrs below)
// treat a false return as "attribute absent or undecodable" and fall back
// to a legacy attribute name. A decode failure is logged, not rethrown.
// NOTE(review): the `template <class T>` header line and the return
// statements are elided in this extract.
2085 static bool decode_attr(CephContext
*cct
, map
<string
, bufferlist
>& attrs
, const string
& attr_name
, T
*val
)
2087 map
<string
, bufferlist
>::iterator iter
= attrs
.find(attr_name
);
2088 if (iter
== attrs
.end()) {
2093 auto biter
= iter
->second
.cbegin();
2095 decode(*val
, biter
);
2096 } catch (buffer::error
& err
) {
2097 ldout(cct
, 0) << "ERROR: failed to decode attribute: " << attr_name
<< dendl
;
2103 void rgw_bucket_shard_sync_info::decode_from_attrs(CephContext
*cct
, map
<string
, bufferlist
>& attrs
)
2105 if (!decode_attr(cct
, attrs
, BUCKET_SYNC_ATTR_PREFIX
"state", &state
)) {
2106 decode_attr(cct
, attrs
, "state", &state
);
2108 if (!decode_attr(cct
, attrs
, BUCKET_SYNC_ATTR_PREFIX
"full_marker", &full_marker
)) {
2109 decode_attr(cct
, attrs
, "full_marker", &full_marker
);
2111 if (!decode_attr(cct
, attrs
, BUCKET_SYNC_ATTR_PREFIX
"inc_marker", &inc_marker
)) {
2112 decode_attr(cct
, attrs
, "inc_marker", &inc_marker
);
2116 void rgw_bucket_shard_sync_info::encode_all_attrs(map
<string
, bufferlist
>& attrs
)
2118 encode_state_attr(attrs
);
2119 full_marker
.encode_attr(attrs
);
2120 inc_marker
.encode_attr(attrs
);
2123 void rgw_bucket_shard_sync_info::encode_state_attr(map
<string
, bufferlist
>& attrs
)
2126 encode(state
, attrs
[BUCKET_SYNC_ATTR_PREFIX
"state"]);
2129 void rgw_bucket_shard_full_sync_marker::encode_attr(map
<string
, bufferlist
>& attrs
)
2132 encode(*this, attrs
[BUCKET_SYNC_ATTR_PREFIX
"full_marker"]);
2135 void rgw_bucket_shard_inc_sync_marker::encode_attr(map
<string
, bufferlist
>& attrs
)
2138 encode(*this, attrs
[BUCKET_SYNC_ATTR_PREFIX
"inc_marker"]);
2141 class RGWReadBucketSyncStatusCoroutine
: public RGWCoroutine
{
2142 RGWDataSyncEnv
*sync_env
;
2144 rgw_bucket_shard_sync_info
*status
;
2146 map
<string
, bufferlist
> attrs
;
2148 RGWReadBucketSyncStatusCoroutine(RGWDataSyncEnv
*_sync_env
,
2149 const rgw_bucket_shard
& bs
,
2150 rgw_bucket_shard_sync_info
*_status
)
2151 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
2152 oid(RGWBucketSyncStatusManager::status_oid(sync_env
->source_zone
, bs
)),
2154 int operate() override
;
2157 int RGWReadBucketSyncStatusCoroutine::operate()
2160 yield
call(new RGWSimpleRadosReadAttrsCR(sync_env
->async_rados
, sync_env
->store
->svc
.sysobj
,
2161 rgw_raw_obj(sync_env
->store
->svc
.zone
->get_zone_params().log_pool
, oid
),
2163 if (retcode
== -ENOENT
) {
2164 *status
= rgw_bucket_shard_sync_info();
2165 return set_cr_done();
2168 ldout(sync_env
->cct
, 0) << "ERROR: failed to call fetch bucket shard info oid=" << oid
<< " ret=" << retcode
<< dendl
;
2169 return set_cr_error(retcode
);
2171 status
->decode_from_attrs(sync_env
->cct
, attrs
);
2172 return set_cr_done();
2177 #define OMAP_READ_MAX_ENTRIES 10
2178 class RGWReadRecoveringBucketShardsCoroutine
: public RGWCoroutine
{
2179 RGWDataSyncEnv
*sync_env
;
2185 set
<string
>& recovering_buckets
;
2189 RGWRadosGetOmapKeysCR::ResultPtr omapkeys
;
2190 set
<string
> error_entries
;
2191 int max_omap_entries
;
2195 RGWReadRecoveringBucketShardsCoroutine(RGWDataSyncEnv
*_sync_env
, const int _shard_id
,
2196 set
<string
>& _recovering_buckets
, const int _max_entries
)
2197 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
2198 store(sync_env
->store
), shard_id(_shard_id
), max_entries(_max_entries
),
2199 recovering_buckets(_recovering_buckets
), max_omap_entries(OMAP_READ_MAX_ENTRIES
)
2201 error_oid
= RGWDataSyncStatusManager::shard_obj_name(sync_env
->source_zone
, shard_id
) + ".retry";
2204 int operate() override
;
2207 int RGWReadRecoveringBucketShardsCoroutine::operate()
2210 //read recovering bucket shards
2213 omapkeys
= std::make_shared
<RGWRadosGetOmapKeysCR::Result
>();
2214 yield
call(new RGWRadosGetOmapKeysCR(store
, rgw_raw_obj(store
->svc
.zone
->get_zone_params().log_pool
, error_oid
),
2215 marker
, max_omap_entries
, omapkeys
));
2217 if (retcode
== -ENOENT
) {
2222 ldout(sync_env
->cct
, 0) << "failed to read recovering bucket shards with "
2223 << cpp_strerror(retcode
) << dendl
;
2224 return set_cr_error(retcode
);
2227 error_entries
= std::move(omapkeys
->entries
);
2228 if (error_entries
.empty()) {
2232 count
+= error_entries
.size();
2233 marker
= *error_entries
.rbegin();
2234 recovering_buckets
.insert(std::make_move_iterator(error_entries
.begin()),
2235 std::make_move_iterator(error_entries
.end()));
2236 } while (omapkeys
->more
&& count
< max_entries
);
2238 return set_cr_done();
2244 class RGWReadPendingBucketShardsCoroutine
: public RGWCoroutine
{
2245 RGWDataSyncEnv
*sync_env
;
2251 set
<string
>& pending_buckets
;
2255 rgw_data_sync_marker
* sync_marker
;
2258 std::string next_marker
;
2259 list
<rgw_data_change_log_entry
> log_entries
;
2263 RGWReadPendingBucketShardsCoroutine(RGWDataSyncEnv
*_sync_env
, const int _shard_id
,
2264 set
<string
>& _pending_buckets
,
2265 rgw_data_sync_marker
* _sync_marker
, const int _max_entries
)
2266 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
2267 store(sync_env
->store
), shard_id(_shard_id
), max_entries(_max_entries
),
2268 pending_buckets(_pending_buckets
), sync_marker(_sync_marker
)
2270 status_oid
= RGWDataSyncStatusManager::shard_obj_name(sync_env
->source_zone
, shard_id
);
2273 int operate() override
;
2276 int RGWReadPendingBucketShardsCoroutine::operate()
2279 //read sync status marker
2280 using CR
= RGWSimpleRadosReadCR
<rgw_data_sync_marker
>;
2281 yield
call(new CR(sync_env
->async_rados
, store
->svc
.sysobj
,
2282 rgw_raw_obj(store
->svc
.zone
->get_zone_params().log_pool
, status_oid
),
2285 ldout(sync_env
->cct
,0) << "failed to read sync status marker with "
2286 << cpp_strerror(retcode
) << dendl
;
2287 return set_cr_error(retcode
);
2290 //read pending bucket shards
2291 marker
= sync_marker
->marker
;
2294 yield
call(new RGWReadRemoteDataLogShardCR(sync_env
, shard_id
, marker
,
2295 &next_marker
, &log_entries
, &truncated
));
2297 if (retcode
== -ENOENT
) {
2302 ldout(sync_env
->cct
,0) << "failed to read remote data log info with "
2303 << cpp_strerror(retcode
) << dendl
;
2304 return set_cr_error(retcode
);
2307 if (log_entries
.empty()) {
2311 count
+= log_entries
.size();
2312 for (const auto& entry
: log_entries
) {
2313 pending_buckets
.insert(entry
.entry
.key
);
2315 }while(truncated
&& count
< max_entries
);
2317 return set_cr_done();
2323 int RGWRemoteDataLog::read_shard_status(int shard_id
, set
<string
>& pending_buckets
, set
<string
>& recovering_buckets
, rgw_data_sync_marker
*sync_marker
, const int max_entries
)
2325 // cannot run concurrently with run_sync(), so run in a separate manager
2326 RGWCoroutinesManager
crs(store
->ctx(), store
->get_cr_registry());
2327 RGWHTTPManager
http_manager(store
->ctx(), crs
.get_completion_mgr());
2328 int ret
= http_manager
.start();
2330 ldpp_dout(dpp
, 0) << "failed in http_manager.start() ret=" << ret
<< dendl
;
2333 RGWDataSyncEnv sync_env_local
= sync_env
;
2334 sync_env_local
.http_manager
= &http_manager
;
2335 list
<RGWCoroutinesStack
*> stacks
;
2336 RGWCoroutinesStack
* recovering_stack
= new RGWCoroutinesStack(store
->ctx(), &crs
);
2337 recovering_stack
->call(new RGWReadRecoveringBucketShardsCoroutine(&sync_env_local
, shard_id
, recovering_buckets
, max_entries
));
2338 stacks
.push_back(recovering_stack
);
2339 RGWCoroutinesStack
* pending_stack
= new RGWCoroutinesStack(store
->ctx(), &crs
);
2340 pending_stack
->call(new RGWReadPendingBucketShardsCoroutine(&sync_env_local
, shard_id
, pending_buckets
, sync_marker
, max_entries
));
2341 stacks
.push_back(pending_stack
);
2342 ret
= crs
.run(stacks
);
2343 http_manager
.stop();
2347 RGWCoroutine
*RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info
*sync_status
)
2349 return new RGWReadBucketSyncStatusCoroutine(&sync_env
, bs
, sync_status
);
2352 RGWBucketSyncStatusManager::~RGWBucketSyncStatusManager() {
2353 for (map
<int, RGWRemoteBucketLog
*>::iterator iter
= source_logs
.begin(); iter
!= source_logs
.end(); ++iter
) {
2354 delete iter
->second
;
2356 delete error_logger
;
2360 void rgw_bucket_entry_owner::decode_json(JSONObj
*obj
)
2362 JSONDecoder::decode_json("ID", id
, obj
);
2363 JSONDecoder::decode_json("DisplayName", display_name
, obj
);
2366 struct bucket_list_entry
{
2373 string storage_class
;
2374 rgw_bucket_entry_owner owner
;
2375 uint64_t versioned_epoch
;
2378 bucket_list_entry() : delete_marker(false), is_latest(false), size(0), versioned_epoch(0) {}
2380 void decode_json(JSONObj
*obj
) {
2381 JSONDecoder::decode_json("IsDeleteMarker", delete_marker
, obj
);
2382 JSONDecoder::decode_json("Key", key
.name
, obj
);
2383 JSONDecoder::decode_json("VersionId", key
.instance
, obj
);
2384 JSONDecoder::decode_json("IsLatest", is_latest
, obj
);
2386 JSONDecoder::decode_json("RgwxMtime", mtime_str
, obj
);
2390 if (parse_iso8601(mtime_str
.c_str(), &t
, &nsec
)) {
2392 ts
.tv_sec
= (uint64_t)internal_timegm(&t
);
2394 mtime
= real_clock::from_ceph_timespec(ts
);
2396 JSONDecoder::decode_json("ETag", etag
, obj
);
2397 JSONDecoder::decode_json("Size", size
, obj
);
2398 JSONDecoder::decode_json("StorageClass", storage_class
, obj
);
2399 JSONDecoder::decode_json("Owner", owner
, obj
);
2400 JSONDecoder::decode_json("VersionedEpoch", versioned_epoch
, obj
);
2401 JSONDecoder::decode_json("RgwxTag", rgw_tag
, obj
);
2402 if (key
.instance
== "null" && !versioned_epoch
) {
2403 key
.instance
.clear();
2407 RGWModifyOp
get_modify_op() const {
2408 if (delete_marker
) {
2409 return CLS_RGW_OP_LINK_OLH_DM
;
2410 } else if (!key
.instance
.empty() && key
.instance
!= "null") {
2411 return CLS_RGW_OP_LINK_OLH
;
2413 return CLS_RGW_OP_ADD
;
2418 struct bucket_list_result
{
2422 string version_id_marker
;
2425 list
<bucket_list_entry
> entries
;
2427 bucket_list_result() : max_keys(0), is_truncated(false) {}
2429 void decode_json(JSONObj
*obj
) {
2430 JSONDecoder::decode_json("Name", name
, obj
);
2431 JSONDecoder::decode_json("Prefix", prefix
, obj
);
2432 JSONDecoder::decode_json("KeyMarker", key_marker
, obj
);
2433 JSONDecoder::decode_json("VersionIdMarker", version_id_marker
, obj
);
2434 JSONDecoder::decode_json("MaxKeys", max_keys
, obj
);
2435 JSONDecoder::decode_json("IsTruncated", is_truncated
, obj
);
2436 JSONDecoder::decode_json("Entries", entries
, obj
);
2440 class RGWListBucketShardCR
: public RGWCoroutine
{
2441 RGWDataSyncEnv
*sync_env
;
2442 const rgw_bucket_shard
& bs
;
2443 const string instance_key
;
2444 rgw_obj_key marker_position
;
2446 bucket_list_result
*result
;
2449 RGWListBucketShardCR(RGWDataSyncEnv
*_sync_env
, const rgw_bucket_shard
& bs
,
2450 rgw_obj_key
& _marker_position
, bucket_list_result
*_result
)
2451 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
), bs(bs
),
2452 instance_key(bs
.get_key()), marker_position(_marker_position
),
2455 int operate() override
{
2458 rgw_http_param_pair pairs
[] = { { "rgwx-bucket-instance", instance_key
.c_str() },
2459 { "versions" , NULL
},
2460 { "format" , "json" },
2461 { "objs-container" , "true" },
2462 { "key-marker" , marker_position
.name
.c_str() },
2463 { "version-id-marker" , marker_position
.instance
.c_str() },
2465 // don't include tenant in the url, it's already part of instance_key
2466 string p
= string("/") + bs
.bucket
.name
;
2467 call(new RGWReadRESTResourceCR
<bucket_list_result
>(sync_env
->cct
, sync_env
->conn
, sync_env
->http_manager
, p
, pairs
, result
));
2470 return set_cr_error(retcode
);
2472 return set_cr_done();
2478 class RGWListBucketIndexLogCR
: public RGWCoroutine
{
2479 RGWDataSyncEnv
*sync_env
;
2480 const string instance_key
;
2483 list
<rgw_bi_log_entry
> *result
;
2484 std::optional
<PerfGuard
> timer
;
2487 RGWListBucketIndexLogCR(RGWDataSyncEnv
*_sync_env
, const rgw_bucket_shard
& bs
,
2488 string
& _marker
, list
<rgw_bi_log_entry
> *_result
)
2489 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
2490 instance_key(bs
.get_key()), marker(_marker
), result(_result
) {}
2492 int operate() override
{
2494 if (sync_env
->counters
) {
2495 timer
.emplace(sync_env
->counters
, sync_counters::l_poll
);
2498 rgw_http_param_pair pairs
[] = { { "bucket-instance", instance_key
.c_str() },
2499 { "format" , "json" },
2500 { "marker" , marker
.c_str() },
2501 { "type", "bucket-index" },
2504 call(new RGWReadRESTResourceCR
<list
<rgw_bi_log_entry
> >(sync_env
->cct
, sync_env
->conn
, sync_env
->http_manager
, "/admin/log", pairs
, result
));
2508 if (sync_env
->counters
) {
2509 sync_env
->counters
->inc(sync_counters::l_poll_err
);
2511 return set_cr_error(retcode
);
2513 return set_cr_done();
2519 #define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10
2521 class RGWBucketFullSyncShardMarkerTrack
: public RGWSyncShardMarkerTrack
<rgw_obj_key
, rgw_obj_key
> {
2522 RGWDataSyncEnv
*sync_env
;
2525 rgw_bucket_shard_full_sync_marker sync_marker
;
2527 RGWSyncTraceNodeRef tn
;
2530 RGWBucketFullSyncShardMarkerTrack(RGWDataSyncEnv
*_sync_env
,
2531 const string
& _marker_oid
,
2532 const rgw_bucket_shard_full_sync_marker
& _marker
) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW
),
2533 sync_env(_sync_env
),
2534 marker_oid(_marker_oid
),
2535 sync_marker(_marker
) {}
2537 void set_tn(RGWSyncTraceNodeRef
& _tn
) {
2541 RGWCoroutine
*store_marker(const rgw_obj_key
& new_marker
, uint64_t index_pos
, const real_time
& timestamp
) override
{
2542 sync_marker
.position
= new_marker
;
2543 sync_marker
.count
= index_pos
;
2545 map
<string
, bufferlist
> attrs
;
2546 sync_marker
.encode_attr(attrs
);
2548 RGWRados
*store
= sync_env
->store
;
2550 tn
->log(20, SSTR("updating marker marker_oid=" << marker_oid
<< " marker=" << new_marker
));
2551 return new RGWSimpleRadosWriteAttrsCR(sync_env
->async_rados
, store
->svc
.sysobj
,
2552 rgw_raw_obj(store
->svc
.zone
->get_zone_params().log_pool
, marker_oid
),
2556 RGWOrderCallCR
*allocate_order_control_cr() override
{
2557 return new RGWLastCallerWinsCR(sync_env
->cct
);
2561 class RGWBucketIncSyncShardMarkerTrack
: public RGWSyncShardMarkerTrack
<string
, rgw_obj_key
> {
2562 RGWDataSyncEnv
*sync_env
;
2565 rgw_bucket_shard_inc_sync_marker sync_marker
;
2567 map
<rgw_obj_key
, string
> key_to_marker
;
2573 map
<string
, operation
> marker_to_op
;
2574 std::set
<std::string
> pending_olh
; // object names with pending olh operations
2576 RGWSyncTraceNodeRef tn
;
2578 void handle_finish(const string
& marker
) override
{
2579 auto iter
= marker_to_op
.find(marker
);
2580 if (iter
== marker_to_op
.end()) {
2583 auto& op
= iter
->second
;
2584 key_to_marker
.erase(op
.key
);
2585 reset_need_retry(op
.key
);
2587 pending_olh
.erase(op
.key
.name
);
2589 marker_to_op
.erase(iter
);
2593 RGWBucketIncSyncShardMarkerTrack(RGWDataSyncEnv
*_sync_env
,
2594 const string
& _marker_oid
,
2595 const rgw_bucket_shard_inc_sync_marker
& _marker
) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW
),
2596 sync_env(_sync_env
),
2597 marker_oid(_marker_oid
),
2598 sync_marker(_marker
) {}
2600 void set_tn(RGWSyncTraceNodeRef
& _tn
) {
2604 RGWCoroutine
*store_marker(const string
& new_marker
, uint64_t index_pos
, const real_time
& timestamp
) override
{
2605 sync_marker
.position
= new_marker
;
2607 map
<string
, bufferlist
> attrs
;
2608 sync_marker
.encode_attr(attrs
);
2610 RGWRados
*store
= sync_env
->store
;
2612 tn
->log(20, SSTR("updating marker marker_oid=" << marker_oid
<< " marker=" << new_marker
));
2613 return new RGWSimpleRadosWriteAttrsCR(sync_env
->async_rados
,
2615 rgw_raw_obj(store
->svc
.zone
->get_zone_params().log_pool
, marker_oid
),
2620 * create index from key -> <op, marker>, and from marker -> key
2621 * this is useful so that we can ensure that we only have one
2622 * entry for any key that is used. This is needed when doing
2623 * incremental sync of data, and we don't want to run multiple
2624 * concurrent sync operations for the same bucket shard
2625 * Also, we should make sure that we don't run concurrent operations on the same key with
2628 bool index_key_to_marker(const rgw_obj_key
& key
, const string
& marker
, bool is_olh
) {
2629 auto result
= key_to_marker
.emplace(key
, marker
);
2630 if (!result
.second
) { // exists
2631 set_need_retry(key
);
2634 marker_to_op
[marker
] = operation
{key
, is_olh
};
2636 // prevent other olh ops from starting on this object name
2637 pending_olh
.insert(key
.name
);
2642 bool can_do_op(const rgw_obj_key
& key
, bool is_olh
) {
2643 // serialize olh ops on the same object name
2644 if (is_olh
&& pending_olh
.count(key
.name
)) {
2645 tn
->log(20, SSTR("sync of " << key
<< " waiting for pending olh op"));
2648 return (key_to_marker
.find(key
) == key_to_marker
.end());
2651 RGWOrderCallCR
*allocate_order_control_cr() override
{
2652 return new RGWLastCallerWinsCR(sync_env
->cct
);
2656 template <class T
, class K
>
2657 class RGWBucketSyncSingleEntryCR
: public RGWCoroutine
{
2658 RGWDataSyncEnv
*sync_env
;
2660 RGWBucketInfo
*bucket_info
;
2661 const rgw_bucket_shard
& bs
;
2665 std::optional
<uint64_t> versioned_epoch
;
2666 rgw_bucket_entry_owner owner
;
2667 real_time timestamp
;
2669 RGWPendingState op_state
;
2672 RGWSyncShardMarkerTrack
<T
, K
> *marker_tracker
;
2676 stringstream error_ss
;
2678 bool error_injection
;
2680 RGWDataSyncModule
*data_sync_module
;
2682 rgw_zone_set zones_trace
;
2684 RGWSyncTraceNodeRef tn
;
2686 RGWBucketSyncSingleEntryCR(RGWDataSyncEnv
*_sync_env
,
2687 RGWBucketInfo
*_bucket_info
,
2688 const rgw_bucket_shard
& bs
,
2689 const rgw_obj_key
& _key
, bool _versioned
,
2690 std::optional
<uint64_t> _versioned_epoch
,
2691 real_time
& _timestamp
,
2692 const rgw_bucket_entry_owner
& _owner
,
2693 RGWModifyOp _op
, RGWPendingState _op_state
,
2694 const T
& _entry_marker
, RGWSyncShardMarkerTrack
<T
, K
> *_marker_tracker
, rgw_zone_set
& _zones_trace
,
2695 RGWSyncTraceNodeRef
& _tn_parent
) : RGWCoroutine(_sync_env
->cct
),
2696 sync_env(_sync_env
),
2697 bucket_info(_bucket_info
), bs(bs
),
2698 key(_key
), versioned(_versioned
), versioned_epoch(_versioned_epoch
),
2700 timestamp(_timestamp
), op(_op
),
2701 op_state(_op_state
),
2702 entry_marker(_entry_marker
),
2703 marker_tracker(_marker_tracker
),
2706 ss
<< bucket_shard_str
{bs
} << "/" << key
<< "[" << versioned_epoch
.value_or(0) << "]";
2707 set_description() << "bucket sync single entry (source_zone=" << sync_env
->source_zone
<< ") b=" << ss
.str() << " log_entry=" << entry_marker
<< " op=" << (int)op
<< " op_state=" << (int)op_state
;
2710 tn
= sync_env
->sync_tracer
->add_node(_tn_parent
, "entry", SSTR(key
));
2712 tn
->log(20, SSTR("bucket sync single entry (source_zone=" << sync_env
->source_zone
<< ") b=" << ss
.str() << " log_entry=" << entry_marker
<< " op=" << (int)op
<< " op_state=" << (int)op_state
));
2713 error_injection
= (sync_env
->cct
->_conf
->rgw_sync_data_inject_err_probability
> 0);
2715 data_sync_module
= sync_env
->sync_module
->get_data_handler();
2717 zones_trace
= _zones_trace
;
2718 zones_trace
.insert(sync_env
->store
->svc
.zone
->get_zone().id
);
2721 int operate() override
{
2723 /* skip entries that are not complete */
2724 if (op_state
!= CLS_RGW_STATE_COMPLETE
) {
2727 tn
->set_flag(RGW_SNS_FLAG_ACTIVE
);
2730 marker_tracker
->reset_need_retry(key
);
2731 if (key
.name
.empty()) {
2732 /* shouldn't happen */
2733 set_status("skipping empty entry");
2734 tn
->log(0, "entry with empty obj name, skipping");
2737 if (error_injection
&&
2738 rand() % 10000 < cct
->_conf
->rgw_sync_data_inject_err_probability
* 10000.0) {
2739 tn
->log(0, SSTR(": injecting data sync error on key=" << key
.name
));
2741 } else if (op
== CLS_RGW_OP_ADD
||
2742 op
== CLS_RGW_OP_LINK_OLH
) {
2743 set_status("syncing obj");
2744 tn
->log(5, SSTR("bucket sync: sync obj: " << sync_env
->source_zone
<< "/" << bucket_info
->bucket
<< "/" << key
<< "[" << versioned_epoch
.value_or(0) << "]"));
2745 call(data_sync_module
->sync_object(sync_env
, *bucket_info
, key
, versioned_epoch
, &zones_trace
));
2746 } else if (op
== CLS_RGW_OP_DEL
|| op
== CLS_RGW_OP_UNLINK_INSTANCE
) {
2747 set_status("removing obj");
2748 if (op
== CLS_RGW_OP_UNLINK_INSTANCE
) {
2751 tn
->log(10, SSTR("removing obj: " << sync_env
->source_zone
<< "/" << bucket_info
->bucket
<< "/" << key
<< "[" << versioned_epoch
.value_or(0) << "]"));
2752 call(data_sync_module
->remove_object(sync_env
, *bucket_info
, key
, timestamp
, versioned
, versioned_epoch
.value_or(0), &zones_trace
));
2753 // our copy of the object is more recent, continue as if it succeeded
2754 if (retcode
== -ERR_PRECONDITION_FAILED
) {
2757 } else if (op
== CLS_RGW_OP_LINK_OLH_DM
) {
2758 set_status("creating delete marker");
2759 tn
->log(10, SSTR("creating delete marker: obj: " << sync_env
->source_zone
<< "/" << bucket_info
->bucket
<< "/" << key
<< "[" << versioned_epoch
.value_or(0) << "]"));
2760 call(data_sync_module
->create_delete_marker(sync_env
, *bucket_info
, key
, timestamp
, owner
, versioned
, versioned_epoch
.value_or(0), &zones_trace
));
2762 tn
->set_resource_name(SSTR(bucket_str_noinstance(bucket_info
->bucket
) << "/" << key
));
2764 } while (marker_tracker
->need_retry(key
));
2766 tn
->unset_flag(RGW_SNS_FLAG_ACTIVE
);
2768 tn
->log(10, "success");
2770 tn
->log(10, SSTR("failed, retcode=" << retcode
<< " (" << cpp_strerror(-retcode
) << ")"));
2774 if (retcode
< 0 && retcode
!= -ENOENT
) {
2775 set_status() << "failed to sync obj; retcode=" << retcode
;
2776 tn
->log(0, SSTR("ERROR: failed to sync object: "
2777 << bucket_shard_str
{bs
} << "/" << key
.name
));
2778 error_ss
<< bucket_shard_str
{bs
} << "/" << key
.name
;
2779 sync_status
= retcode
;
2781 if (!error_ss
.str().empty()) {
2782 yield
call(sync_env
->error_logger
->log_error_cr(sync_env
->conn
->get_remote_id(), "data", error_ss
.str(), -retcode
, string("failed to sync object") + cpp_strerror(-sync_status
)));
2785 if (sync_status
== 0) {
2787 set_status() << "calling marker_tracker->finish(" << entry_marker
<< ")";
2788 yield
call(marker_tracker
->finish(entry_marker
));
2789 sync_status
= retcode
;
2791 if (sync_status
< 0) {
2792 return set_cr_error(sync_status
);
2794 return set_cr_done();
2800 #define BUCKET_SYNC_SPAWN_WINDOW 20
2802 class RGWBucketShardFullSyncCR
: public RGWCoroutine
{
2803 RGWDataSyncEnv
*sync_env
;
2804 const rgw_bucket_shard
& bs
;
2805 RGWBucketInfo
*bucket_info
;
2806 boost::intrusive_ptr
<RGWContinuousLeaseCR
> lease_cr
;
2807 bucket_list_result list_result
;
2808 list
<bucket_list_entry
>::iterator entries_iter
;
2809 rgw_bucket_shard_sync_info
& sync_info
;
2810 RGWBucketFullSyncShardMarkerTrack marker_tracker
;
2811 rgw_obj_key list_marker
;
2812 bucket_list_entry
*entry
{nullptr};
2814 int total_entries
{0};
2818 const string
& status_oid
;
2820 rgw_zone_set zones_trace
;
2822 RGWSyncTraceNodeRef tn
;
2824 RGWBucketShardFullSyncCR(RGWDataSyncEnv
*_sync_env
, const rgw_bucket_shard
& bs
,
2825 RGWBucketInfo
*_bucket_info
,
2826 const std::string
& status_oid
,
2827 RGWContinuousLeaseCR
*lease_cr
,
2828 rgw_bucket_shard_sync_info
& sync_info
,
2829 RGWSyncTraceNodeRef tn_parent
)
2830 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
), bs(bs
),
2831 bucket_info(_bucket_info
), lease_cr(lease_cr
), sync_info(sync_info
),
2832 marker_tracker(sync_env
, status_oid
, sync_info
.full_marker
),
2833 status_oid(status_oid
),
2834 tn(sync_env
->sync_tracer
->add_node(tn_parent
, "full_sync",
2835 SSTR(bucket_shard_str
{bs
}))) {
2836 zones_trace
.insert(sync_env
->source_zone
);
2837 marker_tracker
.set_tn(tn
);
2840 int operate() override
;
2843 int RGWBucketShardFullSyncCR::operate()
2847 list_marker
= sync_info
.full_marker
.position
;
2849 total_entries
= sync_info
.full_marker
.count
;
2851 if (!lease_cr
->is_locked()) {
2853 return set_cr_error(-ECANCELED
);
2855 set_status("listing remote bucket");
2856 tn
->log(20, "listing bucket for full sync");
2857 yield
call(new RGWListBucketShardCR(sync_env
, bs
, list_marker
,
2859 if (retcode
< 0 && retcode
!= -ENOENT
) {
2860 set_status("failed bucket listing, going down");
2862 return set_cr_error(retcode
);
2864 if (list_result
.entries
.size() > 0) {
2865 tn
->set_flag(RGW_SNS_FLAG_ACTIVE
); /* actually have entries to sync */
2867 entries_iter
= list_result
.entries
.begin();
2868 for (; entries_iter
!= list_result
.entries
.end(); ++entries_iter
) {
2869 if (!lease_cr
->is_locked()) {
2871 return set_cr_error(-ECANCELED
);
2873 tn
->log(20, SSTR("[full sync] syncing object: "
2874 << bucket_shard_str
{bs
} << "/" << entries_iter
->key
));
2875 entry
= &(*entries_iter
);
2877 list_marker
= entries_iter
->key
;
2878 if (!marker_tracker
.start(entry
->key
, total_entries
, real_time())) {
2879 tn
->log(0, SSTR("ERROR: cannot start syncing " << entry
->key
<< ". Duplicate entry?"));
2881 using SyncCR
= RGWBucketSyncSingleEntryCR
<rgw_obj_key
, rgw_obj_key
>;
2882 yield
spawn(new SyncCR(sync_env
, bucket_info
, bs
, entry
->key
,
2883 false, /* versioned, only matters for object removal */
2884 entry
->versioned_epoch
, entry
->mtime
,
2885 entry
->owner
, entry
->get_modify_op(), CLS_RGW_STATE_COMPLETE
,
2886 entry
->key
, &marker_tracker
, zones_trace
, tn
),
2889 while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW
) {
2890 yield
wait_for_child();
2893 again
= collect(&ret
, nullptr);
2895 tn
->log(10, "a sync operation returned error");
2897 /* we have reported this error */
2902 } while (list_result
.is_truncated
&& sync_status
== 0);
2903 set_status("done iterating over all objects");
2904 /* wait for all operations to complete */
2905 while (num_spawned()) {
2906 yield
wait_for_child();
2909 again
= collect(&ret
, nullptr);
2911 tn
->log(10, "a sync operation returned error");
2913 /* we have reported this error */
2917 tn
->unset_flag(RGW_SNS_FLAG_ACTIVE
);
2918 if (!lease_cr
->is_locked()) {
2919 return set_cr_error(-ECANCELED
);
2921 /* update sync state to incremental */
2922 if (sync_status
== 0) {
2924 sync_info
.state
= rgw_bucket_shard_sync_info::StateIncrementalSync
;
2925 map
<string
, bufferlist
> attrs
;
2926 sync_info
.encode_state_attr(attrs
);
2927 RGWRados
*store
= sync_env
->store
;
2928 call(new RGWSimpleRadosWriteAttrsCR(sync_env
->async_rados
, store
->svc
.sysobj
,
2929 rgw_raw_obj(store
->svc
.zone
->get_zone_params().log_pool
, status_oid
),
2933 tn
->log(10, SSTR("backing out with sync_status=" << sync_status
));
2935 if (retcode
< 0 && sync_status
== 0) { /* actually tried to set incremental state and failed */
2936 tn
->log(0, SSTR("ERROR: failed to set sync state on bucket "
2937 << bucket_shard_str
{bs
} << " retcode=" << retcode
));
2938 return set_cr_error(retcode
);
2940 if (sync_status
< 0) {
2941 return set_cr_error(sync_status
);
2943 return set_cr_done();
2948 static bool has_olh_epoch(RGWModifyOp op
) {
2949 return op
== CLS_RGW_OP_LINK_OLH
|| op
== CLS_RGW_OP_UNLINK_INSTANCE
;
2952 class RGWBucketShardIncrementalSyncCR
: public RGWCoroutine
{
2953 RGWDataSyncEnv
*sync_env
;
2954 const rgw_bucket_shard
& bs
;
2955 RGWBucketInfo
*bucket_info
;
2956 boost::intrusive_ptr
<RGWContinuousLeaseCR
> lease_cr
;
2957 list
<rgw_bi_log_entry
> list_result
;
2958 list
<rgw_bi_log_entry
>::iterator entries_iter
, entries_end
;
2959 map
<pair
<string
, string
>, pair
<real_time
, RGWModifyOp
> > squash_map
;
2960 rgw_bucket_shard_sync_info
& sync_info
;
2962 rgw_bi_log_entry
*entry
{nullptr};
2963 RGWBucketIncSyncShardMarkerTrack marker_tracker
;
2964 bool updated_status
{false};
2965 const string
& status_oid
;
2966 const string
& zone_id
;
2971 bool syncstopped
{false};
2973 RGWSyncTraceNodeRef tn
;
2975 RGWBucketShardIncrementalSyncCR(RGWDataSyncEnv
*_sync_env
,
2976 const rgw_bucket_shard
& bs
,
2977 RGWBucketInfo
*_bucket_info
,
2978 const std::string
& status_oid
,
2979 RGWContinuousLeaseCR
*lease_cr
,
2980 rgw_bucket_shard_sync_info
& sync_info
,
2981 RGWSyncTraceNodeRef
& _tn_parent
)
2982 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
), bs(bs
),
2983 bucket_info(_bucket_info
), lease_cr(lease_cr
), sync_info(sync_info
),
2984 marker_tracker(sync_env
, status_oid
, sync_info
.inc_marker
),
2985 status_oid(status_oid
), zone_id(_sync_env
->store
->svc
.zone
->get_zone().id
),
2986 tn(sync_env
->sync_tracer
->add_node(_tn_parent
, "inc_sync",
2987 SSTR(bucket_shard_str
{bs
})))
2989 set_description() << "bucket shard incremental sync bucket="
2990 << bucket_shard_str
{bs
};
2992 marker_tracker
.set_tn(tn
);
2995 int operate() override
;
2998 int RGWBucketShardIncrementalSyncCR::operate()
3003 if (!lease_cr
->is_locked()) {
3005 tn
->log(0, "ERROR: lease is not taken, abort");
3006 return set_cr_error(-ECANCELED
);
3008 tn
->log(20, SSTR("listing bilog for incremental sync" << sync_info
.inc_marker
.position
));
3009 set_status() << "listing bilog; position=" << sync_info
.inc_marker
.position
;
3010 yield
call(new RGWListBucketIndexLogCR(sync_env
, bs
, sync_info
.inc_marker
.position
,
3012 if (retcode
< 0 && retcode
!= -ENOENT
) {
3013 /* wait for all operations to complete */
3015 return set_cr_error(retcode
);
3018 entries_iter
= list_result
.begin();
3019 entries_end
= list_result
.end();
3020 for (; entries_iter
!= entries_end
; ++entries_iter
) {
3021 auto e
= *entries_iter
;
3022 if (e
.op
== RGWModifyOp::CLS_RGW_OP_SYNCSTOP
) {
3023 ldout(sync_env
->cct
, 20) << "syncstop on " << e
.timestamp
<< dendl
;
3025 entries_end
= entries_iter
; // dont sync past here
3028 if (e
.op
== RGWModifyOp::CLS_RGW_OP_RESYNC
) {
3031 if (e
.op
== CLS_RGW_OP_CANCEL
) {
3034 if (e
.state
!= CLS_RGW_STATE_COMPLETE
) {
3037 if (e
.zones_trace
.find(zone_id
) != e
.zones_trace
.end()) {
3040 auto& squash_entry
= squash_map
[make_pair(e
.object
, e
.instance
)];
3041 // don't squash over olh entries - we need to apply their olh_epoch
3042 if (has_olh_epoch(squash_entry
.second
) && !has_olh_epoch(e
.op
)) {
3045 if (squash_entry
.first
<= e
.timestamp
) {
3046 squash_entry
= make_pair
<>(e
.timestamp
, e
.op
);
3050 entries_iter
= list_result
.begin();
3051 for (; entries_iter
!= entries_end
; ++entries_iter
) {
3052 if (!lease_cr
->is_locked()) {
3054 return set_cr_error(-ECANCELED
);
3056 entry
= &(*entries_iter
);
3058 ssize_t p
= entry
->id
.find('#'); /* entries might have explicit shard info in them, e.g., 6#00000000004.94.3 */
3062 cur_id
= entry
->id
.substr(p
+ 1);
3065 sync_info
.inc_marker
.position
= cur_id
;
3067 if (entry
->op
== RGWModifyOp::CLS_RGW_OP_SYNCSTOP
|| entry
->op
== RGWModifyOp::CLS_RGW_OP_RESYNC
) {
3068 ldout(sync_env
->cct
, 20) << "detected syncstop or resync on " << entries_iter
->timestamp
<< ", skipping entry" << dendl
;
3069 marker_tracker
.try_update_high_marker(cur_id
, 0, entry
->timestamp
);
3073 if (!key
.set(rgw_obj_index_key
{entry
->object
, entry
->instance
})) {
3074 set_status() << "parse_raw_oid() on " << entry
->object
<< " returned false, skipping entry";
3075 tn
->log(20, SSTR("parse_raw_oid() on " << entry
->object
<< " returned false, skipping entry"));
3076 marker_tracker
.try_update_high_marker(cur_id
, 0, entry
->timestamp
);
3080 tn
->log(20, SSTR("parsed entry: id=" << cur_id
<< " iter->object=" << entry
->object
<< " iter->instance=" << entry
->instance
<< " name=" << key
.name
<< " instance=" << key
.instance
<< " ns=" << key
.ns
));
3082 if (!key
.ns
.empty()) {
3083 set_status() << "skipping entry in namespace: " << entry
->object
;
3084 tn
->log(20, SSTR("skipping entry in namespace: " << entry
->object
));
3085 marker_tracker
.try_update_high_marker(cur_id
, 0, entry
->timestamp
);
3089 set_status() << "got entry.id=" << cur_id
<< " key=" << key
<< " op=" << (int)entry
->op
;
3090 if (entry
->op
== CLS_RGW_OP_CANCEL
) {
3091 set_status() << "canceled operation, skipping";
3092 tn
->log(20, SSTR("skipping object: "
3093 << bucket_shard_str
{bs
} << "/" << key
<< ": canceled operation"));
3094 marker_tracker
.try_update_high_marker(cur_id
, 0, entry
->timestamp
);
3097 if (entry
->state
!= CLS_RGW_STATE_COMPLETE
) {
3098 set_status() << "non-complete operation, skipping";
3099 tn
->log(20, SSTR("skipping object: "
3100 << bucket_shard_str
{bs
} << "/" << key
<< ": non-complete operation"));
3101 marker_tracker
.try_update_high_marker(cur_id
, 0, entry
->timestamp
);
3104 if (entry
->zones_trace
.find(zone_id
) != entry
->zones_trace
.end()) {
3105 set_status() << "redundant operation, skipping";
3106 tn
->log(20, SSTR("skipping object: "
3107 <<bucket_shard_str
{bs
} <<"/"<<key
<<": redundant operation"));
3108 marker_tracker
.try_update_high_marker(cur_id
, 0, entry
->timestamp
);
3111 if (make_pair
<>(entry
->timestamp
, entry
->op
) != squash_map
[make_pair(entry
->object
, entry
->instance
)]) {
3112 set_status() << "squashed operation, skipping";
3113 tn
->log(20, SSTR("skipping object: "
3114 << bucket_shard_str
{bs
} << "/" << key
<< ": squashed operation"));
3115 marker_tracker
.try_update_high_marker(cur_id
, 0, entry
->timestamp
);
3118 tn
->set_flag(RGW_SNS_FLAG_ACTIVE
);
3119 tn
->log(20, SSTR("syncing object: "
3120 << bucket_shard_str
{bs
} << "/" << key
));
3121 updated_status
= false;
3122 while (!marker_tracker
.can_do_op(key
, has_olh_epoch(entry
->op
))) {
3123 if (!updated_status
) {
3124 set_status() << "can't do op, conflicting inflight operation";
3125 updated_status
= true;
3127 tn
->log(5, SSTR("can't do op on key=" << key
<< " need to wait for conflicting operation to complete"));
3128 yield
wait_for_child();
3131 again
= collect(&ret
, nullptr);
3133 tn
->log(0, SSTR("ERROR: a child operation returned error (ret=" << ret
<< ")"));
3135 /* we have reported this error */
3138 if (sync_status
!= 0)
3141 if (sync_status
!= 0) {
3142 /* get error, stop */
3145 if (!marker_tracker
.index_key_to_marker(key
, cur_id
, has_olh_epoch(entry
->op
))) {
3146 set_status() << "can't do op, sync already in progress for object";
3147 tn
->log(20, SSTR("skipping sync of entry: " << cur_id
<< ":" << key
<< " sync already in progress for object"));
3148 marker_tracker
.try_update_high_marker(cur_id
, 0, entry
->timestamp
);
3152 set_status() << "start object sync";
3153 if (!marker_tracker
.start(cur_id
, 0, entry
->timestamp
)) {
3154 tn
->log(0, SSTR("ERROR: cannot start syncing " << cur_id
<< ". Duplicate entry?"));
3156 std::optional
<uint64_t> versioned_epoch
;
3157 rgw_bucket_entry_owner
owner(entry
->owner
, entry
->owner_display_name
);
3158 if (entry
->ver
.pool
< 0) {
3159 versioned_epoch
= entry
->ver
.epoch
;
3161 tn
->log(20, SSTR("entry->timestamp=" << entry
->timestamp
));
3162 using SyncCR
= RGWBucketSyncSingleEntryCR
<string
, rgw_obj_key
>;
3163 spawn(new SyncCR(sync_env
, bucket_info
, bs
, key
,
3164 entry
->is_versioned(), versioned_epoch
,
3165 entry
->timestamp
, owner
, entry
->op
, entry
->state
,
3166 cur_id
, &marker_tracker
, entry
->zones_trace
, tn
),
3170 while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW
) {
3171 set_status() << "num_spawned() > spawn_window";
3172 yield
wait_for_child();
3175 again
= collect(&ret
, nullptr);
3177 tn
->log(10, "a sync operation returned error");
3179 /* we have reported this error */
3181 /* not waiting for child here */
3185 } while (!list_result
.empty() && sync_status
== 0 && !syncstopped
);
3187 while (num_spawned()) {
3188 yield
wait_for_child();
3191 again
= collect(&ret
, nullptr);
3193 tn
->log(10, "a sync operation returned error");
3195 /* we have reported this error */
3197 /* not waiting for child here */
3200 tn
->unset_flag(RGW_SNS_FLAG_ACTIVE
);
3203 // transition back to StateInit in RGWRunBucketSyncCoroutine. if sync is
3204 // still disabled, we'll delete the sync status object. otherwise we'll
3205 // restart full sync to catch any changes that happened while sync was
3207 sync_info
.state
= rgw_bucket_shard_sync_info::StateInit
;
3208 return set_cr_done();
3211 yield
call(marker_tracker
.flush());
3213 tn
->log(0, SSTR("ERROR: marker_tracker.flush() returned retcode=" << retcode
));
3214 return set_cr_error(retcode
);
3216 if (sync_status
< 0) {
3217 tn
->log(10, SSTR("backing out with sync_status=" << sync_status
));
3218 return set_cr_error(sync_status
);
3220 return set_cr_done();
3225 int RGWRunBucketSyncCoroutine::operate()
3229 set_status("acquiring sync lock");
3230 auto store
= sync_env
->store
;
3231 lease_cr
.reset(new RGWContinuousLeaseCR(sync_env
->async_rados
, store
,
3232 rgw_raw_obj(store
->svc
.zone
->get_zone_params().log_pool
, status_oid
),
3234 cct
->_conf
->rgw_sync_lease_period
,
3236 lease_stack
.reset(spawn(lease_cr
.get(), false));
3238 while (!lease_cr
->is_locked()) {
3239 if (lease_cr
->is_done()) {
3240 tn
->log(5, "failed to take lease");
3241 set_status("lease lock failed, early abort");
3243 return set_cr_error(lease_cr
->get_ret_status());
3249 tn
->log(10, "took lease");
3250 yield
call(new RGWReadBucketSyncStatusCoroutine(sync_env
, bs
, &sync_status
));
3251 if (retcode
< 0 && retcode
!= -ENOENT
) {
3252 tn
->log(0, "ERROR: failed to read sync status for bucket");
3253 lease_cr
->go_down();
3255 return set_cr_error(retcode
);
3258 tn
->log(20, SSTR("sync status for bucket: " << sync_status
.state
));
3260 yield
call(new RGWGetBucketInstanceInfoCR(sync_env
->async_rados
, sync_env
->store
, bs
.bucket
, &bucket_info
));
3261 if (retcode
== -ENOENT
) {
3262 /* bucket instance info has not been synced in yet, fetch it now */
3264 tn
->log(10, SSTR("no local info for bucket:" << ": fetching metadata"));
3265 string raw_key
= string("bucket.instance:") + bs
.bucket
.get_key();
3267 meta_sync_env
.init(sync_env
->dpp
, cct
, sync_env
->store
, sync_env
->store
->svc
.zone
->get_master_conn(), sync_env
->async_rados
,
3268 sync_env
->http_manager
, sync_env
->error_logger
, sync_env
->sync_tracer
);
3270 call(new RGWMetaSyncSingleEntryCR(&meta_sync_env
, raw_key
,
3271 string() /* no marker */,
3272 MDLOG_STATUS_COMPLETE
,
3273 NULL
/* no marker tracker */,
3277 tn
->log(0, SSTR("ERROR: failed to fetch bucket instance info for " << bucket_str
{bs
.bucket
}));
3278 lease_cr
->go_down();
3280 return set_cr_error(retcode
);
3283 yield
call(new RGWGetBucketInstanceInfoCR(sync_env
->async_rados
, sync_env
->store
, bs
.bucket
, &bucket_info
));
3286 tn
->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str
{bs
.bucket
}));
3287 lease_cr
->go_down();
3289 return set_cr_error(retcode
);
3293 if (sync_status
.state
== rgw_bucket_shard_sync_info::StateInit
) {
3294 yield
call(new RGWInitBucketShardSyncStatusCoroutine(sync_env
, bs
, sync_status
));
3295 if (retcode
== -ENOENT
) {
3296 tn
->log(0, "bucket sync disabled");
3297 lease_cr
->abort(); // deleted lease object, abort/wakeup instead of unlock
3301 return set_cr_done();
3304 tn
->log(0, SSTR("ERROR: init sync on bucket failed, retcode=" << retcode
));
3305 lease_cr
->go_down();
3307 return set_cr_error(retcode
);
3311 if (sync_status
.state
== rgw_bucket_shard_sync_info::StateFullSync
) {
3312 yield
call(new RGWBucketShardFullSyncCR(sync_env
, bs
, &bucket_info
,
3313 status_oid
, lease_cr
.get(),
3316 tn
->log(5, SSTR("full sync on bucket failed, retcode=" << retcode
));
3317 lease_cr
->go_down();
3319 return set_cr_error(retcode
);
3323 if (sync_status
.state
== rgw_bucket_shard_sync_info::StateIncrementalSync
) {
3324 yield
call(new RGWBucketShardIncrementalSyncCR(sync_env
, bs
, &bucket_info
,
3325 status_oid
, lease_cr
.get(),
3328 tn
->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode
));
3329 lease_cr
->go_down();
3331 return set_cr_error(retcode
);
3334 // loop back to previous states unless incremental sync returns normally
3335 } while (sync_status
.state
!= rgw_bucket_shard_sync_info::StateIncrementalSync
);
3337 lease_cr
->go_down();
3339 return set_cr_done();
3345 RGWCoroutine
*RGWRemoteBucketLog::run_sync_cr()
3347 return new RGWRunBucketSyncCoroutine(&sync_env
, bs
, sync_env
.sync_tracer
->root_node
);
3350 int RGWBucketSyncStatusManager::init()
3352 conn
= store
->svc
.zone
->get_zone_conn_by_id(source_zone
);
3354 ldpp_dout(this, 0) << "connection object to zone " << source_zone
<< " does not exist" << dendl
;
3358 int ret
= http_manager
.start();
3360 ldpp_dout(this, 0) << "failed in http_manager.start() ret=" << ret
<< dendl
;
3365 const string key
= bucket
.get_key();
3367 rgw_http_param_pair pairs
[] = { { "key", key
.c_str() },
3370 string path
= string("/admin/metadata/bucket.instance");
3372 bucket_instance_meta_info result
;
3373 ret
= cr_mgr
.run(new RGWReadRESTResourceCR
<bucket_instance_meta_info
>(store
->ctx(), conn
, &http_manager
, path
, pairs
, &result
));
3375 ldpp_dout(this, 0) << "ERROR: failed to fetch bucket metadata info from zone=" << source_zone
<< " path=" << path
<< " key=" << key
<< " ret=" << ret
<< dendl
;
3379 RGWBucketInfo
& bi
= result
.data
.get_bucket_info();
3380 num_shards
= bi
.num_shards
;
3382 error_logger
= new RGWSyncErrorLogger(store
, RGW_SYNC_ERROR_LOG_SHARD_PREFIX
, ERROR_LOGGER_SHARDS
);
3384 sync_module
.reset(new RGWDefaultSyncModuleInstance());
3386 int effective_num_shards
= (num_shards
? num_shards
: 1);
3388 auto async_rados
= store
->get_async_rados();
3390 for (int i
= 0; i
< effective_num_shards
; i
++) {
3391 RGWRemoteBucketLog
*l
= new RGWRemoteBucketLog(this, store
, this, async_rados
, &http_manager
);
3392 ret
= l
->init(source_zone
, conn
, bucket
, (num_shards
? i
: -1), error_logger
, store
->get_sync_tracer(), sync_module
);
3394 ldpp_dout(this, 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl
;
3403 int RGWBucketSyncStatusManager::init_sync_status()
3405 list
<RGWCoroutinesStack
*> stacks
;
3407 for (map
<int, RGWRemoteBucketLog
*>::iterator iter
= source_logs
.begin(); iter
!= source_logs
.end(); ++iter
) {
3408 RGWCoroutinesStack
*stack
= new RGWCoroutinesStack(store
->ctx(), &cr_mgr
);
3409 RGWRemoteBucketLog
*l
= iter
->second
;
3410 stack
->call(l
->init_sync_status_cr());
3412 stacks
.push_back(stack
);
3415 return cr_mgr
.run(stacks
);
3418 int RGWBucketSyncStatusManager::read_sync_status()
3420 list
<RGWCoroutinesStack
*> stacks
;
3422 for (map
<int, RGWRemoteBucketLog
*>::iterator iter
= source_logs
.begin(); iter
!= source_logs
.end(); ++iter
) {
3423 RGWCoroutinesStack
*stack
= new RGWCoroutinesStack(store
->ctx(), &cr_mgr
);
3424 RGWRemoteBucketLog
*l
= iter
->second
;
3425 stack
->call(l
->read_sync_status_cr(&sync_status
[iter
->first
]));
3427 stacks
.push_back(stack
);
3430 int ret
= cr_mgr
.run(stacks
);
3432 ldpp_dout(this, 0) << "ERROR: failed to read sync status for "
3433 << bucket_str
{bucket
} << dendl
;
3440 int RGWBucketSyncStatusManager::run()
3442 list
<RGWCoroutinesStack
*> stacks
;
3444 for (map
<int, RGWRemoteBucketLog
*>::iterator iter
= source_logs
.begin(); iter
!= source_logs
.end(); ++iter
) {
3445 RGWCoroutinesStack
*stack
= new RGWCoroutinesStack(store
->ctx(), &cr_mgr
);
3446 RGWRemoteBucketLog
*l
= iter
->second
;
3447 stack
->call(l
->run_sync_cr());
3449 stacks
.push_back(stack
);
3452 int ret
= cr_mgr
.run(stacks
);
3454 ldpp_dout(this, 0) << "ERROR: failed to read sync status for "
3455 << bucket_str
{bucket
} << dendl
;
3462 unsigned RGWBucketSyncStatusManager::get_subsys() const
3467 std::ostream
& RGWBucketSyncStatusManager::gen_prefix(std::ostream
& out
) const
3469 auto zone
= std::string_view
{source_zone
};
3470 return out
<< "bucket sync zone:" << zone
.substr(0, 8)
3471 << " bucket:" << bucket
.name
<< ' ';
3474 string
RGWBucketSyncStatusManager::status_oid(const string
& source_zone
,
3475 const rgw_bucket_shard
& bs
)
3477 return bucket_status_oid_prefix
+ "." + source_zone
+ ":" + bs
.get_key();
3480 string
RGWBucketSyncStatusManager::obj_status_oid(const string
& source_zone
,
3483 return object_status_oid_prefix
+ "." + source_zone
+ ":" + obj
.bucket
.get_key() + ":" +
3484 obj
.key
.name
+ ":" + obj
.key
.instance
;
3487 class RGWCollectBucketSyncStatusCR
: public RGWShardCollectCR
{
3488 static constexpr int max_concurrent_shards
= 16;
3489 RGWRados
*const store
;
3490 RGWDataSyncEnv
*const env
;
3491 const int num_shards
;
3492 rgw_bucket_shard bs
;
3494 using Vector
= std::vector
<rgw_bucket_shard_sync_info
>;
3495 Vector::iterator i
, end
;
3498 RGWCollectBucketSyncStatusCR(RGWRados
*store
, RGWDataSyncEnv
*env
,
3499 int num_shards
, const rgw_bucket
& bucket
,
3501 : RGWShardCollectCR(store
->ctx(), max_concurrent_shards
),
3502 store(store
), env(env
), num_shards(num_shards
),
3503 bs(bucket
, num_shards
> 0 ? 0 : -1), // start at shard 0 or -1
3504 i(status
->begin()), end(status
->end())
3507 bool spawn_next() override
{
3511 spawn(new RGWReadBucketSyncStatusCoroutine(env
, bs
, &*i
), false);
3518 int rgw_bucket_sync_status(const DoutPrefixProvider
*dpp
, RGWRados
*store
, const std::string
& source_zone
,
3519 const RGWBucketInfo
& bucket_info
,
3520 std::vector
<rgw_bucket_shard_sync_info
> *status
)
3522 const auto num_shards
= bucket_info
.num_shards
;
3524 status
->resize(std::max
<size_t>(1, num_shards
));
3527 RGWSyncModuleInstanceRef module
; // null sync module
3528 env
.init(dpp
, store
->ctx(), store
, nullptr, store
->get_async_rados(),
3529 nullptr, nullptr, nullptr, source_zone
, module
, nullptr);
3531 RGWCoroutinesManager
crs(store
->ctx(), store
->get_cr_registry());
3532 return crs
.run(new RGWCollectBucketSyncStatusCR(store
, &env
, num_shards
,
3533 bucket_info
.bucket
, status
));
3537 // TODO: move into rgw_data_sync_trim.cc
3539 #define dout_prefix (*_dout << "data trim: ")
3543 /// return the marker that it's safe to trim up to
3544 const std::string
& get_stable_marker(const rgw_data_sync_marker
& m
)
3546 return m
.state
== m
.FullSync
? m
.next_step_marker
: m
.marker
;
/// populate the container starting with 'dest' with the minimum stable marker
/// of each shard for all of the peers in [first, last)
/// NOTE(review): inner min-update reconstructed from fragments — dest must be
/// a random-access iterator indexed by shard id; verify against upstream.
template <typename IterIn, typename IterOut>
void take_min_markers(IterIn first, IterIn last, IterOut dest)
{
  if (first == last) {
    return;
  }
  for (auto p = first; p != last; ++p) {
    // take the minimum stable marker seen for each shard across all peers
    for (auto &shard : p->sync_markers) {
      const auto& stable = get_stable_marker(shard.second);
      if (dest[shard.first] > stable) {
        dest[shard.first] = stable;
      }
    }
  }
}
3569 } // anonymous namespace
3571 class DataLogTrimCR
: public RGWCoroutine
{
3572 using TrimCR
= RGWSyncLogTrimCR
;
3574 RGWHTTPManager
*http
;
3575 const int num_shards
;
3576 const std::string
& zone_id
; //< my zone id
3577 std::vector
<rgw_data_sync_status
> peer_status
; //< sync status for each peer
3578 std::vector
<std::string
> min_shard_markers
; //< min marker per shard
3579 std::vector
<std::string
>& last_trim
; //< last trimmed marker per shard
3583 DataLogTrimCR(RGWRados
*store
, RGWHTTPManager
*http
,
3584 int num_shards
, std::vector
<std::string
>& last_trim
)
3585 : RGWCoroutine(store
->ctx()), store(store
), http(http
),
3586 num_shards(num_shards
),
3587 zone_id(store
->svc
.zone
->get_zone().id
),
3588 peer_status(store
->svc
.zone
->get_zone_data_notify_to_map().size()),
3589 min_shard_markers(num_shards
, TrimCR::max_marker
),
3590 last_trim(last_trim
)
3593 int operate() override
;
3596 int DataLogTrimCR::operate()
3599 ldout(cct
, 10) << "fetching sync status for zone " << zone_id
<< dendl
;
3600 set_status("fetching sync status");
3602 // query data sync status from each sync peer
3603 rgw_http_param_pair params
[] = {
3605 { "status", nullptr },
3606 { "source-zone", zone_id
.c_str() },
3607 { nullptr, nullptr }
3610 auto p
= peer_status
.begin();
3611 for (auto& c
: store
->svc
.zone
->get_zone_data_notify_to_map()) {
3612 ldout(cct
, 20) << "query sync status from " << c
.first
<< dendl
;
3613 using StatusCR
= RGWReadRESTResourceCR
<rgw_data_sync_status
>;
3614 spawn(new StatusCR(cct
, c
.second
, http
, "/admin/log/", params
, &*p
),
3620 // must get a successful reply from all peers to consider trimming
3622 while (ret
== 0 && num_spawned() > 0) {
3623 yield
wait_for_child();
3629 ldout(cct
, 4) << "failed to fetch sync status from all peers" << dendl
;
3630 return set_cr_error(ret
);
3633 ldout(cct
, 10) << "trimming log shards" << dendl
;
3634 set_status("trimming log shards");
3636 // determine the minimum marker for each shard
3637 take_min_markers(peer_status
.begin(), peer_status
.end(),
3638 min_shard_markers
.begin());
3640 for (int i
= 0; i
< num_shards
; i
++) {
3641 const auto& m
= min_shard_markers
[i
];
3642 if (m
<= last_trim
[i
]) {
3645 ldout(cct
, 10) << "trimming log shard " << i
3646 << " at marker=" << m
3647 << " last_trim=" << last_trim
[i
] << dendl
;
3648 spawn(new TrimCR(store
, store
->data_log
->get_oid(i
),
3653 return set_cr_done();
3658 RGWCoroutine
* create_admin_data_log_trim_cr(RGWRados
*store
,
3659 RGWHTTPManager
*http
,
3661 std::vector
<std::string
>& markers
)
3663 return new DataLogTrimCR(store
, http
, num_shards
, markers
);
3666 class DataLogTrimPollCR
: public RGWCoroutine
{
3668 RGWHTTPManager
*http
;
3669 const int num_shards
;
3670 const utime_t interval
; //< polling interval
3671 const std::string lock_oid
; //< use first data log shard for lock
3672 const std::string lock_cookie
;
3673 std::vector
<std::string
> last_trim
; //< last trimmed marker per shard
3676 DataLogTrimPollCR(RGWRados
*store
, RGWHTTPManager
*http
,
3677 int num_shards
, utime_t interval
)
3678 : RGWCoroutine(store
->ctx()), store(store
), http(http
),
3679 num_shards(num_shards
), interval(interval
),
3680 lock_oid(store
->data_log
->get_oid(0)),
3681 lock_cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct
)),
3682 last_trim(num_shards
)
3685 int operate() override
;
3688 int DataLogTrimPollCR::operate()
3692 set_status("sleeping");
3695 // request a 'data_trim' lock that covers the entire wait interval to
3696 // prevent other gateways from attempting to trim for the duration
3697 set_status("acquiring trim lock");
3698 yield
call(new RGWSimpleRadosLockCR(store
->get_async_rados(), store
,
3699 rgw_raw_obj(store
->svc
.zone
->get_zone_params().log_pool
, lock_oid
),
3700 "data_trim", lock_cookie
,
3703 // if the lock is already held, go back to sleep and try again later
3704 ldout(cct
, 4) << "failed to lock " << lock_oid
<< ", trying again in "
3705 << interval
.sec() << "s" << dendl
;
3709 set_status("trimming");
3710 yield
call(new DataLogTrimCR(store
, http
, num_shards
, last_trim
));
3712 // note that the lock is not released. this is intentional, as it avoids
3713 // duplicating this work in other gateways
3719 RGWCoroutine
* create_data_log_trim_cr(RGWRados
*store
,
3720 RGWHTTPManager
*http
,
3721 int num_shards
, utime_t interval
)
3723 return new DataLogTrimPollCR(store
, http
, num_shards
, interval
);