]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
7c673cae FG |
4 | #ifndef CEPH_RGW_DATA_SYNC_H |
5 | #define CEPH_RGW_DATA_SYNC_H | |
6 | ||
11fdf7f2 TL |
7 | #include "include/encoding.h" |
8 | ||
9 | #include "common/RWLock.h" | |
10 | #include "common/ceph_json.h" | |
11 | ||
12 | ||
7c673cae FG |
13 | #include "rgw_coroutine.h" |
14 | #include "rgw_http_client.h" | |
15 | #include "rgw_bucket.h" | |
16 | ||
17 | #include "rgw_sync_module.h" | |
11fdf7f2 | 18 | #include "rgw_sync_trace.h" |
7c673cae | 19 | |
7c673cae FG |
20 | struct rgw_datalog_info { |
21 | uint32_t num_shards; | |
22 | ||
23 | rgw_datalog_info() : num_shards(0) {} | |
24 | ||
25 | void decode_json(JSONObj *obj); | |
26 | }; | |
27 | ||
28 | struct rgw_data_sync_info { | |
29 | enum SyncState { | |
30 | StateInit = 0, | |
31 | StateBuildingFullSyncMaps = 1, | |
32 | StateSync = 2, | |
33 | }; | |
34 | ||
35 | uint16_t state; | |
36 | uint32_t num_shards; | |
37 | ||
31f18b77 FG |
38 | uint64_t instance_id{0}; |
39 | ||
7c673cae | 40 | void encode(bufferlist& bl) const { |
31f18b77 | 41 | ENCODE_START(2, 1, bl); |
11fdf7f2 TL |
42 | encode(state, bl); |
43 | encode(num_shards, bl); | |
44 | encode(instance_id, bl); | |
7c673cae FG |
45 | ENCODE_FINISH(bl); |
46 | } | |
47 | ||
11fdf7f2 | 48 | void decode(bufferlist::const_iterator& bl) { |
31f18b77 | 49 | DECODE_START(2, bl); |
11fdf7f2 TL |
50 | decode(state, bl); |
51 | decode(num_shards, bl); | |
31f18b77 | 52 | if (struct_v >= 2) { |
11fdf7f2 | 53 | decode(instance_id, bl); |
31f18b77 | 54 | } |
7c673cae FG |
55 | DECODE_FINISH(bl); |
56 | } | |
57 | ||
58 | void dump(Formatter *f) const { | |
59 | string s; | |
60 | switch ((SyncState)state) { | |
61 | case StateInit: | |
62 | s = "init"; | |
63 | break; | |
64 | case StateBuildingFullSyncMaps: | |
65 | s = "building-full-sync-maps"; | |
66 | break; | |
67 | case StateSync: | |
68 | s = "sync"; | |
69 | break; | |
70 | default: | |
71 | s = "unknown"; | |
72 | break; | |
73 | } | |
74 | encode_json("status", s, f); | |
75 | encode_json("num_shards", num_shards, f); | |
31f18b77 | 76 | encode_json("instance_id", instance_id, f); |
7c673cae FG |
77 | } |
78 | void decode_json(JSONObj *obj) { | |
79 | std::string s; | |
80 | JSONDecoder::decode_json("status", s, obj); | |
81 | if (s == "building-full-sync-maps") { | |
82 | state = StateBuildingFullSyncMaps; | |
83 | } else if (s == "sync") { | |
84 | state = StateSync; | |
85 | } else { | |
86 | state = StateInit; | |
87 | } | |
88 | JSONDecoder::decode_json("num_shards", num_shards, obj); | |
a8e16298 | 89 | JSONDecoder::decode_json("instance_id", instance_id, obj); |
7c673cae | 90 | } |
b32b8144 | 91 | static void generate_test_instances(std::list<rgw_data_sync_info*>& o); |
7c673cae FG |
92 | |
93 | rgw_data_sync_info() : state((int)StateInit), num_shards(0) {} | |
94 | }; | |
95 | WRITE_CLASS_ENCODER(rgw_data_sync_info) | |
96 | ||
97 | struct rgw_data_sync_marker { | |
98 | enum SyncState { | |
99 | FullSync = 0, | |
100 | IncrementalSync = 1, | |
101 | }; | |
102 | uint16_t state; | |
103 | string marker; | |
104 | string next_step_marker; | |
105 | uint64_t total_entries; | |
106 | uint64_t pos; | |
107 | real_time timestamp; | |
108 | ||
109 | rgw_data_sync_marker() : state(FullSync), total_entries(0), pos(0) {} | |
110 | ||
111 | void encode(bufferlist& bl) const { | |
112 | ENCODE_START(1, 1, bl); | |
11fdf7f2 TL |
113 | encode(state, bl); |
114 | encode(marker, bl); | |
115 | encode(next_step_marker, bl); | |
116 | encode(total_entries, bl); | |
117 | encode(pos, bl); | |
118 | encode(timestamp, bl); | |
7c673cae FG |
119 | ENCODE_FINISH(bl); |
120 | } | |
121 | ||
11fdf7f2 | 122 | void decode(bufferlist::const_iterator& bl) { |
7c673cae | 123 | DECODE_START(1, bl); |
11fdf7f2 TL |
124 | decode(state, bl); |
125 | decode(marker, bl); | |
126 | decode(next_step_marker, bl); | |
127 | decode(total_entries, bl); | |
128 | decode(pos, bl); | |
129 | decode(timestamp, bl); | |
7c673cae FG |
130 | DECODE_FINISH(bl); |
131 | } | |
132 | ||
133 | void dump(Formatter *f) const { | |
28e407b8 AA |
134 | const char *s{nullptr}; |
135 | switch ((SyncState)state) { | |
136 | case FullSync: | |
137 | s = "full-sync"; | |
138 | break; | |
139 | case IncrementalSync: | |
140 | s = "incremental-sync"; | |
141 | break; | |
142 | default: | |
143 | s = "unknown"; | |
144 | break; | |
145 | } | |
146 | encode_json("status", s, f); | |
7c673cae FG |
147 | encode_json("marker", marker, f); |
148 | encode_json("next_step_marker", next_step_marker, f); | |
149 | encode_json("total_entries", total_entries, f); | |
150 | encode_json("pos", pos, f); | |
151 | encode_json("timestamp", utime_t(timestamp), f); | |
152 | } | |
153 | void decode_json(JSONObj *obj) { | |
28e407b8 AA |
154 | std::string s; |
155 | JSONDecoder::decode_json("status", s, obj); | |
156 | if (s == "full-sync") { | |
157 | state = FullSync; | |
158 | } else if (s == "incremental-sync") { | |
159 | state = IncrementalSync; | |
160 | } | |
7c673cae FG |
161 | JSONDecoder::decode_json("marker", marker, obj); |
162 | JSONDecoder::decode_json("next_step_marker", next_step_marker, obj); | |
163 | JSONDecoder::decode_json("total_entries", total_entries, obj); | |
164 | JSONDecoder::decode_json("pos", pos, obj); | |
165 | utime_t t; | |
166 | JSONDecoder::decode_json("timestamp", t, obj); | |
167 | timestamp = t.to_real_time(); | |
168 | } | |
b32b8144 | 169 | static void generate_test_instances(std::list<rgw_data_sync_marker*>& o); |
7c673cae FG |
170 | }; |
171 | WRITE_CLASS_ENCODER(rgw_data_sync_marker) | |
172 | ||
173 | struct rgw_data_sync_status { | |
174 | rgw_data_sync_info sync_info; | |
175 | map<uint32_t, rgw_data_sync_marker> sync_markers; | |
176 | ||
177 | rgw_data_sync_status() {} | |
178 | ||
179 | void encode(bufferlist& bl) const { | |
180 | ENCODE_START(1, 1, bl); | |
11fdf7f2 | 181 | encode(sync_info, bl); |
7c673cae FG |
182 | /* sync markers are encoded separately */ |
183 | ENCODE_FINISH(bl); | |
184 | } | |
185 | ||
11fdf7f2 | 186 | void decode(bufferlist::const_iterator& bl) { |
7c673cae | 187 | DECODE_START(1, bl); |
11fdf7f2 | 188 | decode(sync_info, bl); |
7c673cae FG |
189 | /* sync markers are decoded separately */ |
190 | DECODE_FINISH(bl); | |
191 | } | |
192 | ||
193 | void dump(Formatter *f) const { | |
194 | encode_json("info", sync_info, f); | |
195 | encode_json("markers", sync_markers, f); | |
196 | } | |
197 | void decode_json(JSONObj *obj) { | |
198 | JSONDecoder::decode_json("info", sync_info, obj); | |
199 | JSONDecoder::decode_json("markers", sync_markers, obj); | |
200 | } | |
b32b8144 | 201 | static void generate_test_instances(std::list<rgw_data_sync_status*>& o); |
7c673cae FG |
202 | }; |
203 | WRITE_CLASS_ENCODER(rgw_data_sync_status) | |
204 | ||
205 | struct rgw_datalog_entry { | |
206 | string key; | |
207 | ceph::real_time timestamp; | |
208 | ||
209 | void decode_json(JSONObj *obj); | |
210 | }; | |
211 | ||
212 | struct rgw_datalog_shard_data { | |
213 | string marker; | |
214 | bool truncated; | |
215 | vector<rgw_datalog_entry> entries; | |
216 | ||
217 | void decode_json(JSONObj *obj); | |
218 | }; | |
219 | ||
220 | class RGWAsyncRadosProcessor; | |
221 | class RGWDataSyncControlCR; | |
222 | ||
223 | struct rgw_bucket_entry_owner { | |
224 | string id; | |
225 | string display_name; | |
226 | ||
227 | rgw_bucket_entry_owner() {} | |
228 | rgw_bucket_entry_owner(const string& _id, const string& _display_name) : id(_id), display_name(_display_name) {} | |
229 | ||
230 | void decode_json(JSONObj *obj); | |
231 | }; | |
232 | ||
233 | class RGWSyncErrorLogger; | |
11fdf7f2 | 234 | class RGWRESTConn; |
7c673cae FG |
235 | |
236 | struct RGWDataSyncEnv { | |
11fdf7f2 TL |
237 | const DoutPrefixProvider *dpp{nullptr}; |
238 | CephContext *cct{nullptr}; | |
239 | RGWRados *store{nullptr}; | |
240 | RGWRESTConn *conn{nullptr}; | |
241 | RGWAsyncRadosProcessor *async_rados{nullptr}; | |
242 | RGWHTTPManager *http_manager{nullptr}; | |
243 | RGWSyncErrorLogger *error_logger{nullptr}; | |
244 | RGWSyncTraceManager *sync_tracer{nullptr}; | |
7c673cae | 245 | string source_zone; |
11fdf7f2 | 246 | RGWSyncModuleInstanceRef sync_module{nullptr}; |
81eedcae | 247 | PerfCounters* counters{nullptr}; |
7c673cae | 248 | |
11fdf7f2 | 249 | RGWDataSyncEnv() {} |
7c673cae | 250 | |
11fdf7f2 | 251 | void init(const DoutPrefixProvider *_dpp, CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn, |
7c673cae | 252 | RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager, |
11fdf7f2 | 253 | RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer, |
81eedcae TL |
254 | const string& _source_zone, RGWSyncModuleInstanceRef& _sync_module, |
255 | PerfCounters* _counters) { | |
11fdf7f2 | 256 | dpp = _dpp; |
7c673cae FG |
257 | cct = _cct; |
258 | store = _store; | |
259 | conn = _conn; | |
260 | async_rados = _async_rados; | |
261 | http_manager = _http_manager; | |
262 | error_logger = _error_logger; | |
11fdf7f2 | 263 | sync_tracer = _sync_tracer; |
7c673cae FG |
264 | source_zone = _source_zone; |
265 | sync_module = _sync_module; | |
81eedcae | 266 | counters = _counters; |
7c673cae FG |
267 | } |
268 | ||
269 | string shard_obj_name(int shard_id); | |
270 | string status_oid(); | |
271 | }; | |
272 | ||
273 | class RGWRemoteDataLog : public RGWCoroutinesManager { | |
11fdf7f2 | 274 | const DoutPrefixProvider *dpp; |
7c673cae FG |
275 | RGWRados *store; |
276 | RGWAsyncRadosProcessor *async_rados; | |
277 | RGWHTTPManager http_manager; | |
278 | ||
279 | RGWDataSyncEnv sync_env; | |
280 | ||
281 | RWLock lock; | |
282 | RGWDataSyncControlCR *data_sync_cr; | |
283 | ||
11fdf7f2 TL |
284 | RGWSyncTraceNodeRef tn; |
285 | ||
7c673cae FG |
286 | bool initialized; |
287 | ||
288 | public: | |
11fdf7f2 TL |
289 | RGWRemoteDataLog(const DoutPrefixProvider *dpp, RGWRados *_store, |
290 | RGWAsyncRadosProcessor *async_rados) | |
7c673cae | 291 | : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), |
11fdf7f2 | 292 | dpp(dpp), store(_store), async_rados(async_rados), |
7c673cae FG |
293 | http_manager(store->ctx(), completion_mgr), |
294 | lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL), | |
295 | initialized(false) {} | |
11fdf7f2 | 296 | int init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, |
81eedcae TL |
297 | RGWSyncTraceManager *_sync_tracer, RGWSyncModuleInstanceRef& module, |
298 | PerfCounters* _counters); | |
7c673cae FG |
299 | void finish(); |
300 | ||
301 | int read_log_info(rgw_datalog_info *log_info); | |
302 | int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info); | |
303 | int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result); | |
7c673cae | 304 | int read_sync_status(rgw_data_sync_status *sync_status); |
28e407b8 AA |
305 | int read_recovering_shards(const int num_shards, set<int>& recovering_shards); |
306 | int read_shard_status(int shard_id, set<string>& lagging_buckets,set<string>& recovering_buckets, rgw_data_sync_marker* sync_marker, const int max_entries); | |
7c673cae FG |
307 | int init_sync_status(int num_shards); |
308 | int run_sync(int num_shards); | |
309 | ||
310 | void wakeup(int shard_id, set<string>& keys); | |
311 | }; | |
312 | ||
11fdf7f2 | 313 | class RGWDataSyncStatusManager : public DoutPrefixProvider { |
7c673cae FG |
314 | RGWRados *store; |
315 | rgw_rados_ref ref; | |
316 | ||
317 | string source_zone; | |
318 | RGWRESTConn *conn; | |
319 | RGWSyncErrorLogger *error_logger; | |
320 | RGWSyncModuleInstanceRef sync_module; | |
81eedcae | 321 | PerfCounters* counters; |
7c673cae FG |
322 | |
323 | RGWRemoteDataLog source_log; | |
324 | ||
325 | string source_status_oid; | |
326 | string source_shard_status_oid_prefix; | |
327 | ||
328 | map<int, rgw_raw_obj> shard_objs; | |
329 | ||
330 | int num_shards; | |
331 | ||
332 | public: | |
333 | RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados, | |
81eedcae | 334 | const string& _source_zone, PerfCounters* counters) |
7c673cae | 335 | : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL), |
81eedcae | 336 | sync_module(nullptr), counters(counters), |
11fdf7f2 | 337 | source_log(this, store, async_rados), num_shards(0) {} |
94b18763 | 338 | RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados, |
81eedcae TL |
339 | const string& _source_zone, PerfCounters* counters, |
340 | const RGWSyncModuleInstanceRef& _sync_module) | |
94b18763 | 341 | : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL), |
81eedcae | 342 | sync_module(_sync_module), counters(counters), |
11fdf7f2 | 343 | source_log(this, store, async_rados), num_shards(0) {} |
7c673cae FG |
344 | ~RGWDataSyncStatusManager() { |
345 | finalize(); | |
346 | } | |
347 | int init(); | |
348 | void finalize(); | |
349 | ||
350 | static string shard_obj_name(const string& source_zone, int shard_id); | |
351 | static string sync_status_oid(const string& source_zone); | |
352 | ||
353 | int read_sync_status(rgw_data_sync_status *sync_status) { | |
354 | return source_log.read_sync_status(sync_status); | |
355 | } | |
28e407b8 AA |
356 | |
357 | int read_recovering_shards(const int num_shards, set<int>& recovering_shards) { | |
358 | return source_log.read_recovering_shards(num_shards, recovering_shards); | |
359 | } | |
360 | ||
361 | int read_shard_status(int shard_id, set<string>& lagging_buckets, set<string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries) { | |
362 | return source_log.read_shard_status(shard_id, lagging_buckets, recovering_buckets,sync_marker, max_entries); | |
363 | } | |
7c673cae FG |
364 | int init_sync_status() { return source_log.init_sync_status(num_shards); } |
365 | ||
366 | int read_log_info(rgw_datalog_info *log_info) { | |
367 | return source_log.read_log_info(log_info); | |
368 | } | |
369 | int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info) { | |
370 | return source_log.read_source_log_shards_info(shards_info); | |
371 | } | |
372 | int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result) { | |
373 | return source_log.read_source_log_shards_next(shard_markers, result); | |
374 | } | |
375 | ||
376 | int run() { return source_log.run_sync(num_shards); } | |
377 | ||
378 | void wakeup(int shard_id, set<string>& keys) { return source_log.wakeup(shard_id, keys); } | |
379 | void stop() { | |
380 | source_log.finish(); | |
381 | } | |
11fdf7f2 TL |
382 | |
383 | // implements DoutPrefixProvider | |
384 | CephContext *get_cct() const override { return store->ctx(); } | |
385 | unsigned get_subsys() const override; | |
386 | std::ostream& gen_prefix(std::ostream& out) const override; | |
7c673cae FG |
387 | }; |
388 | ||
389 | class RGWBucketSyncStatusManager; | |
390 | class RGWBucketSyncCR; | |
391 | ||
392 | struct rgw_bucket_shard_full_sync_marker { | |
393 | rgw_obj_key position; | |
394 | uint64_t count; | |
395 | ||
396 | rgw_bucket_shard_full_sync_marker() : count(0) {} | |
397 | ||
398 | void encode_attr(map<string, bufferlist>& attrs); | |
399 | ||
400 | void encode(bufferlist& bl) const { | |
401 | ENCODE_START(1, 1, bl); | |
11fdf7f2 TL |
402 | encode(position, bl); |
403 | encode(count, bl); | |
7c673cae FG |
404 | ENCODE_FINISH(bl); |
405 | } | |
406 | ||
11fdf7f2 | 407 | void decode(bufferlist::const_iterator& bl) { |
7c673cae | 408 | DECODE_START(1, bl); |
11fdf7f2 TL |
409 | decode(position, bl); |
410 | decode(count, bl); | |
7c673cae FG |
411 | DECODE_FINISH(bl); |
412 | } | |
413 | ||
b32b8144 FG |
414 | void dump(Formatter *f) const; |
415 | void decode_json(JSONObj *obj); | |
7c673cae FG |
416 | }; |
417 | WRITE_CLASS_ENCODER(rgw_bucket_shard_full_sync_marker) | |
418 | ||
419 | struct rgw_bucket_shard_inc_sync_marker { | |
420 | string position; | |
421 | ||
422 | rgw_bucket_shard_inc_sync_marker() {} | |
423 | ||
424 | void encode_attr(map<string, bufferlist>& attrs); | |
425 | ||
426 | void encode(bufferlist& bl) const { | |
427 | ENCODE_START(1, 1, bl); | |
11fdf7f2 | 428 | encode(position, bl); |
7c673cae FG |
429 | ENCODE_FINISH(bl); |
430 | } | |
431 | ||
11fdf7f2 | 432 | void decode(bufferlist::const_iterator& bl) { |
7c673cae | 433 | DECODE_START(1, bl); |
11fdf7f2 | 434 | decode(position, bl); |
7c673cae FG |
435 | DECODE_FINISH(bl); |
436 | } | |
437 | ||
b32b8144 FG |
438 | void dump(Formatter *f) const; |
439 | void decode_json(JSONObj *obj); | |
7c673cae FG |
440 | |
441 | bool operator<(const rgw_bucket_shard_inc_sync_marker& m) const { | |
442 | return (position < m.position); | |
443 | } | |
444 | }; | |
445 | WRITE_CLASS_ENCODER(rgw_bucket_shard_inc_sync_marker) | |
446 | ||
447 | struct rgw_bucket_shard_sync_info { | |
448 | enum SyncState { | |
449 | StateInit = 0, | |
450 | StateFullSync = 1, | |
451 | StateIncrementalSync = 2, | |
452 | }; | |
453 | ||
454 | uint16_t state; | |
455 | rgw_bucket_shard_full_sync_marker full_marker; | |
456 | rgw_bucket_shard_inc_sync_marker inc_marker; | |
457 | ||
458 | void decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs); | |
459 | void encode_all_attrs(map<string, bufferlist>& attrs); | |
460 | void encode_state_attr(map<string, bufferlist>& attrs); | |
461 | ||
462 | void encode(bufferlist& bl) const { | |
463 | ENCODE_START(1, 1, bl); | |
11fdf7f2 TL |
464 | encode(state, bl); |
465 | encode(full_marker, bl); | |
466 | encode(inc_marker, bl); | |
7c673cae FG |
467 | ENCODE_FINISH(bl); |
468 | } | |
469 | ||
11fdf7f2 | 470 | void decode(bufferlist::const_iterator& bl) { |
7c673cae | 471 | DECODE_START(1, bl); |
11fdf7f2 TL |
472 | decode(state, bl); |
473 | decode(full_marker, bl); | |
474 | decode(inc_marker, bl); | |
7c673cae FG |
475 | DECODE_FINISH(bl); |
476 | } | |
477 | ||
b32b8144 FG |
478 | void dump(Formatter *f) const; |
479 | void decode_json(JSONObj *obj); | |
7c673cae FG |
480 | |
481 | rgw_bucket_shard_sync_info() : state((int)StateInit) {} | |
482 | ||
483 | }; | |
484 | WRITE_CLASS_ENCODER(rgw_bucket_shard_sync_info) | |
485 | ||
28e407b8 AA |
486 | struct rgw_bucket_index_marker_info { |
487 | string bucket_ver; | |
488 | string master_ver; | |
489 | string max_marker; | |
490 | bool syncstopped{false}; | |
491 | ||
492 | void decode_json(JSONObj *obj) { | |
493 | JSONDecoder::decode_json("bucket_ver", bucket_ver, obj); | |
494 | JSONDecoder::decode_json("master_ver", master_ver, obj); | |
495 | JSONDecoder::decode_json("max_marker", max_marker, obj); | |
496 | JSONDecoder::decode_json("syncstopped", syncstopped, obj); | |
497 | } | |
498 | }; | |
499 | ||
7c673cae FG |
500 | |
501 | class RGWRemoteBucketLog : public RGWCoroutinesManager { | |
11fdf7f2 | 502 | const DoutPrefixProvider *dpp; |
7c673cae FG |
503 | RGWRados *store; |
504 | RGWRESTConn *conn{nullptr}; | |
505 | string source_zone; | |
506 | rgw_bucket_shard bs; | |
507 | ||
508 | RGWBucketSyncStatusManager *status_manager; | |
509 | RGWAsyncRadosProcessor *async_rados; | |
510 | RGWHTTPManager *http_manager; | |
511 | ||
512 | RGWDataSyncEnv sync_env; | |
513 | rgw_bucket_shard_sync_info init_status; | |
514 | ||
515 | RGWBucketSyncCR *sync_cr{nullptr}; | |
516 | ||
517 | public: | |
11fdf7f2 TL |
518 | RGWRemoteBucketLog(const DoutPrefixProvider *_dpp, RGWRados *_store, |
519 | RGWBucketSyncStatusManager *_sm, | |
520 | RGWAsyncRadosProcessor *_async_rados, | |
521 | RGWHTTPManager *_http_manager) | |
522 | : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), | |
523 | dpp(_dpp), store(_store), status_manager(_sm), | |
524 | async_rados(_async_rados), http_manager(_http_manager) | |
525 | {} | |
7c673cae FG |
526 | |
527 | int init(const string& _source_zone, RGWRESTConn *_conn, | |
528 | const rgw_bucket& bucket, int shard_id, | |
529 | RGWSyncErrorLogger *_error_logger, | |
11fdf7f2 | 530 | RGWSyncTraceManager *_sync_tracer, |
7c673cae FG |
531 | RGWSyncModuleInstanceRef& _sync_module); |
532 | void finish(); | |
533 | ||
534 | RGWCoroutine *read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status); | |
535 | RGWCoroutine *init_sync_status_cr(); | |
536 | RGWCoroutine *run_sync_cr(); | |
537 | ||
538 | void wakeup(); | |
539 | }; | |
540 | ||
11fdf7f2 | 541 | class RGWBucketSyncStatusManager : public DoutPrefixProvider { |
7c673cae FG |
542 | RGWRados *store; |
543 | ||
544 | RGWCoroutinesManager cr_mgr; | |
545 | ||
546 | RGWHTTPManager http_manager; | |
547 | ||
548 | string source_zone; | |
549 | RGWRESTConn *conn; | |
550 | RGWSyncErrorLogger *error_logger; | |
551 | RGWSyncModuleInstanceRef sync_module; | |
552 | ||
553 | rgw_bucket bucket; | |
554 | ||
555 | map<int, RGWRemoteBucketLog *> source_logs; | |
556 | ||
557 | string source_status_oid; | |
558 | string source_shard_status_oid_prefix; | |
559 | ||
560 | map<int, rgw_bucket_shard_sync_info> sync_status; | |
561 | rgw_raw_obj status_obj; | |
562 | ||
563 | int num_shards; | |
564 | ||
565 | public: | |
566 | RGWBucketSyncStatusManager(RGWRados *_store, const string& _source_zone, | |
567 | const rgw_bucket& bucket) : store(_store), | |
568 | cr_mgr(_store->ctx(), _store->get_cr_registry()), | |
569 | http_manager(store->ctx(), cr_mgr.get_completion_mgr()), | |
570 | source_zone(_source_zone), | |
571 | conn(NULL), error_logger(NULL), | |
572 | bucket(bucket), | |
573 | num_shards(0) {} | |
574 | ~RGWBucketSyncStatusManager(); | |
575 | ||
576 | int init(); | |
577 | ||
578 | map<int, rgw_bucket_shard_sync_info>& get_sync_status() { return sync_status; } | |
579 | int init_sync_status(); | |
580 | ||
581 | static string status_oid(const string& source_zone, const rgw_bucket_shard& bs); | |
11fdf7f2 TL |
582 | static string obj_status_oid(const string& source_zone, const rgw_obj& obj); /* can be used by sync modules */ |
583 | ||
584 | // implements DoutPrefixProvider | |
585 | CephContext *get_cct() const override { return store->ctx(); } | |
586 | unsigned get_subsys() const override; | |
587 | std::ostream& gen_prefix(std::ostream& out) const override; | |
7c673cae FG |
588 | |
589 | int read_sync_status(); | |
590 | int run(); | |
591 | }; | |
592 | ||
b32b8144 | 593 | /// read the sync status of all bucket shards from the given source zone |
11fdf7f2 | 594 | int rgw_bucket_sync_status(const DoutPrefixProvider *dpp, RGWRados *store, const std::string& source_zone, |
28e407b8 | 595 | const RGWBucketInfo& bucket_info, |
b32b8144 FG |
596 | std::vector<rgw_bucket_shard_sync_info> *status); |
597 | ||
7c673cae FG |
598 | class RGWDefaultSyncModule : public RGWSyncModule { |
599 | public: | |
600 | RGWDefaultSyncModule() {} | |
11fdf7f2 | 601 | bool supports_writes() override { return true; } |
7c673cae | 602 | bool supports_data_export() override { return true; } |
11fdf7f2 TL |
603 | int create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override; |
604 | }; | |
605 | ||
606 | class RGWArchiveSyncModule : public RGWDefaultSyncModule { | |
607 | public: | |
608 | RGWArchiveSyncModule() {} | |
609 | bool supports_writes() override { return true; } | |
610 | bool supports_data_export() override { return false; } | |
611 | int create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override; | |
7c673cae FG |
612 | }; |
613 | ||
614 | // DataLogTrimCR factory function | |
615 | extern RGWCoroutine* create_data_log_trim_cr(RGWRados *store, | |
616 | RGWHTTPManager *http, | |
617 | int num_shards, utime_t interval); | |
618 | ||
a8e16298 TL |
619 | // factory function for datalog trim via radosgw-admin |
620 | RGWCoroutine* create_admin_data_log_trim_cr(RGWRados *store, | |
621 | RGWHTTPManager *http, | |
622 | int num_shards, | |
623 | std::vector<std::string>& markers); | |
624 | ||
7c673cae | 625 | #endif |