]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_data_sync.h
import ceph pacific 16.2.5
[ceph.git] / ceph / src / rgw / rgw_data_sync.h
CommitLineData
11fdf7f2 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
9f95a23c 2// vim: ts=8 sw=2 smarttab ft=cpp
11fdf7f2 3
7c673cae
FG
4#ifndef CEPH_RGW_DATA_SYNC_H
5#define CEPH_RGW_DATA_SYNC_H
6
11fdf7f2
TL
#include <utility>

#include "include/encoding.h"

#include "common/RWLock.h"
#include "common/ceph_json.h"

#include "rgw_coroutine.h"
#include "rgw_http_client.h"
#include "rgw_sal.h"

#include "rgw_datalog.h"
#include "rgw_sync_module.h"
#include "rgw_sync_trace.h"
#include "rgw_sync_policy.h"

#include "rgw_bucket_sync.h"
22
f67539c2
TL
23// represents an obligation to sync an entry up a given time
24struct rgw_data_sync_obligation {
25 std::string key;
26 std::string marker;
27 ceph::real_time timestamp;
28 bool retry = false;
29};
30
31inline std::ostream& operator<<(std::ostream& out, const rgw_data_sync_obligation& o) {
32 out << "key=" << o.key;
33 if (!o.marker.empty()) {
34 out << " marker=" << o.marker;
35 }
36 if (o.timestamp != ceph::real_time{}) {
37 out << " timestamp=" << o.timestamp;
38 }
39 if (o.retry) {
40 out << " retry";
41 }
42 return out;
43}
44
9f95a23c
TL
45class JSONObj;
46struct rgw_sync_bucket_pipe;
47
48struct rgw_bucket_sync_pair_info {
49 RGWBucketSyncFlowManager::pipe_handler handler; /* responsible for sync filters */
50 rgw_bucket_shard source_bs;
51 rgw_bucket_shard dest_bs;
52};
53
54inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pair_info& p) {
55 if (p.source_bs.bucket == p.dest_bs.bucket) {
56 return out << p.source_bs;
57 }
58
59 out << p.source_bs;
60
61 out << "->" << p.dest_bs.bucket;
62
63 return out;
64}
65
66struct rgw_bucket_sync_pipe {
67 rgw_bucket_sync_pair_info info;
68 RGWBucketInfo source_bucket_info;
69 map<string, bufferlist> source_bucket_attrs;
70 RGWBucketInfo dest_bucket_info;
71 map<string, bufferlist> dest_bucket_attrs;
72
73 RGWBucketSyncFlowManager::pipe_rules_ref& get_rules() {
74 return info.handler.rules;
75 }
76};
77
78inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pipe& p) {
79 return out << p.info;
80}
7c673cae 81
7c673cae
FG
82struct rgw_datalog_info {
83 uint32_t num_shards;
84
85 rgw_datalog_info() : num_shards(0) {}
86
87 void decode_json(JSONObj *obj);
88};
89
90struct rgw_data_sync_info {
91 enum SyncState {
92 StateInit = 0,
93 StateBuildingFullSyncMaps = 1,
94 StateSync = 2,
95 };
96
97 uint16_t state;
98 uint32_t num_shards;
99
31f18b77
FG
100 uint64_t instance_id{0};
101
7c673cae 102 void encode(bufferlist& bl) const {
31f18b77 103 ENCODE_START(2, 1, bl);
11fdf7f2
TL
104 encode(state, bl);
105 encode(num_shards, bl);
106 encode(instance_id, bl);
7c673cae
FG
107 ENCODE_FINISH(bl);
108 }
109
11fdf7f2 110 void decode(bufferlist::const_iterator& bl) {
31f18b77 111 DECODE_START(2, bl);
11fdf7f2
TL
112 decode(state, bl);
113 decode(num_shards, bl);
31f18b77 114 if (struct_v >= 2) {
11fdf7f2 115 decode(instance_id, bl);
31f18b77 116 }
7c673cae
FG
117 DECODE_FINISH(bl);
118 }
119
120 void dump(Formatter *f) const {
121 string s;
122 switch ((SyncState)state) {
123 case StateInit:
124 s = "init";
125 break;
126 case StateBuildingFullSyncMaps:
127 s = "building-full-sync-maps";
128 break;
129 case StateSync:
130 s = "sync";
131 break;
132 default:
133 s = "unknown";
134 break;
135 }
136 encode_json("status", s, f);
137 encode_json("num_shards", num_shards, f);
31f18b77 138 encode_json("instance_id", instance_id, f);
7c673cae
FG
139 }
140 void decode_json(JSONObj *obj) {
141 std::string s;
142 JSONDecoder::decode_json("status", s, obj);
143 if (s == "building-full-sync-maps") {
144 state = StateBuildingFullSyncMaps;
145 } else if (s == "sync") {
146 state = StateSync;
147 } else {
148 state = StateInit;
149 }
150 JSONDecoder::decode_json("num_shards", num_shards, obj);
a8e16298 151 JSONDecoder::decode_json("instance_id", instance_id, obj);
7c673cae 152 }
b32b8144 153 static void generate_test_instances(std::list<rgw_data_sync_info*>& o);
7c673cae
FG
154
155 rgw_data_sync_info() : state((int)StateInit), num_shards(0) {}
156};
157WRITE_CLASS_ENCODER(rgw_data_sync_info)
158
159struct rgw_data_sync_marker {
160 enum SyncState {
161 FullSync = 0,
162 IncrementalSync = 1,
163 };
164 uint16_t state;
165 string marker;
166 string next_step_marker;
167 uint64_t total_entries;
168 uint64_t pos;
169 real_time timestamp;
170
171 rgw_data_sync_marker() : state(FullSync), total_entries(0), pos(0) {}
172
173 void encode(bufferlist& bl) const {
174 ENCODE_START(1, 1, bl);
11fdf7f2
TL
175 encode(state, bl);
176 encode(marker, bl);
177 encode(next_step_marker, bl);
178 encode(total_entries, bl);
179 encode(pos, bl);
180 encode(timestamp, bl);
7c673cae
FG
181 ENCODE_FINISH(bl);
182 }
183
11fdf7f2 184 void decode(bufferlist::const_iterator& bl) {
7c673cae 185 DECODE_START(1, bl);
11fdf7f2
TL
186 decode(state, bl);
187 decode(marker, bl);
188 decode(next_step_marker, bl);
189 decode(total_entries, bl);
190 decode(pos, bl);
191 decode(timestamp, bl);
7c673cae
FG
192 DECODE_FINISH(bl);
193 }
194
195 void dump(Formatter *f) const {
28e407b8
AA
196 const char *s{nullptr};
197 switch ((SyncState)state) {
198 case FullSync:
199 s = "full-sync";
200 break;
201 case IncrementalSync:
202 s = "incremental-sync";
203 break;
204 default:
205 s = "unknown";
206 break;
207 }
208 encode_json("status", s, f);
7c673cae
FG
209 encode_json("marker", marker, f);
210 encode_json("next_step_marker", next_step_marker, f);
211 encode_json("total_entries", total_entries, f);
212 encode_json("pos", pos, f);
213 encode_json("timestamp", utime_t(timestamp), f);
214 }
215 void decode_json(JSONObj *obj) {
28e407b8
AA
216 std::string s;
217 JSONDecoder::decode_json("status", s, obj);
218 if (s == "full-sync") {
219 state = FullSync;
220 } else if (s == "incremental-sync") {
221 state = IncrementalSync;
222 }
7c673cae
FG
223 JSONDecoder::decode_json("marker", marker, obj);
224 JSONDecoder::decode_json("next_step_marker", next_step_marker, obj);
225 JSONDecoder::decode_json("total_entries", total_entries, obj);
226 JSONDecoder::decode_json("pos", pos, obj);
227 utime_t t;
228 JSONDecoder::decode_json("timestamp", t, obj);
229 timestamp = t.to_real_time();
230 }
b32b8144 231 static void generate_test_instances(std::list<rgw_data_sync_marker*>& o);
7c673cae
FG
232};
233WRITE_CLASS_ENCODER(rgw_data_sync_marker)
234
235struct rgw_data_sync_status {
236 rgw_data_sync_info sync_info;
237 map<uint32_t, rgw_data_sync_marker> sync_markers;
238
239 rgw_data_sync_status() {}
240
241 void encode(bufferlist& bl) const {
242 ENCODE_START(1, 1, bl);
11fdf7f2 243 encode(sync_info, bl);
7c673cae
FG
244 /* sync markers are encoded separately */
245 ENCODE_FINISH(bl);
246 }
247
11fdf7f2 248 void decode(bufferlist::const_iterator& bl) {
7c673cae 249 DECODE_START(1, bl);
11fdf7f2 250 decode(sync_info, bl);
7c673cae
FG
251 /* sync markers are decoded separately */
252 DECODE_FINISH(bl);
253 }
254
255 void dump(Formatter *f) const {
256 encode_json("info", sync_info, f);
257 encode_json("markers", sync_markers, f);
258 }
259 void decode_json(JSONObj *obj) {
260 JSONDecoder::decode_json("info", sync_info, obj);
261 JSONDecoder::decode_json("markers", sync_markers, obj);
262 }
b32b8144 263 static void generate_test_instances(std::list<rgw_data_sync_status*>& o);
7c673cae
FG
264};
265WRITE_CLASS_ENCODER(rgw_data_sync_status)
266
267struct rgw_datalog_entry {
268 string key;
269 ceph::real_time timestamp;
270
271 void decode_json(JSONObj *obj);
272};
273
274struct rgw_datalog_shard_data {
275 string marker;
276 bool truncated;
277 vector<rgw_datalog_entry> entries;
278
279 void decode_json(JSONObj *obj);
280};
281
282class RGWAsyncRadosProcessor;
283class RGWDataSyncControlCR;
284
285struct rgw_bucket_entry_owner {
286 string id;
287 string display_name;
288
289 rgw_bucket_entry_owner() {}
290 rgw_bucket_entry_owner(const string& _id, const string& _display_name) : id(_id), display_name(_display_name) {}
291
292 void decode_json(JSONObj *obj);
293};
294
295class RGWSyncErrorLogger;
11fdf7f2 296class RGWRESTConn;
9f95a23c 297class RGWServices;
7c673cae
FG
298
299struct RGWDataSyncEnv {
11fdf7f2
TL
300 const DoutPrefixProvider *dpp{nullptr};
301 CephContext *cct{nullptr};
9f95a23c
TL
302 rgw::sal::RGWRadosStore *store{nullptr};
303 RGWServices *svc{nullptr};
11fdf7f2
TL
304 RGWAsyncRadosProcessor *async_rados{nullptr};
305 RGWHTTPManager *http_manager{nullptr};
306 RGWSyncErrorLogger *error_logger{nullptr};
307 RGWSyncTraceManager *sync_tracer{nullptr};
11fdf7f2 308 RGWSyncModuleInstanceRef sync_module{nullptr};
81eedcae 309 PerfCounters* counters{nullptr};
7c673cae 310
11fdf7f2 311 RGWDataSyncEnv() {}
7c673cae 312
9f95a23c 313 void init(const DoutPrefixProvider *_dpp, CephContext *_cct, rgw::sal::RGWRadosStore *_store, RGWServices *_svc,
7c673cae 314 RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
11fdf7f2 315 RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer,
9f95a23c 316 RGWSyncModuleInstanceRef& _sync_module,
81eedcae 317 PerfCounters* _counters) {
9f95a23c 318 dpp = _dpp;
7c673cae
FG
319 cct = _cct;
320 store = _store;
9f95a23c 321 svc = _svc;
7c673cae
FG
322 async_rados = _async_rados;
323 http_manager = _http_manager;
324 error_logger = _error_logger;
11fdf7f2 325 sync_tracer = _sync_tracer;
7c673cae 326 sync_module = _sync_module;
81eedcae 327 counters = _counters;
7c673cae
FG
328 }
329
330 string shard_obj_name(int shard_id);
331 string status_oid();
332};
333
9f95a23c
TL
334struct RGWDataSyncCtx {
335 CephContext *cct{nullptr};
336 RGWDataSyncEnv *env{nullptr};
337
338 RGWRESTConn *conn{nullptr};
339 rgw_zone_id source_zone;
340
341 void init(RGWDataSyncEnv *_env,
342 RGWRESTConn *_conn,
343 const rgw_zone_id& _source_zone) {
344 cct = _env->cct;
345 env = _env;
346 conn = _conn;
347 source_zone = _source_zone;
348 }
349};
350
351class RGWRados;
9f95a23c 352
7c673cae 353class RGWRemoteDataLog : public RGWCoroutinesManager {
11fdf7f2 354 const DoutPrefixProvider *dpp;
9f95a23c
TL
355 rgw::sal::RGWRadosStore *store;
356 CephContext *cct;
357 RGWCoroutinesManagerRegistry *cr_registry;
7c673cae
FG
358 RGWAsyncRadosProcessor *async_rados;
359 RGWHTTPManager http_manager;
360
361 RGWDataSyncEnv sync_env;
9f95a23c 362 RGWDataSyncCtx sc;
7c673cae 363
9f95a23c 364 ceph::shared_mutex lock = ceph::make_shared_mutex("RGWRemoteDataLog::lock");
7c673cae
FG
365 RGWDataSyncControlCR *data_sync_cr;
366
11fdf7f2
TL
367 RGWSyncTraceNodeRef tn;
368
7c673cae
FG
369 bool initialized;
370
371public:
9f95a23c
TL
372 RGWRemoteDataLog(const DoutPrefixProvider *dpp,
373 rgw::sal::RGWRadosStore *_store,
374 RGWAsyncRadosProcessor *async_rados);
375 int init(const rgw_zone_id& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger,
81eedcae
TL
376 RGWSyncTraceManager *_sync_tracer, RGWSyncModuleInstanceRef& module,
377 PerfCounters* _counters);
7c673cae
FG
378 void finish();
379
b3b6e05e
TL
380 int read_log_info(const DoutPrefixProvider *dpp, rgw_datalog_info *log_info);
381 int read_source_log_shards_info(const DoutPrefixProvider *dpp, map<int, RGWDataChangesLogInfo> *shards_info);
382 int read_source_log_shards_next(const DoutPrefixProvider *dpp, map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result);
383 int read_sync_status(const DoutPrefixProvider *dpp, rgw_data_sync_status *sync_status);
384 int read_recovering_shards(const DoutPrefixProvider *dpp, const int num_shards, set<int>& recovering_shards);
385 int read_shard_status(const DoutPrefixProvider *dpp, int shard_id, set<string>& lagging_buckets,set<string>& recovering_buckets, rgw_data_sync_marker* sync_marker, const int max_entries);
386 int init_sync_status(const DoutPrefixProvider *dpp, int num_shards);
387 int run_sync(const DoutPrefixProvider *dpp, int num_shards);
7c673cae
FG
388
389 void wakeup(int shard_id, set<string>& keys);
390};
391
11fdf7f2 392class RGWDataSyncStatusManager : public DoutPrefixProvider {
9f95a23c 393 rgw::sal::RGWRadosStore *store;
7c673cae 394
9f95a23c 395 rgw_zone_id source_zone;
7c673cae
FG
396 RGWRESTConn *conn;
397 RGWSyncErrorLogger *error_logger;
398 RGWSyncModuleInstanceRef sync_module;
81eedcae 399 PerfCounters* counters;
7c673cae
FG
400
401 RGWRemoteDataLog source_log;
402
403 string source_status_oid;
404 string source_shard_status_oid_prefix;
405
406 map<int, rgw_raw_obj> shard_objs;
407
408 int num_shards;
409
410public:
9f95a23c
TL
411 RGWDataSyncStatusManager(rgw::sal::RGWRadosStore *_store, RGWAsyncRadosProcessor *async_rados,
412 const rgw_zone_id& _source_zone, PerfCounters* counters)
7c673cae 413 : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL),
81eedcae 414 sync_module(nullptr), counters(counters),
11fdf7f2 415 source_log(this, store, async_rados), num_shards(0) {}
9f95a23c
TL
416 RGWDataSyncStatusManager(rgw::sal::RGWRadosStore *_store, RGWAsyncRadosProcessor *async_rados,
417 const rgw_zone_id& _source_zone, PerfCounters* counters,
81eedcae 418 const RGWSyncModuleInstanceRef& _sync_module)
94b18763 419 : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL),
81eedcae 420 sync_module(_sync_module), counters(counters),
11fdf7f2 421 source_log(this, store, async_rados), num_shards(0) {}
7c673cae
FG
422 ~RGWDataSyncStatusManager() {
423 finalize();
424 }
b3b6e05e 425 int init(const DoutPrefixProvider *dpp);
7c673cae
FG
426 void finalize();
427
9f95a23c
TL
428 static string shard_obj_name(const rgw_zone_id& source_zone, int shard_id);
429 static string sync_status_oid(const rgw_zone_id& source_zone);
7c673cae 430
b3b6e05e
TL
431 int read_sync_status(const DoutPrefixProvider *dpp, rgw_data_sync_status *sync_status) {
432 return source_log.read_sync_status(dpp, sync_status);
7c673cae 433 }
28e407b8 434
b3b6e05e
TL
435 int read_recovering_shards(const DoutPrefixProvider *dpp, const int num_shards, set<int>& recovering_shards) {
436 return source_log.read_recovering_shards(dpp, num_shards, recovering_shards);
28e407b8
AA
437 }
438
b3b6e05e
TL
439 int read_shard_status(const DoutPrefixProvider *dpp, int shard_id, set<string>& lagging_buckets, set<string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries) {
440 return source_log.read_shard_status(dpp, shard_id, lagging_buckets, recovering_buckets,sync_marker, max_entries);
28e407b8 441 }
b3b6e05e 442 int init_sync_status(const DoutPrefixProvider *dpp) { return source_log.init_sync_status(dpp, num_shards); }
7c673cae 443
b3b6e05e
TL
444 int read_log_info(const DoutPrefixProvider *dpp, rgw_datalog_info *log_info) {
445 return source_log.read_log_info(dpp, log_info);
7c673cae 446 }
b3b6e05e
TL
447 int read_source_log_shards_info(const DoutPrefixProvider *dpp, map<int, RGWDataChangesLogInfo> *shards_info) {
448 return source_log.read_source_log_shards_info(dpp, shards_info);
7c673cae 449 }
b3b6e05e
TL
450 int read_source_log_shards_next(const DoutPrefixProvider *dpp, map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result) {
451 return source_log.read_source_log_shards_next(dpp, shard_markers, result);
7c673cae
FG
452 }
453
b3b6e05e 454 int run(const DoutPrefixProvider *dpp) { return source_log.run_sync(dpp, num_shards); }
7c673cae
FG
455
456 void wakeup(int shard_id, set<string>& keys) { return source_log.wakeup(shard_id, keys); }
457 void stop() {
458 source_log.finish();
459 }
11fdf7f2
TL
460
461 // implements DoutPrefixProvider
9f95a23c 462 CephContext *get_cct() const override;
11fdf7f2
TL
463 unsigned get_subsys() const override;
464 std::ostream& gen_prefix(std::ostream& out) const override;
7c673cae
FG
465};
466
9f95a23c 467class RGWBucketPipeSyncStatusManager;
7c673cae
FG
468class RGWBucketSyncCR;
469
470struct rgw_bucket_shard_full_sync_marker {
471 rgw_obj_key position;
472 uint64_t count;
473
474 rgw_bucket_shard_full_sync_marker() : count(0) {}
475
476 void encode_attr(map<string, bufferlist>& attrs);
477
478 void encode(bufferlist& bl) const {
479 ENCODE_START(1, 1, bl);
11fdf7f2
TL
480 encode(position, bl);
481 encode(count, bl);
7c673cae
FG
482 ENCODE_FINISH(bl);
483 }
484
11fdf7f2 485 void decode(bufferlist::const_iterator& bl) {
7c673cae 486 DECODE_START(1, bl);
11fdf7f2
TL
487 decode(position, bl);
488 decode(count, bl);
7c673cae
FG
489 DECODE_FINISH(bl);
490 }
491
b32b8144
FG
492 void dump(Formatter *f) const;
493 void decode_json(JSONObj *obj);
7c673cae
FG
494};
495WRITE_CLASS_ENCODER(rgw_bucket_shard_full_sync_marker)
496
497struct rgw_bucket_shard_inc_sync_marker {
498 string position;
f67539c2 499 ceph::real_time timestamp;
7c673cae
FG
500
501 void encode_attr(map<string, bufferlist>& attrs);
502
503 void encode(bufferlist& bl) const {
f67539c2 504 ENCODE_START(2, 1, bl);
11fdf7f2 505 encode(position, bl);
f67539c2 506 encode(timestamp, bl);
7c673cae
FG
507 ENCODE_FINISH(bl);
508 }
509
11fdf7f2 510 void decode(bufferlist::const_iterator& bl) {
f67539c2 511 DECODE_START(2, bl);
11fdf7f2 512 decode(position, bl);
f67539c2
TL
513 if (struct_v >= 2) {
514 decode(timestamp, bl);
515 }
516 DECODE_FINISH(bl);
7c673cae
FG
517 }
518
b32b8144
FG
519 void dump(Formatter *f) const;
520 void decode_json(JSONObj *obj);
7c673cae
FG
521};
522WRITE_CLASS_ENCODER(rgw_bucket_shard_inc_sync_marker)
523
524struct rgw_bucket_shard_sync_info {
525 enum SyncState {
526 StateInit = 0,
527 StateFullSync = 1,
528 StateIncrementalSync = 2,
9f95a23c 529 StateStopped = 3,
7c673cae
FG
530 };
531
532 uint16_t state;
533 rgw_bucket_shard_full_sync_marker full_marker;
534 rgw_bucket_shard_inc_sync_marker inc_marker;
535
536 void decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs);
537 void encode_all_attrs(map<string, bufferlist>& attrs);
538 void encode_state_attr(map<string, bufferlist>& attrs);
539
540 void encode(bufferlist& bl) const {
541 ENCODE_START(1, 1, bl);
11fdf7f2
TL
542 encode(state, bl);
543 encode(full_marker, bl);
544 encode(inc_marker, bl);
7c673cae
FG
545 ENCODE_FINISH(bl);
546 }
547
11fdf7f2 548 void decode(bufferlist::const_iterator& bl) {
7c673cae 549 DECODE_START(1, bl);
11fdf7f2
TL
550 decode(state, bl);
551 decode(full_marker, bl);
552 decode(inc_marker, bl);
7c673cae
FG
553 DECODE_FINISH(bl);
554 }
555
b32b8144
FG
556 void dump(Formatter *f) const;
557 void decode_json(JSONObj *obj);
7c673cae
FG
558
559 rgw_bucket_shard_sync_info() : state((int)StateInit) {}
560
561};
562WRITE_CLASS_ENCODER(rgw_bucket_shard_sync_info)
563
28e407b8
AA
564struct rgw_bucket_index_marker_info {
565 string bucket_ver;
566 string master_ver;
567 string max_marker;
568 bool syncstopped{false};
569
570 void decode_json(JSONObj *obj) {
571 JSONDecoder::decode_json("bucket_ver", bucket_ver, obj);
572 JSONDecoder::decode_json("master_ver", master_ver, obj);
573 JSONDecoder::decode_json("max_marker", max_marker, obj);
574 JSONDecoder::decode_json("syncstopped", syncstopped, obj);
575 }
576};
577
7c673cae 578
9f95a23c 579class RGWRemoteBucketManager {
11fdf7f2 580 const DoutPrefixProvider *dpp;
9f95a23c
TL
581
582 RGWDataSyncEnv *sync_env;
583
7c673cae 584 RGWRESTConn *conn{nullptr};
9f95a23c 585 rgw_zone_id source_zone;
7c673cae 586
9f95a23c 587 vector<rgw_bucket_sync_pair_info> sync_pairs;
7c673cae 588
9f95a23c 589 RGWDataSyncCtx sc;
7c673cae
FG
590 rgw_bucket_shard_sync_info init_status;
591
592 RGWBucketSyncCR *sync_cr{nullptr};
593
594public:
9f95a23c
TL
595 RGWRemoteBucketManager(const DoutPrefixProvider *_dpp,
596 RGWDataSyncEnv *_sync_env,
597 const rgw_zone_id& _source_zone, RGWRESTConn *_conn,
598 const RGWBucketInfo& source_bucket_info,
599 const rgw_bucket& dest_bucket);
600
601 void init(const rgw_zone_id& _source_zone, RGWRESTConn *_conn,
602 const rgw_bucket& source_bucket, int shard_id,
603 const rgw_bucket& dest_bucket);
604
605 RGWCoroutine *read_sync_status_cr(int num, rgw_bucket_shard_sync_info *sync_status);
f67539c2 606 RGWCoroutine *init_sync_status_cr(int num, RGWObjVersionTracker& objv_tracker);
9f95a23c
TL
607 RGWCoroutine *run_sync_cr(int num);
608
609 int num_pipes() {
610 return sync_pairs.size();
611 }
7c673cae
FG
612
613 void wakeup();
614};
615
f67539c2
TL
616class BucketIndexShardsManager;
617
b3b6e05e
TL
618int rgw_read_remote_bilog_info(const DoutPrefixProvider *dpp,
619 RGWRESTConn* conn,
f67539c2
TL
620 const rgw_bucket& bucket,
621 BucketIndexShardsManager& markers,
622 optional_yield y);
623
9f95a23c
TL
624class RGWBucketPipeSyncStatusManager : public DoutPrefixProvider {
625 rgw::sal::RGWRadosStore *store;
626
627 RGWDataSyncEnv sync_env;
7c673cae
FG
628
629 RGWCoroutinesManager cr_mgr;
630
631 RGWHTTPManager http_manager;
632
9f95a23c
TL
633 std::optional<rgw_zone_id> source_zone;
634 std::optional<rgw_bucket> source_bucket;
635
7c673cae
FG
636 RGWRESTConn *conn;
637 RGWSyncErrorLogger *error_logger;
638 RGWSyncModuleInstanceRef sync_module;
639
9f95a23c 640 rgw_bucket dest_bucket;
7c673cae 641
9f95a23c 642 vector<RGWRemoteBucketManager *> source_mgrs;
7c673cae
FG
643
644 string source_status_oid;
645 string source_shard_status_oid_prefix;
646
647 map<int, rgw_bucket_shard_sync_info> sync_status;
648 rgw_raw_obj status_obj;
649
650 int num_shards;
651
652public:
9f95a23c
TL
653 RGWBucketPipeSyncStatusManager(rgw::sal::RGWRadosStore *_store,
654 std::optional<rgw_zone_id> _source_zone,
655 std::optional<rgw_bucket> _source_bucket,
656 const rgw_bucket& dest_bucket);
657 ~RGWBucketPipeSyncStatusManager();
7c673cae 658
b3b6e05e 659 int init(const DoutPrefixProvider *dpp);
7c673cae
FG
660
661 map<int, rgw_bucket_shard_sync_info>& get_sync_status() { return sync_status; }
b3b6e05e 662 int init_sync_status(const DoutPrefixProvider *dpp);
7c673cae 663
9f95a23c
TL
664 static string status_oid(const rgw_zone_id& source_zone, const rgw_bucket_sync_pair_info& bs);
665 static string obj_status_oid(const rgw_bucket_sync_pipe& sync_pipe,
f67539c2 666 const rgw_zone_id& source_zone, const rgw::sal::RGWObject* obj); /* specific source obj sync status,
9f95a23c 667 can be used by sync modules */
11fdf7f2
TL
668
669 // implements DoutPrefixProvider
9f95a23c 670 CephContext *get_cct() const override;
11fdf7f2
TL
671 unsigned get_subsys() const override;
672 std::ostream& gen_prefix(std::ostream& out) const override;
7c673cae 673
b3b6e05e
TL
674 int read_sync_status(const DoutPrefixProvider *dpp);
675 int run(const DoutPrefixProvider *dpp);
7c673cae
FG
676};
677
b32b8144 678/// read the sync status of all bucket shards from the given source zone
9f95a23c
TL
679int rgw_bucket_sync_status(const DoutPrefixProvider *dpp,
680 rgw::sal::RGWRadosStore *store,
681 const rgw_sync_bucket_pipe& pipe,
682 const RGWBucketInfo& dest_bucket_info,
683 const RGWBucketInfo *psource_bucket_info,
b32b8144
FG
684 std::vector<rgw_bucket_shard_sync_info> *status);
685
7c673cae
FG
686class RGWDefaultSyncModule : public RGWSyncModule {
687public:
688 RGWDefaultSyncModule() {}
11fdf7f2 689 bool supports_writes() override { return true; }
7c673cae 690 bool supports_data_export() override { return true; }
11fdf7f2
TL
691 int create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override;
692};
693
694class RGWArchiveSyncModule : public RGWDefaultSyncModule {
695public:
696 RGWArchiveSyncModule() {}
697 bool supports_writes() override { return true; }
698 bool supports_data_export() override { return false; }
699 int create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override;
7c673cae
FG
700};
701
7c673cae 702#endif