]>
Commit | Line | Data |
---|---|---|
11fdf7f2 | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
9f95a23c | 2 | // vim: ts=8 sw=2 smarttab ft=cpp |
11fdf7f2 | 3 | |
7c673cae FG |
4 | #ifndef CEPH_RGW_DATA_SYNC_H |
5 | #define CEPH_RGW_DATA_SYNC_H | |
6 | ||
11fdf7f2 TL |
7 | #include "include/encoding.h" |
8 | ||
9 | #include "common/RWLock.h" | |
10 | #include "common/ceph_json.h" | |
11 | ||
7c673cae FG |
12 | #include "rgw_coroutine.h" |
13 | #include "rgw_http_client.h" | |
9f95a23c | 14 | #include "rgw_sal.h" |
7c673cae | 15 | |
f67539c2 | 16 | #include "rgw_datalog.h" |
7c673cae | 17 | #include "rgw_sync_module.h" |
11fdf7f2 | 18 | #include "rgw_sync_trace.h" |
9f95a23c TL |
19 | #include "rgw_sync_policy.h" |
20 | ||
21 | #include "rgw_bucket_sync.h" | |
22 | ||
f67539c2 TL |
23 | // represents an obligation to sync an entry up a given time |
24 | struct rgw_data_sync_obligation { | |
25 | std::string key; | |
26 | std::string marker; | |
27 | ceph::real_time timestamp; | |
28 | bool retry = false; | |
29 | }; | |
30 | ||
31 | inline std::ostream& operator<<(std::ostream& out, const rgw_data_sync_obligation& o) { | |
32 | out << "key=" << o.key; | |
33 | if (!o.marker.empty()) { | |
34 | out << " marker=" << o.marker; | |
35 | } | |
36 | if (o.timestamp != ceph::real_time{}) { | |
37 | out << " timestamp=" << o.timestamp; | |
38 | } | |
39 | if (o.retry) { | |
40 | out << " retry"; | |
41 | } | |
42 | return out; | |
43 | } | |
44 | ||
9f95a23c TL |
45 | class JSONObj; |
46 | struct rgw_sync_bucket_pipe; | |
47 | ||
48 | struct rgw_bucket_sync_pair_info { | |
49 | RGWBucketSyncFlowManager::pipe_handler handler; /* responsible for sync filters */ | |
50 | rgw_bucket_shard source_bs; | |
51 | rgw_bucket_shard dest_bs; | |
52 | }; | |
53 | ||
54 | inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pair_info& p) { | |
55 | if (p.source_bs.bucket == p.dest_bs.bucket) { | |
56 | return out << p.source_bs; | |
57 | } | |
58 | ||
59 | out << p.source_bs; | |
60 | ||
61 | out << "->" << p.dest_bs.bucket; | |
62 | ||
63 | return out; | |
64 | } | |
65 | ||
66 | struct rgw_bucket_sync_pipe { | |
67 | rgw_bucket_sync_pair_info info; | |
68 | RGWBucketInfo source_bucket_info; | |
69 | map<string, bufferlist> source_bucket_attrs; | |
70 | RGWBucketInfo dest_bucket_info; | |
71 | map<string, bufferlist> dest_bucket_attrs; | |
72 | ||
73 | RGWBucketSyncFlowManager::pipe_rules_ref& get_rules() { | |
74 | return info.handler.rules; | |
75 | } | |
76 | }; | |
77 | ||
78 | inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pipe& p) { | |
79 | return out << p.info; | |
80 | } | |
7c673cae | 81 | |
7c673cae FG |
82 | struct rgw_datalog_info { |
83 | uint32_t num_shards; | |
84 | ||
85 | rgw_datalog_info() : num_shards(0) {} | |
86 | ||
87 | void decode_json(JSONObj *obj); | |
88 | }; | |
89 | ||
90 | struct rgw_data_sync_info { | |
91 | enum SyncState { | |
92 | StateInit = 0, | |
93 | StateBuildingFullSyncMaps = 1, | |
94 | StateSync = 2, | |
95 | }; | |
96 | ||
97 | uint16_t state; | |
98 | uint32_t num_shards; | |
99 | ||
31f18b77 FG |
100 | uint64_t instance_id{0}; |
101 | ||
7c673cae | 102 | void encode(bufferlist& bl) const { |
31f18b77 | 103 | ENCODE_START(2, 1, bl); |
11fdf7f2 TL |
104 | encode(state, bl); |
105 | encode(num_shards, bl); | |
106 | encode(instance_id, bl); | |
7c673cae FG |
107 | ENCODE_FINISH(bl); |
108 | } | |
109 | ||
11fdf7f2 | 110 | void decode(bufferlist::const_iterator& bl) { |
31f18b77 | 111 | DECODE_START(2, bl); |
11fdf7f2 TL |
112 | decode(state, bl); |
113 | decode(num_shards, bl); | |
31f18b77 | 114 | if (struct_v >= 2) { |
11fdf7f2 | 115 | decode(instance_id, bl); |
31f18b77 | 116 | } |
7c673cae FG |
117 | DECODE_FINISH(bl); |
118 | } | |
119 | ||
120 | void dump(Formatter *f) const { | |
121 | string s; | |
122 | switch ((SyncState)state) { | |
123 | case StateInit: | |
124 | s = "init"; | |
125 | break; | |
126 | case StateBuildingFullSyncMaps: | |
127 | s = "building-full-sync-maps"; | |
128 | break; | |
129 | case StateSync: | |
130 | s = "sync"; | |
131 | break; | |
132 | default: | |
133 | s = "unknown"; | |
134 | break; | |
135 | } | |
136 | encode_json("status", s, f); | |
137 | encode_json("num_shards", num_shards, f); | |
31f18b77 | 138 | encode_json("instance_id", instance_id, f); |
7c673cae FG |
139 | } |
140 | void decode_json(JSONObj *obj) { | |
141 | std::string s; | |
142 | JSONDecoder::decode_json("status", s, obj); | |
143 | if (s == "building-full-sync-maps") { | |
144 | state = StateBuildingFullSyncMaps; | |
145 | } else if (s == "sync") { | |
146 | state = StateSync; | |
147 | } else { | |
148 | state = StateInit; | |
149 | } | |
150 | JSONDecoder::decode_json("num_shards", num_shards, obj); | |
a8e16298 | 151 | JSONDecoder::decode_json("instance_id", instance_id, obj); |
7c673cae | 152 | } |
b32b8144 | 153 | static void generate_test_instances(std::list<rgw_data_sync_info*>& o); |
7c673cae FG |
154 | |
155 | rgw_data_sync_info() : state((int)StateInit), num_shards(0) {} | |
156 | }; | |
157 | WRITE_CLASS_ENCODER(rgw_data_sync_info) | |
158 | ||
159 | struct rgw_data_sync_marker { | |
160 | enum SyncState { | |
161 | FullSync = 0, | |
162 | IncrementalSync = 1, | |
163 | }; | |
164 | uint16_t state; | |
165 | string marker; | |
166 | string next_step_marker; | |
167 | uint64_t total_entries; | |
168 | uint64_t pos; | |
169 | real_time timestamp; | |
170 | ||
171 | rgw_data_sync_marker() : state(FullSync), total_entries(0), pos(0) {} | |
172 | ||
173 | void encode(bufferlist& bl) const { | |
174 | ENCODE_START(1, 1, bl); | |
11fdf7f2 TL |
175 | encode(state, bl); |
176 | encode(marker, bl); | |
177 | encode(next_step_marker, bl); | |
178 | encode(total_entries, bl); | |
179 | encode(pos, bl); | |
180 | encode(timestamp, bl); | |
7c673cae FG |
181 | ENCODE_FINISH(bl); |
182 | } | |
183 | ||
11fdf7f2 | 184 | void decode(bufferlist::const_iterator& bl) { |
7c673cae | 185 | DECODE_START(1, bl); |
11fdf7f2 TL |
186 | decode(state, bl); |
187 | decode(marker, bl); | |
188 | decode(next_step_marker, bl); | |
189 | decode(total_entries, bl); | |
190 | decode(pos, bl); | |
191 | decode(timestamp, bl); | |
7c673cae FG |
192 | DECODE_FINISH(bl); |
193 | } | |
194 | ||
195 | void dump(Formatter *f) const { | |
28e407b8 AA |
196 | const char *s{nullptr}; |
197 | switch ((SyncState)state) { | |
198 | case FullSync: | |
199 | s = "full-sync"; | |
200 | break; | |
201 | case IncrementalSync: | |
202 | s = "incremental-sync"; | |
203 | break; | |
204 | default: | |
205 | s = "unknown"; | |
206 | break; | |
207 | } | |
208 | encode_json("status", s, f); | |
7c673cae FG |
209 | encode_json("marker", marker, f); |
210 | encode_json("next_step_marker", next_step_marker, f); | |
211 | encode_json("total_entries", total_entries, f); | |
212 | encode_json("pos", pos, f); | |
213 | encode_json("timestamp", utime_t(timestamp), f); | |
214 | } | |
215 | void decode_json(JSONObj *obj) { | |
28e407b8 AA |
216 | std::string s; |
217 | JSONDecoder::decode_json("status", s, obj); | |
218 | if (s == "full-sync") { | |
219 | state = FullSync; | |
220 | } else if (s == "incremental-sync") { | |
221 | state = IncrementalSync; | |
222 | } | |
7c673cae FG |
223 | JSONDecoder::decode_json("marker", marker, obj); |
224 | JSONDecoder::decode_json("next_step_marker", next_step_marker, obj); | |
225 | JSONDecoder::decode_json("total_entries", total_entries, obj); | |
226 | JSONDecoder::decode_json("pos", pos, obj); | |
227 | utime_t t; | |
228 | JSONDecoder::decode_json("timestamp", t, obj); | |
229 | timestamp = t.to_real_time(); | |
230 | } | |
b32b8144 | 231 | static void generate_test_instances(std::list<rgw_data_sync_marker*>& o); |
7c673cae FG |
232 | }; |
233 | WRITE_CLASS_ENCODER(rgw_data_sync_marker) | |
234 | ||
235 | struct rgw_data_sync_status { | |
236 | rgw_data_sync_info sync_info; | |
237 | map<uint32_t, rgw_data_sync_marker> sync_markers; | |
238 | ||
239 | rgw_data_sync_status() {} | |
240 | ||
241 | void encode(bufferlist& bl) const { | |
242 | ENCODE_START(1, 1, bl); | |
11fdf7f2 | 243 | encode(sync_info, bl); |
7c673cae FG |
244 | /* sync markers are encoded separately */ |
245 | ENCODE_FINISH(bl); | |
246 | } | |
247 | ||
11fdf7f2 | 248 | void decode(bufferlist::const_iterator& bl) { |
7c673cae | 249 | DECODE_START(1, bl); |
11fdf7f2 | 250 | decode(sync_info, bl); |
7c673cae FG |
251 | /* sync markers are decoded separately */ |
252 | DECODE_FINISH(bl); | |
253 | } | |
254 | ||
255 | void dump(Formatter *f) const { | |
256 | encode_json("info", sync_info, f); | |
257 | encode_json("markers", sync_markers, f); | |
258 | } | |
259 | void decode_json(JSONObj *obj) { | |
260 | JSONDecoder::decode_json("info", sync_info, obj); | |
261 | JSONDecoder::decode_json("markers", sync_markers, obj); | |
262 | } | |
b32b8144 | 263 | static void generate_test_instances(std::list<rgw_data_sync_status*>& o); |
7c673cae FG |
264 | }; |
265 | WRITE_CLASS_ENCODER(rgw_data_sync_status) | |
266 | ||
267 | struct rgw_datalog_entry { | |
268 | string key; | |
269 | ceph::real_time timestamp; | |
270 | ||
271 | void decode_json(JSONObj *obj); | |
272 | }; | |
273 | ||
274 | struct rgw_datalog_shard_data { | |
275 | string marker; | |
276 | bool truncated; | |
277 | vector<rgw_datalog_entry> entries; | |
278 | ||
279 | void decode_json(JSONObj *obj); | |
280 | }; | |
281 | ||
282 | class RGWAsyncRadosProcessor; | |
283 | class RGWDataSyncControlCR; | |
284 | ||
285 | struct rgw_bucket_entry_owner { | |
286 | string id; | |
287 | string display_name; | |
288 | ||
289 | rgw_bucket_entry_owner() {} | |
290 | rgw_bucket_entry_owner(const string& _id, const string& _display_name) : id(_id), display_name(_display_name) {} | |
291 | ||
292 | void decode_json(JSONObj *obj); | |
293 | }; | |
294 | ||
295 | class RGWSyncErrorLogger; | |
11fdf7f2 | 296 | class RGWRESTConn; |
9f95a23c | 297 | class RGWServices; |
7c673cae FG |
298 | |
299 | struct RGWDataSyncEnv { | |
11fdf7f2 TL |
300 | const DoutPrefixProvider *dpp{nullptr}; |
301 | CephContext *cct{nullptr}; | |
9f95a23c TL |
302 | rgw::sal::RGWRadosStore *store{nullptr}; |
303 | RGWServices *svc{nullptr}; | |
11fdf7f2 TL |
304 | RGWAsyncRadosProcessor *async_rados{nullptr}; |
305 | RGWHTTPManager *http_manager{nullptr}; | |
306 | RGWSyncErrorLogger *error_logger{nullptr}; | |
307 | RGWSyncTraceManager *sync_tracer{nullptr}; | |
11fdf7f2 | 308 | RGWSyncModuleInstanceRef sync_module{nullptr}; |
81eedcae | 309 | PerfCounters* counters{nullptr}; |
7c673cae | 310 | |
11fdf7f2 | 311 | RGWDataSyncEnv() {} |
7c673cae | 312 | |
9f95a23c | 313 | void init(const DoutPrefixProvider *_dpp, CephContext *_cct, rgw::sal::RGWRadosStore *_store, RGWServices *_svc, |
7c673cae | 314 | RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager, |
11fdf7f2 | 315 | RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer, |
9f95a23c | 316 | RGWSyncModuleInstanceRef& _sync_module, |
81eedcae | 317 | PerfCounters* _counters) { |
9f95a23c | 318 | dpp = _dpp; |
7c673cae FG |
319 | cct = _cct; |
320 | store = _store; | |
9f95a23c | 321 | svc = _svc; |
7c673cae FG |
322 | async_rados = _async_rados; |
323 | http_manager = _http_manager; | |
324 | error_logger = _error_logger; | |
11fdf7f2 | 325 | sync_tracer = _sync_tracer; |
7c673cae | 326 | sync_module = _sync_module; |
81eedcae | 327 | counters = _counters; |
7c673cae FG |
328 | } |
329 | ||
330 | string shard_obj_name(int shard_id); | |
331 | string status_oid(); | |
332 | }; | |
333 | ||
9f95a23c TL |
334 | struct RGWDataSyncCtx { |
335 | CephContext *cct{nullptr}; | |
336 | RGWDataSyncEnv *env{nullptr}; | |
337 | ||
338 | RGWRESTConn *conn{nullptr}; | |
339 | rgw_zone_id source_zone; | |
340 | ||
341 | void init(RGWDataSyncEnv *_env, | |
342 | RGWRESTConn *_conn, | |
343 | const rgw_zone_id& _source_zone) { | |
344 | cct = _env->cct; | |
345 | env = _env; | |
346 | conn = _conn; | |
347 | source_zone = _source_zone; | |
348 | } | |
349 | }; | |
350 | ||
351 | class RGWRados; | |
9f95a23c | 352 | |
7c673cae | 353 | class RGWRemoteDataLog : public RGWCoroutinesManager { |
11fdf7f2 | 354 | const DoutPrefixProvider *dpp; |
9f95a23c TL |
355 | rgw::sal::RGWRadosStore *store; |
356 | CephContext *cct; | |
357 | RGWCoroutinesManagerRegistry *cr_registry; | |
7c673cae FG |
358 | RGWAsyncRadosProcessor *async_rados; |
359 | RGWHTTPManager http_manager; | |
360 | ||
361 | RGWDataSyncEnv sync_env; | |
9f95a23c | 362 | RGWDataSyncCtx sc; |
7c673cae | 363 | |
9f95a23c | 364 | ceph::shared_mutex lock = ceph::make_shared_mutex("RGWRemoteDataLog::lock"); |
7c673cae FG |
365 | RGWDataSyncControlCR *data_sync_cr; |
366 | ||
11fdf7f2 TL |
367 | RGWSyncTraceNodeRef tn; |
368 | ||
7c673cae FG |
369 | bool initialized; |
370 | ||
371 | public: | |
9f95a23c TL |
372 | RGWRemoteDataLog(const DoutPrefixProvider *dpp, |
373 | rgw::sal::RGWRadosStore *_store, | |
374 | RGWAsyncRadosProcessor *async_rados); | |
375 | int init(const rgw_zone_id& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, | |
81eedcae TL |
376 | RGWSyncTraceManager *_sync_tracer, RGWSyncModuleInstanceRef& module, |
377 | PerfCounters* _counters); | |
7c673cae FG |
378 | void finish(); |
379 | ||
380 | int read_log_info(rgw_datalog_info *log_info); | |
381 | int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info); | |
382 | int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result); | |
7c673cae | 383 | int read_sync_status(rgw_data_sync_status *sync_status); |
28e407b8 AA |
384 | int read_recovering_shards(const int num_shards, set<int>& recovering_shards); |
385 | int read_shard_status(int shard_id, set<string>& lagging_buckets,set<string>& recovering_buckets, rgw_data_sync_marker* sync_marker, const int max_entries); | |
7c673cae FG |
386 | int init_sync_status(int num_shards); |
387 | int run_sync(int num_shards); | |
388 | ||
389 | void wakeup(int shard_id, set<string>& keys); | |
390 | }; | |
391 | ||
11fdf7f2 | 392 | class RGWDataSyncStatusManager : public DoutPrefixProvider { |
9f95a23c | 393 | rgw::sal::RGWRadosStore *store; |
7c673cae | 394 | |
9f95a23c | 395 | rgw_zone_id source_zone; |
7c673cae FG |
396 | RGWRESTConn *conn; |
397 | RGWSyncErrorLogger *error_logger; | |
398 | RGWSyncModuleInstanceRef sync_module; | |
81eedcae | 399 | PerfCounters* counters; |
7c673cae FG |
400 | |
401 | RGWRemoteDataLog source_log; | |
402 | ||
403 | string source_status_oid; | |
404 | string source_shard_status_oid_prefix; | |
405 | ||
406 | map<int, rgw_raw_obj> shard_objs; | |
407 | ||
408 | int num_shards; | |
409 | ||
410 | public: | |
9f95a23c TL |
411 | RGWDataSyncStatusManager(rgw::sal::RGWRadosStore *_store, RGWAsyncRadosProcessor *async_rados, |
412 | const rgw_zone_id& _source_zone, PerfCounters* counters) | |
7c673cae | 413 | : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL), |
81eedcae | 414 | sync_module(nullptr), counters(counters), |
11fdf7f2 | 415 | source_log(this, store, async_rados), num_shards(0) {} |
9f95a23c TL |
416 | RGWDataSyncStatusManager(rgw::sal::RGWRadosStore *_store, RGWAsyncRadosProcessor *async_rados, |
417 | const rgw_zone_id& _source_zone, PerfCounters* counters, | |
81eedcae | 418 | const RGWSyncModuleInstanceRef& _sync_module) |
94b18763 | 419 | : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL), |
81eedcae | 420 | sync_module(_sync_module), counters(counters), |
11fdf7f2 | 421 | source_log(this, store, async_rados), num_shards(0) {} |
7c673cae FG |
422 | ~RGWDataSyncStatusManager() { |
423 | finalize(); | |
424 | } | |
425 | int init(); | |
426 | void finalize(); | |
427 | ||
9f95a23c TL |
428 | static string shard_obj_name(const rgw_zone_id& source_zone, int shard_id); |
429 | static string sync_status_oid(const rgw_zone_id& source_zone); | |
7c673cae FG |
430 | |
431 | int read_sync_status(rgw_data_sync_status *sync_status) { | |
432 | return source_log.read_sync_status(sync_status); | |
433 | } | |
28e407b8 AA |
434 | |
435 | int read_recovering_shards(const int num_shards, set<int>& recovering_shards) { | |
436 | return source_log.read_recovering_shards(num_shards, recovering_shards); | |
437 | } | |
438 | ||
439 | int read_shard_status(int shard_id, set<string>& lagging_buckets, set<string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries) { | |
440 | return source_log.read_shard_status(shard_id, lagging_buckets, recovering_buckets,sync_marker, max_entries); | |
441 | } | |
7c673cae FG |
442 | int init_sync_status() { return source_log.init_sync_status(num_shards); } |
443 | ||
444 | int read_log_info(rgw_datalog_info *log_info) { | |
445 | return source_log.read_log_info(log_info); | |
446 | } | |
447 | int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info) { | |
448 | return source_log.read_source_log_shards_info(shards_info); | |
449 | } | |
450 | int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result) { | |
451 | return source_log.read_source_log_shards_next(shard_markers, result); | |
452 | } | |
453 | ||
454 | int run() { return source_log.run_sync(num_shards); } | |
455 | ||
456 | void wakeup(int shard_id, set<string>& keys) { return source_log.wakeup(shard_id, keys); } | |
457 | void stop() { | |
458 | source_log.finish(); | |
459 | } | |
11fdf7f2 TL |
460 | |
461 | // implements DoutPrefixProvider | |
9f95a23c | 462 | CephContext *get_cct() const override; |
11fdf7f2 TL |
463 | unsigned get_subsys() const override; |
464 | std::ostream& gen_prefix(std::ostream& out) const override; | |
7c673cae FG |
465 | }; |
466 | ||
9f95a23c | 467 | class RGWBucketPipeSyncStatusManager; |
7c673cae FG |
468 | class RGWBucketSyncCR; |
469 | ||
470 | struct rgw_bucket_shard_full_sync_marker { | |
471 | rgw_obj_key position; | |
472 | uint64_t count; | |
473 | ||
474 | rgw_bucket_shard_full_sync_marker() : count(0) {} | |
475 | ||
476 | void encode_attr(map<string, bufferlist>& attrs); | |
477 | ||
478 | void encode(bufferlist& bl) const { | |
479 | ENCODE_START(1, 1, bl); | |
11fdf7f2 TL |
480 | encode(position, bl); |
481 | encode(count, bl); | |
7c673cae FG |
482 | ENCODE_FINISH(bl); |
483 | } | |
484 | ||
11fdf7f2 | 485 | void decode(bufferlist::const_iterator& bl) { |
7c673cae | 486 | DECODE_START(1, bl); |
11fdf7f2 TL |
487 | decode(position, bl); |
488 | decode(count, bl); | |
7c673cae FG |
489 | DECODE_FINISH(bl); |
490 | } | |
491 | ||
b32b8144 FG |
492 | void dump(Formatter *f) const; |
493 | void decode_json(JSONObj *obj); | |
7c673cae FG |
494 | }; |
495 | WRITE_CLASS_ENCODER(rgw_bucket_shard_full_sync_marker) | |
496 | ||
497 | struct rgw_bucket_shard_inc_sync_marker { | |
498 | string position; | |
f67539c2 | 499 | ceph::real_time timestamp; |
7c673cae FG |
500 | |
501 | void encode_attr(map<string, bufferlist>& attrs); | |
502 | ||
503 | void encode(bufferlist& bl) const { | |
f67539c2 | 504 | ENCODE_START(2, 1, bl); |
11fdf7f2 | 505 | encode(position, bl); |
f67539c2 | 506 | encode(timestamp, bl); |
7c673cae FG |
507 | ENCODE_FINISH(bl); |
508 | } | |
509 | ||
11fdf7f2 | 510 | void decode(bufferlist::const_iterator& bl) { |
f67539c2 | 511 | DECODE_START(2, bl); |
11fdf7f2 | 512 | decode(position, bl); |
f67539c2 TL |
513 | if (struct_v >= 2) { |
514 | decode(timestamp, bl); | |
515 | } | |
516 | DECODE_FINISH(bl); | |
7c673cae FG |
517 | } |
518 | ||
b32b8144 FG |
519 | void dump(Formatter *f) const; |
520 | void decode_json(JSONObj *obj); | |
7c673cae FG |
521 | }; |
522 | WRITE_CLASS_ENCODER(rgw_bucket_shard_inc_sync_marker) | |
523 | ||
524 | struct rgw_bucket_shard_sync_info { | |
525 | enum SyncState { | |
526 | StateInit = 0, | |
527 | StateFullSync = 1, | |
528 | StateIncrementalSync = 2, | |
9f95a23c | 529 | StateStopped = 3, |
7c673cae FG |
530 | }; |
531 | ||
532 | uint16_t state; | |
533 | rgw_bucket_shard_full_sync_marker full_marker; | |
534 | rgw_bucket_shard_inc_sync_marker inc_marker; | |
535 | ||
536 | void decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs); | |
537 | void encode_all_attrs(map<string, bufferlist>& attrs); | |
538 | void encode_state_attr(map<string, bufferlist>& attrs); | |
539 | ||
540 | void encode(bufferlist& bl) const { | |
541 | ENCODE_START(1, 1, bl); | |
11fdf7f2 TL |
542 | encode(state, bl); |
543 | encode(full_marker, bl); | |
544 | encode(inc_marker, bl); | |
7c673cae FG |
545 | ENCODE_FINISH(bl); |
546 | } | |
547 | ||
11fdf7f2 | 548 | void decode(bufferlist::const_iterator& bl) { |
7c673cae | 549 | DECODE_START(1, bl); |
11fdf7f2 TL |
550 | decode(state, bl); |
551 | decode(full_marker, bl); | |
552 | decode(inc_marker, bl); | |
7c673cae FG |
553 | DECODE_FINISH(bl); |
554 | } | |
555 | ||
b32b8144 FG |
556 | void dump(Formatter *f) const; |
557 | void decode_json(JSONObj *obj); | |
7c673cae FG |
558 | |
559 | rgw_bucket_shard_sync_info() : state((int)StateInit) {} | |
560 | ||
561 | }; | |
562 | WRITE_CLASS_ENCODER(rgw_bucket_shard_sync_info) | |
563 | ||
28e407b8 AA |
564 | struct rgw_bucket_index_marker_info { |
565 | string bucket_ver; | |
566 | string master_ver; | |
567 | string max_marker; | |
568 | bool syncstopped{false}; | |
569 | ||
570 | void decode_json(JSONObj *obj) { | |
571 | JSONDecoder::decode_json("bucket_ver", bucket_ver, obj); | |
572 | JSONDecoder::decode_json("master_ver", master_ver, obj); | |
573 | JSONDecoder::decode_json("max_marker", max_marker, obj); | |
574 | JSONDecoder::decode_json("syncstopped", syncstopped, obj); | |
575 | } | |
576 | }; | |
577 | ||
7c673cae | 578 | |
9f95a23c | 579 | class RGWRemoteBucketManager { |
11fdf7f2 | 580 | const DoutPrefixProvider *dpp; |
9f95a23c TL |
581 | |
582 | RGWDataSyncEnv *sync_env; | |
583 | ||
7c673cae | 584 | RGWRESTConn *conn{nullptr}; |
9f95a23c | 585 | rgw_zone_id source_zone; |
7c673cae | 586 | |
9f95a23c | 587 | vector<rgw_bucket_sync_pair_info> sync_pairs; |
7c673cae | 588 | |
9f95a23c | 589 | RGWDataSyncCtx sc; |
7c673cae FG |
590 | rgw_bucket_shard_sync_info init_status; |
591 | ||
592 | RGWBucketSyncCR *sync_cr{nullptr}; | |
593 | ||
594 | public: | |
9f95a23c TL |
595 | RGWRemoteBucketManager(const DoutPrefixProvider *_dpp, |
596 | RGWDataSyncEnv *_sync_env, | |
597 | const rgw_zone_id& _source_zone, RGWRESTConn *_conn, | |
598 | const RGWBucketInfo& source_bucket_info, | |
599 | const rgw_bucket& dest_bucket); | |
600 | ||
601 | void init(const rgw_zone_id& _source_zone, RGWRESTConn *_conn, | |
602 | const rgw_bucket& source_bucket, int shard_id, | |
603 | const rgw_bucket& dest_bucket); | |
604 | ||
605 | RGWCoroutine *read_sync_status_cr(int num, rgw_bucket_shard_sync_info *sync_status); | |
f67539c2 | 606 | RGWCoroutine *init_sync_status_cr(int num, RGWObjVersionTracker& objv_tracker); |
9f95a23c TL |
607 | RGWCoroutine *run_sync_cr(int num); |
608 | ||
609 | int num_pipes() { | |
610 | return sync_pairs.size(); | |
611 | } | |
7c673cae FG |
612 | |
613 | void wakeup(); | |
614 | }; | |
615 | ||
f67539c2 TL |
616 | class BucketIndexShardsManager; |
617 | ||
618 | int rgw_read_remote_bilog_info(RGWRESTConn* conn, | |
619 | const rgw_bucket& bucket, | |
620 | BucketIndexShardsManager& markers, | |
621 | optional_yield y); | |
622 | ||
9f95a23c TL |
623 | class RGWBucketPipeSyncStatusManager : public DoutPrefixProvider { |
624 | rgw::sal::RGWRadosStore *store; | |
625 | ||
626 | RGWDataSyncEnv sync_env; | |
7c673cae FG |
627 | |
628 | RGWCoroutinesManager cr_mgr; | |
629 | ||
630 | RGWHTTPManager http_manager; | |
631 | ||
9f95a23c TL |
632 | std::optional<rgw_zone_id> source_zone; |
633 | std::optional<rgw_bucket> source_bucket; | |
634 | ||
7c673cae FG |
635 | RGWRESTConn *conn; |
636 | RGWSyncErrorLogger *error_logger; | |
637 | RGWSyncModuleInstanceRef sync_module; | |
638 | ||
9f95a23c | 639 | rgw_bucket dest_bucket; |
7c673cae | 640 | |
9f95a23c | 641 | vector<RGWRemoteBucketManager *> source_mgrs; |
7c673cae FG |
642 | |
643 | string source_status_oid; | |
644 | string source_shard_status_oid_prefix; | |
645 | ||
646 | map<int, rgw_bucket_shard_sync_info> sync_status; | |
647 | rgw_raw_obj status_obj; | |
648 | ||
649 | int num_shards; | |
650 | ||
651 | public: | |
9f95a23c TL |
652 | RGWBucketPipeSyncStatusManager(rgw::sal::RGWRadosStore *_store, |
653 | std::optional<rgw_zone_id> _source_zone, | |
654 | std::optional<rgw_bucket> _source_bucket, | |
655 | const rgw_bucket& dest_bucket); | |
656 | ~RGWBucketPipeSyncStatusManager(); | |
7c673cae FG |
657 | |
658 | int init(); | |
659 | ||
660 | map<int, rgw_bucket_shard_sync_info>& get_sync_status() { return sync_status; } | |
661 | int init_sync_status(); | |
662 | ||
9f95a23c TL |
663 | static string status_oid(const rgw_zone_id& source_zone, const rgw_bucket_sync_pair_info& bs); |
664 | static string obj_status_oid(const rgw_bucket_sync_pipe& sync_pipe, | |
f67539c2 | 665 | const rgw_zone_id& source_zone, const rgw::sal::RGWObject* obj); /* specific source obj sync status, |
9f95a23c | 666 | can be used by sync modules */ |
11fdf7f2 TL |
667 | |
668 | // implements DoutPrefixProvider | |
9f95a23c | 669 | CephContext *get_cct() const override; |
11fdf7f2 TL |
670 | unsigned get_subsys() const override; |
671 | std::ostream& gen_prefix(std::ostream& out) const override; | |
7c673cae FG |
672 | |
673 | int read_sync_status(); | |
674 | int run(); | |
675 | }; | |
676 | ||
b32b8144 | 677 | /// read the sync status of all bucket shards from the given source zone |
9f95a23c TL |
678 | int rgw_bucket_sync_status(const DoutPrefixProvider *dpp, |
679 | rgw::sal::RGWRadosStore *store, | |
680 | const rgw_sync_bucket_pipe& pipe, | |
681 | const RGWBucketInfo& dest_bucket_info, | |
682 | const RGWBucketInfo *psource_bucket_info, | |
b32b8144 FG |
683 | std::vector<rgw_bucket_shard_sync_info> *status); |
684 | ||
7c673cae FG |
685 | class RGWDefaultSyncModule : public RGWSyncModule { |
686 | public: | |
687 | RGWDefaultSyncModule() {} | |
11fdf7f2 | 688 | bool supports_writes() override { return true; } |
7c673cae | 689 | bool supports_data_export() override { return true; } |
11fdf7f2 TL |
690 | int create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override; |
691 | }; | |
692 | ||
693 | class RGWArchiveSyncModule : public RGWDefaultSyncModule { | |
694 | public: | |
695 | RGWArchiveSyncModule() {} | |
696 | bool supports_writes() override { return true; } | |
697 | bool supports_data_export() override { return false; } | |
698 | int create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override; | |
7c673cae FG |
699 | }; |
700 | ||
7c673cae | 701 | #endif |