1 #ifndef CEPH_RGW_DATA_SYNC_H
2 #define CEPH_RGW_DATA_SYNC_H
4 #include "rgw_coroutine.h"
5 #include "rgw_http_client.h"
6 #include "rgw_bucket.h"
8 #include "rgw_sync_module.h"
10 #include "common/RWLock.h"
11 #include "common/ceph_json.h"
// Forward declaration only. Later declarations in this header refer to the
// type as rgw::BucketChangeObserver and hold it by pointer.
// NOTE(review): the enclosing namespace lines are not visible in this view — confirm.
14 class BucketChangeObserver
;
17 struct rgw_datalog_info
{
20 rgw_datalog_info() : num_shards(0) {}
22 void decode_json(JSONObj
*obj
);
25 struct rgw_data_sync_info
{
28 StateBuildingFullSyncMaps
= 1,
35 uint64_t instance_id
{0};
37 void encode(bufferlist
& bl
) const {
38 ENCODE_START(2, 1, bl
);
40 ::encode(num_shards
, bl
);
41 ::encode(instance_id
, bl
);
45 void decode(bufferlist::iterator
& bl
) {
48 ::decode(num_shards
, bl
);
50 ::decode(instance_id
, bl
);
55 void dump(Formatter
*f
) const {
57 switch ((SyncState
)state
) {
61 case StateBuildingFullSyncMaps
:
62 s
= "building-full-sync-maps";
71 encode_json("status", s
, f
);
72 encode_json("num_shards", num_shards
, f
);
73 encode_json("instance_id", instance_id
, f
);
75 void decode_json(JSONObj
*obj
) {
77 JSONDecoder::decode_json("status", s
, obj
);
78 if (s
== "building-full-sync-maps") {
79 state
= StateBuildingFullSyncMaps
;
80 } else if (s
== "sync") {
85 JSONDecoder::decode_json("num_shards", num_shards
, obj
);
// Store the "instance_id" field in instance_id. Decoding it into num_shards
// overwrote the shard count read just above and left instance_id unset, so a
// dump()/decode_json() round trip did not preserve either field.
86 JSONDecoder::decode_json("instance_id", instance_id
, obj
);
88 static void generate_test_instances(std::list
<rgw_data_sync_info
*>& o
);
90 rgw_data_sync_info() : state((int)StateInit
), num_shards(0) {}
92 WRITE_CLASS_ENCODER(rgw_data_sync_info
)
94 struct rgw_data_sync_marker
{
101 string next_step_marker
;
102 uint64_t total_entries
;
106 rgw_data_sync_marker() : state(FullSync
), total_entries(0), pos(0) {}
108 void encode(bufferlist
& bl
) const {
109 ENCODE_START(1, 1, bl
);
111 ::encode(marker
, bl
);
112 ::encode(next_step_marker
, bl
);
113 ::encode(total_entries
, bl
);
115 ::encode(timestamp
, bl
);
119 void decode(bufferlist::iterator
& bl
) {
122 ::decode(marker
, bl
);
123 ::decode(next_step_marker
, bl
);
124 ::decode(total_entries
, bl
);
126 ::decode(timestamp
, bl
);
130 void dump(Formatter
*f
) const {
131 const char *s
{nullptr};
132 switch ((SyncState
)state
) {
136 case IncrementalSync
:
137 s
= "incremental-sync";
143 encode_json("status", s
, f
);
144 encode_json("marker", marker
, f
);
145 encode_json("next_step_marker", next_step_marker
, f
);
146 encode_json("total_entries", total_entries
, f
);
147 encode_json("pos", pos
, f
);
148 encode_json("timestamp", utime_t(timestamp
), f
);
150 void decode_json(JSONObj
*obj
) {
152 JSONDecoder::decode_json("status", s
, obj
);
153 if (s
== "full-sync") {
155 } else if (s
== "incremental-sync") {
156 state
= IncrementalSync
;
158 JSONDecoder::decode_json("marker", marker
, obj
);
159 JSONDecoder::decode_json("next_step_marker", next_step_marker
, obj
);
160 JSONDecoder::decode_json("total_entries", total_entries
, obj
);
161 JSONDecoder::decode_json("pos", pos
, obj
);
163 JSONDecoder::decode_json("timestamp", t
, obj
);
164 timestamp
= t
.to_real_time();
166 static void generate_test_instances(std::list
<rgw_data_sync_marker
*>& o
);
168 WRITE_CLASS_ENCODER(rgw_data_sync_marker
)
170 struct rgw_data_sync_status
{
171 rgw_data_sync_info sync_info
;
172 map
<uint32_t, rgw_data_sync_marker
> sync_markers
;
174 rgw_data_sync_status() {}
176 void encode(bufferlist
& bl
) const {
177 ENCODE_START(1, 1, bl
);
178 ::encode(sync_info
, bl
);
179 /* sync markers are encoded separately */
183 void decode(bufferlist::iterator
& bl
) {
185 ::decode(sync_info
, bl
);
186 /* sync markers are decoded separately */
190 void dump(Formatter
*f
) const {
191 encode_json("info", sync_info
, f
);
192 encode_json("markers", sync_markers
, f
);
194 void decode_json(JSONObj
*obj
) {
195 JSONDecoder::decode_json("info", sync_info
, obj
);
196 JSONDecoder::decode_json("markers", sync_markers
, obj
);
198 static void generate_test_instances(std::list
<rgw_data_sync_status
*>& o
);
200 WRITE_CLASS_ENCODER(rgw_data_sync_status
)
202 struct rgw_datalog_entry
{
204 ceph::real_time timestamp
;
206 void decode_json(JSONObj
*obj
);
209 struct rgw_datalog_shard_data
{
212 vector
<rgw_datalog_entry
> entries
;
214 void decode_json(JSONObj
*obj
);
// Forward declarations. The declarations below only hold these types by
// pointer (e.g. RGWAsyncRadosProcessor *async_rados and
// RGWDataSyncControlCR *data_sync_cr), so the full definitions are not needed here.
217 class RGWAsyncRadosProcessor
;
218 class RGWDataSyncControlCR
;
220 struct rgw_bucket_entry_owner
{
// Default constructor; performs no explicit member initialization.
224 rgw_bucket_entry_owner() {}
// Initializes the id and display_name members from the two arguments.
225 rgw_bucket_entry_owner(const string
& _id
, const string
& _display_name
) : id(_id
), display_name(_display_name
) {}
227 void decode_json(JSONObj
*obj
);
230 class RGWSyncErrorLogger
;
232 struct RGWDataSyncEnv
{
236 RGWAsyncRadosProcessor
*async_rados
;
237 RGWHTTPManager
*http_manager
;
238 RGWSyncErrorLogger
*error_logger
;
240 RGWSyncModuleInstanceRef sync_module
;
241 rgw::BucketChangeObserver
*observer
{nullptr};
// Default constructor: initializes cct, store, conn, async_rados,
// http_manager, error_logger and sync_module from NULL. The observer member
// is not listed here; it relies on its in-class {nullptr} initializer.
243 RGWDataSyncEnv() : cct(NULL
), store(NULL
), conn(NULL
), async_rados(NULL
), http_manager(NULL
), error_logger(NULL
), sync_module(NULL
) {}
245 void init(CephContext
*_cct
, RGWRados
*_store
, RGWRESTConn
*_conn
,
246 RGWAsyncRadosProcessor
*_async_rados
, RGWHTTPManager
*_http_manager
,
247 RGWSyncErrorLogger
*_error_logger
, const string
& _source_zone
,
248 RGWSyncModuleInstanceRef
& _sync_module
,
249 rgw::BucketChangeObserver
*_observer
) {
253 async_rados
= _async_rados
;
254 http_manager
= _http_manager
;
255 error_logger
= _error_logger
;
256 source_zone
= _source_zone
;
257 sync_module
= _sync_module
;
258 observer
= _observer
;
261 string
shard_obj_name(int shard_id
);
265 class RGWRemoteDataLog
: public RGWCoroutinesManager
{
267 RGWAsyncRadosProcessor
*async_rados
;
268 rgw::BucketChangeObserver
*observer
;
269 RGWHTTPManager http_manager
;
271 RGWDataSyncEnv sync_env
;
274 RGWDataSyncControlCR
*data_sync_cr
;
// Constructor.
//  - Passes _store->ctx() and _store->get_cr_registry() to the
//    RGWCoroutinesManager base.
//  - Stores _store, async_rados and observer in the members of the same
//    purpose; the async_rados/observer parameters shadow the members, so
//    inside the initializer parentheses the names refer to the parameters.
//  - Builds http_manager from the *member* store (not _store) and
//    completion_mgr, so it depends on store being initialized first.
//    NOTE(review): store's declaration is not visible here — confirm it is
//    declared before http_manager. completion_mgr is presumably inherited
//    from RGWCoroutinesManager — confirm.
//  - Names the lock "RGWRemoteDataLog::lock", starts data_sync_cr as NULL and
//    initialized as false.
279 RGWRemoteDataLog(RGWRados
*_store
, RGWAsyncRadosProcessor
*async_rados
,
280 rgw::BucketChangeObserver
*observer
)
281 : RGWCoroutinesManager(_store
->ctx(), _store
->get_cr_registry()),
282 store(_store
), async_rados(async_rados
), observer(observer
),
283 http_manager(store
->ctx(), completion_mgr
),
284 lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL
),
285 initialized(false) {}
286 int init(const string
& _source_zone
, RGWRESTConn
*_conn
, RGWSyncErrorLogger
*_error_logger
, RGWSyncModuleInstanceRef
& module
);
289 int read_log_info(rgw_datalog_info
*log_info
);
290 int read_source_log_shards_info(map
<int, RGWDataChangesLogInfo
> *shards_info
);
291 int read_source_log_shards_next(map
<int, string
> shard_markers
, map
<int, rgw_datalog_shard_data
> *result
);
292 int read_sync_status(rgw_data_sync_status
*sync_status
);
293 int read_recovering_shards(const int num_shards
, set
<int>& recovering_shards
);
294 int read_shard_status(int shard_id
, set
<string
>& lagging_buckets
,set
<string
>& recovering_buckets
, rgw_data_sync_marker
* sync_marker
, const int max_entries
);
295 int init_sync_status(int num_shards
);
296 int run_sync(int num_shards
);
298 void wakeup(int shard_id
, set
<string
>& keys
);
301 class RGWDataSyncStatusManager
{
307 RGWSyncErrorLogger
*error_logger
;
308 RGWSyncModuleInstanceRef sync_module
;
310 RGWRemoteDataLog source_log
;
312 string source_status_oid
;
313 string source_shard_status_oid_prefix
;
315 map
<int, rgw_raw_obj
> shard_objs
;
// Constructor without a sync module: sync_module is initialized to nullptr.
// Stores _store and _source_zone, sets conn and error_logger to NULL and
// num_shards to 0, and constructs source_log from the store member together
// with async_rados and observer. observer is optional and defaults to nullptr.
320 RGWDataSyncStatusManager(RGWRados
*_store
, RGWAsyncRadosProcessor
*async_rados
,
321 const string
& _source_zone
,
322 rgw::BucketChangeObserver
*observer
= nullptr)
323 : store(_store
), source_zone(_source_zone
), conn(NULL
), error_logger(NULL
),
324 sync_module(nullptr),
325 source_log(store
, async_rados
, observer
), num_shards(0) {}
// Constructor taking an explicit sync module: sync_module is copied from
// _sync_module. Otherwise the initialization is the same as the listed
// members show: store and source_zone come from the arguments, conn and
// error_logger start as NULL, num_shards as 0, and source_log is constructed
// from the store member, async_rados and observer (default nullptr).
326 RGWDataSyncStatusManager(RGWRados
*_store
, RGWAsyncRadosProcessor
*async_rados
,
327 const string
& _source_zone
, const RGWSyncModuleInstanceRef
& _sync_module
,
328 rgw::BucketChangeObserver
*observer
= nullptr)
329 : store(_store
), source_zone(_source_zone
), conn(NULL
), error_logger(NULL
),
330 sync_module(_sync_module
),
331 source_log(store
, async_rados
, observer
), num_shards(0) {}
332 ~RGWDataSyncStatusManager() {
338 static string
shard_obj_name(const string
& source_zone
, int shard_id
);
339 static string
sync_status_oid(const string
& source_zone
);
341 int read_sync_status(rgw_data_sync_status
*sync_status
) {
342 return source_log
.read_sync_status(sync_status
);
345 int read_recovering_shards(const int num_shards
, set
<int>& recovering_shards
) {
346 return source_log
.read_recovering_shards(num_shards
, recovering_shards
);
349 int read_shard_status(int shard_id
, set
<string
>& lagging_buckets
, set
<string
>& recovering_buckets
, rgw_data_sync_marker
*sync_marker
, const int max_entries
) {
350 return source_log
.read_shard_status(shard_id
, lagging_buckets
, recovering_buckets
,sync_marker
, max_entries
);
// Forwards to source_log.init_sync_status() with this manager's num_shards
// and returns its result unchanged.
352 int init_sync_status() { return source_log
.init_sync_status(num_shards
); }
354 int read_log_info(rgw_datalog_info
*log_info
) {
355 return source_log
.read_log_info(log_info
);
357 int read_source_log_shards_info(map
<int, RGWDataChangesLogInfo
> *shards_info
) {
358 return source_log
.read_source_log_shards_info(shards_info
);
360 int read_source_log_shards_next(map
<int, string
> shard_markers
, map
<int, rgw_datalog_shard_data
> *result
) {
361 return source_log
.read_source_log_shards_next(shard_markers
, result
);
// Forwards to source_log.run_sync() with this manager's num_shards and
// returns its result unchanged.
364 int run() { return source_log
.run_sync(num_shards
); }
// Forwards shard_id and keys to source_log.wakeup(). Both functions return
// void, so the `return` here carries no value.
366 void wakeup(int shard_id
, set
<string
>& keys
) { return source_log
.wakeup(shard_id
, keys
); }
372 class RGWBucketSyncStatusManager
;
373 class RGWBucketSyncCR
;
375 struct rgw_bucket_shard_full_sync_marker
{
376 rgw_obj_key position
;
379 rgw_bucket_shard_full_sync_marker() : count(0) {}
381 void encode_attr(map
<string
, bufferlist
>& attrs
);
383 void encode(bufferlist
& bl
) const {
384 ENCODE_START(1, 1, bl
);
385 ::encode(position
, bl
);
390 void decode(bufferlist::iterator
& bl
) {
392 ::decode(position
, bl
);
397 void dump(Formatter
*f
) const;
398 void decode_json(JSONObj
*obj
);
400 WRITE_CLASS_ENCODER(rgw_bucket_shard_full_sync_marker
)
402 struct rgw_bucket_shard_inc_sync_marker
{
405 rgw_bucket_shard_inc_sync_marker() {}
407 void encode_attr(map
<string
, bufferlist
>& attrs
);
409 void encode(bufferlist
& bl
) const {
410 ENCODE_START(1, 1, bl
);
411 ::encode(position
, bl
);
415 void decode(bufferlist::iterator
& bl
) {
417 ::decode(position
, bl
);
421 void dump(Formatter
*f
) const;
422 void decode_json(JSONObj
*obj
);
424 bool operator<(const rgw_bucket_shard_inc_sync_marker
& m
) const {
425 return (position
< m
.position
);
428 WRITE_CLASS_ENCODER(rgw_bucket_shard_inc_sync_marker
)
430 struct rgw_bucket_shard_sync_info
{
434 StateIncrementalSync
= 2,
438 rgw_bucket_shard_full_sync_marker full_marker
;
439 rgw_bucket_shard_inc_sync_marker inc_marker
;
441 void decode_from_attrs(CephContext
*cct
, map
<string
, bufferlist
>& attrs
);
442 void encode_all_attrs(map
<string
, bufferlist
>& attrs
);
443 void encode_state_attr(map
<string
, bufferlist
>& attrs
);
445 void encode(bufferlist
& bl
) const {
446 ENCODE_START(1, 1, bl
);
448 ::encode(full_marker
, bl
);
449 ::encode(inc_marker
, bl
);
453 void decode(bufferlist::iterator
& bl
) {
456 ::decode(full_marker
, bl
);
457 ::decode(inc_marker
, bl
);
461 void dump(Formatter
*f
) const;
462 void decode_json(JSONObj
*obj
);
464 rgw_bucket_shard_sync_info() : state((int)StateInit
) {}
467 WRITE_CLASS_ENCODER(rgw_bucket_shard_sync_info
)
469 struct rgw_bucket_index_marker_info
{
473 bool syncstopped
{false};
475 void decode_json(JSONObj
*obj
) {
476 JSONDecoder::decode_json("bucket_ver", bucket_ver
, obj
);
477 JSONDecoder::decode_json("master_ver", master_ver
, obj
);
478 JSONDecoder::decode_json("max_marker", max_marker
, obj
);
479 JSONDecoder::decode_json("syncstopped", syncstopped
, obj
);
484 class RGWRemoteBucketLog
: public RGWCoroutinesManager
{
486 RGWRESTConn
*conn
{nullptr};
490 RGWBucketSyncStatusManager
*status_manager
;
491 RGWAsyncRadosProcessor
*async_rados
;
492 RGWHTTPManager
*http_manager
;
494 RGWDataSyncEnv sync_env
;
495 rgw_bucket_shard_sync_info init_status
;
497 RGWBucketSyncCR
*sync_cr
{nullptr};
// Constructor: passes _store->ctx() and _store->get_cr_registry() to the
// RGWCoroutinesManager base, then stores the four pointer arguments in
// store, status_manager, async_rados and http_manager. The pointers are
// stored as given; nothing is allocated or dereferenced besides _store.
500 RGWRemoteBucketLog(RGWRados
*_store
, RGWBucketSyncStatusManager
*_sm
,
501 RGWAsyncRadosProcessor
*_async_rados
, RGWHTTPManager
*_http_manager
) : RGWCoroutinesManager(_store
->ctx(), _store
->get_cr_registry()), store(_store
),
502 status_manager(_sm
), async_rados(_async_rados
), http_manager(_http_manager
) {}
504 int init(const string
& _source_zone
, RGWRESTConn
*_conn
,
505 const rgw_bucket
& bucket
, int shard_id
,
506 RGWSyncErrorLogger
*_error_logger
,
507 RGWSyncModuleInstanceRef
& _sync_module
);
510 RGWCoroutine
*read_sync_status_cr(rgw_bucket_shard_sync_info
*sync_status
);
511 RGWCoroutine
*init_sync_status_cr();
512 RGWCoroutine
*run_sync_cr();
517 class RGWBucketSyncStatusManager
{
520 RGWCoroutinesManager cr_mgr
;
522 RGWHTTPManager http_manager
;
526 RGWSyncErrorLogger
*error_logger
;
527 RGWSyncModuleInstanceRef sync_module
;
531 map
<int, RGWRemoteBucketLog
*> source_logs
;
533 string source_status_oid
;
534 string source_shard_status_oid_prefix
;
536 map
<int, rgw_bucket_shard_sync_info
> sync_status
;
537 rgw_raw_obj status_obj
;
542 RGWBucketSyncStatusManager(RGWRados
*_store
, const string
& _source_zone
,
543 const rgw_bucket
& bucket
) : store(_store
),
544 cr_mgr(_store
->ctx(), _store
->get_cr_registry()),
545 http_manager(store
->ctx(), cr_mgr
.get_completion_mgr()),
546 source_zone(_source_zone
),
547 conn(NULL
), error_logger(NULL
),
550 ~RGWBucketSyncStatusManager();
// Returns a mutable reference to the sync_status member map
// (int key -> rgw_bucket_shard_sync_info).
// NOTE(review): the int key is presumably the bucket shard id — confirm.
554 map
<int, rgw_bucket_shard_sync_info
>& get_sync_status() { return sync_status
; }
555 int init_sync_status();
557 static string
status_oid(const string
& source_zone
, const rgw_bucket_shard
& bs
);
559 int read_sync_status();
// Free function, declared here and defined elsewhere.
// Takes the store, the source zone name, the bucket's RGWBucketInfo, and an
// output vector of rgw_bucket_shard_sync_info that receives the result.
// NOTE(review): presumably returns 0 on success and a negative error code
// otherwise — confirm against the definition.
563 /// read the sync status of all bucket shards from the given source zone
564 int rgw_bucket_sync_status(RGWRados
*store
, const std::string
& source_zone
,
565 const RGWBucketInfo
& bucket_info
,
566 std::vector
<rgw_bucket_shard_sync_info
> *status
);
568 class RGWDefaultSyncModule
: public RGWSyncModule
{
570 RGWDefaultSyncModule() {}
// Override of the base-class hook; this module always returns true.
571 bool supports_data_export() override
{ return true; }
572 int create_instance(CephContext
*cct
, map
<string
, string
, ltstr_nocase
>& config
, RGWSyncModuleInstanceRef
*instance
) override
;
// Declared extern here and defined elsewhere. Returns an RGWCoroutine pointer
// built from the store, an HTTP manager, a shard count and a utime_t interval.
// NOTE(review): ownership of the returned coroutine and the exact meaning of
// interval are not visible in this header — confirm against the definition.
575 // DataLogTrimCR factory function
576 extern RGWCoroutine
* create_data_log_trim_cr(RGWRados
*store
,
577 RGWHTTPManager
*http
,
578 int num_shards
, utime_t interval
);