1 #ifndef CEPH_RGW_DATA_SYNC_H
2 #define CEPH_RGW_DATA_SYNC_H
4 #include "rgw_coroutine.h"
5 #include "rgw_http_client.h"
6 #include "rgw_bucket.h"
8 #include "rgw_sync_module.h"
10 #include "common/RWLock.h"
11 #include "common/ceph_json.h"
// Datalog info: carries num_shards (zeroed by the default constructor)
// and is populated from JSON via decode_json().
14 struct rgw_datalog_info
{
17 rgw_datalog_info() : num_shards(0) {}
19 void decode_json(JSONObj
*obj
);
// Overall data sync info: a SyncState value, the shard count and an
// instance id, with binary (encode/decode) and JSON (dump/decode_json) forms.
// Fix: decode_json() previously read "instance_id" into num_shards,
// clobbering the shard count and leaving instance_id unset.
22 struct rgw_data_sync_info
{
25 StateBuildingFullSyncMaps
= 1,
32 uint64_t instance_id
{0};
// Binary form: ENCODE_START(2, 1, ...) marks struct version 2, compat 1.
34 void encode(bufferlist
& bl
) const {
35 ENCODE_START(2, 1, bl
);
37 ::encode(num_shards
, bl
);
38 ::encode(instance_id
, bl
);
42 void decode(bufferlist::iterator
& bl
) {
45 ::decode(num_shards
, bl
);
47 ::decode(instance_id
, bl
);
// JSON dump: state is rendered as a status string (e.g. "building-full-sync-maps").
52 void dump(Formatter
*f
) const {
54 switch ((SyncState
)state
) {
58 case StateBuildingFullSyncMaps
:
59 s
= "building-full-sync-maps";
68 encode_json("status", s
, f
);
69 encode_json("num_shards", num_shards
, f
);
70 encode_json("instance_id", instance_id
, f
);
// Inverse of dump(): maps the status string back to state, then reads
// num_shards and instance_id under the same keys dump() writes.
72 void decode_json(JSONObj
*obj
) {
74 JSONDecoder::decode_json("status", s
, obj
);
75 if (s
== "building-full-sync-maps") {
76 state
= StateBuildingFullSyncMaps
;
77 } else if (s
== "sync") {
82 JSONDecoder::decode_json("num_shards", num_shards
, obj
);
// Decode into instance_id (was mistakenly num_shards, overwriting the shard count).
83 JSONDecoder::decode_json("instance_id", instance_id
, obj
);
// Default: StateInit with zero shards; instance_id uses its {0} initializer.
86 rgw_data_sync_info() : state((int)StateInit
), num_shards(0) {}
88 WRITE_CLASS_ENCODER(rgw_data_sync_info
)
// Per-shard data sync marker: a state (FullSync by default), marker strings,
// an entry counter, a position and a timestamp. Binary encoding is version 1.
90 struct rgw_data_sync_marker
{
97 string next_step_marker
;
98 uint64_t total_entries
;
102 rgw_data_sync_marker() : state(FullSync
), total_entries(0), pos(0) {}
104 void encode(bufferlist
& bl
) const {
105 ENCODE_START(1, 1, bl
);
107 ::encode(marker
, bl
);
108 ::encode(next_step_marker
, bl
);
109 ::encode(total_entries
, bl
);
111 ::encode(timestamp
, bl
);
115 void decode(bufferlist::iterator
& bl
) {
118 ::decode(marker
, bl
);
119 ::decode(next_step_marker
, bl
);
120 ::decode(total_entries
, bl
);
122 ::decode(timestamp
, bl
);
// JSON dump: state is emitted as its integer value and timestamp is
// converted to utime_t for output.
126 void dump(Formatter
*f
) const {
127 encode_json("state", (int)state
, f
);
128 encode_json("marker", marker
, f
);
129 encode_json("next_step_marker", next_step_marker
, f
);
130 encode_json("total_entries", total_entries
, f
);
131 encode_json("pos", pos
, f
);
132 encode_json("timestamp", utime_t(timestamp
), f
);
// Inverse of dump(): the timestamp is read as a utime_t and converted back
// with to_real_time().
134 void decode_json(JSONObj
*obj
) {
136 JSONDecoder::decode_json("state", s
, obj
);
138 JSONDecoder::decode_json("marker", marker
, obj
);
139 JSONDecoder::decode_json("next_step_marker", next_step_marker
, obj
);
140 JSONDecoder::decode_json("total_entries", total_entries
, obj
);
141 JSONDecoder::decode_json("pos", pos
, obj
);
143 JSONDecoder::decode_json("timestamp", t
, obj
);
144 timestamp
= t
.to_real_time();
147 WRITE_CLASS_ENCODER(rgw_data_sync_marker
)
// Aggregate data sync status: the global sync_info plus one
// rgw_data_sync_marker per uint32_t shard id.
// Note the asymmetry: the binary encode()/decode() cover only sync_info,
// while the JSON dump()/decode_json() include the markers as well.
149 struct rgw_data_sync_status
{
150 rgw_data_sync_info sync_info
;
151 map
<uint32_t, rgw_data_sync_marker
> sync_markers
;
153 rgw_data_sync_status() {}
155 void encode(bufferlist
& bl
) const {
156 ENCODE_START(1, 1, bl
);
157 ::encode(sync_info
, bl
);
158 /* sync markers are encoded separately */
162 void decode(bufferlist::iterator
& bl
) {
164 ::decode(sync_info
, bl
);
165 /* sync markers are decoded separately */
169 void dump(Formatter
*f
) const {
170 encode_json("info", sync_info
, f
);
171 encode_json("markers", sync_markers
, f
);
173 void decode_json(JSONObj
*obj
) {
174 JSONDecoder::decode_json("info", sync_info
, obj
);
175 JSONDecoder::decode_json("markers", sync_markers
, obj
);
178 WRITE_CLASS_ENCODER(rgw_data_sync_status
)
// One datalog entry (includes a real_time timestamp), populated via decode_json().
180 struct rgw_datalog_entry
{
182 ceph::real_time timestamp
;
184 void decode_json(JSONObj
*obj
);
// A list of datalog entries, populated via decode_json(). Returned per
// shard id by read_source_log_shards_next().
187 struct rgw_datalog_shard_data
{
190 vector
<rgw_datalog_entry
> entries
;
192 void decode_json(JSONObj
*obj
);
195 class RGWAsyncRadosProcessor
;
196 class RGWDataSyncControlCR
;
// Owner of a bucket entry: an id and a display name, set either through the
// two-argument constructor or through decode_json().
198 struct rgw_bucket_entry_owner
{
202 rgw_bucket_entry_owner() {}
203 rgw_bucket_entry_owner(const string
& _id
, const string
& _display_name
) : id(_id
), display_name(_display_name
) {}
205 void decode_json(JSONObj
*obj
);
208 class RGWSyncErrorLogger
;
// Bundle of handles used by the data sync machinery: Ceph context, store,
// REST connection, async RADOS processor, HTTP manager, error logger, source
// zone and sync module. Held by value in RGWRemoteDataLog and RGWRemoteBucketLog.
// The default constructor nulls every handle; init() assigns them.
210 struct RGWDataSyncEnv
{
214 RGWAsyncRadosProcessor
*async_rados
;
215 RGWHTTPManager
*http_manager
;
216 RGWSyncErrorLogger
*error_logger
;
218 RGWSyncModuleInstanceRef sync_module
;
220 RGWDataSyncEnv() : cct(NULL
), store(NULL
), conn(NULL
), async_rados(NULL
), http_manager(NULL
), error_logger(NULL
), sync_module(NULL
) {}
// Stores the given handles; sync_module is copied from the reference argument.
222 void init(CephContext
*_cct
, RGWRados
*_store
, RGWRESTConn
*_conn
,
223 RGWAsyncRadosProcessor
*_async_rados
, RGWHTTPManager
*_http_manager
,
224 RGWSyncErrorLogger
*_error_logger
, const string
& _source_zone
,
225 RGWSyncModuleInstanceRef
& _sync_module
) {
229 async_rados
= _async_rados
;
230 http_manager
= _http_manager
;
231 error_logger
= _error_logger
;
232 source_zone
= _source_zone
;
233 sync_module
= _sync_module
;
236 string
shard_obj_name(int shard_id
);
// Coroutines manager for syncing from a remote datalog. Owns its own
// RGWHTTPManager (held by value) and an RGWDataSyncEnv; exposes calls to read
// log/shard info and sync status, initialize status, run sync and wake shards.
240 class RGWRemoteDataLog
: public RGWCoroutinesManager
{
242 RGWAsyncRadosProcessor
*async_rados
;
243 RGWHTTPManager http_manager
;
245 RGWDataSyncEnv sync_env
;
248 RGWDataSyncControlCR
*data_sync_cr
;
// NOTE(review): http_manager is initialized from `store` (the member, not
// `_store`), so `store` must be declared before http_manager for this to be
// well-defined; its declaration is not visible here — confirm. completion_mgr
// is presumably inherited from RGWCoroutinesManager.
253 RGWRemoteDataLog(RGWRados
*_store
, RGWAsyncRadosProcessor
*async_rados
)
254 : RGWCoroutinesManager(_store
->ctx(), _store
->get_cr_registry()),
255 store(_store
), async_rados(async_rados
),
256 http_manager(store
->ctx(), completion_mgr
),
257 lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL
),
258 initialized(false) {}
259 int init(const string
& _source_zone
, RGWRESTConn
*_conn
, RGWSyncErrorLogger
*_error_logger
, RGWSyncModuleInstanceRef
& module
);
262 int read_log_info(rgw_datalog_info
*log_info
);
263 int read_source_log_shards_info(map
<int, RGWDataChangesLogInfo
> *shards_info
);
264 int read_source_log_shards_next(map
<int, string
> shard_markers
, map
<int, rgw_datalog_shard_data
> *result
);
265 int get_shard_info(int shard_id
);
266 int read_sync_status(rgw_data_sync_status
*sync_status
);
267 int init_sync_status(int num_shards
);
268 int run_sync(int num_shards
);
270 void wakeup(int shard_id
, set
<string
>& keys
);
// Per-source-zone front end for data sync. It embeds an RGWRemoteDataLog
// (source_log) built from the same store and async processor. Nearly every
// method forwards to source_log, passing this manager's num_shards where the
// callee needs a shard count.
273 class RGWDataSyncStatusManager
{
279 RGWSyncErrorLogger
*error_logger
;
280 RGWSyncModuleInstanceRef sync_module
;
282 RGWRemoteDataLog source_log
;
284 string source_status_oid
;
285 string source_shard_status_oid_prefix
;
287 map
<int, rgw_raw_obj
> shard_objs
;
292 RGWDataSyncStatusManager(RGWRados
*_store
, RGWAsyncRadosProcessor
*async_rados
,
293 const string
& _source_zone
)
294 : store(_store
), source_zone(_source_zone
), conn(NULL
), error_logger(NULL
),
295 sync_module(nullptr),
296 source_log(store
, async_rados
), num_shards(0) {}
297 ~RGWDataSyncStatusManager() {
303 static string
shard_obj_name(const string
& source_zone
, int shard_id
);
304 static string
sync_status_oid(const string
& source_zone
);
306 int read_sync_status(rgw_data_sync_status
*sync_status
) {
307 return source_log
.read_sync_status(sync_status
);
309 int init_sync_status() { return source_log
.init_sync_status(num_shards
); }
311 int read_log_info(rgw_datalog_info
*log_info
) {
312 return source_log
.read_log_info(log_info
);
314 int read_source_log_shards_info(map
<int, RGWDataChangesLogInfo
> *shards_info
) {
315 return source_log
.read_source_log_shards_info(shards_info
);
317 int read_source_log_shards_next(map
<int, string
> shard_markers
, map
<int, rgw_datalog_shard_data
> *result
) {
318 return source_log
.read_source_log_shards_next(shard_markers
, result
);
321 int run() { return source_log
.run_sync(num_shards
); }
// Forwards to source_log.wakeup(); returning a void expression is legal here.
323 void wakeup(int shard_id
, set
<string
>& keys
) { return source_log
.wakeup(shard_id
, keys
); }
329 class RGWBucketSyncStatusManager
;
330 class RGWBucketSyncCR
;
// Full-sync progress marker for a bucket shard: the last object key reached
// (position) and a count that starts at 0. Binary encoding is version 1; the
// JSON dump emits "position" and "count".
332 struct rgw_bucket_shard_full_sync_marker
{
333 rgw_obj_key position
;
336 rgw_bucket_shard_full_sync_marker() : count(0) {}
338 void encode_attr(map
<string
, bufferlist
>& attrs
);
340 void encode(bufferlist
& bl
) const {
341 ENCODE_START(1, 1, bl
);
342 ::encode(position
, bl
);
347 void decode(bufferlist::iterator
& bl
) {
349 ::decode(position
, bl
);
354 void dump(Formatter
*f
) const {
355 encode_json("position", position
, f
);
356 encode_json("count", count
, f
);
359 WRITE_CLASS_ENCODER(rgw_bucket_shard_full_sync_marker
)
// Incremental-sync progress marker for a bucket shard. Binary encoding is
// version 1; the JSON dump emits "position".
361 struct rgw_bucket_shard_inc_sync_marker
{
364 rgw_bucket_shard_inc_sync_marker() {}
366 void encode_attr(map
<string
, bufferlist
>& attrs
);
368 void encode(bufferlist
& bl
) const {
369 ENCODE_START(1, 1, bl
);
370 ::encode(position
, bl
);
374 void decode(bufferlist::iterator
& bl
) {
376 ::decode(position
, bl
);
380 void dump(Formatter
*f
) const {
381 encode_json("position", position
, f
);
// Markers are ordered solely by position.
384 bool operator<(const rgw_bucket_shard_inc_sync_marker
& m
) const {
385 return (position
< m
.position
);
388 WRITE_CLASS_ENCODER(rgw_bucket_shard_inc_sync_marker
)
// Sync state of one bucket shard: a SyncState (StateInit by default) plus the
// full-sync and incremental-sync markers. It can be stored as object attrs
// (decode_from_attrs / encode_*_attr) or with the versioned binary encoding (v1).
390 struct rgw_bucket_shard_sync_info
{
394 StateIncrementalSync
= 2,
398 rgw_bucket_shard_full_sync_marker full_marker
;
399 rgw_bucket_shard_inc_sync_marker inc_marker
;
401 void decode_from_attrs(CephContext
*cct
, map
<string
, bufferlist
>& attrs
);
402 void encode_all_attrs(map
<string
, bufferlist
>& attrs
);
403 void encode_state_attr(map
<string
, bufferlist
>& attrs
);
405 void encode(bufferlist
& bl
) const {
406 ENCODE_START(1, 1, bl
);
408 ::encode(full_marker
, bl
);
409 ::encode(inc_marker
, bl
);
413 void decode(bufferlist::iterator
& bl
) {
416 ::decode(full_marker
, bl
);
417 ::decode(inc_marker
, bl
);
// JSON dump: state is rendered as a status string (e.g. "incremental-sync").
421 void dump(Formatter
*f
) const {
423 switch ((SyncState
)state
) {
430 case StateIncrementalSync
:
431 s
= "incremental-sync";
437 encode_json("status", s
, f
);
438 encode_json("full_marker", full_marker
, f
);
439 encode_json("inc_marker", inc_marker
, f
);
442 rgw_bucket_shard_sync_info() : state((int)StateInit
) {}
445 WRITE_CLASS_ENCODER(rgw_bucket_shard_sync_info
)
// Coroutines manager for syncing one bucket shard from a source zone. Unlike
// RGWRemoteDataLog, it does not embed its own HTTP manager: it holds a pointer
// to one passed in by the caller, plus a back-pointer to its status manager.
// The public API hands out coroutines to read/init sync status and run sync.
448 class RGWRemoteBucketLog
: public RGWCoroutinesManager
{
450 RGWRESTConn
*conn
{nullptr};
454 RGWBucketSyncStatusManager
*status_manager
;
455 RGWAsyncRadosProcessor
*async_rados
;
456 RGWHTTPManager
*http_manager
;
458 RGWDataSyncEnv sync_env
;
459 rgw_bucket_shard_sync_info init_status
;
461 RGWBucketSyncCR
*sync_cr
{nullptr};
464 RGWRemoteBucketLog(RGWRados
*_store
, RGWBucketSyncStatusManager
*_sm
,
465 RGWAsyncRadosProcessor
*_async_rados
, RGWHTTPManager
*_http_manager
) : RGWCoroutinesManager(_store
->ctx(), _store
->get_cr_registry()), store(_store
),
466 status_manager(_sm
), async_rados(_async_rados
), http_manager(_http_manager
) {}
468 int init(const string
& _source_zone
, RGWRESTConn
*_conn
,
469 const rgw_bucket
& bucket
, int shard_id
,
470 RGWSyncErrorLogger
*_error_logger
,
471 RGWSyncModuleInstanceRef
& _sync_module
);
474 RGWCoroutine
*read_sync_status_cr(rgw_bucket_shard_sync_info
*sync_status
);
475 RGWCoroutine
*init_sync_status_cr();
476 RGWCoroutine
*run_sync_cr();
// Manages bucket sync for one bucket from one source zone. It owns a
// coroutines manager (cr_mgr) and an HTTP manager, keeps one RGWRemoteBucketLog
// per shard id in source_logs, and caches per-shard sync status.
481 class RGWBucketSyncStatusManager
{
// cr_mgr is declared before http_manager, which the constructor relies on:
// http_manager is built from cr_mgr.get_completion_mgr().
484 RGWCoroutinesManager cr_mgr
;
486 RGWHTTPManager http_manager
;
490 RGWSyncErrorLogger
*error_logger
;
491 RGWSyncModuleInstanceRef sync_module
;
// NOTE(review): raw RGWRemoteBucketLog pointers; their release is presumably
// handled by the out-of-line destructor — confirm.
495 map
<int, RGWRemoteBucketLog
*> source_logs
;
497 string source_status_oid
;
498 string source_shard_status_oid_prefix
;
500 map
<int, rgw_bucket_shard_sync_info
> sync_status
;
501 rgw_raw_obj status_obj
;
506 RGWBucketSyncStatusManager(RGWRados
*_store
, const string
& _source_zone
,
507 const rgw_bucket
& bucket
) : store(_store
),
508 cr_mgr(_store
->ctx(), _store
->get_cr_registry()),
509 http_manager(store
->ctx(), cr_mgr
.get_completion_mgr()),
510 source_zone(_source_zone
),
511 conn(NULL
), error_logger(NULL
),
514 ~RGWBucketSyncStatusManager();
// Returns a mutable reference to the cached per-shard sync status.
518 map
<int, rgw_bucket_shard_sync_info
>& get_sync_status() { return sync_status
; }
519 int init_sync_status();
521 static string
status_oid(const string
& source_zone
, const rgw_bucket_shard
& bs
);
523 int read_sync_status();
// Default sync module: reports that it supports data export and creates
// module instances from a config map ordered by ltstr_nocase (presumably a
// case-insensitive key comparison — confirm).
527 class RGWDefaultSyncModule
: public RGWSyncModule
{
529 RGWDefaultSyncModule() {}
530 bool supports_data_export() override
{ return true; }
531 int create_instance(CephContext
*cct
, map
<string
, string
, ltstr_nocase
>& config
, RGWSyncModuleInstanceRef
*instance
) override
;
534 // DataLogTrimCR factory function
// Returns an RGWCoroutine built from the store, an HTTP manager, the number of
// datalog shards and a utime_t interval (presumably the trim period — confirm).
535 extern RGWCoroutine
* create_data_log_trim_cr(RGWRados
*store
,
536 RGWHTTPManager
*http
,
537 int num_shards
, utime_t interval
);