1 #ifndef CEPH_RGW_DATA_SYNC_H
2 #define CEPH_RGW_DATA_SYNC_H
4 #include "rgw_coroutine.h"
5 #include "rgw_http_client.h"
6 #include "rgw_bucket.h"
8 #include "rgw_sync_module.h"
10 #include "common/RWLock.h"
11 #include "common/ceph_json.h"
// Description of a data log that can be filled in from JSON; it carries a
// shard count (num_shards).
14 struct rgw_datalog_info
{
// Default construction starts with zero shards.
17 rgw_datalog_info() : num_shards(0) {}
// Populate this object from a parsed JSON object (only declared here).
19 void decode_json(JSONObj
*obj
);
// Overall data-sync status record: a sync state, the number of shards and an
// instance id. It can be serialized to/from a bufferlist and to/from JSON.
22 struct rgw_data_sync_info
{
25 StateBuildingFullSyncMaps
= 1,
// instance_id defaults to 0 through its in-class initializer.
32 uint64_t instance_id
{0};
// Binary serialization into bl; num_shards is written before instance_id.
34 void encode(bufferlist
& bl
) const {
35 ENCODE_START(2, 1, bl
);
37 ::encode(num_shards
, bl
);
38 ::encode(instance_id
, bl
);
// Binary deserialization; reads num_shards and instance_id in the same order
// that encode() writes them.
42 void decode(bufferlist::iterator
& bl
) {
45 ::decode(num_shards
, bl
);
47 ::decode(instance_id
, bl
);
// JSON/Formatter output: the state is emitted as a human-readable "status"
// string, followed by "num_shards" and "instance_id".
52 void dump(Formatter
*f
) const {
54 switch ((SyncState
)state
) {
58 case StateBuildingFullSyncMaps
:
59 s
= "building-full-sync-maps";
68 encode_json("status", s
, f
);
69 encode_json("num_shards", num_shards
, f
);
70 encode_json("instance_id", instance_id
, f
);
// JSON input, the inverse of dump(): the "status" string is mapped back to a
// state value, then "num_shards" and "instance_id" are read.
72 void decode_json(JSONObj
*obj
) {
74 JSONDecoder::decode_json("status", s
, obj
);
75 if (s
== "building-full-sync-maps") {
76 state
= StateBuildingFullSyncMaps
;
77 } else if (s
== "sync") {
82 JSONDecoder::decode_json("num_shards", num_shards
, obj
);
// The "instance_id" key must land in instance_id. Decoding it into num_shards
// would overwrite the shard count just read and leave instance_id at its
// default value.
83 JSONDecoder::decode_json("instance_id", instance_id
, obj
);
// Default construction: state is StateInit and there are zero shards.
86 rgw_data_sync_info() : state((int)StateInit
), num_shards(0) {}
88 WRITE_CLASS_ENCODER(rgw_data_sync_info
)
// Sync position record: a state, a marker, a next-step marker, a running
// entry count, a position and a timestamp. It can be serialized to/from a
// bufferlist and to/from JSON.
90 struct rgw_data_sync_marker
{
97 string next_step_marker
;
98 uint64_t total_entries
;
// Default construction: state is FullSync, with zero entries and position 0.
102 rgw_data_sync_marker() : state(FullSync
), total_entries(0), pos(0) {}
// Binary serialization into bl.
104 void encode(bufferlist
& bl
) const {
105 ENCODE_START(1, 1, bl
);
107 ::encode(marker
, bl
);
108 ::encode(next_step_marker
, bl
);
109 ::encode(total_entries
, bl
);
111 ::encode(timestamp
, bl
);
// Binary deserialization; fields are read in the same order that encode()
// writes them.
115 void decode(bufferlist::iterator
& bl
) {
118 ::decode(marker
, bl
);
119 ::decode(next_step_marker
, bl
);
120 ::decode(total_entries
, bl
);
122 ::decode(timestamp
, bl
);
// JSON/Formatter output. The state is emitted as its integer value and the
// timestamp is converted to utime_t before being written.
126 void dump(Formatter
*f
) const {
127 encode_json("state", (int)state
, f
);
128 encode_json("marker", marker
, f
);
129 encode_json("next_step_marker", next_step_marker
, f
);
130 encode_json("total_entries", total_entries
, f
);
131 encode_json("pos", pos
, f
);
132 encode_json("timestamp", utime_t(timestamp
), f
);
// JSON input, using the same keys that dump() writes.
134 void decode_json(JSONObj
*obj
) {
136 JSONDecoder::decode_json("state", s
, obj
);
138 JSONDecoder::decode_json("marker", marker
, obj
);
139 JSONDecoder::decode_json("next_step_marker", next_step_marker
, obj
);
140 JSONDecoder::decode_json("total_entries", total_entries
, obj
);
141 JSONDecoder::decode_json("pos", pos
, obj
);
// The timestamp is read into the temporary t and then converted with
// to_real_time() before being stored.
143 JSONDecoder::decode_json("timestamp", t
, obj
);
144 timestamp
= t
.to_real_time();
147 WRITE_CLASS_ENCODER(rgw_data_sync_marker
)
// Aggregate sync status: one rgw_data_sync_info plus a map of
// rgw_data_sync_marker objects keyed by a uint32_t.
// NOTE(review): the key is presumably the shard id -- confirm against callers.
149 struct rgw_data_sync_status
{
150 rgw_data_sync_info sync_info
;
151 map
<uint32_t, rgw_data_sync_marker
> sync_markers
;
153 rgw_data_sync_status() {}
// Binary serialization: only sync_info is written here; sync_markers is
// deliberately left out (see the comment below).
155 void encode(bufferlist
& bl
) const {
156 ENCODE_START(1, 1, bl
);
157 ::encode(sync_info
, bl
);
158 /* sync markers are encoded separately */
// Binary deserialization: only sync_info is read, mirroring encode().
162 void decode(bufferlist::iterator
& bl
) {
164 ::decode(sync_info
, bl
);
165 /* sync markers are decoded separately */
// JSON/Formatter output: unlike the binary form, this includes both the
// "info" and the "markers" sections.
169 void dump(Formatter
*f
) const {
170 encode_json("info", sync_info
, f
);
171 encode_json("markers", sync_markers
, f
);
// JSON input: reads both "info" and "markers".
173 void decode_json(JSONObj
*obj
) {
174 JSONDecoder::decode_json("info", sync_info
, obj
);
175 JSONDecoder::decode_json("markers", sync_markers
, obj
);
178 WRITE_CLASS_ENCODER(rgw_data_sync_status
)
// A single data-log entry that can be filled in from JSON; it carries a
// timestamp.
180 struct rgw_datalog_entry
{
182 ceph::real_time timestamp
;
// Populate this entry from a parsed JSON object (only declared here).
184 void decode_json(JSONObj
*obj
);
// A batch of data-log entries that can be filled in from JSON.
187 struct rgw_datalog_shard_data
{
190 vector
<rgw_datalog_entry
> entries
;
// Populate this object from a parsed JSON object (only declared here).
192 void decode_json(JSONObj
*obj
);
195 class RGWAsyncRadosProcessor
;
196 class RGWDataSyncControlCR
;
// Owner of a bucket entry, identified by an id and a display name.
198 struct rgw_bucket_entry_owner
{
202 rgw_bucket_entry_owner() {}
// Construct from an explicit id and display name; both strings are copied.
203 rgw_bucket_entry_owner(const string
& _id
, const string
& _display_name
) : id(_id
), display_name(_display_name
) {}
// Populate this object from a parsed JSON object (only declared here).
205 void decode_json(JSONObj
*obj
);
208 class RGWSyncErrorLogger
;
// Bundle of handles and settings used by the data-sync code: context, store,
// remote connection, async processor, HTTP manager, error logger, source zone
// and sync module.
210 struct RGWDataSyncEnv
{
214 RGWAsyncRadosProcessor
*async_rados
;
215 RGWHTTPManager
*http_manager
;
216 RGWSyncErrorLogger
*error_logger
;
218 RGWSyncModuleInstanceRef sync_module
;
// Default construction leaves every handle null; init() supplies real values.
220 RGWDataSyncEnv() : cct(NULL
), store(NULL
), conn(NULL
), async_rados(NULL
), http_manager(NULL
), error_logger(NULL
), sync_module(NULL
) {}
// Store the supplied handles and the source zone name in the members.
222 void init(CephContext
*_cct
, RGWRados
*_store
, RGWRESTConn
*_conn
,
223 RGWAsyncRadosProcessor
*_async_rados
, RGWHTTPManager
*_http_manager
,
224 RGWSyncErrorLogger
*_error_logger
, const string
& _source_zone
,
225 RGWSyncModuleInstanceRef
& _sync_module
) {
229 async_rados
= _async_rados
;
230 http_manager
= _http_manager
;
231 error_logger
= _error_logger
;
232 source_zone
= _source_zone
;
233 sync_module
= _sync_module
;
// Returns an object name for the given shard id (only declared here).
236 string
shard_obj_name(int shard_id
);
// Coroutine manager for working with a remote data log: reading log and shard
// information, reading and initializing sync status, and running sync.
240 class RGWRemoteDataLog
: public RGWCoroutinesManager
{
242 RGWAsyncRadosProcessor
*async_rados
;
243 RGWHTTPManager http_manager
;
245 RGWDataSyncEnv sync_env
;
248 RGWDataSyncControlCR
*data_sync_cr
;
// Construct from a store and an async processor. The object starts
// uninitialized (initialized == false) with no sync coroutine attached.
// NOTE(review): http_manager is initialized from the member `store`, so
// `store` must be declared before `http_manager` -- confirm member order.
253 RGWRemoteDataLog(RGWRados
*_store
, RGWAsyncRadosProcessor
*async_rados
)
254 : RGWCoroutinesManager(_store
->ctx(), _store
->get_cr_registry()),
255 store(_store
), async_rados(async_rados
),
256 http_manager(store
->ctx(), completion_mgr
),
257 lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL
),
258 initialized(false) {}
// Second-stage setup with the source zone, connection, error logger and sync
// module; returns an int status (only declared here).
259 int init(const string
& _source_zone
, RGWRESTConn
*_conn
, RGWSyncErrorLogger
*_error_logger
, RGWSyncModuleInstanceRef
& module
);
// The operations below all return an int status and are only declared here.
262 int read_log_info(rgw_datalog_info
*log_info
);
263 int read_source_log_shards_info(map
<int, RGWDataChangesLogInfo
> *shards_info
);
// shard_markers is taken by value; results are written through `result`.
264 int read_source_log_shards_next(map
<int, string
> shard_markers
, map
<int, rgw_datalog_shard_data
> *result
);
265 int read_sync_status(rgw_data_sync_status
*sync_status
);
266 int init_sync_status(int num_shards
);
267 int run_sync(int num_shards
);
// Wake-up request for a shard together with a set of keys (only declared here).
269 void wakeup(int shard_id
, set
<string
>& keys
);
// Front end over an embedded RGWRemoteDataLog (source_log) for one source
// zone. Most member functions visible here forward directly to source_log.
272 class RGWDataSyncStatusManager
{
278 RGWSyncErrorLogger
*error_logger
;
279 RGWSyncModuleInstanceRef sync_module
;
281 RGWRemoteDataLog source_log
;
283 string source_status_oid
;
284 string source_shard_status_oid_prefix
;
286 map
<int, rgw_raw_obj
> shard_objs
;
// Construct for a store and source zone; connection, error logger and sync
// module start out null and num_shards starts at 0.
// NOTE(review): source_log is initialized from the member `store`, so `store`
// must be declared before `source_log` -- confirm member order.
291 RGWDataSyncStatusManager(RGWRados
*_store
, RGWAsyncRadosProcessor
*async_rados
,
292 const string
& _source_zone
)
293 : store(_store
), source_zone(_source_zone
), conn(NULL
), error_logger(NULL
),
294 sync_module(nullptr),
295 source_log(store
, async_rados
), num_shards(0) {}
296 ~RGWDataSyncStatusManager() {
// Static name helpers (only declared here).
302 static string
shard_obj_name(const string
& source_zone
, int shard_id
);
303 static string
sync_status_oid(const string
& source_zone
);
// Forwards to source_log.read_sync_status().
305 int read_sync_status(rgw_data_sync_status
*sync_status
) {
306 return source_log
.read_sync_status(sync_status
);
// Forwards to source_log.init_sync_status() using the stored num_shards.
308 int init_sync_status() { return source_log
.init_sync_status(num_shards
); }
// Forwards to source_log.read_log_info().
310 int read_log_info(rgw_datalog_info
*log_info
) {
311 return source_log
.read_log_info(log_info
);
// Forwards to source_log.read_source_log_shards_info().
313 int read_source_log_shards_info(map
<int, RGWDataChangesLogInfo
> *shards_info
) {
314 return source_log
.read_source_log_shards_info(shards_info
);
// Forwards to source_log.read_source_log_shards_next().
316 int read_source_log_shards_next(map
<int, string
> shard_markers
, map
<int, rgw_datalog_shard_data
> *result
) {
317 return source_log
.read_source_log_shards_next(shard_markers
, result
);
// Runs sync through source_log.run_sync() with the stored num_shards.
320 int run() { return source_log
.run_sync(num_shards
); }
// Forwards the wake-up request to source_log.wakeup().
322 void wakeup(int shard_id
, set
<string
>& keys
) { return source_log
.wakeup(shard_id
, keys
); }
328 class RGWBucketSyncStatusManager
;
329 class RGWBucketSyncCR
;
// Full-sync progress marker for a bucket shard: an object-key position and a
// count.
331 struct rgw_bucket_shard_full_sync_marker
{
332 rgw_obj_key position
;
// Default construction starts the count at zero.
335 rgw_bucket_shard_full_sync_marker() : count(0) {}
// Writes this marker into an attribute map (only declared here).
337 void encode_attr(map
<string
, bufferlist
>& attrs
);
// Binary serialization into bl.
339 void encode(bufferlist
& bl
) const {
340 ENCODE_START(1, 1, bl
);
341 ::encode(position
, bl
);
// Binary deserialization, mirroring encode().
346 void decode(bufferlist::iterator
& bl
) {
348 ::decode(position
, bl
);
// JSON/Formatter output of "position" and "count".
353 void dump(Formatter
*f
) const {
354 encode_json("position", position
, f
);
355 encode_json("count", count
, f
);
358 WRITE_CLASS_ENCODER(rgw_bucket_shard_full_sync_marker
)
// Incremental-sync progress marker for a bucket shard, holding a position.
360 struct rgw_bucket_shard_inc_sync_marker
{
363 rgw_bucket_shard_inc_sync_marker() {}
// Writes this marker into an attribute map (only declared here).
365 void encode_attr(map
<string
, bufferlist
>& attrs
);
// Binary serialization into bl.
367 void encode(bufferlist
& bl
) const {
368 ENCODE_START(1, 1, bl
);
369 ::encode(position
, bl
);
// Binary deserialization, mirroring encode().
373 void decode(bufferlist::iterator
& bl
) {
375 ::decode(position
, bl
);
// JSON/Formatter output of "position".
379 void dump(Formatter
*f
) const {
380 encode_json("position", position
, f
);
// Markers are ordered by their position only.
383 bool operator<(const rgw_bucket_shard_inc_sync_marker
& m
) const {
384 return (position
< m
.position
);
387 WRITE_CLASS_ENCODER(rgw_bucket_shard_inc_sync_marker
)
// Sync status of a bucket shard: a state plus a full-sync marker and an
// incremental-sync marker.
389 struct rgw_bucket_shard_sync_info
{
393 StateIncrementalSync
= 2,
397 rgw_bucket_shard_full_sync_marker full_marker
;
398 rgw_bucket_shard_inc_sync_marker inc_marker
;
// Attribute-map conversions (only declared here).
400 void decode_from_attrs(CephContext
*cct
, map
<string
, bufferlist
>& attrs
);
401 void encode_all_attrs(map
<string
, bufferlist
>& attrs
);
402 void encode_state_attr(map
<string
, bufferlist
>& attrs
);
// Binary serialization into bl; full_marker is written before inc_marker.
404 void encode(bufferlist
& bl
) const {
405 ENCODE_START(1, 1, bl
);
407 ::encode(full_marker
, bl
);
408 ::encode(inc_marker
, bl
);
// Binary deserialization; the markers are read in the same order.
412 void decode(bufferlist::iterator
& bl
) {
415 ::decode(full_marker
, bl
);
416 ::decode(inc_marker
, bl
);
// JSON/Formatter output: the state is emitted as a human-readable "status"
// string, followed by both markers.
420 void dump(Formatter
*f
) const {
422 switch ((SyncState
)state
) {
429 case StateIncrementalSync
:
430 s
= "incremental-sync";
436 encode_json("status", s
, f
);
437 encode_json("full_marker", full_marker
, f
);
438 encode_json("inc_marker", inc_marker
, f
);
// Default construction: state is StateInit.
441 rgw_bucket_shard_sync_info() : state((int)StateInit
) {}
444 WRITE_CLASS_ENCODER(rgw_bucket_shard_sync_info
)
// Coroutine manager for a remote bucket log. It hands out coroutines that
// read sync status, initialize sync status and run sync.
447 class RGWRemoteBucketLog
: public RGWCoroutinesManager
{
449 RGWRESTConn
*conn
{nullptr};
453 RGWBucketSyncStatusManager
*status_manager
;
454 RGWAsyncRadosProcessor
*async_rados
;
455 RGWHTTPManager
*http_manager
;
457 RGWDataSyncEnv sync_env
;
458 rgw_bucket_shard_sync_info init_status
;
460 RGWBucketSyncCR
*sync_cr
{nullptr};
// Construct from the store, the owning status manager, an async processor and
// an HTTP manager; the pointers are stored as given.
463 RGWRemoteBucketLog(RGWRados
*_store
, RGWBucketSyncStatusManager
*_sm
,
464 RGWAsyncRadosProcessor
*_async_rados
, RGWHTTPManager
*_http_manager
) : RGWCoroutinesManager(_store
->ctx(), _store
->get_cr_registry()), store(_store
),
465 status_manager(_sm
), async_rados(_async_rados
), http_manager(_http_manager
) {}
// Second-stage setup for one bucket and shard on the given source zone;
// returns an int status (only declared here).
467 int init(const string
& _source_zone
, RGWRESTConn
*_conn
,
468 const rgw_bucket
& bucket
, int shard_id
,
469 RGWSyncErrorLogger
*_error_logger
,
470 RGWSyncModuleInstanceRef
& _sync_module
);
// Coroutine factories (only declared here); each returns an RGWCoroutine*.
473 RGWCoroutine
*read_sync_status_cr(rgw_bucket_shard_sync_info
*sync_status
);
474 RGWCoroutine
*init_sync_status_cr();
475 RGWCoroutine
*run_sync_cr();
// Sync-status manager for one bucket from one source zone. It has its own
// coroutine manager and HTTP manager, a map of RGWRemoteBucketLog pointers
// keyed by int, and a map of sync status keyed by int.
// NOTE(review): the int keys are presumably shard ids -- confirm against the
// implementation.
480 class RGWBucketSyncStatusManager
{
483 RGWCoroutinesManager cr_mgr
;
485 RGWHTTPManager http_manager
;
489 RGWSyncErrorLogger
*error_logger
;
490 RGWSyncModuleInstanceRef sync_module
;
494 map
<int, RGWRemoteBucketLog
*> source_logs
;
496 string source_status_oid
;
497 string source_shard_status_oid_prefix
;
499 map
<int, rgw_bucket_shard_sync_info
> sync_status
;
500 rgw_raw_obj status_obj
;
// Construct for a store, source zone and bucket; connection and error logger
// start out null.
// NOTE(review): http_manager is initialized from the members `store` and
// `cr_mgr`, so both must be declared before `http_manager` -- confirm member
// order.
505 RGWBucketSyncStatusManager(RGWRados
*_store
, const string
& _source_zone
,
506 const rgw_bucket
& bucket
) : store(_store
),
507 cr_mgr(_store
->ctx(), _store
->get_cr_registry()),
508 http_manager(store
->ctx(), cr_mgr
.get_completion_mgr()),
509 source_zone(_source_zone
),
510 conn(NULL
), error_logger(NULL
),
513 ~RGWBucketSyncStatusManager();
// Returns a mutable reference to the internal sync-status map.
517 map
<int, rgw_bucket_shard_sync_info
>& get_sync_status() { return sync_status
; }
518 int init_sync_status();
// Static helper that builds a status object name from a source zone and a
// bucket shard (only declared here).
520 static string
status_oid(const string
& source_zone
, const rgw_bucket_shard
& bs
);
522 int read_sync_status();
// Default RGWSyncModule implementation.
526 class RGWDefaultSyncModule
: public RGWSyncModule
{
528 RGWDefaultSyncModule() {}
// This module always reports that it supports data export.
529 bool supports_data_export() override
{ return true; }
// Creates a module instance from a case-insensitively keyed config map and
// hands it back through `instance`; returns an int status (only declared here).
530 int create_instance(CephContext
*cct
, map
<string
, string
, ltstr_nocase
>& config
, RGWSyncModuleInstanceRef
*instance
) override
;
533 // DataLogTrimCR factory function
// Takes the store, an HTTP manager, the number of shards and an interval, and
// returns an RGWCoroutine*. The function is defined in another translation
// unit (declared extern here).
// NOTE(review): `interval` is presumably the period between trim passes --
// confirm against the definition.
534 extern RGWCoroutine
* create_data_log_trim_cr(RGWRados
*store
,
535 RGWHTTPManager
*http
,
536 int num_shards
, utime_t interval
);