]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_data_sync.h
bump version to 15.2.11-pve1
[ceph.git] / ceph / src / rgw / rgw_data_sync.h
CommitLineData
11fdf7f2 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
9f95a23c 2// vim: ts=8 sw=2 smarttab ft=cpp
11fdf7f2 3
7c673cae
FG
4#ifndef CEPH_RGW_DATA_SYNC_H
5#define CEPH_RGW_DATA_SYNC_H
6
11fdf7f2
TL
7#include "include/encoding.h"
8
9#include "common/RWLock.h"
10#include "common/ceph_json.h"
11
7c673cae
FG
12#include "rgw_coroutine.h"
13#include "rgw_http_client.h"
9f95a23c 14#include "rgw_sal.h"
7c673cae
FG
15
16#include "rgw_sync_module.h"
11fdf7f2 17#include "rgw_sync_trace.h"
9f95a23c
TL
18#include "rgw_sync_policy.h"
19
20#include "rgw_bucket_sync.h"
21
22class JSONObj;
23struct rgw_sync_bucket_pipe;
24
25struct rgw_bucket_sync_pair_info {
26 RGWBucketSyncFlowManager::pipe_handler handler; /* responsible for sync filters */
27 rgw_bucket_shard source_bs;
28 rgw_bucket_shard dest_bs;
29};
30
31inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pair_info& p) {
32 if (p.source_bs.bucket == p.dest_bs.bucket) {
33 return out << p.source_bs;
34 }
35
36 out << p.source_bs;
37
38 out << "->" << p.dest_bs.bucket;
39
40 return out;
41}
42
43struct rgw_bucket_sync_pipe {
44 rgw_bucket_sync_pair_info info;
45 RGWBucketInfo source_bucket_info;
46 map<string, bufferlist> source_bucket_attrs;
47 RGWBucketInfo dest_bucket_info;
48 map<string, bufferlist> dest_bucket_attrs;
49
50 RGWBucketSyncFlowManager::pipe_rules_ref& get_rules() {
51 return info.handler.rules;
52 }
53};
54
55inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pipe& p) {
56 return out << p.info;
57}
7c673cae 58
7c673cae
FG
59struct rgw_datalog_info {
60 uint32_t num_shards;
61
62 rgw_datalog_info() : num_shards(0) {}
63
64 void decode_json(JSONObj *obj);
65};
66
67struct rgw_data_sync_info {
68 enum SyncState {
69 StateInit = 0,
70 StateBuildingFullSyncMaps = 1,
71 StateSync = 2,
72 };
73
74 uint16_t state;
75 uint32_t num_shards;
76
31f18b77
FG
77 uint64_t instance_id{0};
78
7c673cae 79 void encode(bufferlist& bl) const {
31f18b77 80 ENCODE_START(2, 1, bl);
11fdf7f2
TL
81 encode(state, bl);
82 encode(num_shards, bl);
83 encode(instance_id, bl);
7c673cae
FG
84 ENCODE_FINISH(bl);
85 }
86
11fdf7f2 87 void decode(bufferlist::const_iterator& bl) {
31f18b77 88 DECODE_START(2, bl);
11fdf7f2
TL
89 decode(state, bl);
90 decode(num_shards, bl);
31f18b77 91 if (struct_v >= 2) {
11fdf7f2 92 decode(instance_id, bl);
31f18b77 93 }
7c673cae
FG
94 DECODE_FINISH(bl);
95 }
96
97 void dump(Formatter *f) const {
98 string s;
99 switch ((SyncState)state) {
100 case StateInit:
101 s = "init";
102 break;
103 case StateBuildingFullSyncMaps:
104 s = "building-full-sync-maps";
105 break;
106 case StateSync:
107 s = "sync";
108 break;
109 default:
110 s = "unknown";
111 break;
112 }
113 encode_json("status", s, f);
114 encode_json("num_shards", num_shards, f);
31f18b77 115 encode_json("instance_id", instance_id, f);
7c673cae
FG
116 }
117 void decode_json(JSONObj *obj) {
118 std::string s;
119 JSONDecoder::decode_json("status", s, obj);
120 if (s == "building-full-sync-maps") {
121 state = StateBuildingFullSyncMaps;
122 } else if (s == "sync") {
123 state = StateSync;
124 } else {
125 state = StateInit;
126 }
127 JSONDecoder::decode_json("num_shards", num_shards, obj);
a8e16298 128 JSONDecoder::decode_json("instance_id", instance_id, obj);
7c673cae 129 }
b32b8144 130 static void generate_test_instances(std::list<rgw_data_sync_info*>& o);
7c673cae
FG
131
132 rgw_data_sync_info() : state((int)StateInit), num_shards(0) {}
133};
134WRITE_CLASS_ENCODER(rgw_data_sync_info)
135
136struct rgw_data_sync_marker {
137 enum SyncState {
138 FullSync = 0,
139 IncrementalSync = 1,
140 };
141 uint16_t state;
142 string marker;
143 string next_step_marker;
144 uint64_t total_entries;
145 uint64_t pos;
146 real_time timestamp;
147
148 rgw_data_sync_marker() : state(FullSync), total_entries(0), pos(0) {}
149
150 void encode(bufferlist& bl) const {
151 ENCODE_START(1, 1, bl);
11fdf7f2
TL
152 encode(state, bl);
153 encode(marker, bl);
154 encode(next_step_marker, bl);
155 encode(total_entries, bl);
156 encode(pos, bl);
157 encode(timestamp, bl);
7c673cae
FG
158 ENCODE_FINISH(bl);
159 }
160
11fdf7f2 161 void decode(bufferlist::const_iterator& bl) {
7c673cae 162 DECODE_START(1, bl);
11fdf7f2
TL
163 decode(state, bl);
164 decode(marker, bl);
165 decode(next_step_marker, bl);
166 decode(total_entries, bl);
167 decode(pos, bl);
168 decode(timestamp, bl);
7c673cae
FG
169 DECODE_FINISH(bl);
170 }
171
172 void dump(Formatter *f) const {
28e407b8
AA
173 const char *s{nullptr};
174 switch ((SyncState)state) {
175 case FullSync:
176 s = "full-sync";
177 break;
178 case IncrementalSync:
179 s = "incremental-sync";
180 break;
181 default:
182 s = "unknown";
183 break;
184 }
185 encode_json("status", s, f);
7c673cae
FG
186 encode_json("marker", marker, f);
187 encode_json("next_step_marker", next_step_marker, f);
188 encode_json("total_entries", total_entries, f);
189 encode_json("pos", pos, f);
190 encode_json("timestamp", utime_t(timestamp), f);
191 }
192 void decode_json(JSONObj *obj) {
28e407b8
AA
193 std::string s;
194 JSONDecoder::decode_json("status", s, obj);
195 if (s == "full-sync") {
196 state = FullSync;
197 } else if (s == "incremental-sync") {
198 state = IncrementalSync;
199 }
7c673cae
FG
200 JSONDecoder::decode_json("marker", marker, obj);
201 JSONDecoder::decode_json("next_step_marker", next_step_marker, obj);
202 JSONDecoder::decode_json("total_entries", total_entries, obj);
203 JSONDecoder::decode_json("pos", pos, obj);
204 utime_t t;
205 JSONDecoder::decode_json("timestamp", t, obj);
206 timestamp = t.to_real_time();
207 }
b32b8144 208 static void generate_test_instances(std::list<rgw_data_sync_marker*>& o);
7c673cae
FG
209};
210WRITE_CLASS_ENCODER(rgw_data_sync_marker)
211
212struct rgw_data_sync_status {
213 rgw_data_sync_info sync_info;
214 map<uint32_t, rgw_data_sync_marker> sync_markers;
215
216 rgw_data_sync_status() {}
217
218 void encode(bufferlist& bl) const {
219 ENCODE_START(1, 1, bl);
11fdf7f2 220 encode(sync_info, bl);
7c673cae
FG
221 /* sync markers are encoded separately */
222 ENCODE_FINISH(bl);
223 }
224
11fdf7f2 225 void decode(bufferlist::const_iterator& bl) {
7c673cae 226 DECODE_START(1, bl);
11fdf7f2 227 decode(sync_info, bl);
7c673cae
FG
228 /* sync markers are decoded separately */
229 DECODE_FINISH(bl);
230 }
231
232 void dump(Formatter *f) const {
233 encode_json("info", sync_info, f);
234 encode_json("markers", sync_markers, f);
235 }
236 void decode_json(JSONObj *obj) {
237 JSONDecoder::decode_json("info", sync_info, obj);
238 JSONDecoder::decode_json("markers", sync_markers, obj);
239 }
b32b8144 240 static void generate_test_instances(std::list<rgw_data_sync_status*>& o);
7c673cae
FG
241};
242WRITE_CLASS_ENCODER(rgw_data_sync_status)
243
244struct rgw_datalog_entry {
245 string key;
246 ceph::real_time timestamp;
247
248 void decode_json(JSONObj *obj);
249};
250
251struct rgw_datalog_shard_data {
252 string marker;
253 bool truncated;
254 vector<rgw_datalog_entry> entries;
255
256 void decode_json(JSONObj *obj);
257};
258
259class RGWAsyncRadosProcessor;
260class RGWDataSyncControlCR;
261
262struct rgw_bucket_entry_owner {
263 string id;
264 string display_name;
265
266 rgw_bucket_entry_owner() {}
267 rgw_bucket_entry_owner(const string& _id, const string& _display_name) : id(_id), display_name(_display_name) {}
268
269 void decode_json(JSONObj *obj);
270};
271
272class RGWSyncErrorLogger;
11fdf7f2 273class RGWRESTConn;
9f95a23c 274class RGWServices;
7c673cae
FG
275
276struct RGWDataSyncEnv {
11fdf7f2
TL
277 const DoutPrefixProvider *dpp{nullptr};
278 CephContext *cct{nullptr};
9f95a23c
TL
279 rgw::sal::RGWRadosStore *store{nullptr};
280 RGWServices *svc{nullptr};
11fdf7f2
TL
281 RGWAsyncRadosProcessor *async_rados{nullptr};
282 RGWHTTPManager *http_manager{nullptr};
283 RGWSyncErrorLogger *error_logger{nullptr};
284 RGWSyncTraceManager *sync_tracer{nullptr};
11fdf7f2 285 RGWSyncModuleInstanceRef sync_module{nullptr};
81eedcae 286 PerfCounters* counters{nullptr};
7c673cae 287
11fdf7f2 288 RGWDataSyncEnv() {}
7c673cae 289
9f95a23c 290 void init(const DoutPrefixProvider *_dpp, CephContext *_cct, rgw::sal::RGWRadosStore *_store, RGWServices *_svc,
7c673cae 291 RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
11fdf7f2 292 RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer,
9f95a23c 293 RGWSyncModuleInstanceRef& _sync_module,
81eedcae 294 PerfCounters* _counters) {
9f95a23c 295 dpp = _dpp;
7c673cae
FG
296 cct = _cct;
297 store = _store;
9f95a23c 298 svc = _svc;
7c673cae
FG
299 async_rados = _async_rados;
300 http_manager = _http_manager;
301 error_logger = _error_logger;
11fdf7f2 302 sync_tracer = _sync_tracer;
7c673cae 303 sync_module = _sync_module;
81eedcae 304 counters = _counters;
7c673cae
FG
305 }
306
307 string shard_obj_name(int shard_id);
308 string status_oid();
309};
310
9f95a23c
TL
311struct RGWDataSyncCtx {
312 CephContext *cct{nullptr};
313 RGWDataSyncEnv *env{nullptr};
314
315 RGWRESTConn *conn{nullptr};
316 rgw_zone_id source_zone;
317
318 void init(RGWDataSyncEnv *_env,
319 RGWRESTConn *_conn,
320 const rgw_zone_id& _source_zone) {
321 cct = _env->cct;
322 env = _env;
323 conn = _conn;
324 source_zone = _source_zone;
325 }
326};
327
328class RGWRados;
329class RGWDataChangesLogInfo;
330
7c673cae 331class RGWRemoteDataLog : public RGWCoroutinesManager {
11fdf7f2 332 const DoutPrefixProvider *dpp;
9f95a23c
TL
333 rgw::sal::RGWRadosStore *store;
334 CephContext *cct;
335 RGWCoroutinesManagerRegistry *cr_registry;
7c673cae
FG
336 RGWAsyncRadosProcessor *async_rados;
337 RGWHTTPManager http_manager;
338
339 RGWDataSyncEnv sync_env;
9f95a23c 340 RGWDataSyncCtx sc;
7c673cae 341
9f95a23c 342 ceph::shared_mutex lock = ceph::make_shared_mutex("RGWRemoteDataLog::lock");
7c673cae
FG
343 RGWDataSyncControlCR *data_sync_cr;
344
11fdf7f2
TL
345 RGWSyncTraceNodeRef tn;
346
7c673cae
FG
347 bool initialized;
348
349public:
9f95a23c
TL
350 RGWRemoteDataLog(const DoutPrefixProvider *dpp,
351 rgw::sal::RGWRadosStore *_store,
352 RGWAsyncRadosProcessor *async_rados);
353 int init(const rgw_zone_id& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger,
81eedcae
TL
354 RGWSyncTraceManager *_sync_tracer, RGWSyncModuleInstanceRef& module,
355 PerfCounters* _counters);
7c673cae
FG
356 void finish();
357
358 int read_log_info(rgw_datalog_info *log_info);
359 int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info);
360 int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result);
7c673cae 361 int read_sync_status(rgw_data_sync_status *sync_status);
28e407b8
AA
362 int read_recovering_shards(const int num_shards, set<int>& recovering_shards);
363 int read_shard_status(int shard_id, set<string>& lagging_buckets,set<string>& recovering_buckets, rgw_data_sync_marker* sync_marker, const int max_entries);
7c673cae
FG
364 int init_sync_status(int num_shards);
365 int run_sync(int num_shards);
366
367 void wakeup(int shard_id, set<string>& keys);
368};
369
11fdf7f2 370class RGWDataSyncStatusManager : public DoutPrefixProvider {
9f95a23c 371 rgw::sal::RGWRadosStore *store;
7c673cae 372
9f95a23c 373 rgw_zone_id source_zone;
7c673cae
FG
374 RGWRESTConn *conn;
375 RGWSyncErrorLogger *error_logger;
376 RGWSyncModuleInstanceRef sync_module;
81eedcae 377 PerfCounters* counters;
7c673cae
FG
378
379 RGWRemoteDataLog source_log;
380
381 string source_status_oid;
382 string source_shard_status_oid_prefix;
383
384 map<int, rgw_raw_obj> shard_objs;
385
386 int num_shards;
387
388public:
9f95a23c
TL
389 RGWDataSyncStatusManager(rgw::sal::RGWRadosStore *_store, RGWAsyncRadosProcessor *async_rados,
390 const rgw_zone_id& _source_zone, PerfCounters* counters)
7c673cae 391 : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL),
81eedcae 392 sync_module(nullptr), counters(counters),
11fdf7f2 393 source_log(this, store, async_rados), num_shards(0) {}
9f95a23c
TL
394 RGWDataSyncStatusManager(rgw::sal::RGWRadosStore *_store, RGWAsyncRadosProcessor *async_rados,
395 const rgw_zone_id& _source_zone, PerfCounters* counters,
81eedcae 396 const RGWSyncModuleInstanceRef& _sync_module)
94b18763 397 : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL),
81eedcae 398 sync_module(_sync_module), counters(counters),
11fdf7f2 399 source_log(this, store, async_rados), num_shards(0) {}
7c673cae
FG
400 ~RGWDataSyncStatusManager() {
401 finalize();
402 }
403 int init();
404 void finalize();
405
9f95a23c
TL
406 static string shard_obj_name(const rgw_zone_id& source_zone, int shard_id);
407 static string sync_status_oid(const rgw_zone_id& source_zone);
7c673cae
FG
408
409 int read_sync_status(rgw_data_sync_status *sync_status) {
410 return source_log.read_sync_status(sync_status);
411 }
28e407b8
AA
412
413 int read_recovering_shards(const int num_shards, set<int>& recovering_shards) {
414 return source_log.read_recovering_shards(num_shards, recovering_shards);
415 }
416
417 int read_shard_status(int shard_id, set<string>& lagging_buckets, set<string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries) {
418 return source_log.read_shard_status(shard_id, lagging_buckets, recovering_buckets,sync_marker, max_entries);
419 }
7c673cae
FG
420 int init_sync_status() { return source_log.init_sync_status(num_shards); }
421
422 int read_log_info(rgw_datalog_info *log_info) {
423 return source_log.read_log_info(log_info);
424 }
425 int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info) {
426 return source_log.read_source_log_shards_info(shards_info);
427 }
428 int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result) {
429 return source_log.read_source_log_shards_next(shard_markers, result);
430 }
431
432 int run() { return source_log.run_sync(num_shards); }
433
434 void wakeup(int shard_id, set<string>& keys) { return source_log.wakeup(shard_id, keys); }
435 void stop() {
436 source_log.finish();
437 }
11fdf7f2
TL
438
439 // implements DoutPrefixProvider
9f95a23c 440 CephContext *get_cct() const override;
11fdf7f2
TL
441 unsigned get_subsys() const override;
442 std::ostream& gen_prefix(std::ostream& out) const override;
7c673cae
FG
443};
444
9f95a23c 445class RGWBucketPipeSyncStatusManager;
7c673cae
FG
446class RGWBucketSyncCR;
447
448struct rgw_bucket_shard_full_sync_marker {
449 rgw_obj_key position;
450 uint64_t count;
451
452 rgw_bucket_shard_full_sync_marker() : count(0) {}
453
454 void encode_attr(map<string, bufferlist>& attrs);
455
456 void encode(bufferlist& bl) const {
457 ENCODE_START(1, 1, bl);
11fdf7f2
TL
458 encode(position, bl);
459 encode(count, bl);
7c673cae
FG
460 ENCODE_FINISH(bl);
461 }
462
11fdf7f2 463 void decode(bufferlist::const_iterator& bl) {
7c673cae 464 DECODE_START(1, bl);
11fdf7f2
TL
465 decode(position, bl);
466 decode(count, bl);
7c673cae
FG
467 DECODE_FINISH(bl);
468 }
469
b32b8144
FG
470 void dump(Formatter *f) const;
471 void decode_json(JSONObj *obj);
7c673cae
FG
472};
473WRITE_CLASS_ENCODER(rgw_bucket_shard_full_sync_marker)
474
475struct rgw_bucket_shard_inc_sync_marker {
476 string position;
477
478 rgw_bucket_shard_inc_sync_marker() {}
479
480 void encode_attr(map<string, bufferlist>& attrs);
481
482 void encode(bufferlist& bl) const {
483 ENCODE_START(1, 1, bl);
11fdf7f2 484 encode(position, bl);
7c673cae
FG
485 ENCODE_FINISH(bl);
486 }
487
11fdf7f2 488 void decode(bufferlist::const_iterator& bl) {
7c673cae 489 DECODE_START(1, bl);
11fdf7f2 490 decode(position, bl);
7c673cae
FG
491 DECODE_FINISH(bl);
492 }
493
b32b8144
FG
494 void dump(Formatter *f) const;
495 void decode_json(JSONObj *obj);
7c673cae
FG
496
497 bool operator<(const rgw_bucket_shard_inc_sync_marker& m) const {
498 return (position < m.position);
499 }
500};
501WRITE_CLASS_ENCODER(rgw_bucket_shard_inc_sync_marker)
502
503struct rgw_bucket_shard_sync_info {
504 enum SyncState {
505 StateInit = 0,
506 StateFullSync = 1,
507 StateIncrementalSync = 2,
9f95a23c 508 StateStopped = 3,
7c673cae
FG
509 };
510
511 uint16_t state;
512 rgw_bucket_shard_full_sync_marker full_marker;
513 rgw_bucket_shard_inc_sync_marker inc_marker;
514
515 void decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs);
516 void encode_all_attrs(map<string, bufferlist>& attrs);
517 void encode_state_attr(map<string, bufferlist>& attrs);
518
519 void encode(bufferlist& bl) const {
520 ENCODE_START(1, 1, bl);
11fdf7f2
TL
521 encode(state, bl);
522 encode(full_marker, bl);
523 encode(inc_marker, bl);
7c673cae
FG
524 ENCODE_FINISH(bl);
525 }
526
11fdf7f2 527 void decode(bufferlist::const_iterator& bl) {
7c673cae 528 DECODE_START(1, bl);
11fdf7f2
TL
529 decode(state, bl);
530 decode(full_marker, bl);
531 decode(inc_marker, bl);
7c673cae
FG
532 DECODE_FINISH(bl);
533 }
534
b32b8144
FG
535 void dump(Formatter *f) const;
536 void decode_json(JSONObj *obj);
7c673cae
FG
537
538 rgw_bucket_shard_sync_info() : state((int)StateInit) {}
539
540};
541WRITE_CLASS_ENCODER(rgw_bucket_shard_sync_info)
542
28e407b8
AA
543struct rgw_bucket_index_marker_info {
544 string bucket_ver;
545 string master_ver;
546 string max_marker;
547 bool syncstopped{false};
548
549 void decode_json(JSONObj *obj) {
550 JSONDecoder::decode_json("bucket_ver", bucket_ver, obj);
551 JSONDecoder::decode_json("master_ver", master_ver, obj);
552 JSONDecoder::decode_json("max_marker", max_marker, obj);
553 JSONDecoder::decode_json("syncstopped", syncstopped, obj);
554 }
555};
556
7c673cae 557
9f95a23c 558class RGWRemoteBucketManager {
11fdf7f2 559 const DoutPrefixProvider *dpp;
9f95a23c
TL
560
561 RGWDataSyncEnv *sync_env;
562
7c673cae 563 RGWRESTConn *conn{nullptr};
9f95a23c 564 rgw_zone_id source_zone;
7c673cae 565
9f95a23c 566 vector<rgw_bucket_sync_pair_info> sync_pairs;
7c673cae 567
9f95a23c 568 RGWDataSyncCtx sc;
7c673cae
FG
569 rgw_bucket_shard_sync_info init_status;
570
571 RGWBucketSyncCR *sync_cr{nullptr};
572
573public:
9f95a23c
TL
574 RGWRemoteBucketManager(const DoutPrefixProvider *_dpp,
575 RGWDataSyncEnv *_sync_env,
576 const rgw_zone_id& _source_zone, RGWRESTConn *_conn,
577 const RGWBucketInfo& source_bucket_info,
578 const rgw_bucket& dest_bucket);
579
580 void init(const rgw_zone_id& _source_zone, RGWRESTConn *_conn,
581 const rgw_bucket& source_bucket, int shard_id,
582 const rgw_bucket& dest_bucket);
583
584 RGWCoroutine *read_sync_status_cr(int num, rgw_bucket_shard_sync_info *sync_status);
585 RGWCoroutine *init_sync_status_cr(int num);
586 RGWCoroutine *run_sync_cr(int num);
587
588 int num_pipes() {
589 return sync_pairs.size();
590 }
7c673cae
FG
591
592 void wakeup();
593};
594
9f95a23c
TL
595class RGWBucketPipeSyncStatusManager : public DoutPrefixProvider {
596 rgw::sal::RGWRadosStore *store;
597
598 RGWDataSyncEnv sync_env;
7c673cae
FG
599
600 RGWCoroutinesManager cr_mgr;
601
602 RGWHTTPManager http_manager;
603
9f95a23c
TL
604 std::optional<rgw_zone_id> source_zone;
605 std::optional<rgw_bucket> source_bucket;
606
7c673cae
FG
607 RGWRESTConn *conn;
608 RGWSyncErrorLogger *error_logger;
609 RGWSyncModuleInstanceRef sync_module;
610
9f95a23c 611 rgw_bucket dest_bucket;
7c673cae 612
9f95a23c 613 vector<RGWRemoteBucketManager *> source_mgrs;
7c673cae
FG
614
615 string source_status_oid;
616 string source_shard_status_oid_prefix;
617
618 map<int, rgw_bucket_shard_sync_info> sync_status;
619 rgw_raw_obj status_obj;
620
621 int num_shards;
622
623public:
9f95a23c
TL
624 RGWBucketPipeSyncStatusManager(rgw::sal::RGWRadosStore *_store,
625 std::optional<rgw_zone_id> _source_zone,
626 std::optional<rgw_bucket> _source_bucket,
627 const rgw_bucket& dest_bucket);
628 ~RGWBucketPipeSyncStatusManager();
7c673cae
FG
629
630 int init();
631
632 map<int, rgw_bucket_shard_sync_info>& get_sync_status() { return sync_status; }
633 int init_sync_status();
634
9f95a23c
TL
635 static string status_oid(const rgw_zone_id& source_zone, const rgw_bucket_sync_pair_info& bs);
636 static string obj_status_oid(const rgw_bucket_sync_pipe& sync_pipe,
637 const rgw_zone_id& source_zone, const rgw_obj& obj); /* specific source obj sync status,
638 can be used by sync modules */
11fdf7f2
TL
639
640 // implements DoutPrefixProvider
9f95a23c 641 CephContext *get_cct() const override;
11fdf7f2
TL
642 unsigned get_subsys() const override;
643 std::ostream& gen_prefix(std::ostream& out) const override;
7c673cae
FG
644
645 int read_sync_status();
646 int run();
647};
648
b32b8144 649/// read the sync status of all bucket shards from the given source zone
9f95a23c
TL
650int rgw_bucket_sync_status(const DoutPrefixProvider *dpp,
651 rgw::sal::RGWRadosStore *store,
652 const rgw_sync_bucket_pipe& pipe,
653 const RGWBucketInfo& dest_bucket_info,
654 const RGWBucketInfo *psource_bucket_info,
b32b8144
FG
655 std::vector<rgw_bucket_shard_sync_info> *status);
656
7c673cae
FG
657class RGWDefaultSyncModule : public RGWSyncModule {
658public:
659 RGWDefaultSyncModule() {}
11fdf7f2 660 bool supports_writes() override { return true; }
7c673cae 661 bool supports_data_export() override { return true; }
11fdf7f2
TL
662 int create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override;
663};
664
665class RGWArchiveSyncModule : public RGWDefaultSyncModule {
666public:
667 RGWArchiveSyncModule() {}
668 bool supports_writes() override { return true; }
669 bool supports_data_export() override { return false; }
670 int create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override;
7c673cae
FG
671};
672
7c673cae 673#endif