]>
Commit | Line | Data |
---|---|---|
11fdf7f2 | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
9f95a23c | 2 | // vim: ts=8 sw=2 smarttab ft=cpp |
11fdf7f2 | 3 | |
7c673cae FG |
4 | #ifndef CEPH_RGW_DATA_SYNC_H |
5 | #define CEPH_RGW_DATA_SYNC_H | |
6 | ||
11fdf7f2 TL |
7 | #include "include/encoding.h" |
8 | ||
9 | #include "common/RWLock.h" | |
10 | #include "common/ceph_json.h" | |
11 | ||
7c673cae FG |
12 | #include "rgw_coroutine.h" |
13 | #include "rgw_http_client.h" | |
9f95a23c | 14 | #include "rgw_sal.h" |
7c673cae FG |
15 | |
16 | #include "rgw_sync_module.h" | |
11fdf7f2 | 17 | #include "rgw_sync_trace.h" |
9f95a23c TL |
18 | #include "rgw_sync_policy.h" |
19 | ||
20 | #include "rgw_bucket_sync.h" | |
21 | ||
22 | class JSONObj; | |
23 | struct rgw_sync_bucket_pipe; | |
24 | ||
25 | struct rgw_bucket_sync_pair_info { | |
26 | RGWBucketSyncFlowManager::pipe_handler handler; /* responsible for sync filters */ | |
27 | rgw_bucket_shard source_bs; | |
28 | rgw_bucket_shard dest_bs; | |
29 | }; | |
30 | ||
31 | inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pair_info& p) { | |
32 | if (p.source_bs.bucket == p.dest_bs.bucket) { | |
33 | return out << p.source_bs; | |
34 | } | |
35 | ||
36 | out << p.source_bs; | |
37 | ||
38 | out << "->" << p.dest_bs.bucket; | |
39 | ||
40 | return out; | |
41 | } | |
42 | ||
43 | struct rgw_bucket_sync_pipe { | |
44 | rgw_bucket_sync_pair_info info; | |
45 | RGWBucketInfo source_bucket_info; | |
46 | map<string, bufferlist> source_bucket_attrs; | |
47 | RGWBucketInfo dest_bucket_info; | |
48 | map<string, bufferlist> dest_bucket_attrs; | |
49 | ||
50 | RGWBucketSyncFlowManager::pipe_rules_ref& get_rules() { | |
51 | return info.handler.rules; | |
52 | } | |
53 | }; | |
54 | ||
55 | inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pipe& p) { | |
56 | return out << p.info; | |
57 | } | |
7c673cae | 58 | |
7c673cae FG |
59 | struct rgw_datalog_info { |
60 | uint32_t num_shards; | |
61 | ||
62 | rgw_datalog_info() : num_shards(0) {} | |
63 | ||
64 | void decode_json(JSONObj *obj); | |
65 | }; | |
66 | ||
67 | struct rgw_data_sync_info { | |
68 | enum SyncState { | |
69 | StateInit = 0, | |
70 | StateBuildingFullSyncMaps = 1, | |
71 | StateSync = 2, | |
72 | }; | |
73 | ||
74 | uint16_t state; | |
75 | uint32_t num_shards; | |
76 | ||
31f18b77 FG |
77 | uint64_t instance_id{0}; |
78 | ||
7c673cae | 79 | void encode(bufferlist& bl) const { |
31f18b77 | 80 | ENCODE_START(2, 1, bl); |
11fdf7f2 TL |
81 | encode(state, bl); |
82 | encode(num_shards, bl); | |
83 | encode(instance_id, bl); | |
7c673cae FG |
84 | ENCODE_FINISH(bl); |
85 | } | |
86 | ||
11fdf7f2 | 87 | void decode(bufferlist::const_iterator& bl) { |
31f18b77 | 88 | DECODE_START(2, bl); |
11fdf7f2 TL |
89 | decode(state, bl); |
90 | decode(num_shards, bl); | |
31f18b77 | 91 | if (struct_v >= 2) { |
11fdf7f2 | 92 | decode(instance_id, bl); |
31f18b77 | 93 | } |
7c673cae FG |
94 | DECODE_FINISH(bl); |
95 | } | |
96 | ||
97 | void dump(Formatter *f) const { | |
98 | string s; | |
99 | switch ((SyncState)state) { | |
100 | case StateInit: | |
101 | s = "init"; | |
102 | break; | |
103 | case StateBuildingFullSyncMaps: | |
104 | s = "building-full-sync-maps"; | |
105 | break; | |
106 | case StateSync: | |
107 | s = "sync"; | |
108 | break; | |
109 | default: | |
110 | s = "unknown"; | |
111 | break; | |
112 | } | |
113 | encode_json("status", s, f); | |
114 | encode_json("num_shards", num_shards, f); | |
31f18b77 | 115 | encode_json("instance_id", instance_id, f); |
7c673cae FG |
116 | } |
117 | void decode_json(JSONObj *obj) { | |
118 | std::string s; | |
119 | JSONDecoder::decode_json("status", s, obj); | |
120 | if (s == "building-full-sync-maps") { | |
121 | state = StateBuildingFullSyncMaps; | |
122 | } else if (s == "sync") { | |
123 | state = StateSync; | |
124 | } else { | |
125 | state = StateInit; | |
126 | } | |
127 | JSONDecoder::decode_json("num_shards", num_shards, obj); | |
a8e16298 | 128 | JSONDecoder::decode_json("instance_id", instance_id, obj); |
7c673cae | 129 | } |
b32b8144 | 130 | static void generate_test_instances(std::list<rgw_data_sync_info*>& o); |
7c673cae FG |
131 | |
132 | rgw_data_sync_info() : state((int)StateInit), num_shards(0) {} | |
133 | }; | |
134 | WRITE_CLASS_ENCODER(rgw_data_sync_info) | |
135 | ||
136 | struct rgw_data_sync_marker { | |
137 | enum SyncState { | |
138 | FullSync = 0, | |
139 | IncrementalSync = 1, | |
140 | }; | |
141 | uint16_t state; | |
142 | string marker; | |
143 | string next_step_marker; | |
144 | uint64_t total_entries; | |
145 | uint64_t pos; | |
146 | real_time timestamp; | |
147 | ||
148 | rgw_data_sync_marker() : state(FullSync), total_entries(0), pos(0) {} | |
149 | ||
150 | void encode(bufferlist& bl) const { | |
151 | ENCODE_START(1, 1, bl); | |
11fdf7f2 TL |
152 | encode(state, bl); |
153 | encode(marker, bl); | |
154 | encode(next_step_marker, bl); | |
155 | encode(total_entries, bl); | |
156 | encode(pos, bl); | |
157 | encode(timestamp, bl); | |
7c673cae FG |
158 | ENCODE_FINISH(bl); |
159 | } | |
160 | ||
11fdf7f2 | 161 | void decode(bufferlist::const_iterator& bl) { |
7c673cae | 162 | DECODE_START(1, bl); |
11fdf7f2 TL |
163 | decode(state, bl); |
164 | decode(marker, bl); | |
165 | decode(next_step_marker, bl); | |
166 | decode(total_entries, bl); | |
167 | decode(pos, bl); | |
168 | decode(timestamp, bl); | |
7c673cae FG |
169 | DECODE_FINISH(bl); |
170 | } | |
171 | ||
172 | void dump(Formatter *f) const { | |
28e407b8 AA |
173 | const char *s{nullptr}; |
174 | switch ((SyncState)state) { | |
175 | case FullSync: | |
176 | s = "full-sync"; | |
177 | break; | |
178 | case IncrementalSync: | |
179 | s = "incremental-sync"; | |
180 | break; | |
181 | default: | |
182 | s = "unknown"; | |
183 | break; | |
184 | } | |
185 | encode_json("status", s, f); | |
7c673cae FG |
186 | encode_json("marker", marker, f); |
187 | encode_json("next_step_marker", next_step_marker, f); | |
188 | encode_json("total_entries", total_entries, f); | |
189 | encode_json("pos", pos, f); | |
190 | encode_json("timestamp", utime_t(timestamp), f); | |
191 | } | |
192 | void decode_json(JSONObj *obj) { | |
28e407b8 AA |
193 | std::string s; |
194 | JSONDecoder::decode_json("status", s, obj); | |
195 | if (s == "full-sync") { | |
196 | state = FullSync; | |
197 | } else if (s == "incremental-sync") { | |
198 | state = IncrementalSync; | |
199 | } | |
7c673cae FG |
200 | JSONDecoder::decode_json("marker", marker, obj); |
201 | JSONDecoder::decode_json("next_step_marker", next_step_marker, obj); | |
202 | JSONDecoder::decode_json("total_entries", total_entries, obj); | |
203 | JSONDecoder::decode_json("pos", pos, obj); | |
204 | utime_t t; | |
205 | JSONDecoder::decode_json("timestamp", t, obj); | |
206 | timestamp = t.to_real_time(); | |
207 | } | |
b32b8144 | 208 | static void generate_test_instances(std::list<rgw_data_sync_marker*>& o); |
7c673cae FG |
209 | }; |
210 | WRITE_CLASS_ENCODER(rgw_data_sync_marker) | |
211 | ||
212 | struct rgw_data_sync_status { | |
213 | rgw_data_sync_info sync_info; | |
214 | map<uint32_t, rgw_data_sync_marker> sync_markers; | |
215 | ||
216 | rgw_data_sync_status() {} | |
217 | ||
218 | void encode(bufferlist& bl) const { | |
219 | ENCODE_START(1, 1, bl); | |
11fdf7f2 | 220 | encode(sync_info, bl); |
7c673cae FG |
221 | /* sync markers are encoded separately */ |
222 | ENCODE_FINISH(bl); | |
223 | } | |
224 | ||
11fdf7f2 | 225 | void decode(bufferlist::const_iterator& bl) { |
7c673cae | 226 | DECODE_START(1, bl); |
11fdf7f2 | 227 | decode(sync_info, bl); |
7c673cae FG |
228 | /* sync markers are decoded separately */ |
229 | DECODE_FINISH(bl); | |
230 | } | |
231 | ||
232 | void dump(Formatter *f) const { | |
233 | encode_json("info", sync_info, f); | |
234 | encode_json("markers", sync_markers, f); | |
235 | } | |
236 | void decode_json(JSONObj *obj) { | |
237 | JSONDecoder::decode_json("info", sync_info, obj); | |
238 | JSONDecoder::decode_json("markers", sync_markers, obj); | |
239 | } | |
b32b8144 | 240 | static void generate_test_instances(std::list<rgw_data_sync_status*>& o); |
7c673cae FG |
241 | }; |
242 | WRITE_CLASS_ENCODER(rgw_data_sync_status) | |
243 | ||
244 | struct rgw_datalog_entry { | |
245 | string key; | |
246 | ceph::real_time timestamp; | |
247 | ||
248 | void decode_json(JSONObj *obj); | |
249 | }; | |
250 | ||
251 | struct rgw_datalog_shard_data { | |
252 | string marker; | |
253 | bool truncated; | |
254 | vector<rgw_datalog_entry> entries; | |
255 | ||
256 | void decode_json(JSONObj *obj); | |
257 | }; | |
258 | ||
259 | class RGWAsyncRadosProcessor; | |
260 | class RGWDataSyncControlCR; | |
261 | ||
262 | struct rgw_bucket_entry_owner { | |
263 | string id; | |
264 | string display_name; | |
265 | ||
266 | rgw_bucket_entry_owner() {} | |
267 | rgw_bucket_entry_owner(const string& _id, const string& _display_name) : id(_id), display_name(_display_name) {} | |
268 | ||
269 | void decode_json(JSONObj *obj); | |
270 | }; | |
271 | ||
272 | class RGWSyncErrorLogger; | |
11fdf7f2 | 273 | class RGWRESTConn; |
9f95a23c | 274 | class RGWServices; |
7c673cae FG |
275 | |
276 | struct RGWDataSyncEnv { | |
11fdf7f2 TL |
277 | const DoutPrefixProvider *dpp{nullptr}; |
278 | CephContext *cct{nullptr}; | |
9f95a23c TL |
279 | rgw::sal::RGWRadosStore *store{nullptr}; |
280 | RGWServices *svc{nullptr}; | |
11fdf7f2 TL |
281 | RGWAsyncRadosProcessor *async_rados{nullptr}; |
282 | RGWHTTPManager *http_manager{nullptr}; | |
283 | RGWSyncErrorLogger *error_logger{nullptr}; | |
284 | RGWSyncTraceManager *sync_tracer{nullptr}; | |
11fdf7f2 | 285 | RGWSyncModuleInstanceRef sync_module{nullptr}; |
81eedcae | 286 | PerfCounters* counters{nullptr}; |
7c673cae | 287 | |
11fdf7f2 | 288 | RGWDataSyncEnv() {} |
7c673cae | 289 | |
9f95a23c | 290 | void init(const DoutPrefixProvider *_dpp, CephContext *_cct, rgw::sal::RGWRadosStore *_store, RGWServices *_svc, |
7c673cae | 291 | RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager, |
11fdf7f2 | 292 | RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer, |
9f95a23c | 293 | RGWSyncModuleInstanceRef& _sync_module, |
81eedcae | 294 | PerfCounters* _counters) { |
9f95a23c | 295 | dpp = _dpp; |
7c673cae FG |
296 | cct = _cct; |
297 | store = _store; | |
9f95a23c | 298 | svc = _svc; |
7c673cae FG |
299 | async_rados = _async_rados; |
300 | http_manager = _http_manager; | |
301 | error_logger = _error_logger; | |
11fdf7f2 | 302 | sync_tracer = _sync_tracer; |
7c673cae | 303 | sync_module = _sync_module; |
81eedcae | 304 | counters = _counters; |
7c673cae FG |
305 | } |
306 | ||
307 | string shard_obj_name(int shard_id); | |
308 | string status_oid(); | |
309 | }; | |
310 | ||
9f95a23c TL |
311 | struct RGWDataSyncCtx { |
312 | CephContext *cct{nullptr}; | |
313 | RGWDataSyncEnv *env{nullptr}; | |
314 | ||
315 | RGWRESTConn *conn{nullptr}; | |
316 | rgw_zone_id source_zone; | |
317 | ||
318 | void init(RGWDataSyncEnv *_env, | |
319 | RGWRESTConn *_conn, | |
320 | const rgw_zone_id& _source_zone) { | |
321 | cct = _env->cct; | |
322 | env = _env; | |
323 | conn = _conn; | |
324 | source_zone = _source_zone; | |
325 | } | |
326 | }; | |
327 | ||
328 | class RGWRados; | |
329 | class RGWDataChangesLogInfo; | |
330 | ||
7c673cae | 331 | class RGWRemoteDataLog : public RGWCoroutinesManager { |
11fdf7f2 | 332 | const DoutPrefixProvider *dpp; |
9f95a23c TL |
333 | rgw::sal::RGWRadosStore *store; |
334 | CephContext *cct; | |
335 | RGWCoroutinesManagerRegistry *cr_registry; | |
7c673cae FG |
336 | RGWAsyncRadosProcessor *async_rados; |
337 | RGWHTTPManager http_manager; | |
338 | ||
339 | RGWDataSyncEnv sync_env; | |
9f95a23c | 340 | RGWDataSyncCtx sc; |
7c673cae | 341 | |
9f95a23c | 342 | ceph::shared_mutex lock = ceph::make_shared_mutex("RGWRemoteDataLog::lock"); |
7c673cae FG |
343 | RGWDataSyncControlCR *data_sync_cr; |
344 | ||
11fdf7f2 TL |
345 | RGWSyncTraceNodeRef tn; |
346 | ||
7c673cae FG |
347 | bool initialized; |
348 | ||
349 | public: | |
9f95a23c TL |
350 | RGWRemoteDataLog(const DoutPrefixProvider *dpp, |
351 | rgw::sal::RGWRadosStore *_store, | |
352 | RGWAsyncRadosProcessor *async_rados); | |
353 | int init(const rgw_zone_id& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, | |
81eedcae TL |
354 | RGWSyncTraceManager *_sync_tracer, RGWSyncModuleInstanceRef& module, |
355 | PerfCounters* _counters); | |
7c673cae FG |
356 | void finish(); |
357 | ||
358 | int read_log_info(rgw_datalog_info *log_info); | |
359 | int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info); | |
360 | int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result); | |
7c673cae | 361 | int read_sync_status(rgw_data_sync_status *sync_status); |
28e407b8 AA |
362 | int read_recovering_shards(const int num_shards, set<int>& recovering_shards); |
363 | int read_shard_status(int shard_id, set<string>& lagging_buckets,set<string>& recovering_buckets, rgw_data_sync_marker* sync_marker, const int max_entries); | |
7c673cae FG |
364 | int init_sync_status(int num_shards); |
365 | int run_sync(int num_shards); | |
366 | ||
367 | void wakeup(int shard_id, set<string>& keys); | |
368 | }; | |
369 | ||
11fdf7f2 | 370 | class RGWDataSyncStatusManager : public DoutPrefixProvider { |
9f95a23c | 371 | rgw::sal::RGWRadosStore *store; |
7c673cae | 372 | |
9f95a23c | 373 | rgw_zone_id source_zone; |
7c673cae FG |
374 | RGWRESTConn *conn; |
375 | RGWSyncErrorLogger *error_logger; | |
376 | RGWSyncModuleInstanceRef sync_module; | |
81eedcae | 377 | PerfCounters* counters; |
7c673cae FG |
378 | |
379 | RGWRemoteDataLog source_log; | |
380 | ||
381 | string source_status_oid; | |
382 | string source_shard_status_oid_prefix; | |
383 | ||
384 | map<int, rgw_raw_obj> shard_objs; | |
385 | ||
386 | int num_shards; | |
387 | ||
388 | public: | |
9f95a23c TL |
389 | RGWDataSyncStatusManager(rgw::sal::RGWRadosStore *_store, RGWAsyncRadosProcessor *async_rados, |
390 | const rgw_zone_id& _source_zone, PerfCounters* counters) | |
7c673cae | 391 | : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL), |
81eedcae | 392 | sync_module(nullptr), counters(counters), |
11fdf7f2 | 393 | source_log(this, store, async_rados), num_shards(0) {} |
9f95a23c TL |
394 | RGWDataSyncStatusManager(rgw::sal::RGWRadosStore *_store, RGWAsyncRadosProcessor *async_rados, |
395 | const rgw_zone_id& _source_zone, PerfCounters* counters, | |
81eedcae | 396 | const RGWSyncModuleInstanceRef& _sync_module) |
94b18763 | 397 | : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL), |
81eedcae | 398 | sync_module(_sync_module), counters(counters), |
11fdf7f2 | 399 | source_log(this, store, async_rados), num_shards(0) {} |
7c673cae FG |
400 | ~RGWDataSyncStatusManager() { |
401 | finalize(); | |
402 | } | |
403 | int init(); | |
404 | void finalize(); | |
405 | ||
9f95a23c TL |
406 | static string shard_obj_name(const rgw_zone_id& source_zone, int shard_id); |
407 | static string sync_status_oid(const rgw_zone_id& source_zone); | |
7c673cae FG |
408 | |
409 | int read_sync_status(rgw_data_sync_status *sync_status) { | |
410 | return source_log.read_sync_status(sync_status); | |
411 | } | |
28e407b8 AA |
412 | |
413 | int read_recovering_shards(const int num_shards, set<int>& recovering_shards) { | |
414 | return source_log.read_recovering_shards(num_shards, recovering_shards); | |
415 | } | |
416 | ||
417 | int read_shard_status(int shard_id, set<string>& lagging_buckets, set<string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries) { | |
418 | return source_log.read_shard_status(shard_id, lagging_buckets, recovering_buckets,sync_marker, max_entries); | |
419 | } | |
7c673cae FG |
420 | int init_sync_status() { return source_log.init_sync_status(num_shards); } |
421 | ||
422 | int read_log_info(rgw_datalog_info *log_info) { | |
423 | return source_log.read_log_info(log_info); | |
424 | } | |
425 | int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info) { | |
426 | return source_log.read_source_log_shards_info(shards_info); | |
427 | } | |
428 | int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result) { | |
429 | return source_log.read_source_log_shards_next(shard_markers, result); | |
430 | } | |
431 | ||
432 | int run() { return source_log.run_sync(num_shards); } | |
433 | ||
434 | void wakeup(int shard_id, set<string>& keys) { return source_log.wakeup(shard_id, keys); } | |
435 | void stop() { | |
436 | source_log.finish(); | |
437 | } | |
11fdf7f2 TL |
438 | |
439 | // implements DoutPrefixProvider | |
9f95a23c | 440 | CephContext *get_cct() const override; |
11fdf7f2 TL |
441 | unsigned get_subsys() const override; |
442 | std::ostream& gen_prefix(std::ostream& out) const override; | |
7c673cae FG |
443 | }; |
444 | ||
9f95a23c | 445 | class RGWBucketPipeSyncStatusManager; |
7c673cae FG |
446 | class RGWBucketSyncCR; |
447 | ||
448 | struct rgw_bucket_shard_full_sync_marker { | |
449 | rgw_obj_key position; | |
450 | uint64_t count; | |
451 | ||
452 | rgw_bucket_shard_full_sync_marker() : count(0) {} | |
453 | ||
454 | void encode_attr(map<string, bufferlist>& attrs); | |
455 | ||
456 | void encode(bufferlist& bl) const { | |
457 | ENCODE_START(1, 1, bl); | |
11fdf7f2 TL |
458 | encode(position, bl); |
459 | encode(count, bl); | |
7c673cae FG |
460 | ENCODE_FINISH(bl); |
461 | } | |
462 | ||
11fdf7f2 | 463 | void decode(bufferlist::const_iterator& bl) { |
7c673cae | 464 | DECODE_START(1, bl); |
11fdf7f2 TL |
465 | decode(position, bl); |
466 | decode(count, bl); | |
7c673cae FG |
467 | DECODE_FINISH(bl); |
468 | } | |
469 | ||
b32b8144 FG |
470 | void dump(Formatter *f) const; |
471 | void decode_json(JSONObj *obj); | |
7c673cae FG |
472 | }; |
473 | WRITE_CLASS_ENCODER(rgw_bucket_shard_full_sync_marker) | |
474 | ||
475 | struct rgw_bucket_shard_inc_sync_marker { | |
476 | string position; | |
477 | ||
478 | rgw_bucket_shard_inc_sync_marker() {} | |
479 | ||
480 | void encode_attr(map<string, bufferlist>& attrs); | |
481 | ||
482 | void encode(bufferlist& bl) const { | |
483 | ENCODE_START(1, 1, bl); | |
11fdf7f2 | 484 | encode(position, bl); |
7c673cae FG |
485 | ENCODE_FINISH(bl); |
486 | } | |
487 | ||
11fdf7f2 | 488 | void decode(bufferlist::const_iterator& bl) { |
7c673cae | 489 | DECODE_START(1, bl); |
11fdf7f2 | 490 | decode(position, bl); |
7c673cae FG |
491 | DECODE_FINISH(bl); |
492 | } | |
493 | ||
b32b8144 FG |
494 | void dump(Formatter *f) const; |
495 | void decode_json(JSONObj *obj); | |
7c673cae FG |
496 | |
497 | bool operator<(const rgw_bucket_shard_inc_sync_marker& m) const { | |
498 | return (position < m.position); | |
499 | } | |
500 | }; | |
501 | WRITE_CLASS_ENCODER(rgw_bucket_shard_inc_sync_marker) | |
502 | ||
503 | struct rgw_bucket_shard_sync_info { | |
504 | enum SyncState { | |
505 | StateInit = 0, | |
506 | StateFullSync = 1, | |
507 | StateIncrementalSync = 2, | |
9f95a23c | 508 | StateStopped = 3, |
7c673cae FG |
509 | }; |
510 | ||
511 | uint16_t state; | |
512 | rgw_bucket_shard_full_sync_marker full_marker; | |
513 | rgw_bucket_shard_inc_sync_marker inc_marker; | |
514 | ||
515 | void decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs); | |
516 | void encode_all_attrs(map<string, bufferlist>& attrs); | |
517 | void encode_state_attr(map<string, bufferlist>& attrs); | |
518 | ||
519 | void encode(bufferlist& bl) const { | |
520 | ENCODE_START(1, 1, bl); | |
11fdf7f2 TL |
521 | encode(state, bl); |
522 | encode(full_marker, bl); | |
523 | encode(inc_marker, bl); | |
7c673cae FG |
524 | ENCODE_FINISH(bl); |
525 | } | |
526 | ||
11fdf7f2 | 527 | void decode(bufferlist::const_iterator& bl) { |
7c673cae | 528 | DECODE_START(1, bl); |
11fdf7f2 TL |
529 | decode(state, bl); |
530 | decode(full_marker, bl); | |
531 | decode(inc_marker, bl); | |
7c673cae FG |
532 | DECODE_FINISH(bl); |
533 | } | |
534 | ||
b32b8144 FG |
535 | void dump(Formatter *f) const; |
536 | void decode_json(JSONObj *obj); | |
7c673cae FG |
537 | |
538 | rgw_bucket_shard_sync_info() : state((int)StateInit) {} | |
539 | ||
540 | }; | |
541 | WRITE_CLASS_ENCODER(rgw_bucket_shard_sync_info) | |
542 | ||
28e407b8 AA |
543 | struct rgw_bucket_index_marker_info { |
544 | string bucket_ver; | |
545 | string master_ver; | |
546 | string max_marker; | |
547 | bool syncstopped{false}; | |
548 | ||
549 | void decode_json(JSONObj *obj) { | |
550 | JSONDecoder::decode_json("bucket_ver", bucket_ver, obj); | |
551 | JSONDecoder::decode_json("master_ver", master_ver, obj); | |
552 | JSONDecoder::decode_json("max_marker", max_marker, obj); | |
553 | JSONDecoder::decode_json("syncstopped", syncstopped, obj); | |
554 | } | |
555 | }; | |
556 | ||
7c673cae | 557 | |
9f95a23c | 558 | class RGWRemoteBucketManager { |
11fdf7f2 | 559 | const DoutPrefixProvider *dpp; |
9f95a23c TL |
560 | |
561 | RGWDataSyncEnv *sync_env; | |
562 | ||
7c673cae | 563 | RGWRESTConn *conn{nullptr}; |
9f95a23c | 564 | rgw_zone_id source_zone; |
7c673cae | 565 | |
9f95a23c | 566 | vector<rgw_bucket_sync_pair_info> sync_pairs; |
7c673cae | 567 | |
9f95a23c | 568 | RGWDataSyncCtx sc; |
7c673cae FG |
569 | rgw_bucket_shard_sync_info init_status; |
570 | ||
571 | RGWBucketSyncCR *sync_cr{nullptr}; | |
572 | ||
573 | public: | |
9f95a23c TL |
574 | RGWRemoteBucketManager(const DoutPrefixProvider *_dpp, |
575 | RGWDataSyncEnv *_sync_env, | |
576 | const rgw_zone_id& _source_zone, RGWRESTConn *_conn, | |
577 | const RGWBucketInfo& source_bucket_info, | |
578 | const rgw_bucket& dest_bucket); | |
579 | ||
580 | void init(const rgw_zone_id& _source_zone, RGWRESTConn *_conn, | |
581 | const rgw_bucket& source_bucket, int shard_id, | |
582 | const rgw_bucket& dest_bucket); | |
583 | ||
584 | RGWCoroutine *read_sync_status_cr(int num, rgw_bucket_shard_sync_info *sync_status); | |
585 | RGWCoroutine *init_sync_status_cr(int num); | |
586 | RGWCoroutine *run_sync_cr(int num); | |
587 | ||
588 | int num_pipes() { | |
589 | return sync_pairs.size(); | |
590 | } | |
7c673cae FG |
591 | |
592 | void wakeup(); | |
593 | }; | |
594 | ||
9f95a23c TL |
595 | class RGWBucketPipeSyncStatusManager : public DoutPrefixProvider { |
596 | rgw::sal::RGWRadosStore *store; | |
597 | ||
598 | RGWDataSyncEnv sync_env; | |
7c673cae FG |
599 | |
600 | RGWCoroutinesManager cr_mgr; | |
601 | ||
602 | RGWHTTPManager http_manager; | |
603 | ||
9f95a23c TL |
604 | std::optional<rgw_zone_id> source_zone; |
605 | std::optional<rgw_bucket> source_bucket; | |
606 | ||
7c673cae FG |
607 | RGWRESTConn *conn; |
608 | RGWSyncErrorLogger *error_logger; | |
609 | RGWSyncModuleInstanceRef sync_module; | |
610 | ||
9f95a23c | 611 | rgw_bucket dest_bucket; |
7c673cae | 612 | |
9f95a23c | 613 | vector<RGWRemoteBucketManager *> source_mgrs; |
7c673cae FG |
614 | |
615 | string source_status_oid; | |
616 | string source_shard_status_oid_prefix; | |
617 | ||
618 | map<int, rgw_bucket_shard_sync_info> sync_status; | |
619 | rgw_raw_obj status_obj; | |
620 | ||
621 | int num_shards; | |
622 | ||
623 | public: | |
9f95a23c TL |
624 | RGWBucketPipeSyncStatusManager(rgw::sal::RGWRadosStore *_store, |
625 | std::optional<rgw_zone_id> _source_zone, | |
626 | std::optional<rgw_bucket> _source_bucket, | |
627 | const rgw_bucket& dest_bucket); | |
628 | ~RGWBucketPipeSyncStatusManager(); | |
7c673cae FG |
629 | |
630 | int init(); | |
631 | ||
632 | map<int, rgw_bucket_shard_sync_info>& get_sync_status() { return sync_status; } | |
633 | int init_sync_status(); | |
634 | ||
9f95a23c TL |
635 | static string status_oid(const rgw_zone_id& source_zone, const rgw_bucket_sync_pair_info& bs); |
636 | static string obj_status_oid(const rgw_bucket_sync_pipe& sync_pipe, | |
637 | const rgw_zone_id& source_zone, const rgw_obj& obj); /* specific source obj sync status, | |
638 | can be used by sync modules */ | |
11fdf7f2 TL |
639 | |
640 | // implements DoutPrefixProvider | |
9f95a23c | 641 | CephContext *get_cct() const override; |
11fdf7f2 TL |
642 | unsigned get_subsys() const override; |
643 | std::ostream& gen_prefix(std::ostream& out) const override; | |
7c673cae FG |
644 | |
645 | int read_sync_status(); | |
646 | int run(); | |
647 | }; | |
648 | ||
b32b8144 | 649 | /// read the sync status of all bucket shards from the given source zone |
9f95a23c TL |
650 | int rgw_bucket_sync_status(const DoutPrefixProvider *dpp, |
651 | rgw::sal::RGWRadosStore *store, | |
652 | const rgw_sync_bucket_pipe& pipe, | |
653 | const RGWBucketInfo& dest_bucket_info, | |
654 | const RGWBucketInfo *psource_bucket_info, | |
b32b8144 FG |
655 | std::vector<rgw_bucket_shard_sync_info> *status); |
656 | ||
7c673cae FG |
657 | class RGWDefaultSyncModule : public RGWSyncModule { |
658 | public: | |
659 | RGWDefaultSyncModule() {} | |
11fdf7f2 | 660 | bool supports_writes() override { return true; } |
7c673cae | 661 | bool supports_data_export() override { return true; } |
11fdf7f2 TL |
662 | int create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override; |
663 | }; | |
664 | ||
665 | class RGWArchiveSyncModule : public RGWDefaultSyncModule { | |
666 | public: | |
667 | RGWArchiveSyncModule() {} | |
668 | bool supports_writes() override { return true; } | |
669 | bool supports_data_export() override { return false; } | |
670 | int create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override; | |
7c673cae FG |
671 | }; |
672 | ||
7c673cae | 673 | #endif |