1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include <boost/utility/string_ref.hpp>
6 #include "common/ceph_json.h"
7 #include "common/RWLock.h"
8 #include "common/RefCountedObj.h"
9 #include "common/WorkQueue.h"
10 #include "common/Throttle.h"
11 #include "common/errno.h"
13 #include "rgw_common.h"
14 #include "rgw_rados.h"
16 #include "rgw_data_sync.h"
17 #include "rgw_rest_conn.h"
18 #include "rgw_cr_rados.h"
19 #include "rgw_cr_rest.h"
20 #include "rgw_http_client.h"
21 #include "rgw_bucket.h"
22 #include "rgw_metadata.h"
23 #include "rgw_sync_module.h"
24 #include "rgw_sync_log_trim.h"
26 #include "cls/lock/cls_lock_client.h"
28 #include "auth/Crypto.h"
30 #include <boost/asio/yield.hpp>
32 #define dout_subsys ceph_subsys_rgw
35 #define dout_prefix (*_dout << "data sync: ")
37 static string datalog_sync_status_oid_prefix
= "datalog.sync-status";
38 static string datalog_sync_status_shard_prefix
= "datalog.sync-status.shard";
39 static string datalog_sync_full_sync_index_prefix
= "data.full-sync.index";
40 static string bucket_status_oid_prefix
= "bucket.sync-status";
42 class RGWSyncDebugLogger
{
49 RGWSyncDebugLogger(CephContext
*_cct
, const string
& source_zone
,
50 const string
& sync_type
, const string
& sync_stage
,
51 const string
& resource
, bool log_start
= true) {
52 init(_cct
, source_zone
, sync_type
, sync_stage
, resource
, log_start
);
54 RGWSyncDebugLogger() : cct(NULL
), ended(false) {}
55 ~RGWSyncDebugLogger();
57 void init(CephContext
*_cct
, const string
& source_zone
,
58 const string
& sync_type
, const string
& sync_stage
,
59 const string
& resource
, bool log_start
= true);
60 void log(const string
& state
);
61 void finish(int status
);
64 void RGWSyncDebugLogger::init(CephContext
*_cct
, const string
& source_zone
,
65 const string
& sync_type
, const string
& sync_section
,
66 const string
& resource
, bool log_start
)
70 string zone_str
= source_zone
.substr(0, 8);
71 prefix
= "Sync:" + zone_str
+ ":" + sync_type
+ ":" + sync_section
+ ":" + resource
;
77 RGWSyncDebugLogger::~RGWSyncDebugLogger()
84 void RGWSyncDebugLogger::log(const string
& state
)
86 ldout(cct
, 5) << prefix
<< ":" << state
<< dendl
;
89 void RGWSyncDebugLogger::finish(int status
)
92 ldout(cct
, 5) << prefix
<< ":" << "finish r=" << status
<< dendl
;
95 class RGWDataSyncDebugLogger
: public RGWSyncDebugLogger
{
97 RGWDataSyncDebugLogger() {}
98 RGWDataSyncDebugLogger(RGWDataSyncEnv
*sync_env
, const string
& sync_section
,
99 const string
& resource
, bool log_start
= true) {
100 init(sync_env
, sync_section
, resource
, log_start
);
102 void init(RGWDataSyncEnv
*sync_env
, const string
& sync_section
,
103 const string
& resource
, bool log_start
= true) {
104 RGWSyncDebugLogger::init(sync_env
->cct
, sync_env
->source_zone
, "data", sync_section
, resource
, log_start
);
109 void rgw_datalog_info::decode_json(JSONObj
*obj
) {
110 JSONDecoder::decode_json("num_objects", num_shards
, obj
);
113 void rgw_datalog_entry::decode_json(JSONObj
*obj
) {
114 JSONDecoder::decode_json("key", key
, obj
);
116 JSONDecoder::decode_json("timestamp", ut
, obj
);
117 timestamp
= ut
.to_real_time();
120 void rgw_datalog_shard_data::decode_json(JSONObj
*obj
) {
121 JSONDecoder::decode_json("marker", marker
, obj
);
122 JSONDecoder::decode_json("truncated", truncated
, obj
);
123 JSONDecoder::decode_json("entries", entries
, obj
);
126 class RGWReadDataSyncStatusMarkersCR
: public RGWShardCollectCR
{
127 static constexpr int MAX_CONCURRENT_SHARDS
= 16;
130 const int num_shards
;
133 map
<uint32_t, rgw_data_sync_marker
>& markers
;
136 RGWReadDataSyncStatusMarkersCR(RGWDataSyncEnv
*env
, int num_shards
,
137 map
<uint32_t, rgw_data_sync_marker
>& markers
)
138 : RGWShardCollectCR(env
->cct
, MAX_CONCURRENT_SHARDS
),
139 env(env
), num_shards(num_shards
), markers(markers
)
141 bool spawn_next() override
;
144 bool RGWReadDataSyncStatusMarkersCR::spawn_next()
146 if (shard_id
>= num_shards
) {
149 using CR
= RGWSimpleRadosReadCR
<rgw_data_sync_marker
>;
150 spawn(new CR(env
->async_rados
, env
->store
,
151 rgw_raw_obj(env
->store
->get_zone_params().log_pool
, RGWDataSyncStatusManager::shard_obj_name(env
->source_zone
, shard_id
)),
158 class RGWReadDataSyncRecoveringShardsCR
: public RGWShardCollectCR
{
159 static constexpr int MAX_CONCURRENT_SHARDS
= 16;
163 uint64_t max_entries
;
168 map
<int, std::set
<std::string
>> &entries_map
;
171 RGWReadDataSyncRecoveringShardsCR(RGWDataSyncEnv
*env
, uint64_t _max_entries
, int _num_shards
,
172 map
<int, std::set
<std::string
>>& _entries_map
)
173 : RGWShardCollectCR(env
->cct
, MAX_CONCURRENT_SHARDS
), env(env
),
174 max_entries(_max_entries
), num_shards(_num_shards
), entries_map(_entries_map
)
176 bool spawn_next() override
;
179 bool RGWReadDataSyncRecoveringShardsCR::spawn_next()
181 if (shard_id
> num_shards
)
184 string error_oid
= RGWDataSyncStatusManager::shard_obj_name(env
->source_zone
, shard_id
) + ".retry";
185 spawn(new RGWRadosGetOmapKeysCR(env
->store
, rgw_raw_obj(env
->store
->get_zone_params().log_pool
, error_oid
),
186 marker
, &entries_map
[shard_id
], max_entries
), false);
192 class RGWReadDataSyncStatusCoroutine
: public RGWCoroutine
{
193 RGWDataSyncEnv
*sync_env
;
194 rgw_data_sync_status
*sync_status
;
197 RGWReadDataSyncStatusCoroutine(RGWDataSyncEnv
*_sync_env
,
198 rgw_data_sync_status
*_status
)
199 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
), sync_status(_status
)
201 int operate() override
;
204 int RGWReadDataSyncStatusCoroutine::operate()
208 using ReadInfoCR
= RGWSimpleRadosReadCR
<rgw_data_sync_info
>;
210 bool empty_on_enoent
= false; // fail on ENOENT
211 call(new ReadInfoCR(sync_env
->async_rados
, sync_env
->store
,
212 rgw_raw_obj(sync_env
->store
->get_zone_params().log_pool
, RGWDataSyncStatusManager::sync_status_oid(sync_env
->source_zone
)),
213 &sync_status
->sync_info
, empty_on_enoent
));
216 ldout(sync_env
->cct
, 4) << "failed to read sync status info with "
217 << cpp_strerror(retcode
) << dendl
;
218 return set_cr_error(retcode
);
220 // read shard markers
221 using ReadMarkersCR
= RGWReadDataSyncStatusMarkersCR
;
222 yield
call(new ReadMarkersCR(sync_env
, sync_status
->sync_info
.num_shards
,
223 sync_status
->sync_markers
));
225 ldout(sync_env
->cct
, 4) << "failed to read sync status markers with "
226 << cpp_strerror(retcode
) << dendl
;
227 return set_cr_error(retcode
);
229 return set_cr_done();
234 class RGWReadRemoteDataLogShardInfoCR
: public RGWCoroutine
{
235 RGWDataSyncEnv
*sync_env
;
237 RGWRESTReadResource
*http_op
;
240 RGWDataChangesLogInfo
*shard_info
;
243 RGWReadRemoteDataLogShardInfoCR(RGWDataSyncEnv
*_sync_env
,
244 int _shard_id
, RGWDataChangesLogInfo
*_shard_info
) : RGWCoroutine(_sync_env
->cct
),
248 shard_info(_shard_info
) {
251 ~RGWReadRemoteDataLogShardInfoCR() override
{
257 int operate() override
{
261 snprintf(buf
, sizeof(buf
), "%d", shard_id
);
262 rgw_http_param_pair pairs
[] = { { "type" , "data" },
267 string p
= "/admin/log/";
269 http_op
= new RGWRESTReadResource(sync_env
->conn
, p
, pairs
, NULL
, sync_env
->http_manager
);
271 http_op
->set_user_info((void *)stack
);
273 int ret
= http_op
->aio_read();
275 ldout(sync_env
->cct
, 0) << "ERROR: failed to read from " << p
<< dendl
;
276 log_error() << "failed to send http operation: " << http_op
->to_str() << " ret=" << ret
<< std::endl
;
277 return set_cr_error(ret
);
283 int ret
= http_op
->wait(shard_info
);
285 return set_cr_error(ret
);
287 return set_cr_done();
294 struct read_remote_data_log_response
{
297 list
<rgw_data_change_log_entry
> entries
;
299 read_remote_data_log_response() : truncated(false) {}
301 void decode_json(JSONObj
*obj
) {
302 JSONDecoder::decode_json("marker", marker
, obj
);
303 JSONDecoder::decode_json("truncated", truncated
, obj
);
304 JSONDecoder::decode_json("entries", entries
, obj
);
308 class RGWReadRemoteDataLogShardCR
: public RGWCoroutine
{
309 RGWDataSyncEnv
*sync_env
;
311 RGWRESTReadResource
*http_op
= nullptr;
314 const std::string
& marker
;
315 string
*pnext_marker
;
316 list
<rgw_data_change_log_entry
> *entries
;
319 read_remote_data_log_response response
;
322 RGWReadRemoteDataLogShardCR(RGWDataSyncEnv
*_sync_env
, int _shard_id
,
323 const std::string
& marker
, string
*pnext_marker
,
324 list
<rgw_data_change_log_entry
> *_entries
,
326 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
327 shard_id(_shard_id
), marker(marker
), pnext_marker(pnext_marker
),
328 entries(_entries
), truncated(_truncated
) {
330 ~RGWReadRemoteDataLogShardCR() override
{
336 int operate() override
{
340 snprintf(buf
, sizeof(buf
), "%d", shard_id
);
341 rgw_http_param_pair pairs
[] = { { "type" , "data" },
343 { "marker", marker
.c_str() },
344 { "extra-info", "true" },
347 string p
= "/admin/log/";
349 http_op
= new RGWRESTReadResource(sync_env
->conn
, p
, pairs
, NULL
, sync_env
->http_manager
);
351 http_op
->set_user_info((void *)stack
);
353 int ret
= http_op
->aio_read();
355 ldout(sync_env
->cct
, 0) << "ERROR: failed to read from " << p
<< dendl
;
356 log_error() << "failed to send http operation: " << http_op
->to_str() << " ret=" << ret
<< std::endl
;
357 return set_cr_error(ret
);
363 int ret
= http_op
->wait(&response
);
365 return set_cr_error(ret
);
368 entries
->swap(response
.entries
);
369 *pnext_marker
= response
.marker
;
370 *truncated
= response
.truncated
;
371 return set_cr_done();
378 class RGWReadRemoteDataLogInfoCR
: public RGWShardCollectCR
{
379 RGWDataSyncEnv
*sync_env
;
382 map
<int, RGWDataChangesLogInfo
> *datalog_info
;
385 #define READ_DATALOG_MAX_CONCURRENT 10
388 RGWReadRemoteDataLogInfoCR(RGWDataSyncEnv
*_sync_env
,
390 map
<int, RGWDataChangesLogInfo
> *_datalog_info
) : RGWShardCollectCR(_sync_env
->cct
, READ_DATALOG_MAX_CONCURRENT
),
391 sync_env(_sync_env
), num_shards(_num_shards
),
392 datalog_info(_datalog_info
), shard_id(0) {}
393 bool spawn_next() override
;
396 bool RGWReadRemoteDataLogInfoCR::spawn_next() {
397 if (shard_id
>= num_shards
) {
400 spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env
, shard_id
, &(*datalog_info
)[shard_id
]), false);
405 class RGWListRemoteDataLogShardCR
: public RGWSimpleCoroutine
{
406 RGWDataSyncEnv
*sync_env
;
407 RGWRESTReadResource
*http_op
;
411 uint32_t max_entries
;
412 rgw_datalog_shard_data
*result
;
415 RGWListRemoteDataLogShardCR(RGWDataSyncEnv
*env
, int _shard_id
,
416 const string
& _marker
, uint32_t _max_entries
,
417 rgw_datalog_shard_data
*_result
)
418 : RGWSimpleCoroutine(env
->store
->ctx()), sync_env(env
), http_op(NULL
),
419 shard_id(_shard_id
), marker(_marker
), max_entries(_max_entries
), result(_result
) {}
421 int send_request() override
{
422 RGWRESTConn
*conn
= sync_env
->conn
;
423 RGWRados
*store
= sync_env
->store
;
426 snprintf(buf
, sizeof(buf
), "%d", shard_id
);
428 char max_entries_buf
[32];
429 snprintf(max_entries_buf
, sizeof(max_entries_buf
), "%d", (int)max_entries
);
431 const char *marker_key
= (marker
.empty() ? "" : "marker");
433 rgw_http_param_pair pairs
[] = { { "type", "data" },
435 { "max-entries", max_entries_buf
},
436 { marker_key
, marker
.c_str() },
439 string p
= "/admin/log/";
441 http_op
= new RGWRESTReadResource(conn
, p
, pairs
, NULL
, sync_env
->http_manager
);
442 http_op
->set_user_info((void *)stack
);
444 int ret
= http_op
->aio_read();
446 ldout(store
->ctx(), 0) << "ERROR: failed to read from " << p
<< dendl
;
447 log_error() << "failed to send http operation: " << http_op
->to_str() << " ret=" << ret
<< std::endl
;
455 int request_complete() override
{
456 int ret
= http_op
->wait(result
);
458 if (ret
< 0 && ret
!= -ENOENT
) {
459 ldout(sync_env
->store
->ctx(), 0) << "ERROR: failed to list remote datalog shard, ret=" << ret
<< dendl
;
466 class RGWListRemoteDataLogCR
: public RGWShardCollectCR
{
467 RGWDataSyncEnv
*sync_env
;
469 map
<int, string
> shards
;
470 int max_entries_per_shard
;
471 map
<int, rgw_datalog_shard_data
> *result
;
473 map
<int, string
>::iterator iter
;
474 #define READ_DATALOG_MAX_CONCURRENT 10
477 RGWListRemoteDataLogCR(RGWDataSyncEnv
*_sync_env
,
478 map
<int, string
>& _shards
,
479 int _max_entries_per_shard
,
480 map
<int, rgw_datalog_shard_data
> *_result
) : RGWShardCollectCR(_sync_env
->cct
, READ_DATALOG_MAX_CONCURRENT
),
481 sync_env(_sync_env
), max_entries_per_shard(_max_entries_per_shard
),
483 shards
.swap(_shards
);
484 iter
= shards
.begin();
486 bool spawn_next() override
;
489 bool RGWListRemoteDataLogCR::spawn_next() {
490 if (iter
== shards
.end()) {
494 spawn(new RGWListRemoteDataLogShardCR(sync_env
, iter
->first
, iter
->second
, max_entries_per_shard
, &(*result
)[iter
->first
]), false);
499 class RGWInitDataSyncStatusCoroutine
: public RGWCoroutine
{
500 static constexpr uint32_t lock_duration
= 30;
501 RGWDataSyncEnv
*sync_env
;
503 const rgw_pool
& pool
;
504 const uint32_t num_shards
;
506 string sync_status_oid
;
510 rgw_data_sync_status
*status
;
511 map
<int, RGWDataChangesLogInfo
> shards_info
;
513 RGWInitDataSyncStatusCoroutine(RGWDataSyncEnv
*_sync_env
, uint32_t num_shards
,
514 uint64_t instance_id
,
515 rgw_data_sync_status
*status
)
516 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
), store(sync_env
->store
),
517 pool(store
->get_zone_params().log_pool
),
518 num_shards(num_shards
), status(status
) {
519 lock_name
= "sync_lock";
521 status
->sync_info
.instance_id
= instance_id
;
523 #define COOKIE_LEN 16
524 char buf
[COOKIE_LEN
+ 1];
526 gen_rand_alphanumeric(cct
, buf
, sizeof(buf
) - 1);
529 sync_status_oid
= RGWDataSyncStatusManager::sync_status_oid(sync_env
->source_zone
);
532 int operate() override
{
535 using LockCR
= RGWSimpleRadosLockCR
;
536 yield
call(new LockCR(sync_env
->async_rados
, store
,
537 rgw_raw_obj
{pool
, sync_status_oid
},
538 lock_name
, cookie
, lock_duration
));
540 ldout(cct
, 0) << "ERROR: failed to take a lock on " << sync_status_oid
<< dendl
;
541 return set_cr_error(retcode
);
543 using WriteInfoCR
= RGWSimpleRadosWriteCR
<rgw_data_sync_info
>;
544 yield
call(new WriteInfoCR(sync_env
->async_rados
, store
,
545 rgw_raw_obj
{pool
, sync_status_oid
},
548 ldout(cct
, 0) << "ERROR: failed to write sync status info with " << retcode
<< dendl
;
549 return set_cr_error(retcode
);
552 /* take lock again, we just recreated the object */
553 yield
call(new LockCR(sync_env
->async_rados
, store
,
554 rgw_raw_obj
{pool
, sync_status_oid
},
555 lock_name
, cookie
, lock_duration
));
557 ldout(cct
, 0) << "ERROR: failed to take a lock on " << sync_status_oid
<< dendl
;
558 return set_cr_error(retcode
);
561 /* fetch current position in logs */
563 RGWRESTConn
*conn
= store
->get_zone_conn_by_id(sync_env
->source_zone
);
565 ldout(cct
, 0) << "ERROR: connection to zone " << sync_env
->source_zone
<< " does not exist!" << dendl
;
566 return set_cr_error(-EIO
);
568 for (uint32_t i
= 0; i
< num_shards
; i
++) {
569 spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env
, i
, &shards_info
[i
]), true);
572 while (collect(&ret
, NULL
)) {
574 ldout(cct
, 0) << "ERROR: failed to read remote data log shards" << dendl
;
575 return set_state(RGWCoroutine_Error
);
580 for (uint32_t i
= 0; i
< num_shards
; i
++) {
581 RGWDataChangesLogInfo
& info
= shards_info
[i
];
582 auto& marker
= status
->sync_markers
[i
];
583 marker
.next_step_marker
= info
.marker
;
584 marker
.timestamp
= info
.last_update
;
585 const auto& oid
= RGWDataSyncStatusManager::shard_obj_name(sync_env
->source_zone
, i
);
586 using WriteMarkerCR
= RGWSimpleRadosWriteCR
<rgw_data_sync_marker
>;
587 spawn(new WriteMarkerCR(sync_env
->async_rados
, store
,
588 rgw_raw_obj
{pool
, oid
}, marker
), true);
591 while (collect(&ret
, NULL
)) {
593 ldout(cct
, 0) << "ERROR: failed to write data sync status markers" << dendl
;
594 return set_state(RGWCoroutine_Error
);
599 status
->sync_info
.state
= rgw_data_sync_info::StateBuildingFullSyncMaps
;
600 yield
call(new WriteInfoCR(sync_env
->async_rados
, store
,
601 rgw_raw_obj
{pool
, sync_status_oid
},
604 ldout(cct
, 0) << "ERROR: failed to write sync status info with " << retcode
<< dendl
;
605 return set_cr_error(retcode
);
607 yield
call(new RGWSimpleRadosUnlockCR(sync_env
->async_rados
, store
,
608 rgw_raw_obj
{pool
, sync_status_oid
},
610 return set_cr_done();
616 int RGWRemoteDataLog::read_log_info(rgw_datalog_info
*log_info
)
618 rgw_http_param_pair pairs
[] = { { "type", "data" },
621 int ret
= sync_env
.conn
->get_json_resource("/admin/log", pairs
, *log_info
);
623 ldout(store
->ctx(), 0) << "ERROR: failed to fetch datalog info" << dendl
;
627 ldout(store
->ctx(), 20) << "remote datalog, num_shards=" << log_info
->num_shards
<< dendl
;
632 int RGWRemoteDataLog::read_source_log_shards_info(map
<int, RGWDataChangesLogInfo
> *shards_info
)
634 rgw_datalog_info log_info
;
635 int ret
= read_log_info(&log_info
);
640 return run(new RGWReadRemoteDataLogInfoCR(&sync_env
, log_info
.num_shards
, shards_info
));
643 int RGWRemoteDataLog::read_source_log_shards_next(map
<int, string
> shard_markers
, map
<int, rgw_datalog_shard_data
> *result
)
645 if (store
->is_meta_master()) {
649 return run(new RGWListRemoteDataLogCR(&sync_env
, shard_markers
, 1, result
));
652 int RGWRemoteDataLog::init(const string
& _source_zone
, RGWRESTConn
*_conn
, RGWSyncErrorLogger
*_error_logger
, RGWSyncModuleInstanceRef
& _sync_module
)
654 sync_env
.init(store
->ctx(), store
, _conn
, async_rados
, &http_manager
, _error_logger
,
655 _source_zone
, _sync_module
);
661 int ret
= http_manager
.set_threaded();
663 ldout(store
->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret
<< dendl
;
672 void RGWRemoteDataLog::finish()
677 int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status
*sync_status
)
679 // cannot run concurrently with run_sync(), so run in a separate manager
680 RGWCoroutinesManager
crs(store
->ctx(), store
->get_cr_registry());
681 RGWHTTPManager
http_manager(store
->ctx(), crs
.get_completion_mgr());
682 int ret
= http_manager
.set_threaded();
684 ldout(store
->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret
<< dendl
;
687 RGWDataSyncEnv sync_env_local
= sync_env
;
688 sync_env_local
.http_manager
= &http_manager
;
689 ret
= crs
.run(new RGWReadDataSyncStatusCoroutine(&sync_env_local
, sync_status
));
694 int RGWRemoteDataLog::read_recovering_shards(const int num_shards
, set
<int>& recovering_shards
)
696 // cannot run concurrently with run_sync(), so run in a separate manager
697 RGWCoroutinesManager
crs(store
->ctx(), store
->get_cr_registry());
698 RGWHTTPManager
http_manager(store
->ctx(), crs
.get_completion_mgr());
699 int ret
= http_manager
.set_threaded();
701 ldout(store
->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret
<< dendl
;
704 RGWDataSyncEnv sync_env_local
= sync_env
;
705 sync_env_local
.http_manager
= &http_manager
;
706 map
<int, std::set
<std::string
>> entries_map
;
707 uint64_t max_entries
{1};
708 ret
= crs
.run(new RGWReadDataSyncRecoveringShardsCR(&sync_env_local
, max_entries
, num_shards
, entries_map
));
712 for (const auto& entry
: entries_map
) {
713 if (entry
.second
.size() != 0) {
714 recovering_shards
.insert(entry
.first
);
722 int RGWRemoteDataLog::init_sync_status(int num_shards
)
724 rgw_data_sync_status sync_status
;
725 sync_status
.sync_info
.num_shards
= num_shards
;
727 RGWCoroutinesManager
crs(store
->ctx(), store
->get_cr_registry());
728 RGWHTTPManager
http_manager(store
->ctx(), crs
.get_completion_mgr());
729 int ret
= http_manager
.set_threaded();
731 ldout(store
->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret
<< dendl
;
734 RGWDataSyncEnv sync_env_local
= sync_env
;
735 sync_env_local
.http_manager
= &http_manager
;
736 uint64_t instance_id
;
737 get_random_bytes((char *)&instance_id
, sizeof(instance_id
));
738 ret
= crs
.run(new RGWInitDataSyncStatusCoroutine(&sync_env_local
, num_shards
, instance_id
, &sync_status
));
743 static string
full_data_sync_index_shard_oid(const string
& source_zone
, int shard_id
)
745 char buf
[datalog_sync_full_sync_index_prefix
.size() + 1 + source_zone
.size() + 1 + 16];
746 snprintf(buf
, sizeof(buf
), "%s.%s.%d", datalog_sync_full_sync_index_prefix
.c_str(), source_zone
.c_str(), shard_id
);
750 struct bucket_instance_meta_info
{
754 RGWBucketInstanceMetadataObject data
;
756 bucket_instance_meta_info() {}
758 void decode_json(JSONObj
*obj
) {
759 JSONDecoder::decode_json("key", key
, obj
);
760 JSONDecoder::decode_json("ver", ver
, obj
);
761 JSONDecoder::decode_json("mtime", mtime
, obj
);
762 JSONDecoder::decode_json("data", data
, obj
);
766 class RGWListBucketIndexesCR
: public RGWCoroutine
{
767 RGWDataSyncEnv
*sync_env
;
771 rgw_data_sync_status
*sync_status
;
778 list
<string
>::iterator iter
;
780 RGWShardedOmapCRManager
*entries_index
;
785 bucket_instance_meta_info meta_info
;
793 RGWListBucketIndexesCR(RGWDataSyncEnv
*_sync_env
,
794 rgw_data_sync_status
*_sync_status
) : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
795 store(sync_env
->store
), sync_status(_sync_status
),
796 req_ret(0), ret(0), entries_index(NULL
), i(0), failed(false) {
797 oid_prefix
= datalog_sync_full_sync_index_prefix
+ "." + sync_env
->source_zone
;
798 path
= "/admin/metadata/bucket.instance";
799 num_shards
= sync_status
->sync_info
.num_shards
;
801 ~RGWListBucketIndexesCR() override
{
802 delete entries_index
;
805 int operate() override
{
808 string entrypoint
= string("/admin/metadata/bucket.instance");
809 /* FIXME: need a better scaling solution here, requires streaming output */
810 call(new RGWReadRESTResourceCR
<list
<string
> >(store
->ctx(), sync_env
->conn
, sync_env
->http_manager
,
811 entrypoint
, NULL
, &result
));
814 ldout(sync_env
->cct
, 0) << "ERROR: failed to fetch metadata for section bucket.index" << dendl
;
815 return set_cr_error(retcode
);
817 entries_index
= new RGWShardedOmapCRManager(sync_env
->async_rados
, store
, this, num_shards
,
818 store
->get_zone_params().log_pool
,
820 yield
; // yield so OmapAppendCRs can start
821 for (iter
= result
.begin(); iter
!= result
.end(); ++iter
) {
822 ldout(sync_env
->cct
, 20) << "list metadata: section=bucket.index key=" << *iter
<< dendl
;
827 rgw_http_param_pair pairs
[] = { { "key", key
.c_str() },
830 call(new RGWReadRESTResourceCR
<bucket_instance_meta_info
>(store
->ctx(), sync_env
->conn
, sync_env
->http_manager
, path
, pairs
, &meta_info
));
833 num_shards
= meta_info
.data
.get_bucket_info().num_shards
;
834 if (num_shards
> 0) {
835 for (i
= 0; i
< num_shards
; i
++) {
837 snprintf(buf
, sizeof(buf
), ":%d", i
);
839 yield entries_index
->append(s
, store
->data_log
->get_log_shard_id(meta_info
.data
.get_bucket_info().bucket
, i
));
842 yield entries_index
->append(key
, store
->data_log
->get_log_shard_id(meta_info
.data
.get_bucket_info().bucket
, -1));
846 if (!entries_index
->finish()) {
851 for (map
<uint32_t, rgw_data_sync_marker
>::iterator iter
= sync_status
->sync_markers
.begin(); iter
!= sync_status
->sync_markers
.end(); ++iter
) {
852 int shard_id
= (int)iter
->first
;
853 rgw_data_sync_marker
& marker
= iter
->second
;
854 marker
.total_entries
= entries_index
->get_total_entries(shard_id
);
855 spawn(new RGWSimpleRadosWriteCR
<rgw_data_sync_marker
>(sync_env
->async_rados
, store
,
856 rgw_raw_obj(store
->get_zone_params().log_pool
, RGWDataSyncStatusManager::shard_obj_name(sync_env
->source_zone
, shard_id
)),
860 yield
call(sync_env
->error_logger
->log_error_cr(sync_env
->conn
->get_remote_id(), "data.init", "",
861 EIO
, string("failed to build bucket instances map")));
863 while (collect(&ret
, NULL
)) {
865 yield
call(sync_env
->error_logger
->log_error_cr(sync_env
->conn
->get_remote_id(), "data.init", "",
866 -ret
, string("failed to store sync status: ") + cpp_strerror(-ret
)));
873 yield
return set_cr_error(req_ret
);
875 yield
return set_cr_done();
881 #define DATA_SYNC_UPDATE_MARKER_WINDOW 1
883 class RGWDataSyncShardMarkerTrack
: public RGWSyncShardMarkerTrack
<string
, string
> {
884 RGWDataSyncEnv
*sync_env
;
887 rgw_data_sync_marker sync_marker
;
889 map
<string
, string
> key_to_marker
;
890 map
<string
, string
> marker_to_key
;
892 void handle_finish(const string
& marker
) override
{
893 map
<string
, string
>::iterator iter
= marker_to_key
.find(marker
);
894 if (iter
== marker_to_key
.end()) {
897 key_to_marker
.erase(iter
->second
);
898 reset_need_retry(iter
->second
);
899 marker_to_key
.erase(iter
);
903 RGWDataSyncShardMarkerTrack(RGWDataSyncEnv
*_sync_env
,
904 const string
& _marker_oid
,
905 const rgw_data_sync_marker
& _marker
) : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW
),
907 marker_oid(_marker_oid
),
908 sync_marker(_marker
) {}
910 RGWCoroutine
*store_marker(const string
& new_marker
, uint64_t index_pos
, const real_time
& timestamp
) override
{
911 sync_marker
.marker
= new_marker
;
912 sync_marker
.pos
= index_pos
;
914 ldout(sync_env
->cct
, 20) << __func__
<< "(): updating marker marker_oid=" << marker_oid
<< " marker=" << new_marker
<< dendl
;
915 RGWRados
*store
= sync_env
->store
;
917 return new RGWSimpleRadosWriteCR
<rgw_data_sync_marker
>(sync_env
->async_rados
, store
,
918 rgw_raw_obj(store
->get_zone_params().log_pool
, marker_oid
),
923 * create index from key -> marker, and from marker -> key
924 * this is useful so that we can insure that we only have one
925 * entry for any key that is used. This is needed when doing
926 * incremenatl sync of data, and we don't want to run multiple
927 * concurrent sync operations for the same bucket shard
929 bool index_key_to_marker(const string
& key
, const string
& marker
) {
930 if (key_to_marker
.find(key
) != key_to_marker
.end()) {
934 key_to_marker
[key
] = marker
;
935 marker_to_key
[marker
] = key
;
939 RGWOrderCallCR
*allocate_order_control_cr() {
940 return new RGWLastCallerWinsCR(sync_env
->cct
);
944 // ostream wrappers to print buckets without copying strings
947 bucket_str(const rgw_bucket
& b
) : b(b
) {}
949 std::ostream
& operator<<(std::ostream
& out
, const bucket_str
& rhs
) {
951 if (!b
.tenant
.empty()) {
952 out
<< b
.tenant
<< '/';
955 if (!b
.bucket_id
.empty()) {
956 out
<< ':' << b
.bucket_id
;
961 struct bucket_shard_str
{
962 const rgw_bucket_shard
& bs
;
963 bucket_shard_str(const rgw_bucket_shard
& bs
) : bs(bs
) {}
965 std::ostream
& operator<<(std::ostream
& out
, const bucket_shard_str
& rhs
) {
967 out
<< bucket_str
{bs
.bucket
};
968 if (bs
.shard_id
>= 0) {
969 out
<< ':' << bs
.shard_id
;
974 class RGWRunBucketSyncCoroutine
: public RGWCoroutine
{
975 RGWDataSyncEnv
*sync_env
;
977 RGWBucketInfo bucket_info
;
978 rgw_bucket_shard_sync_info sync_status
;
979 RGWMetaSyncEnv meta_sync_env
;
981 RGWDataSyncDebugLogger logger
;
982 const std::string status_oid
;
984 boost::intrusive_ptr
<RGWContinuousLeaseCR
> lease_cr
;
985 boost::intrusive_ptr
<RGWCoroutinesStack
> lease_stack
;
988 RGWRunBucketSyncCoroutine(RGWDataSyncEnv
*_sync_env
, const rgw_bucket_shard
& bs
)
989 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
), bs(bs
),
990 status_oid(RGWBucketSyncStatusManager::status_oid(sync_env
->source_zone
, bs
)) {
991 logger
.init(sync_env
, "Bucket", bs
.get_key());
993 ~RGWRunBucketSyncCoroutine() override
{
999 int operate() override
;
1002 class RGWDataSyncSingleEntryCR
: public RGWCoroutine
{
1003 RGWDataSyncEnv
*sync_env
;
1006 string entry_marker
;
1008 rgw_bucket_shard bs
;
1014 RGWDataSyncShardMarkerTrack
*marker_tracker
;
1016 boost::intrusive_ptr
<RGWOmapAppend
> error_repo
;
1017 bool remove_from_repo
;
1022 RGWDataSyncSingleEntryCR(RGWDataSyncEnv
*_sync_env
,
1023 const string
& _raw_key
, const string
& _entry_marker
, RGWDataSyncShardMarkerTrack
*_marker_tracker
,
1024 RGWOmapAppend
*_error_repo
, bool _remove_from_repo
) : RGWCoroutine(_sync_env
->cct
),
1025 sync_env(_sync_env
),
1026 raw_key(_raw_key
), entry_marker(_entry_marker
),
1028 marker_tracker(_marker_tracker
),
1029 error_repo(_error_repo
), remove_from_repo(_remove_from_repo
) {
1030 set_description() << "data sync single entry (source_zone=" << sync_env
->source_zone
<< ") key=" <<_raw_key
<< " entry=" << entry_marker
;
1033 int operate() override
{
1037 int ret
= rgw_bucket_parse_bucket_key(sync_env
->cct
, raw_key
,
1038 &bs
.bucket
, &bs
.shard_id
);
1040 return set_cr_error(-EIO
);
1042 if (marker_tracker
) {
1043 marker_tracker
->reset_need_retry(raw_key
);
1045 call(new RGWRunBucketSyncCoroutine(sync_env
, bs
));
1047 } while (marker_tracker
&& marker_tracker
->need_retry(raw_key
));
1049 sync_status
= retcode
;
1051 if (sync_status
== -ENOENT
) {
1052 // this was added when 'tenant/' was added to datalog entries, because
1053 // preexisting tenant buckets could never sync and would stay in the
1054 // error_repo forever
1055 ldout(sync_env
->store
->ctx(), 0) << "WARNING: skipping data log entry "
1056 "for missing bucket " << raw_key
<< dendl
;
1060 if (sync_status
< 0) {
1061 // write actual sync failures for 'radosgw-admin sync error list'
1062 if (sync_status
!= -EBUSY
&& sync_status
!= -EAGAIN
) {
1063 yield
call(sync_env
->error_logger
->log_error_cr(sync_env
->conn
->get_remote_id(), "data", raw_key
,
1064 -sync_status
, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status
)));
1066 ldout(sync_env
->store
->ctx(), 0) << "ERROR: failed to log sync failure: retcode=" << retcode
<< dendl
;
1069 if (error_repo
&& !error_repo
->append(raw_key
)) {
1070 ldout(sync_env
->store
->ctx(), 0) << "ERROR: failed to log sync failure in error repo: retcode=" << retcode
<< dendl
;
1072 } else if (error_repo
&& remove_from_repo
) {
1074 yield
call(new RGWRadosRemoveOmapKeysCR(sync_env
->store
, error_repo
->get_obj(), keys
));
1076 ldout(sync_env
->store
->ctx(), 0) << "ERROR: failed to remove omap key from error repo ("
1077 << error_repo
->get_obj() << " retcode=" << retcode
<< dendl
;
1080 /* FIXME: what do do in case of error */
1081 if (marker_tracker
&& !entry_marker
.empty()) {
1083 yield
call(marker_tracker
->finish(entry_marker
));
1085 if (sync_status
== 0) {
1086 sync_status
= retcode
;
1088 if (sync_status
< 0) {
1089 return set_cr_error(sync_status
);
1091 return set_cr_done();
1097 #define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20
1098 #define DATA_SYNC_MAX_ERR_ENTRIES 10
// Coroutine that syncs one shard of the remote data log: runs a full-sync
// pass (enumerating a locally indexed omap of bucket shards) and then an
// incremental pass (tailing the remote datalog shard), spawning one
// RGWDataSyncSingleEntryCR per bucket-shard entry.
// NOTE(review): this chunk is a lossy extraction — several original lines
// (braces, else-branches, some member declarations such as shard_id/pool/
// truncated/total_entries/spawn_window/status_oid/oid/error_oid) are missing
// from view; comments below describe only what the visible code shows.
1100 class RGWDataSyncShardCR
: public RGWCoroutine
{
1101 RGWDataSyncEnv
*sync_env
;
1106 rgw_data_sync_marker sync_marker
;
// entries/iter: omap keys read in batches during full sync.
1108 std::set
<std::string
> entries
;
1109 std::set
<std::string
>::iterator iter
;
1113 RGWDataSyncShardMarkerTrack
*marker_tracker
;
1115 std::string next_marker
;
1116 list
<rgw_data_change_log_entry
> log_entries
;
1117 list
<rgw_data_change_log_entry
>::iterator log_iter
;
// boost::asio stackless coroutine state for the two phases.
1123 boost::asio::coroutine incremental_cr
;
1124 boost::asio::coroutine full_cr
;
// modified_shards is fed asynchronously via append_modified_shards() under
// inc_lock; current_modified is the snapshot consumed by incremental_sync().
1127 set
<string
> modified_shards
;
1128 set
<string
> current_modified
;
1130 set
<string
>::iterator modified_iter
;
1136 bool *reset_backoff
;
1138 set
<string
> spawned_keys
;
// Lease keeps exclusive ownership of this shard's sync status object.
1140 boost::intrusive_ptr
<RGWContinuousLeaseCR
> lease_cr
;
1141 boost::intrusive_ptr
<RGWCoroutinesStack
> lease_stack
;
// error_repo: omap object accumulating bucket shards that failed to sync,
// retried later with exponential backoff (retry_backoff_secs).
1146 RGWOmapAppend
*error_repo
;
1147 std::set
<std::string
> error_entries
;
1148 string error_marker
;
1149 int max_error_entries
;
1151 ceph::coarse_real_time error_retry_time
;
1153 #define RETRY_BACKOFF_SECS_MIN 60
1154 #define RETRY_BACKOFF_SECS_DEFAULT 60
1155 #define RETRY_BACKOFF_SECS_MAX 600
1156 uint32_t retry_backoff_secs
;
1158 RGWDataSyncDebugLogger logger
;
1160 RGWDataSyncShardCR(RGWDataSyncEnv
*_sync_env
,
1162 uint32_t _shard_id
, const rgw_data_sync_marker
& _marker
, bool *_reset_backoff
) : RGWCoroutine(_sync_env
->cct
),
1163 sync_env(_sync_env
),
1165 shard_id(_shard_id
),
1166 sync_marker(_marker
),
1167 marker_tracker(NULL
), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
1168 total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW
), reset_backoff(NULL
),
1169 lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES
),
1170 retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT
) {
1171 set_description() << "data sync shard source_zone=" << sync_env
->source_zone
<< " shard_id=" << shard_id
;
1172 status_oid
= RGWDataSyncStatusManager::shard_obj_name(sync_env
->source_zone
, shard_id
);
// Failed entries are parked in "<status_oid>.retry".
1173 error_oid
= status_oid
+ ".retry";
1175 logger
.init(sync_env
, "DataShard", status_oid
);
1178 ~RGWDataSyncShardCR() override
{
1179 delete marker_tracker
;
// Called from another thread (datalog notification path); inc_lock guards
// modified_shards against the coroutine's swap in incremental_sync().
1188 void append_modified_shards(set
<string
>& keys
) {
1189 Mutex::Locker
l(inc_lock
);
1190 modified_shards
.insert(keys
.begin(), keys
.end());
1193 void set_marker_tracker(RGWDataSyncShardMarkerTrack
*mt
) {
1194 delete marker_tracker
;
1195 marker_tracker
= mt
;
// Dispatch on persisted state: FullSync until the backlog is drained, then
// IncrementalSync; any other state is treated as corruption (-EIO).
1198 int operate() override
{
1201 switch (sync_marker
.state
) {
1202 case rgw_data_sync_marker::FullSync
:
1205 ldout(cct
, 10) << "sync: full_sync: shard_id=" << shard_id
<< " r=" << r
<< dendl
;
1206 return set_cr_error(r
);
1209 case rgw_data_sync_marker::IncrementalSync
:
1210 r
= incremental_sync();
1212 ldout(cct
, 10) << "sync: incremental_sync: shard_id=" << shard_id
<< " r=" << r
<< dendl
;
1213 return set_cr_error(r
);
1217 return set_cr_error(-EIO
);
// Spawn (not call) the lease coroutine so it renews concurrently while this
// coroutine keeps working.
1223 void init_lease_cr() {
1224 set_status("acquiring sync lock");
1225 uint32_t lock_duration
= cct
->_conf
->rgw_sync_lease_period
;
1226 string lock_name
= "sync_lock";
1230 RGWRados
*store
= sync_env
->store
;
1231 lease_cr
.reset(new RGWContinuousLeaseCR(sync_env
->async_rados
, store
,
1232 rgw_raw_obj(store
->get_zone_params().log_pool
, status_oid
),
1233 lock_name
, lock_duration
, this));
1234 lease_stack
.reset(spawn(lease_cr
.get(), false));
// full_sync body (reenter/yield macros elided by the extraction): page
// through the full-sync index omap, spawn a sync CR per key, keep at most
// spawn_window children in flight, then flip state to IncrementalSync.
1238 #define OMAP_GET_MAX_ENTRIES 100
1239 int max_entries
= OMAP_GET_MAX_ENTRIES
;
1241 yield
init_lease_cr();
1242 while (!lease_cr
->is_locked()) {
1243 if (lease_cr
->is_done()) {
1244 ldout(cct
, 5) << "lease cr failed, done early " << dendl
;
1245 set_status("lease lock failed, early abort");
1247 return set_cr_error(lease_cr
->get_ret_status());
1252 logger
.log("full sync");
1253 oid
= full_data_sync_index_shard_oid(sync_env
->source_zone
, shard_id
);
1254 set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env
, status_oid
, sync_marker
));
1255 total_entries
= sync_marker
.pos
;
1257 if (!lease_cr
->is_locked()) {
1258 stop_spawned_services();
1260 return set_cr_error(-ECANCELED
);
1262 yield
call(new RGWRadosGetOmapKeysCR(sync_env
->store
, rgw_raw_obj(pool
, oid
), sync_marker
.marker
, &entries
, max_entries
));
1264 ldout(sync_env
->cct
, 0) << "ERROR: " << __func__
<< "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode
<< dendl
;
1265 lease_cr
->go_down();
1267 return set_cr_error(retcode
);
1269 iter
= entries
.begin();
1270 for (; iter
!= entries
.end(); ++iter
) {
1271 ldout(sync_env
->cct
, 20) << __func__
<< ": full sync: " << *iter
<< dendl
;
1273 if (!marker_tracker
->start(*iter
, total_entries
, real_time())) {
1274 ldout(sync_env
->cct
, 0) << "ERROR: cannot start syncing " << *iter
<< ". Duplicate entry?" << dendl
;
1276 // fetch remote and write locally
1277 yield
spawn(new RGWDataSyncSingleEntryCR(sync_env
, *iter
, *iter
, marker_tracker
, error_repo
, false), false);
1279 sync_marker
.marker
= *iter
;
// Throttle: don't let more than spawn_window children run at once.
1281 while ((int)num_spawned() > spawn_window
) {
1282 set_status() << "num_spawned() > spawn_window";
1283 yield
wait_for_child();
1285 while (collect(&ret
, lease_stack
.get())) {
1287 ldout(cct
, 10) << "a sync operation returned error" << dendl
;
// A short (non-full) batch means the omap is exhausted.
1292 } while ((int)entries
.size() == max_entries
);
1294 drain_all_but_stack(lease_stack
.get());
1297 /* update marker to reflect we're done with full sync */
1298 sync_marker
.state
= rgw_data_sync_marker::IncrementalSync
;
1299 sync_marker
.marker
= sync_marker
.next_step_marker
;
1300 sync_marker
.next_step_marker
.clear();
1301 RGWRados
*store
= sync_env
->store
;
1302 call(new RGWSimpleRadosWriteCR
<rgw_data_sync_marker
>(sync_env
->async_rados
, store
,
1303 rgw_raw_obj(store
->get_zone_params().log_pool
, status_oid
),
1307 ldout(sync_env
->cct
, 0) << "ERROR: failed to set sync marker: retcode=" << retcode
<< dendl
;
1308 lease_cr
->go_down();
1310 return set_cr_error(retcode
);
1312 // keep lease and transition to incremental_sync()
// Incremental phase: tail the remote datalog shard forever, retrying the
// error repo on a backoff schedule and folding in out-of-band shard
// notifications received via append_modified_shards().
1317 int incremental_sync() {
1318 reenter(&incremental_cr
) {
1319 ldout(cct
, 10) << "start incremental sync" << dendl
;
1321 ldout(cct
, 10) << "lease already held from full sync" << dendl
;
1323 yield
init_lease_cr();
1324 while (!lease_cr
->is_locked()) {
1325 if (lease_cr
->is_done()) {
1326 ldout(cct
, 5) << "lease cr failed, done early " << dendl
;
1327 set_status("lease lock failed, early abort");
1329 return set_cr_error(lease_cr
->get_ret_status());
1334 set_status("lease acquired");
1335 ldout(cct
, 10) << "took lease" << dendl
;
1337 error_repo
= new RGWOmapAppend(sync_env
->async_rados
, sync_env
->store
,
1338 rgw_raw_obj(pool
, error_oid
),
1341 spawn(error_repo
, false);
1342 logger
.log("inc sync");
1343 set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env
, status_oid
, sync_marker
));
1345 if (!lease_cr
->is_locked()) {
1346 stop_spawned_services();
1348 return set_cr_error(-ECANCELED
);
// Snapshot the notification set; swap is presumably done under inc_lock in
// a line elided by the extraction — TODO confirm against the full source.
1350 current_modified
.clear();
1352 current_modified
.swap(modified_shards
);
1355 /* process out of band updates */
1356 for (modified_iter
= current_modified
.begin(); modified_iter
!= current_modified
.end(); ++modified_iter
) {
1358 ldout(sync_env
->cct
, 20) << __func__
<< "(): async update notification: " << *modified_iter
<< dendl
;
1359 spawn(new RGWDataSyncSingleEntryCR(sync_env
, *modified_iter
, string(), marker_tracker
, error_repo
, false), false);
1363 if (error_retry_time
<= ceph::coarse_real_clock::now()) {
1364 /* process bucket shards that previously failed */
1365 yield
call(new RGWRadosGetOmapKeysCR(sync_env
->store
, rgw_raw_obj(pool
, error_oid
),
1366 error_marker
, &error_entries
,
1367 max_error_entries
));
1368 ldout(sync_env
->cct
, 20) << __func__
<< "(): read error repo, got " << error_entries
.size() << " entries" << dendl
;
1369 iter
= error_entries
.begin();
1370 for (; iter
!= error_entries
.end(); ++iter
) {
1371 error_marker
= *iter
;
1372 ldout(sync_env
->cct
, 20) << __func__
<< "(): handle error entry: " << error_marker
<< dendl
;
// remove_from_repo=true: a successful retry deletes the entry from the repo.
1373 spawn(new RGWDataSyncSingleEntryCR(sync_env
, error_marker
, error_marker
, nullptr /* no marker tracker */, error_repo
, true), false);
1375 if ((int)error_entries
.size() != max_error_entries
) {
1376 if (error_marker
.empty() && error_entries
.empty()) {
1377 /* the retry repo is empty, we back off a bit before calling it again */
1378 retry_backoff_secs
*= 2;
1379 if (retry_backoff_secs
> RETRY_BACKOFF_SECS_MAX
) {
1380 retry_backoff_secs
= RETRY_BACKOFF_SECS_MAX
;
1383 retry_backoff_secs
= RETRY_BACKOFF_SECS_DEFAULT
;
1385 error_retry_time
= ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs
);
1386 error_marker
.clear();
1390 #define INCREMENTAL_MAX_ENTRIES 100
1391 ldout(sync_env
->cct
, 20) << __func__
<< ":" << __LINE__
<< ": shard_id=" << shard_id
<< " sync_marker=" << sync_marker
.marker
<< dendl
;
1392 spawned_keys
.clear();
1393 yield
call(new RGWReadRemoteDataLogShardCR(sync_env
, shard_id
, sync_marker
.marker
,
1394 &next_marker
, &log_entries
, &truncated
));
1396 ldout(sync_env
->cct
, 0) << "ERROR: failed to read remote data log info: ret=" << retcode
<< dendl
;
1397 stop_spawned_services();
1399 return set_cr_error(retcode
);
1401 for (log_iter
= log_entries
.begin(); log_iter
!= log_entries
.end(); ++log_iter
) {
1402 ldout(sync_env
->cct
, 20) << __func__
<< ":" << __LINE__
<< ": shard_id=" << shard_id
<< " log_entry: " << log_iter
->log_id
<< ":" << log_iter
->log_timestamp
<< ":" << log_iter
->entry
.key
<< dendl
;
1403 if (!marker_tracker
->index_key_to_marker(log_iter
->entry
.key
, log_iter
->log_id
)) {
1404 ldout(sync_env
->cct
, 20) << __func__
<< ": skipping sync of entry: " << log_iter
->log_id
<< ":" << log_iter
->entry
.key
<< " sync already in progress for bucket shard" << dendl
;
1405 marker_tracker
->try_update_high_marker(log_iter
->log_id
, 0, log_iter
->log_timestamp
);
1408 if (!marker_tracker
->start(log_iter
->log_id
, 0, log_iter
->log_timestamp
)) {
1409 ldout(sync_env
->cct
, 0) << "ERROR: cannot start syncing " << log_iter
->log_id
<< ". Duplicate entry?" << dendl
;
1412 * don't spawn the same key more than once. We can do that as long as we don't yield
1414 if (spawned_keys
.find(log_iter
->entry
.key
) == spawned_keys
.end()) {
1415 spawned_keys
.insert(log_iter
->entry
.key
);
1416 spawn(new RGWDataSyncSingleEntryCR(sync_env
, log_iter
->entry
.key
, log_iter
->log_id
, marker_tracker
, error_repo
, false), false);
1418 stop_spawned_services();
1420 return set_cr_error(retcode
);
1424 while ((int)num_spawned() > spawn_window
) {
1425 set_status() << "num_spawned() > spawn_window";
1426 yield
wait_for_child();
1428 while (collect(&ret
, lease_stack
.get())) {
1430 ldout(sync_env
->cct
, 10) << "a sync operation returned error" << dendl
;
1431 /* we have reported this error */
1433 /* not waiting for child here */
1435 /* not waiting for child here */
1438 ldout(sync_env
->cct
, 20) << __func__
<< ":" << __LINE__
<< ": shard_id=" << shard_id
<< " sync_marker=" << sync_marker
.marker
1439 << " next_marker=" << next_marker
<< " truncated=" << truncated
<< dendl
;
// Idle wait between polls; shortened when an error retry is pending.
1441 yield
wait(get_idle_interval());
1443 if (!next_marker
.empty()) {
1444 sync_marker
.marker
= next_marker
;
1445 } else if (!log_entries
.empty()) {
1446 sync_marker
.marker
= log_entries
.back().log_id
;
// Poll interval: 20s, clamped to the time left until the next error retry.
1453 utime_t
get_idle_interval() const {
1454 #define INCREMENTAL_INTERVAL 20
1455 ceph::timespan interval
= std::chrono::seconds(INCREMENTAL_INTERVAL
);
1456 if (!ceph::coarse_real_clock::is_zero(error_retry_time
)) {
1457 auto now
= ceph::coarse_real_clock::now();
1458 if (error_retry_time
> now
) {
1459 auto d
= error_retry_time
- now
;
1465 // convert timespan -> time_point -> utime_t
1466 return utime_t(ceph::coarse_real_clock::zero() + interval
);
// Tear down the long-lived children: release the lease, flush error repo.
1469 void stop_spawned_services() {
1470 lease_cr
->go_down();
1472 error_repo
->finish();
// Backoff wrapper around RGWDataSyncShardCR: RGWBackoffControlCR re-allocates
// and re-runs the shard coroutine after failures. alloc_finisher_cr() reloads
// the persisted shard marker before each (re)start.
// NOTE(review): extraction is lossy here — some member declarations (pool,
// shard_id) and closing braces are missing from view.
1479 class RGWDataSyncShardControlCR
: public RGWBackoffControlCR
{
1480 RGWDataSyncEnv
*sync_env
;
1485 rgw_data_sync_marker sync_marker
;
1488 RGWDataSyncShardControlCR(RGWDataSyncEnv
*_sync_env
, rgw_pool
& _pool
,
1489 uint32_t _shard_id
, rgw_data_sync_marker
& _marker
) : RGWBackoffControlCR(_sync_env
->cct
, false),
1490 sync_env(_sync_env
),
1492 shard_id(_shard_id
),
1493 sync_marker(_marker
) {
1496 RGWCoroutine
*alloc_cr() override
{
1497 return new RGWDataSyncShardCR(sync_env
, pool
, shard_id
, sync_marker
, backoff_ptr());
1500 RGWCoroutine
*alloc_finisher_cr() override
{
1501 RGWRados
*store
= sync_env
->store
;
// Re-read this shard's rgw_data_sync_marker from the log pool.
1502 return new RGWSimpleRadosReadCR
<rgw_data_sync_marker
>(sync_env
->async_rados
, store
,
1503 rgw_raw_obj(store
->get_zone_params().log_pool
, RGWDataSyncStatusManager::shard_obj_name(sync_env
->source_zone
, shard_id
)),
// Forward datalog-change notifications to the currently running shard CR
// (if any), under the control CR's lock.
1507 void append_modified_shards(set
<string
>& keys
) {
1508 Mutex::Locker
l(cr_lock());
1510 RGWDataSyncShardCR
*cr
= static_cast<RGWDataSyncShardCR
*>(get_cr());
1515 cr
->append_modified_shards(keys
);
// Top-level data-sync state machine for one source zone. Drives the
// persisted state transitions Init -> BuildingFullSyncMaps -> Sync, then
// spawns one RGWDataSyncShardControlCR per datalog shard and keeps a map of
// them so wakeup() can route notifications.
// NOTE(review): lossy extraction — error-branch conditionals, braces and a
// few statements are missing from view.
1519 class RGWDataSyncCR
: public RGWCoroutine
{
1520 RGWDataSyncEnv
*sync_env
;
1521 uint32_t num_shards
;
1523 rgw_data_sync_status sync_status
;
1525 RGWDataSyncShardMarkerTrack
*marker_tracker
;
// shard_crs_lock guards shard_crs: wakeup() is called from another thread.
1527 Mutex shard_crs_lock
;
1528 map
<int, RGWDataSyncShardControlCR
*> shard_crs
;
1530 bool *reset_backoff
;
1532 RGWDataSyncDebugLogger logger
;
1534 RGWDataSyncModule
*data_sync_module
{nullptr};
1536 RGWDataSyncCR(RGWDataSyncEnv
*_sync_env
, uint32_t _num_shards
, bool *_reset_backoff
) : RGWCoroutine(_sync_env
->cct
),
1537 sync_env(_sync_env
),
1538 num_shards(_num_shards
),
1539 marker_tracker(NULL
),
1540 shard_crs_lock("RGWDataSyncCR::shard_crs_lock"),
1541 reset_backoff(_reset_backoff
), logger(sync_env
, "Data", "all") {
1545 ~RGWDataSyncCR() override
{
1546 for (auto iter
: shard_crs
) {
1551 int operate() override
{
1554 /* read sync status */
1555 yield
call(new RGWReadDataSyncStatusCoroutine(sync_env
, &sync_status
));
1557 data_sync_module
= sync_env
->sync_module
->get_data_handler();
// ENOENT is tolerated: no status object yet means we start from StateInit.
1559 if (retcode
< 0 && retcode
!= -ENOENT
) {
1560 ldout(sync_env
->cct
, 0) << "ERROR: failed to fetch sync status, retcode=" << retcode
<< dendl
;
1561 return set_cr_error(retcode
);
1564 /* state: init status */
1565 if ((rgw_data_sync_info::SyncState
)sync_status
.sync_info
.state
== rgw_data_sync_info::StateInit
) {
1566 ldout(sync_env
->cct
, 20) << __func__
<< "(): init" << dendl
;
1567 sync_status
.sync_info
.num_shards
= num_shards
;
// Random instance id tags this sync generation.
1568 uint64_t instance_id
;
1569 get_random_bytes((char *)&instance_id
, sizeof(instance_id
));
1570 yield
call(new RGWInitDataSyncStatusCoroutine(sync_env
, num_shards
, instance_id
, &sync_status
));
1572 ldout(sync_env
->cct
, 0) << "ERROR: failed to init sync, retcode=" << retcode
<< dendl
;
1573 return set_cr_error(retcode
);
1575 // sets state = StateBuildingFullSyncMaps
1577 *reset_backoff
= true;
1580 data_sync_module
->init(sync_env
, sync_status
.sync_info
.instance_id
);
1582 if ((rgw_data_sync_info::SyncState
)sync_status
.sync_info
.state
== rgw_data_sync_info::StateBuildingFullSyncMaps
) {
1583 /* call sync module init here */
1584 sync_status
.sync_info
.num_shards
= num_shards
;
1585 yield
call(data_sync_module
->init_sync(sync_env
));
1587 ldout(sync_env
->cct
, 0) << "ERROR: sync module init_sync() failed, retcode=" << retcode
<< dendl
;
1588 return set_cr_error(retcode
);
1590 /* state: building full sync maps */
1591 ldout(sync_env
->cct
, 20) << __func__
<< "(): building full sync maps" << dendl
;
1592 yield
call(new RGWListBucketIndexesCR(sync_env
, &sync_status
));
1594 ldout(sync_env
->cct
, 0) << "ERROR: failed to build full sync maps, retcode=" << retcode
<< dendl
;
1595 return set_cr_error(retcode
);
1597 sync_status
.sync_info
.state
= rgw_data_sync_info::StateSync
;
1599 /* update new state */
1600 yield
call(set_sync_info_cr());
1602 ldout(sync_env
->cct
, 0) << "ERROR: failed to write sync status, retcode=" << retcode
<< dendl
;
1603 return set_cr_error(retcode
);
1606 *reset_backoff
= true;
// Steady state: one shard-control CR per persisted shard marker.
1610 if ((rgw_data_sync_info::SyncState
)sync_status
.sync_info
.state
== rgw_data_sync_info::StateSync
) {
1611 for (map
<uint32_t, rgw_data_sync_marker
>::iterator iter
= sync_status
.sync_markers
.begin();
1612 iter
!= sync_status
.sync_markers
.end(); ++iter
) {
1613 RGWDataSyncShardControlCR
*cr
= new RGWDataSyncShardControlCR(sync_env
, sync_env
->store
->get_zone_params().log_pool
,
1614 iter
->first
, iter
->second
);
1616 shard_crs_lock
.Lock();
1617 shard_crs
[iter
->first
] = cr
;
1618 shard_crs_lock
.Unlock();
1624 return set_cr_done();
// Write sync_status.sync_info back to the per-zone status object.
1629 RGWCoroutine
*set_sync_info_cr() {
1630 RGWRados
*store
= sync_env
->store
;
1631 return new RGWSimpleRadosWriteCR
<rgw_data_sync_info
>(sync_env
->async_rados
, store
,
1632 rgw_raw_obj(store
->get_zone_params().log_pool
, RGWDataSyncStatusManager::sync_status_oid(sync_env
->source_zone
)),
1633 sync_status
.sync_info
);
// Route a datalog-change notification to the matching shard CR.
1636 void wakeup(int shard_id
, set
<string
>& keys
) {
1637 Mutex::Locker
l(shard_crs_lock
);
1638 map
<int, RGWDataSyncShardControlCR
*>::iterator iter
= shard_crs
.find(shard_id
);
1639 if (iter
== shard_crs
.end()) {
1642 iter
->second
->append_modified_shards(keys
);
1643 iter
->second
->wakeup();
// Default data-sync module: plain object replication (fetch/remove/delete-
// marker) with no transformation. Definitions of the three factory methods
// follow later in this file.
1647 class RGWDefaultDataSyncModule
: public RGWDataSyncModule
{
1649 RGWDefaultDataSyncModule() {}
1651 RGWCoroutine
*sync_object(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, boost::optional
<uint64_t> versioned_epoch
, rgw_zone_set
*zones_trace
) override
;
1652 RGWCoroutine
*remove_object(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, real_time
& mtime
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
) override
;
1653 RGWCoroutine
*create_delete_marker(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, real_time
& mtime
,
1654 rgw_bucket_entry_owner
& owner
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
) override
;
// Sync-module instance that hands out the default data handler above.
1657 class RGWDefaultSyncModuleInstance
: public RGWSyncModuleInstance
{
1658 RGWDefaultDataSyncModule data_handler
;
1660 RGWDefaultSyncModuleInstance() {}
1661 RGWDataSyncModule
*get_data_handler() override
{
1662 return &data_handler
;
// Factory: the default module ignores its config map and always produces a
// RGWDefaultSyncModuleInstance. (Return statement elided by the extraction.)
1666 int RGWDefaultSyncModule::create_instance(CephContext
*cct
, map
<string
, string
, ltstr_nocase
>& config
, RGWSyncModuleInstanceRef
*instance
)
1668 instance
->reset(new RGWDefaultSyncModuleInstance());
// Default object sync: fetch the object from the source zone and write it
// locally via RGWFetchRemoteObjCR. (Trailing arguments elided by the
// extraction.)
1672 RGWCoroutine
*RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, boost::optional
<uint64_t> versioned_epoch
, rgw_zone_set
*zones_trace
)
1674 return new RGWFetchRemoteObjCR(sync_env
->async_rados
, sync_env
->store
, sync_env
->source_zone
, bucket_info
,
1675 key
, versioned_epoch
,
// Default object removal: delete the local copy, passing the remote mtime so
// the removal can be ordered against newer writes.
1679 RGWCoroutine
*RGWDefaultDataSyncModule::remove_object(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
,
1680 real_time
& mtime
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
)
1682 return new RGWRemoveObjCR(sync_env
->async_rados
, sync_env
->store
, sync_env
->source_zone
,
1683 bucket_info
, key
, versioned
, versioned_epoch
,
// No owner override; del_if_older=false per the visible argument list.
1684 NULL
, NULL
, false, &mtime
, zones_trace
);
// Default delete-marker creation: reuses RGWRemoveObjCR but supplies the
// marker owner's id/display-name and (third bool) a flag differing from
// remove_object() above — presumably "create delete marker"; TODO confirm
// against RGWRemoveObjCR's signature.
1687 RGWCoroutine
*RGWDefaultDataSyncModule::create_delete_marker(RGWDataSyncEnv
*sync_env
, RGWBucketInfo
& bucket_info
, rgw_obj_key
& key
, real_time
& mtime
,
1688 rgw_bucket_entry_owner
& owner
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
)
1690 return new RGWRemoveObjCR(sync_env
->async_rados
, sync_env
->store
, sync_env
->source_zone
,
1691 bucket_info
, key
, versioned
, versioned_epoch
,
1692 &owner
.id
, &owner
.display_name
, true, &mtime
, zones_trace
);
// Backoff wrapper around RGWDataSyncCR; exit_on_error=false means the whole
// data-sync pipeline is retried on any failure.
1695 class RGWDataSyncControlCR
: public RGWBackoffControlCR
1697 RGWDataSyncEnv
*sync_env
;
1698 uint32_t num_shards
;
1700 static constexpr bool exit_on_error
= false; // retry on all errors
1702 RGWDataSyncControlCR(RGWDataSyncEnv
*_sync_env
, uint32_t _num_shards
) : RGWBackoffControlCR(_sync_env
->cct
, exit_on_error
),
1703 sync_env(_sync_env
), num_shards(_num_shards
) {
1706 RGWCoroutine
*alloc_cr() override
{
1707 return new RGWDataSyncCR(sync_env
, num_shards
, backoff_ptr());
// Forward a notification to the running RGWDataSyncCR under cr_lock().
1710 void wakeup(int shard_id
, set
<string
>& keys
) {
1711 Mutex
& m
= cr_lock();
1714 RGWDataSyncCR
*cr
= static_cast<RGWDataSyncCR
*>(get_cr());
1724 cr
->wakeup(shard_id
, keys
);
// Notification entry point: under a read lock, forward modified bucket-shard
// keys to the running data-sync control CR (no-op when sync isn't running).
1731 void RGWRemoteDataLog::wakeup(int shard_id
, set
<string
>& keys
) {
1732 RWLock::RLocker
rl(lock
);
1733 if (!data_sync_cr
) {
1736 data_sync_cr
->wakeup(shard_id
, keys
);
// Blocking driver: construct the control CR, run it on this manager, then
// drop our extra ref and clear the pointer (locking around the assignments
// is elided by the extraction).
1739 int RGWRemoteDataLog::run_sync(int num_shards
)
1742 data_sync_cr
= new RGWDataSyncControlCR(&sync_env
, num_shards
);
1743 data_sync_cr
->get(); // run() will drop a ref, so take another
1746 int r
= run(data_sync_cr
);
1749 data_sync_cr
->put();
1750 data_sync_cr
= NULL
;
1754 ldout(store
->ctx(), 0) << "ERROR: failed to run sync" << dendl
;
// Set up sync from source_zone: validate zone config and data-export support,
// resolve the REST connection, create the error logger, init the remote log,
// fetch the remote datalog's shard count and precompute per-shard status
// object names. (Early-return error paths elided by the extraction.)
1760 int RGWDataSyncStatusManager::init()
1762 auto zone_def_iter
= store
->zone_by_id
.find(source_zone
);
1763 if (zone_def_iter
== store
->zone_by_id
.end()) {
1764 ldout(store
->ctx(), 0) << "ERROR: failed to find zone config info for zone=" << source_zone
<< dendl
;
1768 auto& zone_def
= zone_def_iter
->second
;
// Zones whose tier type cannot export data (e.g. archive-style tiers) are
// not valid sync sources.
1770 if (!store
->get_sync_modules_manager()->supports_data_export(zone_def
.tier_type
)) {
1774 RGWZoneParams
& zone_params
= store
->get_zone_params();
1776 if (sync_module
== nullptr) {
1777 sync_module
= store
->get_sync_module();
1780 conn
= store
->get_zone_conn_by_id(source_zone
);
1782 ldout(store
->ctx(), 0) << "connection object to zone " << source_zone
<< " does not exist" << dendl
;
1786 error_logger
= new RGWSyncErrorLogger(store
, RGW_SYNC_ERROR_LOG_SHARD_PREFIX
, ERROR_LOGGER_SHARDS
);
1788 int r
= source_log
.init(source_zone
, conn
, error_logger
, sync_module
);
1790 lderr(store
->ctx()) << "ERROR: failed to init remote log, r=" << r
<< dendl
;
1795 rgw_datalog_info datalog_info
;
1796 r
= source_log
.read_log_info(&datalog_info
);
1798 ldout(store
->ctx(), 5) << "ERROR: master.read_log_info() returned r=" << r
<< dendl
;
// Shard count is dictated by the remote zone's datalog.
1803 num_shards
= datalog_info
.num_shards
;
1805 for (int i
= 0; i
< num_shards
; i
++) {
1806 shard_objs
[i
] = rgw_raw_obj(zone_params
.log_pool
, shard_obj_name(source_zone
, i
));
// Release the error logger allocated in init(); safe to call repeatedly.
1812 void RGWDataSyncStatusManager::finalize()
1814 delete error_logger
;
1815 error_logger
= nullptr;
// Build "datalog.sync-status.<source_zone>" (prefix defined at file top).
// The return of the formatted buffer is elided by the extraction.
1818 string
RGWDataSyncStatusManager::sync_status_oid(const string
& source_zone
)
1820 char buf
[datalog_sync_status_oid_prefix
.size() + source_zone
.size() + 16];
1821 snprintf(buf
, sizeof(buf
), "%s.%s", datalog_sync_status_oid_prefix
.c_str(), source_zone
.c_str());
// Build "datalog.sync-status.shard.<source_zone>.<shard_id>". The return of
// the formatted buffer is elided by the extraction.
1826 string
RGWDataSyncStatusManager::shard_obj_name(const string
& source_zone
, int shard_id
)
1828 char buf
[datalog_sync_status_shard_prefix
.size() + source_zone
.size() + 16];
1829 snprintf(buf
, sizeof(buf
), "%s.%s.%d", datalog_sync_status_shard_prefix
.c_str(), source_zone
.c_str(), shard_id
);
// Bind this remote bucket log to a (source zone, bucket, shard) triple and
// initialize its sync environment. (Assignments of conn/bucket and the
// return are elided by the extraction.)
1834 int RGWRemoteBucketLog::init(const string
& _source_zone
, RGWRESTConn
*_conn
,
1835 const rgw_bucket
& bucket
, int shard_id
,
1836 RGWSyncErrorLogger
*_error_logger
,
1837 RGWSyncModuleInstanceRef
& _sync_module
)
1840 source_zone
= _source_zone
;
1842 bs
.shard_id
= shard_id
;
1844 sync_env
.init(store
->ctx(), store
, conn
, async_rados
, http_manager
,
1845 _error_logger
, source_zone
, _sync_module
);
// Fetch the remote bucket-index log marker info for one bucket shard via
// GET /admin/log/?type=bucket-index&bucket-instance=<key>.
1850 class RGWReadRemoteBucketIndexLogInfoCR
: public RGWCoroutine
{
1851 RGWDataSyncEnv
*sync_env
;
// instance_key: "<bucket>:<shard>" key captured from the bucket shard.
1852 const string instance_key
;
1854 rgw_bucket_index_marker_info
*info
;
1857 RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncEnv
*_sync_env
,
1858 const rgw_bucket_shard
& bs
,
1859 rgw_bucket_index_marker_info
*_info
)
1860 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
1861 instance_key(bs
.get_key()), info(_info
) {}
1863 int operate() override
{
1866 rgw_http_param_pair pairs
[] = { { "type" , "bucket-index" },
1867 { "bucket-instance", instance_key
.c_str() },
1871 string p
= "/admin/log/";
1872 call(new RGWReadRESTResourceCR
<rgw_bucket_index_marker_info
>(sync_env
->cct
, sync_env
->conn
, sync_env
->http_manager
, p
, pairs
, info
));
1875 return set_cr_error(retcode
);
1877 return set_cr_done();
// Initialize the per-bucket-shard sync status object: read the remote bucket
// index position, then either remove the status object (remote reports
// syncstopped) or write a fresh StateFullSync status whose incremental
// marker starts at the remote max_marker.
1883 class RGWInitBucketShardSyncStatusCoroutine
: public RGWCoroutine
{
1884 RGWDataSyncEnv
*sync_env
;
1886 rgw_bucket_shard bs
;
1887 const string sync_status_oid
;
1889 rgw_bucket_shard_sync_info
& status
;
1891 rgw_bucket_index_marker_info info
;
1893 RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncEnv
*_sync_env
,
1894 const rgw_bucket_shard
& bs
,
1895 rgw_bucket_shard_sync_info
& _status
)
1896 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
), bs(bs
),
1897 sync_status_oid(RGWBucketSyncStatusManager::status_oid(sync_env
->source_zone
, bs
)),
1901 int operate() override
{
1903 /* fetch current position in logs */
1904 yield
call(new RGWReadRemoteBucketIndexLogInfoCR(sync_env
, bs
, &info
));
// ENOENT tolerated: remote may have no bucket-index log yet.
1905 if (retcode
< 0 && retcode
!= -ENOENT
) {
1906 ldout(cct
, 0) << "ERROR: failed to fetch bucket index status" << dendl
;
1907 return set_cr_error(retcode
);
1910 auto store
= sync_env
->store
;
1911 rgw_raw_obj
obj(store
->get_zone_params().log_pool
, sync_status_oid
);
1913 if (info
.syncstopped
) {
1914 call(new RGWRadosRemoveCR(store
, obj
));
1916 status
.state
= rgw_bucket_shard_sync_info::StateFullSync
;
1917 status
.inc_marker
.position
= info
.max_marker
;
// Status is persisted as xattrs on the status object.
1918 map
<string
, bufferlist
> attrs
;
1919 status
.encode_all_attrs(attrs
);
1920 call(new RGWSimpleRadosWriteAttrsCR(sync_env
->async_rados
, store
, obj
, attrs
));
1923 if (info
.syncstopped
) {
1927 return set_cr_error(retcode
);
1929 return set_cr_done();
// Factory for the status-initialization coroutine above, bound to this log's
// bucket shard and its init_status buffer.
1935 RGWCoroutine
*RGWRemoteBucketLog::init_sync_status_cr()
1937 return new RGWInitBucketShardSyncStatusCoroutine(&sync_env
, bs
, init_status
);
// Decode one named xattr from the attrs map into *val; logs and leaves *val
// in a caller-defined state on decode error, silently skips a missing attr.
// (template<class T> header line elided by the extraction.)
1941 static void decode_attr(CephContext
*cct
, map
<string
, bufferlist
>& attrs
, const string
& attr_name
, T
*val
)
1943 map
<string
, bufferlist
>::iterator iter
= attrs
.find(attr_name
);
1944 if (iter
== attrs
.end()) {
1949 bufferlist::iterator biter
= iter
->second
.begin();
1951 ::decode(*val
, biter
);
1952 } catch (buffer::error
& err
) {
1953 ldout(cct
, 0) << "ERROR: failed to decode attribute: " << attr_name
<< dendl
;
// Rehydrate sync state from the three xattrs written by encode_all_attrs().
1957 void rgw_bucket_shard_sync_info::decode_from_attrs(CephContext
*cct
, map
<string
, bufferlist
>& attrs
)
1959 decode_attr(cct
, attrs
, "state", &state
);
1960 decode_attr(cct
, attrs
, "full_marker", &full_marker
);
1961 decode_attr(cct
, attrs
, "inc_marker", &inc_marker
);
// Serialize state + both markers into the attr map (keys "state",
// "full_marker", "inc_marker" — the inverse of decode_from_attrs()).
1964 void rgw_bucket_shard_sync_info::encode_all_attrs(map
<string
, bufferlist
>& attrs
)
1966 encode_state_attr(attrs
);
1967 full_marker
.encode_attr(attrs
);
1968 inc_marker
.encode_attr(attrs
);
// Encode only the state field under the "state" attr key.
1971 void rgw_bucket_shard_sync_info::encode_state_attr(map
<string
, bufferlist
>& attrs
)
1973 ::encode(state
, attrs
["state"]);
// Encode the whole full-sync marker under the "full_marker" attr key.
1976 void rgw_bucket_shard_full_sync_marker::encode_attr(map
<string
, bufferlist
>& attrs
)
1978 ::encode(*this, attrs
["full_marker"]);
// Encode the whole incremental-sync marker under the "inc_marker" attr key.
1981 void rgw_bucket_shard_inc_sync_marker::encode_attr(map
<string
, bufferlist
>& attrs
)
1983 ::encode(*this, attrs
["inc_marker"]);
// Read a bucket shard's sync status object attrs into *status.
// NOTE(review): the `oid` member declaration is elided by the extraction.
1986 class RGWReadBucketSyncStatusCoroutine
: public RGWCoroutine
{
1987 RGWDataSyncEnv
*sync_env
;
1989 rgw_bucket_shard_sync_info
*status
;
1991 map
<string
, bufferlist
> attrs
;
1993 RGWReadBucketSyncStatusCoroutine(RGWDataSyncEnv
*_sync_env
,
1994 const rgw_bucket_shard
& bs
,
1995 rgw_bucket_shard_sync_info
*_status
)
1996 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
1997 oid(RGWBucketSyncStatusManager::status_oid(sync_env
->source_zone
, bs
)),
1999 int operate() override
;
// Read the status object's xattrs; a missing object (ENOENT) yields a
// default-constructed (never-synced) status rather than an error.
2002 int RGWReadBucketSyncStatusCoroutine::operate()
2005 yield
call(new RGWSimpleRadosReadAttrsCR(sync_env
->async_rados
, sync_env
->store
,
2006 rgw_raw_obj(sync_env
->store
->get_zone_params().log_pool
, oid
),
2008 if (retcode
== -ENOENT
) {
2009 *status
= rgw_bucket_shard_sync_info();
2010 return set_cr_done();
2013 ldout(sync_env
->cct
, 0) << "ERROR: failed to call fetch bucket shard info oid=" << oid
<< " ret=" << retcode
<< dendl
;
2014 return set_cr_error(retcode
);
2016 status
->decode_from_attrs(sync_env
->cct
, attrs
);
2017 return set_cr_done();
2022 #define OMAP_READ_MAX_ENTRIES 10
// Enumerate bucket shards currently parked in a data-sync shard's ".retry"
// error repo (i.e. shards being recovered). Declarations of store/shard_id/
// max_entries/marker/error_oid are elided by the extraction.
2023 class RGWReadRecoveringBucketShardsCoroutine
: public RGWCoroutine
{
2024 RGWDataSyncEnv
*sync_env
;
2030 set
<string
>& recovering_buckets
;
2034 set
<string
> error_entries
;
2035 int max_omap_entries
;
2039 RGWReadRecoveringBucketShardsCoroutine(RGWDataSyncEnv
*_sync_env
, const int _shard_id
,
2040 set
<string
>& _recovering_buckets
, const int _max_entries
)
2041 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
2042 store(sync_env
->store
), shard_id(_shard_id
), max_entries(_max_entries
),
2043 recovering_buckets(_recovering_buckets
), max_omap_entries(OMAP_READ_MAX_ENTRIES
)
// Same ".retry" naming used by RGWDataSyncShardCR's error repo.
2045 error_oid
= RGWDataSyncStatusManager::shard_obj_name(sync_env
->source_zone
, shard_id
) + ".retry";
2048 int operate() override
;
// Page through the error-repo omap (max_omap_entries per call) until a short
// batch, an empty batch, or max_entries collected; ENOENT means no repo.
2051 int RGWReadRecoveringBucketShardsCoroutine::operate()
2054 //read recovering bucket shards
2057 yield
call(new RGWRadosGetOmapKeysCR(store
, rgw_raw_obj(store
->get_zone_params().log_pool
, error_oid
),
2058 marker
, &error_entries
, max_omap_entries
));
2060 if (retcode
== -ENOENT
) {
2065 ldout(sync_env
->cct
, 0) << "failed to read recovering bucket shards with "
2066 << cpp_strerror(retcode
) << dendl
;
2067 return set_cr_error(retcode
);
2070 if (error_entries
.empty()) {
2074 count
+= error_entries
.size();
// Continue listing after the last key returned.
2075 marker
= *error_entries
.rbegin();
2076 recovering_buckets
.insert(error_entries
.begin(), error_entries
.end());
2077 }while((int)error_entries
.size() == max_omap_entries
&& count
< max_entries
);
2079 return set_cr_done();
// Enumerate bucket shards with datalog entries newer than the shard's sync
// marker (i.e. pending sync). Declarations of store/shard_id/max_entries/
// marker/count/truncated/status_oid are elided by the extraction.
2085 class RGWReadPendingBucketShardsCoroutine
: public RGWCoroutine
{
2086 RGWDataSyncEnv
*sync_env
;
2092 set
<string
>& pending_buckets
;
2096 rgw_data_sync_marker
* sync_marker
;
2099 std::string next_marker
;
2100 list
<rgw_data_change_log_entry
> log_entries
;
2104 RGWReadPendingBucketShardsCoroutine(RGWDataSyncEnv
*_sync_env
, const int _shard_id
,
2105 set
<string
>& _pending_buckets
,
2106 rgw_data_sync_marker
* _sync_marker
, const int _max_entries
)
2107 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
2108 store(sync_env
->store
), shard_id(_shard_id
), max_entries(_max_entries
),
2109 pending_buckets(_pending_buckets
), sync_marker(_sync_marker
)
2111 status_oid
= RGWDataSyncStatusManager::shard_obj_name(sync_env
->source_zone
, shard_id
);
2114 int operate() override
;
// Read the shard's persisted sync marker, then tail the remote datalog from
// that marker, collecting entry keys until !truncated or max_entries.
2117 int RGWReadPendingBucketShardsCoroutine::operate()
2120 //read sync status marker
2121 using CR
= RGWSimpleRadosReadCR
<rgw_data_sync_marker
>;
2122 yield
call(new CR(sync_env
->async_rados
, store
,
2123 rgw_raw_obj(store
->get_zone_params().log_pool
, status_oid
),
2126 ldout(sync_env
->cct
,0) << "failed to read sync status marker with "
2127 << cpp_strerror(retcode
) << dendl
;
2128 return set_cr_error(retcode
);
2131 //read pending bucket shards
2132 marker
= sync_marker
->marker
;
2135 yield
call(new RGWReadRemoteDataLogShardCR(sync_env
, shard_id
, marker
,
2136 &next_marker
, &log_entries
, &truncated
));
// ENOENT tolerated: remote shard log may not exist yet.
2138 if (retcode
== -ENOENT
) {
2143 ldout(sync_env
->cct
,0) << "failed to read remote data log info with "
2144 << cpp_strerror(retcode
) << dendl
;
2145 return set_cr_error(retcode
);
2148 if (log_entries
.empty()) {
2152 count
+= log_entries
.size();
2153 for (const auto& entry
: log_entries
) {
2154 pending_buckets
.insert(entry
.entry
.key
);
2156 }while(truncated
&& count
< max_entries
);
2158 return set_cr_done();
// Synchronous status query: run the "recovering" and "pending" enumeration
// coroutines concurrently on a private coroutine manager + HTTP manager so
// this can be called while run_sync() owns the main manager.
2164 int RGWRemoteDataLog::read_shard_status(int shard_id
, set
<string
>& pending_buckets
, set
<string
>& recovering_buckets
, rgw_data_sync_marker
*sync_marker
, const int max_entries
)
2166 // cannot run concurrently with run_sync(), so run in a separate manager
2167 RGWCoroutinesManager
crs(store
->ctx(), store
->get_cr_registry());
2168 RGWHTTPManager
http_manager(store
->ctx(), crs
.get_completion_mgr());
2169 int ret
= http_manager
.set_threaded();
2171 ldout(store
->ctx(), 0) << "failed in http_manager.start() ret=" << ret
<< dendl
;
// Local copy of the env so the private http_manager is used.
2174 RGWDataSyncEnv sync_env_local
= sync_env
;
2175 sync_env_local
.http_manager
= &http_manager
;
2176 list
<RGWCoroutinesStack
*> stacks
;
2177 RGWCoroutinesStack
* recovering_stack
= new RGWCoroutinesStack(store
->ctx(), &crs
);
2178 recovering_stack
->call(new RGWReadRecoveringBucketShardsCoroutine(&sync_env_local
, shard_id
, recovering_buckets
, max_entries
));
2179 stacks
.push_back(recovering_stack
);
2180 RGWCoroutinesStack
* pending_stack
= new RGWCoroutinesStack(store
->ctx(), &crs
);
2181 pending_stack
->call(new RGWReadPendingBucketShardsCoroutine(&sync_env_local
, shard_id
, pending_buckets
, sync_marker
, max_entries
));
2182 stacks
.push_back(pending_stack
);
2183 ret
= crs
.run(stacks
);
2184 http_manager
.stop();
// Factory for the bucket-shard status reader, bound to this log's shard.
2188 RGWCoroutine
*RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info
*sync_status
)
2190 return new RGWReadBucketSyncStatusCoroutine(&sync_env
, bs
, sync_status
);
// Destructor: delete every per-shard remote bucket log, then the shared
// error logger.
2193 RGWBucketSyncStatusManager::~RGWBucketSyncStatusManager() {
2194 for (map
<int, RGWRemoteBucketLog
*>::iterator iter
= source_logs
.begin(); iter
!= source_logs
.end(); ++iter
) {
2195 delete iter
->second
;
2197 delete error_logger
;
// Parse an S3-style owner element ("ID"/"DisplayName") from JSON.
2201 void rgw_bucket_entry_owner::decode_json(JSONObj
*obj
)
2203 JSONDecoder::decode_json("ID", id
, obj
);
2204 JSONDecoder::decode_json("DisplayName", display_name
, obj
);
// One entry from a remote versioned bucket listing (rgwx-extended JSON).
// NOTE(review): several field declarations (key, delete_marker, is_latest,
// mtime, etag, size, rgw_tag) are elided by the extraction; only the tail of
// the member list is visible.
2207 struct bucket_list_entry
{
2214 string storage_class
;
2215 rgw_bucket_entry_owner owner
;
2216 uint64_t versioned_epoch
;
2219 bucket_list_entry() : delete_marker(false), is_latest(false), size(0), versioned_epoch(0) {}
2221 void decode_json(JSONObj
*obj
) {
2222 JSONDecoder::decode_json("IsDeleteMarker", delete_marker
, obj
);
2223 JSONDecoder::decode_json("Key", key
.name
, obj
);
2224 JSONDecoder::decode_json("VersionId", key
.instance
, obj
);
2225 JSONDecoder::decode_json("IsLatest", is_latest
, obj
);
// "RgwxMtime" is an RGW extension carrying an ISO8601 mtime string that is
// converted to a real_clock time below.
2227 JSONDecoder::decode_json("RgwxMtime", mtime_str
, obj
);
2231 if (parse_iso8601(mtime_str
.c_str(), &t
, &nsec
)) {
2233 ts
.tv_sec
= (uint64_t)internal_timegm(&t
);
2235 mtime
= real_clock::from_ceph_timespec(ts
);
2237 JSONDecoder::decode_json("ETag", etag
, obj
);
2238 JSONDecoder::decode_json("Size", size
, obj
);
2239 JSONDecoder::decode_json("StorageClass", storage_class
, obj
);
2240 JSONDecoder::decode_json("Owner", owner
, obj
);
2241 JSONDecoder::decode_json("VersionedEpoch", versioned_epoch
, obj
);
2242 JSONDecoder::decode_json("RgwxTag", rgw_tag
, obj
);
// Map the listing entry to the bucket-index op used to replay it:
// delete marker -> OLH DM link; real version id -> OLH link; else plain add.
2245 RGWModifyOp
get_modify_op() const {
2246 if (delete_marker
) {
2247 return CLS_RGW_OP_LINK_OLH_DM
;
2248 } else if (!key
.instance
.empty() && key
.instance
!= "null") {
2249 return CLS_RGW_OP_LINK_OLH
;
2251 return CLS_RGW_OP_ADD
;
// NOTE(review): line-mangled extraction with interior lines missing (gaps in
// the embedded numbering); code tokens preserved verbatim, comments added only.
// One page of a remote bucket listing (S3 ListObjectVersions-style response).
2256 struct bucket_list_result
{
2260 string version_id_marker
;
2263 list
<bucket_list_entry
> entries
;
2265 bucket_list_result() : max_keys(0), is_truncated(false) {}
2267 void decode_json(JSONObj
*obj
) {
2268 JSONDecoder::decode_json("Name", name
, obj
);
2269 JSONDecoder::decode_json("Prefix", prefix
, obj
);
2270 JSONDecoder::decode_json("KeyMarker", key_marker
, obj
);
2271 JSONDecoder::decode_json("VersionIdMarker", version_id_marker
, obj
);
2272 JSONDecoder::decode_json("MaxKeys", max_keys
, obj
);
2273 JSONDecoder::decode_json("IsTruncated", is_truncated
, obj
);
2274 JSONDecoder::decode_json("Entries", entries
, obj
);
// NOTE(review): line-mangled extraction with interior lines missing (gaps in
// the embedded numbering); code tokens preserved verbatim, comments added only.
// Coroutine that lists one page of a remote bucket shard (versions listing)
// via REST into *result, starting from marker_position.
2278 class RGWListBucketShardCR
: public RGWCoroutine
{
2279 RGWDataSyncEnv
*sync_env
;
2280 const rgw_bucket_shard
& bs
;
2281 const string instance_key
;
2282 rgw_obj_key marker_position
;
2284 bucket_list_result
*result
;
2287 RGWListBucketShardCR(RGWDataSyncEnv
*_sync_env
, const rgw_bucket_shard
& bs
,
2288 rgw_obj_key
& _marker_position
, bucket_list_result
*_result
)
2289 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
), bs(bs
),
2290 instance_key(bs
.get_key()), marker_position(_marker_position
),
2293 int operate() override
{
// REST query parameters for the source zone's versioned-object listing.
2296 rgw_http_param_pair pairs
[] = { { "rgwx-bucket-instance", instance_key
.c_str() },
2297 { "versions" , NULL
},
2298 { "format" , "json" },
2299 { "objs-container" , "true" },
2300 { "key-marker" , marker_position
.name
.c_str() },
2301 { "version-id-marker" , marker_position
.instance
.c_str() },
2303 // don't include tenant in the url, it's already part of instance_key
2304 string p
= string("/") + bs
.bucket
.name
;
2305 call(new RGWReadRESTResourceCR
<bucket_list_result
>(sync_env
->cct
, sync_env
->conn
, sync_env
->http_manager
, p
, pairs
, result
));
2308 return set_cr_error(retcode
);
2310 return set_cr_done();
// NOTE(review): line-mangled extraction with interior lines missing (gaps in
// the embedded numbering); code tokens preserved verbatim, comments added only.
// Coroutine that fetches bucket-index log (bilog) entries for one bucket
// shard from the source zone's /admin/log endpoint, starting at `marker`.
2316 class RGWListBucketIndexLogCR
: public RGWCoroutine
{
2317 RGWDataSyncEnv
*sync_env
;
2318 const string instance_key
;
2321 list
<rgw_bi_log_entry
> *result
;
2324 RGWListBucketIndexLogCR(RGWDataSyncEnv
*_sync_env
, const rgw_bucket_shard
& bs
,
2325 string
& _marker
, list
<rgw_bi_log_entry
> *_result
)
2326 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
),
2327 instance_key(bs
.get_key()), marker(_marker
), result(_result
) {}
2329 int operate() override
{
2332 rgw_http_param_pair pairs
[] = { { "bucket-instance", instance_key
.c_str() },
2333 { "format" , "json" },
2334 { "marker" , marker
.c_str() },
2335 { "type", "bucket-index" },
2338 call(new RGWReadRESTResourceCR
<list
<rgw_bi_log_entry
> >(sync_env
->cct
, sync_env
->conn
, sync_env
->http_manager
, "/admin/log", pairs
, result
));
2341 return set_cr_error(retcode
);
2343 return set_cr_done();
2349 #define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10
// NOTE(review): line-mangled extraction with interior lines missing (gaps in
// the embedded numbering); code tokens preserved verbatim, comments added only.
// Marker tracker for full sync of one bucket shard: persists the current
// listing position (and entry count) to the shard's status object, batched by
// BUCKET_SYNC_UPDATE_MARKER_WINDOW.
2351 class RGWBucketFullSyncShardMarkerTrack
: public RGWSyncShardMarkerTrack
<rgw_obj_key
, rgw_obj_key
> {
2352 RGWDataSyncEnv
*sync_env
;
2355 rgw_bucket_shard_full_sync_marker sync_marker
;
2358 RGWBucketFullSyncShardMarkerTrack(RGWDataSyncEnv
*_sync_env
,
2359 const string
& _marker_oid
,
2360 const rgw_bucket_shard_full_sync_marker
& _marker
) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW
),
2361 sync_env(_sync_env
),
2362 marker_oid(_marker_oid
),
2363 sync_marker(_marker
) {}
// Returns a CR that writes the updated full-sync marker as xattrs on the
// shard status object in the log pool.
2365 RGWCoroutine
*store_marker(const rgw_obj_key
& new_marker
, uint64_t index_pos
, const real_time
& timestamp
) override
{
2366 sync_marker
.position
= new_marker
;
2367 sync_marker
.count
= index_pos
;
2369 map
<string
, bufferlist
> attrs
;
2370 sync_marker
.encode_attr(attrs
);
2372 RGWRados
*store
= sync_env
->store
;
2374 ldout(sync_env
->cct
, 20) << __func__
<< "(): updating marker marker_oid=" << marker_oid
<< " marker=" << new_marker
<< dendl
;
2375 return new RGWSimpleRadosWriteAttrsCR(sync_env
->async_rados
, store
,
2376 rgw_raw_obj(store
->get_zone_params().log_pool
, marker_oid
),
2380 RGWOrderCallCR
*allocate_order_control_cr() {
2381 return new RGWLastCallerWinsCR(sync_env
->cct
);
// NOTE(review): line-mangled extraction with interior lines missing (gaps in
// the embedded numbering); code tokens preserved verbatim, comments added only.
// Marker tracker for incremental bucket-shard sync. Besides persisting the
// bilog position, it serializes concurrent work: at most one in-flight op per
// object key, and OLH (versioned) ops on the same object name are serialized
// via pending_olh.
2385 class RGWBucketIncSyncShardMarkerTrack
: public RGWSyncShardMarkerTrack
<string
, rgw_obj_key
> {
2386 RGWDataSyncEnv
*sync_env
;
2389 rgw_bucket_shard_inc_sync_marker sync_marker
;
2391 map
<rgw_obj_key
, string
> key_to_marker
;
2397 map
<string
, operation
> marker_to_op
;
2398 std::set
<std::string
> pending_olh
; // object names with pending olh operations
// Drop all bookkeeping for a finished marker and wake any waiter retrying
// the same key.
2400 void handle_finish(const string
& marker
) override
{
2401 auto iter
= marker_to_op
.find(marker
);
2402 if (iter
== marker_to_op
.end()) {
2405 auto& op
= iter
->second
;
2406 key_to_marker
.erase(op
.key
);
2407 reset_need_retry(op
.key
);
2409 pending_olh
.erase(op
.key
.name
);
2411 marker_to_op
.erase(iter
);
2415 RGWBucketIncSyncShardMarkerTrack(RGWDataSyncEnv
*_sync_env
,
2416 const string
& _marker_oid
,
2417 const rgw_bucket_shard_inc_sync_marker
& _marker
) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW
),
2418 sync_env(_sync_env
),
2419 marker_oid(_marker_oid
),
2420 sync_marker(_marker
) {}
// Returns a CR that persists the incremental marker position as xattrs on
// the shard status object in the log pool.
2422 RGWCoroutine
*store_marker(const string
& new_marker
, uint64_t index_pos
, const real_time
& timestamp
) override
{
2423 sync_marker
.position
= new_marker
;
2425 map
<string
, bufferlist
> attrs
;
2426 sync_marker
.encode_attr(attrs
);
2428 RGWRados
*store
= sync_env
->store
;
2430 ldout(sync_env
->cct
, 20) << __func__
<< "(): updating marker marker_oid=" << marker_oid
<< " marker=" << new_marker
<< dendl
;
2431 return new RGWSimpleRadosWriteAttrsCR(sync_env
->async_rados
,
2433 rgw_raw_obj(store
->get_zone_params().log_pool
, marker_oid
),
2438 * create index from key -> <op, marker>, and from marker -> key
2439 * this is useful so that we can ensure that we only have one
2440 * entry for any key that is used. This is needed when doing
2441 * incremental sync of data, and we don't want to run multiple
2442 * concurrent sync operations for the same bucket shard
2443 * Also, we should make sure that we don't run concurrent operations on the same key with
2446 bool index_key_to_marker(const rgw_obj_key
& key
, const string
& marker
, bool is_olh
) {
2447 auto result
= key_to_marker
.emplace(key
, marker
);
2448 if (!result
.second
) { // exists
2449 set_need_retry(key
);
2452 marker_to_op
[marker
] = operation
{key
, is_olh
};
2454 // prevent other olh ops from starting on this object name
2455 pending_olh
.insert(key
.name
);
// True if an op on `key` may start now (no in-flight op on the key, and no
// pending OLH op on the same object name when this op is itself OLH).
2460 bool can_do_op(const rgw_obj_key
& key
, bool is_olh
) {
2461 // serialize olh ops on the same object name
2462 if (is_olh
&& pending_olh
.count(key
.name
)) {
2463 ldout(sync_env
->cct
, 20) << __func__
<< "(): sync of " << key
<< " waiting for pending olh op" << dendl
;
2466 return (key_to_marker
.find(key
) == key_to_marker
.end());
2469 RGWOrderCallCR
*allocate_order_control_cr() {
2470 return new RGWLastCallerWinsCR(sync_env
->cct
);
// NOTE(review): line-mangled extraction with interior lines missing (gaps in
// the embedded numbering); code tokens preserved verbatim, comments added only.
// Coroutine that replays a single bucket-index entry on the local zone:
// fetch/link an object, remove it, or create a delete marker, depending on
// the op; reports failures to the error logger and calls back into the
// marker tracker on success. T/K are the marker-tracker key/position types
// (rgw_obj_key for full sync, string for incremental).
2474 template <class T
, class K
>
2475 class RGWBucketSyncSingleEntryCR
: public RGWCoroutine
{
2476 RGWDataSyncEnv
*sync_env
;
2478 RGWBucketInfo
*bucket_info
;
2479 const rgw_bucket_shard
& bs
;
2483 boost::optional
<uint64_t> versioned_epoch
;
2484 rgw_bucket_entry_owner owner
;
2485 real_time timestamp
;
2487 RGWPendingState op_state
;
2490 RGWSyncShardMarkerTrack
<T
, K
> *marker_tracker
;
2494 stringstream error_ss
;
2496 RGWDataSyncDebugLogger logger
;
2498 bool error_injection
;
2500 RGWDataSyncModule
*data_sync_module
;
2502 rgw_zone_set zones_trace
;
2505 RGWBucketSyncSingleEntryCR(RGWDataSyncEnv
*_sync_env
,
2506 RGWBucketInfo
*_bucket_info
,
2507 const rgw_bucket_shard
& bs
,
2508 const rgw_obj_key
& _key
, bool _versioned
,
2509 boost::optional
<uint64_t> _versioned_epoch
,
2510 real_time
& _timestamp
,
2511 const rgw_bucket_entry_owner
& _owner
,
2512 RGWModifyOp _op
, RGWPendingState _op_state
,
2513 const T
& _entry_marker
, RGWSyncShardMarkerTrack
<T
, K
> *_marker_tracker
, rgw_zone_set
& _zones_trace
) : RGWCoroutine(_sync_env
->cct
),
2514 sync_env(_sync_env
),
2515 bucket_info(_bucket_info
), bs(bs
),
2516 key(_key
), versioned(_versioned
), versioned_epoch(_versioned_epoch
),
2518 timestamp(_timestamp
), op(_op
),
2519 op_state(_op_state
),
2520 entry_marker(_entry_marker
),
2521 marker_tracker(_marker_tracker
),
2524 ss
<< bucket_shard_str
{bs
} << "/" << key
<< "[" << versioned_epoch
.value_or(0) << "]";
2525 set_description() << "bucket sync single entry (source_zone=" << sync_env
->source_zone
<< ") b=" << ss
.str() << " log_entry=" << entry_marker
<< " op=" << (int)op
<< " op_state=" << (int)op_state
;
2526 ldout(sync_env
->cct
, 20) << "bucket sync single entry (source_zone=" << sync_env
->source_zone
<< ") b=" << ss
.str() << " log_entry=" << entry_marker
<< " op=" << (int)op
<< " op_state=" << (int)op_state
<< dendl
;
2529 logger
.init(sync_env
, "Object", ss
.str());
// Error injection is enabled via the rgw_sync_data_inject_err_probability
// config option (used by teuthology tests).
2531 error_injection
= (sync_env
->cct
->_conf
->rgw_sync_data_inject_err_probability
> 0);
2533 data_sync_module
= sync_env
->sync_module
->get_data_handler();
// Add the local zone to the trace so peers can detect redundant replays.
2535 zones_trace
= _zones_trace
;
2536 zones_trace
.insert(sync_env
->store
->get_zone().id
);
2539 int operate() override
{
2541 /* skip entries that are not complete */
2542 if (op_state
!= CLS_RGW_STATE_COMPLETE
) {
2547 marker_tracker
->reset_need_retry(key
);
2548 if (key
.name
.empty()) {
2549 /* shouldn't happen */
2550 set_status("skipping empty entry");
2551 ldout(sync_env
->cct
, 0) << "ERROR: " << __func__
<< "(): entry with empty obj name, skipping" << dendl
;
2554 if (error_injection
&&
2555 rand() % 10000 < cct
->_conf
->rgw_sync_data_inject_err_probability
* 10000.0) {
2556 ldout(sync_env
->cct
, 0) << __func__
<< ": injecting data sync error on key=" << key
.name
<< dendl
;
2558 } else if (op
== CLS_RGW_OP_ADD
||
2559 op
== CLS_RGW_OP_LINK_OLH
) {
2560 set_status("syncing obj");
2561 ldout(sync_env
->cct
, 5) << "bucket sync: sync obj: " << sync_env
->source_zone
<< "/" << bucket_info
->bucket
<< "/" << key
<< "[" << versioned_epoch
.value_or(0) << "]" << dendl
;
2562 logger
.log("fetch");
2563 call(data_sync_module
->sync_object(sync_env
, *bucket_info
, key
, versioned_epoch
, &zones_trace
));
2564 } else if (op
== CLS_RGW_OP_DEL
|| op
== CLS_RGW_OP_UNLINK_INSTANCE
) {
2565 set_status("removing obj");
2566 if (op
== CLS_RGW_OP_UNLINK_INSTANCE
) {
2569 logger
.log("remove");
2570 call(data_sync_module
->remove_object(sync_env
, *bucket_info
, key
, timestamp
, versioned
, versioned_epoch
.value_or(0), &zones_trace
));
2571 // our copy of the object is more recent, continue as if it succeeded
2572 if (retcode
== -ERR_PRECONDITION_FAILED
) {
2575 } else if (op
== CLS_RGW_OP_LINK_OLH_DM
) {
2576 logger
.log("creating delete marker");
2577 set_status("creating delete marker");
2578 ldout(sync_env
->cct
, 10) << "creating delete marker: obj: " << sync_env
->source_zone
<< "/" << bucket_info
->bucket
<< "/" << key
<< "[" << versioned_epoch
.value_or(0) << "]" << dendl
;
2579 call(data_sync_module
->create_delete_marker(sync_env
, *bucket_info
, key
, timestamp
, owner
, versioned
, versioned_epoch
.value_or(0), &zones_trace
));
2582 } while (marker_tracker
->need_retry(key
));
2588 ss
<< "done, retcode=" << retcode
;
2590 logger
.log(ss
.str());
// On failure (other than ENOENT) record the object in the sync error log.
2593 if (retcode
< 0 && retcode
!= -ENOENT
) {
2594 set_status() << "failed to sync obj; retcode=" << retcode
;
2595 ldout(sync_env
->cct
, 0) << "ERROR: failed to sync object: "
2596 << bucket_shard_str
{bs
} << "/" << key
.name
<< dendl
;
2597 error_ss
<< bucket_shard_str
{bs
} << "/" << key
.name
;
2598 sync_status
= retcode
;
2600 if (!error_ss
.str().empty()) {
2601 yield
call(sync_env
->error_logger
->log_error_cr(sync_env
->conn
->get_remote_id(), "data", error_ss
.str(), -retcode
, string("failed to sync object") + cpp_strerror(-sync_status
)));
2604 if (sync_status
== 0) {
2606 set_status() << "calling marker_tracker->finish(" << entry_marker
<< ")";
2607 yield
call(marker_tracker
->finish(entry_marker
));
2608 sync_status
= retcode
;
2610 if (sync_status
< 0) {
2611 return set_cr_error(sync_status
);
2613 return set_cr_done();
2619 #define BUCKET_SYNC_SPAWN_WINDOW 20
// NOTE(review): line-mangled extraction with interior lines missing (gaps in
// the embedded numbering); code tokens preserved verbatim, comments added only.
// Coroutine performing full sync of one bucket shard: page through the remote
// bucket listing and spawn a RGWBucketSyncSingleEntryCR per entry (see the
// operate() definition below the class).
2621 class RGWBucketShardFullSyncCR
: public RGWCoroutine
{
2622 RGWDataSyncEnv
*sync_env
;
2623 const rgw_bucket_shard
& bs
;
2624 RGWBucketInfo
*bucket_info
;
2625 boost::intrusive_ptr
<RGWContinuousLeaseCR
> lease_cr
;
2626 bucket_list_result list_result
;
2627 list
<bucket_list_entry
>::iterator entries_iter
;
2628 rgw_bucket_shard_sync_info
& sync_info
;
2629 RGWBucketFullSyncShardMarkerTrack marker_tracker
;
2630 rgw_obj_key list_marker
;
2631 bucket_list_entry
*entry
{nullptr};
2633 int total_entries
{0};
2637 const string
& status_oid
;
2639 RGWDataSyncDebugLogger logger
;
2640 rgw_zone_set zones_trace
;
2642 RGWBucketShardFullSyncCR(RGWDataSyncEnv
*_sync_env
, const rgw_bucket_shard
& bs
,
2643 RGWBucketInfo
*_bucket_info
,
2644 const std::string
& status_oid
,
2645 RGWContinuousLeaseCR
*lease_cr
,
2646 rgw_bucket_shard_sync_info
& sync_info
)
2647 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
), bs(bs
),
2648 bucket_info(_bucket_info
), lease_cr(lease_cr
), sync_info(sync_info
),
2649 marker_tracker(sync_env
, status_oid
, sync_info
.full_marker
),
2650 status_oid(status_oid
) {
2651 logger
.init(sync_env
, "BucketFull", bs
.get_key());
// Mark the source zone in the trace so it won't re-apply these changes.
2652 zones_trace
.insert(sync_env
->source_zone
);
2655 int operate() override
;
// NOTE(review): line-mangled extraction with interior lines missing (gaps in
// the embedded numbering, e.g. the reenter/do wrappers); code tokens preserved
// verbatim, comments added only.
// Full-sync driver: resume from the persisted marker, list the remote shard
// page by page, spawn one single-entry sync CR per listed object (bounded by
// BUCKET_SYNC_SPAWN_WINDOW), then transition the shard to incremental sync.
2658 int RGWBucketShardFullSyncCR::operate()
2662 list_marker
= sync_info
.full_marker
.position
;
2664 total_entries
= sync_info
.full_marker
.count
;
// Bail out whenever the shard lease is lost — another gateway took over.
2666 if (!lease_cr
->is_locked()) {
2668 return set_cr_error(-ECANCELED
);
2670 set_status("listing remote bucket");
2671 ldout(sync_env
->cct
, 20) << __func__
<< "(): listing bucket for full sync" << dendl
;
2672 yield
call(new RGWListBucketShardCR(sync_env
, bs
, list_marker
,
2674 if (retcode
< 0 && retcode
!= -ENOENT
) {
2675 set_status("failed bucket listing, going down");
2677 return set_cr_error(retcode
);
2679 entries_iter
= list_result
.entries
.begin();
2680 for (; entries_iter
!= list_result
.entries
.end(); ++entries_iter
) {
2681 if (!lease_cr
->is_locked()) {
2683 return set_cr_error(-ECANCELED
);
2685 ldout(sync_env
->cct
, 20) << "[full sync] syncing object: "
2686 << bucket_shard_str
{bs
} << "/" << entries_iter
->key
<< dendl
;
2687 entry
= &(*entries_iter
);
2689 list_marker
= entries_iter
->key
;
2690 if (!marker_tracker
.start(entry
->key
, total_entries
, real_time())) {
2691 ldout(sync_env
->cct
, 0) << "ERROR: cannot start syncing " << entry
->key
<< ". Duplicate entry?" << dendl
;
2693 using SyncCR
= RGWBucketSyncSingleEntryCR
<rgw_obj_key
, rgw_obj_key
>;
2694 yield
spawn(new SyncCR(sync_env
, bucket_info
, bs
, entry
->key
,
2695 false, /* versioned, only matters for object removal */
2696 entry
->versioned_epoch
, entry
->mtime
,
2697 entry
->owner
, entry
->get_modify_op(), CLS_RGW_STATE_COMPLETE
,
2698 entry
->key
, &marker_tracker
, zones_trace
),
// Throttle: keep at most BUCKET_SYNC_SPAWN_WINDOW children in flight.
2701 while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW
) {
2702 yield
wait_for_child();
2705 again
= collect(&ret
, nullptr);
2707 ldout(sync_env
->cct
, 10) << "a sync operation returned error" << dendl
;
2709 /* we have reported this error */
2714 } while (list_result
.is_truncated
&& sync_status
== 0);
2715 set_status("done iterating over all objects");
2716 /* wait for all operations to complete */
2717 while (num_spawned()) {
2718 yield
wait_for_child();
2721 again
= collect(&ret
, nullptr);
2723 ldout(sync_env
->cct
, 10) << "a sync operation returned error" << dendl
;
2725 /* we have reported this error */
2729 if (!lease_cr
->is_locked()) {
2730 return set_cr_error(-ECANCELED
);
2732 /* update sync state to incremental */
2733 if (sync_status
== 0) {
2735 sync_info
.state
= rgw_bucket_shard_sync_info::StateIncrementalSync
;
2736 map
<string
, bufferlist
> attrs
;
2737 sync_info
.encode_state_attr(attrs
);
2738 RGWRados
*store
= sync_env
->store
;
2739 call(new RGWSimpleRadosWriteAttrsCR(sync_env
->async_rados
, store
,
2740 rgw_raw_obj(store
->get_zone_params().log_pool
, status_oid
),
2744 ldout(sync_env
->cct
, 10) << "failure in sync, backing out (sync_status=" << sync_status
<< ")" << dendl
;
2746 if (retcode
< 0 && sync_status
== 0) { /* actually tried to set incremental state and failed */
2747 ldout(sync_env
->cct
, 0) << "ERROR: failed to set sync state on bucket "
2748 << bucket_shard_str
{bs
} << " retcode=" << retcode
<< dendl
;
2749 return set_cr_error(retcode
);
2751 if (sync_status
< 0) {
2752 return set_cr_error(sync_status
);
2754 return set_cr_done();
2759 static bool has_olh_epoch(RGWModifyOp op
) {
2760 return op
== CLS_RGW_OP_LINK_OLH
|| op
== CLS_RGW_OP_UNLINK_INSTANCE
;
// NOTE(review): line-mangled extraction with interior lines missing (gaps in
// the embedded numbering); code tokens preserved verbatim, comments added only.
// Coroutine performing incremental sync of one bucket shard: tail the remote
// bucket-index log and replay entries (see operate() below). squash_map keeps,
// per (object, instance), the newest (timestamp, op) so older redundant ops in
// the same batch are skipped.
2763 class RGWBucketShardIncrementalSyncCR
: public RGWCoroutine
{
2764 RGWDataSyncEnv
*sync_env
;
2765 const rgw_bucket_shard
& bs
;
2766 RGWBucketInfo
*bucket_info
;
2767 boost::intrusive_ptr
<RGWContinuousLeaseCR
> lease_cr
;
2768 list
<rgw_bi_log_entry
> list_result
;
2769 list
<rgw_bi_log_entry
>::iterator entries_iter
, entries_end
;
2770 map
<pair
<string
, string
>, pair
<real_time
, RGWModifyOp
> > squash_map
;
2771 rgw_bucket_shard_sync_info
& sync_info
;
2773 rgw_bi_log_entry
*entry
{nullptr};
2774 RGWBucketIncSyncShardMarkerTrack marker_tracker
;
2775 bool updated_status
{false};
2776 const string
& status_oid
;
2777 const string
& zone_id
;
2781 RGWDataSyncDebugLogger logger
;
2784 bool syncstopped
{false};
2787 RGWBucketShardIncrementalSyncCR(RGWDataSyncEnv
*_sync_env
,
2788 const rgw_bucket_shard
& bs
,
2789 RGWBucketInfo
*_bucket_info
,
2790 const std::string
& status_oid
,
2791 RGWContinuousLeaseCR
*lease_cr
,
2792 rgw_bucket_shard_sync_info
& sync_info
)
2793 : RGWCoroutine(_sync_env
->cct
), sync_env(_sync_env
), bs(bs
),
2794 bucket_info(_bucket_info
), lease_cr(lease_cr
), sync_info(sync_info
),
2795 marker_tracker(sync_env
, status_oid
, sync_info
.inc_marker
),
2796 status_oid(status_oid
), zone_id(_sync_env
->store
->get_zone().id
)
2798 set_description() << "bucket shard incremental sync bucket="
2799 << bucket_shard_str
{bs
};
2801 logger
.init(sync_env
, "BucketInc", bs
.get_key());
2804 int operate() override
;
// NOTE(review): line-mangled extraction with interior lines missing (gaps in
// the embedded numbering, e.g. the reenter/do wrappers and several closing
// braces); code tokens preserved verbatim, comments added only.
// Incremental-sync driver. Per batch of bilog entries: (pass 1) build the
// squash map and detect SYNCSTOP/RESYNC control entries; (pass 2) filter
// entries (canceled, incomplete, redundant by zones_trace, namespaced,
// squashed) and spawn a single-entry sync CR for the survivors, serialized
// per key via the marker tracker.
2807 int RGWBucketShardIncrementalSyncCR::operate()
2812 if (!lease_cr
->is_locked()) {
2814 return set_cr_error(-ECANCELED
);
2816 ldout(sync_env
->cct
, 20) << __func__
<< "(): listing bilog for incremental sync" << sync_info
.inc_marker
.position
<< dendl
;
2817 set_status() << "listing bilog; position=" << sync_info
.inc_marker
.position
;
2818 yield
call(new RGWListBucketIndexLogCR(sync_env
, bs
, sync_info
.inc_marker
.position
,
2820 if (retcode
< 0 && retcode
!= -ENOENT
) {
2821 /* wait for all operations to complete */
2823 return set_cr_error(retcode
);
// First pass: scan the batch for control ops and build the squash map.
2826 entries_iter
= list_result
.begin();
2827 entries_end
= list_result
.end();
2828 for (; entries_iter
!= entries_end
; ++entries_iter
) {
2829 auto e
= *entries_iter
;
2830 if (e
.op
== RGWModifyOp::CLS_RGW_OP_SYNCSTOP
) {
2831 ldout(sync_env
->cct
, 20) << "syncstop on " << e
.timestamp
<< dendl
;
2833 entries_end
= entries_iter
; // don't sync past here
2836 if (e
.op
== RGWModifyOp::CLS_RGW_OP_RESYNC
) {
2839 if (e
.op
== CLS_RGW_OP_CANCEL
) {
2842 if (e
.state
!= CLS_RGW_STATE_COMPLETE
) {
2845 if (e
.zones_trace
.find(zone_id
) != e
.zones_trace
.end()) {
2848 auto& squash_entry
= squash_map
[make_pair(e
.object
, e
.instance
)];
2849 // don't squash over olh entries - we need to apply their olh_epoch
2850 if (has_olh_epoch(squash_entry
.second
) && !has_olh_epoch(e
.op
)) {
2853 if (squash_entry
.first
<= e
.timestamp
) {
2854 squash_entry
= make_pair
<>(e
.timestamp
, e
.op
);
// Second pass: replay each entry up to entries_end.
2858 entries_iter
= list_result
.begin();
2859 for (; entries_iter
!= entries_end
; ++entries_iter
) {
2860 if (!lease_cr
->is_locked()) {
2862 return set_cr_error(-ECANCELED
);
2864 entry
= &(*entries_iter
);
2866 ssize_t p
= entry
->id
.find('#'); /* entries might have explicit shard info in them, e.g., 6#00000000004.94.3 */
2870 cur_id
= entry
->id
.substr(p
+ 1);
2873 sync_info
.inc_marker
.position
= cur_id
;
2875 if (entry
->op
== RGWModifyOp::CLS_RGW_OP_SYNCSTOP
|| entry
->op
== RGWModifyOp::CLS_RGW_OP_RESYNC
) {
2876 ldout(sync_env
->cct
, 20) << "detected syncstop or resync on " << entries_iter
->timestamp
<< ", skipping entry" << dendl
;
2877 marker_tracker
.try_update_high_marker(cur_id
, 0, entry
->timestamp
);
2881 if (!key
.set(rgw_obj_index_key
{entry
->object
, entry
->instance
})) {
2882 set_status() << "parse_raw_oid() on " << entry
->object
<< " returned false, skipping entry";
2883 ldout(sync_env
->cct
, 20) << "parse_raw_oid() on " << entry
->object
<< " returned false, skipping entry" << dendl
;
2884 marker_tracker
.try_update_high_marker(cur_id
, 0, entry
->timestamp
);
2888 ldout(sync_env
->cct
, 20) << "parsed entry: id=" << cur_id
<< " iter->object=" << entry
->object
<< " iter->instance=" << entry
->instance
<< " name=" << key
.name
<< " instance=" << key
.instance
<< " ns=" << key
.ns
<< dendl
;
2890 if (!key
.ns
.empty()) {
2891 set_status() << "skipping entry in namespace: " << entry
->object
;
2892 ldout(sync_env
->cct
, 20) << "skipping entry in namespace: " << entry
->object
<< dendl
;
2893 marker_tracker
.try_update_high_marker(cur_id
, 0, entry
->timestamp
);
2897 set_status() << "got entry.id=" << cur_id
<< " key=" << key
<< " op=" << (int)entry
->op
;
2898 if (entry
->op
== CLS_RGW_OP_CANCEL
) {
2899 set_status() << "canceled operation, skipping";
2900 ldout(sync_env
->cct
, 20) << "[inc sync] skipping object: "
2901 << bucket_shard_str
{bs
} << "/" << key
<< ": canceled operation" << dendl
;
2902 marker_tracker
.try_update_high_marker(cur_id
, 0, entry
->timestamp
);
2905 if (entry
->state
!= CLS_RGW_STATE_COMPLETE
) {
2906 set_status() << "non-complete operation, skipping";
2907 ldout(sync_env
->cct
, 20) << "[inc sync] skipping object: "
2908 << bucket_shard_str
{bs
} << "/" << key
<< ": non-complete operation" << dendl
;
2909 marker_tracker
.try_update_high_marker(cur_id
, 0, entry
->timestamp
);
2912 if (entry
->zones_trace
.find(zone_id
) != entry
->zones_trace
.end()) {
2913 set_status() << "redundant operation, skipping";
2914 ldout(sync_env
->cct
, 20) << "[inc sync] skipping object: "
2915 <<bucket_shard_str
{bs
} <<"/"<<key
<<": redundant operation" << dendl
;
2916 marker_tracker
.try_update_high_marker(cur_id
, 0, entry
->timestamp
);
2919 if (make_pair
<>(entry
->timestamp
, entry
->op
) != squash_map
[make_pair(entry
->object
, entry
->instance
)]) {
2920 set_status() << "squashed operation, skipping";
2921 ldout(sync_env
->cct
, 20) << "[inc sync] skipping object: "
2922 << bucket_shard_str
{bs
} << "/" << key
<< ": squashed operation" << dendl
;
2923 marker_tracker
.try_update_high_marker(cur_id
, 0, entry
->timestamp
);
2926 ldout(sync_env
->cct
, 20) << "[inc sync] syncing object: "
2927 << bucket_shard_str
{bs
} << "/" << key
<< dendl
;
2928 updated_status
= false;
// Wait for a conflicting in-flight op on the same key (or same object name
// for OLH ops) to finish before starting this one.
2929 while (!marker_tracker
.can_do_op(key
, has_olh_epoch(entry
->op
))) {
2930 if (!updated_status
) {
2931 set_status() << "can't do op, conflicting inflight operation";
2932 updated_status
= true;
2934 ldout(sync_env
->cct
, 5) << *this << ": [inc sync] can't do op on key=" << key
<< " need to wait for conflicting operation to complete" << dendl
;
2935 yield
wait_for_child();
2938 again
= collect(&ret
, nullptr);
2940 ldout(sync_env
->cct
, 0) << "ERROR: a child operation returned error (ret=" << ret
<< ")" << dendl
;
2942 /* we have reported this error */
2945 if (sync_status
!= 0)
2948 if (sync_status
!= 0) {
2949 /* get error, stop */
2952 if (!marker_tracker
.index_key_to_marker(key
, cur_id
, has_olh_epoch(entry
->op
))) {
2953 set_status() << "can't do op, sync already in progress for object";
2954 ldout(sync_env
->cct
, 20) << __func__
<< ": skipping sync of entry: " << cur_id
<< ":" << key
<< " sync already in progress for object" << dendl
;
2955 marker_tracker
.try_update_high_marker(cur_id
, 0, entry
->timestamp
);
2959 set_status() << "start object sync";
2960 if (!marker_tracker
.start(cur_id
, 0, entry
->timestamp
)) {
2961 ldout(sync_env
->cct
, 0) << "ERROR: cannot start syncing " << cur_id
<< ". Duplicate entry?" << dendl
;
2963 boost::optional
<uint64_t> versioned_epoch
;
2964 rgw_bucket_entry_owner
owner(entry
->owner
, entry
->owner_display_name
);
2965 if (entry
->ver
.pool
< 0) {
2966 versioned_epoch
= entry
->ver
.epoch
;
2968 ldout(sync_env
->cct
, 20) << __func__
<< "(): entry->timestamp=" << entry
->timestamp
<< dendl
;
2969 using SyncCR
= RGWBucketSyncSingleEntryCR
<string
, rgw_obj_key
>;
2970 spawn(new SyncCR(sync_env
, bucket_info
, bs
, key
,
2971 entry
->is_versioned(), versioned_epoch
,
2972 entry
->timestamp
, owner
, entry
->op
, entry
->state
,
2973 cur_id
, &marker_tracker
, entry
->zones_trace
),
// Throttle concurrent children to the spawn window.
2977 while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW
) {
2978 set_status() << "num_spawned() > spawn_window";
2979 yield
wait_for_child();
2982 again
= collect(&ret
, nullptr);
2984 ldout(sync_env
->cct
, 10) << "a sync operation returned error" << dendl
;
2986 /* we have reported this error */
2988 /* not waiting for child here */
2992 } while (!list_result
.empty() && sync_status
== 0 && !syncstopped
);
2994 while (num_spawned()) {
2995 yield
wait_for_child();
2998 again
= collect(&ret
, nullptr);
3000 ldout(sync_env
->cct
, 10) << "a sync operation returned error" << dendl
;
3002 /* we have reported this error */
3004 /* not waiting for child here */
3009 // transition back to StateInit in RGWRunBucketSyncCoroutine. if sync is
3010 // still disabled, we'll delete the sync status object. otherwise we'll
3011 // restart full sync to catch any changes that happened while sync was
3013 sync_info
.state
= rgw_bucket_shard_sync_info::StateInit
;
3014 return set_cr_done();
3017 yield
call(marker_tracker
.flush());
3019 ldout(sync_env
->cct
, 0) << "ERROR: marker_tracker.flush() returned retcode=" << retcode
<< dendl
;
3020 return set_cr_error(retcode
);
3022 if (sync_status
< 0) {
3023 ldout(sync_env
->cct
, 10) << "failure in sync, backing out (sync_status=" << sync_status
<< ")" << dendl
;
3024 return set_cr_error(sync_status
);
3026 return set_cr_done();
// NOTE(review): line-mangled extraction with interior lines missing (gaps in
// the embedded numbering); code tokens preserved verbatim, comments added only.
// Top-level per-shard sync state machine: take the shard lease, read the
// persisted sync state, fetch bucket-instance metadata (pulling it from the
// master zone if absent locally), then run Init -> FullSync -> IncrementalSync
// until incremental sync returns normally.
3031 int RGWRunBucketSyncCoroutine::operate()
3035 set_status("acquiring sync lock");
3036 auto store
= sync_env
->store
;
3037 lease_cr
.reset(new RGWContinuousLeaseCR(sync_env
->async_rados
, store
,
3038 rgw_raw_obj(store
->get_zone_params().log_pool
, status_oid
),
3040 cct
->_conf
->rgw_sync_lease_period
,
3042 lease_stack
.reset(spawn(lease_cr
.get(), false));
3044 while (!lease_cr
->is_locked()) {
3045 if (lease_cr
->is_done()) {
3046 ldout(cct
, 5) << "lease cr failed, done early" << dendl
;
3047 set_status("lease lock failed, early abort");
3049 return set_cr_error(lease_cr
->get_ret_status());
3055 yield
call(new RGWReadBucketSyncStatusCoroutine(sync_env
, bs
, &sync_status
));
3056 if (retcode
< 0 && retcode
!= -ENOENT
) {
3057 ldout(sync_env
->cct
, 0) << "ERROR: failed to read sync status for bucket="
3058 << bucket_shard_str
{bs
} << dendl
;
3059 lease_cr
->go_down();
3061 return set_cr_error(retcode
);
3064 ldout(sync_env
->cct
, 20) << __func__
<< "(): sync status for bucket "
3065 << bucket_shard_str
{bs
} << ": " << sync_status
.state
<< dendl
;
3067 yield
call(new RGWGetBucketInstanceInfoCR(sync_env
->async_rados
, sync_env
->store
, bs
.bucket
, &bucket_info
));
3068 if (retcode
== -ENOENT
) {
3069 /* bucket instance info has not been synced in yet, fetch it now */
3071 ldout(sync_env
->cct
, 10) << "no local info for bucket "
3072 << bucket_str
{bs
.bucket
} << ": fetching metadata" << dendl
;
3073 string raw_key
= string("bucket.instance:") + bs
.bucket
.get_key();
3075 meta_sync_env
.init(cct
, sync_env
->store
, sync_env
->store
->rest_master_conn
, sync_env
->async_rados
, sync_env
->http_manager
, sync_env
->error_logger
);
3077 call(new RGWMetaSyncSingleEntryCR(&meta_sync_env
, raw_key
,
3078 string() /* no marker */,
3079 MDLOG_STATUS_COMPLETE
,
3080 NULL
/* no marker tracker */));
3083 ldout(sync_env
->cct
, 0) << "ERROR: failed to fetch bucket instance info for " << bucket_str
{bs
.bucket
} << dendl
;
3084 lease_cr
->go_down();
3086 return set_cr_error(retcode
);
// Re-read the instance info now that metadata sync pulled it in.
3089 yield
call(new RGWGetBucketInstanceInfoCR(sync_env
->async_rados
, sync_env
->store
, bs
.bucket
, &bucket_info
));
3092 ldout(sync_env
->cct
, 0) << "ERROR: failed to retrieve bucket info for bucket=" << bucket_str
{bs
.bucket
} << dendl
;
3093 lease_cr
->go_down();
3095 return set_cr_error(retcode
);
3099 if (sync_status
.state
== rgw_bucket_shard_sync_info::StateInit
) {
3100 yield
call(new RGWInitBucketShardSyncStatusCoroutine(sync_env
, bs
, sync_status
));
3101 if (retcode
== -ENOENT
) {
3102 ldout(sync_env
->cct
, 0) << "bucket sync disabled" << dendl
;
3103 lease_cr
->abort(); // deleted lease object, abort/wakeup instead of unlock
3107 return set_cr_done();
3110 ldout(sync_env
->cct
, 0) << "ERROR: init sync on " << bucket_shard_str
{bs
}
3111 << " failed, retcode=" << retcode
<< dendl
;
3112 lease_cr
->go_down();
3114 return set_cr_error(retcode
);
3118 if (sync_status
.state
== rgw_bucket_shard_sync_info::StateFullSync
) {
3119 yield
call(new RGWBucketShardFullSyncCR(sync_env
, bs
, &bucket_info
,
3120 status_oid
, lease_cr
.get(),
3123 ldout(sync_env
->cct
, 5) << "full sync on " << bucket_shard_str
{bs
}
3124 << " failed, retcode=" << retcode
<< dendl
;
3125 lease_cr
->go_down();
3127 return set_cr_error(retcode
);
3131 if (sync_status
.state
== rgw_bucket_shard_sync_info::StateIncrementalSync
) {
3132 yield
call(new RGWBucketShardIncrementalSyncCR(sync_env
, bs
, &bucket_info
,
3133 status_oid
, lease_cr
.get(),
3136 ldout(sync_env
->cct
, 5) << "incremental sync on " << bucket_shard_str
{bs
}
3137 << " failed, retcode=" << retcode
<< dendl
;
3138 lease_cr
->go_down();
3140 return set_cr_error(retcode
);
3143 // loop back to previous states unless incremental sync returns normally
3144 } while (sync_status
.state
!= rgw_bucket_shard_sync_info::StateIncrementalSync
);
3146 lease_cr
->go_down();
3148 return set_cr_done();
3154 RGWCoroutine
*RGWRemoteBucketLog::run_sync_cr()
3156 return new RGWRunBucketSyncCoroutine(&sync_env
, bs
);
// NOTE(review): line-mangled extraction with interior lines missing (gaps in
// the embedded numbering, e.g. the error-return branches); code tokens
// preserved verbatim, comments added only.
// Initialize the manager: resolve the source-zone connection, start the HTTP
// manager, fetch the remote bucket-instance metadata to learn num_shards, and
// create one RGWRemoteBucketLog per (effective) shard.
3159 int RGWBucketSyncStatusManager::init()
3161 conn
= store
->get_zone_conn_by_id(source_zone
);
3163 ldout(store
->ctx(), 0) << "connection object to zone " << source_zone
<< " does not exist" << dendl
;
3167 int ret
= http_manager
.set_threaded();
3169 ldout(store
->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret
<< dendl
;
3174 const string key
= bucket
.get_key();
3176 rgw_http_param_pair pairs
[] = { { "key", key
.c_str() },
3179 string path
= string("/admin/metadata/bucket.instance");
3181 bucket_instance_meta_info result
;
3182 ret
= cr_mgr
.run(new RGWReadRESTResourceCR
<bucket_instance_meta_info
>(store
->ctx(), conn
, &http_manager
, path
, pairs
, &result
));
3184 ldout(store
->ctx(), 0) << "ERROR: failed to fetch bucket metadata info from zone=" << source_zone
<< " path=" << path
<< " key=" << key
<< " ret=" << ret
<< dendl
;
3188 RGWBucketInfo
& bi
= result
.data
.get_bucket_info();
3189 num_shards
= bi
.num_shards
;
3191 error_logger
= new RGWSyncErrorLogger(store
, RGW_SYNC_ERROR_LOG_SHARD_PREFIX
, ERROR_LOGGER_SHARDS
);
3193 sync_module
.reset(new RGWDefaultSyncModuleInstance());
// num_shards == 0 means a non-sharded index; still run one log worker
// (shard id -1 below).
3195 int effective_num_shards
= (num_shards
? num_shards
: 1);
3197 auto async_rados
= store
->get_async_rados();
3199 for (int i
= 0; i
< effective_num_shards
; i
++) {
3200 RGWRemoteBucketLog
*l
= new RGWRemoteBucketLog(store
, this, async_rados
, &http_manager
);
3201 ret
= l
->init(source_zone
, conn
, bucket
, (num_shards
? i
: -1), error_logger
, sync_module
);
3203 ldout(store
->ctx(), 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl
;
3212 int RGWBucketSyncStatusManager::init_sync_status()
3214 list
<RGWCoroutinesStack
*> stacks
;
3216 for (map
<int, RGWRemoteBucketLog
*>::iterator iter
= source_logs
.begin(); iter
!= source_logs
.end(); ++iter
) {
3217 RGWCoroutinesStack
*stack
= new RGWCoroutinesStack(store
->ctx(), &cr_mgr
);
3218 RGWRemoteBucketLog
*l
= iter
->second
;
3219 stack
->call(l
->init_sync_status_cr());
3221 stacks
.push_back(stack
);
3224 return cr_mgr
.run(stacks
);
3227 int RGWBucketSyncStatusManager::read_sync_status()
3229 list
<RGWCoroutinesStack
*> stacks
;
3231 for (map
<int, RGWRemoteBucketLog
*>::iterator iter
= source_logs
.begin(); iter
!= source_logs
.end(); ++iter
) {
3232 RGWCoroutinesStack
*stack
= new RGWCoroutinesStack(store
->ctx(), &cr_mgr
);
3233 RGWRemoteBucketLog
*l
= iter
->second
;
3234 stack
->call(l
->read_sync_status_cr(&sync_status
[iter
->first
]));
3236 stacks
.push_back(stack
);
3239 int ret
= cr_mgr
.run(stacks
);
3241 ldout(store
->ctx(), 0) << "ERROR: failed to read sync status for "
3242 << bucket_str
{bucket
} << dendl
;
3249 int RGWBucketSyncStatusManager::run()
3251 list
<RGWCoroutinesStack
*> stacks
;
3253 for (map
<int, RGWRemoteBucketLog
*>::iterator iter
= source_logs
.begin(); iter
!= source_logs
.end(); ++iter
) {
3254 RGWCoroutinesStack
*stack
= new RGWCoroutinesStack(store
->ctx(), &cr_mgr
);
3255 RGWRemoteBucketLog
*l
= iter
->second
;
3256 stack
->call(l
->run_sync_cr());
3258 stacks
.push_back(stack
);
3261 int ret
= cr_mgr
.run(stacks
);
3263 ldout(store
->ctx(), 0) << "ERROR: failed to read sync status for "
3264 << bucket_str
{bucket
} << dendl
;
3271 string
RGWBucketSyncStatusManager::status_oid(const string
& source_zone
,
3272 const rgw_bucket_shard
& bs
)
3274 return bucket_status_oid_prefix
+ "." + source_zone
+ ":" + bs
.get_key();
3277 class RGWCollectBucketSyncStatusCR
: public RGWShardCollectCR
{
3278 static constexpr int max_concurrent_shards
= 16;
3279 RGWRados
*const store
;
3280 RGWDataSyncEnv
*const env
;
3281 const int num_shards
;
3282 rgw_bucket_shard bs
;
3284 using Vector
= std::vector
<rgw_bucket_shard_sync_info
>;
3285 Vector::iterator i
, end
;
3288 RGWCollectBucketSyncStatusCR(RGWRados
*store
, RGWDataSyncEnv
*env
,
3289 int num_shards
, const rgw_bucket
& bucket
,
3291 : RGWShardCollectCR(store
->ctx(), max_concurrent_shards
),
3292 store(store
), env(env
), num_shards(num_shards
),
3293 bs(bucket
, num_shards
> 0 ? 0 : -1), // start at shard 0 or -1
3294 i(status
->begin()), end(status
->end())
3297 bool spawn_next() override
{
3301 spawn(new RGWReadBucketSyncStatusCoroutine(env
, bs
, &*i
), false);
3308 int rgw_bucket_sync_status(RGWRados
*store
, const std::string
& source_zone
,
3309 const RGWBucketInfo
& bucket_info
,
3310 std::vector
<rgw_bucket_shard_sync_info
> *status
)
3312 const auto num_shards
= bucket_info
.num_shards
;
3314 status
->resize(std::max
<size_t>(1, num_shards
));
3317 RGWSyncModuleInstanceRef module
; // null sync module
3318 env
.init(store
->ctx(), store
, nullptr, store
->get_async_rados(),
3319 nullptr, nullptr, source_zone
, module
);
3321 RGWCoroutinesManager
crs(store
->ctx(), store
->get_cr_registry());
3322 return crs
.run(new RGWCollectBucketSyncStatusCR(store
, &env
, num_shards
,
3323 bucket_info
.bucket
, status
));
3327 // TODO: move into rgw_data_sync_trim.cc
3329 #define dout_prefix (*_dout << "data trim: ")
3333 /// return the marker that it's safe to trim up to
3334 const std::string
& get_stable_marker(const rgw_data_sync_marker
& m
)
3336 return m
.state
== m
.FullSync
? m
.next_step_marker
: m
.marker
;
/// comparison operator for take_min_markers(): orders two sync markers by
/// their stable (trim-safe) marker string
bool operator<(const rgw_data_sync_marker& lhs,
               const rgw_data_sync_marker& rhs)
{
  return get_stable_marker(lhs) < get_stable_marker(rhs);
}
/// populate the container starting with 'dest' with the minimum stable marker
/// of each shard for all of the peers in [first, last)
template <typename IterIn, typename IterOut>
void take_min_markers(IterIn first, IterIn last, IterOut dest)
{
  if (first == last) {
    return;
  }
  // initialize markers with the first peer's
  auto m = dest;
  for (auto &shard : first->sync_markers) {
    *m = std::move(shard.second);
    ++m;
  }
  // for remaining peers, replace with smaller markers
  for (auto p = first + 1; p != last; ++p) {
    auto m = dest;
    for (auto &shard : p->sync_markers) {
      if (shard.second < *m) {
        *m = std::move(shard.second);
      }
      ++m;
    }
  }
}
3373 } // anonymous namespace
3375 class DataLogTrimCR
: public RGWCoroutine
{
3377 RGWHTTPManager
*http
;
3378 const int num_shards
;
3379 const std::string
& zone_id
; //< my zone id
3380 std::vector
<rgw_data_sync_status
> peer_status
; //< sync status for each peer
3381 std::vector
<rgw_data_sync_marker
> min_shard_markers
; //< min marker per shard
3382 std::vector
<std::string
>& last_trim
; //< last trimmed marker per shard
3386 DataLogTrimCR(RGWRados
*store
, RGWHTTPManager
*http
,
3387 int num_shards
, std::vector
<std::string
>& last_trim
)
3388 : RGWCoroutine(store
->ctx()), store(store
), http(http
),
3389 num_shards(num_shards
),
3390 zone_id(store
->get_zone().id
),
3391 peer_status(store
->zone_conn_map
.size()),
3392 min_shard_markers(num_shards
),
3393 last_trim(last_trim
)
3396 int operate() override
;
3399 int DataLogTrimCR::operate()
3402 ldout(cct
, 10) << "fetching sync status for zone " << zone_id
<< dendl
;
3403 set_status("fetching sync status");
3405 // query data sync status from each sync peer
3406 rgw_http_param_pair params
[] = {
3408 { "status", nullptr },
3409 { "source-zone", zone_id
.c_str() },
3410 { nullptr, nullptr }
3413 auto p
= peer_status
.begin();
3414 for (auto& c
: store
->zone_conn_map
) {
3415 ldout(cct
, 20) << "query sync status from " << c
.first
<< dendl
;
3416 using StatusCR
= RGWReadRESTResourceCR
<rgw_data_sync_status
>;
3417 spawn(new StatusCR(cct
, c
.second
, http
, "/admin/log/", params
, &*p
),
3423 // must get a successful reply from all peers to consider trimming
3425 while (ret
== 0 && num_spawned() > 0) {
3426 yield
wait_for_child();
3432 ldout(cct
, 4) << "failed to fetch sync status from all peers" << dendl
;
3433 return set_cr_error(ret
);
3436 ldout(cct
, 10) << "trimming log shards" << dendl
;
3437 set_status("trimming log shards");
3439 // determine the minimum marker for each shard
3440 take_min_markers(peer_status
.begin(), peer_status
.end(),
3441 min_shard_markers
.begin());
3443 for (int i
= 0; i
< num_shards
; i
++) {
3444 const auto& m
= min_shard_markers
[i
];
3445 auto& stable
= get_stable_marker(m
);
3446 if (stable
<= last_trim
[i
]) {
3449 ldout(cct
, 10) << "trimming log shard " << i
3450 << " at marker=" << stable
3451 << " last_trim=" << last_trim
[i
] << dendl
;
3452 using TrimCR
= RGWSyncLogTrimCR
;
3453 spawn(new TrimCR(store
, store
->data_log
->get_oid(i
),
3454 stable
, &last_trim
[i
]),
3458 return set_cr_done();
3463 RGWCoroutine
* create_admin_data_log_trim_cr(RGWRados
*store
,
3464 RGWHTTPManager
*http
,
3466 std::vector
<std::string
>& markers
)
3468 return new DataLogTrimCR(store
, http
, num_shards
, markers
);
3471 class DataLogTrimPollCR
: public RGWCoroutine
{
3473 RGWHTTPManager
*http
;
3474 const int num_shards
;
3475 const utime_t interval
; //< polling interval
3476 const std::string lock_oid
; //< use first data log shard for lock
3477 const std::string lock_cookie
;
3478 std::vector
<std::string
> last_trim
; //< last trimmed marker per shard
3481 DataLogTrimPollCR(RGWRados
*store
, RGWHTTPManager
*http
,
3482 int num_shards
, utime_t interval
)
3483 : RGWCoroutine(store
->ctx()), store(store
), http(http
),
3484 num_shards(num_shards
), interval(interval
),
3485 lock_oid(store
->data_log
->get_oid(0)),
3486 lock_cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct
)),
3487 last_trim(num_shards
)
3490 int operate() override
;
3493 int DataLogTrimPollCR::operate()
3497 set_status("sleeping");
3500 // request a 'data_trim' lock that covers the entire wait interval to
3501 // prevent other gateways from attempting to trim for the duration
3502 set_status("acquiring trim lock");
3503 yield
call(new RGWSimpleRadosLockCR(store
->get_async_rados(), store
,
3504 rgw_raw_obj(store
->get_zone_params().log_pool
, lock_oid
),
3505 "data_trim", lock_cookie
,
3508 // if the lock is already held, go back to sleep and try again later
3509 ldout(cct
, 4) << "failed to lock " << lock_oid
<< ", trying again in "
3510 << interval
.sec() << "s" << dendl
;
3514 set_status("trimming");
3515 yield
call(new DataLogTrimCR(store
, http
, num_shards
, last_trim
));
3517 // note that the lock is not released. this is intentional, as it avoids
3518 // duplicating this work in other gateways
3524 RGWCoroutine
* create_data_log_trim_cr(RGWRados
*store
,
3525 RGWHTTPManager
*http
,
3526 int num_shards
, utime_t interval
)
3528 return new DataLogTrimPollCR(store
, http
, num_shards
, interval
);