]>
Commit | Line | Data |
---|---|---|
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- | |
2 | // vim: ts=8 sw=2 smarttab ft=cpp | |
3 | ||
4 | #ifndef CEPH_RGW_DATA_SYNC_H | |
5 | #define CEPH_RGW_DATA_SYNC_H | |
6 | ||
7 | #include "include/encoding.h" | |
8 | ||
9 | #include "common/ceph_json.h" | |
10 | ||
11 | #include "rgw_coroutine.h" | |
12 | #include "rgw_http_client.h" | |
13 | #include "rgw_sal_rados.h" | |
14 | ||
15 | #include "rgw_datalog.h" | |
16 | #include "rgw_sync_module.h" | |
17 | #include "rgw_sync_trace.h" | |
18 | #include "rgw_sync_policy.h" | |
19 | ||
20 | #include "rgw_bucket_sync.h" | |
21 | ||
22 | // represents an obligation to sync an entry up a given time | |
23 | struct rgw_data_sync_obligation { | |
24 | std::string key; | |
25 | std::string marker; | |
26 | ceph::real_time timestamp; | |
27 | bool retry = false; | |
28 | }; | |
29 | ||
30 | inline std::ostream& operator<<(std::ostream& out, const rgw_data_sync_obligation& o) { | |
31 | out << "key=" << o.key; | |
32 | if (!o.marker.empty()) { | |
33 | out << " marker=" << o.marker; | |
34 | } | |
35 | if (o.timestamp != ceph::real_time{}) { | |
36 | out << " timestamp=" << o.timestamp; | |
37 | } | |
38 | if (o.retry) { | |
39 | out << " retry"; | |
40 | } | |
41 | return out; | |
42 | } | |
43 | ||
44 | class JSONObj; | |
45 | struct rgw_sync_bucket_pipe; | |
46 | ||
47 | struct rgw_bucket_sync_pair_info { | |
48 | RGWBucketSyncFlowManager::pipe_handler handler; /* responsible for sync filters */ | |
49 | rgw_bucket_shard source_bs; | |
50 | rgw_bucket_shard dest_bs; | |
51 | }; | |
52 | ||
53 | inline std::ostream& operator<<(std::ostream& out, const rgw_bucket_sync_pair_info& p) { | |
54 | if (p.source_bs.bucket == p.dest_bs.bucket) { | |
55 | return out << p.source_bs; | |
56 | } | |
57 | ||
58 | out << p.source_bs; | |
59 | ||
60 | out << "->" << p.dest_bs.bucket; | |
61 | ||
62 | return out; | |
63 | } | |
64 | ||
65 | struct rgw_bucket_sync_pipe { | |
66 | rgw_bucket_sync_pair_info info; | |
67 | RGWBucketInfo source_bucket_info; | |
68 | std::map<std::string, bufferlist> source_bucket_attrs; | |
69 | RGWBucketInfo dest_bucket_info; | |
70 | std::map<std::string, bufferlist> dest_bucket_attrs; | |
71 | ||
72 | RGWBucketSyncFlowManager::pipe_rules_ref& get_rules() { | |
73 | return info.handler.rules; | |
74 | } | |
75 | }; | |
76 | ||
77 | inline std::ostream& operator<<(std::ostream& out, const rgw_bucket_sync_pipe& p) { | |
78 | return out << p.info; | |
79 | } | |
80 | ||
81 | struct rgw_datalog_info { | |
82 | uint32_t num_shards; | |
83 | ||
84 | rgw_datalog_info() : num_shards(0) {} | |
85 | ||
86 | void decode_json(JSONObj *obj); | |
87 | }; | |
88 | ||
89 | struct rgw_data_sync_info { | |
90 | enum SyncState { | |
91 | StateInit = 0, | |
92 | StateBuildingFullSyncMaps = 1, | |
93 | StateSync = 2, | |
94 | }; | |
95 | ||
96 | uint16_t state; | |
97 | uint32_t num_shards; | |
98 | ||
99 | uint64_t instance_id{0}; | |
100 | ||
101 | void encode(bufferlist& bl) const { | |
102 | ENCODE_START(2, 1, bl); | |
103 | encode(state, bl); | |
104 | encode(num_shards, bl); | |
105 | encode(instance_id, bl); | |
106 | ENCODE_FINISH(bl); | |
107 | } | |
108 | ||
109 | void decode(bufferlist::const_iterator& bl) { | |
110 | DECODE_START(2, bl); | |
111 | decode(state, bl); | |
112 | decode(num_shards, bl); | |
113 | if (struct_v >= 2) { | |
114 | decode(instance_id, bl); | |
115 | } | |
116 | DECODE_FINISH(bl); | |
117 | } | |
118 | ||
119 | void dump(Formatter *f) const { | |
120 | std::string s; | |
121 | switch ((SyncState)state) { | |
122 | case StateInit: | |
123 | s = "init"; | |
124 | break; | |
125 | case StateBuildingFullSyncMaps: | |
126 | s = "building-full-sync-maps"; | |
127 | break; | |
128 | case StateSync: | |
129 | s = "sync"; | |
130 | break; | |
131 | default: | |
132 | s = "unknown"; | |
133 | break; | |
134 | } | |
135 | encode_json("status", s, f); | |
136 | encode_json("num_shards", num_shards, f); | |
137 | encode_json("instance_id", instance_id, f); | |
138 | } | |
139 | void decode_json(JSONObj *obj) { | |
140 | std::string s; | |
141 | JSONDecoder::decode_json("status", s, obj); | |
142 | if (s == "building-full-sync-maps") { | |
143 | state = StateBuildingFullSyncMaps; | |
144 | } else if (s == "sync") { | |
145 | state = StateSync; | |
146 | } else { | |
147 | state = StateInit; | |
148 | } | |
149 | JSONDecoder::decode_json("num_shards", num_shards, obj); | |
150 | JSONDecoder::decode_json("instance_id", instance_id, obj); | |
151 | } | |
152 | static void generate_test_instances(std::list<rgw_data_sync_info*>& o); | |
153 | ||
154 | rgw_data_sync_info() : state((int)StateInit), num_shards(0) {} | |
155 | }; | |
156 | WRITE_CLASS_ENCODER(rgw_data_sync_info) | |
157 | ||
158 | struct rgw_data_sync_marker { | |
159 | enum SyncState { | |
160 | FullSync = 0, | |
161 | IncrementalSync = 1, | |
162 | }; | |
163 | uint16_t state; | |
164 | std::string marker; | |
165 | std::string next_step_marker; | |
166 | uint64_t total_entries; | |
167 | uint64_t pos; | |
168 | real_time timestamp; | |
169 | ||
170 | rgw_data_sync_marker() : state(FullSync), total_entries(0), pos(0) {} | |
171 | ||
172 | void encode(bufferlist& bl) const { | |
173 | ENCODE_START(1, 1, bl); | |
174 | encode(state, bl); | |
175 | encode(marker, bl); | |
176 | encode(next_step_marker, bl); | |
177 | encode(total_entries, bl); | |
178 | encode(pos, bl); | |
179 | encode(timestamp, bl); | |
180 | ENCODE_FINISH(bl); | |
181 | } | |
182 | ||
183 | void decode(bufferlist::const_iterator& bl) { | |
184 | DECODE_START(1, bl); | |
185 | decode(state, bl); | |
186 | decode(marker, bl); | |
187 | decode(next_step_marker, bl); | |
188 | decode(total_entries, bl); | |
189 | decode(pos, bl); | |
190 | decode(timestamp, bl); | |
191 | DECODE_FINISH(bl); | |
192 | } | |
193 | ||
194 | void dump(Formatter *f) const { | |
195 | const char *s{nullptr}; | |
196 | switch ((SyncState)state) { | |
197 | case FullSync: | |
198 | s = "full-sync"; | |
199 | break; | |
200 | case IncrementalSync: | |
201 | s = "incremental-sync"; | |
202 | break; | |
203 | default: | |
204 | s = "unknown"; | |
205 | break; | |
206 | } | |
207 | encode_json("status", s, f); | |
208 | encode_json("marker", marker, f); | |
209 | encode_json("next_step_marker", next_step_marker, f); | |
210 | encode_json("total_entries", total_entries, f); | |
211 | encode_json("pos", pos, f); | |
212 | encode_json("timestamp", utime_t(timestamp), f); | |
213 | } | |
214 | void decode_json(JSONObj *obj) { | |
215 | std::string s; | |
216 | JSONDecoder::decode_json("status", s, obj); | |
217 | if (s == "full-sync") { | |
218 | state = FullSync; | |
219 | } else if (s == "incremental-sync") { | |
220 | state = IncrementalSync; | |
221 | } | |
222 | JSONDecoder::decode_json("marker", marker, obj); | |
223 | JSONDecoder::decode_json("next_step_marker", next_step_marker, obj); | |
224 | JSONDecoder::decode_json("total_entries", total_entries, obj); | |
225 | JSONDecoder::decode_json("pos", pos, obj); | |
226 | utime_t t; | |
227 | JSONDecoder::decode_json("timestamp", t, obj); | |
228 | timestamp = t.to_real_time(); | |
229 | } | |
230 | static void generate_test_instances(std::list<rgw_data_sync_marker*>& o); | |
231 | }; | |
232 | WRITE_CLASS_ENCODER(rgw_data_sync_marker) | |
233 | ||
234 | struct rgw_data_sync_status { | |
235 | rgw_data_sync_info sync_info; | |
236 | std::map<uint32_t, rgw_data_sync_marker> sync_markers; | |
237 | ||
238 | rgw_data_sync_status() {} | |
239 | ||
240 | void encode(bufferlist& bl) const { | |
241 | ENCODE_START(1, 1, bl); | |
242 | encode(sync_info, bl); | |
243 | /* sync markers are encoded separately */ | |
244 | ENCODE_FINISH(bl); | |
245 | } | |
246 | ||
247 | void decode(bufferlist::const_iterator& bl) { | |
248 | DECODE_START(1, bl); | |
249 | decode(sync_info, bl); | |
250 | /* sync markers are decoded separately */ | |
251 | DECODE_FINISH(bl); | |
252 | } | |
253 | ||
254 | void dump(Formatter *f) const { | |
255 | encode_json("info", sync_info, f); | |
256 | encode_json("markers", sync_markers, f); | |
257 | } | |
258 | void decode_json(JSONObj *obj) { | |
259 | JSONDecoder::decode_json("info", sync_info, obj); | |
260 | JSONDecoder::decode_json("markers", sync_markers, obj); | |
261 | } | |
262 | static void generate_test_instances(std::list<rgw_data_sync_status*>& o); | |
263 | }; | |
264 | WRITE_CLASS_ENCODER(rgw_data_sync_status) | |
265 | ||
266 | struct rgw_datalog_entry { | |
267 | std::string key; | |
268 | ceph::real_time timestamp; | |
269 | ||
270 | void decode_json(JSONObj *obj); | |
271 | }; | |
272 | ||
273 | struct rgw_datalog_shard_data { | |
274 | std::string marker; | |
275 | bool truncated; | |
276 | std::vector<rgw_datalog_entry> entries; | |
277 | ||
278 | void decode_json(JSONObj *obj); | |
279 | }; | |
280 | ||
281 | class RGWAsyncRadosProcessor; | |
282 | class RGWDataSyncControlCR; | |
283 | ||
284 | struct rgw_bucket_entry_owner { | |
285 | std::string id; | |
286 | std::string display_name; | |
287 | ||
288 | rgw_bucket_entry_owner() {} | |
289 | rgw_bucket_entry_owner(const std::string& _id, const std::string& _display_name) : id(_id), display_name(_display_name) {} | |
290 | ||
291 | void decode_json(JSONObj *obj); | |
292 | }; | |
293 | ||
294 | class RGWSyncErrorLogger; | |
295 | class RGWRESTConn; | |
296 | class RGWServices; | |
297 | ||
298 | struct RGWDataSyncEnv { | |
299 | const DoutPrefixProvider *dpp{nullptr}; | |
300 | CephContext *cct{nullptr}; | |
301 | rgw::sal::RadosStore* store{nullptr}; | |
302 | RGWServices *svc{nullptr}; | |
303 | RGWAsyncRadosProcessor *async_rados{nullptr}; | |
304 | RGWHTTPManager *http_manager{nullptr}; | |
305 | RGWSyncErrorLogger *error_logger{nullptr}; | |
306 | RGWSyncTraceManager *sync_tracer{nullptr}; | |
307 | RGWSyncModuleInstanceRef sync_module{nullptr}; | |
308 | PerfCounters* counters{nullptr}; | |
309 | ||
310 | RGWDataSyncEnv() {} | |
311 | ||
312 | void init(const DoutPrefixProvider *_dpp, CephContext *_cct, rgw::sal::RadosStore* _store, RGWServices *_svc, | |
313 | RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager, | |
314 | RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer, | |
315 | RGWSyncModuleInstanceRef& _sync_module, | |
316 | PerfCounters* _counters) { | |
317 | dpp = _dpp; | |
318 | cct = _cct; | |
319 | store = _store; | |
320 | svc = _svc; | |
321 | async_rados = _async_rados; | |
322 | http_manager = _http_manager; | |
323 | error_logger = _error_logger; | |
324 | sync_tracer = _sync_tracer; | |
325 | sync_module = _sync_module; | |
326 | counters = _counters; | |
327 | } | |
328 | ||
329 | std::string shard_obj_name(int shard_id); | |
330 | std::string status_oid(); | |
331 | }; | |
332 | ||
333 | struct RGWDataSyncCtx { | |
334 | CephContext *cct{nullptr}; | |
335 | RGWDataSyncEnv *env{nullptr}; | |
336 | ||
337 | RGWRESTConn *conn{nullptr}; | |
338 | rgw_zone_id source_zone; | |
339 | ||
340 | void init(RGWDataSyncEnv *_env, | |
341 | RGWRESTConn *_conn, | |
342 | const rgw_zone_id& _source_zone) { | |
343 | cct = _env->cct; | |
344 | env = _env; | |
345 | conn = _conn; | |
346 | source_zone = _source_zone; | |
347 | } | |
348 | }; | |
349 | ||
350 | class RGWRados; | |
351 | ||
352 | class RGWRemoteDataLog : public RGWCoroutinesManager { | |
353 | const DoutPrefixProvider *dpp; | |
354 | rgw::sal::RadosStore* store; | |
355 | CephContext *cct; | |
356 | RGWCoroutinesManagerRegistry *cr_registry; | |
357 | RGWAsyncRadosProcessor *async_rados; | |
358 | RGWHTTPManager http_manager; | |
359 | ||
360 | RGWDataSyncEnv sync_env; | |
361 | RGWDataSyncCtx sc; | |
362 | ||
363 | ceph::shared_mutex lock = ceph::make_shared_mutex("RGWRemoteDataLog::lock"); | |
364 | RGWDataSyncControlCR *data_sync_cr; | |
365 | ||
366 | RGWSyncTraceNodeRef tn; | |
367 | ||
368 | bool initialized; | |
369 | ||
370 | public: | |
371 | RGWRemoteDataLog(const DoutPrefixProvider *dpp, | |
372 | rgw::sal::RadosStore* _store, | |
373 | RGWAsyncRadosProcessor *async_rados); | |
374 | int init(const rgw_zone_id& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, | |
375 | RGWSyncTraceManager *_sync_tracer, RGWSyncModuleInstanceRef& module, | |
376 | PerfCounters* _counters); | |
377 | void finish(); | |
378 | ||
379 | int read_log_info(const DoutPrefixProvider *dpp, rgw_datalog_info *log_info); | |
380 | int read_source_log_shards_info(const DoutPrefixProvider *dpp, std::map<int, RGWDataChangesLogInfo> *shards_info); | |
381 | int read_source_log_shards_next(const DoutPrefixProvider *dpp, std::map<int, std::string> shard_markers, std::map<int, rgw_datalog_shard_data> *result); | |
382 | int read_sync_status(const DoutPrefixProvider *dpp, rgw_data_sync_status *sync_status); | |
383 | int read_recovering_shards(const DoutPrefixProvider *dpp, const int num_shards, std::set<int>& recovering_shards); | |
384 | int read_shard_status(const DoutPrefixProvider *dpp, int shard_id, std::set<std::string>& lagging_buckets,std::set<std::string>& recovering_buckets, rgw_data_sync_marker* sync_marker, const int max_entries); | |
385 | int init_sync_status(const DoutPrefixProvider *dpp, int num_shards); | |
386 | int run_sync(const DoutPrefixProvider *dpp, int num_shards); | |
387 | ||
388 | void wakeup(int shard_id, std::set<std::string>& keys); | |
389 | }; | |
390 | ||
391 | class RGWDataSyncStatusManager : public DoutPrefixProvider { | |
392 | rgw::sal::RadosStore* store; | |
393 | ||
394 | rgw_zone_id source_zone; | |
395 | RGWRESTConn *conn; | |
396 | RGWSyncErrorLogger *error_logger; | |
397 | RGWSyncModuleInstanceRef sync_module; | |
398 | PerfCounters* counters; | |
399 | ||
400 | RGWRemoteDataLog source_log; | |
401 | ||
402 | std::string source_status_oid; | |
403 | std::string source_shard_status_oid_prefix; | |
404 | ||
405 | std::map<int, rgw_raw_obj> shard_objs; | |
406 | ||
407 | int num_shards; | |
408 | ||
409 | public: | |
410 | RGWDataSyncStatusManager(rgw::sal::RadosStore* _store, RGWAsyncRadosProcessor *async_rados, | |
411 | const rgw_zone_id& _source_zone, PerfCounters* counters) | |
412 | : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL), | |
413 | sync_module(nullptr), counters(counters), | |
414 | source_log(this, store, async_rados), num_shards(0) {} | |
415 | RGWDataSyncStatusManager(rgw::sal::RadosStore* _store, RGWAsyncRadosProcessor *async_rados, | |
416 | const rgw_zone_id& _source_zone, PerfCounters* counters, | |
417 | const RGWSyncModuleInstanceRef& _sync_module) | |
418 | : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL), | |
419 | sync_module(_sync_module), counters(counters), | |
420 | source_log(this, store, async_rados), num_shards(0) {} | |
421 | ~RGWDataSyncStatusManager() { | |
422 | finalize(); | |
423 | } | |
424 | int init(const DoutPrefixProvider *dpp); | |
425 | void finalize(); | |
426 | ||
427 | static std::string shard_obj_name(const rgw_zone_id& source_zone, int shard_id); | |
428 | static std::string sync_status_oid(const rgw_zone_id& source_zone); | |
429 | ||
430 | int read_sync_status(const DoutPrefixProvider *dpp, rgw_data_sync_status *sync_status) { | |
431 | return source_log.read_sync_status(dpp, sync_status); | |
432 | } | |
433 | ||
434 | int read_recovering_shards(const DoutPrefixProvider *dpp, const int num_shards, std::set<int>& recovering_shards) { | |
435 | return source_log.read_recovering_shards(dpp, num_shards, recovering_shards); | |
436 | } | |
437 | ||
438 | int read_shard_status(const DoutPrefixProvider *dpp, int shard_id, std::set<std::string>& lagging_buckets, std::set<std::string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries) { | |
439 | return source_log.read_shard_status(dpp, shard_id, lagging_buckets, recovering_buckets,sync_marker, max_entries); | |
440 | } | |
441 | int init_sync_status(const DoutPrefixProvider *dpp) { return source_log.init_sync_status(dpp, num_shards); } | |
442 | ||
443 | int read_log_info(const DoutPrefixProvider *dpp, rgw_datalog_info *log_info) { | |
444 | return source_log.read_log_info(dpp, log_info); | |
445 | } | |
446 | int read_source_log_shards_info(const DoutPrefixProvider *dpp, std::map<int, RGWDataChangesLogInfo> *shards_info) { | |
447 | return source_log.read_source_log_shards_info(dpp, shards_info); | |
448 | } | |
449 | int read_source_log_shards_next(const DoutPrefixProvider *dpp, std::map<int, std::string> shard_markers, std::map<int, rgw_datalog_shard_data> *result) { | |
450 | return source_log.read_source_log_shards_next(dpp, shard_markers, result); | |
451 | } | |
452 | ||
453 | int run(const DoutPrefixProvider *dpp) { return source_log.run_sync(dpp, num_shards); } | |
454 | ||
455 | void wakeup(int shard_id, std::set<std::string>& keys) { return source_log.wakeup(shard_id, keys); } | |
456 | void stop() { | |
457 | source_log.finish(); | |
458 | } | |
459 | ||
460 | // implements DoutPrefixProvider | |
461 | CephContext *get_cct() const override; | |
462 | unsigned get_subsys() const override; | |
463 | std::ostream& gen_prefix(std::ostream& out) const override; | |
464 | }; | |
465 | ||
466 | class RGWBucketPipeSyncStatusManager; | |
467 | class RGWBucketSyncCR; | |
468 | ||
469 | struct rgw_bucket_shard_full_sync_marker { | |
470 | rgw_obj_key position; | |
471 | uint64_t count; | |
472 | ||
473 | rgw_bucket_shard_full_sync_marker() : count(0) {} | |
474 | ||
475 | void encode_attr(std::map<std::string, bufferlist>& attrs); | |
476 | ||
477 | void encode(bufferlist& bl) const { | |
478 | ENCODE_START(1, 1, bl); | |
479 | encode(position, bl); | |
480 | encode(count, bl); | |
481 | ENCODE_FINISH(bl); | |
482 | } | |
483 | ||
484 | void decode(bufferlist::const_iterator& bl) { | |
485 | DECODE_START(1, bl); | |
486 | decode(position, bl); | |
487 | decode(count, bl); | |
488 | DECODE_FINISH(bl); | |
489 | } | |
490 | ||
491 | void dump(Formatter *f) const; | |
492 | void decode_json(JSONObj *obj); | |
493 | }; | |
494 | WRITE_CLASS_ENCODER(rgw_bucket_shard_full_sync_marker) | |
495 | ||
496 | struct rgw_bucket_shard_inc_sync_marker { | |
497 | std::string position; | |
498 | ceph::real_time timestamp; | |
499 | ||
500 | void encode_attr(std::map<std::string, bufferlist>& attrs); | |
501 | ||
502 | void encode(bufferlist& bl) const { | |
503 | ENCODE_START(2, 1, bl); | |
504 | encode(position, bl); | |
505 | encode(timestamp, bl); | |
506 | ENCODE_FINISH(bl); | |
507 | } | |
508 | ||
509 | void decode(bufferlist::const_iterator& bl) { | |
510 | DECODE_START(2, bl); | |
511 | decode(position, bl); | |
512 | if (struct_v >= 2) { | |
513 | decode(timestamp, bl); | |
514 | } | |
515 | DECODE_FINISH(bl); | |
516 | } | |
517 | ||
518 | void dump(Formatter *f) const; | |
519 | void decode_json(JSONObj *obj); | |
520 | }; | |
521 | WRITE_CLASS_ENCODER(rgw_bucket_shard_inc_sync_marker) | |
522 | ||
523 | struct rgw_bucket_shard_sync_info { | |
524 | enum SyncState { | |
525 | StateInit = 0, | |
526 | StateFullSync = 1, | |
527 | StateIncrementalSync = 2, | |
528 | StateStopped = 3, | |
529 | }; | |
530 | ||
531 | uint16_t state; | |
532 | rgw_bucket_shard_full_sync_marker full_marker; | |
533 | rgw_bucket_shard_inc_sync_marker inc_marker; | |
534 | ||
535 | void decode_from_attrs(CephContext *cct, std::map<std::string, bufferlist>& attrs); | |
536 | void encode_all_attrs(std::map<std::string, bufferlist>& attrs); | |
537 | void encode_state_attr(std::map<std::string, bufferlist>& attrs); | |
538 | ||
539 | void encode(bufferlist& bl) const { | |
540 | ENCODE_START(1, 1, bl); | |
541 | encode(state, bl); | |
542 | encode(full_marker, bl); | |
543 | encode(inc_marker, bl); | |
544 | ENCODE_FINISH(bl); | |
545 | } | |
546 | ||
547 | void decode(bufferlist::const_iterator& bl) { | |
548 | DECODE_START(1, bl); | |
549 | decode(state, bl); | |
550 | decode(full_marker, bl); | |
551 | decode(inc_marker, bl); | |
552 | DECODE_FINISH(bl); | |
553 | } | |
554 | ||
555 | void dump(Formatter *f) const; | |
556 | void decode_json(JSONObj *obj); | |
557 | ||
558 | rgw_bucket_shard_sync_info() : state((int)StateInit) {} | |
559 | ||
560 | }; | |
561 | WRITE_CLASS_ENCODER(rgw_bucket_shard_sync_info) | |
562 | ||
563 | struct rgw_bucket_index_marker_info { | |
564 | std::string bucket_ver; | |
565 | std::string master_ver; | |
566 | std::string max_marker; | |
567 | bool syncstopped{false}; | |
568 | ||
569 | void decode_json(JSONObj *obj) { | |
570 | JSONDecoder::decode_json("bucket_ver", bucket_ver, obj); | |
571 | JSONDecoder::decode_json("master_ver", master_ver, obj); | |
572 | JSONDecoder::decode_json("max_marker", max_marker, obj); | |
573 | JSONDecoder::decode_json("syncstopped", syncstopped, obj); | |
574 | } | |
575 | }; | |
576 | ||
577 | ||
578 | class RGWRemoteBucketManager { | |
579 | const DoutPrefixProvider *dpp; | |
580 | ||
581 | RGWDataSyncEnv *sync_env; | |
582 | ||
583 | RGWRESTConn *conn{nullptr}; | |
584 | rgw_zone_id source_zone; | |
585 | ||
586 | std::vector<rgw_bucket_sync_pair_info> sync_pairs; | |
587 | ||
588 | RGWDataSyncCtx sc; | |
589 | rgw_bucket_shard_sync_info init_status; | |
590 | ||
591 | RGWBucketSyncCR *sync_cr{nullptr}; | |
592 | ||
593 | public: | |
594 | RGWRemoteBucketManager(const DoutPrefixProvider *_dpp, | |
595 | RGWDataSyncEnv *_sync_env, | |
596 | const rgw_zone_id& _source_zone, RGWRESTConn *_conn, | |
597 | const RGWBucketInfo& source_bucket_info, | |
598 | const rgw_bucket& dest_bucket); | |
599 | ||
600 | void init(const rgw_zone_id& _source_zone, RGWRESTConn *_conn, | |
601 | const rgw_bucket& source_bucket, int shard_id, | |
602 | const rgw_bucket& dest_bucket); | |
603 | ||
604 | RGWCoroutine *read_sync_status_cr(int num, rgw_bucket_shard_sync_info *sync_status); | |
605 | RGWCoroutine *init_sync_status_cr(int num, RGWObjVersionTracker& objv_tracker); | |
606 | RGWCoroutine *run_sync_cr(int num); | |
607 | ||
608 | int num_pipes() { | |
609 | return sync_pairs.size(); | |
610 | } | |
611 | ||
612 | void wakeup(); | |
613 | }; | |
614 | ||
615 | class BucketIndexShardsManager; | |
616 | ||
617 | int rgw_read_remote_bilog_info(const DoutPrefixProvider *dpp, | |
618 | RGWRESTConn* conn, | |
619 | const rgw_bucket& bucket, | |
620 | BucketIndexShardsManager& markers, | |
621 | optional_yield y); | |
622 | ||
623 | class RGWBucketPipeSyncStatusManager : public DoutPrefixProvider { | |
624 | rgw::sal::RadosStore* store; | |
625 | ||
626 | RGWDataSyncEnv sync_env; | |
627 | ||
628 | RGWCoroutinesManager cr_mgr; | |
629 | ||
630 | RGWHTTPManager http_manager; | |
631 | ||
632 | std::optional<rgw_zone_id> source_zone; | |
633 | std::optional<rgw_bucket> source_bucket; | |
634 | ||
635 | RGWRESTConn *conn; | |
636 | RGWSyncErrorLogger *error_logger; | |
637 | RGWSyncModuleInstanceRef sync_module; | |
638 | ||
639 | rgw_bucket dest_bucket; | |
640 | ||
641 | std::vector<RGWRemoteBucketManager *> source_mgrs; | |
642 | ||
643 | std::string source_status_oid; | |
644 | std::string source_shard_status_oid_prefix; | |
645 | ||
646 | std::map<int, rgw_bucket_shard_sync_info> sync_status; | |
647 | rgw_raw_obj status_obj; | |
648 | ||
649 | int num_shards; | |
650 | ||
651 | public: | |
652 | RGWBucketPipeSyncStatusManager(rgw::sal::RadosStore* _store, | |
653 | std::optional<rgw_zone_id> _source_zone, | |
654 | std::optional<rgw_bucket> _source_bucket, | |
655 | const rgw_bucket& dest_bucket); | |
656 | ~RGWBucketPipeSyncStatusManager(); | |
657 | ||
658 | int init(const DoutPrefixProvider *dpp); | |
659 | ||
660 | std::map<int, rgw_bucket_shard_sync_info>& get_sync_status() { return sync_status; } | |
661 | int init_sync_status(const DoutPrefixProvider *dpp); | |
662 | ||
663 | static std::string status_oid(const rgw_zone_id& source_zone, const rgw_bucket_sync_pair_info& bs); | |
664 | static std::string obj_status_oid(const rgw_bucket_sync_pipe& sync_pipe, | |
665 | const rgw_zone_id& source_zone, const rgw::sal::Object* obj); /* specific source obj sync status, | |
666 | can be used by sync modules */ | |
667 | ||
668 | // implements DoutPrefixProvider | |
669 | CephContext *get_cct() const override; | |
670 | unsigned get_subsys() const override; | |
671 | std::ostream& gen_prefix(std::ostream& out) const override; | |
672 | ||
673 | int read_sync_status(const DoutPrefixProvider *dpp); | |
674 | int run(const DoutPrefixProvider *dpp); | |
675 | }; | |
676 | ||
677 | /// read the sync status of all bucket shards from the given source zone | |
678 | int rgw_bucket_sync_status(const DoutPrefixProvider *dpp, | |
679 | rgw::sal::RadosStore* store, | |
680 | const rgw_sync_bucket_pipe& pipe, | |
681 | const RGWBucketInfo& dest_bucket_info, | |
682 | const RGWBucketInfo *psource_bucket_info, | |
683 | std::vector<rgw_bucket_shard_sync_info> *status); | |
684 | ||
685 | class RGWDefaultSyncModule : public RGWSyncModule { | |
686 | public: | |
687 | RGWDefaultSyncModule() {} | |
688 | bool supports_writes() override { return true; } | |
689 | bool supports_data_export() override { return true; } | |
690 | int create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override; | |
691 | }; | |
692 | ||
693 | class RGWArchiveSyncModule : public RGWDefaultSyncModule { | |
694 | public: | |
695 | RGWArchiveSyncModule() {} | |
696 | bool supports_writes() override { return true; } | |
697 | bool supports_data_export() override { return false; } | |
698 | int create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override; | |
699 | }; | |
700 | ||
701 | #endif |