]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_data_sync.h
update sources to v12.2.3
[ceph.git] / ceph / src / rgw / rgw_data_sync.h
1 #ifndef CEPH_RGW_DATA_SYNC_H
2 #define CEPH_RGW_DATA_SYNC_H
3
4 #include "rgw_coroutine.h"
5 #include "rgw_http_client.h"
6 #include "rgw_bucket.h"
7
8 #include "rgw_sync_module.h"
9
10 #include "common/RWLock.h"
11 #include "common/ceph_json.h"
12
// Forward declaration: this header only handles rgw::BucketChangeObserver
// through pointers, so the full definition is not needed here.
namespace rgw { class BucketChangeObserver; }
16
17 struct rgw_datalog_info {
18 uint32_t num_shards;
19
20 rgw_datalog_info() : num_shards(0) {}
21
22 void decode_json(JSONObj *obj);
23 };
24
25 struct rgw_data_sync_info {
26 enum SyncState {
27 StateInit = 0,
28 StateBuildingFullSyncMaps = 1,
29 StateSync = 2,
30 };
31
32 uint16_t state;
33 uint32_t num_shards;
34
35 uint64_t instance_id{0};
36
37 void encode(bufferlist& bl) const {
38 ENCODE_START(2, 1, bl);
39 ::encode(state, bl);
40 ::encode(num_shards, bl);
41 ::encode(instance_id, bl);
42 ENCODE_FINISH(bl);
43 }
44
45 void decode(bufferlist::iterator& bl) {
46 DECODE_START(2, bl);
47 ::decode(state, bl);
48 ::decode(num_shards, bl);
49 if (struct_v >= 2) {
50 ::decode(instance_id, bl);
51 }
52 DECODE_FINISH(bl);
53 }
54
55 void dump(Formatter *f) const {
56 string s;
57 switch ((SyncState)state) {
58 case StateInit:
59 s = "init";
60 break;
61 case StateBuildingFullSyncMaps:
62 s = "building-full-sync-maps";
63 break;
64 case StateSync:
65 s = "sync";
66 break;
67 default:
68 s = "unknown";
69 break;
70 }
71 encode_json("status", s, f);
72 encode_json("num_shards", num_shards, f);
73 encode_json("instance_id", instance_id, f);
74 }
75 void decode_json(JSONObj *obj) {
76 std::string s;
77 JSONDecoder::decode_json("status", s, obj);
78 if (s == "building-full-sync-maps") {
79 state = StateBuildingFullSyncMaps;
80 } else if (s == "sync") {
81 state = StateSync;
82 } else {
83 state = StateInit;
84 }
85 JSONDecoder::decode_json("num_shards", num_shards, obj);
86 JSONDecoder::decode_json("instance_id", num_shards, obj);
87 }
88 static void generate_test_instances(std::list<rgw_data_sync_info*>& o);
89
90 rgw_data_sync_info() : state((int)StateInit), num_shards(0) {}
91 };
92 WRITE_CLASS_ENCODER(rgw_data_sync_info)
93
94 struct rgw_data_sync_marker {
95 enum SyncState {
96 FullSync = 0,
97 IncrementalSync = 1,
98 };
99 uint16_t state;
100 string marker;
101 string next_step_marker;
102 uint64_t total_entries;
103 uint64_t pos;
104 real_time timestamp;
105
106 rgw_data_sync_marker() : state(FullSync), total_entries(0), pos(0) {}
107
108 void encode(bufferlist& bl) const {
109 ENCODE_START(1, 1, bl);
110 ::encode(state, bl);
111 ::encode(marker, bl);
112 ::encode(next_step_marker, bl);
113 ::encode(total_entries, bl);
114 ::encode(pos, bl);
115 ::encode(timestamp, bl);
116 ENCODE_FINISH(bl);
117 }
118
119 void decode(bufferlist::iterator& bl) {
120 DECODE_START(1, bl);
121 ::decode(state, bl);
122 ::decode(marker, bl);
123 ::decode(next_step_marker, bl);
124 ::decode(total_entries, bl);
125 ::decode(pos, bl);
126 ::decode(timestamp, bl);
127 DECODE_FINISH(bl);
128 }
129
130 void dump(Formatter *f) const {
131 encode_json("state", (int)state, f);
132 encode_json("marker", marker, f);
133 encode_json("next_step_marker", next_step_marker, f);
134 encode_json("total_entries", total_entries, f);
135 encode_json("pos", pos, f);
136 encode_json("timestamp", utime_t(timestamp), f);
137 }
138 void decode_json(JSONObj *obj) {
139 int s;
140 JSONDecoder::decode_json("state", s, obj);
141 state = s;
142 JSONDecoder::decode_json("marker", marker, obj);
143 JSONDecoder::decode_json("next_step_marker", next_step_marker, obj);
144 JSONDecoder::decode_json("total_entries", total_entries, obj);
145 JSONDecoder::decode_json("pos", pos, obj);
146 utime_t t;
147 JSONDecoder::decode_json("timestamp", t, obj);
148 timestamp = t.to_real_time();
149 }
150 static void generate_test_instances(std::list<rgw_data_sync_marker*>& o);
151 };
152 WRITE_CLASS_ENCODER(rgw_data_sync_marker)
153
154 struct rgw_data_sync_status {
155 rgw_data_sync_info sync_info;
156 map<uint32_t, rgw_data_sync_marker> sync_markers;
157
158 rgw_data_sync_status() {}
159
160 void encode(bufferlist& bl) const {
161 ENCODE_START(1, 1, bl);
162 ::encode(sync_info, bl);
163 /* sync markers are encoded separately */
164 ENCODE_FINISH(bl);
165 }
166
167 void decode(bufferlist::iterator& bl) {
168 DECODE_START(1, bl);
169 ::decode(sync_info, bl);
170 /* sync markers are decoded separately */
171 DECODE_FINISH(bl);
172 }
173
174 void dump(Formatter *f) const {
175 encode_json("info", sync_info, f);
176 encode_json("markers", sync_markers, f);
177 }
178 void decode_json(JSONObj *obj) {
179 JSONDecoder::decode_json("info", sync_info, obj);
180 JSONDecoder::decode_json("markers", sync_markers, obj);
181 }
182 static void generate_test_instances(std::list<rgw_data_sync_status*>& o);
183 };
184 WRITE_CLASS_ENCODER(rgw_data_sync_status)
185
186 struct rgw_datalog_entry {
187 string key;
188 ceph::real_time timestamp;
189
190 void decode_json(JSONObj *obj);
191 };
192
193 struct rgw_datalog_shard_data {
194 string marker;
195 bool truncated;
196 vector<rgw_datalog_entry> entries;
197
198 void decode_json(JSONObj *obj);
199 };
200
// Forward declarations: both types are only used through pointers in this
// header.
class RGWDataSyncControlCR;
class RGWAsyncRadosProcessor;
203
204 struct rgw_bucket_entry_owner {
205 string id;
206 string display_name;
207
208 rgw_bucket_entry_owner() {}
209 rgw_bucket_entry_owner(const string& _id, const string& _display_name) : id(_id), display_name(_display_name) {}
210
211 void decode_json(JSONObj *obj);
212 };
213
// Forward declaration: only RGWSyncErrorLogger pointers appear in this header.
class RGWSyncErrorLogger;
215
216 struct RGWDataSyncEnv {
217 CephContext *cct;
218 RGWRados *store;
219 RGWRESTConn *conn;
220 RGWAsyncRadosProcessor *async_rados;
221 RGWHTTPManager *http_manager;
222 RGWSyncErrorLogger *error_logger;
223 string source_zone;
224 RGWSyncModuleInstanceRef sync_module;
225 rgw::BucketChangeObserver *observer{nullptr};
226
227 RGWDataSyncEnv() : cct(NULL), store(NULL), conn(NULL), async_rados(NULL), http_manager(NULL), error_logger(NULL), sync_module(NULL) {}
228
229 void init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
230 RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
231 RGWSyncErrorLogger *_error_logger, const string& _source_zone,
232 RGWSyncModuleInstanceRef& _sync_module,
233 rgw::BucketChangeObserver *_observer) {
234 cct = _cct;
235 store = _store;
236 conn = _conn;
237 async_rados = _async_rados;
238 http_manager = _http_manager;
239 error_logger = _error_logger;
240 source_zone = _source_zone;
241 sync_module = _sync_module;
242 observer = _observer;
243 }
244
245 string shard_obj_name(int shard_id);
246 string status_oid();
247 };
248
249 class RGWRemoteDataLog : public RGWCoroutinesManager {
250 RGWRados *store;
251 RGWAsyncRadosProcessor *async_rados;
252 rgw::BucketChangeObserver *observer;
253 RGWHTTPManager http_manager;
254
255 RGWDataSyncEnv sync_env;
256
257 RWLock lock;
258 RGWDataSyncControlCR *data_sync_cr;
259
260 bool initialized;
261
262 public:
263 RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
264 rgw::BucketChangeObserver *observer)
265 : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()),
266 store(_store), async_rados(async_rados), observer(observer),
267 http_manager(store->ctx(), completion_mgr),
268 lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL),
269 initialized(false) {}
270 int init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncModuleInstanceRef& module);
271 void finish();
272
273 int read_log_info(rgw_datalog_info *log_info);
274 int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info);
275 int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result);
276 int read_sync_status(rgw_data_sync_status *sync_status);
277 int init_sync_status(int num_shards);
278 int run_sync(int num_shards);
279
280 void wakeup(int shard_id, set<string>& keys);
281 };
282
283 class RGWDataSyncStatusManager {
284 RGWRados *store;
285 rgw_rados_ref ref;
286
287 string source_zone;
288 RGWRESTConn *conn;
289 RGWSyncErrorLogger *error_logger;
290 RGWSyncModuleInstanceRef sync_module;
291
292 RGWRemoteDataLog source_log;
293
294 string source_status_oid;
295 string source_shard_status_oid_prefix;
296
297 map<int, rgw_raw_obj> shard_objs;
298
299 int num_shards;
300
301 public:
302 RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
303 const string& _source_zone,
304 rgw::BucketChangeObserver *observer = nullptr)
305 : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL),
306 sync_module(nullptr),
307 source_log(store, async_rados, observer), num_shards(0) {}
308 ~RGWDataSyncStatusManager() {
309 finalize();
310 }
311 int init();
312 void finalize();
313
314 static string shard_obj_name(const string& source_zone, int shard_id);
315 static string sync_status_oid(const string& source_zone);
316
317 int read_sync_status(rgw_data_sync_status *sync_status) {
318 return source_log.read_sync_status(sync_status);
319 }
320 int init_sync_status() { return source_log.init_sync_status(num_shards); }
321
322 int read_log_info(rgw_datalog_info *log_info) {
323 return source_log.read_log_info(log_info);
324 }
325 int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info) {
326 return source_log.read_source_log_shards_info(shards_info);
327 }
328 int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result) {
329 return source_log.read_source_log_shards_next(shard_markers, result);
330 }
331
332 int run() { return source_log.run_sync(num_shards); }
333
334 void wakeup(int shard_id, set<string>& keys) { return source_log.wakeup(shard_id, keys); }
335 void stop() {
336 source_log.finish();
337 }
338 };
339
// Forward declarations: RGWRemoteBucketLog holds pointers to both types.
class RGWBucketSyncCR;
class RGWBucketSyncStatusManager;
342
343 struct rgw_bucket_shard_full_sync_marker {
344 rgw_obj_key position;
345 uint64_t count;
346
347 rgw_bucket_shard_full_sync_marker() : count(0) {}
348
349 void encode_attr(map<string, bufferlist>& attrs);
350
351 void encode(bufferlist& bl) const {
352 ENCODE_START(1, 1, bl);
353 ::encode(position, bl);
354 ::encode(count, bl);
355 ENCODE_FINISH(bl);
356 }
357
358 void decode(bufferlist::iterator& bl) {
359 DECODE_START(1, bl);
360 ::decode(position, bl);
361 ::decode(count, bl);
362 DECODE_FINISH(bl);
363 }
364
365 void dump(Formatter *f) const;
366 void decode_json(JSONObj *obj);
367 };
368 WRITE_CLASS_ENCODER(rgw_bucket_shard_full_sync_marker)
369
370 struct rgw_bucket_shard_inc_sync_marker {
371 string position;
372
373 rgw_bucket_shard_inc_sync_marker() {}
374
375 void encode_attr(map<string, bufferlist>& attrs);
376
377 void encode(bufferlist& bl) const {
378 ENCODE_START(1, 1, bl);
379 ::encode(position, bl);
380 ENCODE_FINISH(bl);
381 }
382
383 void decode(bufferlist::iterator& bl) {
384 DECODE_START(1, bl);
385 ::decode(position, bl);
386 DECODE_FINISH(bl);
387 }
388
389 void dump(Formatter *f) const;
390 void decode_json(JSONObj *obj);
391
392 bool operator<(const rgw_bucket_shard_inc_sync_marker& m) const {
393 return (position < m.position);
394 }
395 };
396 WRITE_CLASS_ENCODER(rgw_bucket_shard_inc_sync_marker)
397
398 struct rgw_bucket_shard_sync_info {
399 enum SyncState {
400 StateInit = 0,
401 StateFullSync = 1,
402 StateIncrementalSync = 2,
403 };
404
405 uint16_t state;
406 rgw_bucket_shard_full_sync_marker full_marker;
407 rgw_bucket_shard_inc_sync_marker inc_marker;
408
409 void decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs);
410 void encode_all_attrs(map<string, bufferlist>& attrs);
411 void encode_state_attr(map<string, bufferlist>& attrs);
412
413 void encode(bufferlist& bl) const {
414 ENCODE_START(1, 1, bl);
415 ::encode(state, bl);
416 ::encode(full_marker, bl);
417 ::encode(inc_marker, bl);
418 ENCODE_FINISH(bl);
419 }
420
421 void decode(bufferlist::iterator& bl) {
422 DECODE_START(1, bl);
423 ::decode(state, bl);
424 ::decode(full_marker, bl);
425 ::decode(inc_marker, bl);
426 DECODE_FINISH(bl);
427 }
428
429 void dump(Formatter *f) const;
430 void decode_json(JSONObj *obj);
431
432 rgw_bucket_shard_sync_info() : state((int)StateInit) {}
433
434 };
435 WRITE_CLASS_ENCODER(rgw_bucket_shard_sync_info)
436
437
438 class RGWRemoteBucketLog : public RGWCoroutinesManager {
439 RGWRados *store;
440 RGWRESTConn *conn{nullptr};
441 string source_zone;
442 rgw_bucket_shard bs;
443
444 RGWBucketSyncStatusManager *status_manager;
445 RGWAsyncRadosProcessor *async_rados;
446 RGWHTTPManager *http_manager;
447
448 RGWDataSyncEnv sync_env;
449 rgw_bucket_shard_sync_info init_status;
450
451 RGWBucketSyncCR *sync_cr{nullptr};
452
453 public:
454 RGWRemoteBucketLog(RGWRados *_store, RGWBucketSyncStatusManager *_sm,
455 RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), store(_store),
456 status_manager(_sm), async_rados(_async_rados), http_manager(_http_manager) {}
457
458 int init(const string& _source_zone, RGWRESTConn *_conn,
459 const rgw_bucket& bucket, int shard_id,
460 RGWSyncErrorLogger *_error_logger,
461 RGWSyncModuleInstanceRef& _sync_module);
462 void finish();
463
464 RGWCoroutine *read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status);
465 RGWCoroutine *init_sync_status_cr();
466 RGWCoroutine *run_sync_cr();
467
468 void wakeup();
469 };
470
471 class RGWBucketSyncStatusManager {
472 RGWRados *store;
473
474 RGWCoroutinesManager cr_mgr;
475
476 RGWHTTPManager http_manager;
477
478 string source_zone;
479 RGWRESTConn *conn;
480 RGWSyncErrorLogger *error_logger;
481 RGWSyncModuleInstanceRef sync_module;
482
483 rgw_bucket bucket;
484
485 map<int, RGWRemoteBucketLog *> source_logs;
486
487 string source_status_oid;
488 string source_shard_status_oid_prefix;
489
490 map<int, rgw_bucket_shard_sync_info> sync_status;
491 rgw_raw_obj status_obj;
492
493 int num_shards;
494
495 public:
496 RGWBucketSyncStatusManager(RGWRados *_store, const string& _source_zone,
497 const rgw_bucket& bucket) : store(_store),
498 cr_mgr(_store->ctx(), _store->get_cr_registry()),
499 http_manager(store->ctx(), cr_mgr.get_completion_mgr()),
500 source_zone(_source_zone),
501 conn(NULL), error_logger(NULL),
502 bucket(bucket),
503 num_shards(0) {}
504 ~RGWBucketSyncStatusManager();
505
506 int init();
507
508 map<int, rgw_bucket_shard_sync_info>& get_sync_status() { return sync_status; }
509 int init_sync_status();
510
511 static string status_oid(const string& source_zone, const rgw_bucket_shard& bs);
512
513 int read_sync_status();
514 int run();
515 };
516
517 /// read the sync status of all bucket shards from the given source zone
518 int rgw_bucket_sync_status(RGWRados *store, const std::string& source_zone,
519 const rgw_bucket& bucket,
520 std::vector<rgw_bucket_shard_sync_info> *status);
521
522 class RGWDefaultSyncModule : public RGWSyncModule {
523 public:
524 RGWDefaultSyncModule() {}
525 bool supports_data_export() override { return true; }
526 int create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance) override;
527 };
528
529 // DataLogTrimCR factory function
530 extern RGWCoroutine* create_data_log_trim_cr(RGWRados *store,
531 RGWHTTPManager *http,
532 int num_shards, utime_t interval);
533
534 #endif