1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #ifndef CEPH_RGW_DATA_SYNC_H
5 #define CEPH_RGW_DATA_SYNC_H
7 #include "include/encoding.h"
9 #include "common/ceph_json.h"
11 #include "rgw_coroutine.h"
12 #include "rgw_http_client.h"
13 #include "rgw_sal_rados.h"
15 #include "rgw_datalog.h"
16 #include "rgw_sync_module.h"
17 #include "rgw_sync_trace.h"
18 #include "rgw_sync_policy.h"
20 #include "rgw_bucket_sync.h"
22 // represents an obligation to sync an entry up to a given time
// Obligation record. Only the timestamp member is visible in this view;
// the stream operator that follows also reads its key and marker members.
23 struct rgw_data_sync_obligation
{
26 ceph::real_time timestamp
;
// Formats an obligation as "key=<key>".
// It appends " marker=<marker>" when the marker is non-empty.
// It appends " timestamp=<timestamp>" when the timestamp differs from a
// default-constructed ceph::real_time.
30 inline std::ostream
& operator<<(std::ostream
& out
, const rgw_data_sync_obligation
& o
) {
31 out
<< "key=" << o
.key
;
32 if (!o
.marker
.empty()) {
33 out
<< " marker=" << o
.marker
;
35 if (o
.timestamp
!= ceph::real_time
{}) {
36 out
<< " timestamp=" << o
.timestamp
;
45 struct rgw_sync_bucket_pipe
;
// One bucket-shard sync pair: the flow-manager pipe handler plus the
// source and destination bucket shards.
47 struct rgw_bucket_sync_pair_info
{
48 RGWBucketSyncFlowManager::pipe_handler handler
; /* responsible for sync filters */
49 rgw_bucket_shard source_bs
;
50 rgw_bucket_shard dest_bs
;
// Formats a sync pair.
// When the source and destination buckets are equal, only the source bucket
// shard is printed and the function returns early.
// The visible tail of the function appends "->" and the destination bucket.
53 inline std::ostream
& operator<<(std::ostream
& out
, const rgw_bucket_sync_pair_info
& p
) {
54 if (p
.source_bs
.bucket
== p
.dest_bs
.bucket
) {
55 return out
<< p
.source_bs
;
60 out
<< "->" << p
.dest_bs
.bucket
;
// A sync pair together with the bucket info and the attribute maps
// (name -> bufferlist) of both the source and the destination bucket.
65 struct rgw_bucket_sync_pipe
{
66 rgw_bucket_sync_pair_info info
;
67 RGWBucketInfo source_bucket_info
;
68 std::map
<std::string
, bufferlist
> source_bucket_attrs
;
69 RGWBucketInfo dest_bucket_info
;
70 std::map
<std::string
, bufferlist
> dest_bucket_attrs
;
// Returns a mutable reference to the rules held by the pair's pipe handler.
72 RGWBucketSyncFlowManager::pipe_rules_ref
& get_rules() {
73 return info
.handler
.rules
;
// Stream formatter for rgw_bucket_sync_pipe; its body is not visible in
// this view.
77 inline std::ostream
& operator<<(std::ostream
& out
, const rgw_bucket_sync_pipe
& p
) {
// Datalog information; the default constructor sets num_shards to 0.
81 struct rgw_datalog_info
{
84 rgw_datalog_info() : num_shards(0) {}
// Populates the struct from a JSON object (declared here, defined elsewhere).
86 void decode_json(JSONObj
*obj
);
// Overall data sync status: a SyncState value (stored as an int, cast back to
// SyncState when dumped), a shard count and an instance id.
// Supports versioned binary encoding and JSON dump/decode.
89 struct rgw_data_sync_info
{
92 StateBuildingFullSyncMaps
= 1,
99 uint64_t instance_id
{0};
// Binary encoding: struct version 2, compat version 1.
// instance_id is written after num_shards.
101 void encode(bufferlist
& bl
) const {
102 ENCODE_START(2, 1, bl
);
104 encode(num_shards
, bl
);
105 encode(instance_id
, bl
);
// Counterpart of encode(); reads num_shards and then instance_id. Some lines
// are not visible here. NOTE(review): presumably instance_id is only read for
// struct_v >= 2 (encode() writes version 2); confirm.
109 void decode(bufferlist::const_iterator
& bl
) {
112 decode(num_shards
, bl
);
114 decode(instance_id
, bl
);
// Emits "status" (the state rendered as a string, e.g.
// "building-full-sync-maps"), "num_shards" and "instance_id".
119 void dump(Formatter
*f
) const {
121 switch ((SyncState
)state
) {
125 case StateBuildingFullSyncMaps
:
126 s
= "building-full-sync-maps";
135 encode_json("status", s
, f
);
136 encode_json("num_shards", num_shards
, f
);
137 encode_json("instance_id", instance_id
, f
);
// Inverse of dump().
// It maps the "status" string back to a state, then reads "num_shards" and
// "instance_id".
139 void decode_json(JSONObj
*obj
) {
141 JSONDecoder::decode_json("status", s
, obj
);
142 if (s
== "building-full-sync-maps") {
143 state
= StateBuildingFullSyncMaps
;
144 } else if (s
== "sync") {
149 JSONDecoder::decode_json("num_shards", num_shards
, obj
);
150 JSONDecoder::decode_json("instance_id", instance_id
, obj
);
152 static void generate_test_instances(std::list
<rgw_data_sync_info
*>& o
);
// Starts in StateInit with zero shards.
154 rgw_data_sync_info() : state((int)StateInit
), num_shards(0) {}
156 WRITE_CLASS_ENCODER(rgw_data_sync_info
)
// Data sync marker: state, marker, next_step_marker, total_entries, pos and
// timestamp. Instances are stored per uint32_t key in
// rgw_data_sync_status::sync_markers.
158 struct rgw_data_sync_marker
{
165 std::string next_step_marker
;
166 uint64_t total_entries
;
// Defaults to FullSync with total_entries and pos set to 0.
170 rgw_data_sync_marker() : state(FullSync
), total_entries(0), pos(0) {}
// Binary encoding: struct version 1, compat version 1.
172 void encode(bufferlist
& bl
) const {
173 ENCODE_START(1, 1, bl
);
176 encode(next_step_marker
, bl
);
177 encode(total_entries
, bl
);
179 encode(timestamp
, bl
);
// Counterpart of encode().
183 void decode(bufferlist::const_iterator
& bl
) {
187 decode(next_step_marker
, bl
);
188 decode(total_entries
, bl
);
190 decode(timestamp
, bl
);
// Emits "status" (e.g. "incremental-sync"), "marker", "next_step_marker",
// "total_entries", "pos", and "timestamp" (wrapped in utime_t).
194 void dump(Formatter
*f
) const {
195 const char *s
{nullptr};
196 switch ((SyncState
)state
) {
200 case IncrementalSync
:
201 s
= "incremental-sync";
207 encode_json("status", s
, f
);
208 encode_json("marker", marker
, f
);
209 encode_json("next_step_marker", next_step_marker
, f
);
210 encode_json("total_entries", total_entries
, f
);
211 encode_json("pos", pos
, f
);
212 encode_json("timestamp", utime_t(timestamp
), f
);
// Inverse of dump().
// It maps the "status" string back to a state and reads the remaining fields.
// The decoded timestamp is converted with to_real_time().
214 void decode_json(JSONObj
*obj
) {
216 JSONDecoder::decode_json("status", s
, obj
);
217 if (s
== "full-sync") {
219 } else if (s
== "incremental-sync") {
220 state
= IncrementalSync
;
222 JSONDecoder::decode_json("marker", marker
, obj
);
223 JSONDecoder::decode_json("next_step_marker", next_step_marker
, obj
);
224 JSONDecoder::decode_json("total_entries", total_entries
, obj
);
225 JSONDecoder::decode_json("pos", pos
, obj
);
227 JSONDecoder::decode_json("timestamp", t
, obj
);
228 timestamp
= t
.to_real_time();
230 static void generate_test_instances(std::list
<rgw_data_sync_marker
*>& o
);
232 WRITE_CLASS_ENCODER(rgw_data_sync_marker
)
// Aggregate data sync status: the overall sync_info plus markers keyed by a
// uint32_t.
// Only sync_info goes into the binary encoding (markers are encoded and
// decoded separately). The JSON dump/decode covers both, as "info" and
// "markers".
234 struct rgw_data_sync_status
{
235 rgw_data_sync_info sync_info
;
236 std::map
<uint32_t, rgw_data_sync_marker
> sync_markers
;
238 rgw_data_sync_status() {}
// Binary encoding: struct version 1, compat version 1; sync_info only.
240 void encode(bufferlist
& bl
) const {
241 ENCODE_START(1, 1, bl
);
242 encode(sync_info
, bl
);
243 /* sync markers are encoded separately */
247 void decode(bufferlist::const_iterator
& bl
) {
249 decode(sync_info
, bl
);
250 /* sync markers are decoded separately */
254 void dump(Formatter
*f
) const {
255 encode_json("info", sync_info
, f
);
256 encode_json("markers", sync_markers
, f
);
258 void decode_json(JSONObj
*obj
) {
259 JSONDecoder::decode_json("info", sync_info
, obj
);
260 JSONDecoder::decode_json("markers", sync_markers
, obj
);
262 static void generate_test_instances(std::list
<rgw_data_sync_status
*>& o
);
264 WRITE_CLASS_ENCODER(rgw_data_sync_status
)
// Datalog entry with a timestamp; populated from JSON via decode_json
// (declared here, defined elsewhere).
266 struct rgw_datalog_entry
{
268 ceph::real_time timestamp
;
270 void decode_json(JSONObj
*obj
);
// Datalog shard data: a vector of rgw_datalog_entry records, populated from
// JSON via decode_json (declared here, defined elsewhere).
273 struct rgw_datalog_shard_data
{
276 std::vector
<rgw_datalog_entry
> entries
;
278 void decode_json(JSONObj
*obj
);
281 class RGWAsyncRadosProcessor
;
282 class RGWDataSyncControlCR
;
// Owner of a bucket entry: an id and a display name.
// It can be default-constructed, built from both strings, or decoded from
// JSON.
284 struct rgw_bucket_entry_owner
{
286 std::string display_name
;
288 rgw_bucket_entry_owner() {}
289 rgw_bucket_entry_owner(const std::string
& _id
, const std::string
& _display_name
) : id(_id
), display_name(_display_name
) {}
291 void decode_json(JSONObj
*obj
);
294 class RGWSyncErrorLogger
;
// Environment used by data sync. It bundles the prefix provider, ceph
// context, store, services, async RADOS processor, HTTP manager, error
// logger, trace manager, sync module and perf counters. Every pointer-like
// member defaults to nullptr.
298 struct RGWDataSyncEnv
{
299 const DoutPrefixProvider
*dpp
{nullptr};
300 CephContext
*cct
{nullptr};
301 rgw::sal::RadosStore
* store
{nullptr};
302 RGWServices
*svc
{nullptr};
303 RGWAsyncRadosProcessor
*async_rados
{nullptr};
304 RGWHTTPManager
*http_manager
{nullptr};
305 RGWSyncErrorLogger
*error_logger
{nullptr};
306 RGWSyncTraceManager
*sync_tracer
{nullptr};
307 RGWSyncModuleInstanceRef sync_module
{nullptr};
308 PerfCounters
* counters
{nullptr};
// Stores each argument in the member of the same name. Only the assignments
// from async_rados onward are visible in this view. sync_module is copied
// from the reference.
312 void init(const DoutPrefixProvider
*_dpp
, CephContext
*_cct
, rgw::sal::RadosStore
* _store
, RGWServices
*_svc
,
313 RGWAsyncRadosProcessor
*_async_rados
, RGWHTTPManager
*_http_manager
,
314 RGWSyncErrorLogger
*_error_logger
, RGWSyncTraceManager
*_sync_tracer
,
315 RGWSyncModuleInstanceRef
& _sync_module
,
316 PerfCounters
* _counters
) {
321 async_rados
= _async_rados
;
322 http_manager
= _http_manager
;
323 error_logger
= _error_logger
;
324 sync_tracer
= _sync_tracer
;
325 sync_module
= _sync_module
;
326 counters
= _counters
;
// Return object-name strings, presumably for a given shard and for the
// status object (declared here, defined elsewhere).
329 std::string
shard_obj_name(int shard_id
);
330 std::string
status_oid();
// Data sync context: a ceph context, a pointer to the shared RGWDataSyncEnv,
// a REST connection and the source zone id. All pointers default to nullptr.
333 struct RGWDataSyncCtx
{
334 CephContext
*cct
{nullptr};
335 RGWDataSyncEnv
*env
{nullptr};
337 RGWRESTConn
*conn
{nullptr};
338 rgw_zone_id source_zone
;
// Initializes the context from an environment and a source zone. Only the
// source_zone assignment is visible in this view.
340 void init(RGWDataSyncEnv
*_env
,
342 const rgw_zone_id
& _source_zone
) {
346 source_zone
= _source_zone
;
// Coroutine manager (derives from RGWCoroutinesManager) exposing remote
// data-log operations: reading log/shard info and sync status, initializing
// the sync status, running sync and waking up shards.
// It owns an RGWHTTPManager and an RGWDataSyncEnv by value.
352 class RGWRemoteDataLog
: public RGWCoroutinesManager
{
353 const DoutPrefixProvider
*dpp
;
354 rgw::sal::RadosStore
* store
;
356 RGWCoroutinesManagerRegistry
*cr_registry
;
357 RGWAsyncRadosProcessor
*async_rados
;
358 RGWHTTPManager http_manager
;
360 RGWDataSyncEnv sync_env
;
// NOTE(review): presumably guards data_sync_cr; confirm against the .cc.
363 ceph::shared_mutex lock
= ceph::make_shared_mutex("RGWRemoteDataLog::lock");
364 RGWDataSyncControlCR
*data_sync_cr
;
366 RGWSyncTraceNodeRef tn
;
371 RGWRemoteDataLog(const DoutPrefixProvider
*dpp
,
372 rgw::sal::RadosStore
* _store
,
373 RGWAsyncRadosProcessor
*async_rados
);
// Initializes with a source zone, REST connection, error logger, trace
// manager, sync module and perf counters.
374 int init(const rgw_zone_id
& _source_zone
, RGWRESTConn
*_conn
, RGWSyncErrorLogger
*_error_logger
,
375 RGWSyncTraceManager
*_sync_tracer
, RGWSyncModuleInstanceRef
& module
,
376 PerfCounters
* _counters
);
// Query helpers (declared here, defined elsewhere); results go to the
// pointer/reference out-parameters.
// NOTE(review): read_source_log_shards_next takes shard_markers by value,
// so the map is copied on every call.
379 int read_log_info(const DoutPrefixProvider
*dpp
, rgw_datalog_info
*log_info
);
380 int read_source_log_shards_info(const DoutPrefixProvider
*dpp
, std::map
<int, RGWDataChangesLogInfo
> *shards_info
);
381 int read_source_log_shards_next(const DoutPrefixProvider
*dpp
, std::map
<int, std::string
> shard_markers
, std::map
<int, rgw_datalog_shard_data
> *result
);
382 int read_sync_status(const DoutPrefixProvider
*dpp
, rgw_data_sync_status
*sync_status
);
383 int read_recovering_shards(const DoutPrefixProvider
*dpp
, const int num_shards
, std::set
<int>& recovering_shards
);
384 int read_shard_status(const DoutPrefixProvider
*dpp
, int shard_id
, std::set
<std::string
>& lagging_buckets
,std::set
<std::string
>& recovering_buckets
, rgw_data_sync_marker
* sync_marker
, const int max_entries
);
// Sync-status initialization and the main sync entry point, both
// parameterized by the shard count.
385 int init_sync_status(const DoutPrefixProvider
*dpp
, int num_shards
);
386 int run_sync(const DoutPrefixProvider
*dpp
, int num_shards
);
// Wakes up the given shard for the given set of keys.
// NOTE(review): keys is taken by non-const reference; confirm whether the
// callee modifies it.
388 void wakeup(int shard_id
, std::set
<std::string
>& keys
);
// DoutPrefixProvider that manages data sync status for a source zone.
// Most operations delegate to its RGWRemoteDataLog member (source_log), which
// it holds by value.
391 class RGWDataSyncStatusManager
: public DoutPrefixProvider
{
392 rgw::sal::RadosStore
* store
;
394 rgw_zone_id source_zone
;
396 RGWSyncErrorLogger
*error_logger
;
397 RGWSyncModuleInstanceRef sync_module
;
398 PerfCounters
* counters
;
400 RGWRemoteDataLog source_log
;
402 std::string source_status_oid
;
403 std::string source_shard_status_oid_prefix
;
405 std::map
<int, rgw_raw_obj
> shard_objs
;
// Both constructors start with no connection, no error logger and zero
// shards. source_log is constructed with this object as its prefix provider.
// The second overload also takes a sync module; the first leaves it null.
// NOTE(review): the parameter `counters` shadows the member. The initializer
// counters(counters) is correct (member from parameter) but easy to misread.
410 RGWDataSyncStatusManager(rgw::sal::RadosStore
* _store
, RGWAsyncRadosProcessor
*async_rados
,
411 const rgw_zone_id
& _source_zone
, PerfCounters
* counters
)
412 : store(_store
), source_zone(_source_zone
), conn(NULL
), error_logger(NULL
),
413 sync_module(nullptr), counters(counters
),
414 source_log(this, store
, async_rados
), num_shards(0) {}
415 RGWDataSyncStatusManager(rgw::sal::RadosStore
* _store
, RGWAsyncRadosProcessor
*async_rados
,
416 const rgw_zone_id
& _source_zone
, PerfCounters
* counters
,
417 const RGWSyncModuleInstanceRef
& _sync_module
)
418 : store(_store
), source_zone(_source_zone
), conn(NULL
), error_logger(NULL
),
419 sync_module(_sync_module
), counters(counters
),
420 source_log(this, store
, async_rados
), num_shards(0) {}
421 ~RGWDataSyncStatusManager() {
424 int init(const DoutPrefixProvider
*dpp
);
// Object-name helpers keyed by source zone (declared here, defined elsewhere).
427 static std::string
shard_obj_name(const rgw_zone_id
& source_zone
, int shard_id
);
428 static std::string
sync_status_oid(const rgw_zone_id
& source_zone
);
// Thin forwarding wrappers around source_log.
// NOTE(review): read_recovering_shards' parameter num_shards shadows the
// member of the same name; the parameter is what gets forwarded.
430 int read_sync_status(const DoutPrefixProvider
*dpp
, rgw_data_sync_status
*sync_status
) {
431 return source_log
.read_sync_status(dpp
, sync_status
);
434 int read_recovering_shards(const DoutPrefixProvider
*dpp
, const int num_shards
, std::set
<int>& recovering_shards
) {
435 return source_log
.read_recovering_shards(dpp
, num_shards
, recovering_shards
);
438 int read_shard_status(const DoutPrefixProvider
*dpp
, int shard_id
, std::set
<std::string
>& lagging_buckets
, std::set
<std::string
>& recovering_buckets
, rgw_data_sync_marker
*sync_marker
, const int max_entries
) {
439 return source_log
.read_shard_status(dpp
, shard_id
, lagging_buckets
, recovering_buckets
,sync_marker
, max_entries
);
// Uses the member num_shards.
441 int init_sync_status(const DoutPrefixProvider
*dpp
) { return source_log
.init_sync_status(dpp
, num_shards
); }
443 int read_log_info(const DoutPrefixProvider
*dpp
, rgw_datalog_info
*log_info
) {
444 return source_log
.read_log_info(dpp
, log_info
);
446 int read_source_log_shards_info(const DoutPrefixProvider
*dpp
, std::map
<int, RGWDataChangesLogInfo
> *shards_info
) {
447 return source_log
.read_source_log_shards_info(dpp
, shards_info
);
// NOTE(review): shard_markers is taken by value here and again by the
// callee, so the map is copied twice per call.
449 int read_source_log_shards_next(const DoutPrefixProvider
*dpp, std::map
<int, std::string
> shard_markers
, std::map
<int, rgw_datalog_shard_data
> *result
) {
450 return source_log
.read_source_log_shards_next(dpp
, shard_markers
, result
);
// Runs sync via source_log using the member num_shards.
453 int run(const DoutPrefixProvider
*dpp
) { return source_log
.run_sync(dpp
, num_shards
); }
455 void wakeup(int shard_id
, std::set
<std::string
>& keys
) { return source_log
.wakeup(shard_id
, keys
); }
460 // implements DoutPrefixProvider
461 CephContext
*get_cct() const override
;
462 unsigned get_subsys() const override
;
463 std::ostream
& gen_prefix(std::ostream
& out
) const override
;
466 class RGWBucketPipeSyncStatusManager
;
467 class RGWBucketSyncCR
;
// Full-sync marker for a bucket shard: an object-key position and a count
// (initialized to 0 by the default constructor).
469 struct rgw_bucket_shard_full_sync_marker
{
470 rgw_obj_key position
;
473 rgw_bucket_shard_full_sync_marker() : count(0) {}
// Writes this marker into the given attribute map (declared here, defined
// elsewhere).
475 void encode_attr(std::map
<std::string
, bufferlist
>& attrs
);
// Binary encoding: struct version 1, compat version 1; position first.
477 void encode(bufferlist
& bl
) const {
478 ENCODE_START(1, 1, bl
);
479 encode(position
, bl
);
484 void decode(bufferlist::const_iterator
& bl
) {
486 decode(position
, bl
);
491 void dump(Formatter
*f
) const;
492 void decode_json(JSONObj
*obj
);
494 WRITE_CLASS_ENCODER(rgw_bucket_shard_full_sync_marker
)
// Incremental-sync marker for a bucket shard: a string position and a
// timestamp.
496 struct rgw_bucket_shard_inc_sync_marker
{
497 std::string position
;
498 ceph::real_time timestamp
;
// Writes this marker into the given attribute map (declared here, defined
// elsewhere).
500 void encode_attr(std::map
<std::string
, bufferlist
>& attrs
);
// Binary encoding: struct version 2, compat version 1; position, then
// timestamp.
502 void encode(bufferlist
& bl
) const {
503 ENCODE_START(2, 1, bl
);
504 encode(position
, bl
);
505 encode(timestamp
, bl
);
// Counterpart of encode(). NOTE(review): presumably timestamp is only decoded
// for struct_v >= 2 (the guarding line is not visible here); confirm.
509 void decode(bufferlist::const_iterator
& bl
) {
511 decode(position
, bl
);
513 decode(timestamp
, bl
);
518 void dump(Formatter
*f
) const;
519 void decode_json(JSONObj
*obj
);
521 WRITE_CLASS_ENCODER(rgw_bucket_shard_inc_sync_marker
)
// Sync info for one bucket shard: a state (StateInit by default;
// StateIncrementalSync = 2 is visible here) plus full and incremental sync
// markers.
523 struct rgw_bucket_shard_sync_info
{
527 StateIncrementalSync
= 2,
532 rgw_bucket_shard_full_sync_marker full_marker
;
533 rgw_bucket_shard_inc_sync_marker inc_marker
;
// Attribute-map helpers (declared here, defined elsewhere):
// decode_from_attrs loads from attrs, encode_all_attrs writes all fields,
// and encode_state_attr writes the state.
535 void decode_from_attrs(CephContext
*cct
, std::map
<std::string
, bufferlist
>& attrs
);
536 void encode_all_attrs(std::map
<std::string
, bufferlist
>& attrs
);
537 void encode_state_attr(std::map
<std::string
, bufferlist
>& attrs
);
// Binary encoding: struct version 1, compat version 1; full_marker before
// inc_marker.
539 void encode(bufferlist
& bl
) const {
540 ENCODE_START(1, 1, bl
);
542 encode(full_marker
, bl
);
543 encode(inc_marker
, bl
);
547 void decode(bufferlist::const_iterator
& bl
) {
550 decode(full_marker
, bl
);
551 decode(inc_marker
, bl
);
555 void dump(Formatter
*f
) const;
556 void decode_json(JSONObj
*obj
);
558 rgw_bucket_shard_sync_info() : state((int)StateInit
) {}
561 WRITE_CLASS_ENCODER(rgw_bucket_shard_sync_info
)
// Bucket index marker info decoded from JSON: bucket_ver, master_ver,
// max_marker and syncstopped (false by default).
563 struct rgw_bucket_index_marker_info
{
564 std::string bucket_ver
;
565 std::string master_ver
;
566 std::string max_marker
;
567 bool syncstopped
{false};
// Reads each field from the JSON key of the same name.
569 void decode_json(JSONObj
*obj
) {
570 JSONDecoder::decode_json("bucket_ver", bucket_ver
, obj
);
571 JSONDecoder::decode_json("master_ver", master_ver
, obj
);
572 JSONDecoder::decode_json("max_marker", max_marker
, obj
);
573 JSONDecoder::decode_json("syncstopped", syncstopped
, obj
);
// Remote bucket sync manager. It holds a pointer to the sync env, a REST
// connection, the source zone, a vector of sync pairs, an initial shard
// status and a pointer to the bucket sync coroutine.
578 class RGWRemoteBucketManager
{
579 const DoutPrefixProvider
*dpp
;
581 RGWDataSyncEnv
*sync_env
;
583 RGWRESTConn
*conn
{nullptr};
584 rgw_zone_id source_zone
;
586 std::vector
<rgw_bucket_sync_pair_info
> sync_pairs
;
589 rgw_bucket_shard_sync_info init_status
;
591 RGWBucketSyncCR
*sync_cr
{nullptr};
594 RGWRemoteBucketManager(const DoutPrefixProvider
*_dpp,
595 RGWDataSyncEnv
*_sync_env,
596 const rgw_zone_id
& _source_zone, RGWRESTConn
*_conn,
597 const RGWBucketInfo
& source_bucket_info,
598 const rgw_bucket
& dest_bucket);
600 void init(const rgw_zone_id
& _source_zone, RGWRESTConn
*_conn,
601 const rgw_bucket
& source_bucket, int shard_id,
602 const rgw_bucket
& dest_bucket);
// Coroutine factories for reading, initializing and running sync.
// NOTE(review): `num` is presumably an index into sync_pairs; confirm in the .cc.
604 RGWCoroutine
*read_sync_status_cr(int num
, rgw_bucket_shard_sync_info
*sync_status
);
605 RGWCoroutine
*init_sync_status_cr(int num
, RGWObjVersionTracker
& objv_tracker
);
606 RGWCoroutine
*run_sync_cr(int num
);
609 return sync_pairs
.size();
615 class BucketIndexShardsManager
;
// Declared helper for reading remote bilog info for a bucket. The remaining
// parameters are not visible in this view.
// NOTE(review): `markers` is a non-const reference, presumably an output; confirm.
617 int rgw_read_remote_bilog_info(const DoutPrefixProvider
*dpp,
619 const rgw_bucket
& bucket,
620 BucketIndexShardsManager
& markers,
// DoutPrefixProvider that manages bucket sync status for dest_bucket, with an
// optional source zone and optional source bucket.
// It owns a sync env, a coroutine manager and an HTTP manager by value. It
// keeps per-shard status in a map keyed by int.
// NOTE(review): source_mgrs stores raw RGWRemoteBucketManager pointers;
// presumably the declared destructor frees them. Confirm in the .cc.
623 class RGWBucketPipeSyncStatusManager
: public DoutPrefixProvider
{
624 rgw::sal::RadosStore
* store
;
626 RGWDataSyncEnv sync_env
;
628 RGWCoroutinesManager cr_mgr
;
630 RGWHTTPManager http_manager
;
632 std::optional
<rgw_zone_id
> source_zone
;
633 std::optional
<rgw_bucket
> source_bucket
;
636 RGWSyncErrorLogger
*error_logger
;
637 RGWSyncModuleInstanceRef sync_module
;
639 rgw_bucket dest_bucket
;
641 std::vector
<RGWRemoteBucketManager
*> source_mgrs
;
643 std::string source_status_oid
;
644 std::string source_shard_status_oid_prefix
;
646 std::map
<int, rgw_bucket_shard_sync_info
> sync_status
;
647 rgw_raw_obj status_obj
;
652 RGWBucketPipeSyncStatusManager(rgw::sal::RadosStore
* _store,
653 std::optional
<rgw_zone_id
> _source_zone,
654 std::optional
<rgw_bucket
> _source_bucket,
655 const rgw_bucket
& dest_bucket);
656 ~RGWBucketPipeSyncStatusManager();
658 int init(const DoutPrefixProvider
*dpp);
// Returns a mutable reference to the per-shard status map.
660 std::map
<int, rgw_bucket_shard_sync_info
>& get_sync_status() { return sync_status
; }
661 int init_sync_status(const DoutPrefixProvider
*dpp);
// Status-object name helpers (declared here, defined elsewhere).
663 static std::string
status_oid(const rgw_zone_id
& source_zone, const rgw_bucket_sync_pair_info
& bs);
664 static std::string
obj_status_oid(const rgw_bucket_sync_pipe
& sync_pipe,
665 const rgw_zone_id
& source_zone, const rgw::sal::Object
* obj); /* specific source obj sync status,
666 can be used by sync modules */
668 // implements DoutPrefixProvider
669 CephContext
*get_cct() const override
;
670 unsigned get_subsys() const override
;
671 std::ostream
& gen_prefix(std::ostream
& out) const override
;
673 int read_sync_status(const DoutPrefixProvider
*dpp);
674 int run(const DoutPrefixProvider
*dpp);
677 /// read the sync status of all bucket shards from the given source zone
/// @param pipe               the bucket sync pipe to inspect
/// @param dest_bucket_info   bucket info of the destination bucket
/// @param psource_bucket_info pointer to the source bucket info
///        (NOTE(review): presumably nullable; confirm in the .cc)
/// @param status             output vector of per-shard sync info
/// @return int status code (declared here, defined elsewhere)
678 int rgw_bucket_sync_status(const DoutPrefixProvider
*dpp,
679 rgw::sal::RadosStore
* store,
680 const rgw_sync_bucket_pipe
& pipe,
681 const RGWBucketInfo
& dest_bucket_info,
682 const RGWBucketInfo
*psource_bucket_info,
683 std::vector
<rgw_bucket_shard_sync_info
> *status);
// Default sync module: reports support for both writes and data export.
// create_instance is declared here and defined elsewhere.
685 class RGWDefaultSyncModule
: public RGWSyncModule
{
687 RGWDefaultSyncModule() {}
688 bool supports_writes() override
{ return true; }
689 bool supports_data_export() override
{ return true; }
690 int create_instance(const DoutPrefixProvider
*dpp, CephContext
*cct, const JSONFormattable
& config, RGWSyncModuleInstanceRef
*instance) override
;
// Archive sync module. It derives from RGWDefaultSyncModule and supports
// writes, but overrides data export to false.
// create_instance is declared here and defined elsewhere.
693 class RGWArchiveSyncModule
: public RGWDefaultSyncModule
{
695 RGWArchiveSyncModule() {}
696 bool supports_writes() override
{ return true; }
697 bool supports_data_export() override
{ return false; }
698 int create_instance(const DoutPrefixProvider
*dpp, CephContext
*cct, const JSONFormattable
& config, RGWSyncModuleInstanceRef
*instance) override
;