]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | #ifndef CEPH_RGW_DATA_SYNC_H |
2 | #define CEPH_RGW_DATA_SYNC_H | |
3 | ||
4 | #include "rgw_coroutine.h" | |
5 | #include "rgw_http_client.h" | |
6 | #include "rgw_bucket.h" | |
7 | ||
8 | #include "rgw_sync_module.h" | |
9 | ||
10 | #include "common/RWLock.h" | |
11 | #include "common/ceph_json.h" | |
12 | ||
13 | ||
14 | struct rgw_datalog_info { | |
15 | uint32_t num_shards; | |
16 | ||
17 | rgw_datalog_info() : num_shards(0) {} | |
18 | ||
19 | void decode_json(JSONObj *obj); | |
20 | }; | |
21 | ||
22 | struct rgw_data_sync_info { | |
23 | enum SyncState { | |
24 | StateInit = 0, | |
25 | StateBuildingFullSyncMaps = 1, | |
26 | StateSync = 2, | |
27 | }; | |
28 | ||
29 | uint16_t state; | |
30 | uint32_t num_shards; | |
31 | ||
31f18b77 FG |
32 | uint64_t instance_id{0}; |
33 | ||
7c673cae | 34 | void encode(bufferlist& bl) const { |
31f18b77 | 35 | ENCODE_START(2, 1, bl); |
7c673cae FG |
36 | ::encode(state, bl); |
37 | ::encode(num_shards, bl); | |
31f18b77 | 38 | ::encode(instance_id, bl); |
7c673cae FG |
39 | ENCODE_FINISH(bl); |
40 | } | |
41 | ||
42 | void decode(bufferlist::iterator& bl) { | |
31f18b77 | 43 | DECODE_START(2, bl); |
7c673cae FG |
44 | ::decode(state, bl); |
45 | ::decode(num_shards, bl); | |
31f18b77 FG |
46 | if (struct_v >= 2) { |
47 | ::decode(instance_id, bl); | |
48 | } | |
7c673cae FG |
49 | DECODE_FINISH(bl); |
50 | } | |
51 | ||
52 | void dump(Formatter *f) const { | |
53 | string s; | |
54 | switch ((SyncState)state) { | |
55 | case StateInit: | |
56 | s = "init"; | |
57 | break; | |
58 | case StateBuildingFullSyncMaps: | |
59 | s = "building-full-sync-maps"; | |
60 | break; | |
61 | case StateSync: | |
62 | s = "sync"; | |
63 | break; | |
64 | default: | |
65 | s = "unknown"; | |
66 | break; | |
67 | } | |
68 | encode_json("status", s, f); | |
69 | encode_json("num_shards", num_shards, f); | |
31f18b77 | 70 | encode_json("instance_id", instance_id, f); |
7c673cae FG |
71 | } |
72 | void decode_json(JSONObj *obj) { | |
73 | std::string s; | |
74 | JSONDecoder::decode_json("status", s, obj); | |
75 | if (s == "building-full-sync-maps") { | |
76 | state = StateBuildingFullSyncMaps; | |
77 | } else if (s == "sync") { | |
78 | state = StateSync; | |
79 | } else { | |
80 | state = StateInit; | |
81 | } | |
82 | JSONDecoder::decode_json("num_shards", num_shards, obj); | |
31f18b77 | 83 | JSONDecoder::decode_json("instance_id", num_shards, obj); |
7c673cae FG |
84 | } |
85 | ||
86 | rgw_data_sync_info() : state((int)StateInit), num_shards(0) {} | |
87 | }; | |
88 | WRITE_CLASS_ENCODER(rgw_data_sync_info) | |
89 | ||
90 | struct rgw_data_sync_marker { | |
91 | enum SyncState { | |
92 | FullSync = 0, | |
93 | IncrementalSync = 1, | |
94 | }; | |
95 | uint16_t state; | |
96 | string marker; | |
97 | string next_step_marker; | |
98 | uint64_t total_entries; | |
99 | uint64_t pos; | |
100 | real_time timestamp; | |
101 | ||
102 | rgw_data_sync_marker() : state(FullSync), total_entries(0), pos(0) {} | |
103 | ||
104 | void encode(bufferlist& bl) const { | |
105 | ENCODE_START(1, 1, bl); | |
106 | ::encode(state, bl); | |
107 | ::encode(marker, bl); | |
108 | ::encode(next_step_marker, bl); | |
109 | ::encode(total_entries, bl); | |
110 | ::encode(pos, bl); | |
111 | ::encode(timestamp, bl); | |
112 | ENCODE_FINISH(bl); | |
113 | } | |
114 | ||
115 | void decode(bufferlist::iterator& bl) { | |
116 | DECODE_START(1, bl); | |
117 | ::decode(state, bl); | |
118 | ::decode(marker, bl); | |
119 | ::decode(next_step_marker, bl); | |
120 | ::decode(total_entries, bl); | |
121 | ::decode(pos, bl); | |
122 | ::decode(timestamp, bl); | |
123 | DECODE_FINISH(bl); | |
124 | } | |
125 | ||
126 | void dump(Formatter *f) const { | |
127 | encode_json("state", (int)state, f); | |
128 | encode_json("marker", marker, f); | |
129 | encode_json("next_step_marker", next_step_marker, f); | |
130 | encode_json("total_entries", total_entries, f); | |
131 | encode_json("pos", pos, f); | |
132 | encode_json("timestamp", utime_t(timestamp), f); | |
133 | } | |
134 | void decode_json(JSONObj *obj) { | |
135 | int s; | |
136 | JSONDecoder::decode_json("state", s, obj); | |
137 | state = s; | |
138 | JSONDecoder::decode_json("marker", marker, obj); | |
139 | JSONDecoder::decode_json("next_step_marker", next_step_marker, obj); | |
140 | JSONDecoder::decode_json("total_entries", total_entries, obj); | |
141 | JSONDecoder::decode_json("pos", pos, obj); | |
142 | utime_t t; | |
143 | JSONDecoder::decode_json("timestamp", t, obj); | |
144 | timestamp = t.to_real_time(); | |
145 | } | |
146 | }; | |
147 | WRITE_CLASS_ENCODER(rgw_data_sync_marker) | |
148 | ||
149 | struct rgw_data_sync_status { | |
150 | rgw_data_sync_info sync_info; | |
151 | map<uint32_t, rgw_data_sync_marker> sync_markers; | |
152 | ||
153 | rgw_data_sync_status() {} | |
154 | ||
155 | void encode(bufferlist& bl) const { | |
156 | ENCODE_START(1, 1, bl); | |
157 | ::encode(sync_info, bl); | |
158 | /* sync markers are encoded separately */ | |
159 | ENCODE_FINISH(bl); | |
160 | } | |
161 | ||
162 | void decode(bufferlist::iterator& bl) { | |
163 | DECODE_START(1, bl); | |
164 | ::decode(sync_info, bl); | |
165 | /* sync markers are decoded separately */ | |
166 | DECODE_FINISH(bl); | |
167 | } | |
168 | ||
169 | void dump(Formatter *f) const { | |
170 | encode_json("info", sync_info, f); | |
171 | encode_json("markers", sync_markers, f); | |
172 | } | |
173 | void decode_json(JSONObj *obj) { | |
174 | JSONDecoder::decode_json("info", sync_info, obj); | |
175 | JSONDecoder::decode_json("markers", sync_markers, obj); | |
176 | } | |
177 | }; | |
178 | WRITE_CLASS_ENCODER(rgw_data_sync_status) | |
179 | ||
180 | struct rgw_datalog_entry { | |
181 | string key; | |
182 | ceph::real_time timestamp; | |
183 | ||
184 | void decode_json(JSONObj *obj); | |
185 | }; | |
186 | ||
187 | struct rgw_datalog_shard_data { | |
188 | string marker; | |
189 | bool truncated; | |
190 | vector<rgw_datalog_entry> entries; | |
191 | ||
192 | void decode_json(JSONObj *obj); | |
193 | }; | |
194 | ||
195 | class RGWAsyncRadosProcessor; | |
196 | class RGWDataSyncControlCR; | |
197 | ||
198 | struct rgw_bucket_entry_owner { | |
199 | string id; | |
200 | string display_name; | |
201 | ||
202 | rgw_bucket_entry_owner() {} | |
203 | rgw_bucket_entry_owner(const string& _id, const string& _display_name) : id(_id), display_name(_display_name) {} | |
204 | ||
205 | void decode_json(JSONObj *obj); | |
206 | }; | |
207 | ||
208 | class RGWSyncErrorLogger; | |
209 | ||
210 | struct RGWDataSyncEnv { | |
211 | CephContext *cct; | |
212 | RGWRados *store; | |
213 | RGWRESTConn *conn; | |
214 | RGWAsyncRadosProcessor *async_rados; | |
215 | RGWHTTPManager *http_manager; | |
216 | RGWSyncErrorLogger *error_logger; | |
217 | string source_zone; | |
218 | RGWSyncModuleInstanceRef sync_module; | |
219 | ||
220 | RGWDataSyncEnv() : cct(NULL), store(NULL), conn(NULL), async_rados(NULL), http_manager(NULL), error_logger(NULL), sync_module(NULL) {} | |
221 | ||
222 | void init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn, | |
223 | RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager, | |
224 | RGWSyncErrorLogger *_error_logger, const string& _source_zone, | |
225 | RGWSyncModuleInstanceRef& _sync_module) { | |
226 | cct = _cct; | |
227 | store = _store; | |
228 | conn = _conn; | |
229 | async_rados = _async_rados; | |
230 | http_manager = _http_manager; | |
231 | error_logger = _error_logger; | |
232 | source_zone = _source_zone; | |
233 | sync_module = _sync_module; | |
234 | } | |
235 | ||
236 | string shard_obj_name(int shard_id); | |
237 | string status_oid(); | |
238 | }; | |
239 | ||
240 | class RGWRemoteDataLog : public RGWCoroutinesManager { | |
241 | RGWRados *store; | |
242 | RGWAsyncRadosProcessor *async_rados; | |
243 | RGWHTTPManager http_manager; | |
244 | ||
245 | RGWDataSyncEnv sync_env; | |
246 | ||
247 | RWLock lock; | |
248 | RGWDataSyncControlCR *data_sync_cr; | |
249 | ||
250 | bool initialized; | |
251 | ||
252 | public: | |
253 | RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados) | |
254 | : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), | |
255 | store(_store), async_rados(async_rados), | |
256 | http_manager(store->ctx(), completion_mgr), | |
257 | lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL), | |
258 | initialized(false) {} | |
259 | int init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncModuleInstanceRef& module); | |
260 | void finish(); | |
261 | ||
262 | int read_log_info(rgw_datalog_info *log_info); | |
263 | int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info); | |
264 | int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result); | |
265 | int get_shard_info(int shard_id); | |
266 | int read_sync_status(rgw_data_sync_status *sync_status); | |
267 | int init_sync_status(int num_shards); | |
268 | int run_sync(int num_shards); | |
269 | ||
270 | void wakeup(int shard_id, set<string>& keys); | |
271 | }; | |
272 | ||
273 | class RGWDataSyncStatusManager { | |
274 | RGWRados *store; | |
275 | rgw_rados_ref ref; | |
276 | ||
277 | string source_zone; | |
278 | RGWRESTConn *conn; | |
279 | RGWSyncErrorLogger *error_logger; | |
280 | RGWSyncModuleInstanceRef sync_module; | |
281 | ||
282 | RGWRemoteDataLog source_log; | |
283 | ||
284 | string source_status_oid; | |
285 | string source_shard_status_oid_prefix; | |
286 | ||
287 | map<int, rgw_raw_obj> shard_objs; | |
288 | ||
289 | int num_shards; | |
290 | ||
291 | public: | |
292 | RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados, | |
293 | const string& _source_zone) | |
294 | : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL), | |
295 | sync_module(nullptr), | |
296 | source_log(store, async_rados), num_shards(0) {} | |
297 | ~RGWDataSyncStatusManager() { | |
298 | finalize(); | |
299 | } | |
300 | int init(); | |
301 | void finalize(); | |
302 | ||
303 | static string shard_obj_name(const string& source_zone, int shard_id); | |
304 | static string sync_status_oid(const string& source_zone); | |
305 | ||
306 | int read_sync_status(rgw_data_sync_status *sync_status) { | |
307 | return source_log.read_sync_status(sync_status); | |
308 | } | |
309 | int init_sync_status() { return source_log.init_sync_status(num_shards); } | |
310 | ||
311 | int read_log_info(rgw_datalog_info *log_info) { | |
312 | return source_log.read_log_info(log_info); | |
313 | } | |
314 | int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info) { | |
315 | return source_log.read_source_log_shards_info(shards_info); | |
316 | } | |
317 | int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result) { | |
318 | return source_log.read_source_log_shards_next(shard_markers, result); | |
319 | } | |
320 | ||
321 | int run() { return source_log.run_sync(num_shards); } | |
322 | ||
323 | void wakeup(int shard_id, set<string>& keys) { return source_log.wakeup(shard_id, keys); } | |
324 | void stop() { | |
325 | source_log.finish(); | |
326 | } | |
327 | }; | |
328 | ||
329 | class RGWBucketSyncStatusManager; | |
330 | class RGWBucketSyncCR; | |
331 | ||
332 | struct rgw_bucket_shard_full_sync_marker { | |
333 | rgw_obj_key position; | |
334 | uint64_t count; | |
335 | ||
336 | rgw_bucket_shard_full_sync_marker() : count(0) {} | |
337 | ||
338 | void encode_attr(map<string, bufferlist>& attrs); | |
339 | ||
340 | void encode(bufferlist& bl) const { | |
341 | ENCODE_START(1, 1, bl); | |
342 | ::encode(position, bl); | |
343 | ::encode(count, bl); | |
344 | ENCODE_FINISH(bl); | |
345 | } | |
346 | ||
347 | void decode(bufferlist::iterator& bl) { | |
348 | DECODE_START(1, bl); | |
349 | ::decode(position, bl); | |
350 | ::decode(count, bl); | |
351 | DECODE_FINISH(bl); | |
352 | } | |
353 | ||
354 | void dump(Formatter *f) const { | |
355 | encode_json("position", position, f); | |
356 | encode_json("count", count, f); | |
357 | } | |
358 | }; | |
359 | WRITE_CLASS_ENCODER(rgw_bucket_shard_full_sync_marker) | |
360 | ||
361 | struct rgw_bucket_shard_inc_sync_marker { | |
362 | string position; | |
363 | ||
364 | rgw_bucket_shard_inc_sync_marker() {} | |
365 | ||
366 | void encode_attr(map<string, bufferlist>& attrs); | |
367 | ||
368 | void encode(bufferlist& bl) const { | |
369 | ENCODE_START(1, 1, bl); | |
370 | ::encode(position, bl); | |
371 | ENCODE_FINISH(bl); | |
372 | } | |
373 | ||
374 | void decode(bufferlist::iterator& bl) { | |
375 | DECODE_START(1, bl); | |
376 | ::decode(position, bl); | |
377 | DECODE_FINISH(bl); | |
378 | } | |
379 | ||
380 | void dump(Formatter *f) const { | |
381 | encode_json("position", position, f); | |
382 | } | |
383 | ||
384 | bool operator<(const rgw_bucket_shard_inc_sync_marker& m) const { | |
385 | return (position < m.position); | |
386 | } | |
387 | }; | |
388 | WRITE_CLASS_ENCODER(rgw_bucket_shard_inc_sync_marker) | |
389 | ||
390 | struct rgw_bucket_shard_sync_info { | |
391 | enum SyncState { | |
392 | StateInit = 0, | |
393 | StateFullSync = 1, | |
394 | StateIncrementalSync = 2, | |
395 | }; | |
396 | ||
397 | uint16_t state; | |
398 | rgw_bucket_shard_full_sync_marker full_marker; | |
399 | rgw_bucket_shard_inc_sync_marker inc_marker; | |
400 | ||
401 | void decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs); | |
402 | void encode_all_attrs(map<string, bufferlist>& attrs); | |
403 | void encode_state_attr(map<string, bufferlist>& attrs); | |
404 | ||
405 | void encode(bufferlist& bl) const { | |
406 | ENCODE_START(1, 1, bl); | |
407 | ::encode(state, bl); | |
408 | ::encode(full_marker, bl); | |
409 | ::encode(inc_marker, bl); | |
410 | ENCODE_FINISH(bl); | |
411 | } | |
412 | ||
413 | void decode(bufferlist::iterator& bl) { | |
414 | DECODE_START(1, bl); | |
415 | ::decode(state, bl); | |
416 | ::decode(full_marker, bl); | |
417 | ::decode(inc_marker, bl); | |
418 | DECODE_FINISH(bl); | |
419 | } | |
420 | ||
421 | void dump(Formatter *f) const { | |
422 | string s; | |
423 | switch ((SyncState)state) { | |
424 | case StateInit: | |
425 | s = "init"; | |
426 | break; | |
427 | case StateFullSync: | |
428 | s = "full-sync"; | |
429 | break; | |
430 | case StateIncrementalSync: | |
431 | s = "incremental-sync"; | |
432 | break; | |
433 | default: | |
434 | s = "unknown"; | |
435 | break; | |
436 | } | |
437 | encode_json("status", s, f); | |
438 | encode_json("full_marker", full_marker, f); | |
439 | encode_json("inc_marker", inc_marker, f); | |
440 | } | |
441 | ||
442 | rgw_bucket_shard_sync_info() : state((int)StateInit) {} | |
443 | ||
444 | }; | |
445 | WRITE_CLASS_ENCODER(rgw_bucket_shard_sync_info) | |
446 | ||
447 | ||
448 | class RGWRemoteBucketLog : public RGWCoroutinesManager { | |
449 | RGWRados *store; | |
450 | RGWRESTConn *conn{nullptr}; | |
451 | string source_zone; | |
452 | rgw_bucket_shard bs; | |
453 | ||
454 | RGWBucketSyncStatusManager *status_manager; | |
455 | RGWAsyncRadosProcessor *async_rados; | |
456 | RGWHTTPManager *http_manager; | |
457 | ||
458 | RGWDataSyncEnv sync_env; | |
459 | rgw_bucket_shard_sync_info init_status; | |
460 | ||
461 | RGWBucketSyncCR *sync_cr{nullptr}; | |
462 | ||
463 | public: | |
464 | RGWRemoteBucketLog(RGWRados *_store, RGWBucketSyncStatusManager *_sm, | |
465 | RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), store(_store), | |
466 | status_manager(_sm), async_rados(_async_rados), http_manager(_http_manager) {} | |
467 | ||
468 | int init(const string& _source_zone, RGWRESTConn *_conn, | |
469 | const rgw_bucket& bucket, int shard_id, | |
470 | RGWSyncErrorLogger *_error_logger, | |
471 | RGWSyncModuleInstanceRef& _sync_module); | |
472 | void finish(); | |
473 | ||
474 | RGWCoroutine *read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status); | |
475 | RGWCoroutine *init_sync_status_cr(); | |
476 | RGWCoroutine *run_sync_cr(); | |
477 | ||
478 | void wakeup(); | |
479 | }; | |
480 | ||
481 | class RGWBucketSyncStatusManager { | |
482 | RGWRados *store; | |
483 | ||
484 | RGWCoroutinesManager cr_mgr; | |
485 | ||
486 | RGWHTTPManager http_manager; | |
487 | ||
488 | string source_zone; | |
489 | RGWRESTConn *conn; | |
490 | RGWSyncErrorLogger *error_logger; | |
491 | RGWSyncModuleInstanceRef sync_module; | |
492 | ||
493 | rgw_bucket bucket; | |
494 | ||
495 | map<int, RGWRemoteBucketLog *> source_logs; | |
496 | ||
497 | string source_status_oid; | |
498 | string source_shard_status_oid_prefix; | |
499 | ||
500 | map<int, rgw_bucket_shard_sync_info> sync_status; | |
501 | rgw_raw_obj status_obj; | |
502 | ||
503 | int num_shards; | |
504 | ||
505 | public: | |
506 | RGWBucketSyncStatusManager(RGWRados *_store, const string& _source_zone, | |
507 | const rgw_bucket& bucket) : store(_store), | |
508 | cr_mgr(_store->ctx(), _store->get_cr_registry()), | |
509 | http_manager(store->ctx(), cr_mgr.get_completion_mgr()), | |
510 | source_zone(_source_zone), | |
511 | conn(NULL), error_logger(NULL), | |
512 | bucket(bucket), | |
513 | num_shards(0) {} | |
514 | ~RGWBucketSyncStatusManager(); | |
515 | ||
516 | int init(); | |
517 | ||
518 | map<int, rgw_bucket_shard_sync_info>& get_sync_status() { return sync_status; } | |
519 | int init_sync_status(); | |
520 | ||
521 | static string status_oid(const string& source_zone, const rgw_bucket_shard& bs); | |
522 | ||
523 | int read_sync_status(); | |
524 | int run(); | |
525 | }; | |
526 | ||
527 | class RGWDefaultSyncModule : public RGWSyncModule { | |
528 | public: | |
529 | RGWDefaultSyncModule() {} | |
530 | bool supports_data_export() override { return true; } | |
31f18b77 | 531 | int create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance) override; |
7c673cae FG |
532 | }; |
533 | ||
534 | // DataLogTrimCR factory function | |
535 | extern RGWCoroutine* create_data_log_trim_cr(RGWRados *store, | |
536 | RGWHTTPManager *http, | |
537 | int num_shards, utime_t interval); | |
538 | ||
539 | #endif |