]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_data_sync.h
update sources to 12.2.7
[ceph.git] / ceph / src / rgw / rgw_data_sync.h
1 #ifndef CEPH_RGW_DATA_SYNC_H
2 #define CEPH_RGW_DATA_SYNC_H
3
4 #include "rgw_coroutine.h"
5 #include "rgw_http_client.h"
6 #include "rgw_bucket.h"
7
8 #include "rgw_sync_module.h"
9
10 #include "common/RWLock.h"
11 #include "common/ceph_json.h"
12
// Forward declaration only: this header refers to the observer exclusively
// through pointers, so the complete type is not needed here.
namespace rgw { class BucketChangeObserver; }
16
17 struct rgw_datalog_info {
18 uint32_t num_shards;
19
20 rgw_datalog_info() : num_shards(0) {}
21
22 void decode_json(JSONObj *obj);
23 };
24
25 struct rgw_data_sync_info {
26 enum SyncState {
27 StateInit = 0,
28 StateBuildingFullSyncMaps = 1,
29 StateSync = 2,
30 };
31
32 uint16_t state;
33 uint32_t num_shards;
34
35 uint64_t instance_id{0};
36
37 void encode(bufferlist& bl) const {
38 ENCODE_START(2, 1, bl);
39 ::encode(state, bl);
40 ::encode(num_shards, bl);
41 ::encode(instance_id, bl);
42 ENCODE_FINISH(bl);
43 }
44
45 void decode(bufferlist::iterator& bl) {
46 DECODE_START(2, bl);
47 ::decode(state, bl);
48 ::decode(num_shards, bl);
49 if (struct_v >= 2) {
50 ::decode(instance_id, bl);
51 }
52 DECODE_FINISH(bl);
53 }
54
55 void dump(Formatter *f) const {
56 string s;
57 switch ((SyncState)state) {
58 case StateInit:
59 s = "init";
60 break;
61 case StateBuildingFullSyncMaps:
62 s = "building-full-sync-maps";
63 break;
64 case StateSync:
65 s = "sync";
66 break;
67 default:
68 s = "unknown";
69 break;
70 }
71 encode_json("status", s, f);
72 encode_json("num_shards", num_shards, f);
73 encode_json("instance_id", instance_id, f);
74 }
75 void decode_json(JSONObj *obj) {
76 std::string s;
77 JSONDecoder::decode_json("status", s, obj);
78 if (s == "building-full-sync-maps") {
79 state = StateBuildingFullSyncMaps;
80 } else if (s == "sync") {
81 state = StateSync;
82 } else {
83 state = StateInit;
84 }
85 JSONDecoder::decode_json("num_shards", num_shards, obj);
86 JSONDecoder::decode_json("instance_id", num_shards, obj);
87 }
88 static void generate_test_instances(std::list<rgw_data_sync_info*>& o);
89
90 rgw_data_sync_info() : state((int)StateInit), num_shards(0) {}
91 };
92 WRITE_CLASS_ENCODER(rgw_data_sync_info)
93
94 struct rgw_data_sync_marker {
95 enum SyncState {
96 FullSync = 0,
97 IncrementalSync = 1,
98 };
99 uint16_t state;
100 string marker;
101 string next_step_marker;
102 uint64_t total_entries;
103 uint64_t pos;
104 real_time timestamp;
105
106 rgw_data_sync_marker() : state(FullSync), total_entries(0), pos(0) {}
107
108 void encode(bufferlist& bl) const {
109 ENCODE_START(1, 1, bl);
110 ::encode(state, bl);
111 ::encode(marker, bl);
112 ::encode(next_step_marker, bl);
113 ::encode(total_entries, bl);
114 ::encode(pos, bl);
115 ::encode(timestamp, bl);
116 ENCODE_FINISH(bl);
117 }
118
119 void decode(bufferlist::iterator& bl) {
120 DECODE_START(1, bl);
121 ::decode(state, bl);
122 ::decode(marker, bl);
123 ::decode(next_step_marker, bl);
124 ::decode(total_entries, bl);
125 ::decode(pos, bl);
126 ::decode(timestamp, bl);
127 DECODE_FINISH(bl);
128 }
129
130 void dump(Formatter *f) const {
131 const char *s{nullptr};
132 switch ((SyncState)state) {
133 case FullSync:
134 s = "full-sync";
135 break;
136 case IncrementalSync:
137 s = "incremental-sync";
138 break;
139 default:
140 s = "unknown";
141 break;
142 }
143 encode_json("status", s, f);
144 encode_json("marker", marker, f);
145 encode_json("next_step_marker", next_step_marker, f);
146 encode_json("total_entries", total_entries, f);
147 encode_json("pos", pos, f);
148 encode_json("timestamp", utime_t(timestamp), f);
149 }
150 void decode_json(JSONObj *obj) {
151 std::string s;
152 JSONDecoder::decode_json("status", s, obj);
153 if (s == "full-sync") {
154 state = FullSync;
155 } else if (s == "incremental-sync") {
156 state = IncrementalSync;
157 }
158 JSONDecoder::decode_json("marker", marker, obj);
159 JSONDecoder::decode_json("next_step_marker", next_step_marker, obj);
160 JSONDecoder::decode_json("total_entries", total_entries, obj);
161 JSONDecoder::decode_json("pos", pos, obj);
162 utime_t t;
163 JSONDecoder::decode_json("timestamp", t, obj);
164 timestamp = t.to_real_time();
165 }
166 static void generate_test_instances(std::list<rgw_data_sync_marker*>& o);
167 };
168 WRITE_CLASS_ENCODER(rgw_data_sync_marker)
169
170 struct rgw_data_sync_status {
171 rgw_data_sync_info sync_info;
172 map<uint32_t, rgw_data_sync_marker> sync_markers;
173
174 rgw_data_sync_status() {}
175
176 void encode(bufferlist& bl) const {
177 ENCODE_START(1, 1, bl);
178 ::encode(sync_info, bl);
179 /* sync markers are encoded separately */
180 ENCODE_FINISH(bl);
181 }
182
183 void decode(bufferlist::iterator& bl) {
184 DECODE_START(1, bl);
185 ::decode(sync_info, bl);
186 /* sync markers are decoded separately */
187 DECODE_FINISH(bl);
188 }
189
190 void dump(Formatter *f) const {
191 encode_json("info", sync_info, f);
192 encode_json("markers", sync_markers, f);
193 }
194 void decode_json(JSONObj *obj) {
195 JSONDecoder::decode_json("info", sync_info, obj);
196 JSONDecoder::decode_json("markers", sync_markers, obj);
197 }
198 static void generate_test_instances(std::list<rgw_data_sync_status*>& o);
199 };
200 WRITE_CLASS_ENCODER(rgw_data_sync_status)
201
202 struct rgw_datalog_entry {
203 string key;
204 ceph::real_time timestamp;
205
206 void decode_json(JSONObj *obj);
207 };
208
209 struct rgw_datalog_shard_data {
210 string marker;
211 bool truncated;
212 vector<rgw_datalog_entry> entries;
213
214 void decode_json(JSONObj *obj);
215 };
216
217 class RGWAsyncRadosProcessor;
218 class RGWDataSyncControlCR;
219
220 struct rgw_bucket_entry_owner {
221 string id;
222 string display_name;
223
224 rgw_bucket_entry_owner() {}
225 rgw_bucket_entry_owner(const string& _id, const string& _display_name) : id(_id), display_name(_display_name) {}
226
227 void decode_json(JSONObj *obj);
228 };
229
230 class RGWSyncErrorLogger;
231
232 struct RGWDataSyncEnv {
233 CephContext *cct;
234 RGWRados *store;
235 RGWRESTConn *conn;
236 RGWAsyncRadosProcessor *async_rados;
237 RGWHTTPManager *http_manager;
238 RGWSyncErrorLogger *error_logger;
239 string source_zone;
240 RGWSyncModuleInstanceRef sync_module;
241 rgw::BucketChangeObserver *observer{nullptr};
242
243 RGWDataSyncEnv() : cct(NULL), store(NULL), conn(NULL), async_rados(NULL), http_manager(NULL), error_logger(NULL), sync_module(NULL) {}
244
245 void init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
246 RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
247 RGWSyncErrorLogger *_error_logger, const string& _source_zone,
248 RGWSyncModuleInstanceRef& _sync_module,
249 rgw::BucketChangeObserver *_observer) {
250 cct = _cct;
251 store = _store;
252 conn = _conn;
253 async_rados = _async_rados;
254 http_manager = _http_manager;
255 error_logger = _error_logger;
256 source_zone = _source_zone;
257 sync_module = _sync_module;
258 observer = _observer;
259 }
260
261 string shard_obj_name(int shard_id);
262 string status_oid();
263 };
264
265 class RGWRemoteDataLog : public RGWCoroutinesManager {
266 RGWRados *store;
267 RGWAsyncRadosProcessor *async_rados;
268 rgw::BucketChangeObserver *observer;
269 RGWHTTPManager http_manager;
270
271 RGWDataSyncEnv sync_env;
272
273 RWLock lock;
274 RGWDataSyncControlCR *data_sync_cr;
275
276 bool initialized;
277
278 public:
279 RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
280 rgw::BucketChangeObserver *observer)
281 : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()),
282 store(_store), async_rados(async_rados), observer(observer),
283 http_manager(store->ctx(), completion_mgr),
284 lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL),
285 initialized(false) {}
286 int init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncModuleInstanceRef& module);
287 void finish();
288
289 int read_log_info(rgw_datalog_info *log_info);
290 int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info);
291 int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result);
292 int read_sync_status(rgw_data_sync_status *sync_status);
293 int read_recovering_shards(const int num_shards, set<int>& recovering_shards);
294 int read_shard_status(int shard_id, set<string>& lagging_buckets,set<string>& recovering_buckets, rgw_data_sync_marker* sync_marker, const int max_entries);
295 int init_sync_status(int num_shards);
296 int run_sync(int num_shards);
297
298 void wakeup(int shard_id, set<string>& keys);
299 };
300
301 class RGWDataSyncStatusManager {
302 RGWRados *store;
303 rgw_rados_ref ref;
304
305 string source_zone;
306 RGWRESTConn *conn;
307 RGWSyncErrorLogger *error_logger;
308 RGWSyncModuleInstanceRef sync_module;
309
310 RGWRemoteDataLog source_log;
311
312 string source_status_oid;
313 string source_shard_status_oid_prefix;
314
315 map<int, rgw_raw_obj> shard_objs;
316
317 int num_shards;
318
319 public:
320 RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
321 const string& _source_zone,
322 rgw::BucketChangeObserver *observer = nullptr)
323 : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL),
324 sync_module(nullptr),
325 source_log(store, async_rados, observer), num_shards(0) {}
326 RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
327 const string& _source_zone, const RGWSyncModuleInstanceRef& _sync_module,
328 rgw::BucketChangeObserver *observer = nullptr)
329 : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL),
330 sync_module(_sync_module),
331 source_log(store, async_rados, observer), num_shards(0) {}
332 ~RGWDataSyncStatusManager() {
333 finalize();
334 }
335 int init();
336 void finalize();
337
338 static string shard_obj_name(const string& source_zone, int shard_id);
339 static string sync_status_oid(const string& source_zone);
340
341 int read_sync_status(rgw_data_sync_status *sync_status) {
342 return source_log.read_sync_status(sync_status);
343 }
344
345 int read_recovering_shards(const int num_shards, set<int>& recovering_shards) {
346 return source_log.read_recovering_shards(num_shards, recovering_shards);
347 }
348
349 int read_shard_status(int shard_id, set<string>& lagging_buckets, set<string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries) {
350 return source_log.read_shard_status(shard_id, lagging_buckets, recovering_buckets,sync_marker, max_entries);
351 }
352 int init_sync_status() { return source_log.init_sync_status(num_shards); }
353
354 int read_log_info(rgw_datalog_info *log_info) {
355 return source_log.read_log_info(log_info);
356 }
357 int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info) {
358 return source_log.read_source_log_shards_info(shards_info);
359 }
360 int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result) {
361 return source_log.read_source_log_shards_next(shard_markers, result);
362 }
363
364 int run() { return source_log.run_sync(num_shards); }
365
366 void wakeup(int shard_id, set<string>& keys) { return source_log.wakeup(shard_id, keys); }
367 void stop() {
368 source_log.finish();
369 }
370 };
371
372 class RGWBucketSyncStatusManager;
373 class RGWBucketSyncCR;
374
375 struct rgw_bucket_shard_full_sync_marker {
376 rgw_obj_key position;
377 uint64_t count;
378
379 rgw_bucket_shard_full_sync_marker() : count(0) {}
380
381 void encode_attr(map<string, bufferlist>& attrs);
382
383 void encode(bufferlist& bl) const {
384 ENCODE_START(1, 1, bl);
385 ::encode(position, bl);
386 ::encode(count, bl);
387 ENCODE_FINISH(bl);
388 }
389
390 void decode(bufferlist::iterator& bl) {
391 DECODE_START(1, bl);
392 ::decode(position, bl);
393 ::decode(count, bl);
394 DECODE_FINISH(bl);
395 }
396
397 void dump(Formatter *f) const;
398 void decode_json(JSONObj *obj);
399 };
400 WRITE_CLASS_ENCODER(rgw_bucket_shard_full_sync_marker)
401
402 struct rgw_bucket_shard_inc_sync_marker {
403 string position;
404
405 rgw_bucket_shard_inc_sync_marker() {}
406
407 void encode_attr(map<string, bufferlist>& attrs);
408
409 void encode(bufferlist& bl) const {
410 ENCODE_START(1, 1, bl);
411 ::encode(position, bl);
412 ENCODE_FINISH(bl);
413 }
414
415 void decode(bufferlist::iterator& bl) {
416 DECODE_START(1, bl);
417 ::decode(position, bl);
418 DECODE_FINISH(bl);
419 }
420
421 void dump(Formatter *f) const;
422 void decode_json(JSONObj *obj);
423
424 bool operator<(const rgw_bucket_shard_inc_sync_marker& m) const {
425 return (position < m.position);
426 }
427 };
428 WRITE_CLASS_ENCODER(rgw_bucket_shard_inc_sync_marker)
429
430 struct rgw_bucket_shard_sync_info {
431 enum SyncState {
432 StateInit = 0,
433 StateFullSync = 1,
434 StateIncrementalSync = 2,
435 };
436
437 uint16_t state;
438 rgw_bucket_shard_full_sync_marker full_marker;
439 rgw_bucket_shard_inc_sync_marker inc_marker;
440
441 void decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs);
442 void encode_all_attrs(map<string, bufferlist>& attrs);
443 void encode_state_attr(map<string, bufferlist>& attrs);
444
445 void encode(bufferlist& bl) const {
446 ENCODE_START(1, 1, bl);
447 ::encode(state, bl);
448 ::encode(full_marker, bl);
449 ::encode(inc_marker, bl);
450 ENCODE_FINISH(bl);
451 }
452
453 void decode(bufferlist::iterator& bl) {
454 DECODE_START(1, bl);
455 ::decode(state, bl);
456 ::decode(full_marker, bl);
457 ::decode(inc_marker, bl);
458 DECODE_FINISH(bl);
459 }
460
461 void dump(Formatter *f) const;
462 void decode_json(JSONObj *obj);
463
464 rgw_bucket_shard_sync_info() : state((int)StateInit) {}
465
466 };
467 WRITE_CLASS_ENCODER(rgw_bucket_shard_sync_info)
468
469 struct rgw_bucket_index_marker_info {
470 string bucket_ver;
471 string master_ver;
472 string max_marker;
473 bool syncstopped{false};
474
475 void decode_json(JSONObj *obj) {
476 JSONDecoder::decode_json("bucket_ver", bucket_ver, obj);
477 JSONDecoder::decode_json("master_ver", master_ver, obj);
478 JSONDecoder::decode_json("max_marker", max_marker, obj);
479 JSONDecoder::decode_json("syncstopped", syncstopped, obj);
480 }
481 };
482
483
484 class RGWRemoteBucketLog : public RGWCoroutinesManager {
485 RGWRados *store;
486 RGWRESTConn *conn{nullptr};
487 string source_zone;
488 rgw_bucket_shard bs;
489
490 RGWBucketSyncStatusManager *status_manager;
491 RGWAsyncRadosProcessor *async_rados;
492 RGWHTTPManager *http_manager;
493
494 RGWDataSyncEnv sync_env;
495 rgw_bucket_shard_sync_info init_status;
496
497 RGWBucketSyncCR *sync_cr{nullptr};
498
499 public:
500 RGWRemoteBucketLog(RGWRados *_store, RGWBucketSyncStatusManager *_sm,
501 RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), store(_store),
502 status_manager(_sm), async_rados(_async_rados), http_manager(_http_manager) {}
503
504 int init(const string& _source_zone, RGWRESTConn *_conn,
505 const rgw_bucket& bucket, int shard_id,
506 RGWSyncErrorLogger *_error_logger,
507 RGWSyncModuleInstanceRef& _sync_module);
508 void finish();
509
510 RGWCoroutine *read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status);
511 RGWCoroutine *init_sync_status_cr();
512 RGWCoroutine *run_sync_cr();
513
514 void wakeup();
515 };
516
517 class RGWBucketSyncStatusManager {
518 RGWRados *store;
519
520 RGWCoroutinesManager cr_mgr;
521
522 RGWHTTPManager http_manager;
523
524 string source_zone;
525 RGWRESTConn *conn;
526 RGWSyncErrorLogger *error_logger;
527 RGWSyncModuleInstanceRef sync_module;
528
529 rgw_bucket bucket;
530
531 map<int, RGWRemoteBucketLog *> source_logs;
532
533 string source_status_oid;
534 string source_shard_status_oid_prefix;
535
536 map<int, rgw_bucket_shard_sync_info> sync_status;
537 rgw_raw_obj status_obj;
538
539 int num_shards;
540
541 public:
542 RGWBucketSyncStatusManager(RGWRados *_store, const string& _source_zone,
543 const rgw_bucket& bucket) : store(_store),
544 cr_mgr(_store->ctx(), _store->get_cr_registry()),
545 http_manager(store->ctx(), cr_mgr.get_completion_mgr()),
546 source_zone(_source_zone),
547 conn(NULL), error_logger(NULL),
548 bucket(bucket),
549 num_shards(0) {}
550 ~RGWBucketSyncStatusManager();
551
552 int init();
553
554 map<int, rgw_bucket_shard_sync_info>& get_sync_status() { return sync_status; }
555 int init_sync_status();
556
557 static string status_oid(const string& source_zone, const rgw_bucket_shard& bs);
558
559 int read_sync_status();
560 int run();
561 };
562
563 /// read the sync status of all bucket shards from the given source zone
564 int rgw_bucket_sync_status(RGWRados *store, const std::string& source_zone,
565 const RGWBucketInfo& bucket_info,
566 std::vector<rgw_bucket_shard_sync_info> *status);
567
568 class RGWDefaultSyncModule : public RGWSyncModule {
569 public:
570 RGWDefaultSyncModule() {}
571 bool supports_data_export() override { return true; }
572 int create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance) override;
573 };
574
575 // DataLogTrimCR factory function
576 extern RGWCoroutine* create_data_log_trim_cr(RGWRados *store,
577 RGWHTTPManager *http,
578 int num_shards, utime_t interval);
579
580 #endif