#include <string>

#include <boost/utility/string_ref.hpp>

#include "common/ceph_json.h"
#include "common/RWLock.h"
#include "common/RefCountedObj.h"
#include "common/WorkQueue.h"
#include "common/Throttle.h"
#include "common/errno.h"

#include "rgw_common.h"
#include "rgw_rados.h"
#include "rgw_data_sync.h"
#include "rgw_rest_conn.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
#include "rgw_http_client.h"
#include "rgw_bucket.h"
#include "rgw_metadata.h"
#include "rgw_sync_module.h"

#include "cls/lock/cls_lock_client.h"

#include "auth/Crypto.h"

#include <boost/asio/yield.hpp>
28 #define dout_subsys ceph_subsys_rgw
31 #define dout_prefix (*_dout << "data sync: ")
// Rados object-name prefixes used to build the various data-sync
// status / index object names in this file.
static string datalog_sync_status_oid_prefix = "datalog.sync-status";
static string datalog_sync_status_shard_prefix = "datalog.sync-status.shard";
static string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
static string bucket_status_oid_prefix = "bucket.sync-status";
38 class RGWSyncDebugLogger
{
45 RGWSyncDebugLogger(CephContext
*_cct
, const string
& source_zone
,
46 const string
& sync_type
, const string
& sync_stage
,
47 const string
& resource
, bool log_start
= true) {
48 init(_cct
, source_zone
, sync_type
, sync_stage
, resource
, log_start
);
50 RGWSyncDebugLogger() : cct(NULL
), ended(false) {}
51 ~RGWSyncDebugLogger();
53 void init(CephContext
*_cct
, const string
& source_zone
,
54 const string
& sync_type
, const string
& sync_stage
,
55 const string
& resource
, bool log_start
= true);
56 void log(const string
& state
);
57 void finish(int status
);
60 void RGWSyncDebugLogger::init(CephContext
*_cct
, const string
& source_zone
,
61 const string
& sync_type
, const string
& sync_section
,
62 const string
& resource
, bool log_start
)
66 string zone_str
= source_zone
.substr(0, 8);
67 prefix
= "Sync:" + zone_str
+ ":" + sync_type
+ ":" + sync_section
+ ":" + resource
;
73 RGWSyncDebugLogger::~RGWSyncDebugLogger()
80 void RGWSyncDebugLogger::log(const string
& state
)
82 ldout(cct
, 5) << prefix
<< ":" << state
<< dendl
;
85 void RGWSyncDebugLogger::finish(int status
)
88 ldout(cct
, 5) << prefix
<< ":" << "finish r=" << status
<< dendl
;
91 class RGWDataSyncDebugLogger
: public RGWSyncDebugLogger
{
93 RGWDataSyncDebugLogger() {}
94 RGWDataSyncDebugLogger(RGWDataSyncEnv
*sync_env
, const string
& sync_section
,
95 const string
& resource
, bool log_start
= true) {
96 init(sync_env
, sync_section
, resource
, log_start
);
98 void init(RGWDataSyncEnv
*sync_env
, const string
& sync_section
,
99 const string
& resource
, bool log_start
= true) {
100 RGWSyncDebugLogger::init(sync_env
->cct
, sync_env
->source_zone
, "data", sync_section
, resource
, log_start
);
105 void rgw_datalog_info::decode_json(JSONObj
*obj
) {
106 JSONDecoder::decode_json("num_objects", num_shards
, obj
);
109 void rgw_datalog_entry::decode_json(JSONObj
*obj
) {
110 JSONDecoder::decode_json("key", key
, obj
);
112 JSONDecoder::decode_json("timestamp", ut
, obj
);
113 timestamp
= ut
.to_real_time();
116 void rgw_datalog_shard_data::decode_json(JSONObj
*obj
) {
117 JSONDecoder::decode_json("marker", marker
, obj
);
118 JSONDecoder::decode_json("truncated", truncated
, obj
);
119 JSONDecoder::decode_json("entries", entries
, obj
);
122 class RGWReadDataSyncStatusMarkersCR
: public RGWShardCollectCR
{
123 static constexpr int MAX_CONCURRENT_SHARDS
= 16;
126 const int num_shards
;
129 map
<uint32_t, rgw_data_sync_marker
>& markers
;
132 RGWReadDataSyncStatusMarkersCR(RGWDataSyncEnv
*env
, int num_shards
,
133 map
<uint32_t, rgw_data_sync_marker
>& markers
)
134 : RGWShardCollectCR(env
->cct
, MAX_CONCURRENT_SHARDS
),
135 env(env
), num_shards(num_shards
), markers(markers
)
137 bool spawn_next() override
;
140 bool RGWReadDataSyncStatusMarkersCR::spawn_next()
142 if (shard_id
>= num_shards
) {
145 using CR
= RGWSimpleRadosReadCR
<rgw_data_sync_marker
>;
146 spawn(new CR(env
->async_rados
, env
->store
,
147 rgw_raw_obj(env
->store
->get_zone_params().log_pool
, RGWDataSyncStatusManager::shard_obj_name(env
->source_zone
, shard_id
)),
154 class RGWReadDataSyncStatusCoroutine
: public RGWCoroutine
{
155 RGWDataSyncEnv
*sync_env
;
156 rgw_data_sync_status
*sync_status
;
159 RGWReadDataSyncStatusCoroutine(RGWDataSyncEnv
*_sync_env
,
160 rgw_data_sync_status
*_status
)
161 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
), sync_status(_status
)
163 int operate() override
;
166 int RGWReadDataSyncStatusCoroutine::operate()
170 using ReadInfoCR
= RGWSimpleRadosReadCR
<rgw_data_sync_info
>;
172 bool empty_on_enoent
= false; // fail on ENOENT
173 call(new ReadInfoCR(sync_env
->async_rados
, sync_env
->store
,
174 rgw_raw_obj(sync_env
->store
->get_zone_params().log_pool
, RGWDataSyncStatusManager::sync_status_oid(sync_env
->source_zone
)),
175 &sync_status
->sync_info
, empty_on_enoent
));
178 ldout(sync_env
->cct
, 4) << "failed to read sync status info with "
179 << cpp_strerror(retcode
) << dendl
;
180 return set_cr_error(retcode
);
182 // read shard markers
183 using ReadMarkersCR
= RGWReadDataSyncStatusMarkersCR
;
184 yield
call(new ReadMarkersCR(sync_env
, sync_status
->sync_info
.num_shards
,
185 sync_status
->sync_markers
));
187 ldout(sync_env
->cct
, 4) << "failed to read sync status markers with "
188 << cpp_strerror(retcode
) << dendl
;
189 return set_cr_error(retcode
);
191 return set_cr_done();
196 class RGWReadRemoteDataLogShardInfoCR
: public RGWCoroutine
{
197 RGWDataSyncEnv
*sync_env
;
199 RGWRESTReadResource
*http_op
;
202 RGWDataChangesLogInfo
*shard_info
;
205 RGWReadRemoteDataLogShardInfoCR(RGWDataSyncEnv
*_sync_env
,
206 int _shard_id
, RGWDataChangesLogInfo
*_shard_info
) : RGWCoroutine(_sync_env
->cct
),
210 shard_info(_shard_info
) {
213 ~RGWReadRemoteDataLogShardInfoCR() override
{
219 int operate() override
{
223 snprintf(buf
, sizeof(buf
), "%d", shard_id
);
224 rgw_http_param_pair pairs
[] = { { "type" , "data" },
229 string p
= "/admin/log/";
231 http_op
= new RGWRESTReadResource(sync_env
->conn
, p
, pairs
, NULL
, sync_env
->http_manager
);
233 http_op
->set_user_info((void *)stack
);
235 int ret
= http_op
->aio_read();
237 ldout(sync_env
->cct
, 0) << "ERROR: failed to read from " << p
<< dendl
;
238 log_error() << "failed to send http operation: " << http_op
->to_str() << " ret=" << ret
<< std::endl
;
239 return set_cr_error(ret
);
245 int ret
= http_op
->wait(shard_info
);
247 return set_cr_error(ret
);
249 return set_cr_done();
256 struct read_remote_data_log_response
{
259 list
<rgw_data_change_log_entry
> entries
;
261 read_remote_data_log_response() : truncated(false) {}
263 void decode_json(JSONObj
*obj
) {
264 JSONDecoder::decode_json("marker", marker
, obj
);
265 JSONDecoder::decode_json("truncated", truncated
, obj
);
266 JSONDecoder::decode_json("entries", entries
, obj
);
270 class RGWReadRemoteDataLogShardCR
: public RGWCoroutine
{
271 RGWDataSyncEnv
*sync_env
;
273 RGWRESTReadResource
*http_op
;
277 list
<rgw_data_change_log_entry
> *entries
;
280 read_remote_data_log_response response
;
283 RGWReadRemoteDataLogShardCR(RGWDataSyncEnv
*_sync_env
,
284 int _shard_id
, string
*_pmarker
, list
<rgw_data_change_log_entry
> *_entries
, bool *_truncated
) : RGWCoroutine(_sync_env
->cct
),
290 truncated(_truncated
) {
292 ~RGWReadRemoteDataLogShardCR() override
{
298 int operate() override
{
302 snprintf(buf
, sizeof(buf
), "%d", shard_id
);
303 rgw_http_param_pair pairs
[] = { { "type" , "data" },
305 { "marker", pmarker
->c_str() },
306 { "extra-info", "true" },
309 string p
= "/admin/log/";
311 http_op
= new RGWRESTReadResource(sync_env
->conn
, p
, pairs
, NULL
, sync_env
->http_manager
);
313 http_op
->set_user_info((void *)stack
);
315 int ret
= http_op
->aio_read();
317 ldout(sync_env
->cct
, 0) << "ERROR: failed to read from " << p
<< dendl
;
318 log_error() << "failed to send http operation: " << http_op
->to_str() << " ret=" << ret
<< std::endl
;
319 return set_cr_error(ret
);
325 int ret
= http_op
->wait(&response
);
327 return set_cr_error(ret
);
330 entries
->swap(response
.entries
);
331 *pmarker
= response
.marker
;
332 *truncated
= response
.truncated
;
333 return set_cr_done();
340 class RGWReadRemoteDataLogInfoCR
: public RGWShardCollectCR
{
341 RGWDataSyncEnv
*sync_env
;
344 map
<int, RGWDataChangesLogInfo
> *datalog_info
;
347 #define READ_DATALOG_MAX_CONCURRENT 10
350 RGWReadRemoteDataLogInfoCR(RGWDataSyncEnv
*_sync_env
,
352 map
<int, RGWDataChangesLogInfo
> *_datalog_info
) : RGWShardCollectCR(_sync_env
->cct
, READ_DATALOG_MAX_CONCURRENT
),
353 sync_env(_sync_env
), num_shards(_num_shards
),
354 datalog_info(_datalog_info
), shard_id(0) {}
355 bool spawn_next() override
;
358 bool RGWReadRemoteDataLogInfoCR::spawn_next() {
359 if (shard_id
>= num_shards
) {
362 spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env
, shard_id
, &(*datalog_info
)[shard_id
]), false);
367 class RGWListRemoteDataLogShardCR
: public RGWSimpleCoroutine
{
368 RGWDataSyncEnv
*sync_env
;
369 RGWRESTReadResource
*http_op
;
373 uint32_t max_entries
;
374 rgw_datalog_shard_data
*result
;
377 RGWListRemoteDataLogShardCR(RGWDataSyncEnv
*env
, int _shard_id
,
378 const string
& _marker
, uint32_t _max_entries
,
379 rgw_datalog_shard_data
*_result
)
380 : RGWSimpleCoroutine(env
->store
->ctx()), sync_env(env
), http_op(NULL
),
381 shard_id(_shard_id
), marker(_marker
), max_entries(_max_entries
), result(_result
) {}
383 int send_request() override
{
384 RGWRESTConn
*conn
= sync_env
->conn
;
385 RGWRados
*store
= sync_env
->store
;
388 snprintf(buf
, sizeof(buf
), "%d", shard_id
);
390 char max_entries_buf
[32];
391 snprintf(max_entries_buf
, sizeof(max_entries_buf
), "%d", (int)max_entries
);
393 const char *marker_key
= (marker
.empty() ? "" : "marker");
395 rgw_http_param_pair pairs
[] = { { "type", "data" },
397 { "max-entries", max_entries_buf
},
398 { marker_key
, marker
.c_str() },
401 string p
= "/admin/log/";
403 http_op
= new RGWRESTReadResource(conn
, p
, pairs
, NULL
, sync_env
->http_manager
);
404 http_op
->set_user_info((void *)stack
);
406 int ret
= http_op
->aio_read();
408 ldout(store
->ctx(), 0) << "ERROR: failed to read from " << p
<< dendl
;
409 log_error() << "failed to send http operation: " << http_op
->to_str() << " ret=" << ret
<< std::endl
;
417 int request_complete() override
{
418 int ret
= http_op
->wait(result
);
420 if (ret
< 0 && ret
!= -ENOENT
) {
421 ldout(sync_env
->store
->ctx(), 0) << "ERROR: failed to list remote datalog shard, ret=" << ret
<< dendl
;
428 class RGWListRemoteDataLogCR
: public RGWShardCollectCR
{
429 RGWDataSyncEnv
*sync_env
;
431 map
<int, string
> shards
;
432 int max_entries_per_shard
;
433 map
<int, rgw_datalog_shard_data
> *result
;
435 map
<int, string
>::iterator iter
;
436 #define READ_DATALOG_MAX_CONCURRENT 10
439 RGWListRemoteDataLogCR(RGWDataSyncEnv
*_sync_env
,
440 map
<int, string
>& _shards
,
441 int _max_entries_per_shard
,
442 map
<int, rgw_datalog_shard_data
> *_result
) : RGWShardCollectCR(_sync_env
->cct
, READ_DATALOG_MAX_CONCURRENT
),
443 sync_env(_sync_env
), max_entries_per_shard(_max_entries_per_shard
),
445 shards
.swap(_shards
);
446 iter
= shards
.begin();
448 bool spawn_next() override
;
451 bool RGWListRemoteDataLogCR::spawn_next() {
452 if (iter
== shards
.end()) {
456 spawn(new RGWListRemoteDataLogShardCR(sync_env
, iter
->first
, iter
->second
, max_entries_per_shard
, &(*result
)[iter
->first
]), false);
461 class RGWInitDataSyncStatusCoroutine
: public RGWCoroutine
{
462 static constexpr uint32_t lock_duration
= 30;
463 RGWDataSyncEnv
*sync_env
;
465 const rgw_pool
& pool
;
466 const uint32_t num_shards
;
468 string sync_status_oid
;
472 rgw_data_sync_status
*status
;
473 map
<int, RGWDataChangesLogInfo
> shards_info
;
475 RGWInitDataSyncStatusCoroutine(RGWDataSyncEnv
*_sync_env
, uint32_t num_shards
,
476 uint64_t instance_id
,
477 rgw_data_sync_status
*status
)
478 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
), store(sync_env
->store
),
479 pool(store
->get_zone_params().log_pool
),
480 num_shards(num_shards
), status(status
) {
481 lock_name
= "sync_lock";
483 status
->sync_info
.instance_id
= instance_id
;
485 #define COOKIE_LEN 16
486 char buf
[COOKIE_LEN
+ 1];
488 gen_rand_alphanumeric(cct
, buf
, sizeof(buf
) - 1);
491 sync_status_oid
= RGWDataSyncStatusManager::sync_status_oid(sync_env
->source_zone
);
494 int operate() override
{
497 using LockCR
= RGWSimpleRadosLockCR
;
498 yield
call(new LockCR(sync_env
->async_rados
, store
,
499 rgw_raw_obj
{pool
, sync_status_oid
},
500 lock_name
, cookie
, lock_duration
));
502 ldout(cct
, 0) << "ERROR: failed to take a lock on " << sync_status_oid
<< dendl
;
503 return set_cr_error(retcode
);
505 using WriteInfoCR
= RGWSimpleRadosWriteCR
<rgw_data_sync_info
>;
506 yield
call(new WriteInfoCR(sync_env
->async_rados
, store
,
507 rgw_raw_obj
{pool
, sync_status_oid
},
510 ldout(cct
, 0) << "ERROR: failed to write sync status info with " << retcode
<< dendl
;
511 return set_cr_error(retcode
);
514 /* take lock again, we just recreated the object */
515 yield
call(new LockCR(sync_env
->async_rados
, store
,
516 rgw_raw_obj
{pool
, sync_status_oid
},
517 lock_name
, cookie
, lock_duration
));
519 ldout(cct
, 0) << "ERROR: failed to take a lock on " << sync_status_oid
<< dendl
;
520 return set_cr_error(retcode
);
523 /* fetch current position in logs */
525 RGWRESTConn
*conn
= store
->get_zone_conn_by_id(sync_env
->source_zone
);
527 ldout(cct
, 0) << "ERROR: connection to zone " << sync_env
->source_zone
<< " does not exist!" << dendl
;
528 return set_cr_error(-EIO
);
530 for (uint32_t i
= 0; i
< num_shards
; i
++) {
531 spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env
, i
, &shards_info
[i
]), true);
534 while (collect(&ret
, NULL
)) {
536 ldout(cct
, 0) << "ERROR: failed to read remote data log shards" << dendl
;
537 return set_state(RGWCoroutine_Error
);
542 for (uint32_t i
= 0; i
< num_shards
; i
++) {
543 RGWDataChangesLogInfo
& info
= shards_info
[i
];
544 auto& marker
= status
->sync_markers
[i
];
545 marker
.next_step_marker
= info
.marker
;
546 marker
.timestamp
= info
.last_update
;
547 const auto& oid
= RGWDataSyncStatusManager::shard_obj_name(sync_env
->source_zone
, i
);
548 using WriteMarkerCR
= RGWSimpleRadosWriteCR
<rgw_data_sync_marker
>;
549 spawn(new WriteMarkerCR(sync_env
->async_rados
, store
,
550 rgw_raw_obj
{pool
, oid
}, marker
), true);
553 while (collect(&ret
, NULL
)) {
555 ldout(cct
, 0) << "ERROR: failed to write data sync status markers" << dendl
;
556 return set_state(RGWCoroutine_Error
);
561 status
->sync_info
.state
= rgw_data_sync_info::StateBuildingFullSyncMaps
;
562 yield
call(new WriteInfoCR(sync_env
->async_rados
, store
,
563 rgw_raw_obj
{pool
, sync_status_oid
},
566 ldout(cct
, 0) << "ERROR: failed to write sync status info with " << retcode
<< dendl
;
567 return set_cr_error(retcode
);
569 yield
call(new RGWSimpleRadosUnlockCR(sync_env
->async_rados
, store
,
570 rgw_raw_obj
{pool
, sync_status_oid
},
572 return set_cr_done();
578 int RGWRemoteDataLog::read_log_info(rgw_datalog_info
*log_info
)
580 rgw_http_param_pair pairs
[] = { { "type", "data" },
583 int ret
= sync_env
.conn
->get_json_resource("/admin/log", pairs
, *log_info
);
585 ldout(store
->ctx(), 0) << "ERROR: failed to fetch datalog info" << dendl
;
589 ldout(store
->ctx(), 20) << "remote datalog, num_shards=" << log_info
->num_shards
<< dendl
;
594 int RGWRemoteDataLog::read_source_log_shards_info(map
<int, RGWDataChangesLogInfo
> *shards_info
)
596 rgw_datalog_info log_info
;
597 int ret
= read_log_info(&log_info
);
602 return run(new RGWReadRemoteDataLogInfoCR(&sync_env
, log_info
.num_shards
, shards_info
));
605 int RGWRemoteDataLog::read_source_log_shards_next(map
<int, string
> shard_markers
, map
<int, rgw_datalog_shard_data
> *result
)
607 if (store
->is_meta_master()) {
611 return run(new RGWListRemoteDataLogCR(&sync_env
, shard_markers
, 1, result
));
614 int RGWRemoteDataLog::init(const string
& _source_zone
, RGWRESTConn
*_conn
, RGWSyncErrorLogger
*_error_logger
, RGWSyncModuleInstanceRef
& _sync_module
)
616 sync_env
.init(store
->ctx(), store
, _conn
, async_rados
, &http_manager
, _error_logger
, _source_zone
, _sync_module
);
622 int ret
= http_manager
.set_threaded();
624 ldout(store
->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret
<< dendl
;
633 void RGWRemoteDataLog::finish()
638 int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status
*sync_status
)
640 // cannot run concurrently with run_sync(), so run in a separate manager
641 RGWCoroutinesManager
crs(store
->ctx(), store
->get_cr_registry());
642 RGWHTTPManager
http_manager(store
->ctx(), crs
.get_completion_mgr());
643 int ret
= http_manager
.set_threaded();
645 ldout(store
->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret
<< dendl
;
648 RGWDataSyncEnv sync_env_local
= sync_env
;
649 sync_env_local
.http_manager
= &http_manager
;
650 ret
= crs
.run(new RGWReadDataSyncStatusCoroutine(&sync_env_local
, sync_status
));
655 int RGWRemoteDataLog::init_sync_status(int num_shards
)
657 rgw_data_sync_status sync_status
;
658 RGWCoroutinesManager
crs(store
->ctx(), store
->get_cr_registry());
659 RGWHTTPManager
http_manager(store
->ctx(), crs
.get_completion_mgr());
660 int ret
= http_manager
.set_threaded();
662 ldout(store
->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret
<< dendl
;
665 RGWDataSyncEnv sync_env_local
= sync_env
;
666 sync_env_local
.http_manager
= &http_manager
;
667 uint64_t instance_id
;
668 get_random_bytes((char *)&instance_id
, sizeof(instance_id
));
669 ret
= crs
.run(new RGWInitDataSyncStatusCoroutine(&sync_env_local
, num_shards
, instance_id
, &sync_status
));
674 static string
full_data_sync_index_shard_oid(const string
& source_zone
, int shard_id
)
676 char buf
[datalog_sync_full_sync_index_prefix
.size() + 1 + source_zone
.size() + 1 + 16];
677 snprintf(buf
, sizeof(buf
), "%s.%s.%d", datalog_sync_full_sync_index_prefix
.c_str(), source_zone
.c_str(), shard_id
);
681 struct bucket_instance_meta_info
{
685 RGWBucketInstanceMetadataObject data
;
687 bucket_instance_meta_info() {}
689 void decode_json(JSONObj
*obj
) {
690 JSONDecoder::decode_json("key", key
, obj
);
691 JSONDecoder::decode_json("ver", ver
, obj
);
692 JSONDecoder::decode_json("mtime", mtime
, obj
);
693 JSONDecoder::decode_json("data", data
, obj
);
697 class RGWListBucketIndexesCR
: public RGWCoroutine
{
698 RGWDataSyncEnv
*sync_env
;
702 rgw_data_sync_status
*sync_status
;
709 list
<string
>::iterator iter
;
711 RGWShardedOmapCRManager
*entries_index
;
716 bucket_instance_meta_info meta_info
;
724 RGWListBucketIndexesCR(RGWDataSyncEnv
*_sync_env
,
725 rgw_data_sync_status
*_sync_status
) : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
726 store(sync_env
->store
), sync_status(_sync_status
),
727 req_ret(0), ret(0), entries_index(NULL
), i(0), failed(false) {
728 oid_prefix
= datalog_sync_full_sync_index_prefix
+ "." + sync_env
->source_zone
;
729 path
= "/admin/metadata/bucket.instance";
730 num_shards
= sync_status
->sync_info
.num_shards
;
732 ~RGWListBucketIndexesCR() override
{
733 delete entries_index
;
736 int operate() override
{
738 entries_index
= new RGWShardedOmapCRManager(sync_env
->async_rados
, store
, this, num_shards
,
739 store
->get_zone_params().log_pool
,
742 string entrypoint
= string("/admin/metadata/bucket.instance");
743 /* FIXME: need a better scaling solution here, requires streaming output */
744 call(new RGWReadRESTResourceCR
<list
<string
> >(store
->ctx(), sync_env
->conn
, sync_env
->http_manager
,
745 entrypoint
, NULL
, &result
));
747 if (get_ret_status() < 0) {
748 ldout(sync_env
->cct
, 0) << "ERROR: failed to fetch metadata for section bucket.index" << dendl
;
749 return set_state(RGWCoroutine_Error
);
751 for (iter
= result
.begin(); iter
!= result
.end(); ++iter
) {
752 ldout(sync_env
->cct
, 20) << "list metadata: section=bucket.index key=" << *iter
<< dendl
;
757 rgw_http_param_pair pairs
[] = { { "key", key
.c_str() },
760 call(new RGWReadRESTResourceCR
<bucket_instance_meta_info
>(store
->ctx(), sync_env
->conn
, sync_env
->http_manager
, path
, pairs
, &meta_info
));
763 num_shards
= meta_info
.data
.get_bucket_info().num_shards
;
764 if (num_shards
> 0) {
765 for (i
= 0; i
< num_shards
; i
++) {
767 snprintf(buf
, sizeof(buf
), ":%d", i
);
769 yield entries_index
->append(s
, store
->data_log
->get_log_shard_id(meta_info
.data
.get_bucket_info().bucket
, i
));
772 yield entries_index
->append(key
, store
->data_log
->get_log_shard_id(meta_info
.data
.get_bucket_info().bucket
, -1));
776 if (!entries_index
->finish()) {
781 for (map
<uint32_t, rgw_data_sync_marker
>::iterator iter
= sync_status
->sync_markers
.begin(); iter
!= sync_status
->sync_markers
.end(); ++iter
) {
782 int shard_id
= (int)iter
->first
;
783 rgw_data_sync_marker
& marker
= iter
->second
;
784 marker
.total_entries
= entries_index
->get_total_entries(shard_id
);
785 spawn(new RGWSimpleRadosWriteCR
<rgw_data_sync_marker
>(sync_env
->async_rados
, store
,
786 rgw_raw_obj(store
->get_zone_params().log_pool
, RGWDataSyncStatusManager::shard_obj_name(sync_env
->source_zone
, shard_id
)),
790 yield
call(sync_env
->error_logger
->log_error_cr(sync_env
->conn
->get_remote_id(), "data.init", "",
791 EIO
, string("failed to build bucket instances map")));
793 while (collect(&ret
, NULL
)) {
795 yield
call(sync_env
->error_logger
->log_error_cr(sync_env
->conn
->get_remote_id(), "data.init", "",
796 -ret
, string("failed to store sync status: ") + cpp_strerror(-ret
)));
803 yield
return set_cr_error(req_ret
);
805 yield
return set_cr_done();
811 #define DATA_SYNC_UPDATE_MARKER_WINDOW 1
813 class RGWDataSyncShardMarkerTrack
: public RGWSyncShardMarkerTrack
<string
, string
> {
814 RGWDataSyncEnv
*sync_env
;
817 rgw_data_sync_marker sync_marker
;
819 map
<string
, string
> key_to_marker
;
820 map
<string
, string
> marker_to_key
;
822 void handle_finish(const string
& marker
) override
{
823 map
<string
, string
>::iterator iter
= marker_to_key
.find(marker
);
824 if (iter
== marker_to_key
.end()) {
827 key_to_marker
.erase(iter
->second
);
828 reset_need_retry(iter
->second
);
829 marker_to_key
.erase(iter
);
833 RGWDataSyncShardMarkerTrack(RGWDataSyncEnv
*_sync_env
,
834 const string
& _marker_oid
,
835 const rgw_data_sync_marker
& _marker
) : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW
),
837 marker_oid(_marker_oid
),
838 sync_marker(_marker
) {}
840 RGWCoroutine
*store_marker(const string
& new_marker
, uint64_t index_pos
, const real_time
& timestamp
) override
{
841 sync_marker
.marker
= new_marker
;
842 sync_marker
.pos
= index_pos
;
844 ldout(sync_env
->cct
, 20) << __func__
<< "(): updating marker marker_oid=" << marker_oid
<< " marker=" << new_marker
<< dendl
;
845 RGWRados
*store
= sync_env
->store
;
847 return new RGWSimpleRadosWriteCR
<rgw_data_sync_marker
>(sync_env
->async_rados
, store
,
848 rgw_raw_obj(store
->get_zone_params().log_pool
, marker_oid
),
853 * create index from key -> marker, and from marker -> key
854 * this is useful so that we can insure that we only have one
855 * entry for any key that is used. This is needed when doing
856 * incremenatl sync of data, and we don't want to run multiple
857 * concurrent sync operations for the same bucket shard
859 bool index_key_to_marker(const string
& key
, const string
& marker
) {
860 if (key_to_marker
.find(key
) != key_to_marker
.end()) {
864 key_to_marker
[key
] = marker
;
865 marker_to_key
[marker
] = key
;
870 // ostream wrappers to print buckets without copying strings
873 bucket_str(const rgw_bucket
& b
) : b(b
) {}
875 std::ostream
& operator<<(std::ostream
& out
, const bucket_str
& rhs
) {
877 if (!b
.tenant
.empty()) {
878 out
<< b
.tenant
<< '/';
881 if (!b
.bucket_id
.empty()) {
882 out
<< ':' << b
.bucket_id
;
887 struct bucket_shard_str
{
888 const rgw_bucket_shard
& bs
;
889 bucket_shard_str(const rgw_bucket_shard
& bs
) : bs(bs
) {}
891 std::ostream
& operator<<(std::ostream
& out
, const bucket_shard_str
& rhs
) {
893 out
<< bucket_str
{bs
.bucket
};
894 if (bs
.shard_id
>= 0) {
895 out
<< ':' << bs
.shard_id
;
900 class RGWRunBucketSyncCoroutine
: public RGWCoroutine
{
901 RGWDataSyncEnv
*sync_env
;
903 RGWBucketInfo bucket_info
;
904 rgw_bucket_shard_sync_info sync_status
;
905 RGWMetaSyncEnv meta_sync_env
;
907 RGWDataSyncDebugLogger logger
;
908 const std::string status_oid
;
910 boost::intrusive_ptr
<RGWContinuousLeaseCR
> lease_cr
;
911 boost::intrusive_ptr
<RGWCoroutinesStack
> lease_stack
;
914 RGWRunBucketSyncCoroutine(RGWDataSyncEnv
*_sync_env
, const rgw_bucket_shard
& bs
)
915 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
), bs(bs
),
916 status_oid(RGWBucketSyncStatusManager::status_oid(sync_env
->source_zone
, bs
)) {
917 logger
.init(sync_env
, "Bucket", bs
.get_key());
919 ~RGWRunBucketSyncCoroutine() override
{
925 int operate() override
;
928 class RGWDataSyncSingleEntryCR
: public RGWCoroutine
{
929 RGWDataSyncEnv
*sync_env
;
940 RGWDataSyncShardMarkerTrack
*marker_tracker
;
942 boost::intrusive_ptr
<RGWOmapAppend
> error_repo
;
943 bool remove_from_repo
;
948 RGWDataSyncSingleEntryCR(RGWDataSyncEnv
*_sync_env
,
949 const string
& _raw_key
, const string
& _entry_marker
, RGWDataSyncShardMarkerTrack
*_marker_tracker
,
950 RGWOmapAppend
*_error_repo
, bool _remove_from_repo
) : RGWCoroutine(_sync_env
->cct
),
952 raw_key(_raw_key
), entry_marker(_entry_marker
),
954 marker_tracker(_marker_tracker
),
955 error_repo(_error_repo
), remove_from_repo(_remove_from_repo
) {
956 set_description() << "data sync single entry (source_zone=" << sync_env
->source_zone
<< ") key=" <<_raw_key
<< " entry=" << entry_marker
;
959 int operate() override
{
963 int ret
= rgw_bucket_parse_bucket_key(sync_env
->cct
, raw_key
,
964 &bs
.bucket
, &bs
.shard_id
);
966 return set_cr_error(-EIO
);
968 if (marker_tracker
) {
969 marker_tracker
->reset_need_retry(raw_key
);
971 call(new RGWRunBucketSyncCoroutine(sync_env
, bs
));
973 } while (marker_tracker
&& marker_tracker
->need_retry(raw_key
));
975 sync_status
= retcode
;
977 if (sync_status
== -ENOENT
) {
978 // this was added when 'tenant/' was added to datalog entries, because
979 // preexisting tenant buckets could never sync and would stay in the
980 // error_repo forever
981 ldout(sync_env
->store
->ctx(), 0) << "WARNING: skipping data log entry "
982 "for missing bucket " << raw_key
<< dendl
;
986 if (sync_status
< 0) {
987 yield
call(sync_env
->error_logger
->log_error_cr(sync_env
->conn
->get_remote_id(), "data", raw_key
,
988 -sync_status
, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status
)));
990 ldout(sync_env
->store
->ctx(), 0) << "ERROR: failed to log sync failure: retcode=" << retcode
<< dendl
;
992 if (error_repo
&& !error_repo
->append(raw_key
)) {
993 ldout(sync_env
->store
->ctx(), 0) << "ERROR: failed to log sync failure in error repo: retcode=" << retcode
<< dendl
;
995 } else if (error_repo
&& remove_from_repo
) {
997 yield
call(new RGWRadosRemoveOmapKeysCR(sync_env
->store
, error_repo
->get_obj(), keys
));
999 ldout(sync_env
->store
->ctx(), 0) << "ERROR: failed to remove omap key from error repo ("
1000 << error_repo
->get_obj() << " retcode=" << retcode
<< dendl
;
1003 /* FIXME: what do do in case of error */
1004 if (marker_tracker
&& !entry_marker
.empty()) {
1006 yield
call(marker_tracker
->finish(entry_marker
));
1008 if (sync_status
== 0) {
1009 sync_status
= retcode
;
1011 if (sync_status
< 0) {
1012 return set_cr_error(sync_status
);
1014 return set_cr_done();
1020 #define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20
1021 #define DATA_SYNC_MAX_ERR_ENTRIES 10
1023 enum RemoteDatalogStatus
{
1024 RemoteNotTrimmed
= 0,
1026 RemoteMightTrimmed
= 2
1029 class RGWDataSyncShardCR
: public RGWCoroutine
{
1030 RGWDataSyncEnv
*sync_env
;
1035 rgw_data_sync_marker sync_marker
;
1037 map
<string
, bufferlist
> entries
;
1038 map
<string
, bufferlist
>::iterator iter
;
1042 RGWDataSyncShardMarkerTrack
*marker_tracker
;
1044 list
<rgw_data_change_log_entry
> log_entries
;
1045 list
<rgw_data_change_log_entry
>::iterator log_iter
;
1048 RGWDataChangesLogInfo shard_info
;
1049 string datalog_marker
;
1051 RemoteDatalogStatus remote_trimmed
;
1055 boost::asio::coroutine incremental_cr
;
1056 boost::asio::coroutine full_cr
;
1059 set
<string
> modified_shards
;
1060 set
<string
> current_modified
;
1062 set
<string
>::iterator modified_iter
;
1068 bool *reset_backoff
;
1070 set
<string
> spawned_keys
;
1072 boost::intrusive_ptr
<RGWContinuousLeaseCR
> lease_cr
;
1073 boost::intrusive_ptr
<RGWCoroutinesStack
> lease_stack
;
1078 RGWOmapAppend
*error_repo
;
1079 map
<string
, bufferlist
> error_entries
;
1080 string error_marker
;
1081 int max_error_entries
;
1083 ceph::real_time error_retry_time
;
1085 #define RETRY_BACKOFF_SECS_MIN 60
1086 #define RETRY_BACKOFF_SECS_DEFAULT 60
1087 #define RETRY_BACKOFF_SECS_MAX 600
1088 uint32_t retry_backoff_secs
;
1090 RGWDataSyncDebugLogger logger
;
1092 RGWDataSyncShardCR(RGWDataSyncEnv
*_sync_env
,
1094 uint32_t _shard_id
, rgw_data_sync_marker
& _marker
, bool *_reset_backoff
) : RGWCoroutine(_sync_env
->cct
),
1095 sync_env(_sync_env
),
1097 shard_id(_shard_id
),
1098 sync_marker(_marker
),
1099 marker_tracker(NULL
), truncated(false), remote_trimmed(RemoteNotTrimmed
), inc_lock("RGWDataSyncShardCR::inc_lock"),
1100 total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW
), reset_backoff(NULL
),
1101 lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES
),
1102 retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT
) {
1103 set_description() << "data sync shard source_zone=" << sync_env
->source_zone
<< " shard_id=" << shard_id
;
1104 status_oid
= RGWDataSyncStatusManager::shard_obj_name(sync_env
->source_zone
, shard_id
);
1105 error_oid
= status_oid
+ ".retry";
1107 logger
.init(sync_env
, "DataShard", status_oid
);
1110 ~RGWDataSyncShardCR() override
{
1111 delete marker_tracker
;
1120 void append_modified_shards(set
<string
>& keys
) {
1121 Mutex::Locker
l(inc_lock
);
1122 modified_shards
.insert(keys
.begin(), keys
.end());
1125 void set_marker_tracker(RGWDataSyncShardMarkerTrack
*mt
) {
1126 delete marker_tracker
;
1127 marker_tracker
= mt
;
1130 int operate() override
{
1133 switch (sync_marker
.state
) {
1134 case rgw_data_sync_marker::FullSync
:
1137 ldout(cct
, 10) << "sync: full_sync: shard_id=" << shard_id
<< " r=" << r
<< dendl
;
1138 return set_cr_error(r
);
1141 case rgw_data_sync_marker::IncrementalSync
:
1142 r
= incremental_sync();
1144 ldout(cct
, 10) << "sync: incremental_sync: shard_id=" << shard_id
<< " r=" << r
<< dendl
;
1145 return set_cr_error(r
);
1149 return set_cr_error(-EIO
);
1155 void init_lease_cr() {
1156 set_status("acquiring sync lock");
1157 uint32_t lock_duration
= cct
->_conf
->rgw_sync_lease_period
;
1158 string lock_name
= "sync_lock";
1162 RGWRados
*store
= sync_env
->store
;
1163 lease_cr
.reset(new RGWContinuousLeaseCR(sync_env
->async_rados
, store
,
1164 rgw_raw_obj(store
->get_zone_params().log_pool
, status_oid
),
1165 lock_name
, lock_duration
, this));
1166 lease_stack
.reset(spawn(lease_cr
.get(), false));
1170 #define OMAP_GET_MAX_ENTRIES 100
1171 int max_entries
= OMAP_GET_MAX_ENTRIES
;
1173 yield
init_lease_cr();
1174 while (!lease_cr
->is_locked()) {
1175 if (lease_cr
->is_done()) {
1176 ldout(cct
, 5) << "lease cr failed, done early " << dendl
;
1177 set_status("lease lock failed, early abort");
1178 return set_cr_error(lease_cr
->get_ret_status());
1183 logger
.log("full sync");
1184 oid
= full_data_sync_index_shard_oid(sync_env
->source_zone
, shard_id
);
1185 set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env
, status_oid
, sync_marker
));
1186 total_entries
= sync_marker
.pos
;
1188 yield
call(new RGWRadosGetOmapKeysCR(sync_env
->store
, rgw_raw_obj(pool
, oid
), sync_marker
.marker
, &entries
, max_entries
));
1190 ldout(sync_env
->cct
, 0) << "ERROR: " << __func__
<< "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode
<< dendl
;
1191 lease_cr
->go_down();
1193 return set_cr_error(retcode
);
1195 iter
= entries
.begin();
1196 for (; iter
!= entries
.end(); ++iter
) {
1197 ldout(sync_env
->cct
, 20) << __func__
<< ": full sync: " << iter
->first
<< dendl
;
1199 if (!marker_tracker
->start(iter
->first
, total_entries
, real_time())) {
1200 ldout(sync_env
->cct
, 0) << "ERROR: cannot start syncing " << iter
->first
<< ". Duplicate entry?" << dendl
;
1202 // fetch remote and write locally
1203 yield
spawn(new RGWDataSyncSingleEntryCR(sync_env
, iter
->first
, iter
->first
, marker_tracker
, error_repo
, false), false);
1205 lease_cr
->go_down();
1207 return set_cr_error(retcode
);
1210 sync_marker
.marker
= iter
->first
;
1212 } while ((int)entries
.size() == max_entries
);
1214 lease_cr
->go_down();
1218 /* update marker to reflect we're done with full sync */
1219 sync_marker
.state
= rgw_data_sync_marker::IncrementalSync
;
1220 sync_marker
.marker
= sync_marker
.next_step_marker
;
1221 sync_marker
.next_step_marker
.clear();
1222 RGWRados
*store
= sync_env
->store
;
1223 call(new RGWSimpleRadosWriteCR
<rgw_data_sync_marker
>(sync_env
->async_rados
, store
,
1224 rgw_raw_obj(store
->get_zone_params().log_pool
, status_oid
),
1228 ldout(sync_env
->cct
, 0) << "ERROR: failed to set sync marker: retcode=" << retcode
<< dendl
;
1229 lease_cr
->go_down();
1230 return set_cr_error(retcode
);
1236 int incremental_sync() {
1237 reenter(&incremental_cr
) {
1238 yield
init_lease_cr();
1239 while (!lease_cr
->is_locked()) {
1240 if (lease_cr
->is_done()) {
1241 ldout(cct
, 5) << "lease cr failed, done early " << dendl
;
1242 set_status("lease lock failed, early abort");
1243 return set_cr_error(lease_cr
->get_ret_status());
1248 set_status("lease acquired");
1249 error_repo
= new RGWOmapAppend(sync_env
->async_rados
, sync_env
->store
,
1250 rgw_raw_obj(pool
, error_oid
),
1253 spawn(error_repo
, false);
1254 logger
.log("inc sync");
1255 set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env
, status_oid
, sync_marker
));
1257 current_modified
.clear();
1259 current_modified
.swap(modified_shards
);
1262 /* process out of band updates */
1263 for (modified_iter
= current_modified
.begin(); modified_iter
!= current_modified
.end(); ++modified_iter
) {
1265 ldout(sync_env
->cct
, 20) << __func__
<< "(): async update notification: " << *modified_iter
<< dendl
;
1266 spawn(new RGWDataSyncSingleEntryCR(sync_env
, *modified_iter
, string(), marker_tracker
, error_repo
, false), false);
1270 /* process bucket shards that previously failed */
1271 yield
call(new RGWRadosGetOmapKeysCR(sync_env
->store
, rgw_raw_obj(pool
, error_oid
),
1272 error_marker
, &error_entries
,
1273 max_error_entries
));
1274 ldout(sync_env
->cct
, 20) << __func__
<< "(): read error repo, got " << error_entries
.size() << " entries" << dendl
;
1275 iter
= error_entries
.begin();
1276 for (; iter
!= error_entries
.end(); ++iter
) {
1277 ldout(sync_env
->cct
, 20) << __func__
<< "(): handle error entry: " << iter
->first
<< dendl
;
1278 spawn(new RGWDataSyncSingleEntryCR(sync_env
, iter
->first
, iter
->first
, nullptr /* no marker tracker */, error_repo
, true), false);
1279 error_marker
= iter
->first
;
1281 if ((int)error_entries
.size() != max_error_entries
) {
1282 if (error_marker
.empty() && error_entries
.empty()) {
1283 /* the retry repo is empty, we back off a bit before calling it again */
1284 retry_backoff_secs
*= 2;
1285 if (retry_backoff_secs
> RETRY_BACKOFF_SECS_MAX
) {
1286 retry_backoff_secs
= RETRY_BACKOFF_SECS_MAX
;
1289 retry_backoff_secs
= RETRY_BACKOFF_SECS_DEFAULT
;
1291 error_retry_time
= ceph::real_clock::now() + make_timespan(retry_backoff_secs
);
1292 error_marker
.clear();
1296 yield
call(new RGWReadRemoteDataLogShardInfoCR(sync_env
, shard_id
, &shard_info
));
1298 ldout(sync_env
->cct
, 0) << "ERROR: failed to fetch remote data log info: ret=" << retcode
<< dendl
;
1299 stop_spawned_services();
1301 return set_cr_error(retcode
);
1303 datalog_marker
= shard_info
.marker
;
1304 remote_trimmed
= RemoteNotTrimmed
;
1305 #define INCREMENTAL_MAX_ENTRIES 100
1306 ldout(sync_env
->cct
, 20) << __func__
<< ":" << __LINE__
<< ": shard_id=" << shard_id
<< " datalog_marker=" << datalog_marker
<< " sync_marker.marker=" << sync_marker
.marker
<< dendl
;
1307 if (datalog_marker
> sync_marker
.marker
) {
1308 spawned_keys
.clear();
1309 if (sync_marker
.marker
.empty())
1310 remote_trimmed
= RemoteMightTrimmed
; //remote data log shard might be trimmed;
1311 yield
call(new RGWReadRemoteDataLogShardCR(sync_env
, shard_id
, &sync_marker
.marker
, &log_entries
, &truncated
));
1313 ldout(sync_env
->cct
, 0) << "ERROR: failed to read remote data log info: ret=" << retcode
<< dendl
;
1314 stop_spawned_services();
1316 return set_cr_error(retcode
);
1318 if ((remote_trimmed
== RemoteMightTrimmed
) && sync_marker
.marker
.empty() && log_entries
.empty())
1319 remote_trimmed
= RemoteTrimmed
;
1321 remote_trimmed
= RemoteNotTrimmed
;
1322 for (log_iter
= log_entries
.begin(); log_iter
!= log_entries
.end(); ++log_iter
) {
1323 ldout(sync_env
->cct
, 20) << __func__
<< ":" << __LINE__
<< ": shard_id=" << shard_id
<< " log_entry: " << log_iter
->log_id
<< ":" << log_iter
->log_timestamp
<< ":" << log_iter
->entry
.key
<< dendl
;
1324 if (!marker_tracker
->index_key_to_marker(log_iter
->entry
.key
, log_iter
->log_id
)) {
1325 ldout(sync_env
->cct
, 20) << __func__
<< ": skipping sync of entry: " << log_iter
->log_id
<< ":" << log_iter
->entry
.key
<< " sync already in progress for bucket shard" << dendl
;
1326 marker_tracker
->try_update_high_marker(log_iter
->log_id
, 0, log_iter
->log_timestamp
);
1329 if (!marker_tracker
->start(log_iter
->log_id
, 0, log_iter
->log_timestamp
)) {
1330 ldout(sync_env
->cct
, 0) << "ERROR: cannot start syncing " << log_iter
->log_id
<< ". Duplicate entry?" << dendl
;
1333 * don't spawn the same key more than once. We can do that as long as we don't yield
1335 if (spawned_keys
.find(log_iter
->entry
.key
) == spawned_keys
.end()) {
1336 spawned_keys
.insert(log_iter
->entry
.key
);
1337 spawn(new RGWDataSyncSingleEntryCR(sync_env
, log_iter
->entry
.key
, log_iter
->log_id
, marker_tracker
, error_repo
, false), false);
1339 stop_spawned_services();
1341 return set_cr_error(retcode
);
1346 while ((int)num_spawned() > spawn_window
) {
1347 set_status() << "num_spawned() > spawn_window";
1348 yield
wait_for_child();
1350 while (collect(&ret
, lease_stack
.get())) {
1352 ldout(sync_env
->cct
, 0) << "ERROR: a sync operation returned error" << dendl
;
1353 /* we have reported this error */
1355 /* not waiting for child here */
1359 ldout(sync_env
->cct
, 20) << __func__
<< ":" << __LINE__
<< ": shard_id=" << shard_id
<< " datalog_marker=" << datalog_marker
<< " sync_marker.marker=" << sync_marker
.marker
<< dendl
;
1360 if (datalog_marker
== sync_marker
.marker
|| remote_trimmed
== RemoteTrimmed
) {
1361 #define INCREMENTAL_INTERVAL 20
1362 yield
wait(utime_t(INCREMENTAL_INTERVAL
, 0));
1368 void stop_spawned_services() {
1369 lease_cr
->go_down();
1371 error_repo
->finish();
1378 class RGWDataSyncShardControlCR
: public RGWBackoffControlCR
{
1379 RGWDataSyncEnv
*sync_env
;
1384 rgw_data_sync_marker sync_marker
;
1387 RGWDataSyncShardControlCR(RGWDataSyncEnv
*_sync_env
, rgw_pool
& _pool
,
1388 uint32_t _shard_id
, rgw_data_sync_marker
& _marker
) : RGWBackoffControlCR(_sync_env
->cct
, false),
1389 sync_env(_sync_env
),
1391 shard_id(_shard_id
),
1392 sync_marker(_marker
) {
1395 RGWCoroutine
*alloc_cr() override
{
1396 return new RGWDataSyncShardCR(sync_env
, pool
, shard_id
, sync_marker
, backoff_ptr());
1399 RGWCoroutine
*alloc_finisher_cr() override
{
1400 RGWRados
*store
= sync_env
->store
;
1401 return new RGWSimpleRadosReadCR
<rgw_data_sync_marker
>(sync_env
->async_rados
, store
,
1402 rgw_raw_obj(store
->get_zone_params().log_pool
, RGWDataSyncStatusManager::shard_obj_name(sync_env
->source_zone
, shard_id
)),
1406 void append_modified_shards(set
<string
>& keys
) {
1407 Mutex::Locker
l(cr_lock());
1409 RGWDataSyncShardCR
*cr
= static_cast<RGWDataSyncShardCR
*>(get_cr());
1414 cr
->append_modified_shards(keys
);
1418 class RGWDataSyncCR
: public RGWCoroutine
{
1419 RGWDataSyncEnv
*sync_env
;
1420 uint32_t num_shards
;
1422 rgw_data_sync_status sync_status
;
1424 RGWDataSyncShardMarkerTrack
*marker_tracker
;
1426 Mutex shard_crs_lock
;
1427 map
<int, RGWDataSyncShardControlCR
*> shard_crs
;
1429 bool *reset_backoff
;
1431 RGWDataSyncDebugLogger logger
;
1433 RGWDataSyncModule
*data_sync_module
{nullptr};
1435 RGWDataSyncCR(RGWDataSyncEnv
*_sync_env
, uint32_t _num_shards
, bool *_reset_backoff
) : RGWCoroutine(_sync_env
->cct
),
1436 sync_env(_sync_env
),
1437 num_shards(_num_shards
),
1438 marker_tracker(NULL
),
1439 shard_crs_lock("RGWDataSyncCR::shard_crs_lock"),
1440 reset_backoff(_reset_backoff
), logger(sync_env
, "Data", "all") {
1444 ~RGWDataSyncCR() override
{
1445 for (auto iter
: shard_crs
) {
1450 int operate() override
{
1453 /* read sync status */
1454 yield
call(new RGWReadDataSyncStatusCoroutine(sync_env
, &sync_status
));
1456 data_sync_module
= sync_env
->sync_module
->get_data_handler();
1458 if (retcode
< 0 && retcode
!= -ENOENT
) {
1459 ldout(sync_env
->cct
, 0) << "ERROR: failed to fetch sync status, retcode=" << retcode
<< dendl
;
1460 return set_cr_error(retcode
);
1463 /* state: init status */
1464 if ((rgw_data_sync_info::SyncState
)sync_status
.sync_info
.state
== rgw_data_sync_info::StateInit
) {
1465 ldout(sync_env
->cct
, 20) << __func__
<< "(): init" << dendl
;
1466 sync_status
.sync_info
.num_shards
= num_shards
;
1467 uint64_t instance_id
;
1468 get_random_bytes((char *)&instance_id
, sizeof(instance_id
));
1469 yield
call(new RGWInitDataSyncStatusCoroutine(sync_env
, num_shards
, instance_id
, &sync_status
));
1471 ldout(sync_env
->cct
, 0) << "ERROR: failed to init sync, retcode=" << retcode
<< dendl
;
1472 return set_cr_error(retcode
);
1474 // sets state = StateBuildingFullSyncMaps
1476 *reset_backoff
= true;
1479 data_sync_module
->init(sync_env
, sync_status
.sync_info
.instance_id
);
1481 if ((rgw_data_sync_info::SyncState
)sync_status
.sync_info
.state
== rgw_data_sync_info::StateBuildingFullSyncMaps
) {
1482 /* call sync module init here */
1483 yield
call(data_sync_module
->init_sync(sync_env
));
1485 ldout(sync_env
->cct
, 0) << "ERROR: sync module init_sync() failed, retcode=" << retcode
<< dendl
;
1486 return set_cr_error(retcode
);
1488 /* state: building full sync maps */
1489 ldout(sync_env
->cct
, 20) << __func__
<< "(): building full sync maps" << dendl
;
1490 yield
call(new RGWListBucketIndexesCR(sync_env
, &sync_status
));
1492 ldout(sync_env
->cct
, 0) << "ERROR: failed to build full sync maps, retcode=" << retcode
<< dendl
;
1493 return set_cr_error(retcode
);
1495 sync_status
.sync_info
.state
= rgw_data_sync_info::StateSync
;
1497 /* update new state */
1498 yield
call(set_sync_info_cr());
1500 ldout(sync_env
->cct
, 0) << "ERROR: failed to write sync status, retcode=" << retcode
<< dendl
;
1501 return set_cr_error(retcode
);
1504 *reset_backoff
= true;
1508 if ((rgw_data_sync_info::SyncState
)sync_status
.sync_info
.state
== rgw_data_sync_info::StateSync
) {
1509 for (map
<uint32_t, rgw_data_sync_marker
>::iterator iter
= sync_status
.sync_markers
.begin();
1510 iter
!= sync_status
.sync_markers
.end(); ++iter
) {
1511 RGWDataSyncShardControlCR
*cr
= new RGWDataSyncShardControlCR(sync_env
, sync_env
->store
->get_zone_params().log_pool
,
1512 iter
->first
, iter
->second
);
1514 shard_crs_lock
.Lock();
1515 shard_crs
[iter
->first
] = cr
;
1516 shard_crs_lock
.Unlock();
1522 return set_cr_done();
1527 RGWCoroutine
*set_sync_info_cr() {
1528 RGWRados
*store
= sync_env
->store
;
1529 return new RGWSimpleRadosWriteCR
<rgw_data_sync_info
>(sync_env
->async_rados
, store
,
1530 rgw_raw_obj(store
->get_zone_params().log_pool
, RGWDataSyncStatusManager::sync_status_oid(sync_env
->source_zone
)),
1531 sync_status
.sync_info
);
1534 void wakeup(int shard_id
, set
<string
>& keys
) {
1535 Mutex::Locker
l(shard_crs_lock
);
1536 map
<int, RGWDataSyncShardControlCR
*>::iterator iter
= shard_crs
.find(shard_id
);
1537 if (iter
== shard_crs
.end()) {
1540 iter
->second
->append_modified_shards(keys
);
1541 iter
->second
->wakeup();
1545 class RGWDefaultDataSyncModule
: public RGWDataSyncModule
{
1547 RGWDefaultDataSyncModule() {}
1549 RGWCoroutine
*sync_object(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
) override
;
1550 RGWCoroutine
*remove_object(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, real_time
& mtime
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
) override
;
1551 RGWCoroutine
*create_delete_marker(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, real_time
& mtime
,
1552 rgw_bucket_entry_owner
& owner
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
) override
;
1555 class RGWDefaultSyncModuleInstance
: public RGWSyncModuleInstance
{
1556 RGWDefaultDataSyncModule data_handler
;
1558 RGWDefaultSyncModuleInstance() {}
1559 RGWDataSyncModule
*get_data_handler() override
{
1560 return &data_handler
;
1564 int RGWDefaultSyncModule::create_instance(CephContext
*cct
, map
<string
, string
, ltstr_nocase
>& config
, RGWSyncModuleInstanceRef
*instance
)
1566 instance
->reset(new RGWDefaultSyncModuleInstance());
1570 RGWCoroutine
*RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
)
1572 return new RGWFetchRemoteObjCR(sync_env
->async_rados
, sync_env
->store
, sync_env
->source_zone
, bucket_info
,
1573 key
, versioned_epoch
,
1577 RGWCoroutine
*RGWDefaultDataSyncModule::remove_object(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
,
1578 real_time
& mtime
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
)
1580 return new RGWRemoveObjCR(sync_env
->async_rados
, sync_env
->store
, sync_env
->source_zone
,
1581 bucket_info
, key
, versioned
, versioned_epoch
,
1582 NULL
, NULL
, false, &mtime
, zones_trace
);
1585 RGWCoroutine
*RGWDefaultDataSyncModule::create_delete_marker(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, real_time
& mtime
,
1586 rgw_bucket_entry_owner
& owner
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
)
1588 return new RGWRemoveObjCR(sync_env
->async_rados
, sync_env
->store
, sync_env
->source_zone
,
1589 bucket_info
, key
, versioned
, versioned_epoch
,
1590 &owner
.id
, &owner
.display_name
, true, &mtime
, zones_trace
);
1593 class RGWDataSyncControlCR
: public RGWBackoffControlCR
1595 RGWDataSyncEnv
*sync_env
;
1596 uint32_t num_shards
;
1599 RGWDataSyncControlCR(RGWDataSyncEnv
*_sync_env
, uint32_t _num_shards
) : RGWBackoffControlCR(_sync_env
->cct
, true),
1600 sync_env(_sync_env
), num_shards(_num_shards
) {
1603 RGWCoroutine
*alloc_cr() override
{
1604 return new RGWDataSyncCR(sync_env
, num_shards
, backoff_ptr());
1607 void wakeup(int shard_id
, set
<string
>& keys
) {
1608 Mutex
& m
= cr_lock();
1611 RGWDataSyncCR
*cr
= static_cast<RGWDataSyncCR
*>(get_cr());
1621 cr
->wakeup(shard_id
, keys
);
1628 void RGWRemoteDataLog::wakeup(int shard_id
, set
<string
>& keys
) {
1629 RWLock::RLocker
rl(lock
);
1630 if (!data_sync_cr
) {
1633 data_sync_cr
->wakeup(shard_id
, keys
);
1636 int RGWRemoteDataLog::run_sync(int num_shards
)
1639 data_sync_cr
= new RGWDataSyncControlCR(&sync_env
, num_shards
);
1640 data_sync_cr
->get(); // run() will drop a ref, so take another
1643 int r
= run(data_sync_cr
);
1646 data_sync_cr
->put();
1647 data_sync_cr
= NULL
;
1651 ldout(store
->ctx(), 0) << "ERROR: failed to run sync" << dendl
;
1657 int RGWDataSyncStatusManager::init()
1659 auto zone_def_iter
= store
->zone_by_id
.find(source_zone
);
1660 if (zone_def_iter
== store
->zone_by_id
.end()) {
1661 ldout(store
->ctx(), 0) << "ERROR: failed to find zone config info for zone=" << source_zone
<< dendl
;
1665 auto& zone_def
= zone_def_iter
->second
;
1667 if (!store
->get_sync_modules_manager()->supports_data_export(zone_def
.tier_type
)) {
1671 RGWZoneParams
& zone_params
= store
->get_zone_params();
1673 sync_module
= store
->get_sync_module();
1675 conn
= store
->get_zone_conn_by_id(source_zone
);
1677 ldout(store
->ctx(), 0) << "connection object to zone " << source_zone
<< " does not exist" << dendl
;
1681 error_logger
= new RGWSyncErrorLogger(store
, RGW_SYNC_ERROR_LOG_SHARD_PREFIX
, ERROR_LOGGER_SHARDS
);
1683 int r
= source_log
.init(source_zone
, conn
, error_logger
, sync_module
);
1685 lderr(store
->ctx()) << "ERROR: failed to init remote log, r=" << r
<< dendl
;
1690 rgw_datalog_info datalog_info
;
1691 r
= source_log
.read_log_info(&datalog_info
);
1693 ldout(store
->ctx(), 5) << "ERROR: master.read_log_info() returned r=" << r
<< dendl
;
1698 num_shards
= datalog_info
.num_shards
;
1700 for (int i
= 0; i
< num_shards
; i
++) {
1701 shard_objs
[i
] = rgw_raw_obj(zone_params
.log_pool
, shard_obj_name(source_zone
, i
));
void RGWDataSyncStatusManager::finalize()
{
  // Release the error logger and null the pointer so finalize() is
  // idempotent and later teardown does not double-delete.
  delete error_logger;
  error_logger = nullptr;
}
1713 string
RGWDataSyncStatusManager::sync_status_oid(const string
& source_zone
)
1715 char buf
[datalog_sync_status_oid_prefix
.size() + source_zone
.size() + 16];
1716 snprintf(buf
, sizeof(buf
), "%s.%s", datalog_sync_status_oid_prefix
.c_str(), source_zone
.c_str());
1721 string
RGWDataSyncStatusManager::shard_obj_name(const string
& source_zone
, int shard_id
)
1723 char buf
[datalog_sync_status_shard_prefix
.size() + source_zone
.size() + 16];
1724 snprintf(buf
, sizeof(buf
), "%s.%s.%d", datalog_sync_status_shard_prefix
.c_str(), source_zone
.c_str(), shard_id
);
1729 int RGWRemoteBucketLog::init(const string
& _source_zone
, RGWRESTConn
*_conn
,
1730 const rgw_bucket
& bucket
, int shard_id
,
1731 RGWSyncErrorLogger
*_error_logger
,
1732 RGWSyncModuleInstanceRef
& _sync_module
)
1735 source_zone
= _source_zone
;
1737 bs
.shard_id
= shard_id
;
1739 sync_env
.init(store
->ctx(), store
, conn
, async_rados
, http_manager
, _error_logger
, source_zone
, _sync_module
);
1744 struct bucket_index_marker_info
{
1748 bool syncstopped
{false};
1750 void decode_json(JSONObj
*obj
) {
1751 JSONDecoder::decode_json("bucket_ver", bucket_ver
, obj
);
1752 JSONDecoder::decode_json("master_ver", master_ver
, obj
);
1753 JSONDecoder::decode_json("max_marker", max_marker
, obj
);
1754 JSONDecoder::decode_json("syncstopped", syncstopped
, obj
);
1758 class RGWReadRemoteBucketIndexLogInfoCR
: public RGWCoroutine
{
1759 RGWDataSyncEnv
*sync_env
;
1760 const string instance_key
;
1762 bucket_index_marker_info
*info
;
1765 RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncEnv
*_sync_env
,
1766 const rgw_bucket_shard
& bs
,
1767 bucket_index_marker_info
*_info
)
1768 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
1769 instance_key(bs
.get_key()), info(_info
) {}
1771 int operate() override
{
1774 rgw_http_param_pair pairs
[] = { { "type" , "bucket-index" },
1775 { "bucket-instance", instance_key
.c_str() },
1779 string p
= "/admin/log/";
1780 call(new RGWReadRESTResourceCR
<bucket_index_marker_info
>(sync_env
->cct
, sync_env
->conn
, sync_env
->http_manager
, p
, pairs
, info
));
1783 return set_cr_error(retcode
);
1785 return set_cr_done();
1791 class RGWInitBucketShardSyncStatusCoroutine
: public RGWCoroutine
{
1792 RGWDataSyncEnv
*sync_env
;
1794 rgw_bucket_shard bs
;
1795 const string sync_status_oid
;
1797 rgw_bucket_shard_sync_info
& status
;
1799 bucket_index_marker_info info
;
1801 RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncEnv
*_sync_env
,
1802 const rgw_bucket_shard
& bs
,
1803 rgw_bucket_shard_sync_info
& _status
)
1804 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
), bs(bs
),
1805 sync_status_oid(RGWBucketSyncStatusManager::status_oid(sync_env
->source_zone
, bs
)),
1809 int operate() override
{
1811 /* fetch current position in logs */
1812 yield
call(new RGWReadRemoteBucketIndexLogInfoCR(sync_env
, bs
, &info
));
1813 if (retcode
< 0 && retcode
!= -ENOENT
) {
1814 ldout(cct
, 0) << "ERROR: failed to fetch bucket index status" << dendl
;
1815 return set_cr_error(retcode
);
1818 auto store
= sync_env
->store
;
1819 rgw_raw_obj
obj(store
->get_zone_params().log_pool
, sync_status_oid
);
1821 if (info
.syncstopped
) {
1822 call(new RGWRadosRemoveCR(store
, obj
));
1824 status
.state
= rgw_bucket_shard_sync_info::StateFullSync
;
1825 status
.inc_marker
.position
= info
.max_marker
;
1826 map
<string
, bufferlist
> attrs
;
1827 status
.encode_all_attrs(attrs
);
1828 call(new RGWSimpleRadosWriteAttrsCR(sync_env
->async_rados
, store
, obj
, attrs
));
1831 return set_cr_done();
RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr()
{
  // Caller owns the returned coroutine; when run, it (re)initializes the
  // sync-status object for this bucket shard (bs), filling init_status.
  return new RGWInitBucketShardSyncStatusCoroutine(&sync_env, bs, init_status);
}
1843 static void decode_attr(CephContext
*cct
, map
<string
, bufferlist
>& attrs
, const string
& attr_name
, T
*val
)
1845 map
<string
, bufferlist
>::iterator iter
= attrs
.find(attr_name
);
1846 if (iter
== attrs
.end()) {
1851 bufferlist::iterator biter
= iter
->second
.begin();
1853 ::decode(*val
, biter
);
1854 } catch (buffer::error
& err
) {
1855 ldout(cct
, 0) << "ERROR: failed to decode attribute: " << attr_name
<< dendl
;
void rgw_bucket_shard_sync_info::decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs)
{
  // Rehydrate the three components of the shard sync state from their
  // individual xattr blobs. decode_attr() handles a missing or corrupt
  // attribute itself (logging via cct), leaving the member untouched.
  decode_attr(cct, attrs, "state", &state);
  decode_attr(cct, attrs, "full_marker", &full_marker);
  decode_attr(cct, attrs, "inc_marker", &inc_marker);
}
void rgw_bucket_shard_sync_info::encode_all_attrs(map<string, bufferlist>& attrs)
{
  // Serialize every component of the sync state into attrs; each component
  // owns its attribute key ("state", "full_marker", "inc_marker"), matching
  // what decode_from_attrs() reads back.
  encode_state_attr(attrs);
  full_marker.encode_attr(attrs);
  inc_marker.encode_attr(attrs);
}
void rgw_bucket_shard_sync_info::encode_state_attr(map<string, bufferlist>& attrs)
{
  // Only the state enum goes under "state"; the full/incremental markers are
  // encoded separately by their own encode_attr() methods.
  ::encode(state, attrs["state"]);
}
void rgw_bucket_shard_full_sync_marker::encode_attr(map<string, bufferlist>& attrs)
{
  // Whole-object encode under the "full_marker" key, the counterpart of the
  // decode_attr("full_marker", ...) call on the read side.
  ::encode(*this, attrs["full_marker"]);
}
void rgw_bucket_shard_inc_sync_marker::encode_attr(map<string, bufferlist>& attrs)
{
  // Whole-object encode under the "inc_marker" key, the counterpart of the
  // decode_attr("inc_marker", ...) call on the read side.
  ::encode(*this, attrs["inc_marker"]);
}
1888 class RGWReadBucketSyncStatusCoroutine
: public RGWCoroutine
{
1889 RGWDataSyncEnv
*sync_env
;
1891 rgw_bucket_shard_sync_info
*status
;
1893 map
<string
, bufferlist
> attrs
;
1895 RGWReadBucketSyncStatusCoroutine(RGWDataSyncEnv
*_sync_env
,
1896 const rgw_bucket_shard
& bs
,
1897 rgw_bucket_shard_sync_info
*_status
)
1898 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
1899 oid(RGWBucketSyncStatusManager::status_oid(sync_env
->source_zone
, bs
)),
1901 int operate() override
;
1904 int RGWReadBucketSyncStatusCoroutine::operate()
1907 yield
call(new RGWSimpleRadosReadAttrsCR(sync_env
->async_rados
, sync_env
->store
,
1908 rgw_raw_obj(sync_env
->store
->get_zone_params().log_pool
, oid
),
1910 if (retcode
== -ENOENT
) {
1911 *status
= rgw_bucket_shard_sync_info();
1912 return set_cr_done();
1915 ldout(sync_env
->cct
, 0) << "ERROR: failed to call fetch bucket shard info oid=" << oid
<< " ret=" << retcode
<< dendl
;
1916 return set_cr_error(retcode
);
1918 status
->decode_from_attrs(sync_env
->cct
, attrs
);
1919 return set_cr_done();
RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
{
  // Caller owns the returned coroutine; when run, it loads this bucket
  // shard's persisted sync status into *sync_status.
  return new RGWReadBucketSyncStatusCoroutine(&sync_env, bs, sync_status);
}
1928 RGWBucketSyncStatusManager::~RGWBucketSyncStatusManager() {
1929 for (map
<int, RGWRemoteBucketLog
*>::iterator iter
= source_logs
.begin(); iter
!= source_logs
.end(); ++iter
) {
1930 delete iter
->second
;
1932 delete error_logger
;
void rgw_bucket_entry_owner::decode_json(JSONObj *obj)
{
  // Field names follow the S3-style listing payload ("ID"/"DisplayName"),
  // as consumed by bucket_list_entry's "Owner" field below.
  JSONDecoder::decode_json("ID", id, obj);
  JSONDecoder::decode_json("DisplayName", display_name, obj);
}
1942 struct bucket_list_entry
{
1949 string storage_class
;
1950 rgw_bucket_entry_owner owner
;
1951 uint64_t versioned_epoch
;
1954 bucket_list_entry() : delete_marker(false), is_latest(false), size(0), versioned_epoch(0) {}
1956 void decode_json(JSONObj
*obj
) {
1957 JSONDecoder::decode_json("IsDeleteMarker", delete_marker
, obj
);
1958 JSONDecoder::decode_json("Key", key
.name
, obj
);
1959 JSONDecoder::decode_json("VersionId", key
.instance
, obj
);
1960 JSONDecoder::decode_json("IsLatest", is_latest
, obj
);
1962 JSONDecoder::decode_json("RgwxMtime", mtime_str
, obj
);
1966 if (parse_iso8601(mtime_str
.c_str(), &t
, &nsec
)) {
1968 ts
.tv_sec
= (uint64_t)internal_timegm(&t
);
1970 mtime
= real_clock::from_ceph_timespec(ts
);
1972 JSONDecoder::decode_json("ETag", etag
, obj
);
1973 JSONDecoder::decode_json("Size", size
, obj
);
1974 JSONDecoder::decode_json("StorageClass", storage_class
, obj
);
1975 JSONDecoder::decode_json("Owner", owner
, obj
);
1976 JSONDecoder::decode_json("VersionedEpoch", versioned_epoch
, obj
);
1977 JSONDecoder::decode_json("RgwxTag", rgw_tag
, obj
);
1981 struct bucket_list_result
{
1985 string version_id_marker
;
1988 list
<bucket_list_entry
> entries
;
1990 bucket_list_result() : max_keys(0), is_truncated(false) {}
1992 void decode_json(JSONObj
*obj
) {
1993 JSONDecoder::decode_json("Name", name
, obj
);
1994 JSONDecoder::decode_json("Prefix", prefix
, obj
);
1995 JSONDecoder::decode_json("KeyMarker", key_marker
, obj
);
1996 JSONDecoder::decode_json("VersionIdMarker", version_id_marker
, obj
);
1997 JSONDecoder::decode_json("MaxKeys", max_keys
, obj
);
1998 JSONDecoder::decode_json("IsTruncated", is_truncated
, obj
);
1999 JSONDecoder::decode_json("Entries", entries
, obj
);
2003 class RGWListBucketShardCR
: public RGWCoroutine
{
2004 RGWDataSyncEnv
*sync_env
;
2005 const rgw_bucket_shard
& bs
;
2006 const string instance_key
;
2007 rgw_obj_key marker_position
;
2009 bucket_list_result
*result
;
2012 RGWListBucketShardCR(RGWDataSyncEnv
*_sync_env
, const rgw_bucket_shard
& bs
,
2013 rgw_obj_key
& _marker_position
, bucket_list_result
*_result
)
2014 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
), bs(bs
),
2015 instance_key(bs
.get_key()), marker_position(_marker_position
),
2018 int operate() override
{
2021 rgw_http_param_pair pairs
[] = { { "rgwx-bucket-instance", instance_key
.c_str() },
2022 { "versions" , NULL
},
2023 { "format" , "json" },
2024 { "objs-container" , "true" },
2025 { "key-marker" , marker_position
.name
.c_str() },
2026 { "version-id-marker" , marker_position
.instance
.c_str() },
2028 // don't include tenant in the url, it's already part of instance_key
2029 string p
= string("/") + bs
.bucket
.name
;
2030 call(new RGWReadRESTResourceCR
<bucket_list_result
>(sync_env
->cct
, sync_env
->conn
, sync_env
->http_manager
, p
, pairs
, result
));
2033 return set_cr_error(retcode
);
2035 return set_cr_done();
2041 class RGWListBucketIndexLogCR
: public RGWCoroutine
{
2042 RGWDataSyncEnv
*sync_env
;
2043 const string instance_key
;
2046 list
<rgw_bi_log_entry
> *result
;
2049 RGWListBucketIndexLogCR(RGWDataSyncEnv
*_sync_env
, const rgw_bucket_shard
& bs
,
2050 string
& _marker
, list
<rgw_bi_log_entry
> *_result
)
2051 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
2052 instance_key(bs
.get_key()), marker(_marker
), result(_result
) {}
2054 int operate() override
{
2057 rgw_http_param_pair pairs
[] = { { "bucket-instance", instance_key
.c_str() },
2058 { "format" , "json" },
2059 { "marker" , marker
.c_str() },
2060 { "type", "bucket-index" },
2063 call(new RGWReadRESTResourceCR
<list
<rgw_bi_log_entry
> >(sync_env
->cct
, sync_env
->conn
, sync_env
->http_manager
, "/admin/log", pairs
, result
));
2066 return set_cr_error(retcode
);
2068 return set_cr_done();
2074 #define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10
2076 class RGWBucketFullSyncShardMarkerTrack
: public RGWSyncShardMarkerTrack
<rgw_obj_key
, rgw_obj_key
> {
2077 RGWDataSyncEnv
*sync_env
;
2080 rgw_bucket_shard_full_sync_marker sync_marker
;
2083 RGWBucketFullSyncShardMarkerTrack(RGWDataSyncEnv
*_sync_env
,
2084 const string
& _marker_oid
,
2085 const rgw_bucket_shard_full_sync_marker
& _marker
) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW
),
2086 sync_env(_sync_env
),
2087 marker_oid(_marker_oid
),
2088 sync_marker(_marker
) {}
2090 RGWCoroutine
*store_marker(const rgw_obj_key
& new_marker
, uint64_t index_pos
, const real_time
& timestamp
) override
{
2091 sync_marker
.position
= new_marker
;
2092 sync_marker
.count
= index_pos
;
2094 map
<string
, bufferlist
> attrs
;
2095 sync_marker
.encode_attr(attrs
);
2097 RGWRados
*store
= sync_env
->store
;
2099 ldout(sync_env
->cct
, 20) << __func__
<< "(): updating marker marker_oid=" << marker_oid
<< " marker=" << new_marker
<< dendl
;
2100 return new RGWSimpleRadosWriteAttrsCR(sync_env
->async_rados
, store
,
2101 rgw_raw_obj(store
->get_zone_params().log_pool
, marker_oid
),
2106 class RGWBucketIncSyncShardMarkerTrack
: public RGWSyncShardMarkerTrack
<string
, rgw_obj_key
> {
2107 RGWDataSyncEnv
*sync_env
;
2110 rgw_bucket_shard_inc_sync_marker sync_marker
;
2112 map
<rgw_obj_key
, string
> key_to_marker
;
2113 map
<string
, rgw_obj_key
> marker_to_key
;
2115 void handle_finish(const string
& marker
) override
{
2116 map
<string
, rgw_obj_key
>::iterator iter
= marker_to_key
.find(marker
);
2117 if (iter
== marker_to_key
.end()) {
2120 key_to_marker
.erase(iter
->second
);
2121 reset_need_retry(iter
->second
);
2122 marker_to_key
.erase(iter
);
2126 RGWBucketIncSyncShardMarkerTrack(RGWDataSyncEnv
*_sync_env
,
2127 const string
& _marker_oid
,
2128 const rgw_bucket_shard_inc_sync_marker
& _marker
) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW
),
2129 sync_env(_sync_env
),
2130 marker_oid(_marker_oid
),
2131 sync_marker(_marker
) {}
2133 RGWCoroutine
*store_marker(const string
& new_marker
, uint64_t index_pos
, const real_time
& timestamp
) override
{
2134 sync_marker
.position
= new_marker
;
2136 map
<string
, bufferlist
> attrs
;
2137 sync_marker
.encode_attr(attrs
);
2139 RGWRados
*store
= sync_env
->store
;
2141 ldout(sync_env
->cct
, 20) << __func__
<< "(): updating marker marker_oid=" << marker_oid
<< " marker=" << new_marker
<< dendl
;
2142 return new RGWSimpleRadosWriteAttrsCR(sync_env
->async_rados
,
2144 rgw_raw_obj(store
->get_zone_params().log_pool
, marker_oid
),
2149 * create index from key -> <op, marker>, and from marker -> key
2150 * this is useful so that we can insure that we only have one
2151 * entry for any key that is used. This is needed when doing
2152 * incremenatl sync of data, and we don't want to run multiple
2153 * concurrent sync operations for the same bucket shard
2154 * Also, we should make sure that we don't run concurrent operations on the same key with
2157 bool index_key_to_marker(const rgw_obj_key
& key
, const string
& marker
) {
2158 if (key_to_marker
.find(key
) != key_to_marker
.end()) {
2159 set_need_retry(key
);
2162 key_to_marker
[key
] = marker
;
2163 marker_to_key
[marker
] = key
;
2167 bool can_do_op(const rgw_obj_key
& key
) {
2168 return (key_to_marker
.find(key
) == key_to_marker
.end());
2172 template <class T
, class K
>
2173 class RGWBucketSyncSingleEntryCR
: public RGWCoroutine
{
2174 RGWDataSyncEnv
*sync_env
;
2176 RGWBucketInfo
*bucket_info
;
2177 const rgw_bucket_shard
& bs
;
2181 uint64_t versioned_epoch
;
2182 rgw_bucket_entry_owner owner
;
2183 real_time timestamp
;
2185 RGWPendingState op_state
;
2188 RGWSyncShardMarkerTrack
<T
, K
> *marker_tracker
;
2192 stringstream error_ss
;
2194 RGWDataSyncDebugLogger logger
;
2196 bool error_injection
;
2198 RGWDataSyncModule
*data_sync_module
;
2200 rgw_zone_set zones_trace
;
2203 RGWBucketSyncSingleEntryCR(RGWDataSyncEnv
*_sync_env
,
2204 RGWBucketInfo
*_bucket_info
,
2205 const rgw_bucket_shard
& bs
,
2206 const rgw_obj_key
& _key
, bool _versioned
, uint64_t _versioned_epoch
,
2207 real_time
& _timestamp
,
2208 const rgw_bucket_entry_owner
& _owner
,
2209 RGWModifyOp _op
, RGWPendingState _op_state
,
2210 const T
& _entry_marker
, RGWSyncShardMarkerTrack
<T
, K
> *_marker_tracker
, rgw_zone_set
& _zones_trace
) : RGWCoroutine(_sync_env
->cct
),
2211 sync_env(_sync_env
),
2212 bucket_info(_bucket_info
), bs(bs
),
2213 key(_key
), versioned(_versioned
), versioned_epoch(_versioned_epoch
),
2215 timestamp(_timestamp
), op(_op
),
2216 op_state(_op_state
),
2217 entry_marker(_entry_marker
),
2218 marker_tracker(_marker_tracker
),
2221 ss
<< bucket_shard_str
{bs
} << "/" << key
<< "[" << versioned_epoch
<< "]";
2222 set_description() << "bucket sync single entry (source_zone=" << sync_env
->source_zone
<< ") b=" << ss
.str() << " log_entry=" << entry_marker
<< " op=" << (int)op
<< " op_state=" << (int)op_state
;
2223 ldout(sync_env
->cct
, 20) << "bucket sync single entry (source_zone=" << sync_env
->source_zone
<< ") b=" << ss
.str() << " log_entry=" << entry_marker
<< " op=" << (int)op
<< " op_state=" << (int)op_state
<< dendl
;
2226 logger
.init(sync_env
, "Object", ss
.str());
2228 error_injection
= (sync_env
->cct
->_conf
->rgw_sync_data_inject_err_probability
> 0);
2230 data_sync_module
= sync_env
->sync_module
->get_data_handler();
2232 zones_trace
= _zones_trace
;
2233 zones_trace
.insert(sync_env
->store
->get_zone().id
);
2236 int operate() override
{
2238 /* skip entries that are not complete */
2239 if (op_state
!= CLS_RGW_STATE_COMPLETE
) {
2244 marker_tracker
->reset_need_retry(key
);
2245 if (key
.name
.empty()) {
2246 /* shouldn't happen */
2247 set_status("skipping empty entry");
2248 ldout(sync_env
->cct
, 0) << "ERROR: " << __func__
<< "(): entry with empty obj name, skipping" << dendl
;
2251 if (error_injection
&&
2252 rand() % 10000 < cct
->_conf
->rgw_sync_data_inject_err_probability
* 10000.0) {
2253 ldout(sync_env
->cct
, 0) << __func__
<< ": injecting data sync error on key=" << key
.name
<< dendl
;
2255 } else if (op
== CLS_RGW_OP_ADD
||
2256 op
== CLS_RGW_OP_LINK_OLH
) {
2257 if (op
== CLS_RGW_OP_ADD
&& !key
.instance
.empty() && key
.instance
!= "null") {
2258 set_status("skipping entry");
2259 ldout(sync_env
->cct
, 10) << "bucket skipping sync obj: " << sync_env
->source_zone
<< "/" << bucket_info
->bucket
<< "/" << key
<< "[" << versioned_epoch
<< "]: versioned object will be synced on link_olh" << dendl
;
2263 set_status("syncing obj");
2264 ldout(sync_env
->cct
, 5) << "bucket sync: sync obj: " << sync_env
->source_zone
<< "/" << bucket_info
->bucket
<< "/" << key
<< "[" << versioned_epoch
<< "]" << dendl
;
2265 logger
.log("fetch");
2266 call(data_sync_module
->sync_object(sync_env
, *bucket_info
, key
, versioned_epoch
, &zones_trace
));
2267 } else if (op
== CLS_RGW_OP_DEL
|| op
== CLS_RGW_OP_UNLINK_INSTANCE
) {
2268 set_status("removing obj");
2269 if (op
== CLS_RGW_OP_UNLINK_INSTANCE
) {
2272 logger
.log("remove");
2273 call(data_sync_module
->remove_object(sync_env
, *bucket_info
, key
, timestamp
, versioned
, versioned_epoch
, &zones_trace
));
2274 } else if (op
== CLS_RGW_OP_LINK_OLH_DM
) {
2275 logger
.log("creating delete marker");
2276 set_status("creating delete marker");
2277 ldout(sync_env
->cct
, 10) << "creating delete marker: obj: " << sync_env
->source_zone
<< "/" << bucket_info
->bucket
<< "/" << key
<< "[" << versioned_epoch
<< "]" << dendl
;
2278 call(data_sync_module
->create_delete_marker(sync_env
, *bucket_info
, key
, timestamp
, owner
, versioned
, versioned_epoch
, &zones_trace
));
2281 } while (marker_tracker
->need_retry(key
));
2287 ss
<< "done, retcode=" << retcode
;
2289 logger
.log(ss
.str());
2292 if (retcode
< 0 && retcode
!= -ENOENT
) {
2293 set_status() << "failed to sync obj; retcode=" << retcode
;
2294 ldout(sync_env
->cct
, 0) << "ERROR: failed to sync object: "
2295 << bucket_shard_str
{bs
} << "/" << key
.name
<< dendl
;
2296 error_ss
<< bucket_shard_str
{bs
} << "/" << key
.name
;
2297 sync_status
= retcode
;
2299 if (!error_ss
.str().empty()) {
2300 yield
call(sync_env
->error_logger
->log_error_cr(sync_env
->conn
->get_remote_id(), "data", error_ss
.str(), -retcode
, "failed to sync object"));
2303 if (sync_status
== 0) {
2305 set_status() << "calling marker_tracker->finish(" << entry_marker
<< ")";
2306 yield
call(marker_tracker
->finish(entry_marker
));
2307 sync_status
= retcode
;
2309 if (sync_status
< 0) {
2310 return set_cr_error(sync_status
);
2312 return set_cr_done();
2318 #define BUCKET_SYNC_SPAWN_WINDOW 20
2320 class RGWBucketShardFullSyncCR
: public RGWCoroutine
{
2321 RGWDataSyncEnv
*sync_env
;
2322 const rgw_bucket_shard
& bs
;
2323 RGWBucketInfo
*bucket_info
;
2324 boost::intrusive_ptr
<RGWContinuousLeaseCR
> lease_cr
;
2325 bucket_list_result list_result
;
2326 list
<bucket_list_entry
>::iterator entries_iter
;
2327 rgw_bucket_shard_full_sync_marker
& full_marker
;
2328 RGWBucketFullSyncShardMarkerTrack marker_tracker
;
2329 rgw_obj_key list_marker
;
2330 bucket_list_entry
*entry
{nullptr};
2331 RGWModifyOp op
{CLS_RGW_OP_ADD
};
2333 int total_entries
{0};
2337 const string
& status_oid
;
2339 RGWDataSyncDebugLogger logger
;
2340 rgw_zone_set zones_trace
;
2342 RGWBucketShardFullSyncCR(RGWDataSyncEnv
*_sync_env
, const rgw_bucket_shard
& bs
,
2343 RGWBucketInfo
*_bucket_info
,
2344 const std::string
& status_oid
,
2345 RGWContinuousLeaseCR
*lease_cr
,
2346 rgw_bucket_shard_full_sync_marker
& _full_marker
)
2347 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
), bs(bs
),
2348 bucket_info(_bucket_info
), lease_cr(lease_cr
), full_marker(_full_marker
),
2349 marker_tracker(sync_env
, status_oid
, full_marker
),
2350 status_oid(status_oid
) {
2351 logger
.init(sync_env
, "BucketFull", bs
.get_key());
2352 zones_trace
.insert(sync_env
->source_zone
);
2355 int operate() override
;
2358 int RGWBucketShardFullSyncCR::operate()
2362 list_marker
= full_marker
.position
;
2364 total_entries
= full_marker
.count
;
2366 if (!lease_cr
->is_locked()) {
2368 return set_cr_error(-ECANCELED
);
2370 set_status("listing remote bucket");
2371 ldout(sync_env
->cct
, 20) << __func__
<< "(): listing bucket for full sync" << dendl
;
2372 yield
call(new RGWListBucketShardCR(sync_env
, bs
, list_marker
,
2374 if (retcode
< 0 && retcode
!= -ENOENT
) {
2375 set_status("failed bucket listing, going down");
2377 return set_cr_error(retcode
);
2379 entries_iter
= list_result
.entries
.begin();
2380 for (; entries_iter
!= list_result
.entries
.end(); ++entries_iter
) {
2381 if (!lease_cr
->is_locked()) {
2383 return set_cr_error(-ECANCELED
);
2385 ldout(sync_env
->cct
, 20) << "[full sync] syncing object: "
2386 << bucket_shard_str
{bs
} << "/" << entries_iter
->key
<< dendl
;
2387 entry
= &(*entries_iter
);
2389 list_marker
= entries_iter
->key
;
2390 if (!marker_tracker
.start(entry
->key
, total_entries
, real_time())) {
2391 ldout(sync_env
->cct
, 0) << "ERROR: cannot start syncing " << entry
->key
<< ". Duplicate entry?" << dendl
;
2393 op
= (entry
->key
.instance
.empty() || entry
->key
.instance
== "null" ? CLS_RGW_OP_ADD
: CLS_RGW_OP_LINK_OLH
);
2394 using SyncCR
= RGWBucketSyncSingleEntryCR
<rgw_obj_key
, rgw_obj_key
>;
2395 yield
spawn(new SyncCR(sync_env
, bucket_info
, bs
, entry
->key
,
2396 false, /* versioned, only matters for object removal */
2397 entry
->versioned_epoch
, entry
->mtime
,
2398 entry
->owner
, op
, CLS_RGW_STATE_COMPLETE
,
2399 entry
->key
, &marker_tracker
, zones_trace
),
2402 while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW
) {
2403 yield
wait_for_child();
2406 again
= collect(&ret
, nullptr);
2408 ldout(sync_env
->cct
, 0) << "ERROR: a sync operation returned error" << dendl
;
2410 /* we have reported this error */
2415 } while (list_result
.is_truncated
&& sync_status
== 0);
2416 set_status("done iterating over all objects");
2417 /* wait for all operations to complete */
2418 while (num_spawned()) {
2419 yield
wait_for_child();
2422 again
= collect(&ret
, nullptr);
2424 ldout(sync_env
->cct
, 0) << "ERROR: a sync operation returned error" << dendl
;
2426 /* we have reported this error */
2430 if (!lease_cr
->is_locked()) {
2431 return set_cr_error(-ECANCELED
);
2433 /* update sync state to incremental */
2434 if (sync_status
== 0) {
2436 rgw_bucket_shard_sync_info sync_status
;
2437 sync_status
.state
= rgw_bucket_shard_sync_info::StateIncrementalSync
;
2438 map
<string
, bufferlist
> attrs
;
2439 sync_status
.encode_state_attr(attrs
);
2440 RGWRados
*store
= sync_env
->store
;
2441 call(new RGWSimpleRadosWriteAttrsCR(sync_env
->async_rados
, store
,
2442 rgw_raw_obj(store
->get_zone_params().log_pool
, status_oid
),
2446 ldout(sync_env
->cct
, 0) << "ERROR: failure in sync, backing out (sync_status=" << sync_status
<< ")" << dendl
;
2448 if (retcode
< 0 && sync_status
== 0) { /* actually tried to set incremental state and failed */
2449 ldout(sync_env
->cct
, 0) << "ERROR: failed to set sync state on bucket "
2450 << bucket_shard_str
{bs
} << " retcode=" << retcode
<< dendl
;
2451 return set_cr_error(retcode
);
2453 if (sync_status
< 0) {
2454 return set_cr_error(sync_status
);
2456 return set_cr_done();
2461 class RGWBucketShardIncrementalSyncCR
: public RGWCoroutine
{
2462 RGWDataSyncEnv
*sync_env
;
2463 const rgw_bucket_shard
& bs
;
2464 RGWBucketInfo
*bucket_info
;
2465 boost::intrusive_ptr
<RGWContinuousLeaseCR
> lease_cr
;
2466 list
<rgw_bi_log_entry
> list_result
;
2467 list
<rgw_bi_log_entry
>::iterator entries_iter
;
2468 map
<pair
<string
, string
>, pair
<real_time
, RGWModifyOp
> > squash_map
;
2469 rgw_bucket_shard_inc_sync_marker
& inc_marker
;
2471 rgw_bi_log_entry
*entry
{nullptr};
2472 RGWBucketIncSyncShardMarkerTrack marker_tracker
;
2473 bool updated_status
{false};
2474 const string
& status_oid
;
2475 const string
& zone_id
;
2476 ceph::real_time sync_modify_time
;
2480 RGWDataSyncDebugLogger logger
;
2483 bool syncstopped
{false};
2486 RGWBucketShardIncrementalSyncCR(RGWDataSyncEnv
*_sync_env
,
2487 const rgw_bucket_shard
& bs
,
2488 RGWBucketInfo
*_bucket_info
,
2489 const std::string
& status_oid
,
2490 RGWContinuousLeaseCR
*lease_cr
,
2491 rgw_bucket_shard_inc_sync_marker
& _inc_marker
)
2492 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
), bs(bs
),
2493 bucket_info(_bucket_info
), lease_cr(lease_cr
), inc_marker(_inc_marker
),
2494 marker_tracker(sync_env
, status_oid
, inc_marker
), status_oid(status_oid
) , zone_id(_sync_env
->store
->get_zone().id
){
2495 set_description() << "bucket shard incremental sync bucket="
2496 << bucket_shard_str
{bs
};
2498 logger
.init(sync_env
, "BucketInc", bs
.get_key());
2501 int operate() override
;
2504 int RGWBucketShardIncrementalSyncCR::operate()
2509 if (!lease_cr
->is_locked()) {
2511 return set_cr_error(-ECANCELED
);
2513 ldout(sync_env
->cct
, 20) << __func__
<< "(): listing bilog for incremental sync" << inc_marker
.position
<< dendl
;
2514 set_status() << "listing bilog; position=" << inc_marker
.position
;
2515 yield
call(new RGWListBucketIndexLogCR(sync_env
, bs
, inc_marker
.position
,
2517 if (retcode
< 0 && retcode
!= -ENOENT
) {
2520 /* wait for all operations to complete */
2521 return set_cr_error(retcode
);
2523 /* no need to retry */
2528 for (auto& e
: list_result
) {
2529 if (e
.op
== RGWModifyOp::CLS_RGW_OP_SYNCSTOP
&& (sync_modify_time
< e
.timestamp
)) {
2530 ldout(sync_env
->cct
, 20) << " syncstop on " << e
.timestamp
<< dendl
;
2531 sync_modify_time
= e
.timestamp
;
2535 if (e
.op
== RGWModifyOp::CLS_RGW_OP_RESYNC
&& (sync_modify_time
< e
.timestamp
)) {
2536 ldout(sync_env
->cct
, 20) << " resync on " << e
.timestamp
<< dendl
;
2537 sync_modify_time
= e
.timestamp
;
2538 syncstopped
= false;
2541 if (e
.state
!= CLS_RGW_STATE_COMPLETE
) {
2544 if (e
.zones_trace
.find(zone_id
) != e
.zones_trace
.end()) {
2547 auto& squash_entry
= squash_map
[make_pair(e
.object
, e
.instance
)];
2548 if (squash_entry
.first
<= e
.timestamp
) {
2549 squash_entry
= make_pair
<>(e
.timestamp
, e
.op
);
2553 entries_iter
= list_result
.begin();
2554 for (; entries_iter
!= list_result
.end(); ++entries_iter
) {
2555 if (!lease_cr
->is_locked()) {
2557 return set_cr_error(-ECANCELED
);
2559 entry
= &(*entries_iter
);
2561 ssize_t p
= entry
->id
.find('#'); /* entries might have explicit shard info in them, e.g., 6#00000000004.94.3 */
2565 cur_id
= entry
->id
.substr(p
+ 1);
2568 inc_marker
.position
= cur_id
;
2570 if (entry
->op
== RGWModifyOp::CLS_RGW_OP_SYNCSTOP
|| entry
->op
== RGWModifyOp::CLS_RGW_OP_RESYNC
) {
2571 ldout(sync_env
->cct
, 20) << "detected syncstop or resync on " << entries_iter
->timestamp
<< " , skipping entry" << dendl
;
2572 marker_tracker
.try_update_high_marker(cur_id
, 0, entry
->timestamp
);
2576 if (!key
.set(rgw_obj_index_key
{entry
->object
, entry
->instance
})) {
2577 set_status() << "parse_raw_oid() on " << entry
->object
<< " returned false, skipping entry";
2578 ldout(sync_env
->cct
, 20) << "parse_raw_oid() on " << entry
->object
<< " returned false, skipping entry" << dendl
;
2579 marker_tracker
.try_update_high_marker(cur_id
, 0, entry
->timestamp
);
2583 ldout(sync_env
->cct
, 20) << "parsed entry: id=" << cur_id
<< " iter->object=" << entry
->object
<< " iter->instance=" << entry
->instance
<< " name=" << key
.name
<< " instance=" << key
.instance
<< " ns=" << key
.ns
<< dendl
;
2585 if (!key
.ns
.empty()) {
2586 set_status() << "skipping entry in namespace: " << entry
->object
;
2587 ldout(sync_env
->cct
, 20) << "skipping entry in namespace: " << entry
->object
<< dendl
;
2588 marker_tracker
.try_update_high_marker(cur_id
, 0, entry
->timestamp
);
2592 set_status() << "got entry.id=" << cur_id
<< " key=" << key
<< " op=" << (int)entry
->op
;
2593 if (entry
->op
== CLS_RGW_OP_CANCEL
) {
2594 set_status() << "canceled operation, skipping";
2595 ldout(sync_env
->cct
, 20) << "[inc sync] skipping object: "
2596 << bucket_shard_str
{bs
} << "/" << key
<< ": canceled operation" << dendl
;
2597 marker_tracker
.try_update_high_marker(cur_id
, 0, entry
->timestamp
);
2600 if (entry
->state
!= CLS_RGW_STATE_COMPLETE
) {
2601 set_status() << "non-complete operation, skipping";
2602 ldout(sync_env
->cct
, 20) << "[inc sync] skipping object: "
2603 << bucket_shard_str
{bs
} << "/" << key
<< ": non-complete operation" << dendl
;
2604 marker_tracker
.try_update_high_marker(cur_id
, 0, entry
->timestamp
);
2607 if (entry
->zones_trace
.find(zone_id
) != entry
->zones_trace
.end()) {
2608 set_status() << "redundant operation, skipping";
2609 ldout(sync_env
->cct
, 20) << "[inc sync] skipping object: "
2610 <<bucket_shard_str
{bs
} <<"/"<<key
<<": redundant operation" << dendl
;
2611 marker_tracker
.try_update_high_marker(cur_id
, 0, entry
->timestamp
);
2614 if (make_pair
<>(entry
->timestamp
, entry
->op
) != squash_map
[make_pair(entry
->object
, entry
->instance
)]) {
2615 set_status() << "squashed operation, skipping";
2616 ldout(sync_env
->cct
, 20) << "[inc sync] skipping object: "
2617 << bucket_shard_str
{bs
} << "/" << key
<< ": squashed operation" << dendl
;
2618 /* not updating high marker though */
2621 ldout(sync_env
->cct
, 20) << "[inc sync] syncing object: "
2622 << bucket_shard_str
{bs
} << "/" << key
<< dendl
;
2623 updated_status
= false;
2624 while (!marker_tracker
.can_do_op(key
)) {
2625 if (!updated_status
) {
2626 set_status() << "can't do op, conflicting inflight operation";
2627 updated_status
= true;
2629 ldout(sync_env
->cct
, 5) << *this << ": [inc sync] can't do op on key=" << key
<< " need to wait for conflicting operation to complete" << dendl
;
2630 yield
wait_for_child();
2633 again
= collect(&ret
, nullptr);
2635 ldout(sync_env
->cct
, 0) << "ERROR: a child operation returned error (ret=" << ret
<< ")" << dendl
;
2637 /* we have reported this error */
2641 if (!marker_tracker
.index_key_to_marker(key
, cur_id
)) {
2642 set_status() << "can't do op, sync already in progress for object";
2643 ldout(sync_env
->cct
, 20) << __func__
<< ": skipping sync of entry: " << cur_id
<< ":" << key
<< " sync already in progress for object" << dendl
;
2644 marker_tracker
.try_update_high_marker(cur_id
, 0, entry
->timestamp
);
2648 set_status() << "start object sync";
2649 if (!marker_tracker
.start(cur_id
, 0, entry
->timestamp
)) {
2650 ldout(sync_env
->cct
, 0) << "ERROR: cannot start syncing " << cur_id
<< ". Duplicate entry?" << dendl
;
2652 uint64_t versioned_epoch
= 0;
2653 rgw_bucket_entry_owner
owner(entry
->owner
, entry
->owner_display_name
);
2654 if (entry
->ver
.pool
< 0) {
2655 versioned_epoch
= entry
->ver
.epoch
;
2657 ldout(sync_env
->cct
, 20) << __func__
<< "(): entry->timestamp=" << entry
->timestamp
<< dendl
;
2658 using SyncCR
= RGWBucketSyncSingleEntryCR
<string
, rgw_obj_key
>;
2659 spawn(new SyncCR(sync_env
, bucket_info
, bs
, key
,
2660 entry
->is_versioned(), versioned_epoch
,
2661 entry
->timestamp
, owner
, entry
->op
, entry
->state
,
2662 cur_id
, &marker_tracker
, entry
->zones_trace
),
2666 while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW
) {
2667 set_status() << "num_spawned() > spawn_window";
2668 yield
wait_for_child();
2671 again
= collect(&ret
, nullptr);
2673 ldout(sync_env
->cct
, 0) << "ERROR: a sync operation returned error" << dendl
;
2675 /* we have reported this error */
2677 /* not waiting for child here */
2681 } while (!list_result
.empty() && sync_status
== 0);
2687 const string
& oid
= RGWBucketSyncStatusManager::status_oid(sync_env
->source_zone
, bs
);
2688 RGWRados
*store
= sync_env
->store
;
2689 call(new RGWRadosRemoveCR(store
, rgw_raw_obj
{store
->get_zone_params().log_pool
, oid
}));
2692 return set_cr_done();
2695 while (num_spawned()) {
2696 yield
wait_for_child();
2699 again
= collect(&ret
, nullptr);
2701 ldout(sync_env
->cct
, 0) << "ERROR: a sync operation returned error" << dendl
;
2703 /* we have reported this error */
2705 /* not waiting for child here */
2709 yield
call(marker_tracker
.flush());
2711 ldout(sync_env
->cct
, 0) << "ERROR: marker_tracker.flush() returned retcode=" << retcode
<< dendl
;
2712 return set_cr_error(retcode
);
2714 if (sync_status
< 0) {
2715 ldout(sync_env
->cct
, 0) << "ERROR: failure in sync, backing out (sync_status=" << sync_status
<< ")" << dendl
;
2718 /* wait for all operations to complete */
2721 if (sync_status
< 0) {
2722 return set_cr_error(sync_status
);
2725 return set_cr_done();
2730 int RGWRunBucketSyncCoroutine::operate()
2734 set_status("acquiring sync lock");
2735 auto store
= sync_env
->store
;
2736 lease_cr
.reset(new RGWContinuousLeaseCR(sync_env
->async_rados
, store
,
2737 rgw_raw_obj(store
->get_zone_params().log_pool
, status_oid
),
2739 cct
->_conf
->rgw_sync_lease_period
,
2741 lease_stack
.reset(spawn(lease_cr
.get(), false));
2743 while (!lease_cr
->is_locked()) {
2744 if (lease_cr
->is_done()) {
2745 ldout(cct
, 5) << "lease cr failed, done early" << dendl
;
2746 set_status("lease lock failed, early abort");
2747 return set_cr_error(lease_cr
->get_ret_status());
2753 yield
call(new RGWReadBucketSyncStatusCoroutine(sync_env
, bs
, &sync_status
));
2754 if (retcode
< 0 && retcode
!= -ENOENT
) {
2755 ldout(sync_env
->cct
, 0) << "ERROR: failed to read sync status for bucket="
2756 << bucket_shard_str
{bs
} << dendl
;
2757 lease_cr
->go_down();
2759 return set_cr_error(retcode
);
2762 ldout(sync_env
->cct
, 20) << __func__
<< "(): sync status for bucket "
2763 << bucket_shard_str
{bs
} << ": " << sync_status
.state
<< dendl
;
2765 yield
call(new RGWGetBucketInstanceInfoCR(sync_env
->async_rados
, sync_env
->store
, bs
.bucket
, &bucket_info
));
2766 if (retcode
== -ENOENT
) {
2767 /* bucket instance info has not been synced in yet, fetch it now */
2769 ldout(sync_env
->cct
, 10) << "no local info for bucket "
2770 << bucket_str
{bs
.bucket
} << ": fetching metadata" << dendl
;
2771 string raw_key
= string("bucket.instance:") + bs
.bucket
.get_key();
2773 meta_sync_env
.init(cct
, sync_env
->store
, sync_env
->store
->rest_master_conn
, sync_env
->async_rados
, sync_env
->http_manager
, sync_env
->error_logger
);
2775 call(new RGWMetaSyncSingleEntryCR(&meta_sync_env
, raw_key
,
2776 string() /* no marker */,
2777 MDLOG_STATUS_COMPLETE
,
2778 NULL
/* no marker tracker */));
2781 ldout(sync_env
->cct
, 0) << "ERROR: failed to fetch bucket instance info for " << bucket_str
{bs
.bucket
} << dendl
;
2782 lease_cr
->go_down();
2784 return set_cr_error(retcode
);
2787 yield
call(new RGWGetBucketInstanceInfoCR(sync_env
->async_rados
, sync_env
->store
, bs
.bucket
, &bucket_info
));
2790 ldout(sync_env
->cct
, 0) << "ERROR: failed to retrieve bucket info for bucket=" << bucket_str
{bs
.bucket
} << dendl
;
2791 lease_cr
->go_down();
2793 return set_cr_error(retcode
);
2796 if (sync_status
.state
== rgw_bucket_shard_sync_info::StateInit
) {
2797 yield
call(new RGWInitBucketShardSyncStatusCoroutine(sync_env
, bs
, sync_status
));
2799 ldout(sync_env
->cct
, 0) << "ERROR: init sync on " << bucket_shard_str
{bs
}
2800 << " failed, retcode=" << retcode
<< dendl
;
2801 lease_cr
->go_down();
2803 return set_cr_error(retcode
);
2807 if (sync_status
.state
== rgw_bucket_shard_sync_info::StateFullSync
) {
2808 yield
call(new RGWBucketShardFullSyncCR(sync_env
, bs
, &bucket_info
,
2809 status_oid
, lease_cr
.get(),
2810 sync_status
.full_marker
));
2812 ldout(sync_env
->cct
, 5) << "full sync on " << bucket_shard_str
{bs
}
2813 << " failed, retcode=" << retcode
<< dendl
;
2814 lease_cr
->go_down();
2816 return set_cr_error(retcode
);
2818 sync_status
.state
= rgw_bucket_shard_sync_info::StateIncrementalSync
;
2821 if (sync_status
.state
== rgw_bucket_shard_sync_info::StateIncrementalSync
) {
2822 yield
call(new RGWBucketShardIncrementalSyncCR(sync_env
, bs
, &bucket_info
,
2823 status_oid
, lease_cr
.get(),
2824 sync_status
.inc_marker
));
2826 ldout(sync_env
->cct
, 5) << "incremental sync on " << bucket_shard_str
{bs
}
2827 << " failed, retcode=" << retcode
<< dendl
;
2828 lease_cr
->go_down();
2830 return set_cr_error(retcode
);
2834 lease_cr
->go_down();
2836 return set_cr_done();
2842 RGWCoroutine
*RGWRemoteBucketLog::run_sync_cr()
2844 return new RGWRunBucketSyncCoroutine(&sync_env
, bs
);
2847 int RGWBucketSyncStatusManager::init()
2849 conn
= store
->get_zone_conn_by_id(source_zone
);
2851 ldout(store
->ctx(), 0) << "connection object to zone " << source_zone
<< " does not exist" << dendl
;
2855 int ret
= http_manager
.set_threaded();
2857 ldout(store
->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret
<< dendl
;
2862 const string key
= bucket
.get_key();
2864 rgw_http_param_pair pairs
[] = { { "key", key
.c_str() },
2867 string path
= string("/admin/metadata/bucket.instance");
2869 bucket_instance_meta_info result
;
2870 ret
= cr_mgr
.run(new RGWReadRESTResourceCR
<bucket_instance_meta_info
>(store
->ctx(), conn
, &http_manager
, path
, pairs
, &result
));
2872 ldout(store
->ctx(), 0) << "ERROR: failed to fetch bucket metadata info from zone=" << source_zone
<< " path=" << path
<< " key=" << key
<< " ret=" << ret
<< dendl
;
2876 RGWBucketInfo
& bi
= result
.data
.get_bucket_info();
2877 num_shards
= bi
.num_shards
;
2879 error_logger
= new RGWSyncErrorLogger(store
, RGW_SYNC_ERROR_LOG_SHARD_PREFIX
, ERROR_LOGGER_SHARDS
);
2881 sync_module
.reset(new RGWDefaultSyncModuleInstance());
2883 int effective_num_shards
= (num_shards
? num_shards
: 1);
2885 auto async_rados
= store
->get_async_rados();
2887 for (int i
= 0; i
< effective_num_shards
; i
++) {
2888 RGWRemoteBucketLog
*l
= new RGWRemoteBucketLog(store
, this, async_rados
, &http_manager
);
2889 ret
= l
->init(source_zone
, conn
, bucket
, (num_shards
? i
: -1), error_logger
, sync_module
);
2891 ldout(store
->ctx(), 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl
;
2900 int RGWBucketSyncStatusManager::init_sync_status()
2902 list
<RGWCoroutinesStack
*> stacks
;
2904 for (map
<int, RGWRemoteBucketLog
*>::iterator iter
= source_logs
.begin(); iter
!= source_logs
.end(); ++iter
) {
2905 RGWCoroutinesStack
*stack
= new RGWCoroutinesStack(store
->ctx(), &cr_mgr
);
2906 RGWRemoteBucketLog
*l
= iter
->second
;
2907 stack
->call(l
->init_sync_status_cr());
2909 stacks
.push_back(stack
);
2912 return cr_mgr
.run(stacks
);
2915 int RGWBucketSyncStatusManager::read_sync_status()
2917 list
<RGWCoroutinesStack
*> stacks
;
2919 for (map
<int, RGWRemoteBucketLog
*>::iterator iter
= source_logs
.begin(); iter
!= source_logs
.end(); ++iter
) {
2920 RGWCoroutinesStack
*stack
= new RGWCoroutinesStack(store
->ctx(), &cr_mgr
);
2921 RGWRemoteBucketLog
*l
= iter
->second
;
2922 stack
->call(l
->read_sync_status_cr(&sync_status
[iter
->first
]));
2924 stacks
.push_back(stack
);
2927 int ret
= cr_mgr
.run(stacks
);
2929 ldout(store
->ctx(), 0) << "ERROR: failed to read sync status for "
2930 << bucket_str
{bucket
} << dendl
;
2937 int RGWBucketSyncStatusManager::run()
2939 list
<RGWCoroutinesStack
*> stacks
;
2941 for (map
<int, RGWRemoteBucketLog
*>::iterator iter
= source_logs
.begin(); iter
!= source_logs
.end(); ++iter
) {
2942 RGWCoroutinesStack
*stack
= new RGWCoroutinesStack(store
->ctx(), &cr_mgr
);
2943 RGWRemoteBucketLog
*l
= iter
->second
;
2944 stack
->call(l
->run_sync_cr());
2946 stacks
.push_back(stack
);
2949 int ret
= cr_mgr
.run(stacks
);
2951 ldout(store
->ctx(), 0) << "ERROR: failed to read sync status for "
2952 << bucket_str
{bucket
} << dendl
;
2959 string
RGWBucketSyncStatusManager::status_oid(const string
& source_zone
,
2960 const rgw_bucket_shard
& bs
)
2962 return bucket_status_oid_prefix
+ "." + source_zone
+ ":" + bs
.get_key();
2966 // TODO: move into rgw_data_sync_trim.cc
2968 #define dout_prefix (*_dout << "data trim: ")
2972 /// return the marker that it's safe to trim up to
2973 const std::string
& get_stable_marker(const rgw_data_sync_marker
& m
)
2975 return m
.state
== m
.FullSync
? m
.next_step_marker
: m
.marker
;
2978 /// comparison operator for take_min_markers()
2979 bool operator<(const rgw_data_sync_marker
& lhs
,
2980 const rgw_data_sync_marker
& rhs
)
2982 // sort by stable marker
2983 return get_stable_marker(lhs
) < get_stable_marker(rhs
);
/// populate the container starting with 'dest' with the minimum stable marker
/// of each shard for all of the peers in [first, last)
///
/// NOTE(review): the output-cursor lines (`auto m = dest;` / `++m;`) were
/// dropped by the extraction that produced this chunk and are restored here.
template <typename IterIn, typename IterOut>
void take_min_markers(IterIn first, IterIn last, IterOut dest)
{
  if (first == last) {
    return;
  }
  // initialize markers with the first peer's
  auto m = dest;
  for (auto &shard : first->sync_markers) {
    *m = std::move(shard.second);
    ++m;
  }
  // for remaining peers, replace with smaller markers
  for (auto p = first + 1; p != last; ++p) {
    auto m = dest;
    for (auto &shard : p->sync_markers) {
      if (shard.second < *m) {
        *m = std::move(shard.second);
      }
      ++m;
    }
  }
}
3012 } // anonymous namespace
3014 class DataLogTrimCR
: public RGWCoroutine
{
3016 RGWHTTPManager
*http
;
3017 const int num_shards
;
3018 const std::string
& zone_id
; //< my zone id
3019 std::vector
<rgw_data_sync_status
> peer_status
; //< sync status for each peer
3020 std::vector
<rgw_data_sync_marker
> min_shard_markers
; //< min marker per shard
3021 std::vector
<std::string
>& last_trim
; //< last trimmed marker per shard
3025 DataLogTrimCR(RGWRados
*store
, RGWHTTPManager
*http
,
3026 int num_shards
, std::vector
<std::string
>& last_trim
)
3027 : RGWCoroutine(store
->ctx()), store(store
), http(http
),
3028 num_shards(num_shards
),
3029 zone_id(store
->get_zone().id
),
3030 peer_status(store
->zone_conn_map
.size()),
3031 min_shard_markers(num_shards
),
3032 last_trim(last_trim
)
3035 int operate() override
;
3038 int DataLogTrimCR::operate()
3041 ldout(cct
, 10) << "fetching sync status for zone " << zone_id
<< dendl
;
3042 set_status("fetching sync status");
3044 // query data sync status from each sync peer
3045 rgw_http_param_pair params
[] = {
3047 { "status", nullptr },
3048 { "source-zone", zone_id
.c_str() },
3049 { nullptr, nullptr }
3052 auto p
= peer_status
.begin();
3053 for (auto& c
: store
->zone_conn_map
) {
3054 ldout(cct
, 20) << "query sync status from " << c
.first
<< dendl
;
3055 using StatusCR
= RGWReadRESTResourceCR
<rgw_data_sync_status
>;
3056 spawn(new StatusCR(cct
, c
.second
, http
, "/admin/log/", params
, &*p
),
3062 // must get a successful reply from all peers to consider trimming
3064 while (ret
== 0 && num_spawned() > 0) {
3065 yield
wait_for_child();
3071 ldout(cct
, 4) << "failed to fetch sync status from all peers" << dendl
;
3072 return set_cr_error(ret
);
3075 ldout(cct
, 10) << "trimming log shards" << dendl
;
3076 set_status("trimming log shards");
3078 // determine the minimum marker for each shard
3079 take_min_markers(peer_status
.begin(), peer_status
.end(),
3080 min_shard_markers
.begin());
3082 for (int i
= 0; i
< num_shards
; i
++) {
3083 const auto& m
= min_shard_markers
[i
];
3084 auto& stable
= get_stable_marker(m
);
3085 if (stable
<= last_trim
[i
]) {
3088 ldout(cct
, 10) << "trimming log shard " << i
3089 << " at marker=" << stable
3090 << " last_trim=" << last_trim
[i
] << dendl
;
3091 using TrimCR
= RGWSyncLogTrimCR
;
3092 spawn(new TrimCR(store
, store
->data_log
->get_oid(i
),
3093 stable
, &last_trim
[i
]),
3097 return set_cr_done();
3102 class DataLogTrimPollCR
: public RGWCoroutine
{
3104 RGWHTTPManager
*http
;
3105 const int num_shards
;
3106 const utime_t interval
; //< polling interval
3107 const std::string lock_oid
; //< use first data log shard for lock
3108 const std::string lock_cookie
;
3109 std::vector
<std::string
> last_trim
; //< last trimmed marker per shard
3112 DataLogTrimPollCR(RGWRados
*store
, RGWHTTPManager
*http
,
3113 int num_shards
, utime_t interval
)
3114 : RGWCoroutine(store
->ctx()), store(store
), http(http
),
3115 num_shards(num_shards
), interval(interval
),
3116 lock_oid(store
->data_log
->get_oid(0)),
3117 lock_cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct
)),
3118 last_trim(num_shards
)
3121 int operate() override
;
3124 int DataLogTrimPollCR::operate()
3128 set_status("sleeping");
3131 // request a 'data_trim' lock that covers the entire wait interval to
3132 // prevent other gateways from attempting to trim for the duration
3133 set_status("acquiring trim lock");
3134 yield
call(new RGWSimpleRadosLockCR(store
->get_async_rados(), store
,
3135 rgw_raw_obj(store
->get_zone_params().log_pool
, lock_oid
),
3136 "data_trim", lock_cookie
,
3139 // if the lock is already held, go back to sleep and try again later
3140 ldout(cct
, 4) << "failed to lock " << lock_oid
<< ", trying again in "
3141 << interval
.sec() << "s" << dendl
;
3145 set_status("trimming");
3146 yield
call(new DataLogTrimCR(store
, http
, num_shards
, last_trim
));
3148 // note that the lock is not released. this is intentional, as it avoids
3149 // duplicating this work in other gateways
3155 RGWCoroutine
* create_data_log_trim_cr(RGWRados
*store
,
3156 RGWHTTPManager
*http
,
3157 int num_shards
, utime_t interval
)
3159 return new DataLogTrimPollCR(store
, http
, num_shards
, interval
);