]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_data_sync.h
00e094474557d446512aaee86bee44521487af36
[ceph.git] / ceph / src / rgw / rgw_data_sync.h
1 #ifndef CEPH_RGW_DATA_SYNC_H
2 #define CEPH_RGW_DATA_SYNC_H
3
4 #include "rgw_coroutine.h"
5 #include "rgw_http_client.h"
6 #include "rgw_bucket.h"
7
8 #include "rgw_sync_module.h"
9
10 #include "common/RWLock.h"
11 #include "common/ceph_json.h"
12
13
14 struct rgw_datalog_info {
15 uint32_t num_shards;
16
17 rgw_datalog_info() : num_shards(0) {}
18
19 void decode_json(JSONObj *obj);
20 };
21
22 struct rgw_data_sync_info {
23 enum SyncState {
24 StateInit = 0,
25 StateBuildingFullSyncMaps = 1,
26 StateSync = 2,
27 };
28
29 uint16_t state;
30 uint32_t num_shards;
31
32 uint64_t instance_id{0};
33
34 void encode(bufferlist& bl) const {
35 ENCODE_START(2, 1, bl);
36 ::encode(state, bl);
37 ::encode(num_shards, bl);
38 ::encode(instance_id, bl);
39 ENCODE_FINISH(bl);
40 }
41
42 void decode(bufferlist::iterator& bl) {
43 DECODE_START(2, bl);
44 ::decode(state, bl);
45 ::decode(num_shards, bl);
46 if (struct_v >= 2) {
47 ::decode(instance_id, bl);
48 }
49 DECODE_FINISH(bl);
50 }
51
52 void dump(Formatter *f) const {
53 string s;
54 switch ((SyncState)state) {
55 case StateInit:
56 s = "init";
57 break;
58 case StateBuildingFullSyncMaps:
59 s = "building-full-sync-maps";
60 break;
61 case StateSync:
62 s = "sync";
63 break;
64 default:
65 s = "unknown";
66 break;
67 }
68 encode_json("status", s, f);
69 encode_json("num_shards", num_shards, f);
70 encode_json("instance_id", instance_id, f);
71 }
72 void decode_json(JSONObj *obj) {
73 std::string s;
74 JSONDecoder::decode_json("status", s, obj);
75 if (s == "building-full-sync-maps") {
76 state = StateBuildingFullSyncMaps;
77 } else if (s == "sync") {
78 state = StateSync;
79 } else {
80 state = StateInit;
81 }
82 JSONDecoder::decode_json("num_shards", num_shards, obj);
83 JSONDecoder::decode_json("instance_id", num_shards, obj);
84 }
85
86 rgw_data_sync_info() : state((int)StateInit), num_shards(0) {}
87 };
88 WRITE_CLASS_ENCODER(rgw_data_sync_info)
89
90 struct rgw_data_sync_marker {
91 enum SyncState {
92 FullSync = 0,
93 IncrementalSync = 1,
94 };
95 uint16_t state;
96 string marker;
97 string next_step_marker;
98 uint64_t total_entries;
99 uint64_t pos;
100 real_time timestamp;
101
102 rgw_data_sync_marker() : state(FullSync), total_entries(0), pos(0) {}
103
104 void encode(bufferlist& bl) const {
105 ENCODE_START(1, 1, bl);
106 ::encode(state, bl);
107 ::encode(marker, bl);
108 ::encode(next_step_marker, bl);
109 ::encode(total_entries, bl);
110 ::encode(pos, bl);
111 ::encode(timestamp, bl);
112 ENCODE_FINISH(bl);
113 }
114
115 void decode(bufferlist::iterator& bl) {
116 DECODE_START(1, bl);
117 ::decode(state, bl);
118 ::decode(marker, bl);
119 ::decode(next_step_marker, bl);
120 ::decode(total_entries, bl);
121 ::decode(pos, bl);
122 ::decode(timestamp, bl);
123 DECODE_FINISH(bl);
124 }
125
126 void dump(Formatter *f) const {
127 encode_json("state", (int)state, f);
128 encode_json("marker", marker, f);
129 encode_json("next_step_marker", next_step_marker, f);
130 encode_json("total_entries", total_entries, f);
131 encode_json("pos", pos, f);
132 encode_json("timestamp", utime_t(timestamp), f);
133 }
134 void decode_json(JSONObj *obj) {
135 int s;
136 JSONDecoder::decode_json("state", s, obj);
137 state = s;
138 JSONDecoder::decode_json("marker", marker, obj);
139 JSONDecoder::decode_json("next_step_marker", next_step_marker, obj);
140 JSONDecoder::decode_json("total_entries", total_entries, obj);
141 JSONDecoder::decode_json("pos", pos, obj);
142 utime_t t;
143 JSONDecoder::decode_json("timestamp", t, obj);
144 timestamp = t.to_real_time();
145 }
146 };
147 WRITE_CLASS_ENCODER(rgw_data_sync_marker)
148
149 struct rgw_data_sync_status {
150 rgw_data_sync_info sync_info;
151 map<uint32_t, rgw_data_sync_marker> sync_markers;
152
153 rgw_data_sync_status() {}
154
155 void encode(bufferlist& bl) const {
156 ENCODE_START(1, 1, bl);
157 ::encode(sync_info, bl);
158 /* sync markers are encoded separately */
159 ENCODE_FINISH(bl);
160 }
161
162 void decode(bufferlist::iterator& bl) {
163 DECODE_START(1, bl);
164 ::decode(sync_info, bl);
165 /* sync markers are decoded separately */
166 DECODE_FINISH(bl);
167 }
168
169 void dump(Formatter *f) const {
170 encode_json("info", sync_info, f);
171 encode_json("markers", sync_markers, f);
172 }
173 void decode_json(JSONObj *obj) {
174 JSONDecoder::decode_json("info", sync_info, obj);
175 JSONDecoder::decode_json("markers", sync_markers, obj);
176 }
177 };
178 WRITE_CLASS_ENCODER(rgw_data_sync_status)
179
/**
 * A single datalog entry: a key and its timestamp.
 * Populated from JSON by decode_json(), which is defined out of line.
 */
struct rgw_datalog_entry {
  string key;                  // entry key (presumably names the changed bucket shard — TODO confirm)
  ceph::real_time timestamp;   // time associated with the entry

  void decode_json(JSONObj *obj);
};
186
187 struct rgw_datalog_shard_data {
188 string marker;
189 bool truncated;
190 vector<rgw_datalog_entry> entries;
191
192 void decode_json(JSONObj *obj);
193 };
194
195 class RGWAsyncRadosProcessor;
196 class RGWDataSyncControlCR;
197
198 struct rgw_bucket_entry_owner {
199 string id;
200 string display_name;
201
202 rgw_bucket_entry_owner() {}
203 rgw_bucket_entry_owner(const string& _id, const string& _display_name) : id(_id), display_name(_display_name) {}
204
205 void decode_json(JSONObj *obj);
206 };
207
208 class RGWSyncErrorLogger;
209
210 struct RGWDataSyncEnv {
211 CephContext *cct;
212 RGWRados *store;
213 RGWRESTConn *conn;
214 RGWAsyncRadosProcessor *async_rados;
215 RGWHTTPManager *http_manager;
216 RGWSyncErrorLogger *error_logger;
217 string source_zone;
218 RGWSyncModuleInstanceRef sync_module;
219
220 RGWDataSyncEnv() : cct(NULL), store(NULL), conn(NULL), async_rados(NULL), http_manager(NULL), error_logger(NULL), sync_module(NULL) {}
221
222 void init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
223 RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
224 RGWSyncErrorLogger *_error_logger, const string& _source_zone,
225 RGWSyncModuleInstanceRef& _sync_module) {
226 cct = _cct;
227 store = _store;
228 conn = _conn;
229 async_rados = _async_rados;
230 http_manager = _http_manager;
231 error_logger = _error_logger;
232 source_zone = _source_zone;
233 sync_module = _sync_module;
234 }
235
236 string shard_obj_name(int shard_id);
237 string status_oid();
238 };
239
240 class RGWRemoteDataLog : public RGWCoroutinesManager {
241 RGWRados *store;
242 RGWAsyncRadosProcessor *async_rados;
243 RGWHTTPManager http_manager;
244
245 RGWDataSyncEnv sync_env;
246
247 RWLock lock;
248 RGWDataSyncControlCR *data_sync_cr;
249
250 bool initialized;
251
252 public:
253 RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados)
254 : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()),
255 store(_store), async_rados(async_rados),
256 http_manager(store->ctx(), completion_mgr),
257 lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL),
258 initialized(false) {}
259 int init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncModuleInstanceRef& module);
260 void finish();
261
262 int read_log_info(rgw_datalog_info *log_info);
263 int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info);
264 int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result);
265 int get_shard_info(int shard_id);
266 int read_sync_status(rgw_data_sync_status *sync_status);
267 int init_sync_status(int num_shards);
268 int run_sync(int num_shards);
269
270 void wakeup(int shard_id, set<string>& keys);
271 };
272
273 class RGWDataSyncStatusManager {
274 RGWRados *store;
275 rgw_rados_ref ref;
276
277 string source_zone;
278 RGWRESTConn *conn;
279 RGWSyncErrorLogger *error_logger;
280 RGWSyncModuleInstanceRef sync_module;
281
282 RGWRemoteDataLog source_log;
283
284 string source_status_oid;
285 string source_shard_status_oid_prefix;
286
287 map<int, rgw_raw_obj> shard_objs;
288
289 int num_shards;
290
291 public:
292 RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
293 const string& _source_zone)
294 : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL),
295 sync_module(nullptr),
296 source_log(store, async_rados), num_shards(0) {}
297 ~RGWDataSyncStatusManager() {
298 finalize();
299 }
300 int init();
301 void finalize();
302
303 static string shard_obj_name(const string& source_zone, int shard_id);
304 static string sync_status_oid(const string& source_zone);
305
306 int read_sync_status(rgw_data_sync_status *sync_status) {
307 return source_log.read_sync_status(sync_status);
308 }
309 int init_sync_status() { return source_log.init_sync_status(num_shards); }
310
311 int read_log_info(rgw_datalog_info *log_info) {
312 return source_log.read_log_info(log_info);
313 }
314 int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info) {
315 return source_log.read_source_log_shards_info(shards_info);
316 }
317 int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result) {
318 return source_log.read_source_log_shards_next(shard_markers, result);
319 }
320
321 int run() { return source_log.run_sync(num_shards); }
322
323 void wakeup(int shard_id, set<string>& keys) { return source_log.wakeup(shard_id, keys); }
324 void stop() {
325 source_log.finish();
326 }
327 };
328
329 class RGWBucketSyncStatusManager;
330 class RGWBucketSyncCR;
331
332 struct rgw_bucket_shard_full_sync_marker {
333 rgw_obj_key position;
334 uint64_t count;
335
336 rgw_bucket_shard_full_sync_marker() : count(0) {}
337
338 void encode_attr(map<string, bufferlist>& attrs);
339
340 void encode(bufferlist& bl) const {
341 ENCODE_START(1, 1, bl);
342 ::encode(position, bl);
343 ::encode(count, bl);
344 ENCODE_FINISH(bl);
345 }
346
347 void decode(bufferlist::iterator& bl) {
348 DECODE_START(1, bl);
349 ::decode(position, bl);
350 ::decode(count, bl);
351 DECODE_FINISH(bl);
352 }
353
354 void dump(Formatter *f) const {
355 encode_json("position", position, f);
356 encode_json("count", count, f);
357 }
358 };
359 WRITE_CLASS_ENCODER(rgw_bucket_shard_full_sync_marker)
360
361 struct rgw_bucket_shard_inc_sync_marker {
362 string position;
363
364 rgw_bucket_shard_inc_sync_marker() {}
365
366 void encode_attr(map<string, bufferlist>& attrs);
367
368 void encode(bufferlist& bl) const {
369 ENCODE_START(1, 1, bl);
370 ::encode(position, bl);
371 ENCODE_FINISH(bl);
372 }
373
374 void decode(bufferlist::iterator& bl) {
375 DECODE_START(1, bl);
376 ::decode(position, bl);
377 DECODE_FINISH(bl);
378 }
379
380 void dump(Formatter *f) const {
381 encode_json("position", position, f);
382 }
383
384 bool operator<(const rgw_bucket_shard_inc_sync_marker& m) const {
385 return (position < m.position);
386 }
387 };
388 WRITE_CLASS_ENCODER(rgw_bucket_shard_inc_sync_marker)
389
390 struct rgw_bucket_shard_sync_info {
391 enum SyncState {
392 StateInit = 0,
393 StateFullSync = 1,
394 StateIncrementalSync = 2,
395 };
396
397 uint16_t state;
398 rgw_bucket_shard_full_sync_marker full_marker;
399 rgw_bucket_shard_inc_sync_marker inc_marker;
400
401 void decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs);
402 void encode_all_attrs(map<string, bufferlist>& attrs);
403 void encode_state_attr(map<string, bufferlist>& attrs);
404
405 void encode(bufferlist& bl) const {
406 ENCODE_START(1, 1, bl);
407 ::encode(state, bl);
408 ::encode(full_marker, bl);
409 ::encode(inc_marker, bl);
410 ENCODE_FINISH(bl);
411 }
412
413 void decode(bufferlist::iterator& bl) {
414 DECODE_START(1, bl);
415 ::decode(state, bl);
416 ::decode(full_marker, bl);
417 ::decode(inc_marker, bl);
418 DECODE_FINISH(bl);
419 }
420
421 void dump(Formatter *f) const {
422 string s;
423 switch ((SyncState)state) {
424 case StateInit:
425 s = "init";
426 break;
427 case StateFullSync:
428 s = "full-sync";
429 break;
430 case StateIncrementalSync:
431 s = "incremental-sync";
432 break;
433 default:
434 s = "unknown";
435 break;
436 }
437 encode_json("status", s, f);
438 encode_json("full_marker", full_marker, f);
439 encode_json("inc_marker", inc_marker, f);
440 }
441
442 rgw_bucket_shard_sync_info() : state((int)StateInit) {}
443
444 };
445 WRITE_CLASS_ENCODER(rgw_bucket_shard_sync_info)
446
447
448 class RGWRemoteBucketLog : public RGWCoroutinesManager {
449 RGWRados *store;
450 RGWRESTConn *conn{nullptr};
451 string source_zone;
452 rgw_bucket_shard bs;
453
454 RGWBucketSyncStatusManager *status_manager;
455 RGWAsyncRadosProcessor *async_rados;
456 RGWHTTPManager *http_manager;
457
458 RGWDataSyncEnv sync_env;
459 rgw_bucket_shard_sync_info init_status;
460
461 RGWBucketSyncCR *sync_cr{nullptr};
462
463 public:
464 RGWRemoteBucketLog(RGWRados *_store, RGWBucketSyncStatusManager *_sm,
465 RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), store(_store),
466 status_manager(_sm), async_rados(_async_rados), http_manager(_http_manager) {}
467
468 int init(const string& _source_zone, RGWRESTConn *_conn,
469 const rgw_bucket& bucket, int shard_id,
470 RGWSyncErrorLogger *_error_logger,
471 RGWSyncModuleInstanceRef& _sync_module);
472 void finish();
473
474 RGWCoroutine *read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status);
475 RGWCoroutine *init_sync_status_cr();
476 RGWCoroutine *run_sync_cr();
477
478 void wakeup();
479 };
480
481 class RGWBucketSyncStatusManager {
482 RGWRados *store;
483
484 RGWCoroutinesManager cr_mgr;
485
486 RGWHTTPManager http_manager;
487
488 string source_zone;
489 RGWRESTConn *conn;
490 RGWSyncErrorLogger *error_logger;
491 RGWSyncModuleInstanceRef sync_module;
492
493 rgw_bucket bucket;
494
495 map<int, RGWRemoteBucketLog *> source_logs;
496
497 string source_status_oid;
498 string source_shard_status_oid_prefix;
499
500 map<int, rgw_bucket_shard_sync_info> sync_status;
501 rgw_raw_obj status_obj;
502
503 int num_shards;
504
505 public:
506 RGWBucketSyncStatusManager(RGWRados *_store, const string& _source_zone,
507 const rgw_bucket& bucket) : store(_store),
508 cr_mgr(_store->ctx(), _store->get_cr_registry()),
509 http_manager(store->ctx(), cr_mgr.get_completion_mgr()),
510 source_zone(_source_zone),
511 conn(NULL), error_logger(NULL),
512 bucket(bucket),
513 num_shards(0) {}
514 ~RGWBucketSyncStatusManager();
515
516 int init();
517
518 map<int, rgw_bucket_shard_sync_info>& get_sync_status() { return sync_status; }
519 int init_sync_status();
520
521 static string status_oid(const string& source_zone, const rgw_bucket_shard& bs);
522
523 int read_sync_status();
524 int run();
525 };
526
527 class RGWDefaultSyncModule : public RGWSyncModule {
528 public:
529 RGWDefaultSyncModule() {}
530 bool supports_data_export() override { return true; }
531 int create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance) override;
532 };
533
534 // DataLogTrimCR factory function
535 extern RGWCoroutine* create_data_log_trim_cr(RGWRados *store,
536 RGWHTTPManager *http,
537 int num_shards, utime_t interval);
538
539 #endif