]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_data_sync.h
import ceph 15.2.10
[ceph.git] / ceph / src / rgw / rgw_data_sync.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #ifndef CEPH_RGW_DATA_SYNC_H
5 #define CEPH_RGW_DATA_SYNC_H
6
7 #include "include/encoding.h"
8
9 #include "common/RWLock.h"
10 #include "common/ceph_json.h"
11
12 #include "rgw_coroutine.h"
13 #include "rgw_http_client.h"
14 #include "rgw_sal.h"
15
16 #include "rgw_sync_module.h"
17 #include "rgw_sync_trace.h"
18 #include "rgw_sync_policy.h"
19
20 #include "rgw_bucket_sync.h"
21
22 class JSONObj;
23 struct rgw_sync_bucket_pipe;
24
25 struct rgw_bucket_sync_pair_info {
26 RGWBucketSyncFlowManager::pipe_handler handler; /* responsible for sync filters */
27 rgw_bucket_shard source_bs;
28 rgw_bucket_shard dest_bs;
29 };
30
31 inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pair_info& p) {
32 if (p.source_bs.bucket == p.dest_bs.bucket) {
33 return out << p.source_bs;
34 }
35
36 out << p.source_bs;
37
38 out << "->" << p.dest_bs.bucket;
39
40 return out;
41 }
42
43 struct rgw_bucket_sync_pipe {
44 rgw_bucket_sync_pair_info info;
45 RGWBucketInfo source_bucket_info;
46 map<string, bufferlist> source_bucket_attrs;
47 RGWBucketInfo dest_bucket_info;
48 map<string, bufferlist> dest_bucket_attrs;
49
50 RGWBucketSyncFlowManager::pipe_rules_ref& get_rules() {
51 return info.handler.rules;
52 }
53 };
54
55 inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pipe& p) {
56 return out << p.info;
57 }
58
59 struct rgw_datalog_info {
60 uint32_t num_shards;
61
62 rgw_datalog_info() : num_shards(0) {}
63
64 void decode_json(JSONObj *obj);
65 };
66
67 struct rgw_data_sync_info {
68 enum SyncState {
69 StateInit = 0,
70 StateBuildingFullSyncMaps = 1,
71 StateSync = 2,
72 };
73
74 uint16_t state;
75 uint32_t num_shards;
76
77 uint64_t instance_id{0};
78
79 void encode(bufferlist& bl) const {
80 ENCODE_START(2, 1, bl);
81 encode(state, bl);
82 encode(num_shards, bl);
83 encode(instance_id, bl);
84 ENCODE_FINISH(bl);
85 }
86
87 void decode(bufferlist::const_iterator& bl) {
88 DECODE_START(2, bl);
89 decode(state, bl);
90 decode(num_shards, bl);
91 if (struct_v >= 2) {
92 decode(instance_id, bl);
93 }
94 DECODE_FINISH(bl);
95 }
96
97 void dump(Formatter *f) const {
98 string s;
99 switch ((SyncState)state) {
100 case StateInit:
101 s = "init";
102 break;
103 case StateBuildingFullSyncMaps:
104 s = "building-full-sync-maps";
105 break;
106 case StateSync:
107 s = "sync";
108 break;
109 default:
110 s = "unknown";
111 break;
112 }
113 encode_json("status", s, f);
114 encode_json("num_shards", num_shards, f);
115 encode_json("instance_id", instance_id, f);
116 }
117 void decode_json(JSONObj *obj) {
118 std::string s;
119 JSONDecoder::decode_json("status", s, obj);
120 if (s == "building-full-sync-maps") {
121 state = StateBuildingFullSyncMaps;
122 } else if (s == "sync") {
123 state = StateSync;
124 } else {
125 state = StateInit;
126 }
127 JSONDecoder::decode_json("num_shards", num_shards, obj);
128 JSONDecoder::decode_json("instance_id", instance_id, obj);
129 }
130 static void generate_test_instances(std::list<rgw_data_sync_info*>& o);
131
132 rgw_data_sync_info() : state((int)StateInit), num_shards(0) {}
133 };
134 WRITE_CLASS_ENCODER(rgw_data_sync_info)
135
136 struct rgw_data_sync_marker {
137 enum SyncState {
138 FullSync = 0,
139 IncrementalSync = 1,
140 };
141 uint16_t state;
142 string marker;
143 string next_step_marker;
144 uint64_t total_entries;
145 uint64_t pos;
146 real_time timestamp;
147
148 rgw_data_sync_marker() : state(FullSync), total_entries(0), pos(0) {}
149
150 void encode(bufferlist& bl) const {
151 ENCODE_START(1, 1, bl);
152 encode(state, bl);
153 encode(marker, bl);
154 encode(next_step_marker, bl);
155 encode(total_entries, bl);
156 encode(pos, bl);
157 encode(timestamp, bl);
158 ENCODE_FINISH(bl);
159 }
160
161 void decode(bufferlist::const_iterator& bl) {
162 DECODE_START(1, bl);
163 decode(state, bl);
164 decode(marker, bl);
165 decode(next_step_marker, bl);
166 decode(total_entries, bl);
167 decode(pos, bl);
168 decode(timestamp, bl);
169 DECODE_FINISH(bl);
170 }
171
172 void dump(Formatter *f) const {
173 const char *s{nullptr};
174 switch ((SyncState)state) {
175 case FullSync:
176 s = "full-sync";
177 break;
178 case IncrementalSync:
179 s = "incremental-sync";
180 break;
181 default:
182 s = "unknown";
183 break;
184 }
185 encode_json("status", s, f);
186 encode_json("marker", marker, f);
187 encode_json("next_step_marker", next_step_marker, f);
188 encode_json("total_entries", total_entries, f);
189 encode_json("pos", pos, f);
190 encode_json("timestamp", utime_t(timestamp), f);
191 }
192 void decode_json(JSONObj *obj) {
193 std::string s;
194 JSONDecoder::decode_json("status", s, obj);
195 if (s == "full-sync") {
196 state = FullSync;
197 } else if (s == "incremental-sync") {
198 state = IncrementalSync;
199 }
200 JSONDecoder::decode_json("marker", marker, obj);
201 JSONDecoder::decode_json("next_step_marker", next_step_marker, obj);
202 JSONDecoder::decode_json("total_entries", total_entries, obj);
203 JSONDecoder::decode_json("pos", pos, obj);
204 utime_t t;
205 JSONDecoder::decode_json("timestamp", t, obj);
206 timestamp = t.to_real_time();
207 }
208 static void generate_test_instances(std::list<rgw_data_sync_marker*>& o);
209 };
210 WRITE_CLASS_ENCODER(rgw_data_sync_marker)
211
212 struct rgw_data_sync_status {
213 rgw_data_sync_info sync_info;
214 map<uint32_t, rgw_data_sync_marker> sync_markers;
215
216 rgw_data_sync_status() {}
217
218 void encode(bufferlist& bl) const {
219 ENCODE_START(1, 1, bl);
220 encode(sync_info, bl);
221 /* sync markers are encoded separately */
222 ENCODE_FINISH(bl);
223 }
224
225 void decode(bufferlist::const_iterator& bl) {
226 DECODE_START(1, bl);
227 decode(sync_info, bl);
228 /* sync markers are decoded separately */
229 DECODE_FINISH(bl);
230 }
231
232 void dump(Formatter *f) const {
233 encode_json("info", sync_info, f);
234 encode_json("markers", sync_markers, f);
235 }
236 void decode_json(JSONObj *obj) {
237 JSONDecoder::decode_json("info", sync_info, obj);
238 JSONDecoder::decode_json("markers", sync_markers, obj);
239 }
240 static void generate_test_instances(std::list<rgw_data_sync_status*>& o);
241 };
242 WRITE_CLASS_ENCODER(rgw_data_sync_status)
243
244 struct rgw_datalog_entry {
245 string key;
246 ceph::real_time timestamp;
247
248 void decode_json(JSONObj *obj);
249 };
250
251 struct rgw_datalog_shard_data {
252 string marker;
253 bool truncated;
254 vector<rgw_datalog_entry> entries;
255
256 void decode_json(JSONObj *obj);
257 };
258
259 class RGWAsyncRadosProcessor;
260 class RGWDataSyncControlCR;
261
262 struct rgw_bucket_entry_owner {
263 string id;
264 string display_name;
265
266 rgw_bucket_entry_owner() {}
267 rgw_bucket_entry_owner(const string& _id, const string& _display_name) : id(_id), display_name(_display_name) {}
268
269 void decode_json(JSONObj *obj);
270 };
271
272 class RGWSyncErrorLogger;
273 class RGWRESTConn;
274 class RGWServices;
275
276 struct RGWDataSyncEnv {
277 const DoutPrefixProvider *dpp{nullptr};
278 CephContext *cct{nullptr};
279 rgw::sal::RGWRadosStore *store{nullptr};
280 RGWServices *svc{nullptr};
281 RGWAsyncRadosProcessor *async_rados{nullptr};
282 RGWHTTPManager *http_manager{nullptr};
283 RGWSyncErrorLogger *error_logger{nullptr};
284 RGWSyncTraceManager *sync_tracer{nullptr};
285 RGWSyncModuleInstanceRef sync_module{nullptr};
286 PerfCounters* counters{nullptr};
287
288 RGWDataSyncEnv() {}
289
290 void init(const DoutPrefixProvider *_dpp, CephContext *_cct, rgw::sal::RGWRadosStore *_store, RGWServices *_svc,
291 RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
292 RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer,
293 RGWSyncModuleInstanceRef& _sync_module,
294 PerfCounters* _counters) {
295 dpp = _dpp;
296 cct = _cct;
297 store = _store;
298 svc = _svc;
299 async_rados = _async_rados;
300 http_manager = _http_manager;
301 error_logger = _error_logger;
302 sync_tracer = _sync_tracer;
303 sync_module = _sync_module;
304 counters = _counters;
305 }
306
307 string shard_obj_name(int shard_id);
308 string status_oid();
309 };
310
311 struct RGWDataSyncCtx {
312 CephContext *cct{nullptr};
313 RGWDataSyncEnv *env{nullptr};
314
315 RGWRESTConn *conn{nullptr};
316 rgw_zone_id source_zone;
317
318 void init(RGWDataSyncEnv *_env,
319 RGWRESTConn *_conn,
320 const rgw_zone_id& _source_zone) {
321 cct = _env->cct;
322 env = _env;
323 conn = _conn;
324 source_zone = _source_zone;
325 }
326 };
327
328 class RGWRados;
329 class RGWDataChangesLogInfo;
330
331 class RGWRemoteDataLog : public RGWCoroutinesManager {
332 const DoutPrefixProvider *dpp;
333 rgw::sal::RGWRadosStore *store;
334 CephContext *cct;
335 RGWCoroutinesManagerRegistry *cr_registry;
336 RGWAsyncRadosProcessor *async_rados;
337 RGWHTTPManager http_manager;
338
339 RGWDataSyncEnv sync_env;
340 RGWDataSyncCtx sc;
341
342 ceph::shared_mutex lock = ceph::make_shared_mutex("RGWRemoteDataLog::lock");
343 RGWDataSyncControlCR *data_sync_cr;
344
345 RGWSyncTraceNodeRef tn;
346
347 bool initialized;
348
349 public:
350 RGWRemoteDataLog(const DoutPrefixProvider *dpp,
351 rgw::sal::RGWRadosStore *_store,
352 RGWAsyncRadosProcessor *async_rados);
353 int init(const rgw_zone_id& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger,
354 RGWSyncTraceManager *_sync_tracer, RGWSyncModuleInstanceRef& module,
355 PerfCounters* _counters);
356 void finish();
357
358 int read_log_info(rgw_datalog_info *log_info);
359 int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info);
360 int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result);
361 int read_sync_status(rgw_data_sync_status *sync_status);
362 int read_recovering_shards(const int num_shards, set<int>& recovering_shards);
363 int read_shard_status(int shard_id, set<string>& lagging_buckets,set<string>& recovering_buckets, rgw_data_sync_marker* sync_marker, const int max_entries);
364 int init_sync_status(int num_shards);
365 int run_sync(int num_shards);
366
367 void wakeup(int shard_id, set<string>& keys);
368 };
369
370 class RGWDataSyncStatusManager : public DoutPrefixProvider {
371 rgw::sal::RGWRadosStore *store;
372
373 rgw_zone_id source_zone;
374 RGWRESTConn *conn;
375 RGWSyncErrorLogger *error_logger;
376 RGWSyncModuleInstanceRef sync_module;
377 PerfCounters* counters;
378
379 RGWRemoteDataLog source_log;
380
381 string source_status_oid;
382 string source_shard_status_oid_prefix;
383
384 map<int, rgw_raw_obj> shard_objs;
385
386 int num_shards;
387
388 public:
389 RGWDataSyncStatusManager(rgw::sal::RGWRadosStore *_store, RGWAsyncRadosProcessor *async_rados,
390 const rgw_zone_id& _source_zone, PerfCounters* counters)
391 : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL),
392 sync_module(nullptr), counters(counters),
393 source_log(this, store, async_rados), num_shards(0) {}
394 RGWDataSyncStatusManager(rgw::sal::RGWRadosStore *_store, RGWAsyncRadosProcessor *async_rados,
395 const rgw_zone_id& _source_zone, PerfCounters* counters,
396 const RGWSyncModuleInstanceRef& _sync_module)
397 : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL),
398 sync_module(_sync_module), counters(counters),
399 source_log(this, store, async_rados), num_shards(0) {}
400 ~RGWDataSyncStatusManager() {
401 finalize();
402 }
403 int init();
404 void finalize();
405
406 static string shard_obj_name(const rgw_zone_id& source_zone, int shard_id);
407 static string sync_status_oid(const rgw_zone_id& source_zone);
408
409 int read_sync_status(rgw_data_sync_status *sync_status) {
410 return source_log.read_sync_status(sync_status);
411 }
412
413 int read_recovering_shards(const int num_shards, set<int>& recovering_shards) {
414 return source_log.read_recovering_shards(num_shards, recovering_shards);
415 }
416
417 int read_shard_status(int shard_id, set<string>& lagging_buckets, set<string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries) {
418 return source_log.read_shard_status(shard_id, lagging_buckets, recovering_buckets,sync_marker, max_entries);
419 }
420 int init_sync_status() { return source_log.init_sync_status(num_shards); }
421
422 int read_log_info(rgw_datalog_info *log_info) {
423 return source_log.read_log_info(log_info);
424 }
425 int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info) {
426 return source_log.read_source_log_shards_info(shards_info);
427 }
428 int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result) {
429 return source_log.read_source_log_shards_next(shard_markers, result);
430 }
431
432 int run() { return source_log.run_sync(num_shards); }
433
434 void wakeup(int shard_id, set<string>& keys) { return source_log.wakeup(shard_id, keys); }
435 void stop() {
436 source_log.finish();
437 }
438
439 // implements DoutPrefixProvider
440 CephContext *get_cct() const override;
441 unsigned get_subsys() const override;
442 std::ostream& gen_prefix(std::ostream& out) const override;
443 };
444
445 class RGWBucketPipeSyncStatusManager;
446 class RGWBucketSyncCR;
447
448 struct rgw_bucket_shard_full_sync_marker {
449 rgw_obj_key position;
450 uint64_t count;
451
452 rgw_bucket_shard_full_sync_marker() : count(0) {}
453
454 void encode_attr(map<string, bufferlist>& attrs);
455
456 void encode(bufferlist& bl) const {
457 ENCODE_START(1, 1, bl);
458 encode(position, bl);
459 encode(count, bl);
460 ENCODE_FINISH(bl);
461 }
462
463 void decode(bufferlist::const_iterator& bl) {
464 DECODE_START(1, bl);
465 decode(position, bl);
466 decode(count, bl);
467 DECODE_FINISH(bl);
468 }
469
470 void dump(Formatter *f) const;
471 void decode_json(JSONObj *obj);
472 };
473 WRITE_CLASS_ENCODER(rgw_bucket_shard_full_sync_marker)
474
475 struct rgw_bucket_shard_inc_sync_marker {
476 string position;
477
478 rgw_bucket_shard_inc_sync_marker() {}
479
480 void encode_attr(map<string, bufferlist>& attrs);
481
482 void encode(bufferlist& bl) const {
483 ENCODE_START(1, 1, bl);
484 encode(position, bl);
485 ENCODE_FINISH(bl);
486 }
487
488 void decode(bufferlist::const_iterator& bl) {
489 DECODE_START(1, bl);
490 decode(position, bl);
491 DECODE_FINISH(bl);
492 }
493
494 void dump(Formatter *f) const;
495 void decode_json(JSONObj *obj);
496
497 bool operator<(const rgw_bucket_shard_inc_sync_marker& m) const {
498 return (position < m.position);
499 }
500 };
501 WRITE_CLASS_ENCODER(rgw_bucket_shard_inc_sync_marker)
502
503 struct rgw_bucket_shard_sync_info {
504 enum SyncState {
505 StateInit = 0,
506 StateFullSync = 1,
507 StateIncrementalSync = 2,
508 StateStopped = 3,
509 };
510
511 uint16_t state;
512 rgw_bucket_shard_full_sync_marker full_marker;
513 rgw_bucket_shard_inc_sync_marker inc_marker;
514
515 void decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs);
516 void encode_all_attrs(map<string, bufferlist>& attrs);
517 void encode_state_attr(map<string, bufferlist>& attrs);
518
519 void encode(bufferlist& bl) const {
520 ENCODE_START(1, 1, bl);
521 encode(state, bl);
522 encode(full_marker, bl);
523 encode(inc_marker, bl);
524 ENCODE_FINISH(bl);
525 }
526
527 void decode(bufferlist::const_iterator& bl) {
528 DECODE_START(1, bl);
529 decode(state, bl);
530 decode(full_marker, bl);
531 decode(inc_marker, bl);
532 DECODE_FINISH(bl);
533 }
534
535 void dump(Formatter *f) const;
536 void decode_json(JSONObj *obj);
537
538 rgw_bucket_shard_sync_info() : state((int)StateInit) {}
539
540 };
541 WRITE_CLASS_ENCODER(rgw_bucket_shard_sync_info)
542
543 struct rgw_bucket_index_marker_info {
544 string bucket_ver;
545 string master_ver;
546 string max_marker;
547 bool syncstopped{false};
548
549 void decode_json(JSONObj *obj) {
550 JSONDecoder::decode_json("bucket_ver", bucket_ver, obj);
551 JSONDecoder::decode_json("master_ver", master_ver, obj);
552 JSONDecoder::decode_json("max_marker", max_marker, obj);
553 JSONDecoder::decode_json("syncstopped", syncstopped, obj);
554 }
555 };
556
557
558 class RGWRemoteBucketManager {
559 const DoutPrefixProvider *dpp;
560
561 RGWDataSyncEnv *sync_env;
562
563 RGWRESTConn *conn{nullptr};
564 rgw_zone_id source_zone;
565
566 vector<rgw_bucket_sync_pair_info> sync_pairs;
567
568 RGWDataSyncCtx sc;
569 rgw_bucket_shard_sync_info init_status;
570
571 RGWBucketSyncCR *sync_cr{nullptr};
572
573 public:
574 RGWRemoteBucketManager(const DoutPrefixProvider *_dpp,
575 RGWDataSyncEnv *_sync_env,
576 const rgw_zone_id& _source_zone, RGWRESTConn *_conn,
577 const RGWBucketInfo& source_bucket_info,
578 const rgw_bucket& dest_bucket);
579
580 void init(const rgw_zone_id& _source_zone, RGWRESTConn *_conn,
581 const rgw_bucket& source_bucket, int shard_id,
582 const rgw_bucket& dest_bucket);
583
584 RGWCoroutine *read_sync_status_cr(int num, rgw_bucket_shard_sync_info *sync_status);
585 RGWCoroutine *init_sync_status_cr(int num);
586 RGWCoroutine *run_sync_cr(int num);
587
588 int num_pipes() {
589 return sync_pairs.size();
590 }
591
592 void wakeup();
593 };
594
595 class RGWBucketPipeSyncStatusManager : public DoutPrefixProvider {
596 rgw::sal::RGWRadosStore *store;
597
598 RGWDataSyncEnv sync_env;
599
600 RGWCoroutinesManager cr_mgr;
601
602 RGWHTTPManager http_manager;
603
604 std::optional<rgw_zone_id> source_zone;
605 std::optional<rgw_bucket> source_bucket;
606
607 RGWRESTConn *conn;
608 RGWSyncErrorLogger *error_logger;
609 RGWSyncModuleInstanceRef sync_module;
610
611 rgw_bucket dest_bucket;
612
613 vector<RGWRemoteBucketManager *> source_mgrs;
614
615 string source_status_oid;
616 string source_shard_status_oid_prefix;
617
618 map<int, rgw_bucket_shard_sync_info> sync_status;
619 rgw_raw_obj status_obj;
620
621 int num_shards;
622
623 public:
624 RGWBucketPipeSyncStatusManager(rgw::sal::RGWRadosStore *_store,
625 std::optional<rgw_zone_id> _source_zone,
626 std::optional<rgw_bucket> _source_bucket,
627 const rgw_bucket& dest_bucket);
628 ~RGWBucketPipeSyncStatusManager();
629
630 int init();
631
632 map<int, rgw_bucket_shard_sync_info>& get_sync_status() { return sync_status; }
633 int init_sync_status();
634
635 static string status_oid(const rgw_zone_id& source_zone, const rgw_bucket_sync_pair_info& bs);
636 static string obj_status_oid(const rgw_bucket_sync_pipe& sync_pipe,
637 const rgw_zone_id& source_zone, const rgw_obj& obj); /* specific source obj sync status,
638 can be used by sync modules */
639
640 // implements DoutPrefixProvider
641 CephContext *get_cct() const override;
642 unsigned get_subsys() const override;
643 std::ostream& gen_prefix(std::ostream& out) const override;
644
645 int read_sync_status();
646 int run();
647 };
648
649 /// read the sync status of all bucket shards from the given source zone
650 int rgw_bucket_sync_status(const DoutPrefixProvider *dpp,
651 rgw::sal::RGWRadosStore *store,
652 const rgw_sync_bucket_pipe& pipe,
653 const RGWBucketInfo& dest_bucket_info,
654 const RGWBucketInfo *psource_bucket_info,
655 std::vector<rgw_bucket_shard_sync_info> *status);
656
657 class RGWDefaultSyncModule : public RGWSyncModule {
658 public:
659 RGWDefaultSyncModule() {}
660 bool supports_writes() override { return true; }
661 bool supports_data_export() override { return true; }
662 int create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override;
663 };
664
665 class RGWArchiveSyncModule : public RGWDefaultSyncModule {
666 public:
667 RGWArchiveSyncModule() {}
668 bool supports_writes() override { return true; }
669 bool supports_data_export() override { return false; }
670 int create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override;
671 };
672
673 #endif