1 #include <boost/utility/string_ref.hpp>
3 #include "common/ceph_json.h"
4 #include "common/RWLock.h"
5 #include "common/RefCountedObj.h"
6 #include "common/WorkQueue.h"
7 #include "common/Throttle.h"
8 #include "common/errno.h"
10 #include "rgw_common.h"
11 #include "rgw_rados.h"
13 #include "rgw_data_sync.h"
14 #include "rgw_rest_conn.h"
15 #include "rgw_cr_rados.h"
16 #include "rgw_cr_rest.h"
17 #include "rgw_http_client.h"
18 #include "rgw_bucket.h"
19 #include "rgw_metadata.h"
20 #include "rgw_sync_module.h"
21 #include "rgw_sync_log_trim.h"
23 #include "cls/lock/cls_lock_client.h"
25 #include "auth/Crypto.h"
27 #include <boost/asio/yield.hpp>
29 #define dout_subsys ceph_subsys_rgw
32 #define dout_prefix (*_dout << "data sync: ")
/* Object-name prefixes for the RADOS objects (in the zone's log pool) that
 * persist multisite data-sync state.  The full OIDs are composed elsewhere,
 * e.g. "<prefix>.<source_zone>[.<shard_id>]". */
static string datalog_sync_status_oid_prefix = "datalog.sync-status";        // overall per-source-zone sync status
static string datalog_sync_status_shard_prefix = "datalog.sync-status.shard"; // per-shard sync markers
static string datalog_sync_full_sync_index_prefix = "data.full-sync.index";  // full-sync bucket-instance index shards
static string bucket_status_oid_prefix = "bucket.sync-status";               // per-bucket-shard sync status
/* Lightweight debug logger for sync operations.  init() builds a
 * "Sync:<zone[0:8]>:<type>:<stage>:<resource>" prefix; log()/finish() emit
 * ldout lines at level 5 tagged with that prefix so a single sync operation
 * can be traced in the rgw log.
 * NOTE(review): member declarations (cct, prefix, ended) are not visible in
 * this excerpt; they are inferred from the visible constructors and init(). */
class RGWSyncDebugLogger {
  // Convenience constructor: initializes and (by default) logs the start event.
  RGWSyncDebugLogger(CephContext *_cct, const string& source_zone,
                     const string& sync_type, const string& sync_stage,
                     const string& resource, bool log_start = true) {
    init(_cct, source_zone, sync_type, sync_stage, resource, log_start);
  // Default constructor leaves the logger inert until init() is called.
  RGWSyncDebugLogger() : cct(NULL), ended(false) {}
  ~RGWSyncDebugLogger();
  void init(CephContext *_cct, const string& source_zone,
            const string& sync_type, const string& sync_stage,
            const string& resource, bool log_start = true);
  // Log an arbitrary state-transition string under this logger's prefix.
  void log(const string& state);
  // Log the terminal status (r=<status>) for this operation.
  void finish(int status);
void RGWSyncDebugLogger::init(CephContext *_cct, const string& source_zone,
                              const string& sync_type, const string& sync_section,
                              const string& resource, bool log_start)
  // Only the first 8 chars of the zone id are used, to keep log lines short.
  string zone_str = source_zone.substr(0, 8);
  prefix = "Sync:" + zone_str + ":" + sync_type + ":" + sync_section + ":" + resource;
RGWSyncDebugLogger::~RGWSyncDebugLogger()
void RGWSyncDebugLogger::log(const string& state)
  ldout(cct, 5) << prefix << ":" << state << dendl;
void RGWSyncDebugLogger::finish(int status)
  ldout(cct, 5) << prefix << ":" << "finish r=" << status << dendl;
/* Data-sync specialization of RGWSyncDebugLogger: hard-codes the sync type
 * to "data" and pulls cct and source_zone from the RGWDataSyncEnv. */
class RGWDataSyncDebugLogger : public RGWSyncDebugLogger {
  RGWDataSyncDebugLogger() {}
  RGWDataSyncDebugLogger(RGWDataSyncEnv *sync_env, const string& sync_section,
                         const string& resource, bool log_start = true) {
    init(sync_env, sync_section, resource, log_start);
  void init(RGWDataSyncEnv *sync_env, const string& sync_section,
            const string& resource, bool log_start = true) {
    // Delegates to the base init() with sync_type fixed to "data".
    RGWSyncDebugLogger::init(sync_env->cct, sync_env->source_zone, "data", sync_section, resource, log_start);
/* JSON decoders for the remote datalog REST responses ("/admin/log"). */

// NOTE: the remote field is named "num_objects" but it carries the shard count.
void rgw_datalog_info::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("num_objects", num_shards, obj);
void rgw_datalog_entry::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("key", key, obj);
  // 'ut' is presumably a utime_t declared on a line elided from this excerpt;
  // the wire timestamp is decoded into it and converted to real_time below.
  JSONDecoder::decode_json("timestamp", ut, obj);
  timestamp = ut.to_real_time();
void rgw_datalog_shard_data::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("marker", marker, obj);
  JSONDecoder::decode_json("truncated", truncated, obj);
  JSONDecoder::decode_json("entries", entries, obj);
/* Collect coroutine that reads the per-shard rgw_data_sync_marker objects
 * for all data-sync shards, at most MAX_CONCURRENT_SHARDS in flight.
 * Results are written into the caller-owned 'markers' map keyed by shard id. */
class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;
  const int num_shards;
  map<uint32_t, rgw_data_sync_marker>& markers;  // output, owned by caller
  RGWReadDataSyncStatusMarkersCR(RGWDataSyncEnv *env, int num_shards,
                                 map<uint32_t, rgw_data_sync_marker>& markers)
    : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS),
      env(env), num_shards(num_shards), markers(markers)
  bool spawn_next() override;
// Spawn one RGWSimpleRadosReadCR per shard until all shards are covered;
// returning false tells RGWShardCollectCR iteration is done.
bool RGWReadDataSyncStatusMarkersCR::spawn_next()
  if (shard_id >= num_shards) {
  using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
  // Reads the shard-status object from the log pool into markers[shard_id].
  spawn(new CR(env->async_rados, env->store,
               rgw_raw_obj(env->store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id)),
/* Collect coroutine that lists the ".retry" (error-repo) omap keys for each
 * data-sync shard, up to max_entries per shard, into entries_map.  A shard
 * with a non-empty entry set is considered "recovering". */
class RGWReadDataSyncRecoveringShardsCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;
  uint64_t max_entries;  // per-shard cap on omap keys fetched
  map<int, std::set<std::string>> &entries_map;  // output, owned by caller
  RGWReadDataSyncRecoveringShardsCR(RGWDataSyncEnv *env, uint64_t _max_entries, int _num_shards,
                                    map<int, std::set<std::string>>& _entries_map)
    : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS), env(env),
      max_entries(_max_entries), num_shards(_num_shards), entries_map(_entries_map)
  bool spawn_next() override;
bool RGWReadDataSyncRecoveringShardsCR::spawn_next()
  // NOTE(review): this uses 'shard_id > num_shards' while the sibling
  // RGWReadDataSyncStatusMarkersCR::spawn_next() uses '>=' — this looks like
  // an off-by-one (one extra, nonexistent shard id is visited); confirm
  // against upstream before changing.
  if (shard_id > num_shards)
  string error_oid = RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id) + ".retry";
  spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->store->get_zone_params().log_pool, error_oid),
        marker, &entries_map[shard_id], max_entries), false);
/* Coroutine that populates *sync_status: first the global rgw_data_sync_info
 * object, then (using its num_shards) every per-shard marker. */
class RGWReadDataSyncStatusCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  rgw_data_sync_status *sync_status;  // output, owned by caller
  RGWReadDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
                                 rgw_data_sync_status *_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sync_status(_status)
  int operate() override;
int RGWReadDataSyncStatusCoroutine::operate()
  // Step 1: read the top-level sync-status object from the log pool.
  using ReadInfoCR = RGWSimpleRadosReadCR<rgw_data_sync_info>;
  bool empty_on_enoent = false; // fail on ENOENT
  call(new ReadInfoCR(sync_env->async_rados, sync_env->store,
                      rgw_raw_obj(sync_env->store->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
                      &sync_status->sync_info, empty_on_enoent));
  ldout(sync_env->cct, 4) << "failed to read sync status info with "
      << cpp_strerror(retcode) << dendl;
  return set_cr_error(retcode);
  // Step 2: read shard markers for all num_shards shards.
  using ReadMarkersCR = RGWReadDataSyncStatusMarkersCR;
  yield call(new ReadMarkersCR(sync_env, sync_status->sync_info.num_shards,
                               sync_status->sync_markers));
  ldout(sync_env->cct, 4) << "failed to read sync status markers with "
      << cpp_strerror(retcode) << dendl;
  return set_cr_error(retcode);
  return set_cr_done();
/* Coroutine that fetches one remote datalog shard's info via REST
 * ("/admin/log/" with type=data) and decodes it into *shard_info. */
class RGWReadRemoteDataLogShardInfoCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  RGWRESTReadResource *http_op;  // in-flight REST op; released in the dtor (elided here)
  RGWDataChangesLogInfo *shard_info;  // output, owned by caller
  RGWReadRemoteDataLogShardInfoCR(RGWDataSyncEnv *_sync_env,
                                  int _shard_id, RGWDataChangesLogInfo *_shard_info) : RGWCoroutine(_sync_env->cct),
    shard_info(_shard_info) {
  ~RGWReadRemoteDataLogShardInfoCR() override {
  int operate() override {
    // Build the query string: shard id rendered as decimal text.
    snprintf(buf, sizeof(buf), "%d", shard_id);
    rgw_http_param_pair pairs[] = { { "type" , "data" },
    string p = "/admin/log/";
    http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);
    // Associate the op with this coroutine stack for completion wakeup.
    http_op->set_user_info((void *)stack);
    int ret = http_op->aio_read();
    ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
    log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
    return set_cr_error(ret);
    // Wait for the response and JSON-decode it into *shard_info.
    int ret = http_op->wait(shard_info);
    return set_cr_error(ret);
    return set_cr_done();
/* Wire format of the remote datalog listing response: a marker to resume
 * from, a truncation flag, and the log entries themselves. */
struct read_remote_data_log_response {
  list<rgw_data_change_log_entry> entries;
  read_remote_data_log_response() : truncated(false) {}
  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("marker", marker, obj);
    JSONDecoder::decode_json("truncated", truncated, obj);
    JSONDecoder::decode_json("entries", entries, obj);
/* Coroutine that lists one remote datalog shard from *pmarker onward
 * ("/admin/log/" with type=data, extra-info=true), swapping the decoded
 * entries into *entries and updating *pmarker / *truncated for the caller. */
class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  RGWRESTReadResource *http_op;
  list<rgw_data_change_log_entry> *entries;  // output, owned by caller
  read_remote_data_log_response response;    // decoded REST payload
  RGWReadRemoteDataLogShardCR(RGWDataSyncEnv *_sync_env,
                              int _shard_id, string *_pmarker, list<rgw_data_change_log_entry> *_entries, bool *_truncated) : RGWCoroutine(_sync_env->cct),
    truncated(_truncated) {
  ~RGWReadRemoteDataLogShardCR() override {
  int operate() override {
    snprintf(buf, sizeof(buf), "%d", shard_id);
    rgw_http_param_pair pairs[] = { { "type" , "data" },
                                    { "marker", pmarker->c_str() },
                                    { "extra-info", "true" },
    string p = "/admin/log/";
    http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);
    // Tie the op to this stack so the completion wakes this coroutine.
    http_op->set_user_info((void *)stack);
    int ret = http_op->aio_read();
    ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
    log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
    return set_cr_error(ret);
    int ret = http_op->wait(&response);
    return set_cr_error(ret);
    // Hand the results back to the caller; swap avoids copying the list.
    entries->swap(response.entries);
    *pmarker = response.marker;
    *truncated = response.truncated;
    return set_cr_done();
/* Collect coroutine: fetches RGWDataChangesLogInfo for every remote datalog
 * shard, at most READ_DATALOG_MAX_CONCURRENT requests in flight. */
class RGWReadRemoteDataLogInfoCR : public RGWShardCollectCR {
  RGWDataSyncEnv *sync_env;
  map<int, RGWDataChangesLogInfo> *datalog_info;  // output, keyed by shard id
#define READ_DATALOG_MAX_CONCURRENT 10
  RGWReadRemoteDataLogInfoCR(RGWDataSyncEnv *_sync_env,
                             map<int, RGWDataChangesLogInfo> *_datalog_info) : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
    sync_env(_sync_env), num_shards(_num_shards),
    datalog_info(_datalog_info), shard_id(0) {}
  bool spawn_next() override;
// One RGWReadRemoteDataLogShardInfoCR per shard, walking shard_id upward.
bool RGWReadRemoteDataLogInfoCR::spawn_next() {
  if (shard_id >= num_shards) {
  spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &(*datalog_info)[shard_id]), false);
/* Simple (send/complete) coroutine that lists up to max_entries of one
 * remote datalog shard starting at 'marker' and decodes the result into
 * *result.  -ENOENT from the remote end is tolerated (shard may not exist). */
class RGWListRemoteDataLogShardCR : public RGWSimpleCoroutine {
  RGWDataSyncEnv *sync_env;
  RGWRESTReadResource *http_op;
  uint32_t max_entries;
  rgw_datalog_shard_data *result;  // output, owned by caller
  RGWListRemoteDataLogShardCR(RGWDataSyncEnv *env, int _shard_id,
                              const string& _marker, uint32_t _max_entries,
                              rgw_datalog_shard_data *_result)
    : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL),
      shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}
  int send_request() override {
    RGWRESTConn *conn = sync_env->conn;
    RGWRados *store = sync_env->store;
    snprintf(buf, sizeof(buf), "%d", shard_id);
    char max_entries_buf[32];
    snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);
    // Omit the "marker" parameter entirely when no marker was supplied
    // (an empty key name drops the pair from the query string).
    const char *marker_key = (marker.empty() ? "" : "marker");
    rgw_http_param_pair pairs[] = { { "type", "data" },
                                    { "max-entries", max_entries_buf },
                                    { marker_key, marker.c_str() },
    string p = "/admin/log/";
    http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
    http_op->set_user_info((void *)stack);
    int ret = http_op->aio_read();
    ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl;
    log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
  int request_complete() override {
    int ret = http_op->wait(result);
    // ENOENT is not an error here — log only genuine failures.
    if (ret < 0 && ret != -ENOENT) {
      ldout(sync_env->store->ctx(), 0) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl;
/* Collect coroutine: lists several remote datalog shards (one
 * RGWListRemoteDataLogShardCR per entry in 'shards', which maps shard id to
 * its resume marker), bounded by READ_DATALOG_MAX_CONCURRENT in flight. */
class RGWListRemoteDataLogCR : public RGWShardCollectCR {
  RGWDataSyncEnv *sync_env;
  map<int, string> shards;  // shard id -> marker; swapped in from the caller
  int max_entries_per_shard;
  map<int, rgw_datalog_shard_data> *result;  // output, keyed by shard id
  map<int, string>::iterator iter;  // progress cursor over 'shards'
// NOTE(review): READ_DATALOG_MAX_CONCURRENT is #defined identically above
// (near RGWReadRemoteDataLogInfoCR); harmless as a same-value redefinition,
// but one definition would be cleaner.
#define READ_DATALOG_MAX_CONCURRENT 10
  RGWListRemoteDataLogCR(RGWDataSyncEnv *_sync_env,
                         map<int, string>& _shards,
                         int _max_entries_per_shard,
                         map<int, rgw_datalog_shard_data> *_result) : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
    sync_env(_sync_env), max_entries_per_shard(_max_entries_per_shard),
    // Take ownership of the caller's map without copying.
    shards.swap(_shards);
    iter = shards.begin();
  bool spawn_next() override;
bool RGWListRemoteDataLogCR::spawn_next() {
  if (iter == shards.end()) {
  spawn(new RGWListRemoteDataLogShardCR(sync_env, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
/* Coroutine that initializes the data-sync status objects for a source zone:
 * takes a cls_lock on the status object, writes the initial sync_info,
 * re-locks (the write recreated the object, dropping the lock), reads each
 * remote datalog shard's current position, persists per-shard markers seeded
 * from those positions, advances the state to StateBuildingFullSyncMaps, and
 * finally unlocks. */
class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
  static constexpr uint32_t lock_duration = 30;  // cls_lock lease, seconds
  RGWDataSyncEnv *sync_env;
  const rgw_pool& pool;  // the zone's log pool; all status objects live here
  const uint32_t num_shards;
  string sync_status_oid;
  rgw_data_sync_status *status;  // output, owned by caller
  map<int, RGWDataChangesLogInfo> shards_info;  // remote shard positions
  RGWInitDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, uint32_t num_shards,
                                 uint64_t instance_id,
                                 rgw_data_sync_status *status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), store(sync_env->store),
      pool(store->get_zone_params().log_pool),
      num_shards(num_shards), status(status) {
    lock_name = "sync_lock";
    status->sync_info.instance_id = instance_id;
// Length of the random lock cookie (excluding the NUL terminator).
#define COOKIE_LEN 16
    char buf[COOKIE_LEN + 1];
    // Random cookie distinguishes this locker from other lock holders.
    gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
    sync_status_oid = RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone);
  int operate() override {
    // Take the advisory lock on the status object.
    using LockCR = RGWSimpleRadosLockCR;
    yield call(new LockCR(sync_env->async_rados, store,
                          rgw_raw_obj{pool, sync_status_oid},
                          lock_name, cookie, lock_duration));
    ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
    return set_cr_error(retcode);
    // Write the initial sync_info (state, num_shards, instance_id).
    using WriteInfoCR = RGWSimpleRadosWriteCR<rgw_data_sync_info>;
    yield call(new WriteInfoCR(sync_env->async_rados, store,
                               rgw_raw_obj{pool, sync_status_oid},
    ldout(cct, 0) << "ERROR: failed to write sync status info with " << retcode << dendl;
    return set_cr_error(retcode);
    /* take lock again, we just recreated the object */
    yield call(new LockCR(sync_env->async_rados, store,
                          rgw_raw_obj{pool, sync_status_oid},
                          lock_name, cookie, lock_duration));
    ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
    return set_cr_error(retcode);
    /* fetch current position in logs */
    RGWRESTConn *conn = store->get_zone_conn_by_id(sync_env->source_zone);
    ldout(cct, 0) << "ERROR: connection to zone " << sync_env->source_zone << " does not exist!" << dendl;
    return set_cr_error(-EIO);
    // Query every remote shard's current datalog position in parallel.
    for (uint32_t i = 0; i < num_shards; i++) {
      spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, i, &shards_info[i]), true);
    while (collect(&ret, NULL)) {
      ldout(cct, 0) << "ERROR: failed to read remote data log shards" << dendl;
      return set_state(RGWCoroutine_Error);
    // Seed and persist one marker object per shard from the remote positions:
    // incremental sync will resume from next_step_marker once full sync ends.
    for (uint32_t i = 0; i < num_shards; i++) {
      RGWDataChangesLogInfo& info = shards_info[i];
      auto& marker = status->sync_markers[i];
      marker.next_step_marker = info.marker;
      marker.timestamp = info.last_update;
      const auto& oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i);
      using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_data_sync_marker>;
      spawn(new WriteMarkerCR(sync_env->async_rados, store,
                              rgw_raw_obj{pool, oid}, marker), true);
    while (collect(&ret, NULL)) {
      ldout(cct, 0) << "ERROR: failed to write data sync status markers" << dendl;
      return set_state(RGWCoroutine_Error);
    // All markers written: advance to the full-sync-map-building state.
    status->sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
    yield call(new WriteInfoCR(sync_env->async_rados, store,
                               rgw_raw_obj{pool, sync_status_oid},
    ldout(cct, 0) << "ERROR: failed to write sync status info with " << retcode << dendl;
    return set_cr_error(retcode);
    // Release the advisory lock now that initialization is complete.
    yield call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, store,
                                          rgw_raw_obj{pool, sync_status_oid},
    return set_cr_done();
/* Fetch the remote datalog's shard count ("/admin/log?type=data") into
 * *log_info. */
int RGWRemoteDataLog::read_log_info(rgw_datalog_info *log_info)
  rgw_http_param_pair pairs[] = { { "type", "data" },
  int ret = sync_env.conn->get_json_resource("/admin/log", pairs, *log_info);
  ldout(store->ctx(), 0) << "ERROR: failed to fetch datalog info" << dendl;
  ldout(store->ctx(), 20) << "remote datalog, num_shards=" << log_info->num_shards << dendl;
/* Read per-shard RGWDataChangesLogInfo for every remote shard: first learn
 * the shard count, then fan out via RGWReadRemoteDataLogInfoCR. */
int RGWRemoteDataLog::read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info)
  rgw_datalog_info log_info;
  int ret = read_log_info(&log_info);
  return run(new RGWReadRemoteDataLogInfoCR(&sync_env, log_info.num_shards, shards_info));
/* List the next entry (max-entries=1) of each given shard from its marker.
 * NOTE(review): shard_markers is taken by value, which copies the map; the
 * coroutine swap()s it out anyway, so pass-by-reference would save a copy —
 * confirm callers before changing the signature. */
int RGWRemoteDataLog::read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result)
  // Meta-master short-circuit: visible guard; behavior of the elided branch
  // body is not shown in this excerpt.
  if (store->is_meta_master()) {
  return run(new RGWListRemoteDataLogCR(&sync_env, shard_markers, 1, result));
/* Initialize the remote-datalog handle: populate the shared RGWDataSyncEnv
 * and start the HTTP manager's background thread. */
int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncModuleInstanceRef& _sync_module)
  sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger,
                _source_zone, _sync_module, observer);
  int ret = http_manager.set_threaded();
  ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
// Teardown counterpart to init(); body elided from this excerpt.
void RGWRemoteDataLog::finish()
/* Read the full data-sync status (info + shard markers) into *sync_status.
 * Uses a private coroutine manager + HTTP manager so it is safe to call
 * while run_sync() owns the shared ones. */
int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status)
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.set_threaded();
  ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
  // Clone the env but point it at the local HTTP manager.
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  ret = crs.run(new RGWReadDataSyncStatusCoroutine(&sync_env_local, sync_status));
/* Determine which shards have pending retry entries: fetch up to one
 * error-repo omap key per shard (max_entries=1 — presence is all that
 * matters) and report every shard with a non-empty set. */
int RGWRemoteDataLog::read_recovering_shards(const int num_shards, set<int>& recovering_shards)
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.set_threaded();
  ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  map<int, std::set<std::string>> entries_map;
  uint64_t max_entries{1};
  ret = crs.run(new RGWReadDataSyncRecoveringShardsCR(&sync_env_local, max_entries, num_shards, entries_map));
  // Any shard with at least one ".retry" key is recovering.
  for (const auto& entry : entries_map) {
    if (entry.second.size() != 0) {
      recovering_shards.insert(entry.first);
/* (Re)initialize data-sync status for num_shards shards, tagging the run
 * with a random instance id, via RGWInitDataSyncStatusCoroutine in a
 * private coroutine/HTTP manager pair. */
int RGWRemoteDataLog::init_sync_status(int num_shards)
  rgw_data_sync_status sync_status;
  sync_status.sync_info.num_shards = num_shards;
  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.set_threaded();
  ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  // Random instance id distinguishes this init from any previous one.
  uint64_t instance_id;
  get_random_bytes((char *)&instance_id, sizeof(instance_id));
  ret = crs.run(new RGWInitDataSyncStatusCoroutine(&sync_env_local, num_shards, instance_id, &sync_status));
/* Build the OID "<full-sync-index-prefix>.<source_zone>.<shard_id>" for a
 * full-sync index shard.  The buffer is sized for the two dots plus up to
 * 16 digits of shard id. */
static string full_data_sync_index_shard_oid(const string& source_zone, int shard_id)
  char buf[datalog_sync_full_sync_index_prefix.size() + 1 + source_zone.size() + 1 + 16];
  snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_full_sync_index_prefix.c_str(), source_zone.c_str(), shard_id);
/* Decoded response of a bucket.instance metadata GET: the metadata key,
 * version, mtime, and the bucket-instance object itself. */
struct bucket_instance_meta_info {
  RGWBucketInstanceMetadataObject data;
  bucket_instance_meta_info() {}
  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("key", key, obj);
    JSONDecoder::decode_json("ver", ver, obj);
    JSONDecoder::decode_json("mtime", mtime, obj);
    JSONDecoder::decode_json("data", data, obj);
/* Full-sync map builder: lists every bucket.instance metadata key from the
 * remote zone, fetches each instance's shard count, and appends one entry
 * per bucket shard (or one per bucket when unsharded) to the sharded omap
 * index that full sync later consumes.  Finally it records the total entry
 * count per data-sync shard in the persisted shard markers. */
class RGWListBucketIndexesCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  rgw_data_sync_status *sync_status;
  list<string>::iterator iter;  // cursor over the listed metadata keys
  RGWShardedOmapCRManager *entries_index;  // owned; deleted in the dtor
  bucket_instance_meta_info meta_info;  // per-key fetched instance metadata
  RGWListBucketIndexesCR(RGWDataSyncEnv *_sync_env,
                         rgw_data_sync_status *_sync_status) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
    store(sync_env->store), sync_status(_sync_status),
    req_ret(0), ret(0), entries_index(NULL), i(0), failed(false) {
    oid_prefix = datalog_sync_full_sync_index_prefix + "." + sync_env->source_zone;
    path = "/admin/metadata/bucket.instance";
    num_shards = sync_status->sync_info.num_shards;
  ~RGWListBucketIndexesCR() override {
    delete entries_index;
  int operate() override {
    // Fetch the complete list of bucket.instance metadata keys in one shot.
    string entrypoint = string("/admin/metadata/bucket.instance");
    /* FIXME: need a better scaling solution here, requires streaming output */
    call(new RGWReadRESTResourceCR<list<string> >(store->ctx(), sync_env->conn, sync_env->http_manager,
                                                  entrypoint, NULL, &result));
    ldout(sync_env->cct, 0) << "ERROR: failed to fetch metadata for section bucket.index" << dendl;
    return set_cr_error(retcode);
    entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards,
                                                store->get_zone_params().log_pool,
    yield; // yield so OmapAppendCRs can start
    for (iter = result.begin(); iter != result.end(); ++iter) {
      ldout(sync_env->cct, 20) << "list metadata: section=bucket.index key=" << *iter << dendl;
      // Fetch this instance's metadata to learn its shard count.
      rgw_http_param_pair pairs[] = { { "key", key.c_str() },
      call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), sync_env->conn, sync_env->http_manager, path, pairs, &meta_info));
      num_shards = meta_info.data.get_bucket_info().num_shards;
      if (num_shards > 0) {
        // Sharded bucket: one index entry per bucket shard ("key:<shard>"),
        // routed to the data-log shard that owns it.
        for (i = 0; i < num_shards; i++) {
          snprintf(buf, sizeof(buf), ":%d", i);
          yield entries_index->append(s, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, i));
      // Unsharded bucket: a single entry with shard id -1.
      yield entries_index->append(key, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, -1));
    if (!entries_index->finish()) {
    // Persist per-shard totals so progress can be reported during full sync.
    for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status->sync_markers.begin(); iter != sync_status->sync_markers.end(); ++iter) {
      int shard_id = (int)iter->first;
      rgw_data_sync_marker& marker = iter->second;
      marker.total_entries = entries_index->get_total_entries(shard_id);
      spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
                                                            rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
    // Record build failure in the sync error log (visible via radosgw-admin).
    yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
                                                    EIO, string("failed to build bucket instances map")));
    while (collect(&ret, NULL)) {
      yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
                                                      -ret, string("failed to store sync status: ") + cpp_strerror(-ret)));
    yield return set_cr_error(req_ret);
    yield return set_cr_done();
// Window (in completed entries) after which the shard marker is persisted.
#define DATA_SYNC_UPDATE_MARKER_WINDOW 1

/* Marker tracker for a single data-sync shard.  Besides the base class's
 * high-water-mark bookkeeping it maintains a bidirectional key<->marker
 * index so that at most one sync operation runs per bucket-shard key at a
 * time; store_marker() persists the shard's rgw_data_sync_marker object. */
class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
  RGWDataSyncEnv *sync_env;
  rgw_data_sync_marker sync_marker;  // persisted state, updated in store_marker()
  map<string, string> key_to_marker;
  map<string, string> marker_to_key;
  // Drop the finished marker from both indexes and clear the retry flag
  // for the key it was tracking.
  void handle_finish(const string& marker) override {
    map<string, string>::iterator iter = marker_to_key.find(marker);
    if (iter == marker_to_key.end()) {
    key_to_marker.erase(iter->second);
    reset_need_retry(iter->second);
    marker_to_key.erase(iter);
  RGWDataSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
                              const string& _marker_oid,
                              const rgw_data_sync_marker& _marker) : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
    marker_oid(_marker_oid),
    sync_marker(_marker) {}
  // Build the coroutine that writes the updated shard marker to RADOS.
  RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
    sync_marker.marker = new_marker;
    sync_marker.pos = index_pos;
    ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
    RGWRados *store = sync_env->store;
    return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
                                                           rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
  /*
   * create index from key -> marker, and from marker -> key
   * this is useful so that we can ensure that we only have one
   * entry for any key that is used. This is needed when doing
   * incremental sync of data, and we don't want to run multiple
   * concurrent sync operations for the same bucket shard
   */
  bool index_key_to_marker(const string& key, const string& marker) {
    // A key already in flight is not re-indexed (caller must retry later).
    if (key_to_marker.find(key) != key_to_marker.end()) {
    key_to_marker[key] = marker;
    marker_to_key[marker] = key;
// ostream wrapper to print a bucket ("[tenant/]name[:bucket_id]") without
// copying its strings; holds only a reference to the caller's rgw_bucket.
  bucket_str(const rgw_bucket& b) : b(b) {}
std::ostream& operator<<(std::ostream& out, const bucket_str& rhs) {
  // 'b' here presumably aliases rhs.b via a local declared on a line elided
  // from this excerpt — confirm against the full source.
  if (!b.tenant.empty()) {
    out << b.tenant << '/';
  if (!b.bucket_id.empty()) {
    out << ':' << b.bucket_id;
// ostream wrapper to print a bucket shard ("<bucket>[:shard_id]") without
// copying strings; negative shard_id (unsharded) suppresses the suffix.
struct bucket_shard_str {
  const rgw_bucket_shard& bs;
  bucket_shard_str(const rgw_bucket_shard& bs) : bs(bs) {}
std::ostream& operator<<(std::ostream& out, const bucket_shard_str& rhs) {
  // 'bs' here presumably aliases rhs.bs via a local declared on a line
  // elided from this excerpt — confirm against the full source.
  out << bucket_str{bs.bucket};
  if (bs.shard_id >= 0) {
    out << ':' << bs.shard_id;
/* Coroutine that syncs a single bucket shard from the source zone; the
 * heavy lifting lives in operate() (defined elsewhere).  Holds a lease on
 * the bucket's sync-status object while running. */
class RGWRunBucketSyncCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  RGWBucketInfo bucket_info;
  rgw_bucket_shard_sync_info sync_status;
  RGWMetaSyncEnv meta_sync_env;
  RGWDataSyncDebugLogger logger;
  const std::string status_oid;  // per-bucket-shard sync-status object name
  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;   // keeps the lock renewed
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
  RGWRunBucketSyncCoroutine(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
      status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)) {
    logger.init(sync_env, "Bucket", bs.get_key());
  ~RGWRunBucketSyncCoroutine() override {
  int operate() override;
/* Coroutine that processes one data-log entry: parses the bucket-shard key,
 * runs RGWRunBucketSyncCoroutine for it (retrying while the marker tracker
 * flags the key), records failures in the sync error log and the per-shard
 * error repo, and finally updates the shard marker. */
class RGWDataSyncSingleEntryCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  rgw_bucket_shard bs;  // parsed from raw_key in operate()
  RGWDataSyncShardMarkerTrack *marker_tracker;  // may be null; not owned
  boost::intrusive_ptr<RGWOmapAppend> error_repo;  // ".retry" omap; may be null
  bool remove_from_repo;  // true when re-processing an error-repo entry
  RGWDataSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
                           const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker,
                           RGWOmapAppend *_error_repo, bool _remove_from_repo) : RGWCoroutine(_sync_env->cct),
    sync_env(_sync_env),
    raw_key(_raw_key), entry_marker(_entry_marker),
    marker_tracker(_marker_tracker),
    error_repo(_error_repo), remove_from_repo(_remove_from_repo) {
    set_description() << "data sync single entry (source_zone=" << sync_env->source_zone << ") key=" <<_raw_key << " entry=" << entry_marker;
  int operate() override {
    // Parse "tenant/bucket:instance:shard" out of the raw datalog key.
    int ret = rgw_bucket_parse_bucket_key(sync_env->cct, raw_key,
                                          &bs.bucket, &bs.shard_id);
    return set_cr_error(-EIO);
    // Re-run bucket sync while the tracker says the key needs a retry.
    if (marker_tracker) {
      marker_tracker->reset_need_retry(raw_key);
    call(new RGWRunBucketSyncCoroutine(sync_env, bs));
    } while (marker_tracker && marker_tracker->need_retry(raw_key));
    sync_status = retcode;
    if (sync_status == -ENOENT) {
      // this was added when 'tenant/' was added to datalog entries, because
      // preexisting tenant buckets could never sync and would stay in the
      // error_repo forever
      ldout(sync_env->store->ctx(), 0) << "WARNING: skipping data log entry "
          "for missing bucket " << raw_key << dendl;
    if (sync_status < 0) {
      // write actual sync failures for 'radosgw-admin sync error list'
      // (transient EBUSY/EAGAIN results are not worth recording)
      if (sync_status != -EBUSY && sync_status != -EAGAIN) {
        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", raw_key,
                                                        -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
        ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure: retcode=" << retcode << dendl;
      // Queue the key in the error repo so it will be retried later.
      if (error_repo && !error_repo->append(raw_key)) {
        ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure in error repo: retcode=" << retcode << dendl;
    } else if (error_repo && remove_from_repo) {
      // Success on a retried entry: clear it out of the error repo.
      yield call(new RGWRadosRemoveOmapKeysCR(sync_env->store, error_repo->get_obj(), keys));
      ldout(sync_env->store->ctx(), 0) << "ERROR: failed to remove omap key from error repo ("
          << error_repo->get_obj() << " retcode=" << retcode << dendl;
    // Notify the (optional) observer that this bucket changed.
    if (sync_env->observer) {
      sync_env->observer->on_bucket_changed(bs.bucket.get_key());
    /* FIXME: what to do in case of error */
    if (marker_tracker && !entry_marker.empty()) {
      yield call(marker_tracker->finish(entry_marker));
    if (sync_status == 0) {
      sync_status = retcode;
    if (sync_status < 0) {
      return set_cr_error(sync_status);
    return set_cr_done();
// Max bucket-shard sync coroutines spawned per shard before draining.
#define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20
// Max error-repo entries fetched per retry pass.
#define DATA_SYNC_MAX_ERR_ENTRIES 10

/* Tri-state describing whether the remote datalog may have been trimmed
 * past our position (the value-1 enumerator is elided in this excerpt). */
enum RemoteDatalogStatus {
  RemoteNotTrimmed = 0,
  RemoteMightTrimmed = 2
1102 class RGWDataSyncShardCR
: public RGWCoroutine
{
1103 RGWDataSyncEnv
*sync_env
;
1108 rgw_data_sync_marker sync_marker
;
1110 std::set
<std::string
> entries
;
1111 std::set
<std::string
>::iterator iter
;
1115 RGWDataSyncShardMarkerTrack
*marker_tracker
;
1117 list
<rgw_data_change_log_entry
> log_entries
;
1118 list
<rgw_data_change_log_entry
>::iterator log_iter
;
1121 RGWDataChangesLogInfo shard_info
;
1122 string datalog_marker
;
1124 RemoteDatalogStatus remote_trimmed
;
1128 boost::asio::coroutine incremental_cr
;
1129 boost::asio::coroutine full_cr
;
1132 set
<string
> modified_shards
;
1133 set
<string
> current_modified
;
1135 set
<string
>::iterator modified_iter
;
1141 bool *reset_backoff
;
1143 set
<string
> spawned_keys
;
1145 boost::intrusive_ptr
<RGWContinuousLeaseCR
> lease_cr
;
1146 boost::intrusive_ptr
<RGWCoroutinesStack
> lease_stack
;
1151 RGWOmapAppend
*error_repo
;
1152 std::set
<std::string
> error_entries
;
1153 string error_marker
;
1154 int max_error_entries
;
1156 ceph::real_time error_retry_time
;
1158 #define RETRY_BACKOFF_SECS_MIN 60
1159 #define RETRY_BACKOFF_SECS_DEFAULT 60
1160 #define RETRY_BACKOFF_SECS_MAX 600
1161 uint32_t retry_backoff_secs
;
1163 RGWDataSyncDebugLogger logger
;
1165 RGWDataSyncShardCR(RGWDataSyncEnv
*_sync_env
,
1167 uint32_t _shard_id
, rgw_data_sync_marker
& _marker
, bool *_reset_backoff
) : RGWCoroutine(_sync_env
->cct
),
1168 sync_env(_sync_env
),
1170 shard_id(_shard_id
),
1171 sync_marker(_marker
),
1172 marker_tracker(NULL
), truncated(false), remote_trimmed(RemoteNotTrimmed
), inc_lock("RGWDataSyncShardCR::inc_lock"),
1173 total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW
), reset_backoff(NULL
),
1174 lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES
),
1175 retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT
) {
1176 set_description() << "data sync shard source_zone=" << sync_env
->source_zone
<< " shard_id=" << shard_id
;
1177 status_oid
= RGWDataSyncStatusManager::shard_obj_name(sync_env
->source_zone
, shard_id
);
1178 error_oid
= status_oid
+ ".retry";
1180 logger
.init(sync_env
, "DataShard", status_oid
);
1183 ~RGWDataSyncShardCR() override
{
1184 delete marker_tracker
;
1193 void append_modified_shards(set
<string
>& keys
) {
1194 Mutex::Locker
l(inc_lock
);
1195 modified_shards
.insert(keys
.begin(), keys
.end());
1198 void set_marker_tracker(RGWDataSyncShardMarkerTrack
*mt
) {
1199 delete marker_tracker
;
1200 marker_tracker
= mt
;
1203 int operate() override
{
1206 switch (sync_marker
.state
) {
1207 case rgw_data_sync_marker::FullSync
:
1210 ldout(cct
, 10) << "sync: full_sync: shard_id=" << shard_id
<< " r=" << r
<< dendl
;
1211 return set_cr_error(r
);
1214 case rgw_data_sync_marker::IncrementalSync
:
1215 r
= incremental_sync();
1217 ldout(cct
, 10) << "sync: incremental_sync: shard_id=" << shard_id
<< " r=" << r
<< dendl
;
1218 return set_cr_error(r
);
1222 return set_cr_error(-EIO
);
1228 void init_lease_cr() {
1229 set_status("acquiring sync lock");
1230 uint32_t lock_duration
= cct
->_conf
->rgw_sync_lease_period
;
1231 string lock_name
= "sync_lock";
1235 RGWRados
*store
= sync_env
->store
;
1236 lease_cr
.reset(new RGWContinuousLeaseCR(sync_env
->async_rados
, store
,
1237 rgw_raw_obj(store
->get_zone_params().log_pool
, status_oid
),
1238 lock_name
, lock_duration
, this));
1239 lease_stack
.reset(spawn(lease_cr
.get(), false));
1243 #define OMAP_GET_MAX_ENTRIES 100
1244 int max_entries
= OMAP_GET_MAX_ENTRIES
;
1246 yield
init_lease_cr();
1247 while (!lease_cr
->is_locked()) {
1248 if (lease_cr
->is_done()) {
1249 ldout(cct
, 5) << "lease cr failed, done early " << dendl
;
1250 set_status("lease lock failed, early abort");
1251 return set_cr_error(lease_cr
->get_ret_status());
1256 logger
.log("full sync");
1257 oid
= full_data_sync_index_shard_oid(sync_env
->source_zone
, shard_id
);
1258 set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env
, status_oid
, sync_marker
));
1259 total_entries
= sync_marker
.pos
;
1261 yield
call(new RGWRadosGetOmapKeysCR(sync_env
->store
, rgw_raw_obj(pool
, oid
), sync_marker
.marker
, &entries
, max_entries
));
1263 ldout(sync_env
->cct
, 0) << "ERROR: " << __func__
<< "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode
<< dendl
;
1264 lease_cr
->go_down();
1266 return set_cr_error(retcode
);
1268 iter
= entries
.begin();
1269 for (; iter
!= entries
.end(); ++iter
) {
1270 ldout(sync_env
->cct
, 20) << __func__
<< ": full sync: " << *iter
<< dendl
;
1272 if (!marker_tracker
->start(*iter
, total_entries
, real_time())) {
1273 ldout(sync_env
->cct
, 0) << "ERROR: cannot start syncing " << *iter
<< ". Duplicate entry?" << dendl
;
1275 // fetch remote and write locally
1276 yield
spawn(new RGWDataSyncSingleEntryCR(sync_env
, *iter
, *iter
, marker_tracker
, error_repo
, false), false);
1278 lease_cr
->go_down();
1280 return set_cr_error(retcode
);
1283 sync_marker
.marker
= *iter
;
1285 } while ((int)entries
.size() == max_entries
);
1287 lease_cr
->go_down();
1291 /* update marker to reflect we're done with full sync */
1292 sync_marker
.state
= rgw_data_sync_marker::IncrementalSync
;
1293 sync_marker
.marker
= sync_marker
.next_step_marker
;
1294 sync_marker
.next_step_marker
.clear();
1295 RGWRados
*store
= sync_env
->store
;
1296 call(new RGWSimpleRadosWriteCR
<rgw_data_sync_marker
>(sync_env
->async_rados
, store
,
1297 rgw_raw_obj(store
->get_zone_params().log_pool
, status_oid
),
1301 ldout(sync_env
->cct
, 0) << "ERROR: failed to set sync marker: retcode=" << retcode
<< dendl
;
1302 lease_cr
->go_down();
1303 return set_cr_error(retcode
);
1309 int incremental_sync() {
1310 reenter(&incremental_cr
) {
1311 yield
init_lease_cr();
1312 while (!lease_cr
->is_locked()) {
1313 if (lease_cr
->is_done()) {
1314 ldout(cct
, 5) << "lease cr failed, done early " << dendl
;
1315 set_status("lease lock failed, early abort");
1316 return set_cr_error(lease_cr
->get_ret_status());
1321 set_status("lease acquired");
1322 error_repo
= new RGWOmapAppend(sync_env
->async_rados
, sync_env
->store
,
1323 rgw_raw_obj(pool
, error_oid
),
1326 spawn(error_repo
, false);
1327 logger
.log("inc sync");
1328 set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env
, status_oid
, sync_marker
));
1330 current_modified
.clear();
1332 current_modified
.swap(modified_shards
);
1335 /* process out of band updates */
1336 for (modified_iter
= current_modified
.begin(); modified_iter
!= current_modified
.end(); ++modified_iter
) {
1338 ldout(sync_env
->cct
, 20) << __func__
<< "(): async update notification: " << *modified_iter
<< dendl
;
1339 spawn(new RGWDataSyncSingleEntryCR(sync_env
, *modified_iter
, string(), marker_tracker
, error_repo
, false), false);
1343 /* process bucket shards that previously failed */
1344 yield
call(new RGWRadosGetOmapKeysCR(sync_env
->store
, rgw_raw_obj(pool
, error_oid
),
1345 error_marker
, &error_entries
,
1346 max_error_entries
));
1347 ldout(sync_env
->cct
, 20) << __func__
<< "(): read error repo, got " << error_entries
.size() << " entries" << dendl
;
1348 iter
= error_entries
.begin();
1349 for (; iter
!= error_entries
.end(); ++iter
) {
1350 error_marker
= *iter
;
1351 ldout(sync_env
->cct
, 20) << __func__
<< "(): handle error entry: " << error_marker
<< dendl
;
1352 spawn(new RGWDataSyncSingleEntryCR(sync_env
, error_marker
, error_marker
, nullptr /* no marker tracker */, error_repo
, true), false);
1354 if ((int)error_entries
.size() != max_error_entries
) {
1355 if (error_marker
.empty() && error_entries
.empty()) {
1356 /* the retry repo is empty, we back off a bit before calling it again */
1357 retry_backoff_secs
*= 2;
1358 if (retry_backoff_secs
> RETRY_BACKOFF_SECS_MAX
) {
1359 retry_backoff_secs
= RETRY_BACKOFF_SECS_MAX
;
1362 retry_backoff_secs
= RETRY_BACKOFF_SECS_DEFAULT
;
1364 error_retry_time
= ceph::real_clock::now() + make_timespan(retry_backoff_secs
);
1365 error_marker
.clear();
1369 yield
call(new RGWReadRemoteDataLogShardInfoCR(sync_env
, shard_id
, &shard_info
));
1371 ldout(sync_env
->cct
, 0) << "ERROR: failed to fetch remote data log info: ret=" << retcode
<< dendl
;
1372 stop_spawned_services();
1374 return set_cr_error(retcode
);
1376 datalog_marker
= shard_info
.marker
;
1377 remote_trimmed
= RemoteNotTrimmed
;
1378 #define INCREMENTAL_MAX_ENTRIES 100
1379 ldout(sync_env
->cct
, 20) << __func__
<< ":" << __LINE__
<< ": shard_id=" << shard_id
<< " datalog_marker=" << datalog_marker
<< " sync_marker.marker=" << sync_marker
.marker
<< dendl
;
1380 if (datalog_marker
> sync_marker
.marker
) {
1381 spawned_keys
.clear();
1382 if (sync_marker
.marker
.empty())
1383 remote_trimmed
= RemoteMightTrimmed
; //remote data log shard might be trimmed;
1384 yield
call(new RGWReadRemoteDataLogShardCR(sync_env
, shard_id
, &sync_marker
.marker
, &log_entries
, &truncated
));
1386 ldout(sync_env
->cct
, 0) << "ERROR: failed to read remote data log info: ret=" << retcode
<< dendl
;
1387 stop_spawned_services();
1389 return set_cr_error(retcode
);
1391 if ((remote_trimmed
== RemoteMightTrimmed
) && sync_marker
.marker
.empty() && log_entries
.empty())
1392 remote_trimmed
= RemoteTrimmed
;
1394 remote_trimmed
= RemoteNotTrimmed
;
1395 for (log_iter
= log_entries
.begin(); log_iter
!= log_entries
.end(); ++log_iter
) {
1396 ldout(sync_env
->cct
, 20) << __func__
<< ":" << __LINE__
<< ": shard_id=" << shard_id
<< " log_entry: " << log_iter
->log_id
<< ":" << log_iter
->log_timestamp
<< ":" << log_iter
->entry
.key
<< dendl
;
1397 if (!marker_tracker
->index_key_to_marker(log_iter
->entry
.key
, log_iter
->log_id
)) {
1398 ldout(sync_env
->cct
, 20) << __func__
<< ": skipping sync of entry: " << log_iter
->log_id
<< ":" << log_iter
->entry
.key
<< " sync already in progress for bucket shard" << dendl
;
1399 marker_tracker
->try_update_high_marker(log_iter
->log_id
, 0, log_iter
->log_timestamp
);
1402 if (!marker_tracker
->start(log_iter
->log_id
, 0, log_iter
->log_timestamp
)) {
1403 ldout(sync_env
->cct
, 0) << "ERROR: cannot start syncing " << log_iter
->log_id
<< ". Duplicate entry?" << dendl
;
1406 * don't spawn the same key more than once. We can do that as long as we don't yield
1408 if (spawned_keys
.find(log_iter
->entry
.key
) == spawned_keys
.end()) {
1409 spawned_keys
.insert(log_iter
->entry
.key
);
1410 spawn(new RGWDataSyncSingleEntryCR(sync_env
, log_iter
->entry
.key
, log_iter
->log_id
, marker_tracker
, error_repo
, false), false);
1412 stop_spawned_services();
1414 return set_cr_error(retcode
);
1419 while ((int)num_spawned() > spawn_window
) {
1420 set_status() << "num_spawned() > spawn_window";
1421 yield
wait_for_child();
1423 while (collect(&ret
, lease_stack
.get())) {
1425 ldout(sync_env
->cct
, 0) << "ERROR: a sync operation returned error" << dendl
;
1426 /* we have reported this error */
1428 /* not waiting for child here */
1432 ldout(sync_env
->cct
, 20) << __func__
<< ":" << __LINE__
<< ": shard_id=" << shard_id
<< " datalog_marker=" << datalog_marker
<< " sync_marker.marker=" << sync_marker
.marker
<< dendl
;
1433 if (datalog_marker
== sync_marker
.marker
|| remote_trimmed
== RemoteTrimmed
) {
1434 #define INCREMENTAL_INTERVAL 20
1435 yield
wait(utime_t(INCREMENTAL_INTERVAL
, 0));
1441 void stop_spawned_services() {
1442 lease_cr
->go_down();
1444 error_repo
->finish();
1451 class RGWDataSyncShardControlCR
: public RGWBackoffControlCR
{
1452 RGWDataSyncEnv
*sync_env
;
1457 rgw_data_sync_marker sync_marker
;
1460 RGWDataSyncShardControlCR(RGWDataSyncEnv
*_sync_env
, rgw_pool
& _pool
,
1461 uint32_t _shard_id
, rgw_data_sync_marker
& _marker
) : RGWBackoffControlCR(_sync_env
->cct
, false),
1462 sync_env(_sync_env
),
1464 shard_id(_shard_id
),
1465 sync_marker(_marker
) {
1468 RGWCoroutine
*alloc_cr() override
{
1469 return new RGWDataSyncShardCR(sync_env
, pool
, shard_id
, sync_marker
, backoff_ptr());
1472 RGWCoroutine
*alloc_finisher_cr() override
{
1473 RGWRados
*store
= sync_env
->store
;
1474 return new RGWSimpleRadosReadCR
<rgw_data_sync_marker
>(sync_env
->async_rados
, store
,
1475 rgw_raw_obj(store
->get_zone_params().log_pool
, RGWDataSyncStatusManager::shard_obj_name(sync_env
->source_zone
, shard_id
)),
1479 void append_modified_shards(set
<string
>& keys
) {
1480 Mutex::Locker
l(cr_lock());
1482 RGWDataSyncShardCR
*cr
= static_cast<RGWDataSyncShardCR
*>(get_cr());
1487 cr
->append_modified_shards(keys
);
1491 class RGWDataSyncCR
: public RGWCoroutine
{
1492 RGWDataSyncEnv
*sync_env
;
1493 uint32_t num_shards
;
1495 rgw_data_sync_status sync_status
;
1497 RGWDataSyncShardMarkerTrack
*marker_tracker
;
1499 Mutex shard_crs_lock
;
1500 map
<int, RGWDataSyncShardControlCR
*> shard_crs
;
1502 bool *reset_backoff
;
1504 RGWDataSyncDebugLogger logger
;
1506 RGWDataSyncModule
*data_sync_module
{nullptr};
1508 RGWDataSyncCR(RGWDataSyncEnv
*_sync_env
, uint32_t _num_shards
, bool *_reset_backoff
) : RGWCoroutine(_sync_env
->cct
),
1509 sync_env(_sync_env
),
1510 num_shards(_num_shards
),
1511 marker_tracker(NULL
),
1512 shard_crs_lock("RGWDataSyncCR::shard_crs_lock"),
1513 reset_backoff(_reset_backoff
), logger(sync_env
, "Data", "all") {
1517 ~RGWDataSyncCR() override
{
1518 for (auto iter
: shard_crs
) {
1523 int operate() override
{
1526 /* read sync status */
1527 yield
call(new RGWReadDataSyncStatusCoroutine(sync_env
, &sync_status
));
1529 data_sync_module
= sync_env
->sync_module
->get_data_handler();
1531 if (retcode
< 0 && retcode
!= -ENOENT
) {
1532 ldout(sync_env
->cct
, 0) << "ERROR: failed to fetch sync status, retcode=" << retcode
<< dendl
;
1533 return set_cr_error(retcode
);
1536 /* state: init status */
1537 if ((rgw_data_sync_info::SyncState
)sync_status
.sync_info
.state
== rgw_data_sync_info::StateInit
) {
1538 ldout(sync_env
->cct
, 20) << __func__
<< "(): init" << dendl
;
1539 sync_status
.sync_info
.num_shards
= num_shards
;
1540 uint64_t instance_id
;
1541 get_random_bytes((char *)&instance_id
, sizeof(instance_id
));
1542 yield
call(new RGWInitDataSyncStatusCoroutine(sync_env
, num_shards
, instance_id
, &sync_status
));
1544 ldout(sync_env
->cct
, 0) << "ERROR: failed to init sync, retcode=" << retcode
<< dendl
;
1545 return set_cr_error(retcode
);
1547 // sets state = StateBuildingFullSyncMaps
1549 *reset_backoff
= true;
1552 data_sync_module
->init(sync_env
, sync_status
.sync_info
.instance_id
);
1554 if ((rgw_data_sync_info::SyncState
)sync_status
.sync_info
.state
== rgw_data_sync_info::StateBuildingFullSyncMaps
) {
1555 /* call sync module init here */
1556 sync_status
.sync_info
.num_shards
= num_shards
;
1557 yield
call(data_sync_module
->init_sync(sync_env
));
1559 ldout(sync_env
->cct
, 0) << "ERROR: sync module init_sync() failed, retcode=" << retcode
<< dendl
;
1560 return set_cr_error(retcode
);
1562 /* state: building full sync maps */
1563 ldout(sync_env
->cct
, 20) << __func__
<< "(): building full sync maps" << dendl
;
1564 yield
call(new RGWListBucketIndexesCR(sync_env
, &sync_status
));
1566 ldout(sync_env
->cct
, 0) << "ERROR: failed to build full sync maps, retcode=" << retcode
<< dendl
;
1567 return set_cr_error(retcode
);
1569 sync_status
.sync_info
.state
= rgw_data_sync_info::StateSync
;
1571 /* update new state */
1572 yield
call(set_sync_info_cr());
1574 ldout(sync_env
->cct
, 0) << "ERROR: failed to write sync status, retcode=" << retcode
<< dendl
;
1575 return set_cr_error(retcode
);
1578 *reset_backoff
= true;
1582 if ((rgw_data_sync_info::SyncState
)sync_status
.sync_info
.state
== rgw_data_sync_info::StateSync
) {
1583 for (map
<uint32_t, rgw_data_sync_marker
>::iterator iter
= sync_status
.sync_markers
.begin();
1584 iter
!= sync_status
.sync_markers
.end(); ++iter
) {
1585 RGWDataSyncShardControlCR
*cr
= new RGWDataSyncShardControlCR(sync_env
, sync_env
->store
->get_zone_params().log_pool
,
1586 iter
->first
, iter
->second
);
1588 shard_crs_lock
.Lock();
1589 shard_crs
[iter
->first
] = cr
;
1590 shard_crs_lock
.Unlock();
1596 return set_cr_done();
1601 RGWCoroutine
*set_sync_info_cr() {
1602 RGWRados
*store
= sync_env
->store
;
1603 return new RGWSimpleRadosWriteCR
<rgw_data_sync_info
>(sync_env
->async_rados
, store
,
1604 rgw_raw_obj(store
->get_zone_params().log_pool
, RGWDataSyncStatusManager::sync_status_oid(sync_env
->source_zone
)),
1605 sync_status
.sync_info
);
1608 void wakeup(int shard_id
, set
<string
>& keys
) {
1609 Mutex::Locker
l(shard_crs_lock
);
1610 map
<int, RGWDataSyncShardControlCR
*>::iterator iter
= shard_crs
.find(shard_id
);
1611 if (iter
== shard_crs
.end()) {
1614 iter
->second
->append_modified_shards(keys
);
1615 iter
->second
->wakeup();
1619 class RGWDefaultDataSyncModule
: public RGWDataSyncModule
{
1621 RGWDefaultDataSyncModule() {}
1623 RGWCoroutine
*sync_object(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
) override
;
1624 RGWCoroutine
*remove_object(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, real_time
& mtime
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
) override
;
1625 RGWCoroutine
*create_delete_marker(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, real_time
& mtime
,
1626 rgw_bucket_entry_owner
& owner
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
) override
;
1629 class RGWDefaultSyncModuleInstance
: public RGWSyncModuleInstance
{
1630 RGWDefaultDataSyncModule data_handler
;
1632 RGWDefaultSyncModuleInstance() {}
1633 RGWDataSyncModule
*get_data_handler() override
{
1634 return &data_handler
;
// Factory hook for the default sync module: installs a fresh
// RGWDefaultSyncModuleInstance into *instance. The visible body does not
// consult cct or config — presumably the default module takes no options
// (TODO confirm against the elided lines).
1638 int RGWDefaultSyncModule::create_instance(CephContext
*cct
, map
<string
, string
, ltstr_nocase
>& config
, RGWSyncModuleInstanceRef
*instance
)
1640 instance
->reset(new RGWDefaultSyncModuleInstance());
// Default data-sync handler for object creation/update: returns a coroutine
// that fetches the object from the source zone (sync_env->source_zone) via
// RGWFetchRemoteObjCR and stores it locally. The caller owns and runs the
// returned coroutine.
1644 RGWCoroutine
*RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
)
1646 return new RGWFetchRemoteObjCR(sync_env
->async_rados
, sync_env
->store
, sync_env
->source_zone
, bucket_info
,
// versioned_epoch orders concurrent writes to a versioned object; the
// remaining constructor arguments are on elided lines.
1647 key
, versioned_epoch
,
// Default data-sync handler for object deletion: returns a coroutine that
// removes the local copy of the object replicated from the source zone.
1651 RGWCoroutine
*RGWDefaultDataSyncModule::remove_object(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
,
1652 real_time
& mtime
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
)
1654 return new RGWRemoveObjCR(sync_env
->async_rados
, sync_env
->store
, sync_env
->source_zone
,
1655 bucket_info
, key
, versioned
, versioned_epoch
,
// No owner id/display-name is passed and the delete-marker flag is false —
// contrast with create_delete_marker() below, which passes both. mtime lets
// the removal be ordered against newer writes; zones_trace prevents sync
// loops between zones.
1656 NULL
, NULL
, false, &mtime
, zones_trace
);
// Default data-sync handler for versioned-bucket delete markers: returns a
// coroutine that creates the marker locally. Implemented with the same
// RGWRemoveObjCR as remove_object(), but with the owner identity supplied
// and the delete-marker flag set to true.
1659 RGWCoroutine
*RGWDefaultDataSyncModule::create_delete_marker(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, real_time
& mtime
,
1660 rgw_bucket_entry_owner
& owner
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
)
1662 return new RGWRemoveObjCR(sync_env
->async_rados
, sync_env
->store
, sync_env
->source_zone
,
1663 bucket_info
, key
, versioned
, versioned_epoch
,
// true => create a delete marker rather than a plain removal (cf. the
// false passed by remove_object() above).
1664 &owner
.id
, &owner
.display_name
, true, &mtime
, zones_trace
);
1667 class RGWDataSyncControlCR
: public RGWBackoffControlCR
1669 RGWDataSyncEnv
*sync_env
;
1670 uint32_t num_shards
;
1672 static constexpr bool exit_on_error
= false; // retry on all errors
1674 RGWDataSyncControlCR(RGWDataSyncEnv
*_sync_env
, uint32_t _num_shards
) : RGWBackoffControlCR(_sync_env
->cct
, exit_on_error
),
1675 sync_env(_sync_env
), num_shards(_num_shards
) {
1678 RGWCoroutine
*alloc_cr() override
{
1679 return new RGWDataSyncCR(sync_env
, num_shards
, backoff_ptr());
1682 void wakeup(int shard_id
, set
<string
>& keys
) {
1683 Mutex
& m
= cr_lock();
1686 RGWDataSyncCR
*cr
= static_cast<RGWDataSyncCR
*>(get_cr());
1696 cr
->wakeup(shard_id
, keys
);
// Notification entry point: forward the set of modified datalog keys for
// shard_id to the running sync coroutine, if any. Takes the rwlock for read
// so the check/use of data_sync_cr is safe against concurrent teardown in
// run_sync().
1703 void RGWRemoteDataLog::wakeup(int shard_id
, set
<string
>& keys
) {
1704 RWLock::RLocker
rl(lock
);
// No sync currently running — drop the notification (early return on an
// elided line).
1705 if (!data_sync_cr
) {
1708 data_sync_cr
->wakeup(shard_id
, keys
);
// Run the full data-sync state machine for this source zone, blocking until
// run() returns. data_sync_cr is published (under the elided lock) so that
// wakeup() can deliver notifications while sync is running, and cleared
// again before returning.
1711 int RGWRemoteDataLog::run_sync(int num_shards
)
1714 data_sync_cr
= new RGWDataSyncControlCR(&sync_env
, num_shards
);
// Keep an extra ref: run() consumes one, but wakeup() may still need the
// object until we null the pointer below.
1715 data_sync_cr
->get(); // run() will drop a ref, so take another
1718 int r
= run(data_sync_cr
);
// Drop our extra ref and unpublish the coroutine (locking is on elided
// lines).
1721 data_sync_cr
->put();
1722 data_sync_cr
= NULL
;
1726 ldout(store
->ctx(), 0) << "ERROR: failed to run sync" << dendl
;
1732 int RGWDataSyncStatusManager::init()
1734 auto zone_def_iter
= store
->zone_by_id
.find(source_zone
);
1735 if (zone_def_iter
== store
->zone_by_id
.end()) {
1736 ldout(store
->ctx(), 0) << "ERROR: failed to find zone config info for zone=" << source_zone
<< dendl
;
1740 auto& zone_def
= zone_def_iter
->second
;
1742 if (!store
->get_sync_modules_manager()->supports_data_export(zone_def
.tier_type
)) {
1746 RGWZoneParams
& zone_params
= store
->get_zone_params();
1748 if (sync_module
== nullptr) {
1749 sync_module
= store
->get_sync_module();
1752 conn
= store
->get_zone_conn_by_id(source_zone
);
1754 ldout(store
->ctx(), 0) << "connection object to zone " << source_zone
<< " does not exist" << dendl
;
1758 error_logger
= new RGWSyncErrorLogger(store
, RGW_SYNC_ERROR_LOG_SHARD_PREFIX
, ERROR_LOGGER_SHARDS
);
1760 int r
= source_log
.init(source_zone
, conn
, error_logger
, sync_module
);
1762 lderr(store
->ctx()) << "ERROR: failed to init remote log, r=" << r
<< dendl
;
1767 rgw_datalog_info datalog_info
;
1768 r
= source_log
.read_log_info(&datalog_info
);
1770 ldout(store
->ctx(), 5) << "ERROR: master.read_log_info() returned r=" << r
<< dendl
;
1775 num_shards
= datalog_info
.num_shards
;
1777 for (int i
= 0; i
< num_shards
; i
++) {
1778 shard_objs
[i
] = rgw_raw_obj(zone_params
.log_pool
, shard_obj_name(source_zone
, i
));
// Release the error logger allocated in init(); null the pointer so
// finalize() is safe to call more than once.
1784 void RGWDataSyncStatusManager::finalize()
1786 delete error_logger
;
1787 error_logger
= nullptr;
// Build the RADOS object name that holds the overall data-sync status for a
// source zone: "<datalog_sync_status_oid_prefix>.<source_zone>".
1790 string
RGWDataSyncStatusManager::sync_status_oid(const string
& source_zone
)
// +16 leaves room for the '.' separator and NUL; the return (on an elided
// line) presumably constructs a string from buf.
1792 char buf
[datalog_sync_status_oid_prefix
.size() + source_zone
.size() + 16];
1793 snprintf(buf
, sizeof(buf
), "%s.%s", datalog_sync_status_oid_prefix
.c_str(), source_zone
.c_str());
// Build the RADOS object name holding the per-shard sync marker:
// "<datalog_sync_status_shard_prefix>.<source_zone>.<shard_id>".
1798 string
RGWDataSyncStatusManager::shard_obj_name(const string
& source_zone
, int shard_id
)
// +16 covers the separators, the decimal shard id, and the NUL terminator.
1800 char buf
[datalog_sync_status_shard_prefix
.size() + source_zone
.size() + 16];
1801 snprintf(buf
, sizeof(buf
), "%s.%s.%d", datalog_sync_status_shard_prefix
.c_str(), source_zone
.c_str(), shard_id
);
// Initialize this per-bucket-shard log handle: record the source zone and
// shard, and wire up the shared sync environment (store, REST connection,
// async rados, http manager, error logger, sync module).
1806 int RGWRemoteBucketLog::init(const string
& _source_zone
, RGWRESTConn
*_conn
,
1807 const rgw_bucket
& bucket
, int shard_id
,
1808 RGWSyncErrorLogger
*_error_logger
,
1809 RGWSyncModuleInstanceRef
& _sync_module
)
1812 source_zone
= _source_zone
;
1814 bs
.shard_id
= shard_id
;
// Last arg (observer/counters slot) is intentionally nullptr here.
1816 sync_env
.init(store
->ctx(), store
, conn
, async_rados
, http_manager
,
1817 _error_logger
, source_zone
, _sync_module
, nullptr);
1822 class RGWReadRemoteBucketIndexLogInfoCR
: public RGWCoroutine
{
1823 RGWDataSyncEnv
*sync_env
;
1824 const string instance_key
;
1826 rgw_bucket_index_marker_info
*info
;
1829 RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncEnv
*_sync_env
,
1830 const rgw_bucket_shard
& bs
,
1831 rgw_bucket_index_marker_info
*_info
)
1832 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
1833 instance_key(bs
.get_key()), info(_info
) {}
1835 int operate() override
{
1838 rgw_http_param_pair pairs
[] = { { "type" , "bucket-index" },
1839 { "bucket-instance", instance_key
.c_str() },
1843 string p
= "/admin/log/";
1844 call(new RGWReadRESTResourceCR
<rgw_bucket_index_marker_info
>(sync_env
->cct
, sync_env
->conn
, sync_env
->http_manager
, p
, pairs
, info
));
1847 return set_cr_error(retcode
);
1849 return set_cr_done();
1855 class RGWInitBucketShardSyncStatusCoroutine
: public RGWCoroutine
{
1856 RGWDataSyncEnv
*sync_env
;
1858 rgw_bucket_shard bs
;
1859 const string sync_status_oid
;
1861 rgw_bucket_shard_sync_info
& status
;
1863 rgw_bucket_index_marker_info info
;
1865 RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncEnv
*_sync_env
,
1866 const rgw_bucket_shard
& bs
,
1867 rgw_bucket_shard_sync_info
& _status
)
1868 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
), bs(bs
),
1869 sync_status_oid(RGWBucketSyncStatusManager::status_oid(sync_env
->source_zone
, bs
)),
1873 int operate() override
{
1875 /* fetch current position in logs */
1876 yield
call(new RGWReadRemoteBucketIndexLogInfoCR(sync_env
, bs
, &info
));
1877 if (retcode
< 0 && retcode
!= -ENOENT
) {
1878 ldout(cct
, 0) << "ERROR: failed to fetch bucket index status" << dendl
;
1879 return set_cr_error(retcode
);
1882 auto store
= sync_env
->store
;
1883 rgw_raw_obj
obj(store
->get_zone_params().log_pool
, sync_status_oid
);
1885 if (info
.syncstopped
) {
1886 call(new RGWRadosRemoveCR(store
, obj
));
1888 status
.state
= rgw_bucket_shard_sync_info::StateFullSync
;
1889 status
.inc_marker
.position
= info
.max_marker
;
1890 map
<string
, bufferlist
> attrs
;
1891 status
.encode_all_attrs(attrs
);
1892 call(new RGWSimpleRadosWriteAttrsCR(sync_env
->async_rados
, store
, obj
, attrs
));
1895 return set_cr_done();
// Factory: coroutine that (re)initializes the sync-status object for this
// bucket shard; result is written into the member init_status.
1901 RGWCoroutine
*RGWRemoteBucketLog::init_sync_status_cr()
1903 return new RGWInitBucketShardSyncStatusCoroutine(&sync_env
, bs
, init_status
);
// Decode one named xattr from the attrs map into *val. Missing attribute
// and decode failure are both tolerated (handling is on elided lines); a
// decode error is logged below.
1907 static void decode_attr(CephContext
*cct
, map
<string
, bufferlist
>& attrs
, const string
& attr_name
, T
*val
)
1909 map
<string
, bufferlist
>::iterator iter
= attrs
.find(attr_name
);
// Attribute absent — bail out (early return on an elided line).
1910 if (iter
== attrs
.end()) {
1915 bufferlist::iterator biter
= iter
->second
.begin();
1917 ::decode(*val
, biter
);
// Corrupt/short buffer: log and leave *val in whatever state decode left it.
1918 } catch (buffer::error
& err
) {
1919 ldout(cct
, 0) << "ERROR: failed to decode attribute: " << attr_name
<< dendl
;
// Populate this status object from the xattrs of its RADOS status object.
// Counterpart of encode_all_attrs(): the three attribute names must stay in
// sync with the encode_* functions below.
1923 void rgw_bucket_shard_sync_info::decode_from_attrs(CephContext
*cct
, map
<string
, bufferlist
>& attrs
)
1925 decode_attr(cct
, attrs
, "state", &state
);
1926 decode_attr(cct
, attrs
, "full_marker", &full_marker
);
1927 decode_attr(cct
, attrs
, "inc_marker", &inc_marker
);
// Serialize the complete sync status (state + full-sync marker +
// incremental-sync marker) into the attrs map for a single xattr write.
1930 void rgw_bucket_shard_sync_info::encode_all_attrs(map
<string
, bufferlist
>& attrs
)
1932 encode_state_attr(attrs
);
1933 full_marker
.encode_attr(attrs
);
1934 inc_marker
.encode_attr(attrs
);
// Encode only the sync-state enum under the "state" attribute key (read
// back by decode_from_attrs()).
1937 void rgw_bucket_shard_sync_info::encode_state_attr(map
<string
, bufferlist
>& attrs
)
1939 ::encode(state
, attrs
["state"]);
// Encode the full-sync marker under its fixed attribute key "full_marker".
1942 void rgw_bucket_shard_full_sync_marker::encode_attr(map
<string
, bufferlist
>& attrs
)
1944 ::encode(*this, attrs
["full_marker"]);
// Encode the incremental-sync marker under its fixed attribute key
// "inc_marker".
1947 void rgw_bucket_shard_inc_sync_marker::encode_attr(map
<string
, bufferlist
>& attrs
)
1949 ::encode(*this, attrs
["inc_marker"]);
1952 class RGWReadBucketSyncStatusCoroutine
: public RGWCoroutine
{
1953 RGWDataSyncEnv
*sync_env
;
1955 rgw_bucket_shard_sync_info
*status
;
1957 map
<string
, bufferlist
> attrs
;
1959 RGWReadBucketSyncStatusCoroutine(RGWDataSyncEnv
*_sync_env
,
1960 const rgw_bucket_shard
& bs
,
1961 rgw_bucket_shard_sync_info
*_status
)
1962 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
1963 oid(RGWBucketSyncStatusManager::status_oid(sync_env
->source_zone
, bs
)),
1965 int operate() override
;
// Read the bucket-shard sync status object's xattrs and decode them into
// *status. ENOENT is not an error: it means sync never started, so a
// default-constructed (StateInit) status is returned.
1968 int RGWReadBucketSyncStatusCoroutine::operate()
1971 yield
call(new RGWSimpleRadosReadAttrsCR(sync_env
->async_rados
, sync_env
->store
,
1972 rgw_raw_obj(sync_env
->store
->get_zone_params().log_pool
, oid
),
// Status object absent — report a fresh, default status.
1974 if (retcode
== -ENOENT
) {
1975 *status
= rgw_bucket_shard_sync_info();
1976 return set_cr_done();
1979 ldout(sync_env
->cct
, 0) << "ERROR: failed to call fetch bucket shard info oid=" << oid
<< " ret=" << retcode
<< dendl
;
1980 return set_cr_error(retcode
);
1982 status
->decode_from_attrs(sync_env
->cct
, attrs
);
1983 return set_cr_done();
1988 #define OMAP_READ_MAX_ENTRIES 10
1989 class RGWReadRecoveringBucketShardsCoroutine
: public RGWCoroutine
{
1990 RGWDataSyncEnv
*sync_env
;
1996 set
<string
>& recovering_buckets
;
2000 set
<string
> error_entries
;
2001 int max_omap_entries
;
2005 RGWReadRecoveringBucketShardsCoroutine(RGWDataSyncEnv
*_sync_env
, const int _shard_id
,
2006 set
<string
>& _recovering_buckets
, const int _max_entries
)
2007 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
2008 store(sync_env
->store
), shard_id(_shard_id
), max_entries(_max_entries
),
2009 recovering_buckets(_recovering_buckets
), max_omap_entries(OMAP_READ_MAX_ENTRIES
)
2011 error_oid
= RGWDataSyncStatusManager::shard_obj_name(sync_env
->source_zone
, shard_id
) + ".retry";
2014 int operate() override
;
2017 int RGWReadRecoveringBucketShardsCoroutine::operate()
2020 //read recovering bucket shards
2023 yield
call(new RGWRadosGetOmapKeysCR(store
, rgw_raw_obj(store
->get_zone_params().log_pool
, error_oid
),
2024 marker
, &error_entries
, max_omap_entries
));
2026 if (retcode
== -ENOENT
) {
2031 ldout(sync_env
->cct
, 0) << "failed to read recovering bucket shards with "
2032 << cpp_strerror(retcode
) << dendl
;
2033 return set_cr_error(retcode
);
2036 if (error_entries
.empty()) {
2040 count
+= error_entries
.size();
2041 marker
= *error_entries
.rbegin();
2042 recovering_buckets
.insert(error_entries
.begin(), error_entries
.end());
2043 }while((int)error_entries
.size() == max_omap_entries
&& count
< max_entries
);
2045 return set_cr_done();
2051 class RGWReadPendingBucketShardsCoroutine
: public RGWCoroutine
{
2052 RGWDataSyncEnv
*sync_env
;
2058 set
<string
>& pending_buckets
;
2062 rgw_data_sync_marker
* sync_marker
;
2065 list
<rgw_data_change_log_entry
> log_entries
;
2069 RGWReadPendingBucketShardsCoroutine(RGWDataSyncEnv
*_sync_env
, const int _shard_id
,
2070 set
<string
>& _pending_buckets
,
2071 rgw_data_sync_marker
* _sync_marker
, const int _max_entries
)
2072 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
2073 store(sync_env
->store
), shard_id(_shard_id
), max_entries(_max_entries
),
2074 pending_buckets(_pending_buckets
), sync_marker(_sync_marker
)
2076 status_oid
= RGWDataSyncStatusManager::shard_obj_name(sync_env
->source_zone
, shard_id
);
2079 int operate() override
;
2082 int RGWReadPendingBucketShardsCoroutine::operate()
2085 //read sync status marker
2086 using CR
= RGWSimpleRadosReadCR
<rgw_data_sync_marker
>;
2087 yield
call(new CR(sync_env
->async_rados
, store
,
2088 rgw_raw_obj(store
->get_zone_params().log_pool
, status_oid
),
2091 ldout(sync_env
->cct
,0) << "failed to read sync status marker with "
2092 << cpp_strerror(retcode
) << dendl
;
2093 return set_cr_error(retcode
);
2096 //read pending bucket shards
2097 marker
= sync_marker
->marker
;
2100 yield
call(new RGWReadRemoteDataLogShardCR(sync_env
, shard_id
, &marker
, &log_entries
, &truncated
));
2102 if (retcode
== -ENOENT
) {
2107 ldout(sync_env
->cct
,0) << "failed to read remote data log info with "
2108 << cpp_strerror(retcode
) << dendl
;
2109 return set_cr_error(retcode
);
2112 if (log_entries
.empty()) {
2116 count
+= log_entries
.size();
2117 for (const auto& entry
: log_entries
) {
2118 pending_buckets
.insert(entry
.entry
.key
);
2120 }while(truncated
&& count
< max_entries
);
2122 return set_cr_done();
// Synchronously report, for one datalog shard, the bucket shards that are
// still recovering (error-repo entries) and those with pending changes
// (unconsumed datalog entries). Runs both reader coroutines concurrently on
// a private coroutine manager so it cannot interfere with run_sync().
2128 int RGWRemoteDataLog::read_shard_status(int shard_id
, set
<string
>& pending_buckets
, set
<string
>& recovering_buckets
, rgw_data_sync_marker
*sync_marker
, const int max_entries
)
2130 // cannot run concurrently with run_sync(), so run in a separate manager
2131 RGWCoroutinesManager
crs(store
->ctx(), store
->get_cr_registry());
2132 RGWHTTPManager
http_manager(store
->ctx(), crs
.get_completion_mgr());
2133 int ret
= http_manager
.set_threaded();
2135 ldout(store
->ctx(), 0) << "failed in http_manager.start() ret=" << ret
<< dendl
;
// Use a copy of the sync env so the private http_manager doesn't clobber
// the shared one used by the main sync path.
2138 RGWDataSyncEnv sync_env_local
= sync_env
;
2139 sync_env_local
.http_manager
= &http_manager
;
// One stack per reader so the two coroutines run concurrently under crs.
2140 list
<RGWCoroutinesStack
*> stacks
;
2141 RGWCoroutinesStack
* recovering_stack
= new RGWCoroutinesStack(store
->ctx(), &crs
);
2142 recovering_stack
->call(new RGWReadRecoveringBucketShardsCoroutine(&sync_env_local
, shard_id
, recovering_buckets
, max_entries
));
2143 stacks
.push_back(recovering_stack
);
2144 RGWCoroutinesStack
* pending_stack
= new RGWCoroutinesStack(store
->ctx(), &crs
);
2145 pending_stack
->call(new RGWReadPendingBucketShardsCoroutine(&sync_env_local
, shard_id
, pending_buckets
, sync_marker
, max_entries
));
2146 stacks
.push_back(pending_stack
);
2147 ret
= crs
.run(stacks
);
2148 http_manager
.stop();
// Factory: coroutine that reads this bucket shard's sync status into
// *sync_status (see RGWReadBucketSyncStatusCoroutine::operate above).
2152 RGWCoroutine
*RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info
*sync_status
)
2154 return new RGWReadBucketSyncStatusCoroutine(&sync_env
, bs
, sync_status
);
// Destructor: delete every per-shard remote bucket log, then the error
// logger (both owned raw pointers).
2157 RGWBucketSyncStatusManager::~RGWBucketSyncStatusManager() {
2158 for (map
<int, RGWRemoteBucketLog
*>::iterator iter
= source_logs
.begin(); iter
!= source_logs
.end(); ++iter
) {
2159 delete iter
->second
;
2161 delete error_logger
;
// Decode the S3-style owner fields ("ID", "DisplayName") from a bucket
// listing JSON object.
2165 void rgw_bucket_entry_owner::decode_json(JSONObj
*obj
)
2167 JSONDecoder::decode_json("ID", id
, obj
);
2168 JSONDecoder::decode_json("DisplayName", display_name
, obj
);
2171 struct bucket_list_entry
{
2178 string storage_class
;
2179 rgw_bucket_entry_owner owner
;
2180 uint64_t versioned_epoch
;
2183 bucket_list_entry() : delete_marker(false), is_latest(false), size(0), versioned_epoch(0) {}
2185 void decode_json(JSONObj
*obj
) {
2186 JSONDecoder::decode_json("IsDeleteMarker", delete_marker
, obj
);
2187 JSONDecoder::decode_json("Key", key
.name
, obj
);
2188 JSONDecoder::decode_json("VersionId", key
.instance
, obj
);
2189 JSONDecoder::decode_json("IsLatest", is_latest
, obj
);
2191 JSONDecoder::decode_json("RgwxMtime", mtime_str
, obj
);
2195 if (parse_iso8601(mtime_str
.c_str(), &t
, &nsec
)) {
2197 ts
.tv_sec
= (uint64_t)internal_timegm(&t
);
2199 mtime
= real_clock::from_ceph_timespec(ts
);
2201 JSONDecoder::decode_json("ETag", etag
, obj
);
2202 JSONDecoder::decode_json("Size", size
, obj
);
2203 JSONDecoder::decode_json("StorageClass", storage_class
, obj
);
2204 JSONDecoder::decode_json("Owner", owner
, obj
);
2205 JSONDecoder::decode_json("VersionedEpoch", versioned_epoch
, obj
);
2206 JSONDecoder::decode_json("RgwxTag", rgw_tag
, obj
);
// Decoded response of a remote bucket-shard listing: pagination markers
// plus the list of entries. NOTE(review): extraction mangled this chunk;
// some member declarations (name, prefix, key_marker, max_keys,
// is_truncated) are not visible here.
2210 struct bucket_list_result
{
2214 string version_id_marker
;
2217 list
<bucket_list_entry
> entries
;
2219 bucket_list_result() : max_keys(0), is_truncated(false) {}
// Populate from the JSON body of the listing response.
2221 void decode_json(JSONObj
*obj
) {
2222 JSONDecoder::decode_json("Name", name
, obj
);
2223 JSONDecoder::decode_json("Prefix", prefix
, obj
);
2224 JSONDecoder::decode_json("KeyMarker", key_marker
, obj
);
2225 JSONDecoder::decode_json("VersionIdMarker", version_id_marker
, obj
);
2226 JSONDecoder::decode_json("MaxKeys", max_keys
, obj
);
2227 JSONDecoder::decode_json("IsTruncated", is_truncated
, obj
);
2228 JSONDecoder::decode_json("Entries", entries
, obj
);
// Coroutine: list one remote bucket shard (versions included) over the
// admin REST API, starting at marker_position, into *result.
// NOTE(review): extraction mangled this chunk — several original lines
// (constructor tail, reenter wrapper, retcode check) are missing.
2232 class RGWListBucketShardCR
: public RGWCoroutine
{
2233 RGWDataSyncEnv
*sync_env
;
2234 const rgw_bucket_shard
& bs
;
2235 const string instance_key
;
2236 rgw_obj_key marker_position
;
2238 bucket_list_result
*result
;
2241 RGWListBucketShardCR(RGWDataSyncEnv
*_sync_env
, const rgw_bucket_shard
& bs
,
2242 rgw_obj_key
& _marker_position
, bucket_list_result
*_result
)
2243 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
), bs(bs
),
2244 instance_key(bs
.get_key()), marker_position(_marker_position
),
2247 int operate() override
{
// request parameters: select the exact bucket instance and resume markers
2250 rgw_http_param_pair pairs
[] = { { "rgwx-bucket-instance", instance_key
.c_str() },
2251 { "versions" , NULL
},
2252 { "format" , "json" },
2253 { "objs-container" , "true" },
2254 { "key-marker" , marker_position
.name
.c_str() },
2255 { "version-id-marker" , marker_position
.instance
.c_str() },
2257 // don't include tenant in the url, it's already part of instance_key
2258 string p
= string("/") + bs
.bucket
.name
;
2259 call(new RGWReadRESTResourceCR
<bucket_list_result
>(sync_env
->cct
, sync_env
->conn
, sync_env
->http_manager
, p
, pairs
, result
));
2262 return set_cr_error(retcode
);
2264 return set_cr_done();
// Coroutine: fetch a page of bucket-index log (bilog) entries for one
// shard from the remote zone's /admin/log endpoint, starting at marker.
// NOTE(review): extraction mangled this chunk; some lines are missing.
2270 class RGWListBucketIndexLogCR
: public RGWCoroutine
{
2271 RGWDataSyncEnv
*sync_env
;
2272 const string instance_key
;
2275 list
<rgw_bi_log_entry
> *result
;
2278 RGWListBucketIndexLogCR(RGWDataSyncEnv
*_sync_env
, const rgw_bucket_shard
& bs
,
2279 string
& _marker
, list
<rgw_bi_log_entry
> *_result
)
2280 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
2281 instance_key(bs
.get_key()), marker(_marker
), result(_result
) {}
2283 int operate() override
{
// bilog query: identify the shard and resume point
2286 rgw_http_param_pair pairs
[] = { { "bucket-instance", instance_key
.c_str() },
2287 { "format" , "json" },
2288 { "marker" , marker
.c_str() },
2289 { "type", "bucket-index" },
2292 call(new RGWReadRESTResourceCR
<list
<rgw_bi_log_entry
> >(sync_env
->cct
, sync_env
->conn
, sync_env
->http_manager
, "/admin/log", pairs
, result
));
2295 return set_cr_error(retcode
);
2297 return set_cr_done();
2303 #define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10
// Marker tracker for bucket full sync: persists the current list position
// and entry count as attrs on the shard's status object, batching updates
// in windows of BUCKET_SYNC_UPDATE_MARKER_WINDOW.
// NOTE(review): extraction mangled this chunk — the marker_oid member
// declaration and the tail of store_marker() are not visible here.
2305 class RGWBucketFullSyncShardMarkerTrack
: public RGWSyncShardMarkerTrack
<rgw_obj_key
, rgw_obj_key
> {
2306 RGWDataSyncEnv
*sync_env
;
2309 rgw_bucket_shard_full_sync_marker sync_marker
;
2312 RGWBucketFullSyncShardMarkerTrack(RGWDataSyncEnv
*_sync_env
,
2313 const string
& _marker_oid
,
2314 const rgw_bucket_shard_full_sync_marker
& _marker
) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW
),
2315 sync_env(_sync_env
),
2316 marker_oid(_marker_oid
),
2317 sync_marker(_marker
) {}
// Build a coroutine that writes the updated marker (position + count)
// as xattrs on the status object in the log pool.
2319 RGWCoroutine
*store_marker(const rgw_obj_key
& new_marker
, uint64_t index_pos
, const real_time
& timestamp
) override
{
2320 sync_marker
.position
= new_marker
;
2321 sync_marker
.count
= index_pos
;
2323 map
<string
, bufferlist
> attrs
;
2324 sync_marker
.encode_attr(attrs
);
2326 RGWRados
*store
= sync_env
->store
;
2328 ldout(sync_env
->cct
, 20) << __func__
<< "(): updating marker marker_oid=" << marker_oid
<< " marker=" << new_marker
<< dendl
;
2329 return new RGWSimpleRadosWriteAttrsCR(sync_env
->async_rados
, store
,
2330 rgw_raw_obj(store
->get_zone_params().log_pool
, marker_oid
),
// Marker tracker for bucket incremental sync. Besides persisting the
// bilog position, it maintains a bidirectional key<->marker index so
// that at most one sync operation runs per object key at a time.
// NOTE(review): extraction mangled this chunk; some lines (returns,
// marker_oid declaration, store_marker tail) are not visible here.
2335 class RGWBucketIncSyncShardMarkerTrack
: public RGWSyncShardMarkerTrack
<string
, rgw_obj_key
> {
2336 RGWDataSyncEnv
*sync_env
;
2339 rgw_bucket_shard_inc_sync_marker sync_marker
;
// key -> marker and marker -> key indexes used to serialize per-key ops
2341 map
<rgw_obj_key
, string
> key_to_marker
;
2342 map
<string
, rgw_obj_key
> marker_to_key
;
// Drop the completed marker from both indexes and clear any pending
// retry flag for its key.
2344 void handle_finish(const string
& marker
) override
{
2345 map
<string
, rgw_obj_key
>::iterator iter
= marker_to_key
.find(marker
);
2346 if (iter
== marker_to_key
.end()) {
2349 key_to_marker
.erase(iter
->second
);
2350 reset_need_retry(iter
->second
);
2351 marker_to_key
.erase(iter
);
2355 RGWBucketIncSyncShardMarkerTrack(RGWDataSyncEnv
*_sync_env
,
2356 const string
& _marker_oid
,
2357 const rgw_bucket_shard_inc_sync_marker
& _marker
) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW
),
2358 sync_env(_sync_env
),
2359 marker_oid(_marker_oid
),
2360 sync_marker(_marker
) {}
// Build a coroutine that persists the new bilog position as xattrs on
// the shard status object.
2362 RGWCoroutine
*store_marker(const string
& new_marker
, uint64_t index_pos
, const real_time
& timestamp
) override
{
2363 sync_marker
.position
= new_marker
;
2365 map
<string
, bufferlist
> attrs
;
2366 sync_marker
.encode_attr(attrs
);
2368 RGWRados
*store
= sync_env
->store
;
2370 ldout(sync_env
->cct
, 20) << __func__
<< "(): updating marker marker_oid=" << marker_oid
<< " marker=" << new_marker
<< dendl
;
2371 return new RGWSimpleRadosWriteAttrsCR(sync_env
->async_rados
,
2373 rgw_raw_obj(store
->get_zone_params().log_pool
, marker_oid
),
2378 * create index from key -> <op, marker>, and from marker -> key
2379 * this is useful so that we can insure that we only have one
2380 * entry for any key that is used. This is needed when doing
2381 * incremenatl sync of data, and we don't want to run multiple
2382 * concurrent sync operations for the same bucket shard
2383 * Also, we should make sure that we don't run concurrent operations on the same key with
// Returns false (and flags a retry) if the key is already in flight;
// otherwise records the key/marker pair in both indexes.
2386 bool index_key_to_marker(const rgw_obj_key
& key
, const string
& marker
) {
2387 if (key_to_marker
.find(key
) != key_to_marker
.end()) {
2388 set_need_retry(key
);
2391 key_to_marker
[key
] = marker
;
2392 marker_to_key
[marker
] = key
;
// True when no operation is currently in flight for this key.
2396 bool can_do_op(const rgw_obj_key
& key
) {
2397 return (key_to_marker
.find(key
) == key_to_marker
.end());
// Coroutine: sync a single bucket-index entry (one object/version) from
// the source zone, dispatching to the sync module's fetch / remove /
// delete-marker handler according to the bilog op, logging failures to
// the error log and reporting completion to the marker tracker.
// NOTE(review): extraction mangled this chunk — many original lines
// (member declarations such as key/versioned/op/entry_marker/sync_status,
// braces, reenter wrapper) are not visible here.
2401 template <class T
, class K
>
2402 class RGWBucketSyncSingleEntryCR
: public RGWCoroutine
{
2403 RGWDataSyncEnv
*sync_env
;
2405 RGWBucketInfo
*bucket_info
;
2406 const rgw_bucket_shard
& bs
;
2410 uint64_t versioned_epoch
;
2411 rgw_bucket_entry_owner owner
;
2412 real_time timestamp
;
2414 RGWPendingState op_state
;
2417 RGWSyncShardMarkerTrack
<T
, K
> *marker_tracker
;
2421 stringstream error_ss
;
2423 RGWDataSyncDebugLogger logger
;
2425 bool error_injection
;
2427 RGWDataSyncModule
*data_sync_module
;
2429 rgw_zone_set zones_trace
;
2432 RGWBucketSyncSingleEntryCR(RGWDataSyncEnv
*_sync_env
,
2433 RGWBucketInfo
*_bucket_info
,
2434 const rgw_bucket_shard
& bs
,
2435 const rgw_obj_key
& _key
, bool _versioned
, uint64_t _versioned_epoch
,
2436 real_time
& _timestamp
,
2437 const rgw_bucket_entry_owner
& _owner
,
2438 RGWModifyOp _op
, RGWPendingState _op_state
,
2439 const T
& _entry_marker
, RGWSyncShardMarkerTrack
<T
, K
> *_marker_tracker
, rgw_zone_set
& _zones_trace
) : RGWCoroutine(_sync_env
->cct
),
2440 sync_env(_sync_env
),
2441 bucket_info(_bucket_info
), bs(bs
),
2442 key(_key
), versioned(_versioned
), versioned_epoch(_versioned_epoch
),
2444 timestamp(_timestamp
), op(_op
),
2445 op_state(_op_state
),
2446 entry_marker(_entry_marker
),
2447 marker_tracker(_marker_tracker
),
2450 ss
<< bucket_shard_str
{bs
} << "/" << key
<< "[" << versioned_epoch
<< "]";
2451 set_description() << "bucket sync single entry (source_zone=" << sync_env
->source_zone
<< ") b=" << ss
.str() << " log_entry=" << entry_marker
<< " op=" << (int)op
<< " op_state=" << (int)op_state
;
2452 ldout(sync_env
->cct
, 20) << "bucket sync single entry (source_zone=" << sync_env
->source_zone
<< ") b=" << ss
.str() << " log_entry=" << entry_marker
<< " op=" << (int)op
<< " op_state=" << (int)op_state
<< dendl
;
2455 logger
.init(sync_env
, "Object", ss
.str());
// error injection is driven by a debug conf option (probability > 0)
2457 error_injection
= (sync_env
->cct
->_conf
->rgw_sync_data_inject_err_probability
> 0);
2459 data_sync_module
= sync_env
->sync_module
->get_data_handler();
// record our own zone in the trace to prevent sync loops between zones
2461 zones_trace
= _zones_trace
;
2462 zones_trace
.insert(sync_env
->store
->get_zone().id
);
2465 int operate() override
{
2467 /* skip entries that are not complete */
2468 if (op_state
!= CLS_RGW_STATE_COMPLETE
) {
2473 marker_tracker
->reset_need_retry(key
);
2474 if (key
.name
.empty()) {
2475 /* shouldn't happen */
2476 set_status("skipping empty entry");
2477 ldout(sync_env
->cct
, 0) << "ERROR: " << __func__
<< "(): entry with empty obj name, skipping" << dendl
;
// optionally inject a synthetic failure for testing
2480 if (error_injection
&&
2481 rand() % 10000 < cct
->_conf
->rgw_sync_data_inject_err_probability
* 10000.0) {
2482 ldout(sync_env
->cct
, 0) << __func__
<< ": injecting data sync error on key=" << key
.name
<< dendl
;
2484 } else if (op
== CLS_RGW_OP_ADD
||
2485 op
== CLS_RGW_OP_LINK_OLH
) {
// plain ADD of a specific version is deferred to the LINK_OLH entry
2486 if (op
== CLS_RGW_OP_ADD
&& !key
.instance
.empty() && key
.instance
!= "null") {
2487 set_status("skipping entry");
2488 ldout(sync_env
->cct
, 10) << "bucket skipping sync obj: " << sync_env
->source_zone
<< "/" << bucket_info
->bucket
<< "/" << key
<< "[" << versioned_epoch
<< "]: versioned object will be synced on link_olh" << dendl
;
2492 set_status("syncing obj");
2493 ldout(sync_env
->cct
, 5) << "bucket sync: sync obj: " << sync_env
->source_zone
<< "/" << bucket_info
->bucket
<< "/" << key
<< "[" << versioned_epoch
<< "]" << dendl
;
2494 logger
.log("fetch");
2495 call(data_sync_module
->sync_object(sync_env
, *bucket_info
, key
, versioned_epoch
, &zones_trace
));
2496 } else if (op
== CLS_RGW_OP_DEL
|| op
== CLS_RGW_OP_UNLINK_INSTANCE
) {
2497 set_status("removing obj");
2498 if (op
== CLS_RGW_OP_UNLINK_INSTANCE
) {
2501 logger
.log("remove");
2502 call(data_sync_module
->remove_object(sync_env
, *bucket_info
, key
, timestamp
, versioned
, versioned_epoch
, &zones_trace
));
2503 } else if (op
== CLS_RGW_OP_LINK_OLH_DM
) {
2504 logger
.log("creating delete marker");
2505 set_status("creating delete marker");
2506 ldout(sync_env
->cct
, 10) << "creating delete marker: obj: " << sync_env
->source_zone
<< "/" << bucket_info
->bucket
<< "/" << key
<< "[" << versioned_epoch
<< "]" << dendl
;
2507 call(data_sync_module
->create_delete_marker(sync_env
, *bucket_info
, key
, timestamp
, owner
, versioned
, versioned_epoch
, &zones_trace
));
// retry the whole op while the marker tracker says a retry is pending
2510 } while (marker_tracker
->need_retry(key
));
2516 ss
<< "done, retcode=" << retcode
;
2518 logger
.log(ss
.str());
// report failures (other than ENOENT) to the sync error log
2521 if (retcode
< 0 && retcode
!= -ENOENT
) {
2522 set_status() << "failed to sync obj; retcode=" << retcode
;
2523 ldout(sync_env
->cct
, 0) << "ERROR: failed to sync object: "
2524 << bucket_shard_str
{bs
} << "/" << key
.name
<< dendl
;
2525 error_ss
<< bucket_shard_str
{bs
} << "/" << key
.name
;
2526 sync_status
= retcode
;
2528 if (!error_ss
.str().empty()) {
2529 yield
call(sync_env
->error_logger
->log_error_cr(sync_env
->conn
->get_remote_id(), "data", error_ss
.str(), -retcode
, string("failed to sync object") + cpp_strerror(-sync_status
)));
// on success, let the marker tracker advance past this entry
2532 if (sync_status
== 0) {
2534 set_status() << "calling marker_tracker->finish(" << entry_marker
<< ")";
2535 yield
call(marker_tracker
->finish(entry_marker
));
2536 sync_status
= retcode
;
2538 if (sync_status
< 0) {
2539 return set_cr_error(sync_status
);
2541 return set_cr_done();
2547 #define BUCKET_SYNC_SPAWN_WINDOW 20
// Coroutine: full sync of one bucket shard — lists the remote shard page
// by page and spawns one RGWBucketSyncSingleEntryCR per listed entry,
// tracking progress through RGWBucketFullSyncShardMarkerTrack.
// NOTE(review): extraction mangled this chunk; some member declarations
// and the constructor/class closing lines are not visible here.
2549 class RGWBucketShardFullSyncCR
: public RGWCoroutine
{
2550 RGWDataSyncEnv
*sync_env
;
2551 const rgw_bucket_shard
& bs
;
2552 RGWBucketInfo
*bucket_info
;
2553 boost::intrusive_ptr
<RGWContinuousLeaseCR
> lease_cr
;
2554 bucket_list_result list_result
;
2555 list
<bucket_list_entry
>::iterator entries_iter
;
2556 rgw_bucket_shard_full_sync_marker
& full_marker
;
2557 RGWBucketFullSyncShardMarkerTrack marker_tracker
;
2558 rgw_obj_key list_marker
;
2559 bucket_list_entry
*entry
{nullptr};
2560 RGWModifyOp op
{CLS_RGW_OP_ADD
};
2562 int total_entries
{0};
2566 const string
& status_oid
;
2568 RGWDataSyncDebugLogger logger
;
2569 rgw_zone_set zones_trace
;
2571 RGWBucketShardFullSyncCR(RGWDataSyncEnv
*_sync_env
, const rgw_bucket_shard
& bs
,
2572 RGWBucketInfo
*_bucket_info
,
2573 const std::string
& status_oid
,
2574 RGWContinuousLeaseCR
*lease_cr
,
2575 rgw_bucket_shard_full_sync_marker
& _full_marker
)
2576 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
), bs(bs
),
2577 bucket_info(_bucket_info
), lease_cr(lease_cr
), full_marker(_full_marker
),
2578 marker_tracker(sync_env
, status_oid
, full_marker
),
2579 status_oid(status_oid
) {
2580 logger
.init(sync_env
, "BucketFull", bs
.get_key());
// seed the zones trace with the source zone to avoid echoing changes back
2581 zones_trace
.insert(sync_env
->source_zone
);
2584 int operate() override
;
// Drive full sync: page through the remote listing, spawn per-entry sync
// coroutines bounded by BUCKET_SYNC_SPAWN_WINDOW, then flip the persisted
// shard state to incremental on success.
// NOTE(review): extraction mangled this chunk — the reenter wrapper,
// several yield/retcode lines and closing braces are not visible here.
2587 int RGWBucketShardFullSyncCR::operate()
// resume listing from the persisted full-sync marker
2591 list_marker
= full_marker
.position
;
2593 total_entries
= full_marker
.count
;
// bail out if we lost the shard lease
2595 if (!lease_cr
->is_locked()) {
2597 return set_cr_error(-ECANCELED
);
2599 set_status("listing remote bucket");
2600 ldout(sync_env
->cct
, 20) << __func__
<< "(): listing bucket for full sync" << dendl
;
2601 yield
call(new RGWListBucketShardCR(sync_env
, bs
, list_marker
,
2603 if (retcode
< 0 && retcode
!= -ENOENT
) {
2604 set_status("failed bucket listing, going down");
2606 return set_cr_error(retcode
);
2608 entries_iter
= list_result
.entries
.begin();
2609 for (; entries_iter
!= list_result
.entries
.end(); ++entries_iter
) {
2610 if (!lease_cr
->is_locked()) {
2612 return set_cr_error(-ECANCELED
);
2614 ldout(sync_env
->cct
, 20) << "[full sync] syncing object: "
2615 << bucket_shard_str
{bs
} << "/" << entries_iter
->key
<< dendl
;
2616 entry
= &(*entries_iter
);
2618 list_marker
= entries_iter
->key
;
2619 if (!marker_tracker
.start(entry
->key
, total_entries
, real_time())) {
2620 ldout(sync_env
->cct
, 0) << "ERROR: cannot start syncing " << entry
->key
<< ". Duplicate entry?" << dendl
;
// specific versions get LINK_OLH, plain objects get ADD
2622 op
= (entry
->key
.instance
.empty() || entry
->key
.instance
== "null" ? CLS_RGW_OP_ADD
: CLS_RGW_OP_LINK_OLH
);
2623 using SyncCR
= RGWBucketSyncSingleEntryCR
<rgw_obj_key
, rgw_obj_key
>;
2624 yield
spawn(new SyncCR(sync_env
, bucket_info
, bs
, entry
->key
,
2625 false, /* versioned, only matters for object removal */
2626 entry
->versioned_epoch
, entry
->mtime
,
2627 entry
->owner
, op
, CLS_RGW_STATE_COMPLETE
,
2628 entry
->key
, &marker_tracker
, zones_trace
),
// throttle concurrency: wait for children when the window fills up
2631 while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW
) {
2632 yield
wait_for_child();
2635 again
= collect(&ret
, nullptr);
2637 ldout(sync_env
->cct
, 0) << "ERROR: a sync operation returned error" << dendl
;
2639 /* we have reported this error */
2644 } while (list_result
.is_truncated
&& sync_status
== 0);
2645 set_status("done iterating over all objects");
2646 /* wait for all operations to complete */
2647 while (num_spawned()) {
2648 yield
wait_for_child();
2651 again
= collect(&ret
, nullptr);
2653 ldout(sync_env
->cct
, 0) << "ERROR: a sync operation returned error" << dendl
;
2655 /* we have reported this error */
2659 if (!lease_cr
->is_locked()) {
2660 return set_cr_error(-ECANCELED
);
2662 /* update sync state to incremental */
2663 if (sync_status
== 0) {
// note: this local shadows the member/outer sync_status on purpose here
2665 rgw_bucket_shard_sync_info sync_status
;
2666 sync_status
.state
= rgw_bucket_shard_sync_info::StateIncrementalSync
;
2667 map
<string
, bufferlist
> attrs
;
2668 sync_status
.encode_state_attr(attrs
);
2669 RGWRados
*store
= sync_env
->store
;
2670 call(new RGWSimpleRadosWriteAttrsCR(sync_env
->async_rados
, store
,
2671 rgw_raw_obj(store
->get_zone_params().log_pool
, status_oid
),
2675 ldout(sync_env
->cct
, 0) << "ERROR: failure in sync, backing out (sync_status=" << sync_status
<< ")" << dendl
;
2677 if (retcode
< 0 && sync_status
== 0) { /* actually tried to set incremental state and failed */
2678 ldout(sync_env
->cct
, 0) << "ERROR: failed to set sync state on bucket "
2679 << bucket_shard_str
{bs
} << " retcode=" << retcode
<< dendl
;
2680 return set_cr_error(retcode
);
2682 if (sync_status
< 0) {
2683 return set_cr_error(sync_status
);
2685 return set_cr_done();
// Coroutine: incremental sync of one bucket shard — consumes bilog
// entries, squashes redundant ops per (object, instance), and spawns
// per-entry sync coroutines.
// NOTE(review): extraction mangled this chunk; some member declarations
// (key, cur_id, sync_status, etc.) and braces are not visible here.
2690 class RGWBucketShardIncrementalSyncCR
: public RGWCoroutine
{
2691 RGWDataSyncEnv
*sync_env
;
2692 const rgw_bucket_shard
& bs
;
2693 RGWBucketInfo
*bucket_info
;
2694 boost::intrusive_ptr
<RGWContinuousLeaseCR
> lease_cr
;
2695 list
<rgw_bi_log_entry
> list_result
;
2696 list
<rgw_bi_log_entry
>::iterator entries_iter
;
// (object, instance) -> latest (timestamp, op) used to squash older ops
2697 map
<pair
<string
, string
>, pair
<real_time
, RGWModifyOp
> > squash_map
;
2698 rgw_bucket_shard_inc_sync_marker
& inc_marker
;
2700 rgw_bi_log_entry
*entry
{nullptr};
2701 RGWBucketIncSyncShardMarkerTrack marker_tracker
;
2702 bool updated_status
{false};
2703 const string
& status_oid
;
2704 const string
& zone_id
;
2705 ceph::real_time sync_modify_time
;
2709 RGWDataSyncDebugLogger logger
;
2712 bool syncstopped
{false};
2715 RGWBucketShardIncrementalSyncCR(RGWDataSyncEnv
*_sync_env
,
2716 const rgw_bucket_shard
& bs
,
2717 RGWBucketInfo
*_bucket_info
,
2718 const std::string
& status_oid
,
2719 RGWContinuousLeaseCR
*lease_cr
,
2720 rgw_bucket_shard_inc_sync_marker
& _inc_marker
)
2721 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
), bs(bs
),
2722 bucket_info(_bucket_info
), lease_cr(lease_cr
), inc_marker(_inc_marker
),
2723 marker_tracker(sync_env
, status_oid
, inc_marker
), status_oid(status_oid
) , zone_id(_sync_env
->store
->get_zone().id
){
2724 set_description() << "bucket shard incremental sync bucket="
2725 << bucket_shard_str
{bs
};
2727 logger
.init(sync_env
, "BucketInc", bs
.get_key());
2730 int operate() override
;
// Drive incremental sync: repeatedly list bilog pages, pre-scan each page
// to build the squash map and detect syncstop/resync markers, then spawn
// one sync coroutine per surviving entry (bounded concurrency), flush the
// marker tracker, and clean up the status object if sync was stopped.
// NOTE(review): extraction mangled this chunk — the reenter wrapper, many
// braces, `continue` statements and some declarations are not visible.
2733 int RGWBucketShardIncrementalSyncCR::operate()
2738 if (!lease_cr
->is_locked()) {
2740 return set_cr_error(-ECANCELED
);
2742 ldout(sync_env
->cct
, 20) << __func__
<< "(): listing bilog for incremental sync" << inc_marker
.position
<< dendl
;
2743 set_status() << "listing bilog; position=" << inc_marker
.position
;
2744 yield
call(new RGWListBucketIndexLogCR(sync_env
, bs
, inc_marker
.position
,
2746 if (retcode
< 0 && retcode
!= -ENOENT
) {
2749 /* wait for all operations to complete */
2750 return set_cr_error(retcode
);
2752 /* no need to retry */
// first pass: pick up syncstop/resync markers and build the squash map
2757 for (auto& e
: list_result
) {
2758 if (e
.op
== RGWModifyOp::CLS_RGW_OP_SYNCSTOP
&& (sync_modify_time
< e
.timestamp
)) {
2759 ldout(sync_env
->cct
, 20) << " syncstop on " << e
.timestamp
<< dendl
;
2760 sync_modify_time
= e
.timestamp
;
2764 if (e
.op
== RGWModifyOp::CLS_RGW_OP_RESYNC
&& (sync_modify_time
< e
.timestamp
)) {
2765 ldout(sync_env
->cct
, 20) << " resync on " << e
.timestamp
<< dendl
;
2766 sync_modify_time
= e
.timestamp
;
2767 syncstopped
= false;
2770 if (e
.op
== CLS_RGW_OP_CANCEL
) {
2773 if (e
.state
!= CLS_RGW_STATE_COMPLETE
) {
// entries our own zone already touched are redundant
2776 if (e
.zones_trace
.find(zone_id
) != e
.zones_trace
.end()) {
2779 auto& squash_entry
= squash_map
[make_pair(e
.object
, e
.instance
)];
2780 if (squash_entry
.first
<= e
.timestamp
) {
2781 squash_entry
= make_pair
<>(e
.timestamp
, e
.op
);
// second pass: process entries in order
2785 entries_iter
= list_result
.begin();
2786 for (; entries_iter
!= list_result
.end(); ++entries_iter
) {
2787 if (!lease_cr
->is_locked()) {
2789 return set_cr_error(-ECANCELED
);
2791 entry
= &(*entries_iter
);
2793 ssize_t p
= entry
->id
.find('#'); /* entries might have explicit shard info in them, e.g., 6#00000000004.94.3 */
2797 cur_id
= entry
->id
.substr(p
+ 1);
2800 inc_marker
.position
= cur_id
;
2802 if (entry
->op
== RGWModifyOp::CLS_RGW_OP_SYNCSTOP
|| entry
->op
== RGWModifyOp::CLS_RGW_OP_RESYNC
) {
2803 ldout(sync_env
->cct
, 20) << "detected syncstop or resync on " << entries_iter
->timestamp
<< " , skipping entry" << dendl
;
2804 marker_tracker
.try_update_high_marker(cur_id
, 0, entry
->timestamp
);
2808 if (!key
.set(rgw_obj_index_key
{entry
->object
, entry
->instance
})) {
2809 set_status() << "parse_raw_oid() on " << entry
->object
<< " returned false, skipping entry";
2810 ldout(sync_env
->cct
, 20) << "parse_raw_oid() on " << entry
->object
<< " returned false, skipping entry" << dendl
;
2811 marker_tracker
.try_update_high_marker(cur_id
, 0, entry
->timestamp
);
2815 ldout(sync_env
->cct
, 20) << "parsed entry: id=" << cur_id
<< " iter->object=" << entry
->object
<< " iter->instance=" << entry
->instance
<< " name=" << key
.name
<< " instance=" << key
.instance
<< " ns=" << key
.ns
<< dendl
;
// namespaced objects (multipart etc.) are not synced via bilog entries
2817 if (!key
.ns
.empty()) {
2818 set_status() << "skipping entry in namespace: " << entry
->object
;
2819 ldout(sync_env
->cct
, 20) << "skipping entry in namespace: " << entry
->object
<< dendl
;
2820 marker_tracker
.try_update_high_marker(cur_id
, 0, entry
->timestamp
);
2824 set_status() << "got entry.id=" << cur_id
<< " key=" << key
<< " op=" << (int)entry
->op
;
2825 if (entry
->op
== CLS_RGW_OP_CANCEL
) {
2826 set_status() << "canceled operation, skipping";
2827 ldout(sync_env
->cct
, 20) << "[inc sync] skipping object: "
2828 << bucket_shard_str
{bs
} << "/" << key
<< ": canceled operation" << dendl
;
2829 marker_tracker
.try_update_high_marker(cur_id
, 0, entry
->timestamp
);
2832 if (entry
->state
!= CLS_RGW_STATE_COMPLETE
) {
2833 set_status() << "non-complete operation, skipping";
2834 ldout(sync_env
->cct
, 20) << "[inc sync] skipping object: "
2835 << bucket_shard_str
{bs
} << "/" << key
<< ": non-complete operation" << dendl
;
2836 marker_tracker
.try_update_high_marker(cur_id
, 0, entry
->timestamp
);
2839 if (entry
->zones_trace
.find(zone_id
) != entry
->zones_trace
.end()) {
2840 set_status() << "redundant operation, skipping";
2841 ldout(sync_env
->cct
, 20) << "[inc sync] skipping object: "
2842 <<bucket_shard_str
{bs
} <<"/"<<key
<<": redundant operation" << dendl
;
2843 marker_tracker
.try_update_high_marker(cur_id
, 0, entry
->timestamp
);
// only the newest op per (object, instance) in this page survives
2846 if (make_pair
<>(entry
->timestamp
, entry
->op
) != squash_map
[make_pair(entry
->object
, entry
->instance
)]) {
2847 set_status() << "squashed operation, skipping";
2848 ldout(sync_env
->cct
, 20) << "[inc sync] skipping object: "
2849 << bucket_shard_str
{bs
} << "/" << key
<< ": squashed operation" << dendl
;
2850 /* not updating high marker though */
2853 ldout(sync_env
->cct
, 20) << "[inc sync] syncing object: "
2854 << bucket_shard_str
{bs
} << "/" << key
<< dendl
;
2855 updated_status
= false;
// serialize per-key: wait for any conflicting in-flight op to finish
2856 while (!marker_tracker
.can_do_op(key
)) {
2857 if (!updated_status
) {
2858 set_status() << "can't do op, conflicting inflight operation";
2859 updated_status
= true;
2861 ldout(sync_env
->cct
, 5) << *this << ": [inc sync] can't do op on key=" << key
<< " need to wait for conflicting operation to complete" << dendl
;
2862 yield
wait_for_child();
2865 again
= collect(&ret
, nullptr);
2867 ldout(sync_env
->cct
, 0) << "ERROR: a child operation returned error (ret=" << ret
<< ")" << dendl
;
2869 /* we have reported this error */
2872 if (sync_status
!= 0)
2875 if (sync_status
!= 0) {
2876 /* get error, stop */
2879 if (!marker_tracker
.index_key_to_marker(key
, cur_id
)) {
2880 set_status() << "can't do op, sync already in progress for object";
2881 ldout(sync_env
->cct
, 20) << __func__
<< ": skipping sync of entry: " << cur_id
<< ":" << key
<< " sync already in progress for object" << dendl
;
2882 marker_tracker
.try_update_high_marker(cur_id
, 0, entry
->timestamp
);
2886 set_status() << "start object sync";
2887 if (!marker_tracker
.start(cur_id
, 0, entry
->timestamp
)) {
2888 ldout(sync_env
->cct
, 0) << "ERROR: cannot start syncing " << cur_id
<< ". Duplicate entry?" << dendl
;
2890 uint64_t versioned_epoch
= 0;
2891 rgw_bucket_entry_owner
owner(entry
->owner
, entry
->owner_display_name
);
// a negative pool in the entry's ver means the epoch is a versioned epoch
2892 if (entry
->ver
.pool
< 0) {
2893 versioned_epoch
= entry
->ver
.epoch
;
2895 ldout(sync_env
->cct
, 20) << __func__
<< "(): entry->timestamp=" << entry
->timestamp
<< dendl
;
2896 using SyncCR
= RGWBucketSyncSingleEntryCR
<string
, rgw_obj_key
>;
2897 spawn(new SyncCR(sync_env
, bucket_info
, bs
, key
,
2898 entry
->is_versioned(), versioned_epoch
,
2899 entry
->timestamp
, owner
, entry
->op
, entry
->state
,
2900 cur_id
, &marker_tracker
, entry
->zones_trace
),
// keep no more than BUCKET_SYNC_SPAWN_WINDOW children in flight
2904 while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW
) {
2905 set_status() << "num_spawned() > spawn_window";
2906 yield
wait_for_child();
2909 again
= collect(&ret
, nullptr);
2911 ldout(sync_env
->cct
, 0) << "ERROR: a sync operation returned error" << dendl
;
2913 /* we have reported this error */
2915 /* not waiting for child here */
2919 } while (!list_result
.empty() && sync_status
== 0);
// on syncstop, remove the shard status object entirely
2925 const string
& oid
= RGWBucketSyncStatusManager::status_oid(sync_env
->source_zone
, bs
);
2926 RGWRados
*store
= sync_env
->store
;
2927 call(new RGWRadosRemoveCR(store
, rgw_raw_obj
{store
->get_zone_params().log_pool
, oid
}));
2930 return set_cr_done();
// drain all remaining children before flushing the marker
2933 while (num_spawned()) {
2934 yield
wait_for_child();
2937 again
= collect(&ret
, nullptr);
2939 ldout(sync_env
->cct
, 0) << "ERROR: a sync operation returned error" << dendl
;
2941 /* we have reported this error */
2943 /* not waiting for child here */
2947 yield
call(marker_tracker
.flush());
2949 ldout(sync_env
->cct
, 0) << "ERROR: marker_tracker.flush() returned retcode=" << retcode
<< dendl
;
2950 return set_cr_error(retcode
);
2952 if (sync_status
< 0) {
2953 ldout(sync_env
->cct
, 0) << "ERROR: failure in sync, backing out (sync_status=" << sync_status
<< ")" << dendl
;
2956 /* wait for all operations to complete */
2959 if (sync_status
< 0) {
2960 return set_cr_error(sync_status
);
2963 return set_cr_done();
// Top-level per-shard sync driver: take the shard lease, read (or
// bootstrap) the sync status, fetch the bucket instance info (pulling it
// from the master zone's metadata if missing locally), then run the
// Init -> FullSync -> IncrementalSync state machine. The lease is
// released on every exit path.
// NOTE(review): extraction mangled this chunk — the reenter wrapper,
// braces, retcode checks between states and some yields are not visible.
2968 int RGWRunBucketSyncCoroutine::operate()
2972 set_status("acquiring sync lock");
2973 auto store
= sync_env
->store
;
2974 lease_cr
.reset(new RGWContinuousLeaseCR(sync_env
->async_rados
, store
,
2975 rgw_raw_obj(store
->get_zone_params().log_pool
, status_oid
),
2977 cct
->_conf
->rgw_sync_lease_period
,
2979 lease_stack
.reset(spawn(lease_cr
.get(), false));
// spin until the lease is held, aborting if the lease CR failed
2981 while (!lease_cr
->is_locked()) {
2982 if (lease_cr
->is_done()) {
2983 ldout(cct
, 5) << "lease cr failed, done early" << dendl
;
2984 set_status("lease lock failed, early abort");
2985 return set_cr_error(lease_cr
->get_ret_status());
2991 yield
call(new RGWReadBucketSyncStatusCoroutine(sync_env
, bs
, &sync_status
));
2992 if (retcode
< 0 && retcode
!= -ENOENT
) {
2993 ldout(sync_env
->cct
, 0) << "ERROR: failed to read sync status for bucket="
2994 << bucket_shard_str
{bs
} << dendl
;
2995 lease_cr
->go_down();
2997 return set_cr_error(retcode
);
3000 ldout(sync_env
->cct
, 20) << __func__
<< "(): sync status for bucket "
3001 << bucket_shard_str
{bs
} << ": " << sync_status
.state
<< dendl
;
3003 yield
call(new RGWGetBucketInstanceInfoCR(sync_env
->async_rados
, sync_env
->store
, bs
.bucket
, &bucket_info
));
3004 if (retcode
== -ENOENT
) {
3005 /* bucket instance info has not been synced in yet, fetch it now */
3007 ldout(sync_env
->cct
, 10) << "no local info for bucket "
3008 << bucket_str
{bs
.bucket
} << ": fetching metadata" << dendl
;
3009 string raw_key
= string("bucket.instance:") + bs
.bucket
.get_key();
// run a one-off metadata sync of the bucket.instance entry
3011 meta_sync_env
.init(cct
, sync_env
->store
, sync_env
->store
->rest_master_conn
, sync_env
->async_rados
, sync_env
->http_manager
, sync_env
->error_logger
);
3013 call(new RGWMetaSyncSingleEntryCR(&meta_sync_env
, raw_key
,
3014 string() /* no marker */,
3015 MDLOG_STATUS_COMPLETE
,
3016 NULL
/* no marker tracker */));
3019 ldout(sync_env
->cct
, 0) << "ERROR: failed to fetch bucket instance info for " << bucket_str
{bs
.bucket
} << dendl
;
3020 lease_cr
->go_down();
3022 return set_cr_error(retcode
);
// re-read the bucket instance info now that metadata sync populated it
3025 yield
call(new RGWGetBucketInstanceInfoCR(sync_env
->async_rados
, sync_env
->store
, bs
.bucket
, &bucket_info
));
3028 ldout(sync_env
->cct
, 0) << "ERROR: failed to retrieve bucket info for bucket=" << bucket_str
{bs
.bucket
} << dendl
;
3029 lease_cr
->go_down();
3031 return set_cr_error(retcode
);
3034 if (sync_status
.state
== rgw_bucket_shard_sync_info::StateInit
) {
3035 yield
call(new RGWInitBucketShardSyncStatusCoroutine(sync_env
, bs
, sync_status
));
3037 ldout(sync_env
->cct
, 0) << "ERROR: init sync on " << bucket_shard_str
{bs
}
3038 << " failed, retcode=" << retcode
<< dendl
;
3039 lease_cr
->go_down();
3041 return set_cr_error(retcode
);
3045 if (sync_status
.state
== rgw_bucket_shard_sync_info::StateFullSync
) {
3046 yield
call(new RGWBucketShardFullSyncCR(sync_env
, bs
, &bucket_info
,
3047 status_oid
, lease_cr
.get(),
3048 sync_status
.full_marker
));
3050 ldout(sync_env
->cct
, 5) << "full sync on " << bucket_shard_str
{bs
}
3051 << " failed, retcode=" << retcode
<< dendl
;
3052 lease_cr
->go_down();
3054 return set_cr_error(retcode
);
// full sync succeeded; fall through into incremental
3056 sync_status
.state
= rgw_bucket_shard_sync_info::StateIncrementalSync
;
3059 if (sync_status
.state
== rgw_bucket_shard_sync_info::StateIncrementalSync
) {
3060 yield
call(new RGWBucketShardIncrementalSyncCR(sync_env
, bs
, &bucket_info
,
3061 status_oid
, lease_cr
.get(),
3062 sync_status
.inc_marker
));
3064 ldout(sync_env
->cct
, 5) << "incremental sync on " << bucket_shard_str
{bs
}
3065 << " failed, retcode=" << retcode
<< dendl
;
3066 lease_cr
->go_down();
3068 return set_cr_error(retcode
);
3072 lease_cr
->go_down();
3074 return set_cr_done();
3080 RGWCoroutine
*RGWRemoteBucketLog::run_sync_cr()
3082 return new RGWRunBucketSyncCoroutine(&sync_env
, bs
);
// Initialize the manager: resolve the source-zone connection, start the
// HTTP manager, fetch the remote bucket instance metadata to learn the
// shard count, then create and init one RGWRemoteBucketLog per shard.
// NOTE(review): extraction mangled this chunk — braces, early returns and
// the tail of the function are not visible here.
3085 int RGWBucketSyncStatusManager::init()
3087 conn
= store
->get_zone_conn_by_id(source_zone
);
3089 ldout(store
->ctx(), 0) << "connection object to zone " << source_zone
<< " does not exist" << dendl
;
3093 int ret
= http_manager
.set_threaded();
3095 ldout(store
->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret
<< dendl
;
3100 const string key
= bucket
.get_key();
3102 rgw_http_param_pair pairs
[] = { { "key", key
.c_str() },
3105 string path
= string("/admin/metadata/bucket.instance");
3107 bucket_instance_meta_info result
;
// fetch the remote bucket.instance metadata synchronously via the CR manager
3108 ret
= cr_mgr
.run(new RGWReadRESTResourceCR
<bucket_instance_meta_info
>(store
->ctx(), conn
, &http_manager
, path
, pairs
, &result
));
3110 ldout(store
->ctx(), 0) << "ERROR: failed to fetch bucket metadata info from zone=" << source_zone
<< " path=" << path
<< " key=" << key
<< " ret=" << ret
<< dendl
;
3114 RGWBucketInfo
& bi
= result
.data
.get_bucket_info();
3115 num_shards
= bi
.num_shards
;
3117 error_logger
= new RGWSyncErrorLogger(store
, RGW_SYNC_ERROR_LOG_SHARD_PREFIX
, ERROR_LOGGER_SHARDS
);
3119 sync_module
.reset(new RGWDefaultSyncModuleInstance());
// a non-sharded bucket (num_shards == 0) still gets one log with shard -1
3121 int effective_num_shards
= (num_shards
? num_shards
: 1);
3123 auto async_rados
= store
->get_async_rados();
3125 for (int i
= 0; i
< effective_num_shards
; i
++) {
3126 RGWRemoteBucketLog
*l
= new RGWRemoteBucketLog(store
, this, async_rados
, &http_manager
);
3127 ret
= l
->init(source_zone
, conn
, bucket
, (num_shards
? i
: -1), error_logger
, sync_module
);
3129 ldout(store
->ctx(), 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl
;
3138 int RGWBucketSyncStatusManager::init_sync_status()
3140 list
<RGWCoroutinesStack
*> stacks
;
3142 for (map
<int, RGWRemoteBucketLog
*>::iterator iter
= source_logs
.begin(); iter
!= source_logs
.end(); ++iter
) {
3143 RGWCoroutinesStack
*stack
= new RGWCoroutinesStack(store
->ctx(), &cr_mgr
);
3144 RGWRemoteBucketLog
*l
= iter
->second
;
3145 stack
->call(l
->init_sync_status_cr());
3147 stacks
.push_back(stack
);
3150 return cr_mgr
.run(stacks
);
// Read the persisted sync status of every shard (one coroutine stack per
// shard) into the sync_status map, keyed by shard id.
// NOTE(review): extraction mangled this chunk — the error-path braces and
// the function's return statements are not visible here.
3153 int RGWBucketSyncStatusManager::read_sync_status()
3155 list
<RGWCoroutinesStack
*> stacks
;
3157 for (map
<int, RGWRemoteBucketLog
*>::iterator iter
= source_logs
.begin(); iter
!= source_logs
.end(); ++iter
) {
3158 RGWCoroutinesStack
*stack
= new RGWCoroutinesStack(store
->ctx(), &cr_mgr
);
3159 RGWRemoteBucketLog
*l
= iter
->second
;
// each shard's status lands in sync_status[shard_id]
3160 stack
->call(l
->read_sync_status_cr(&sync_status
[iter
->first
]));
3162 stacks
.push_back(stack
);
3165 int ret
= cr_mgr
.run(stacks
);
3167 ldout(store
->ctx(), 0) << "ERROR: failed to read sync status for "
3168 << bucket_str
{bucket
} << dendl
;
3175 int RGWBucketSyncStatusManager::run()
3177 list
<RGWCoroutinesStack
*> stacks
;
3179 for (map
<int, RGWRemoteBucketLog
*>::iterator iter
= source_logs
.begin(); iter
!= source_logs
.end(); ++iter
) {
3180 RGWCoroutinesStack
*stack
= new RGWCoroutinesStack(store
->ctx(), &cr_mgr
);
3181 RGWRemoteBucketLog
*l
= iter
->second
;
3182 stack
->call(l
->run_sync_cr());
3184 stacks
.push_back(stack
);
3187 int ret
= cr_mgr
.run(stacks
);
3189 ldout(store
->ctx(), 0) << "ERROR: failed to read sync status for "
3190 << bucket_str
{bucket
} << dendl
;
3197 string
RGWBucketSyncStatusManager::status_oid(const string
& source_zone
,
3198 const rgw_bucket_shard
& bs
)
3200 return bucket_status_oid_prefix
+ "." + source_zone
+ ":" + bs
.get_key();
3203 class RGWCollectBucketSyncStatusCR
: public RGWShardCollectCR
{
3204 static constexpr int max_concurrent_shards
= 16;
3205 RGWRados
*const store
;
3206 RGWDataSyncEnv
*const env
;
3207 const int num_shards
;
3208 rgw_bucket_shard bs
;
3210 using Vector
= std::vector
<rgw_bucket_shard_sync_info
>;
3211 Vector::iterator i
, end
;
3214 RGWCollectBucketSyncStatusCR(RGWRados
*store
, RGWDataSyncEnv
*env
,
3215 int num_shards
, const rgw_bucket
& bucket
,
3217 : RGWShardCollectCR(store
->ctx(), max_concurrent_shards
),
3218 store(store
), env(env
), num_shards(num_shards
),
3219 bs(bucket
, num_shards
> 0 ? 0 : -1), // start at shard 0 or -1
3220 i(status
->begin()), end(status
->end())
3223 bool spawn_next() override
{
3227 spawn(new RGWReadBucketSyncStatusCoroutine(env
, bs
, &*i
), false);
3234 int rgw_bucket_sync_status(RGWRados
*store
, const std::string
& source_zone
,
3235 const RGWBucketInfo
& bucket_info
,
3236 std::vector
<rgw_bucket_shard_sync_info
> *status
)
3238 const auto num_shards
= bucket_info
.num_shards
;
3240 status
->resize(std::max
<size_t>(1, num_shards
));
3243 RGWSyncModuleInstanceRef module
; // null sync module
3244 env
.init(store
->ctx(), store
, nullptr, store
->get_async_rados(),
3245 nullptr, nullptr, source_zone
, module
, nullptr);
3247 RGWCoroutinesManager
crs(store
->ctx(), store
->get_cr_registry());
3248 return crs
.run(new RGWCollectBucketSyncStatusCR(store
, &env
, num_shards
,
3249 bucket_info
.bucket
, status
));
3253 // TODO: move into rgw_data_sync_trim.cc
3255 #define dout_prefix (*_dout << "data trim: ")
3259 /// return the marker that it's safe to trim up to
3260 const std::string
& get_stable_marker(const rgw_data_sync_marker
& m
)
3262 return m
.state
== m
.FullSync
? m
.next_step_marker
: m
.marker
;
3265 /// comparison operator for take_min_markers()
3266 bool operator<(const rgw_data_sync_marker
& lhs
,
3267 const rgw_data_sync_marker
& rhs
)
3269 // sort by stable marker
3270 return get_stable_marker(lhs
) < get_stable_marker(rhs
);
/// populate the container starting with 'dest' with the minimum stable marker
/// of each shard for all of the peers in [first, last)
/// (assumes every peer reports the same shard set, in the same order)
template <typename IterIn, typename IterOut>
void take_min_markers(IterIn first, IterIn last, IterOut dest)
{
  if (first == last) {
    return; // no peers: leave the output untouched
  }
  // initialize markers with the first peer's
  // (the span dropped the output-cursor bookkeeping; 'm' walks dest in
  // lockstep with each peer's shard map)
  auto m = dest;
  for (auto &shard : first->sync_markers) {
    *m = std::move(shard.second);
    ++m;
  }
  // for remaining peers, replace with smaller markers
  for (auto p = first + 1; p != last; ++p) {
    auto m = dest;
    for (auto &shard : p->sync_markers) {
      if (shard.second < *m) {
        *m = std::move(shard.second);
      }
      ++m;
    }
  }
}
3299 } // anonymous namespace
3301 class DataLogTrimCR
: public RGWCoroutine
{
3303 RGWHTTPManager
*http
;
3304 const int num_shards
;
3305 const std::string
& zone_id
; //< my zone id
3306 std::vector
<rgw_data_sync_status
> peer_status
; //< sync status for each peer
3307 std::vector
<rgw_data_sync_marker
> min_shard_markers
; //< min marker per shard
3308 std::vector
<std::string
>& last_trim
; //< last trimmed marker per shard
3312 DataLogTrimCR(RGWRados
*store
, RGWHTTPManager
*http
,
3313 int num_shards
, std::vector
<std::string
>& last_trim
)
3314 : RGWCoroutine(store
->ctx()), store(store
), http(http
),
3315 num_shards(num_shards
),
3316 zone_id(store
->get_zone().id
),
3317 peer_status(store
->zone_conn_map
.size()),
3318 min_shard_markers(num_shards
),
3319 last_trim(last_trim
)
3322 int operate() override
;
3325 int DataLogTrimCR::operate()
// NOTE(review): this span appears to be a lossy extraction of a coroutine
// body — the reenter()/yield scaffolding, result collection, and several
// guard lines seem to be missing. Comments describe only the visible code.
3328 ldout(cct
, 10) << "fetching sync status for zone " << zone_id
<< dendl
;
3329 set_status("fetching sync status");
3331 // query data sync status from each sync peer
// parameter table for the peers' /admin/log/ status endpoint
3332 rgw_http_param_pair params
[] = {
3334 { "status", nullptr },
3335 { "source-zone", zone_id
.c_str() },
3336 { nullptr, nullptr }
// spawn one REST status-fetch coroutine per connected peer zone; each
// reply lands in the matching peer_status slot tracked by iterator p
3339 auto p
= peer_status
.begin();
3340 for (auto& c
: store
->zone_conn_map
) {
3341 ldout(cct
, 20) << "query sync status from " << c
.first
<< dendl
;
3342 using StatusCR
= RGWReadRESTResourceCR
<rgw_data_sync_status
>;
3343 spawn(new StatusCR(cct
, c
.second
, http
, "/admin/log/", params
, &*p
),
3349 // must get a successful reply from all peers to consider trimming
// wait for all spawned children; presumably ret aggregates their results —
// TODO confirm: the collection code is not visible in this span
3351 while (ret
== 0 && num_spawned() > 0) {
3352 yield
wait_for_child();
// error path: abort the whole trim if any peer failed to answer
3358 ldout(cct
, 4) << "failed to fetch sync status from all peers" << dendl
;
3359 return set_cr_error(ret
);
3362 ldout(cct
, 10) << "trimming log shards" << dendl
;
3363 set_status("trimming log shards");
3365 // determine the minimum marker for each shard
3366 take_min_markers(peer_status
.begin(), peer_status
.end(),
3367 min_shard_markers
.begin());
// per shard: trim up to the minimum stable marker across all peers,
// skipping shards whose stable marker hasn't advanced past last_trim
3369 for (int i
= 0; i
< num_shards
; i
++) {
3370 const auto& m
= min_shard_markers
[i
];
3371 auto& stable
= get_stable_marker(m
);
3372 if (stable
<= last_trim
[i
]) {
3375 ldout(cct
, 10) << "trimming log shard " << i
3376 << " at marker=" << stable
3377 << " last_trim=" << last_trim
[i
] << dendl
;
3378 using TrimCR
= RGWSyncLogTrimCR
;
3379 spawn(new TrimCR(store
, store
->data_log
->get_oid(i
),
3380 stable
, &last_trim
[i
]),
// all shards dispatched: mark the coroutine complete
3384 return set_cr_done();
3389 class DataLogTrimPollCR
: public RGWCoroutine
{
3391 RGWHTTPManager
*http
;
3392 const int num_shards
;
3393 const utime_t interval
; //< polling interval
3394 const std::string lock_oid
; //< use first data log shard for lock
3395 const std::string lock_cookie
;
3396 std::vector
<std::string
> last_trim
; //< last trimmed marker per shard
3399 DataLogTrimPollCR(RGWRados
*store
, RGWHTTPManager
*http
,
3400 int num_shards
, utime_t interval
)
3401 : RGWCoroutine(store
->ctx()), store(store
), http(http
),
3402 num_shards(num_shards
), interval(interval
),
3403 lock_oid(store
->data_log
->get_oid(0)),
3404 lock_cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct
)),
3405 last_trim(num_shards
)
3408 int operate() override
;
3411 int DataLogTrimPollCR::operate()
// NOTE(review): the coroutine scaffolding (reenter/loop, sleep call, and
// retcode checks) appears to be missing from this extracted span; comments
// below describe only what the visible code shows.
3415 set_status("sleeping");
3418 // request a 'data_trim' lock that covers the entire wait interval to
3419 // prevent other gateways from attempting to trim for the duration
3420 set_status("acquiring trim lock");
// take the 'data_trim' lock on lock_oid (first data log shard's object)
// in the zone's log pool, identified by this instance's random cookie
3421 yield
call(new RGWSimpleRadosLockCR(store
->get_async_rados(), store
,
3422 rgw_raw_obj(store
->get_zone_params().log_pool
, lock_oid
),
3423 "data_trim", lock_cookie
,
3426 // if the lock is already held, go back to sleep and try again later
3427 ldout(cct
, 4) << "failed to lock " << lock_oid
<< ", trying again in "
3428 << interval
.sec() << "s" << dendl
;
// lock held: run one trim pass across all shards
3432 set_status("trimming");
3433 yield
call(new DataLogTrimCR(store
, http
, num_shards
, last_trim
));
3435 // note that the lock is not released. this is intentional, as it avoids
3436 // duplicating this work in other gateways
3442 RGWCoroutine
* create_data_log_trim_cr(RGWRados
*store
,
3443 RGWHTTPManager
*http
,
3444 int num_shards
, utime_t interval
)
3446 return new DataLogTrimPollCR(store
, http
, num_shards
, interval
);