]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_data_sync.h
update sources to 12.2.10
[ceph.git] / ceph / src / rgw / rgw_data_sync.h
CommitLineData
7c673cae
FG
1#ifndef CEPH_RGW_DATA_SYNC_H
2#define CEPH_RGW_DATA_SYNC_H
3
4#include "rgw_coroutine.h"
5#include "rgw_http_client.h"
6#include "rgw_bucket.h"
7
8#include "rgw_sync_module.h"
9
10#include "common/RWLock.h"
11#include "common/ceph_json.h"
12
7c673cae
FG
13struct rgw_datalog_info {
14 uint32_t num_shards;
15
16 rgw_datalog_info() : num_shards(0) {}
17
18 void decode_json(JSONObj *obj);
19};
20
21struct rgw_data_sync_info {
22 enum SyncState {
23 StateInit = 0,
24 StateBuildingFullSyncMaps = 1,
25 StateSync = 2,
26 };
27
28 uint16_t state;
29 uint32_t num_shards;
30
31f18b77
FG
31 uint64_t instance_id{0};
32
7c673cae 33 void encode(bufferlist& bl) const {
31f18b77 34 ENCODE_START(2, 1, bl);
7c673cae
FG
35 ::encode(state, bl);
36 ::encode(num_shards, bl);
31f18b77 37 ::encode(instance_id, bl);
7c673cae
FG
38 ENCODE_FINISH(bl);
39 }
40
41 void decode(bufferlist::iterator& bl) {
31f18b77 42 DECODE_START(2, bl);
7c673cae
FG
43 ::decode(state, bl);
44 ::decode(num_shards, bl);
31f18b77
FG
45 if (struct_v >= 2) {
46 ::decode(instance_id, bl);
47 }
7c673cae
FG
48 DECODE_FINISH(bl);
49 }
50
51 void dump(Formatter *f) const {
52 string s;
53 switch ((SyncState)state) {
54 case StateInit:
55 s = "init";
56 break;
57 case StateBuildingFullSyncMaps:
58 s = "building-full-sync-maps";
59 break;
60 case StateSync:
61 s = "sync";
62 break;
63 default:
64 s = "unknown";
65 break;
66 }
67 encode_json("status", s, f);
68 encode_json("num_shards", num_shards, f);
31f18b77 69 encode_json("instance_id", instance_id, f);
7c673cae
FG
70 }
71 void decode_json(JSONObj *obj) {
72 std::string s;
73 JSONDecoder::decode_json("status", s, obj);
74 if (s == "building-full-sync-maps") {
75 state = StateBuildingFullSyncMaps;
76 } else if (s == "sync") {
77 state = StateSync;
78 } else {
79 state = StateInit;
80 }
81 JSONDecoder::decode_json("num_shards", num_shards, obj);
31f18b77 82 JSONDecoder::decode_json("instance_id", num_shards, obj);
7c673cae 83 }
b32b8144 84 static void generate_test_instances(std::list<rgw_data_sync_info*>& o);
7c673cae
FG
85
86 rgw_data_sync_info() : state((int)StateInit), num_shards(0) {}
87};
88WRITE_CLASS_ENCODER(rgw_data_sync_info)
89
90struct rgw_data_sync_marker {
91 enum SyncState {
92 FullSync = 0,
93 IncrementalSync = 1,
94 };
95 uint16_t state;
96 string marker;
97 string next_step_marker;
98 uint64_t total_entries;
99 uint64_t pos;
100 real_time timestamp;
101
102 rgw_data_sync_marker() : state(FullSync), total_entries(0), pos(0) {}
103
104 void encode(bufferlist& bl) const {
105 ENCODE_START(1, 1, bl);
106 ::encode(state, bl);
107 ::encode(marker, bl);
108 ::encode(next_step_marker, bl);
109 ::encode(total_entries, bl);
110 ::encode(pos, bl);
111 ::encode(timestamp, bl);
112 ENCODE_FINISH(bl);
113 }
114
115 void decode(bufferlist::iterator& bl) {
116 DECODE_START(1, bl);
117 ::decode(state, bl);
118 ::decode(marker, bl);
119 ::decode(next_step_marker, bl);
120 ::decode(total_entries, bl);
121 ::decode(pos, bl);
122 ::decode(timestamp, bl);
123 DECODE_FINISH(bl);
124 }
125
126 void dump(Formatter *f) const {
28e407b8
AA
127 const char *s{nullptr};
128 switch ((SyncState)state) {
129 case FullSync:
130 s = "full-sync";
131 break;
132 case IncrementalSync:
133 s = "incremental-sync";
134 break;
135 default:
136 s = "unknown";
137 break;
138 }
139 encode_json("status", s, f);
7c673cae
FG
140 encode_json("marker", marker, f);
141 encode_json("next_step_marker", next_step_marker, f);
142 encode_json("total_entries", total_entries, f);
143 encode_json("pos", pos, f);
144 encode_json("timestamp", utime_t(timestamp), f);
145 }
146 void decode_json(JSONObj *obj) {
28e407b8
AA
147 std::string s;
148 JSONDecoder::decode_json("status", s, obj);
149 if (s == "full-sync") {
150 state = FullSync;
151 } else if (s == "incremental-sync") {
152 state = IncrementalSync;
153 }
7c673cae
FG
154 JSONDecoder::decode_json("marker", marker, obj);
155 JSONDecoder::decode_json("next_step_marker", next_step_marker, obj);
156 JSONDecoder::decode_json("total_entries", total_entries, obj);
157 JSONDecoder::decode_json("pos", pos, obj);
158 utime_t t;
159 JSONDecoder::decode_json("timestamp", t, obj);
160 timestamp = t.to_real_time();
161 }
b32b8144 162 static void generate_test_instances(std::list<rgw_data_sync_marker*>& o);
7c673cae
FG
163};
164WRITE_CLASS_ENCODER(rgw_data_sync_marker)
165
166struct rgw_data_sync_status {
167 rgw_data_sync_info sync_info;
168 map<uint32_t, rgw_data_sync_marker> sync_markers;
169
170 rgw_data_sync_status() {}
171
172 void encode(bufferlist& bl) const {
173 ENCODE_START(1, 1, bl);
174 ::encode(sync_info, bl);
175 /* sync markers are encoded separately */
176 ENCODE_FINISH(bl);
177 }
178
179 void decode(bufferlist::iterator& bl) {
180 DECODE_START(1, bl);
181 ::decode(sync_info, bl);
182 /* sync markers are decoded separately */
183 DECODE_FINISH(bl);
184 }
185
186 void dump(Formatter *f) const {
187 encode_json("info", sync_info, f);
188 encode_json("markers", sync_markers, f);
189 }
190 void decode_json(JSONObj *obj) {
191 JSONDecoder::decode_json("info", sync_info, obj);
192 JSONDecoder::decode_json("markers", sync_markers, obj);
193 }
b32b8144 194 static void generate_test_instances(std::list<rgw_data_sync_status*>& o);
7c673cae
FG
195};
196WRITE_CLASS_ENCODER(rgw_data_sync_status)
197
198struct rgw_datalog_entry {
199 string key;
200 ceph::real_time timestamp;
201
202 void decode_json(JSONObj *obj);
203};
204
205struct rgw_datalog_shard_data {
206 string marker;
207 bool truncated;
208 vector<rgw_datalog_entry> entries;
209
210 void decode_json(JSONObj *obj);
211};
212
213class RGWAsyncRadosProcessor;
214class RGWDataSyncControlCR;
215
216struct rgw_bucket_entry_owner {
217 string id;
218 string display_name;
219
220 rgw_bucket_entry_owner() {}
221 rgw_bucket_entry_owner(const string& _id, const string& _display_name) : id(_id), display_name(_display_name) {}
222
223 void decode_json(JSONObj *obj);
224};
225
226class RGWSyncErrorLogger;
227
228struct RGWDataSyncEnv {
229 CephContext *cct;
230 RGWRados *store;
231 RGWRESTConn *conn;
232 RGWAsyncRadosProcessor *async_rados;
233 RGWHTTPManager *http_manager;
234 RGWSyncErrorLogger *error_logger;
235 string source_zone;
236 RGWSyncModuleInstanceRef sync_module;
237
238 RGWDataSyncEnv() : cct(NULL), store(NULL), conn(NULL), async_rados(NULL), http_manager(NULL), error_logger(NULL), sync_module(NULL) {}
239
240 void init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
241 RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
242 RGWSyncErrorLogger *_error_logger, const string& _source_zone,
91327a77 243 RGWSyncModuleInstanceRef& _sync_module) {
7c673cae
FG
244 cct = _cct;
245 store = _store;
246 conn = _conn;
247 async_rados = _async_rados;
248 http_manager = _http_manager;
249 error_logger = _error_logger;
250 source_zone = _source_zone;
251 sync_module = _sync_module;
252 }
253
254 string shard_obj_name(int shard_id);
255 string status_oid();
256};
257
258class RGWRemoteDataLog : public RGWCoroutinesManager {
259 RGWRados *store;
260 RGWAsyncRadosProcessor *async_rados;
261 RGWHTTPManager http_manager;
262
263 RGWDataSyncEnv sync_env;
264
265 RWLock lock;
266 RGWDataSyncControlCR *data_sync_cr;
267
268 bool initialized;
269
270public:
91327a77 271 RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados)
7c673cae 272 : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()),
91327a77 273 store(_store), async_rados(async_rados),
7c673cae
FG
274 http_manager(store->ctx(), completion_mgr),
275 lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL),
276 initialized(false) {}
277 int init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncModuleInstanceRef& module);
278 void finish();
279
280 int read_log_info(rgw_datalog_info *log_info);
281 int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info);
282 int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result);
7c673cae 283 int read_sync_status(rgw_data_sync_status *sync_status);
28e407b8
AA
284 int read_recovering_shards(const int num_shards, set<int>& recovering_shards);
285 int read_shard_status(int shard_id, set<string>& lagging_buckets,set<string>& recovering_buckets, rgw_data_sync_marker* sync_marker, const int max_entries);
7c673cae
FG
286 int init_sync_status(int num_shards);
287 int run_sync(int num_shards);
288
289 void wakeup(int shard_id, set<string>& keys);
290};
291
292class RGWDataSyncStatusManager {
293 RGWRados *store;
294 rgw_rados_ref ref;
295
296 string source_zone;
297 RGWRESTConn *conn;
298 RGWSyncErrorLogger *error_logger;
299 RGWSyncModuleInstanceRef sync_module;
300
301 RGWRemoteDataLog source_log;
302
303 string source_status_oid;
304 string source_shard_status_oid_prefix;
305
306 map<int, rgw_raw_obj> shard_objs;
307
308 int num_shards;
309
310public:
311 RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
91327a77 312 const string& _source_zone)
7c673cae
FG
313 : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL),
314 sync_module(nullptr),
91327a77 315 source_log(store, async_rados), num_shards(0) {}
94b18763 316 RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
91327a77 317 const string& _source_zone, const RGWSyncModuleInstanceRef& _sync_module)
94b18763
FG
318 : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL),
319 sync_module(_sync_module),
91327a77 320 source_log(store, async_rados), num_shards(0) {}
7c673cae
FG
321 ~RGWDataSyncStatusManager() {
322 finalize();
323 }
324 int init();
325 void finalize();
326
327 static string shard_obj_name(const string& source_zone, int shard_id);
328 static string sync_status_oid(const string& source_zone);
329
330 int read_sync_status(rgw_data_sync_status *sync_status) {
331 return source_log.read_sync_status(sync_status);
332 }
28e407b8
AA
333
334 int read_recovering_shards(const int num_shards, set<int>& recovering_shards) {
335 return source_log.read_recovering_shards(num_shards, recovering_shards);
336 }
337
338 int read_shard_status(int shard_id, set<string>& lagging_buckets, set<string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries) {
339 return source_log.read_shard_status(shard_id, lagging_buckets, recovering_buckets,sync_marker, max_entries);
340 }
7c673cae
FG
341 int init_sync_status() { return source_log.init_sync_status(num_shards); }
342
343 int read_log_info(rgw_datalog_info *log_info) {
344 return source_log.read_log_info(log_info);
345 }
346 int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info) {
347 return source_log.read_source_log_shards_info(shards_info);
348 }
349 int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result) {
350 return source_log.read_source_log_shards_next(shard_markers, result);
351 }
352
353 int run() { return source_log.run_sync(num_shards); }
354
355 void wakeup(int shard_id, set<string>& keys) { return source_log.wakeup(shard_id, keys); }
356 void stop() {
357 source_log.finish();
358 }
359};
360
361class RGWBucketSyncStatusManager;
362class RGWBucketSyncCR;
363
364struct rgw_bucket_shard_full_sync_marker {
365 rgw_obj_key position;
366 uint64_t count;
367
368 rgw_bucket_shard_full_sync_marker() : count(0) {}
369
370 void encode_attr(map<string, bufferlist>& attrs);
371
372 void encode(bufferlist& bl) const {
373 ENCODE_START(1, 1, bl);
374 ::encode(position, bl);
375 ::encode(count, bl);
376 ENCODE_FINISH(bl);
377 }
378
379 void decode(bufferlist::iterator& bl) {
380 DECODE_START(1, bl);
381 ::decode(position, bl);
382 ::decode(count, bl);
383 DECODE_FINISH(bl);
384 }
385
b32b8144
FG
386 void dump(Formatter *f) const;
387 void decode_json(JSONObj *obj);
7c673cae
FG
388};
389WRITE_CLASS_ENCODER(rgw_bucket_shard_full_sync_marker)
390
391struct rgw_bucket_shard_inc_sync_marker {
392 string position;
393
394 rgw_bucket_shard_inc_sync_marker() {}
395
396 void encode_attr(map<string, bufferlist>& attrs);
397
398 void encode(bufferlist& bl) const {
399 ENCODE_START(1, 1, bl);
400 ::encode(position, bl);
401 ENCODE_FINISH(bl);
402 }
403
404 void decode(bufferlist::iterator& bl) {
405 DECODE_START(1, bl);
406 ::decode(position, bl);
407 DECODE_FINISH(bl);
408 }
409
b32b8144
FG
410 void dump(Formatter *f) const;
411 void decode_json(JSONObj *obj);
7c673cae
FG
412
413 bool operator<(const rgw_bucket_shard_inc_sync_marker& m) const {
414 return (position < m.position);
415 }
416};
417WRITE_CLASS_ENCODER(rgw_bucket_shard_inc_sync_marker)
418
419struct rgw_bucket_shard_sync_info {
420 enum SyncState {
421 StateInit = 0,
422 StateFullSync = 1,
423 StateIncrementalSync = 2,
424 };
425
426 uint16_t state;
427 rgw_bucket_shard_full_sync_marker full_marker;
428 rgw_bucket_shard_inc_sync_marker inc_marker;
429
430 void decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs);
431 void encode_all_attrs(map<string, bufferlist>& attrs);
432 void encode_state_attr(map<string, bufferlist>& attrs);
433
434 void encode(bufferlist& bl) const {
435 ENCODE_START(1, 1, bl);
436 ::encode(state, bl);
437 ::encode(full_marker, bl);
438 ::encode(inc_marker, bl);
439 ENCODE_FINISH(bl);
440 }
441
442 void decode(bufferlist::iterator& bl) {
443 DECODE_START(1, bl);
444 ::decode(state, bl);
445 ::decode(full_marker, bl);
446 ::decode(inc_marker, bl);
447 DECODE_FINISH(bl);
448 }
449
b32b8144
FG
450 void dump(Formatter *f) const;
451 void decode_json(JSONObj *obj);
7c673cae
FG
452
453 rgw_bucket_shard_sync_info() : state((int)StateInit) {}
454
455};
456WRITE_CLASS_ENCODER(rgw_bucket_shard_sync_info)
457
28e407b8
AA
458struct rgw_bucket_index_marker_info {
459 string bucket_ver;
460 string master_ver;
461 string max_marker;
462 bool syncstopped{false};
463
464 void decode_json(JSONObj *obj) {
465 JSONDecoder::decode_json("bucket_ver", bucket_ver, obj);
466 JSONDecoder::decode_json("master_ver", master_ver, obj);
467 JSONDecoder::decode_json("max_marker", max_marker, obj);
468 JSONDecoder::decode_json("syncstopped", syncstopped, obj);
469 }
470};
471
7c673cae
FG
472
473class RGWRemoteBucketLog : public RGWCoroutinesManager {
474 RGWRados *store;
475 RGWRESTConn *conn{nullptr};
476 string source_zone;
477 rgw_bucket_shard bs;
478
479 RGWBucketSyncStatusManager *status_manager;
480 RGWAsyncRadosProcessor *async_rados;
481 RGWHTTPManager *http_manager;
482
483 RGWDataSyncEnv sync_env;
484 rgw_bucket_shard_sync_info init_status;
485
486 RGWBucketSyncCR *sync_cr{nullptr};
487
488public:
489 RGWRemoteBucketLog(RGWRados *_store, RGWBucketSyncStatusManager *_sm,
490 RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), store(_store),
491 status_manager(_sm), async_rados(_async_rados), http_manager(_http_manager) {}
492
493 int init(const string& _source_zone, RGWRESTConn *_conn,
494 const rgw_bucket& bucket, int shard_id,
495 RGWSyncErrorLogger *_error_logger,
496 RGWSyncModuleInstanceRef& _sync_module);
497 void finish();
498
499 RGWCoroutine *read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status);
500 RGWCoroutine *init_sync_status_cr();
501 RGWCoroutine *run_sync_cr();
502
503 void wakeup();
504};
505
506class RGWBucketSyncStatusManager {
507 RGWRados *store;
508
509 RGWCoroutinesManager cr_mgr;
510
511 RGWHTTPManager http_manager;
512
513 string source_zone;
514 RGWRESTConn *conn;
515 RGWSyncErrorLogger *error_logger;
516 RGWSyncModuleInstanceRef sync_module;
517
518 rgw_bucket bucket;
519
520 map<int, RGWRemoteBucketLog *> source_logs;
521
522 string source_status_oid;
523 string source_shard_status_oid_prefix;
524
525 map<int, rgw_bucket_shard_sync_info> sync_status;
526 rgw_raw_obj status_obj;
527
528 int num_shards;
529
530public:
531 RGWBucketSyncStatusManager(RGWRados *_store, const string& _source_zone,
532 const rgw_bucket& bucket) : store(_store),
533 cr_mgr(_store->ctx(), _store->get_cr_registry()),
534 http_manager(store->ctx(), cr_mgr.get_completion_mgr()),
535 source_zone(_source_zone),
536 conn(NULL), error_logger(NULL),
537 bucket(bucket),
538 num_shards(0) {}
539 ~RGWBucketSyncStatusManager();
540
541 int init();
542
543 map<int, rgw_bucket_shard_sync_info>& get_sync_status() { return sync_status; }
544 int init_sync_status();
545
546 static string status_oid(const string& source_zone, const rgw_bucket_shard& bs);
547
548 int read_sync_status();
549 int run();
550};
551
b32b8144
FG
552/// read the sync status of all bucket shards from the given source zone
553int rgw_bucket_sync_status(RGWRados *store, const std::string& source_zone,
28e407b8 554 const RGWBucketInfo& bucket_info,
b32b8144
FG
555 std::vector<rgw_bucket_shard_sync_info> *status);
556
7c673cae
FG
557class RGWDefaultSyncModule : public RGWSyncModule {
558public:
559 RGWDefaultSyncModule() {}
560 bool supports_data_export() override { return true; }
31f18b77 561 int create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance) override;
7c673cae
FG
562};
563
564// DataLogTrimCR factory function
565extern RGWCoroutine* create_data_log_trim_cr(RGWRados *store,
566 RGWHTTPManager *http,
567 int num_shards, utime_t interval);
568
569#endif