]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_data_sync.h
import quincy beta 17.1.0
[ceph.git] / ceph / src / rgw / rgw_data_sync.h
CommitLineData
11fdf7f2 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
9f95a23c 2// vim: ts=8 sw=2 smarttab ft=cpp
11fdf7f2 3
7c673cae
FG
4#ifndef CEPH_RGW_DATA_SYNC_H
5#define CEPH_RGW_DATA_SYNC_H
6
11fdf7f2
TL
7#include "include/encoding.h"
8
11fdf7f2
TL
9#include "common/ceph_json.h"
10
7c673cae
FG
11#include "rgw_coroutine.h"
12#include "rgw_http_client.h"
20effc67 13#include "rgw_sal_rados.h"
7c673cae 14
f67539c2 15#include "rgw_datalog.h"
7c673cae 16#include "rgw_sync_module.h"
11fdf7f2 17#include "rgw_sync_trace.h"
9f95a23c
TL
18#include "rgw_sync_policy.h"
19
20#include "rgw_bucket_sync.h"
21
f67539c2
TL
22// represents an obligation to sync an entry up a given time
23struct rgw_data_sync_obligation {
24 std::string key;
25 std::string marker;
26 ceph::real_time timestamp;
27 bool retry = false;
28};
29
30inline std::ostream& operator<<(std::ostream& out, const rgw_data_sync_obligation& o) {
31 out << "key=" << o.key;
32 if (!o.marker.empty()) {
33 out << " marker=" << o.marker;
34 }
35 if (o.timestamp != ceph::real_time{}) {
36 out << " timestamp=" << o.timestamp;
37 }
38 if (o.retry) {
39 out << " retry";
40 }
41 return out;
42}
43
9f95a23c
TL
44class JSONObj;
45struct rgw_sync_bucket_pipe;
46
47struct rgw_bucket_sync_pair_info {
48 RGWBucketSyncFlowManager::pipe_handler handler; /* responsible for sync filters */
49 rgw_bucket_shard source_bs;
50 rgw_bucket_shard dest_bs;
51};
52
20effc67 53inline std::ostream& operator<<(std::ostream& out, const rgw_bucket_sync_pair_info& p) {
9f95a23c
TL
54 if (p.source_bs.bucket == p.dest_bs.bucket) {
55 return out << p.source_bs;
56 }
57
58 out << p.source_bs;
59
60 out << "->" << p.dest_bs.bucket;
61
62 return out;
63}
64
65struct rgw_bucket_sync_pipe {
66 rgw_bucket_sync_pair_info info;
67 RGWBucketInfo source_bucket_info;
20effc67 68 std::map<std::string, bufferlist> source_bucket_attrs;
9f95a23c 69 RGWBucketInfo dest_bucket_info;
20effc67 70 std::map<std::string, bufferlist> dest_bucket_attrs;
9f95a23c
TL
71
72 RGWBucketSyncFlowManager::pipe_rules_ref& get_rules() {
73 return info.handler.rules;
74 }
75};
76
20effc67 77inline std::ostream& operator<<(std::ostream& out, const rgw_bucket_sync_pipe& p) {
9f95a23c
TL
78 return out << p.info;
79}
7c673cae 80
7c673cae
FG
81struct rgw_datalog_info {
82 uint32_t num_shards;
83
84 rgw_datalog_info() : num_shards(0) {}
85
86 void decode_json(JSONObj *obj);
87};
88
89struct rgw_data_sync_info {
90 enum SyncState {
91 StateInit = 0,
92 StateBuildingFullSyncMaps = 1,
93 StateSync = 2,
94 };
95
96 uint16_t state;
97 uint32_t num_shards;
98
31f18b77
FG
99 uint64_t instance_id{0};
100
7c673cae 101 void encode(bufferlist& bl) const {
31f18b77 102 ENCODE_START(2, 1, bl);
11fdf7f2
TL
103 encode(state, bl);
104 encode(num_shards, bl);
105 encode(instance_id, bl);
7c673cae
FG
106 ENCODE_FINISH(bl);
107 }
108
11fdf7f2 109 void decode(bufferlist::const_iterator& bl) {
31f18b77 110 DECODE_START(2, bl);
11fdf7f2
TL
111 decode(state, bl);
112 decode(num_shards, bl);
31f18b77 113 if (struct_v >= 2) {
11fdf7f2 114 decode(instance_id, bl);
31f18b77 115 }
7c673cae
FG
116 DECODE_FINISH(bl);
117 }
118
119 void dump(Formatter *f) const {
20effc67 120 std::string s;
7c673cae
FG
121 switch ((SyncState)state) {
122 case StateInit:
123 s = "init";
124 break;
125 case StateBuildingFullSyncMaps:
126 s = "building-full-sync-maps";
127 break;
128 case StateSync:
129 s = "sync";
130 break;
131 default:
132 s = "unknown";
133 break;
134 }
135 encode_json("status", s, f);
136 encode_json("num_shards", num_shards, f);
31f18b77 137 encode_json("instance_id", instance_id, f);
7c673cae
FG
138 }
139 void decode_json(JSONObj *obj) {
140 std::string s;
141 JSONDecoder::decode_json("status", s, obj);
142 if (s == "building-full-sync-maps") {
143 state = StateBuildingFullSyncMaps;
144 } else if (s == "sync") {
145 state = StateSync;
146 } else {
147 state = StateInit;
148 }
149 JSONDecoder::decode_json("num_shards", num_shards, obj);
a8e16298 150 JSONDecoder::decode_json("instance_id", instance_id, obj);
7c673cae 151 }
b32b8144 152 static void generate_test_instances(std::list<rgw_data_sync_info*>& o);
7c673cae
FG
153
154 rgw_data_sync_info() : state((int)StateInit), num_shards(0) {}
155};
156WRITE_CLASS_ENCODER(rgw_data_sync_info)
157
158struct rgw_data_sync_marker {
159 enum SyncState {
160 FullSync = 0,
161 IncrementalSync = 1,
162 };
163 uint16_t state;
20effc67
TL
164 std::string marker;
165 std::string next_step_marker;
7c673cae
FG
166 uint64_t total_entries;
167 uint64_t pos;
168 real_time timestamp;
169
170 rgw_data_sync_marker() : state(FullSync), total_entries(0), pos(0) {}
171
172 void encode(bufferlist& bl) const {
173 ENCODE_START(1, 1, bl);
11fdf7f2
TL
174 encode(state, bl);
175 encode(marker, bl);
176 encode(next_step_marker, bl);
177 encode(total_entries, bl);
178 encode(pos, bl);
179 encode(timestamp, bl);
7c673cae
FG
180 ENCODE_FINISH(bl);
181 }
182
11fdf7f2 183 void decode(bufferlist::const_iterator& bl) {
7c673cae 184 DECODE_START(1, bl);
11fdf7f2
TL
185 decode(state, bl);
186 decode(marker, bl);
187 decode(next_step_marker, bl);
188 decode(total_entries, bl);
189 decode(pos, bl);
190 decode(timestamp, bl);
7c673cae
FG
191 DECODE_FINISH(bl);
192 }
193
194 void dump(Formatter *f) const {
28e407b8
AA
195 const char *s{nullptr};
196 switch ((SyncState)state) {
197 case FullSync:
198 s = "full-sync";
199 break;
200 case IncrementalSync:
201 s = "incremental-sync";
202 break;
203 default:
204 s = "unknown";
205 break;
206 }
207 encode_json("status", s, f);
7c673cae
FG
208 encode_json("marker", marker, f);
209 encode_json("next_step_marker", next_step_marker, f);
210 encode_json("total_entries", total_entries, f);
211 encode_json("pos", pos, f);
212 encode_json("timestamp", utime_t(timestamp), f);
213 }
214 void decode_json(JSONObj *obj) {
28e407b8
AA
215 std::string s;
216 JSONDecoder::decode_json("status", s, obj);
217 if (s == "full-sync") {
218 state = FullSync;
219 } else if (s == "incremental-sync") {
220 state = IncrementalSync;
221 }
7c673cae
FG
222 JSONDecoder::decode_json("marker", marker, obj);
223 JSONDecoder::decode_json("next_step_marker", next_step_marker, obj);
224 JSONDecoder::decode_json("total_entries", total_entries, obj);
225 JSONDecoder::decode_json("pos", pos, obj);
226 utime_t t;
227 JSONDecoder::decode_json("timestamp", t, obj);
228 timestamp = t.to_real_time();
229 }
b32b8144 230 static void generate_test_instances(std::list<rgw_data_sync_marker*>& o);
7c673cae
FG
231};
232WRITE_CLASS_ENCODER(rgw_data_sync_marker)
233
234struct rgw_data_sync_status {
235 rgw_data_sync_info sync_info;
20effc67 236 std::map<uint32_t, rgw_data_sync_marker> sync_markers;
7c673cae
FG
237
238 rgw_data_sync_status() {}
239
240 void encode(bufferlist& bl) const {
241 ENCODE_START(1, 1, bl);
11fdf7f2 242 encode(sync_info, bl);
7c673cae
FG
243 /* sync markers are encoded separately */
244 ENCODE_FINISH(bl);
245 }
246
11fdf7f2 247 void decode(bufferlist::const_iterator& bl) {
7c673cae 248 DECODE_START(1, bl);
11fdf7f2 249 decode(sync_info, bl);
7c673cae
FG
250 /* sync markers are decoded separately */
251 DECODE_FINISH(bl);
252 }
253
254 void dump(Formatter *f) const {
255 encode_json("info", sync_info, f);
256 encode_json("markers", sync_markers, f);
257 }
258 void decode_json(JSONObj *obj) {
259 JSONDecoder::decode_json("info", sync_info, obj);
260 JSONDecoder::decode_json("markers", sync_markers, obj);
261 }
b32b8144 262 static void generate_test_instances(std::list<rgw_data_sync_status*>& o);
7c673cae
FG
263};
264WRITE_CLASS_ENCODER(rgw_data_sync_status)
265
266struct rgw_datalog_entry {
20effc67 267 std::string key;
7c673cae
FG
268 ceph::real_time timestamp;
269
270 void decode_json(JSONObj *obj);
271};
272
273struct rgw_datalog_shard_data {
20effc67 274 std::string marker;
7c673cae 275 bool truncated;
20effc67 276 std::vector<rgw_datalog_entry> entries;
7c673cae
FG
277
278 void decode_json(JSONObj *obj);
279};
280
281class RGWAsyncRadosProcessor;
282class RGWDataSyncControlCR;
283
284struct rgw_bucket_entry_owner {
20effc67
TL
285 std::string id;
286 std::string display_name;
7c673cae
FG
287
288 rgw_bucket_entry_owner() {}
20effc67 289 rgw_bucket_entry_owner(const std::string& _id, const std::string& _display_name) : id(_id), display_name(_display_name) {}
7c673cae
FG
290
291 void decode_json(JSONObj *obj);
292};
293
294class RGWSyncErrorLogger;
11fdf7f2 295class RGWRESTConn;
9f95a23c 296class RGWServices;
7c673cae
FG
297
298struct RGWDataSyncEnv {
11fdf7f2
TL
299 const DoutPrefixProvider *dpp{nullptr};
300 CephContext *cct{nullptr};
20effc67 301 rgw::sal::RadosStore* store{nullptr};
9f95a23c 302 RGWServices *svc{nullptr};
11fdf7f2
TL
303 RGWAsyncRadosProcessor *async_rados{nullptr};
304 RGWHTTPManager *http_manager{nullptr};
305 RGWSyncErrorLogger *error_logger{nullptr};
306 RGWSyncTraceManager *sync_tracer{nullptr};
11fdf7f2 307 RGWSyncModuleInstanceRef sync_module{nullptr};
81eedcae 308 PerfCounters* counters{nullptr};
7c673cae 309
11fdf7f2 310 RGWDataSyncEnv() {}
7c673cae 311
20effc67 312 void init(const DoutPrefixProvider *_dpp, CephContext *_cct, rgw::sal::RadosStore* _store, RGWServices *_svc,
7c673cae 313 RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
11fdf7f2 314 RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer,
9f95a23c 315 RGWSyncModuleInstanceRef& _sync_module,
81eedcae 316 PerfCounters* _counters) {
9f95a23c 317 dpp = _dpp;
7c673cae
FG
318 cct = _cct;
319 store = _store;
9f95a23c 320 svc = _svc;
7c673cae
FG
321 async_rados = _async_rados;
322 http_manager = _http_manager;
323 error_logger = _error_logger;
11fdf7f2 324 sync_tracer = _sync_tracer;
7c673cae 325 sync_module = _sync_module;
81eedcae 326 counters = _counters;
7c673cae
FG
327 }
328
20effc67
TL
329 std::string shard_obj_name(int shard_id);
330 std::string status_oid();
7c673cae
FG
331};
332
9f95a23c
TL
333struct RGWDataSyncCtx {
334 CephContext *cct{nullptr};
335 RGWDataSyncEnv *env{nullptr};
336
337 RGWRESTConn *conn{nullptr};
338 rgw_zone_id source_zone;
339
340 void init(RGWDataSyncEnv *_env,
341 RGWRESTConn *_conn,
342 const rgw_zone_id& _source_zone) {
343 cct = _env->cct;
344 env = _env;
345 conn = _conn;
346 source_zone = _source_zone;
347 }
348};
349
350class RGWRados;
9f95a23c 351
7c673cae 352class RGWRemoteDataLog : public RGWCoroutinesManager {
11fdf7f2 353 const DoutPrefixProvider *dpp;
20effc67 354 rgw::sal::RadosStore* store;
9f95a23c
TL
355 CephContext *cct;
356 RGWCoroutinesManagerRegistry *cr_registry;
7c673cae
FG
357 RGWAsyncRadosProcessor *async_rados;
358 RGWHTTPManager http_manager;
359
360 RGWDataSyncEnv sync_env;
9f95a23c 361 RGWDataSyncCtx sc;
7c673cae 362
9f95a23c 363 ceph::shared_mutex lock = ceph::make_shared_mutex("RGWRemoteDataLog::lock");
7c673cae
FG
364 RGWDataSyncControlCR *data_sync_cr;
365
11fdf7f2
TL
366 RGWSyncTraceNodeRef tn;
367
7c673cae
FG
368 bool initialized;
369
370public:
9f95a23c 371 RGWRemoteDataLog(const DoutPrefixProvider *dpp,
20effc67 372 rgw::sal::RadosStore* _store,
9f95a23c
TL
373 RGWAsyncRadosProcessor *async_rados);
374 int init(const rgw_zone_id& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger,
81eedcae
TL
375 RGWSyncTraceManager *_sync_tracer, RGWSyncModuleInstanceRef& module,
376 PerfCounters* _counters);
7c673cae
FG
377 void finish();
378
b3b6e05e 379 int read_log_info(const DoutPrefixProvider *dpp, rgw_datalog_info *log_info);
20effc67
TL
380 int read_source_log_shards_info(const DoutPrefixProvider *dpp, std::map<int, RGWDataChangesLogInfo> *shards_info);
381 int read_source_log_shards_next(const DoutPrefixProvider *dpp, std::map<int, std::string> shard_markers, std::map<int, rgw_datalog_shard_data> *result);
b3b6e05e 382 int read_sync_status(const DoutPrefixProvider *dpp, rgw_data_sync_status *sync_status);
20effc67
TL
383 int read_recovering_shards(const DoutPrefixProvider *dpp, const int num_shards, std::set<int>& recovering_shards);
384 int read_shard_status(const DoutPrefixProvider *dpp, int shard_id, std::set<std::string>& lagging_buckets,std::set<std::string>& recovering_buckets, rgw_data_sync_marker* sync_marker, const int max_entries);
b3b6e05e
TL
385 int init_sync_status(const DoutPrefixProvider *dpp, int num_shards);
386 int run_sync(const DoutPrefixProvider *dpp, int num_shards);
7c673cae 387
20effc67 388 void wakeup(int shard_id, std::set<std::string>& keys);
7c673cae
FG
389};
390
11fdf7f2 391class RGWDataSyncStatusManager : public DoutPrefixProvider {
20effc67 392 rgw::sal::RadosStore* store;
7c673cae 393
9f95a23c 394 rgw_zone_id source_zone;
7c673cae
FG
395 RGWRESTConn *conn;
396 RGWSyncErrorLogger *error_logger;
397 RGWSyncModuleInstanceRef sync_module;
81eedcae 398 PerfCounters* counters;
7c673cae
FG
399
400 RGWRemoteDataLog source_log;
401
20effc67
TL
402 std::string source_status_oid;
403 std::string source_shard_status_oid_prefix;
7c673cae 404
20effc67 405 std::map<int, rgw_raw_obj> shard_objs;
7c673cae
FG
406
407 int num_shards;
408
409public:
20effc67 410 RGWDataSyncStatusManager(rgw::sal::RadosStore* _store, RGWAsyncRadosProcessor *async_rados,
9f95a23c 411 const rgw_zone_id& _source_zone, PerfCounters* counters)
7c673cae 412 : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL),
81eedcae 413 sync_module(nullptr), counters(counters),
11fdf7f2 414 source_log(this, store, async_rados), num_shards(0) {}
20effc67 415 RGWDataSyncStatusManager(rgw::sal::RadosStore* _store, RGWAsyncRadosProcessor *async_rados,
9f95a23c 416 const rgw_zone_id& _source_zone, PerfCounters* counters,
81eedcae 417 const RGWSyncModuleInstanceRef& _sync_module)
94b18763 418 : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL),
81eedcae 419 sync_module(_sync_module), counters(counters),
11fdf7f2 420 source_log(this, store, async_rados), num_shards(0) {}
7c673cae
FG
421 ~RGWDataSyncStatusManager() {
422 finalize();
423 }
b3b6e05e 424 int init(const DoutPrefixProvider *dpp);
7c673cae
FG
425 void finalize();
426
20effc67
TL
427 static std::string shard_obj_name(const rgw_zone_id& source_zone, int shard_id);
428 static std::string sync_status_oid(const rgw_zone_id& source_zone);
7c673cae 429
b3b6e05e
TL
430 int read_sync_status(const DoutPrefixProvider *dpp, rgw_data_sync_status *sync_status) {
431 return source_log.read_sync_status(dpp, sync_status);
7c673cae 432 }
28e407b8 433
20effc67 434 int read_recovering_shards(const DoutPrefixProvider *dpp, const int num_shards, std::set<int>& recovering_shards) {
b3b6e05e 435 return source_log.read_recovering_shards(dpp, num_shards, recovering_shards);
28e407b8
AA
436 }
437
20effc67 438 int read_shard_status(const DoutPrefixProvider *dpp, int shard_id, std::set<std::string>& lagging_buckets, std::set<std::string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries) {
b3b6e05e 439 return source_log.read_shard_status(dpp, shard_id, lagging_buckets, recovering_buckets,sync_marker, max_entries);
28e407b8 440 }
b3b6e05e 441 int init_sync_status(const DoutPrefixProvider *dpp) { return source_log.init_sync_status(dpp, num_shards); }
7c673cae 442
b3b6e05e
TL
443 int read_log_info(const DoutPrefixProvider *dpp, rgw_datalog_info *log_info) {
444 return source_log.read_log_info(dpp, log_info);
7c673cae 445 }
20effc67 446 int read_source_log_shards_info(const DoutPrefixProvider *dpp, std::map<int, RGWDataChangesLogInfo> *shards_info) {
b3b6e05e 447 return source_log.read_source_log_shards_info(dpp, shards_info);
7c673cae 448 }
20effc67 449 int read_source_log_shards_next(const DoutPrefixProvider *dpp, std::map<int, std::string> shard_markers, std::map<int, rgw_datalog_shard_data> *result) {
b3b6e05e 450 return source_log.read_source_log_shards_next(dpp, shard_markers, result);
7c673cae
FG
451 }
452
b3b6e05e 453 int run(const DoutPrefixProvider *dpp) { return source_log.run_sync(dpp, num_shards); }
7c673cae 454
20effc67 455 void wakeup(int shard_id, std::set<std::string>& keys) { return source_log.wakeup(shard_id, keys); }
7c673cae
FG
456 void stop() {
457 source_log.finish();
458 }
11fdf7f2
TL
459
460 // implements DoutPrefixProvider
9f95a23c 461 CephContext *get_cct() const override;
11fdf7f2
TL
462 unsigned get_subsys() const override;
463 std::ostream& gen_prefix(std::ostream& out) const override;
7c673cae
FG
464};
465
9f95a23c 466class RGWBucketPipeSyncStatusManager;
7c673cae
FG
467class RGWBucketSyncCR;
468
469struct rgw_bucket_shard_full_sync_marker {
470 rgw_obj_key position;
471 uint64_t count;
472
473 rgw_bucket_shard_full_sync_marker() : count(0) {}
474
20effc67 475 void encode_attr(std::map<std::string, bufferlist>& attrs);
7c673cae
FG
476
477 void encode(bufferlist& bl) const {
478 ENCODE_START(1, 1, bl);
11fdf7f2
TL
479 encode(position, bl);
480 encode(count, bl);
7c673cae
FG
481 ENCODE_FINISH(bl);
482 }
483
11fdf7f2 484 void decode(bufferlist::const_iterator& bl) {
7c673cae 485 DECODE_START(1, bl);
11fdf7f2
TL
486 decode(position, bl);
487 decode(count, bl);
7c673cae
FG
488 DECODE_FINISH(bl);
489 }
490
b32b8144
FG
491 void dump(Formatter *f) const;
492 void decode_json(JSONObj *obj);
7c673cae
FG
493};
494WRITE_CLASS_ENCODER(rgw_bucket_shard_full_sync_marker)
495
496struct rgw_bucket_shard_inc_sync_marker {
20effc67 497 std::string position;
f67539c2 498 ceph::real_time timestamp;
7c673cae 499
20effc67 500 void encode_attr(std::map<std::string, bufferlist>& attrs);
7c673cae
FG
501
502 void encode(bufferlist& bl) const {
f67539c2 503 ENCODE_START(2, 1, bl);
11fdf7f2 504 encode(position, bl);
f67539c2 505 encode(timestamp, bl);
7c673cae
FG
506 ENCODE_FINISH(bl);
507 }
508
11fdf7f2 509 void decode(bufferlist::const_iterator& bl) {
f67539c2 510 DECODE_START(2, bl);
11fdf7f2 511 decode(position, bl);
f67539c2
TL
512 if (struct_v >= 2) {
513 decode(timestamp, bl);
514 }
515 DECODE_FINISH(bl);
7c673cae
FG
516 }
517
b32b8144
FG
518 void dump(Formatter *f) const;
519 void decode_json(JSONObj *obj);
7c673cae
FG
520};
521WRITE_CLASS_ENCODER(rgw_bucket_shard_inc_sync_marker)
522
523struct rgw_bucket_shard_sync_info {
524 enum SyncState {
525 StateInit = 0,
526 StateFullSync = 1,
527 StateIncrementalSync = 2,
9f95a23c 528 StateStopped = 3,
7c673cae
FG
529 };
530
531 uint16_t state;
532 rgw_bucket_shard_full_sync_marker full_marker;
533 rgw_bucket_shard_inc_sync_marker inc_marker;
534
20effc67
TL
535 void decode_from_attrs(CephContext *cct, std::map<std::string, bufferlist>& attrs);
536 void encode_all_attrs(std::map<std::string, bufferlist>& attrs);
537 void encode_state_attr(std::map<std::string, bufferlist>& attrs);
7c673cae
FG
538
539 void encode(bufferlist& bl) const {
540 ENCODE_START(1, 1, bl);
11fdf7f2
TL
541 encode(state, bl);
542 encode(full_marker, bl);
543 encode(inc_marker, bl);
7c673cae
FG
544 ENCODE_FINISH(bl);
545 }
546
11fdf7f2 547 void decode(bufferlist::const_iterator& bl) {
7c673cae 548 DECODE_START(1, bl);
11fdf7f2
TL
549 decode(state, bl);
550 decode(full_marker, bl);
551 decode(inc_marker, bl);
7c673cae
FG
552 DECODE_FINISH(bl);
553 }
554
b32b8144
FG
555 void dump(Formatter *f) const;
556 void decode_json(JSONObj *obj);
7c673cae
FG
557
558 rgw_bucket_shard_sync_info() : state((int)StateInit) {}
559
560};
561WRITE_CLASS_ENCODER(rgw_bucket_shard_sync_info)
562
28e407b8 563struct rgw_bucket_index_marker_info {
20effc67
TL
564 std::string bucket_ver;
565 std::string master_ver;
566 std::string max_marker;
28e407b8
AA
567 bool syncstopped{false};
568
569 void decode_json(JSONObj *obj) {
570 JSONDecoder::decode_json("bucket_ver", bucket_ver, obj);
571 JSONDecoder::decode_json("master_ver", master_ver, obj);
572 JSONDecoder::decode_json("max_marker", max_marker, obj);
573 JSONDecoder::decode_json("syncstopped", syncstopped, obj);
574 }
575};
576
7c673cae 577
9f95a23c 578class RGWRemoteBucketManager {
11fdf7f2 579 const DoutPrefixProvider *dpp;
9f95a23c
TL
580
581 RGWDataSyncEnv *sync_env;
582
7c673cae 583 RGWRESTConn *conn{nullptr};
9f95a23c 584 rgw_zone_id source_zone;
7c673cae 585
20effc67 586 std::vector<rgw_bucket_sync_pair_info> sync_pairs;
7c673cae 587
9f95a23c 588 RGWDataSyncCtx sc;
7c673cae
FG
589 rgw_bucket_shard_sync_info init_status;
590
591 RGWBucketSyncCR *sync_cr{nullptr};
592
593public:
9f95a23c
TL
594 RGWRemoteBucketManager(const DoutPrefixProvider *_dpp,
595 RGWDataSyncEnv *_sync_env,
596 const rgw_zone_id& _source_zone, RGWRESTConn *_conn,
597 const RGWBucketInfo& source_bucket_info,
598 const rgw_bucket& dest_bucket);
599
600 void init(const rgw_zone_id& _source_zone, RGWRESTConn *_conn,
601 const rgw_bucket& source_bucket, int shard_id,
602 const rgw_bucket& dest_bucket);
603
604 RGWCoroutine *read_sync_status_cr(int num, rgw_bucket_shard_sync_info *sync_status);
f67539c2 605 RGWCoroutine *init_sync_status_cr(int num, RGWObjVersionTracker& objv_tracker);
9f95a23c
TL
606 RGWCoroutine *run_sync_cr(int num);
607
608 int num_pipes() {
609 return sync_pairs.size();
610 }
7c673cae
FG
611
612 void wakeup();
613};
614
f67539c2
TL
615class BucketIndexShardsManager;
616
b3b6e05e
TL
617int rgw_read_remote_bilog_info(const DoutPrefixProvider *dpp,
618 RGWRESTConn* conn,
f67539c2
TL
619 const rgw_bucket& bucket,
620 BucketIndexShardsManager& markers,
621 optional_yield y);
622
9f95a23c 623class RGWBucketPipeSyncStatusManager : public DoutPrefixProvider {
20effc67 624 rgw::sal::RadosStore* store;
9f95a23c
TL
625
626 RGWDataSyncEnv sync_env;
7c673cae
FG
627
628 RGWCoroutinesManager cr_mgr;
629
630 RGWHTTPManager http_manager;
631
9f95a23c
TL
632 std::optional<rgw_zone_id> source_zone;
633 std::optional<rgw_bucket> source_bucket;
634
7c673cae
FG
635 RGWRESTConn *conn;
636 RGWSyncErrorLogger *error_logger;
637 RGWSyncModuleInstanceRef sync_module;
638
9f95a23c 639 rgw_bucket dest_bucket;
7c673cae 640
20effc67 641 std::vector<RGWRemoteBucketManager *> source_mgrs;
7c673cae 642
20effc67
TL
643 std::string source_status_oid;
644 std::string source_shard_status_oid_prefix;
7c673cae 645
20effc67 646 std::map<int, rgw_bucket_shard_sync_info> sync_status;
7c673cae
FG
647 rgw_raw_obj status_obj;
648
649 int num_shards;
650
651public:
20effc67 652 RGWBucketPipeSyncStatusManager(rgw::sal::RadosStore* _store,
9f95a23c
TL
653 std::optional<rgw_zone_id> _source_zone,
654 std::optional<rgw_bucket> _source_bucket,
655 const rgw_bucket& dest_bucket);
656 ~RGWBucketPipeSyncStatusManager();
7c673cae 657
b3b6e05e 658 int init(const DoutPrefixProvider *dpp);
7c673cae 659
20effc67 660 std::map<int, rgw_bucket_shard_sync_info>& get_sync_status() { return sync_status; }
b3b6e05e 661 int init_sync_status(const DoutPrefixProvider *dpp);
7c673cae 662
20effc67
TL
663 static std::string status_oid(const rgw_zone_id& source_zone, const rgw_bucket_sync_pair_info& bs);
664 static std::string obj_status_oid(const rgw_bucket_sync_pipe& sync_pipe,
665 const rgw_zone_id& source_zone, const rgw::sal::Object* obj); /* specific source obj sync status,
9f95a23c 666 can be used by sync modules */
11fdf7f2
TL
667
668 // implements DoutPrefixProvider
9f95a23c 669 CephContext *get_cct() const override;
11fdf7f2
TL
670 unsigned get_subsys() const override;
671 std::ostream& gen_prefix(std::ostream& out) const override;
7c673cae 672
b3b6e05e
TL
673 int read_sync_status(const DoutPrefixProvider *dpp);
674 int run(const DoutPrefixProvider *dpp);
7c673cae
FG
675};
676
b32b8144 677/// read the sync status of all bucket shards from the given source zone
9f95a23c 678int rgw_bucket_sync_status(const DoutPrefixProvider *dpp,
20effc67 679 rgw::sal::RadosStore* store,
9f95a23c
TL
680 const rgw_sync_bucket_pipe& pipe,
681 const RGWBucketInfo& dest_bucket_info,
682 const RGWBucketInfo *psource_bucket_info,
b32b8144
FG
683 std::vector<rgw_bucket_shard_sync_info> *status);
684
7c673cae
FG
685class RGWDefaultSyncModule : public RGWSyncModule {
686public:
687 RGWDefaultSyncModule() {}
11fdf7f2 688 bool supports_writes() override { return true; }
7c673cae 689 bool supports_data_export() override { return true; }
20effc67 690 int create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override;
11fdf7f2
TL
691};
692
693class RGWArchiveSyncModule : public RGWDefaultSyncModule {
694public:
695 RGWArchiveSyncModule() {}
696 bool supports_writes() override { return true; }
697 bool supports_data_export() override { return false; }
20effc67 698 int create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override;
7c673cae
FG
699};
700
7c673cae 701#endif