1 #ifndef CEPH_RGW_DATA_SYNC_H
2 #define CEPH_RGW_DATA_SYNC_H
4 #include "rgw_coroutine.h"
5 #include "rgw_http_client.h"
6 #include "rgw_bucket.h"
8 #include "rgw_sync_module.h"
10 #include "common/RWLock.h"
11 #include "common/ceph_json.h"
14 class BucketChangeObserver
;
// Data-log description that can be filled in from JSON. Only the default
// constructor and the JSON decoder declaration are visible in this view.
17 struct rgw_datalog_info
{
// Default state: zero shards.
20 rgw_datalog_info() : num_shards(0) {}
// Declaration only; the body is not part of this header view.
22 void decode_json(JSONObj
*obj
);
// Top-level data sync state: a sync-state value, the shard count and an
// instance id, with binary (encode/decode) and JSON (dump/decode_json) forms.
25 struct rgw_data_sync_info
{
// Sync-state enumerator; its name is rendered as "building-full-sync-maps" in JSON.
28 StateBuildingFullSyncMaps
= 1,
// Brace-initialised to 0 in-class, so the constructor does not need to set it.
35 uint64_t instance_id
{0};
// Binary encoder. ENCODE_START(2, 1, ...) presumably means struct version 2,
// compatible back to version 1 -- TODO confirm against the encoding macros.
// Fields visible in this view are written in the order num_shards, instance_id.
37 void encode(bufferlist
& bl
) const {
38 ENCODE_START(2, 1, bl
);
40 ::encode(num_shards
, bl
);
41 ::encode(instance_id
, bl
);
// Binary decoder; reads the same fields back in the same order as encode().
45 void decode(bufferlist::iterator
& bl
) {
48 ::decode(num_shards
, bl
);
50 ::decode(instance_id
, bl
);
// Formatter output: the state is turned into a string and emitted under
// "status", followed by "num_shards" and "instance_id".
55 void dump(Formatter
*f
) const {
57 switch ((SyncState
)state
) {
61 case StateBuildingFullSyncMaps
:
62 s
= "building-full-sync-maps";
71 encode_json("status", s
, f
);
72 encode_json("num_shards", num_shards
, f
);
73 encode_json("instance_id", instance_id
, f
);
// JSON decoder, the inverse of dump(): maps the "status" string back onto the
// sync state, then reads the numeric fields "num_shards" and "instance_id".
75 void decode_json(JSONObj
*obj
) {
77 JSONDecoder::decode_json("status", s
, obj
);
78 if (s
== "building-full-sync-maps") {
79 state
= StateBuildingFullSyncMaps
;
80 } else if (s
== "sync") {
85 JSONDecoder::decode_json("num_shards", num_shards
, obj
);
// The "instance_id" key must be decoded into instance_id. It used to be decoded
// into num_shards, which overwrote the shard count read on the previous line
// and never set instance_id, so a dump()/decode_json() round trip lost both.
86 JSONDecoder::decode_json("instance_id", instance_id
, obj
);
// Static declaration only; the body is not part of this header view.
88 static void generate_test_instances(std::list
<rgw_data_sync_info
*>& o
);
// Default state: StateInit with zero shards. instance_id is not listed here
// because it has an in-class initialiser.
90 rgw_data_sync_info() : state((int)StateInit
), num_shards(0) {}
// Encoder-registration macro applied to the struct above (macro defined elsewhere).
92 WRITE_CLASS_ENCODER(rgw_data_sync_info
)
// Per-shard data sync position: a state, two marker strings, entry counters
// and a timestamp, with binary and JSON (de)serialisation.
94 struct rgw_data_sync_marker
{
101 string next_step_marker
;
102 uint64_t total_entries
;
// Starts in FullSync with total_entries and pos at zero.
106 rgw_data_sync_marker() : state(FullSync
), total_entries(0), pos(0) {}
// Binary encoder. Fields visible in this view are written in the order
// marker, next_step_marker, total_entries, timestamp.
108 void encode(bufferlist
& bl
) const {
109 ENCODE_START(1, 1, bl
);
111 ::encode(marker
, bl
);
112 ::encode(next_step_marker
, bl
);
113 ::encode(total_entries
, bl
);
115 ::encode(timestamp
, bl
);
// Binary decoder; reads the same fields back in the same order as encode().
119 void decode(bufferlist::iterator
& bl
) {
122 ::decode(marker
, bl
);
123 ::decode(next_step_marker
, bl
);
124 ::decode(total_entries
, bl
);
126 ::decode(timestamp
, bl
);
// Formatter output. The state is emitted as its integer value and the
// timestamp is wrapped in utime_t before being written.
130 void dump(Formatter
*f
) const {
131 encode_json("state", (int)state
, f
);
132 encode_json("marker", marker
, f
);
133 encode_json("next_step_marker", next_step_marker
, f
);
134 encode_json("total_entries", total_entries
, f
);
135 encode_json("pos", pos
, f
);
136 encode_json("timestamp", utime_t(timestamp
), f
);
// JSON decoder using the same keys as dump(). "state" and "timestamp" are read
// into the temporaries s and t; t is converted with to_real_time() before it
// is stored in timestamp.
138 void decode_json(JSONObj
*obj
) {
140 JSONDecoder::decode_json("state", s
, obj
);
142 JSONDecoder::decode_json("marker", marker
, obj
);
143 JSONDecoder::decode_json("next_step_marker", next_step_marker
, obj
);
144 JSONDecoder::decode_json("total_entries", total_entries
, obj
);
145 JSONDecoder::decode_json("pos", pos
, obj
);
147 JSONDecoder::decode_json("timestamp", t
, obj
);
148 timestamp
= t
.to_real_time();
// Static declaration only; the body is not part of this header view.
150 static void generate_test_instances(std::list
<rgw_data_sync_marker
*>& o
);
// Encoder-registration macro applied to the struct above (macro defined elsewhere).
152 WRITE_CLASS_ENCODER(rgw_data_sync_marker
)
// Combined sync status: the global rgw_data_sync_info plus a map of per-shard
// rgw_data_sync_marker entries keyed by a uint32_t.
154 struct rgw_data_sync_status
{
155 rgw_data_sync_info sync_info
;
156 map
<uint32_t, rgw_data_sync_marker
> sync_markers
;
158 rgw_data_sync_status() {}
// Binary encoder: only sync_info is written here; sync_markers is deliberately
// left out (see the existing comment below).
160 void encode(bufferlist
& bl
) const {
161 ENCODE_START(1, 1, bl
);
162 ::encode(sync_info
, bl
);
163 /* sync markers are encoded separately */
// Binary decoder: mirrors encode() and reads sync_info only.
167 void decode(bufferlist::iterator
& bl
) {
169 ::decode(sync_info
, bl
);
170 /* sync markers are decoded separately */
// Formatter output: unlike the binary form, this includes both "info" and
// "markers".
174 void dump(Formatter
*f
) const {
175 encode_json("info", sync_info
, f
);
176 encode_json("markers", sync_markers
, f
);
// JSON decoder: reads both "info" and "markers", matching dump().
178 void decode_json(JSONObj
*obj
) {
179 JSONDecoder::decode_json("info", sync_info
, obj
);
180 JSONDecoder::decode_json("markers", sync_markers
, obj
);
// Static declaration only; the body is not part of this header view.
182 static void generate_test_instances(std::list
<rgw_data_sync_status
*>& o
);
// Encoder-registration macro applied to the struct above (macro defined elsewhere).
184 WRITE_CLASS_ENCODER(rgw_data_sync_status
)
// One data-log entry as read from JSON. Only the timestamp field and the
// decoder declaration are visible in this view.
186 struct rgw_datalog_entry
{
188 ceph::real_time timestamp
;
// Declaration only; the body is not part of this header view.
190 void decode_json(JSONObj
*obj
);
// JSON-decodable container holding a vector of rgw_datalog_entry.
193 struct rgw_datalog_shard_data
{
196 vector
<rgw_datalog_entry
> entries
;
// Declaration only; the body is not part of this header view.
198 void decode_json(JSONObj
*obj
);
201 class RGWAsyncRadosProcessor
;
202 class RGWDataSyncControlCR
;
// Owner record made of an id and a display name, decodable from JSON.
204 struct rgw_bucket_entry_owner
{
// Default constructor leaves both members default-constructed.
208 rgw_bucket_entry_owner() {}
// Copies the given id and display name into the members.
209 rgw_bucket_entry_owner(const string
& _id
, const string
& _display_name
) : id(_id
), display_name(_display_name
) {}
// Declaration only; the body is not part of this header view.
211 void decode_json(JSONObj
*obj
);
214 class RGWSyncErrorLogger
;
// Shared context for data sync code: pointers to the context, store,
// connection and helper services, plus the source zone name, the sync module
// reference and an optional change observer.
216 struct RGWDataSyncEnv
{
220 RGWAsyncRadosProcessor
*async_rados
;
221 RGWHTTPManager
*http_manager
;
222 RGWSyncErrorLogger
*error_logger
;
224 RGWSyncModuleInstanceRef sync_module
;
// Defaults to nullptr in-class, so the constructor below does not list it.
225 rgw::BucketChangeObserver
*observer
{nullptr};
// Default constructor: every pointer member starts out null until init() runs.
227 RGWDataSyncEnv() : cct(NULL
), store(NULL
), conn(NULL
), async_rados(NULL
), http_manager(NULL
), error_logger(NULL
), sync_module(NULL
) {}
// Copies the caller-supplied values into the members. The assignments visible
// in this view cover async_rados through observer.
229 void init(CephContext
*_cct
, RGWRados
*_store
, RGWRESTConn
*_conn
,
230 RGWAsyncRadosProcessor
*_async_rados
, RGWHTTPManager
*_http_manager
,
231 RGWSyncErrorLogger
*_error_logger
, const string
& _source_zone
,
232 RGWSyncModuleInstanceRef
& _sync_module
,
233 rgw::BucketChangeObserver
*_observer
) {
237 async_rados
= _async_rados
;
238 http_manager
= _http_manager
;
239 error_logger
= _error_logger
;
240 source_zone
= _source_zone
;
241 sync_module
= _sync_module
;
242 observer
= _observer
;
// Declaration only: returns an object name for the given shard id; the body
// is not part of this header view.
245 string
shard_obj_name(int shard_id
);
// Coroutine manager (derives from RGWCoroutinesManager) exposing read, init
// and run operations for a remote data log. All operations are declarations
// here; their bodies are not part of this header view.
249 class RGWRemoteDataLog
: public RGWCoroutinesManager
{
251 RGWAsyncRadosProcessor
*async_rados
;
252 rgw::BucketChangeObserver
*observer
;
// Held by value, unlike the pointer of the same name in RGWDataSyncEnv.
253 RGWHTTPManager http_manager
;
255 RGWDataSyncEnv sync_env
;
258 RGWDataSyncControlCR
*data_sync_cr
;
// Constructor. The async_rados and observer parameters shadow the members of
// the same name; in the initialiser list each member is set from its parameter.
// NOTE(review): http_manager is built from the member `store`, so this relies
// on `store` being declared before http_manager -- its declaration is not
// visible here, confirm.
263 RGWRemoteDataLog(RGWRados
*_store
, RGWAsyncRadosProcessor
*async_rados
,
264 rgw::BucketChangeObserver
*observer
)
265 : RGWCoroutinesManager(_store
->ctx(), _store
->get_cr_registry()),
266 store(_store
), async_rados(async_rados
), observer(observer
),
267 http_manager(store
->ctx(), completion_mgr
),
268 lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL
),
269 initialized(false) {}
270 int init(const string
& _source_zone
, RGWRESTConn
*_conn
, RGWSyncErrorLogger
*_error_logger
, RGWSyncModuleInstanceRef
& module
);
// Each call below returns an int and writes its result through the pointer
// argument where one is given.
273 int read_log_info(rgw_datalog_info
*log_info
);
274 int read_source_log_shards_info(map
<int, RGWDataChangesLogInfo
> *shards_info
);
// NOTE(review): shard_markers is taken by value, so the map is copied per call.
275 int read_source_log_shards_next(map
<int, string
> shard_markers
, map
<int, rgw_datalog_shard_data
> *result
);
276 int read_sync_status(rgw_data_sync_status
*sync_status
);
277 int init_sync_status(int num_shards
);
278 int run_sync(int num_shards
);
280 void wakeup(int shard_id
, set
<string
>& keys
);
// Front end over an embedded RGWRemoteDataLog (source_log). The inline methods
// visible here forward to source_log, passing the stored num_shards where needed.
283 class RGWDataSyncStatusManager
{
289 RGWSyncErrorLogger
*error_logger
;
290 RGWSyncModuleInstanceRef sync_module
;
292 RGWRemoteDataLog source_log
;
294 string source_status_oid
;
295 string source_shard_status_oid_prefix
;
297 map
<int, rgw_raw_obj
> shard_objs
;
// Constructor: the observer is optional and defaults to nullptr. Pointer
// members start null and num_shards starts at 0.
// NOTE(review): source_log is built from the member `store`, so this relies on
// `store` being declared before source_log -- its declaration is not visible
// here, confirm.
302 RGWDataSyncStatusManager(RGWRados
*_store
, RGWAsyncRadosProcessor
*async_rados
,
303 const string
& _source_zone
,
304 rgw::BucketChangeObserver
*observer
= nullptr)
305 : store(_store
), source_zone(_source_zone
), conn(NULL
), error_logger(NULL
),
306 sync_module(nullptr),
307 source_log(store
, async_rados
, observer
), num_shards(0) {}
// Destructor body is not visible in this view.
308 ~RGWDataSyncStatusManager() {
// Static helpers that build object names from the source zone (and shard id);
// declarations only.
314 static string
shard_obj_name(const string
& source_zone
, int shard_id
);
315 static string
sync_status_oid(const string
& source_zone
);
// Forwards to source_log.read_sync_status().
317 int read_sync_status(rgw_data_sync_status
*sync_status
) {
318 return source_log
.read_sync_status(sync_status
);
// Forwards to source_log.init_sync_status() with the stored shard count.
320 int init_sync_status() { return source_log
.init_sync_status(num_shards
); }
// Forwards to source_log.read_log_info().
322 int read_log_info(rgw_datalog_info
*log_info
) {
323 return source_log
.read_log_info(log_info
);
// Forwards to source_log.read_source_log_shards_info().
325 int read_source_log_shards_info(map
<int, RGWDataChangesLogInfo
> *shards_info
) {
326 return source_log
.read_source_log_shards_info(shards_info
);
// Forwards to source_log.read_source_log_shards_next().
// NOTE(review): shard_markers is taken by value here and passed by value again.
328 int read_source_log_shards_next(map
<int, string
> shard_markers
, map
<int, rgw_datalog_shard_data
> *result
) {
329 return source_log
.read_source_log_shards_next(shard_markers
, result
);
// Runs the sync via source_log.run_sync() with the stored shard count.
332 int run() { return source_log
.run_sync(num_shards
); }
// Forwards to source_log.wakeup(); `return` of a void call is valid C++.
334 void wakeup(int shard_id
, set
<string
>& keys
) { return source_log
.wakeup(shard_id
, keys
); }
340 class RGWBucketSyncStatusManager
;
341 class RGWBucketSyncCR
;
// Full-sync marker for a bucket shard: an object-key position plus a count.
343 struct rgw_bucket_shard_full_sync_marker
{
344 rgw_obj_key position
;
// Starts with a count of zero; position is default-constructed.
347 rgw_bucket_shard_full_sync_marker() : count(0) {}
// Declaration only: writes this marker into the given attribute map; the body
// is not part of this header view.
349 void encode_attr(map
<string
, bufferlist
>& attrs
);
// Binary encoder; position is the field visible in this view.
351 void encode(bufferlist
& bl
) const {
352 ENCODE_START(1, 1, bl
);
353 ::encode(position
, bl
);
// Binary decoder; mirrors encode().
358 void decode(bufferlist::iterator
& bl
) {
360 ::decode(position
, bl
);
// JSON/formatter hooks; declarations only.
365 void dump(Formatter
*f
) const;
366 void decode_json(JSONObj
*obj
);
// Encoder-registration macro applied to the struct above (macro defined elsewhere).
368 WRITE_CLASS_ENCODER(rgw_bucket_shard_full_sync_marker
)
// Incremental-sync marker for a bucket shard, identified by a position that
// is also used for ordering.
370 struct rgw_bucket_shard_inc_sync_marker
{
373 rgw_bucket_shard_inc_sync_marker() {}
// Declaration only: writes this marker into the given attribute map; the body
// is not part of this header view.
375 void encode_attr(map
<string
, bufferlist
>& attrs
);
// Binary encoder; writes position.
377 void encode(bufferlist
& bl
) const {
378 ENCODE_START(1, 1, bl
);
379 ::encode(position
, bl
);
// Binary decoder; mirrors encode().
383 void decode(bufferlist::iterator
& bl
) {
385 ::decode(position
, bl
);
// JSON/formatter hooks; declarations only.
389 void dump(Formatter
*f
) const;
390 void decode_json(JSONObj
*obj
);
// Orders markers by comparing their position members with operator<.
392 bool operator<(const rgw_bucket_shard_inc_sync_marker
& m
) const {
393 return (position
< m
.position
);
// Encoder-registration macro applied to the struct above (macro defined elsewhere).
396 WRITE_CLASS_ENCODER(rgw_bucket_shard_inc_sync_marker
)
// Sync status for one bucket shard: a state value plus the full-sync and
// incremental-sync markers.
398 struct rgw_bucket_shard_sync_info
{
402 StateIncrementalSync
= 2,
406 rgw_bucket_shard_full_sync_marker full_marker
;
407 rgw_bucket_shard_inc_sync_marker inc_marker
;
// Attribute-map helpers; declarations only, bodies are not part of this view.
409 void decode_from_attrs(CephContext
*cct
, map
<string
, bufferlist
>& attrs
);
410 void encode_all_attrs(map
<string
, bufferlist
>& attrs
);
411 void encode_state_attr(map
<string
, bufferlist
>& attrs
);
// Binary encoder; the fields visible in this view are written in the order
// full_marker, inc_marker.
413 void encode(bufferlist
& bl
) const {
414 ENCODE_START(1, 1, bl
);
416 ::encode(full_marker
, bl
);
417 ::encode(inc_marker
, bl
);
// Binary decoder; reads the markers back in the same order as encode().
421 void decode(bufferlist::iterator
& bl
) {
424 ::decode(full_marker
, bl
);
425 ::decode(inc_marker
, bl
);
// JSON/formatter hooks; declarations only.
429 void dump(Formatter
*f
) const;
430 void decode_json(JSONObj
*obj
);
// Default state is StateInit; both markers are default-constructed.
432 rgw_bucket_shard_sync_info() : state((int)StateInit
) {}
// Encoder-registration macro applied to the struct above (macro defined elsewhere).
435 WRITE_CLASS_ENCODER(rgw_bucket_shard_sync_info
)
// Coroutine manager (derives from RGWCoroutinesManager) for a remote bucket
// log. It hands out coroutines for reading, initialising and running the sync.
438 class RGWRemoteBucketLog
: public RGWCoroutinesManager
{
// Null until set; the constructor below does not initialise it.
440 RGWRESTConn
*conn
{nullptr};
444 RGWBucketSyncStatusManager
*status_manager
;
445 RGWAsyncRadosProcessor
*async_rados
;
// Pointer to an HTTP manager supplied through the constructor.
446 RGWHTTPManager
*http_manager
;
448 RGWDataSyncEnv sync_env
;
449 rgw_bucket_shard_sync_info init_status
;
451 RGWBucketSyncCR
*sync_cr
{nullptr};
// Constructor: initialises the base with the store's context and coroutine
// registry and stores the supplied pointers.
454 RGWRemoteBucketLog(RGWRados
*_store
, RGWBucketSyncStatusManager
*_sm
,
455 RGWAsyncRadosProcessor
*_async_rados
, RGWHTTPManager
*_http_manager
) : RGWCoroutinesManager(_store
->ctx(), _store
->get_cr_registry()), store(_store
),
456 status_manager(_sm
), async_rados(_async_rados
), http_manager(_http_manager
) {}
// Declaration only; takes the source zone, connection, bucket, shard id,
// error logger and sync module.
458 int init(const string
& _source_zone
, RGWRESTConn
*_conn
,
459 const rgw_bucket
& bucket
, int shard_id
,
460 RGWSyncErrorLogger
*_error_logger
,
461 RGWSyncModuleInstanceRef
& _sync_module
);
// Coroutine factories; each returns an RGWCoroutine pointer. Declarations only.
464 RGWCoroutine
*read_sync_status_cr(rgw_bucket_shard_sync_info
*sync_status
);
465 RGWCoroutine
*init_sync_status_cr();
466 RGWCoroutine
*run_sync_cr();
// Manages sync status for a bucket: holds its own coroutine manager and HTTP
// manager, a map of per-shard RGWRemoteBucketLog pointers and a map of
// per-shard sync status.
471 class RGWBucketSyncStatusManager
{
474 RGWCoroutinesManager cr_mgr
;
476 RGWHTTPManager http_manager
;
480 RGWSyncErrorLogger
*error_logger
;
481 RGWSyncModuleInstanceRef sync_module
;
// Keyed by an int -- presumably the shard id; confirm against the implementation.
485 map
<int, RGWRemoteBucketLog
*> source_logs
;
487 string source_status_oid
;
488 string source_shard_status_oid_prefix
;
490 map
<int, rgw_bucket_shard_sync_info
> sync_status
;
491 rgw_raw_obj status_obj
;
// Constructor: http_manager is wired to cr_mgr's completion manager. The tail
// of the initialiser list and the body are not part of this header view.
// NOTE(review): http_manager is built from the member `store`, so this relies
// on `store` being declared before http_manager -- confirm.
496 RGWBucketSyncStatusManager(RGWRados
*_store
, const string
& _source_zone
,
497 const rgw_bucket
& bucket
) : store(_store
),
498 cr_mgr(_store
->ctx(), _store
->get_cr_registry()),
499 http_manager(store
->ctx(), cr_mgr
.get_completion_mgr()),
500 source_zone(_source_zone
),
501 conn(NULL
), error_logger(NULL
),
// Destructor is defined out of line.
504 ~RGWBucketSyncStatusManager();
// Returns a mutable reference to the sync_status map.
508 map
<int, rgw_bucket_shard_sync_info
>& get_sync_status() { return sync_status
; }
509 int init_sync_status();
// Static helper that builds a status object id from a source zone and a
// bucket shard; declaration only.
511 static string
status_oid(const string
& source_zone
, const rgw_bucket_shard
& bs
);
513 int read_sync_status();
517 /// read the sync status of all bucket shards from the given source zone
/// Declaration only. Results are written through the `status` vector pointer.
/// The int return is presumably an error code -- confirm against the definition.
518 int rgw_bucket_sync_status(RGWRados
*store
, const std::string
& source_zone
,
519 const rgw_bucket
& bucket
,
520 std::vector
<rgw_bucket_shard_sync_info
> *status
);
// Sync module derived from RGWSyncModule that overrides supports_data_export()
// and create_instance().
522 class RGWDefaultSyncModule
: public RGWSyncModule
{
524 RGWDefaultSyncModule() {}
// This module always reports that it supports data export.
525 bool supports_data_export() override
{ return true; }
// Declaration only; the instance is handed back through the `instance` pointer.
526 int create_instance(CephContext
*cct
, map
<string
, string
, ltstr_nocase
>& config
, RGWSyncModuleInstanceRef
*instance
) override
;
529 // DataLogTrimCR factory function
// Declaration only (extern); returns an RGWCoroutine pointer built from the
// store, an HTTP manager, a shard count and an interval.
530 extern RGWCoroutine
* create_data_log_trim_cr(RGWRados
*store
,
531 RGWHTTPManager
*http
,
532 int num_shards
, utime_t interval
);