1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #ifndef CEPH_RGW_DATA_SYNC_H
5 #define CEPH_RGW_DATA_SYNC_H
7 #include "include/encoding.h"
9 #include "common/RWLock.h"
10 #include "common/ceph_json.h"
12 #include "rgw_coroutine.h"
13 #include "rgw_http_client.h"
16 #include "rgw_sync_module.h"
17 #include "rgw_sync_trace.h"
18 #include "rgw_sync_policy.h"
20 #include "rgw_bucket_sync.h"
23 struct rgw_sync_bucket_pipe
;
// One sync pairing: a source bucket shard, a destination bucket shard, and
// the flow-manager pipe handler attached to the pair.
25 struct rgw_bucket_sync_pair_info
{
26 RGWBucketSyncFlowManager::pipe_handler handler
; /* responsible for sync filters */
// shard being read from
27 rgw_bucket_shard source_bs
;
// shard being written to
28 rgw_bucket_shard dest_bs
;
// Stream a rgw_bucket_sync_pair_info in human-readable form.
// When both sides name the same bucket only the source shard is written;
// the remaining path appends "->" followed by the destination bucket.
31 inline ostream
& operator<<(ostream
& out
, const rgw_bucket_sync_pair_info
& p
) {
// source and destination are the same bucket: the source shard alone suffices
32 if (p
.source_bs
.bucket
== p
.dest_bs
.bucket
) {
33 return out
<< p
.source_bs
;
// distinct buckets: arrow, then the destination bucket
38 out
<< "->" << p
.dest_bs
.bucket
;
// A sync pair together with the bucket info and attribute map of both the
// source bucket and the destination bucket.
43 struct rgw_bucket_sync_pipe
{
44 rgw_bucket_sync_pair_info info
;
45 RGWBucketInfo source_bucket_info
;
// attribute name -> raw value for the source bucket
46 map
<string
, bufferlist
> source_bucket_attrs
;
47 RGWBucketInfo dest_bucket_info
;
// attribute name -> raw value for the destination bucket
48 map
<string
, bufferlist
> dest_bucket_attrs
;
// Convenience accessor: returns a reference to the rules held by the pair's
// pipe handler (info.handler.rules).
50 RGWBucketSyncFlowManager::pipe_rules_ref
& get_rules() {
51 return info
.handler
.rules
;
// Stream operator for rgw_bucket_sync_pipe; writes to `out` from `p`.
55 inline ostream
& operator<<(ostream
& out
, const rgw_bucket_sync_pipe
& p
) {
// Data-log summary that can be populated from JSON.
59 struct rgw_datalog_info
{
// default: zero shards
62 rgw_datalog_info() : num_shards(0) {}
// fill this object from a parsed JSON object (defined out of line)
64 void decode_json(JSONObj
*obj
);
// Top-level data-sync state: a sync-state value, the shard count and an
// instance id. Supports versioned binary encoding and JSON dump/decode.
67 struct rgw_data_sync_info
{
70 StateBuildingFullSyncMaps
= 1,
// defaults to 0
77 uint64_t instance_id
{0};
// Binary encoder. Struct version 2, compatible back to version 1.
79 void encode(bufferlist
& bl
) const {
80 ENCODE_START(2, 1, bl
);
82 encode(num_shards
, bl
);
83 encode(instance_id
, bl
);
// Binary decoder; reads fields in the same order encode() writes them.
87 void decode(bufferlist::const_iterator
& bl
) {
90 decode(num_shards
, bl
);
92 decode(instance_id
, bl
);
// Emit "status" (state rendered as a string), "num_shards" and
// "instance_id" through the formatter.
97 void dump(Formatter
*f
) const {
// map the stored state value to its textual name
99 switch ((SyncState
)state
) {
103 case StateBuildingFullSyncMaps
:
104 s
= "building-full-sync-maps";
113 encode_json("status", s
, f
);
114 encode_json("num_shards", num_shards
, f
);
115 encode_json("instance_id", instance_id
, f
);
// Inverse of dump(): parse "status" back into a state value, then read
// "num_shards" and "instance_id".
117 void decode_json(JSONObj
*obj
) {
119 JSONDecoder::decode_json("status", s
, obj
);
120 if (s
== "building-full-sync-maps") {
121 state
= StateBuildingFullSyncMaps
;
122 } else if (s
== "sync") {
127 JSONDecoder::decode_json("num_shards", num_shards
, obj
);
128 JSONDecoder::decode_json("instance_id", instance_id
, obj
);
// fills `o` with sample instances (defined out of line)
130 static void generate_test_instances(std::list
<rgw_data_sync_info
*>& o
);
// starts in StateInit with zero shards
132 rgw_data_sync_info() : state((int)StateInit
), num_shards(0) {}
134 WRITE_CLASS_ENCODER(rgw_data_sync_info
)
// Per-shard data-sync position: sync state, markers, entry counters and a
// timestamp. Supports versioned binary encoding and JSON dump/decode.
136 struct rgw_data_sync_marker
{
143 string next_step_marker
;
144 uint64_t total_entries
;
// starts in FullSync with zeroed counters
148 rgw_data_sync_marker() : state(FullSync
), total_entries(0), pos(0) {}
// Binary encoder. Struct version 1, compat version 1.
150 void encode(bufferlist
& bl
) const {
151 ENCODE_START(1, 1, bl
);
154 encode(next_step_marker
, bl
);
155 encode(total_entries
, bl
);
157 encode(timestamp
, bl
);
// Binary decoder; reads fields in the same order encode() writes them.
161 void decode(bufferlist::const_iterator
& bl
) {
165 decode(next_step_marker
, bl
);
166 decode(total_entries
, bl
);
168 decode(timestamp
, bl
);
// Emit the marker as JSON fields: "status", "marker", "next_step_marker",
// "total_entries", "pos" and "timestamp".
172 void dump(Formatter
*f
) const {
173 const char *s
{nullptr};
// map the stored state value to its textual name
174 switch ((SyncState
)state
) {
178 case IncrementalSync
:
179 s
= "incremental-sync";
185 encode_json("status", s
, f
);
186 encode_json("marker", marker
, f
);
187 encode_json("next_step_marker", next_step_marker
, f
);
188 encode_json("total_entries", total_entries
, f
);
189 encode_json("pos", pos
, f
);
// timestamp is wrapped in utime_t for JSON output
190 encode_json("timestamp", utime_t(timestamp
), f
);
// Inverse of dump(): parse "status" back into a state value, then read the
// remaining fields.
192 void decode_json(JSONObj
*obj
) {
194 JSONDecoder::decode_json("status", s
, obj
);
195 if (s
== "full-sync") {
197 } else if (s
== "incremental-sync") {
198 state
= IncrementalSync
;
200 JSONDecoder::decode_json("marker", marker
, obj
);
201 JSONDecoder::decode_json("next_step_marker", next_step_marker
, obj
);
202 JSONDecoder::decode_json("total_entries", total_entries
, obj
);
203 JSONDecoder::decode_json("pos", pos
, obj
);
// timestamp is parsed into a temporary `t` and converted via to_real_time()
205 JSONDecoder::decode_json("timestamp", t
, obj
);
206 timestamp
= t
.to_real_time();
// fills `o` with sample instances (defined out of line)
208 static void generate_test_instances(std::list
<rgw_data_sync_marker
*>& o
);
210 WRITE_CLASS_ENCODER(rgw_data_sync_marker
)
// Aggregate sync status: the global sync info plus a map of per-shard
// markers keyed by a uint32_t. The binary encoding carries only sync_info;
// the JSON form carries both.
212 struct rgw_data_sync_status
{
213 rgw_data_sync_info sync_info
;
214 map
<uint32_t, rgw_data_sync_marker
> sync_markers
;
216 rgw_data_sync_status() {}
// Binary encoder. Struct version 1, compat version 1.
218 void encode(bufferlist
& bl
) const {
219 ENCODE_START(1, 1, bl
);
220 encode(sync_info
, bl
);
221 /* sync markers are encoded separately */
// Binary decoder; mirrors encode() and likewise skips the markers.
225 void decode(bufferlist::const_iterator
& bl
) {
227 decode(sync_info
, bl
);
228 /* sync markers are decoded separately */
// JSON output: "info" and "markers".
232 void dump(Formatter
*f
) const {
233 encode_json("info", sync_info
, f
);
234 encode_json("markers", sync_markers
, f
);
// JSON input: reads the same two keys dump() writes.
236 void decode_json(JSONObj
*obj
) {
237 JSONDecoder::decode_json("info", sync_info
, obj
);
238 JSONDecoder::decode_json("markers", sync_markers
, obj
);
// fills `o` with sample instances (defined out of line)
240 static void generate_test_instances(std::list
<rgw_data_sync_status
*>& o
);
242 WRITE_CLASS_ENCODER(rgw_data_sync_status
)
// A single data-log entry that can be populated from JSON.
244 struct rgw_datalog_entry
{
246 ceph::real_time timestamp
;
// fill this entry from a parsed JSON object (defined out of line)
248 void decode_json(JSONObj
*obj
);
// A batch of data-log entries for one shard, populated from JSON.
251 struct rgw_datalog_shard_data
{
254 vector
<rgw_datalog_entry
> entries
;
// fill this object from a parsed JSON object (defined out of line)
256 void decode_json(JSONObj
*obj
);
259 class RGWAsyncRadosProcessor
;
260 class RGWDataSyncControlCR
;
// Owner of a bucket entry, identified by an id and a display name.
262 struct rgw_bucket_entry_owner
{
266 rgw_bucket_entry_owner() {}
// construct from an explicit id / display-name pair (both copied)
267 rgw_bucket_entry_owner(const string
& _id
, const string
& _display_name
) : id(_id
), display_name(_display_name
) {}
// fill this object from a parsed JSON object (defined out of line)
269 void decode_json(JSONObj
*obj
);
272 class RGWSyncErrorLogger
;
// Bundle of collaborators used during data sync. Every member
// default-initializes to nullptr and is populated by init(); the pointers are
// stored as passed, with no ownership handling visible in this struct.
276 struct RGWDataSyncEnv
{
277 const DoutPrefixProvider
*dpp
{nullptr};
278 CephContext
*cct
{nullptr};
279 rgw::sal::RGWRadosStore
*store
{nullptr};
280 RGWServices
*svc
{nullptr};
281 RGWAsyncRadosProcessor
*async_rados
{nullptr};
282 RGWHTTPManager
*http_manager
{nullptr};
283 RGWSyncErrorLogger
*error_logger
{nullptr};
284 RGWSyncTraceManager
*sync_tracer
{nullptr};
285 RGWSyncModuleInstanceRef sync_module
{nullptr};
286 PerfCounters
* counters
{nullptr};
// Store the supplied collaborators in the correspondingly named members.
290 void init(const DoutPrefixProvider
*_dpp
, CephContext
*_cct
, rgw::sal::RGWRadosStore
*_store
, RGWServices
*_svc
,
291 RGWAsyncRadosProcessor
*_async_rados
, RGWHTTPManager
*_http_manager
,
292 RGWSyncErrorLogger
*_error_logger
, RGWSyncTraceManager
*_sync_tracer
,
293 RGWSyncModuleInstanceRef
& _sync_module
,
294 PerfCounters
* _counters
) {
299 async_rados
= _async_rados
;
300 http_manager
= _http_manager
;
301 error_logger
= _error_logger
;
302 sync_tracer
= _sync_tracer
;
303 sync_module
= _sync_module
;
304 counters
= _counters
;
// returns an object name for the given shard id (defined out of line)
307 string
shard_obj_name(int shard_id
);
// Sync context for one source zone: a pointer to the shared RGWDataSyncEnv
// plus the connection and zone id of the source. Pointer members default to
// nullptr.
311 struct RGWDataSyncCtx
{
312 CephContext
*cct
{nullptr};
313 RGWDataSyncEnv
*env
{nullptr};
315 RGWRESTConn
*conn
{nullptr};
316 rgw_zone_id source_zone
;
// Populate the context from the environment and source-zone arguments.
318 void init(RGWDataSyncEnv
*_env
,
320 const rgw_zone_id
& _source_zone
) {
324 source_zone
= _source_zone
;
329 class RGWDataChangesLogInfo
;
// Coroutine manager for a remote data log: declares operations to read log
// info and sync status, initialize sync status, run sync and wake up shards.
// All operations are defined out of line.
331 class RGWRemoteDataLog
: public RGWCoroutinesManager
{
332 const DoutPrefixProvider
*dpp
;
333 rgw::sal::RGWRadosStore
*store
;
335 RGWCoroutinesManagerRegistry
*cr_registry
;
336 RGWAsyncRadosProcessor
*async_rados
;
337 RGWHTTPManager http_manager
;
339 RGWDataSyncEnv sync_env
;
// NOTE(review): presumably guards data_sync_cr — confirm in the .cc
342 ceph::shared_mutex lock
= ceph::make_shared_mutex("RGWRemoteDataLog::lock");
343 RGWDataSyncControlCR
*data_sync_cr
;
345 RGWSyncTraceNodeRef tn
;
350 RGWRemoteDataLog(const DoutPrefixProvider
*dpp
,
351 rgw::sal::RGWRadosStore
*_store
,
352 RGWAsyncRadosProcessor
*async_rados
);
// second-stage setup taking the source zone, connection and sync helpers
353 int init(const rgw_zone_id
& _source_zone
, RGWRESTConn
*_conn
, RGWSyncErrorLogger
*_error_logger
,
354 RGWSyncTraceManager
*_sync_tracer
, RGWSyncModuleInstanceRef
& module
,
355 PerfCounters
* _counters
);
// The read_* operations below return an int status and deliver results
// through their pointer / reference out-parameters.
358 int read_log_info(rgw_datalog_info
*log_info
);
359 int read_source_log_shards_info(map
<int, RGWDataChangesLogInfo
> *shards_info
);
// note: shard_markers is taken by value
360 int read_source_log_shards_next(map
<int, string
> shard_markers
, map
<int, rgw_datalog_shard_data
> *result
);
361 int read_sync_status(rgw_data_sync_status
*sync_status
);
362 int read_recovering_shards(const int num_shards
, set
<int>& recovering_shards
);
363 int read_shard_status(int shard_id
, set
<string
>& lagging_buckets
,set
<string
>& recovering_buckets
, rgw_data_sync_marker
* sync_marker
, const int max_entries
);
364 int init_sync_status(int num_shards
);
365 int run_sync(int num_shards
);
367 void wakeup(int shard_id
, set
<string
>& keys
);
// Front end for data sync against one source zone. Owns an RGWRemoteDataLog
// (source_log) and forwards most operations to it; also implements
// DoutPrefixProvider.
370 class RGWDataSyncStatusManager
: public DoutPrefixProvider
{
371 rgw::sal::RGWRadosStore
*store
;
373 rgw_zone_id source_zone
;
375 RGWSyncErrorLogger
*error_logger
;
376 RGWSyncModuleInstanceRef sync_module
;
377 PerfCounters
* counters
;
379 RGWRemoteDataLog source_log
;
381 string source_status_oid
;
382 string source_shard_status_oid_prefix
;
384 map
<int, rgw_raw_obj
> shard_objs
;
// Construct without a sync module: sync_module is set to nullptr. conn and
// error_logger start out null and num_shards starts at 0. source_log
// receives `this` as its first constructor argument.
389 RGWDataSyncStatusManager(rgw::sal::RGWRadosStore
*_store
, RGWAsyncRadosProcessor
*async_rados
,
390 const rgw_zone_id
& _source_zone
, PerfCounters
* counters
)
391 : store(_store
), source_zone(_source_zone
), conn(NULL
), error_logger(NULL
),
392 sync_module(nullptr), counters(counters
),
393 source_log(this, store
, async_rados
), num_shards(0) {}
// Same as above, but with an explicit sync module.
394 RGWDataSyncStatusManager(rgw::sal::RGWRadosStore
*_store
, RGWAsyncRadosProcessor
*async_rados
,
395 const rgw_zone_id
& _source_zone
, PerfCounters
* counters
,
396 const RGWSyncModuleInstanceRef
& _sync_module
)
397 : store(_store
), source_zone(_source_zone
), conn(NULL
), error_logger(NULL
),
398 sync_module(_sync_module
), counters(counters
),
399 source_log(this, store
, async_rados
), num_shards(0) {}
400 ~RGWDataSyncStatusManager() {
// static name helpers (defined out of line)
406 static string
shard_obj_name(const rgw_zone_id
& source_zone
, int shard_id
);
407 static string
sync_status_oid(const rgw_zone_id
& source_zone
);
// The following members forward directly to source_log and return its result.
409 int read_sync_status(rgw_data_sync_status
*sync_status
) {
410 return source_log
.read_sync_status(sync_status
);
413 int read_recovering_shards(const int num_shards
, set
<int>& recovering_shards
) {
414 return source_log
.read_recovering_shards(num_shards
, recovering_shards
);
417 int read_shard_status(int shard_id
, set
<string
>& lagging_buckets
, set
<string
>& recovering_buckets
, rgw_data_sync_marker
*sync_marker
, const int max_entries
) {
418 return source_log
.read_shard_status(shard_id
, lagging_buckets
, recovering_buckets
,sync_marker
, max_entries
);
// uses this manager's own num_shards
420 int init_sync_status() { return source_log
.init_sync_status(num_shards
); }
422 int read_log_info(rgw_datalog_info
*log_info
) {
423 return source_log
.read_log_info(log_info
);
425 int read_source_log_shards_info(map
<int, RGWDataChangesLogInfo
> *shards_info
) {
426 return source_log
.read_source_log_shards_info(shards_info
);
428 int read_source_log_shards_next(map
<int, string
> shard_markers
, map
<int, rgw_datalog_shard_data
> *result
) {
429 return source_log
.read_source_log_shards_next(shard_markers
, result
);
// run sync over this manager's num_shards
432 int run() { return source_log
.run_sync(num_shards
); }
434 void wakeup(int shard_id
, set
<string
>& keys
) { return source_log
.wakeup(shard_id
, keys
); }
439 // implements DoutPrefixProvider
440 CephContext
*get_cct() const override
;
441 unsigned get_subsys() const override
;
442 std::ostream
& gen_prefix(std::ostream
& out
) const override
;
445 class RGWBucketPipeSyncStatusManager
;
446 class RGWBucketSyncCR
;
// Full-sync progress marker for a bucket shard: an object-key position plus
// a count. Supports binary encoding, attr encoding and JSON.
448 struct rgw_bucket_shard_full_sync_marker
{
449 rgw_obj_key position
;
// count starts at zero
452 rgw_bucket_shard_full_sync_marker() : count(0) {}
// store this marker into an attribute map (defined out of line)
454 void encode_attr(map
<string
, bufferlist
>& attrs
);
// Binary encoder. Struct version 1, compat version 1.
456 void encode(bufferlist
& bl
) const {
457 ENCODE_START(1, 1, bl
);
458 encode(position
, bl
);
// Binary decoder; mirrors encode().
463 void decode(bufferlist::const_iterator
& bl
) {
465 decode(position
, bl
);
// JSON dump / decode (defined out of line)
470 void dump(Formatter
*f
) const;
471 void decode_json(JSONObj
*obj
);
473 WRITE_CLASS_ENCODER(rgw_bucket_shard_full_sync_marker
)
// Incremental-sync progress marker for a bucket shard, ordered by its
// `position` member. Supports binary encoding, attr encoding and JSON.
475 struct rgw_bucket_shard_inc_sync_marker
{
478 rgw_bucket_shard_inc_sync_marker() {}
// store this marker into an attribute map (defined out of line)
480 void encode_attr(map
<string
, bufferlist
>& attrs
);
// Binary encoder. Struct version 1, compat version 1.
482 void encode(bufferlist
& bl
) const {
483 ENCODE_START(1, 1, bl
);
484 encode(position
, bl
);
// Binary decoder; mirrors encode().
488 void decode(bufferlist::const_iterator
& bl
) {
490 decode(position
, bl
);
// JSON dump / decode (defined out of line)
494 void dump(Formatter
*f
) const;
495 void decode_json(JSONObj
*obj
);
// Ordering: markers compare by position only.
497 bool operator<(const rgw_bucket_shard_inc_sync_marker
& m
) const {
498 return (position
< m
.position
);
501 WRITE_CLASS_ENCODER(rgw_bucket_shard_inc_sync_marker
)
// Sync status of a single bucket shard: a state value plus the full-sync and
// incremental-sync markers. Can be stored in an attribute map, binary
// encoded, or converted to/from JSON.
503 struct rgw_bucket_shard_sync_info
{
507 StateIncrementalSync
= 2,
512 rgw_bucket_shard_full_sync_marker full_marker
;
513 rgw_bucket_shard_inc_sync_marker inc_marker
;
// attribute-map (de)serialization helpers (defined out of line)
515 void decode_from_attrs(CephContext
*cct
, map
<string
, bufferlist
>& attrs
);
516 void encode_all_attrs(map
<string
, bufferlist
>& attrs
);
517 void encode_state_attr(map
<string
, bufferlist
>& attrs
);
// Binary encoder. Struct version 1, compat version 1.
519 void encode(bufferlist
& bl
) const {
520 ENCODE_START(1, 1, bl
);
522 encode(full_marker
, bl
);
523 encode(inc_marker
, bl
);
// Binary decoder; reads the markers in the order encode() writes them.
527 void decode(bufferlist::const_iterator
& bl
) {
530 decode(full_marker
, bl
);
531 decode(inc_marker
, bl
);
// JSON dump / decode (defined out of line)
535 void dump(Formatter
*f
) const;
536 void decode_json(JSONObj
*obj
);
// starts in StateInit
538 rgw_bucket_shard_sync_info() : state((int)StateInit
) {}
541 WRITE_CLASS_ENCODER(rgw_bucket_shard_sync_info
)
// Bucket-index marker information decoded from JSON.
543 struct rgw_bucket_index_marker_info
{
// defaults to false
547 bool syncstopped
{false};
// Read "bucket_ver", "master_ver", "max_marker" and "syncstopped" from the
// JSON object into the same-named members.
549 void decode_json(JSONObj
*obj
) {
550 JSONDecoder::decode_json("bucket_ver", bucket_ver
, obj
);
551 JSONDecoder::decode_json("master_ver", master_ver
, obj
);
552 JSONDecoder::decode_json("max_marker", max_marker
, obj
);
553 JSONDecoder::decode_json("syncstopped", syncstopped
, obj
);
// Manages bucket sync from one source: holds a list of sync pairs and
// declares factories that create coroutines to read, initialize and run sync
// status. The factories take an int index `num`.
558 class RGWRemoteBucketManager
{
559 const DoutPrefixProvider
*dpp
;
561 RGWDataSyncEnv
*sync_env
;
563 RGWRESTConn
*conn
{nullptr};
564 rgw_zone_id source_zone
;
566 vector
<rgw_bucket_sync_pair_info
> sync_pairs
;
569 rgw_bucket_shard_sync_info init_status
;
571 RGWBucketSyncCR
*sync_cr
{nullptr};
574 RGWRemoteBucketManager(const DoutPrefixProvider
*_dpp
,
575 RGWDataSyncEnv
*_sync_env
,
576 const rgw_zone_id
& _source_zone
, RGWRESTConn
*_conn
,
577 const RGWBucketInfo
& source_bucket_info
,
578 const rgw_bucket
& dest_bucket
);
580 void init(const rgw_zone_id
& _source_zone
, RGWRESTConn
*_conn
,
581 const rgw_bucket
& source_bucket
, int shard_id
,
582 const rgw_bucket
& dest_bucket
);
// NOTE(review): `num` presumably indexes into sync_pairs — confirm in the .cc
584 RGWCoroutine
*read_sync_status_cr(int num
, rgw_bucket_shard_sync_info
*sync_status
);
585 RGWCoroutine
*init_sync_status_cr(int num
);
586 RGWCoroutine
*run_sync_cr(int num
);
// accessor body: reports the number of sync pairs held
589 return sync_pairs
.size();
// Bucket-level sync status manager for a destination bucket, with an
// optional source zone and optional source bucket. Holds per-source
// RGWRemoteBucketManager pointers and a per-shard status map; implements
// DoutPrefixProvider.
595 class RGWBucketPipeSyncStatusManager
: public DoutPrefixProvider
{
596 rgw::sal::RGWRadosStore
*store
;
598 RGWDataSyncEnv sync_env
;
600 RGWCoroutinesManager cr_mgr
;
602 RGWHTTPManager http_manager
;
604 std::optional
<rgw_zone_id
> source_zone
;
605 std::optional
<rgw_bucket
> source_bucket
;
608 RGWSyncErrorLogger
*error_logger
;
609 RGWSyncModuleInstanceRef sync_module
;
611 rgw_bucket dest_bucket
;
// NOTE(review): raw pointers — ownership is not visible here; check the
// destructor in the .cc
613 vector
<RGWRemoteBucketManager
*> source_mgrs
;
615 string source_status_oid
;
616 string source_shard_status_oid_prefix
;
// keyed by int — presumably the shard id; confirm against the .cc
618 map
<int, rgw_bucket_shard_sync_info
> sync_status
;
619 rgw_raw_obj status_obj
;
624 RGWBucketPipeSyncStatusManager(rgw::sal::RGWRadosStore
*_store
,
625 std::optional
<rgw_zone_id
> _source_zone
,
626 std::optional
<rgw_bucket
> _source_bucket
,
627 const rgw_bucket
& dest_bucket
);
628 ~RGWBucketPipeSyncStatusManager();
// mutable access to the status map
632 map
<int, rgw_bucket_shard_sync_info
>& get_sync_status() { return sync_status
; }
633 int init_sync_status();
// static helpers that build status object names (defined out of line)
635 static string
status_oid(const rgw_zone_id
& source_zone
, const rgw_bucket_sync_pair_info
& bs
);
636 static string
obj_status_oid(const rgw_bucket_sync_pipe
& sync_pipe
,
637 const rgw_zone_id
& source_zone
, const rgw_obj
& obj
); /* specific source obj sync status,
638 can be used by sync modules */
640 // implements DoutPrefixProvider
641 CephContext
*get_cct() const override
;
642 unsigned get_subsys() const override
;
643 std::ostream
& gen_prefix(std::ostream
& out
) const override
;
645 int read_sync_status();
649 /// read the sync status of all bucket shards from the given source zone
/// Results are delivered through `status`; the int return is presumably an
/// error code (0 / negative errno) — confirm against the definition.
/// `psource_bucket_info` is passed by pointer where `dest_bucket_info` is a
/// reference — NOTE(review): it may be optional / nullable; verify.
650 int rgw_bucket_sync_status(const DoutPrefixProvider
*dpp
,
651 rgw::sal::RGWRadosStore
*store
,
652 const rgw_sync_bucket_pipe
& pipe
,
653 const RGWBucketInfo
& dest_bucket_info
,
654 const RGWBucketInfo
*psource_bucket_info
,
655 std::vector
<rgw_bucket_shard_sync_info
> *status
);
// Default sync module: reports support for both writes and data export.
657 class RGWDefaultSyncModule
: public RGWSyncModule
{
659 RGWDefaultSyncModule() {}
660 bool supports_writes() override
{ return true; }
661 bool supports_data_export() override
{ return true; }
// builds a module instance from `config` into `*instance` (defined out of line)
662 int create_instance(CephContext
*cct
, const JSONFormattable
& config
, RGWSyncModuleInstanceRef
*instance
) override
;
// Archive sync module: derives from the default module, still reports write
// support, but reports that data export is NOT supported.
665 class RGWArchiveSyncModule
: public RGWDefaultSyncModule
{
667 RGWArchiveSyncModule() {}
668 bool supports_writes() override
{ return true; }
// differs from RGWDefaultSyncModule, which returns true
669 bool supports_data_export() override
{ return false; }
// builds a module instance from `config` into `*instance` (defined out of line)
670 int create_instance(CephContext
*cct
, const JSONFormattable
& config
, RGWSyncModuleInstanceRef
*instance
) override
;