1 #ifndef CEPH_RGW_DATA_SYNC_H
2 #define CEPH_RGW_DATA_SYNC_H
4 #include "rgw_coroutine.h"
5 #include "rgw_http_client.h"
6 #include "rgw_bucket.h"
8 #include "rgw_sync_module.h"
10 #include "common/RWLock.h"
11 #include "common/ceph_json.h"
// Datalog layout information; the default constructor zeroes num_shards.
// RGWRemoteDataLog::read_log_info() takes a pointer to one of these.
// NOTE(review): lines are missing from this copy (the leading numbers skip),
// including the member declarations and the closing brace.
13 struct rgw_datalog_info {
16 rgw_datalog_info() : num_shards(0) {}
// Populate the fields from a JSON object; the definition is not in this view.
18 void decode_json(JSONObj *obj);
// Top-level data-sync state: a SyncState value, the number of datalog shards
// and an instance id. It has a binary round-trip (encode/decode) and a JSON
// round-trip (dump/decode_json).
// NOTE(review): lines are missing from this copy (the leading numbers skip):
// the rest of the SyncState enum, the state/num_shards declarations, the
// local `s` strings, ENCODE_FINISH/DECODE_START/DECODE_FINISH and the closing
// braces. Restore them before building.
21 struct rgw_data_sync_info {
24 StateBuildingFullSyncMaps = 1,
// Defaults to 0 until encoded/decoded state overwrites it.
31 uint64_t instance_id{0};
// Binary encoding: version 2, compatible back to version 1.
33 void encode(bufferlist& bl) const {
34 ENCODE_START(2, 1, bl);
36 ::encode(num_shards, bl);
37 ::encode(instance_id, bl);
// Reads the fields in the order encode() writes them.
41 void decode(bufferlist::iterator& bl) {
44 ::decode(num_shards, bl);
46 ::decode(instance_id, bl);
// Emit the state as a string, plus num_shards and instance_id.
51 void dump(Formatter *f) const {
53 switch ((SyncState)state) {
57 case StateBuildingFullSyncMaps:
58 s = "building-full-sync-maps";
67 encode_json("status", s, f);
68 encode_json("num_shards", num_shards, f);
69 encode_json("instance_id", instance_id, f);
// Inverse of dump(): map the "status" string back to a SyncState and read the
// numeric fields.
71 void decode_json(JSONObj *obj) {
73 JSONDecoder::decode_json("status", s, obj);
74 if (s == "building-full-sync-maps") {
75 state = StateBuildingFullSyncMaps;
76 } else if (s == "sync") {
81 JSONDecoder::decode_json("num_shards", num_shards, obj);
// "instance_id" must be stored in instance_id. The old code decoded it into
// num_shards, which clobbered the shard count and never set instance_id.
82 JSONDecoder::decode_json("instance_id", instance_id, obj);
84 static void generate_test_instances(std::list<rgw_data_sync_info*>& o);
86 rgw_data_sync_info() : state((int)StateInit), num_shards(0) {}
88 WRITE_CLASS_ENCODER(rgw_data_sync_info)
// Per-shard data sync position. It holds a SyncState (FullSync and
// IncrementalSync are visible here), the current marker, next_step_marker,
// total_entries, pos and a timestamp. The default constructor starts in
// FullSync with zeroed counters.
// NOTE(review): lines are missing from this copy (the leading numbers skip),
// including the enum, several member declarations and the closing braces.
90 struct rgw_data_sync_marker {
97 string next_step_marker;
98 uint64_t total_entries;
102 rgw_data_sync_marker() : state(FullSync), total_entries(0), pos(0) {}
// Binary encoding, version 1 (compat 1).
104 void encode(bufferlist& bl) const {
105 ENCODE_START(1, 1, bl);
107 ::encode(marker, bl);
108 ::encode(next_step_marker, bl);
109 ::encode(total_entries, bl);
111 ::encode(timestamp, bl);
// Reads the fields in the same order encode() writes them.
115 void decode(bufferlist::iterator& bl) {
118 ::decode(marker, bl);
119 ::decode(next_step_marker, bl);
120 ::decode(total_entries, bl);
122 ::decode(timestamp, bl);
// JSON dump. The state is rendered as a string and the timestamp is emitted
// through utime_t.
126 void dump(Formatter *f) const {
127 const char *s{nullptr};
128 switch ((SyncState)state) {
132 case IncrementalSync:
133 s = "incremental-sync";
139 encode_json("status", s, f);
140 encode_json("marker", marker, f);
141 encode_json("next_step_marker", next_step_marker, f);
142 encode_json("total_entries", total_entries, f);
143 encode_json("pos", pos, f);
144 encode_json("timestamp", utime_t(timestamp), f);
// Inverse of dump(): "full-sync" / "incremental-sync" select the state.
146 void decode_json(JSONObj *obj) {
148 JSONDecoder::decode_json("status", s, obj);
149 if (s == "full-sync") {
151 } else if (s == "incremental-sync") {
152 state = IncrementalSync;
154 JSONDecoder::decode_json("marker", marker, obj);
155 JSONDecoder::decode_json("next_step_marker", next_step_marker, obj);
156 JSONDecoder::decode_json("total_entries", total_entries, obj);
157 JSONDecoder::decode_json("pos", pos, obj);
// `t` is declared on a line missing from this copy. It is presumably a
// utime_t, mirroring dump() — confirm.
159 JSONDecoder::decode_json("timestamp", t, obj);
160 timestamp = t.to_real_time();
162 static void generate_test_instances(std::list<rgw_data_sync_marker*>& o);
164 WRITE_CLASS_ENCODER(rgw_data_sync_marker)
// Combined data sync status: the overall rgw_data_sync_info plus one
// rgw_data_sync_marker per uint32_t shard key.
// NOTE(review): lines are missing from this copy (the leading numbers skip),
// including ENCODE_FINISH/DECODE_* and the closing braces.
166 struct rgw_data_sync_status {
167 rgw_data_sync_info sync_info;
168 map<uint32_t, rgw_data_sync_marker> sync_markers;
170 rgw_data_sync_status() {}
// The binary encoding (version 1) covers sync_info only.
172 void encode(bufferlist& bl) const {
173 ENCODE_START(1, 1, bl);
174 ::encode(sync_info, bl);
175 /* sync markers are encoded separately */
179 void decode(bufferlist::iterator& bl) {
181 ::decode(sync_info, bl);
182 /* sync markers are decoded separately */
// Unlike the binary form, the JSON form also includes the per-shard markers.
186 void dump(Formatter *f) const {
187 encode_json("info", sync_info, f);
188 encode_json("markers", sync_markers, f);
190 void decode_json(JSONObj *obj) {
191 JSONDecoder::decode_json("info", sync_info, obj);
192 JSONDecoder::decode_json("markers", sync_markers, obj);
194 static void generate_test_instances(std::list<rgw_data_sync_status*>& o);
196 WRITE_CLASS_ENCODER(rgw_data_sync_status)
// One datalog entry. Only the timestamp member is visible in this copy.
// NOTE(review): lines are missing from this copy (the leading numbers skip),
// including other members and the closing brace.
198 struct rgw_datalog_entry {
200 ceph::real_time timestamp;
// Populate from JSON; the definition is not in this view.
202 void decode_json(JSONObj *obj);
// A batch of datalog entries. RGWRemoteDataLog::read_source_log_shards_next()
// fills a map<int, rgw_datalog_shard_data>, presumably keyed by shard id —
// confirm.
// NOTE(review): lines are missing from this copy (the leading numbers skip).
205 struct rgw_datalog_shard_data {
208 vector<rgw_datalog_entry> entries;
// Populate from JSON; the definition is not in this view.
210 void decode_json(JSONObj *obj);
213 class RGWAsyncRadosProcessor
;
214 class RGWDataSyncControlCR
;
// Owner of a bucket entry: an id and a display name.
// NOTE(review): lines are missing from this copy (the member declarations
// and the closing brace).
216 struct rgw_bucket_entry_owner {
220 rgw_bucket_entry_owner() {}
// Construct with both fields set.
221 rgw_bucket_entry_owner(const string& _id, const string& _display_name) : id(_id), display_name(_display_name) {}
// Populate from JSON; the definition is not in this view.
223 void decode_json(JSONObj *obj);
226 class RGWSyncErrorLogger
;
// Bundle of handles used by data sync: CephContext, store, REST connection,
// async RADOS processor, HTTP manager, error logger, source zone name and
// sync module. Nothing in this view deletes the raw pointers, so they are
// presumably non-owning — confirm.
// NOTE(review): lines are missing from this copy (the leading numbers skip),
// including several member declarations, init()'s first assignments and the
// closing braces.
228 struct RGWDataSyncEnv {
232 RGWAsyncRadosProcessor *async_rados;
233 RGWHTTPManager *http_manager;
234 RGWSyncErrorLogger *error_logger;
236 RGWSyncModuleInstanceRef sync_module;
// Every handle starts null until init() is called.
238 RGWDataSyncEnv() : cct(NULL), store(NULL), conn(NULL), async_rados(NULL), http_manager(NULL), error_logger(NULL), sync_module(NULL) {}
// Copy the given handles into this environment.
240 void init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
241 RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
242 RGWSyncErrorLogger *_error_logger, const string& _source_zone,
243 RGWSyncModuleInstanceRef& _sync_module) {
247 async_rados = _async_rados;
248 http_manager = _http_manager;
249 error_logger = _error_logger;
250 source_zone = _source_zone;
251 sync_module = _sync_module;
// Object name for the given shard; presumably its sync-status object —
// the definition is not in this view.
254 string shard_obj_name(int shard_id);
// Coroutine manager for a source zone's data log. It declares operations to
// read log and shard info, read and initialize sync status, run sync and wake
// up shards.
// NOTE(review): lines are missing from this copy (the leading numbers skip),
// including the store/lock/initialized declarations and the closing brace.
258 class RGWRemoteDataLog : public RGWCoroutinesManager {
260 RGWAsyncRadosProcessor *async_rados;
// Held by value; constructed from completion_mgr in the constructor below.
261 RGWHTTPManager http_manager;
263 RGWDataSyncEnv sync_env;
266 RGWDataSyncControlCR *data_sync_cr;
// The parameter `async_rados` shadows the member of the same name.
// async_rados(async_rados) below initializes the member from the parameter.
// http_manager(store->ctx(), ...) reads the already-initialized `store`
// member, so store must be declared before http_manager (not in this view).
271 RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados)
272 : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()),
273 store(_store), async_rados(async_rados),
274 http_manager(store->ctx(), completion_mgr),
275 lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL),
276 initialized(false) {}
277 int init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncModuleInstanceRef& module);
// The definitions are not in this view. The int returns are presumably 0 or a
// negative error code — confirm against the implementation file.
280 int read_log_info(rgw_datalog_info *log_info);
281 int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info);
// NOTE(review): shard_markers is taken by value (copied per call); switching
// to const& would require changing the out-of-line definition too.
282 int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result);
283 int read_sync_status(rgw_data_sync_status *sync_status);
284 int read_recovering_shards(const int num_shards, set<int>& recovering_shards);
285 int read_shard_status(int shard_id, set<string>& lagging_buckets,set<string>& recovering_buckets, rgw_data_sync_marker* sync_marker, const int max_entries);
286 int init_sync_status(int num_shards);
287 int run_sync(int num_shards);
289 void wakeup(int shard_id, set<string>& keys);
// Front end for data sync from one source zone. It holds an RGWRemoteDataLog
// (source_log) and forwards most calls to it, supplying its own num_shards
// where the underlying call needs a shard count.
// NOTE(review): lines are missing from this copy (the leading numbers skip),
// including store/source_zone/conn/num_shards declarations, the destructor
// body and the closing braces.
292 class RGWDataSyncStatusManager {
298 RGWSyncErrorLogger *error_logger;
299 RGWSyncModuleInstanceRef sync_module;
301 RGWRemoteDataLog source_log;
303 string source_status_oid;
304 string source_shard_status_oid_prefix;
306 map<int, rgw_raw_obj> shard_objs;
// Both constructors build source_log from the `store` member, not the
// parameter, so store must be declared before source_log (not in this view).
// This overload leaves sync_module null.
311 RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
312 const string& _source_zone)
313 : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL),
314 sync_module(nullptr),
315 source_log(store, async_rados), num_shards(0) {}
// Same as above, but with an explicit sync module.
316 RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
317 const string& _source_zone, const RGWSyncModuleInstanceRef& _sync_module)
318 : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL),
319 sync_module(_sync_module),
320 source_log(store, async_rados), num_shards(0) {}
// Destructor body is not in this view.
321 ~RGWDataSyncStatusManager() {
// Object-name helpers; the definitions are not in this view.
327 static string shard_obj_name(const string& source_zone, int shard_id);
328 static string sync_status_oid(const string& source_zone);
// Thin forwarders to source_log.
330 int read_sync_status(rgw_data_sync_status *sync_status) {
331 return source_log.read_sync_status(sync_status);
334 int read_recovering_shards(const int num_shards, set<int>& recovering_shards) {
335 return source_log.read_recovering_shards(num_shards, recovering_shards);
338 int read_shard_status(int shard_id, set<string>& lagging_buckets, set<string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries) {
339 return source_log.read_shard_status(shard_id, lagging_buckets, recovering_buckets,sync_marker, max_entries);
// Uses this manager's num_shards member.
341 int init_sync_status() { return source_log.init_sync_status(num_shards); }
343 int read_log_info(rgw_datalog_info *log_info) {
344 return source_log.read_log_info(log_info);
346 int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info) {
347 return source_log.read_source_log_shards_info(shards_info);
349 int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result) {
350 return source_log.read_source_log_shards_next(shard_markers, result);
// Runs sync over this manager's num_shards.
353 int run() { return source_log.run_sync(num_shards); }
// `return` of a void call is legal C++; it simply forwards.
355 void wakeup(int shard_id, set<string>& keys) { return source_log.wakeup(shard_id, keys); }
361 class RGWBucketSyncStatusManager
;
362 class RGWBucketSyncCR
;
// Full-sync progress for a bucket shard: an object-key position and a count.
// The default constructor zeroes the count.
// NOTE(review): lines are missing from this copy (the leading numbers skip),
// including the count declaration, the count encode/decode lines and the
// closing braces.
364 struct rgw_bucket_shard_full_sync_marker {
365 rgw_obj_key position;
368 rgw_bucket_shard_full_sync_marker() : count(0) {}
// Store the marker into an attribute map; the definition is not in this view.
370 void encode_attr(map<string, bufferlist>& attrs);
// Binary encoding, version 1 (compat 1).
372 void encode(bufferlist& bl) const {
373 ENCODE_START(1, 1, bl);
374 ::encode(position, bl);
379 void decode(bufferlist::iterator& bl) {
381 ::decode(position, bl);
386 void dump(Formatter *f) const;
387 void decode_json(JSONObj *obj);
389 WRITE_CLASS_ENCODER(rgw_bucket_shard_full_sync_marker)
// Incremental-sync progress for a bucket shard, tracked by `position`.
// NOTE(review): lines are missing from this copy (the leading numbers skip),
// including the position declaration and the closing braces.
391 struct rgw_bucket_shard_inc_sync_marker {
394 rgw_bucket_shard_inc_sync_marker() {}
// Store the marker into an attribute map; the definition is not in this view.
396 void encode_attr(map<string, bufferlist>& attrs);
// Binary encoding, version 1 (compat 1).
398 void encode(bufferlist& bl) const {
399 ENCODE_START(1, 1, bl);
400 ::encode(position, bl);
404 void decode(bufferlist::iterator& bl) {
406 ::decode(position, bl);
410 void dump(Formatter *f) const;
411 void decode_json(JSONObj *obj);
// Markers are ordered by position alone.
413 bool operator<(const rgw_bucket_shard_inc_sync_marker& m) const {
414 return (position < m.position);
417 WRITE_CLASS_ENCODER(rgw_bucket_shard_inc_sync_marker)
// Sync state of one bucket shard. It combines a SyncState (default
// StateInit; StateIncrementalSync = 2 is visible), a full-sync marker and an
// incremental-sync marker.
// NOTE(review): lines are missing from this copy (the leading numbers skip),
// including the rest of the enum, the state declaration, the state
// encode/decode lines and the closing braces.
419 struct rgw_bucket_shard_sync_info {
423 StateIncrementalSync = 2,
427 rgw_bucket_shard_full_sync_marker full_marker;
428 rgw_bucket_shard_inc_sync_marker inc_marker;
// Attribute-map helpers; the definitions are not in this view.
430 void decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs);
431 void encode_all_attrs(map<string, bufferlist>& attrs);
432 void encode_state_attr(map<string, bufferlist>& attrs);
// Binary encoding, version 1 (compat 1).
434 void encode(bufferlist& bl) const {
435 ENCODE_START(1, 1, bl);
437 ::encode(full_marker, bl);
438 ::encode(inc_marker, bl);
442 void decode(bufferlist::iterator& bl) {
445 ::decode(full_marker, bl);
446 ::decode(inc_marker, bl);
450 void dump(Formatter *f) const;
451 void decode_json(JSONObj *obj);
453 rgw_bucket_shard_sync_info() : state((int)StateInit) {}
456 WRITE_CLASS_ENCODER(rgw_bucket_shard_sync_info)
// Bucket index marker information read from JSON: bucket_ver, master_ver,
// max_marker and the syncstopped flag (false by default).
// NOTE(review): lines are missing from this copy (the leading numbers skip),
// including the other member declarations and the closing braces.
458 struct rgw_bucket_index_marker_info {
462 bool syncstopped{false};
464 void decode_json(JSONObj *obj) {
465 JSONDecoder::decode_json("bucket_ver", bucket_ver, obj);
466 JSONDecoder::decode_json("master_ver", master_ver, obj);
467 JSONDecoder::decode_json("max_marker", max_marker, obj);
468 JSONDecoder::decode_json("syncstopped", syncstopped, obj);
// Coroutine manager for one bucket shard's sync against a source zone. It
// produces coroutines to read or initialize that shard's sync status and to
// run its sync. Nothing in this view deletes the raw pointers, so they are
// presumably non-owning — confirm.
// NOTE(review): lines are missing from this copy (the leading numbers skip),
// including the store declaration and the closing brace.
473 class RGWRemoteBucketLog : public RGWCoroutinesManager {
475 RGWRESTConn *conn{nullptr};
479 RGWBucketSyncStatusManager *status_manager;
480 RGWAsyncRadosProcessor *async_rados;
481 RGWHTTPManager *http_manager;
483 RGWDataSyncEnv sync_env;
484 rgw_bucket_shard_sync_info init_status;
486 RGWBucketSyncCR *sync_cr{nullptr};
489 RGWRemoteBucketLog(RGWRados *_store, RGWBucketSyncStatusManager *_sm,
490 RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), store(_store),
491 status_manager(_sm), async_rados(_async_rados), http_manager(_http_manager) {}
// Bind to a source zone, connection, bucket shard, error logger and sync
// module; the definition is not in this view.
493 int init(const string& _source_zone, RGWRESTConn *_conn,
494 const rgw_bucket& bucket, int shard_id,
495 RGWSyncErrorLogger *_error_logger,
496 RGWSyncModuleInstanceRef& _sync_module);
// Coroutine factories; the caller's ownership of the returned pointers is
// not shown here — confirm.
499 RGWCoroutine *read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status);
500 RGWCoroutine *init_sync_status_cr();
501 RGWCoroutine *run_sync_cr();
// Tracks bucket sync status from one source zone. It owns a coroutine
// manager and an HTTP manager, keeps one RGWRemoteBucketLog per int key
// (presumably shard id), and caches per-shard sync status.
// NOTE(review): lines are missing from this copy (the leading numbers skip),
// including the store/source_zone/conn declarations, the rest of the
// constructor and the closing brace.
506 class RGWBucketSyncStatusManager {
509 RGWCoroutinesManager cr_mgr;
// Constructed from cr_mgr.get_completion_mgr(), so cr_mgr must stay declared
// above http_manager (members initialize in declaration order).
511 RGWHTTPManager http_manager;
515 RGWSyncErrorLogger *error_logger;
516 RGWSyncModuleInstanceRef sync_module;
// Raw pointers; presumably released by the destructor (not in view) — confirm.
520 map<int, RGWRemoteBucketLog*> source_logs;
522 string source_status_oid;
523 string source_shard_status_oid_prefix;
525 map<int, rgw_bucket_shard_sync_info> sync_status;
526 rgw_raw_obj status_obj;
// http_manager(store->ctx(), ...) reads the `store` member initialized first.
531 RGWBucketSyncStatusManager(RGWRados *_store, const string& _source_zone,
532 const rgw_bucket& bucket) : store(_store),
533 cr_mgr(_store->ctx(), _store->get_cr_registry()),
534 http_manager(store->ctx(), cr_mgr.get_completion_mgr()),
535 source_zone(_source_zone),
536 conn(NULL), error_logger(NULL),
539 ~RGWBucketSyncStatusManager();
// Mutable access to the cached per-shard status.
543 map<int, rgw_bucket_shard_sync_info>& get_sync_status() { return sync_status; }
544 int init_sync_status();
// Status object name for a bucket shard in the given source zone.
546 static string status_oid(const string& source_zone, const rgw_bucket_shard& bs);
548 int read_sync_status();
552 /// read the sync status of all bucket shards from the given source zone
/// @param store        RADOS store handle
/// @param source_zone  zone whose sync status is read
/// @param bucket_info  bucket whose shards are inspected
/// @param status       output vector of per-shard status (presumably one
///                     entry per shard, in shard order — confirm)
/// @return status code; presumably 0 on success, negative on error — confirm
553 int rgw_bucket_sync_status(RGWRados *store, const std::string& source_zone,
554 const RGWBucketInfo& bucket_info,
555 std::vector<rgw_bucket_shard_sync_info> *status);
// Default sync module. It reports that it supports data export.
// NOTE(review): lines are missing from this copy (the access specifier and
// the closing brace).
557 class RGWDefaultSyncModule : public RGWSyncModule {
559 RGWDefaultSyncModule() {}
560 bool supports_data_export() override { return true; }
// Build a module instance from a case-insensitive config map into *instance;
// the definition is not in this view.
561 int create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance) override;
564 // DataLogTrimCR factory function
// Parameters: the store, an HTTP manager, the datalog shard count and a
// utime_t interval (presumably the delay between trim passes — confirm).
// Ownership of the returned coroutine is not shown here.
565 extern RGWCoroutine* create_data_log_trim_cr(RGWRados *store,
566 RGWHTTPManager *http,
567 int num_shards, utime_t interval);