]>
Commit | Line | Data |
---|---|---|
11fdf7f2 | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
9f95a23c | 2 | // vim: ts=8 sw=2 smarttab ft=cpp |
11fdf7f2 | 3 | |
7c673cae FG |
4 | #ifndef CEPH_RGW_DATA_SYNC_H |
5 | #define CEPH_RGW_DATA_SYNC_H | |
6 | ||
11fdf7f2 TL |
7 | #include "include/encoding.h" |
8 | ||
11fdf7f2 TL |
9 | #include "common/ceph_json.h" |
10 | ||
7c673cae FG |
11 | #include "rgw_coroutine.h" |
12 | #include "rgw_http_client.h" | |
20effc67 | 13 | #include "rgw_sal_rados.h" |
7c673cae | 14 | |
f67539c2 | 15 | #include "rgw_datalog.h" |
7c673cae | 16 | #include "rgw_sync_module.h" |
11fdf7f2 | 17 | #include "rgw_sync_trace.h" |
9f95a23c TL |
18 | #include "rgw_sync_policy.h" |
19 | ||
20 | #include "rgw_bucket_sync.h" | |
21 | ||
f67539c2 TL |
22 | // represents an obligation to sync an entry up a given time |
23 | struct rgw_data_sync_obligation { | |
24 | std::string key; | |
25 | std::string marker; | |
26 | ceph::real_time timestamp; | |
27 | bool retry = false; | |
28 | }; | |
29 | ||
30 | inline std::ostream& operator<<(std::ostream& out, const rgw_data_sync_obligation& o) { | |
31 | out << "key=" << o.key; | |
32 | if (!o.marker.empty()) { | |
33 | out << " marker=" << o.marker; | |
34 | } | |
35 | if (o.timestamp != ceph::real_time{}) { | |
36 | out << " timestamp=" << o.timestamp; | |
37 | } | |
38 | if (o.retry) { | |
39 | out << " retry"; | |
40 | } | |
41 | return out; | |
42 | } | |
43 | ||
9f95a23c TL |
44 | class JSONObj; |
45 | struct rgw_sync_bucket_pipe; | |
46 | ||
47 | struct rgw_bucket_sync_pair_info { | |
48 | RGWBucketSyncFlowManager::pipe_handler handler; /* responsible for sync filters */ | |
49 | rgw_bucket_shard source_bs; | |
50 | rgw_bucket_shard dest_bs; | |
51 | }; | |
52 | ||
20effc67 | 53 | inline std::ostream& operator<<(std::ostream& out, const rgw_bucket_sync_pair_info& p) { |
9f95a23c TL |
54 | if (p.source_bs.bucket == p.dest_bs.bucket) { |
55 | return out << p.source_bs; | |
56 | } | |
57 | ||
58 | out << p.source_bs; | |
59 | ||
60 | out << "->" << p.dest_bs.bucket; | |
61 | ||
62 | return out; | |
63 | } | |
64 | ||
65 | struct rgw_bucket_sync_pipe { | |
66 | rgw_bucket_sync_pair_info info; | |
67 | RGWBucketInfo source_bucket_info; | |
20effc67 | 68 | std::map<std::string, bufferlist> source_bucket_attrs; |
9f95a23c | 69 | RGWBucketInfo dest_bucket_info; |
20effc67 | 70 | std::map<std::string, bufferlist> dest_bucket_attrs; |
9f95a23c TL |
71 | |
72 | RGWBucketSyncFlowManager::pipe_rules_ref& get_rules() { | |
73 | return info.handler.rules; | |
74 | } | |
75 | }; | |
76 | ||
20effc67 | 77 | inline std::ostream& operator<<(std::ostream& out, const rgw_bucket_sync_pipe& p) { |
9f95a23c TL |
78 | return out << p.info; |
79 | } | |
7c673cae | 80 | |
7c673cae FG |
81 | struct rgw_datalog_info { |
82 | uint32_t num_shards; | |
83 | ||
84 | rgw_datalog_info() : num_shards(0) {} | |
85 | ||
86 | void decode_json(JSONObj *obj); | |
87 | }; | |
88 | ||
89 | struct rgw_data_sync_info { | |
90 | enum SyncState { | |
91 | StateInit = 0, | |
92 | StateBuildingFullSyncMaps = 1, | |
93 | StateSync = 2, | |
94 | }; | |
95 | ||
96 | uint16_t state; | |
97 | uint32_t num_shards; | |
98 | ||
31f18b77 FG |
99 | uint64_t instance_id{0}; |
100 | ||
7c673cae | 101 | void encode(bufferlist& bl) const { |
31f18b77 | 102 | ENCODE_START(2, 1, bl); |
11fdf7f2 TL |
103 | encode(state, bl); |
104 | encode(num_shards, bl); | |
105 | encode(instance_id, bl); | |
7c673cae FG |
106 | ENCODE_FINISH(bl); |
107 | } | |
108 | ||
11fdf7f2 | 109 | void decode(bufferlist::const_iterator& bl) { |
31f18b77 | 110 | DECODE_START(2, bl); |
11fdf7f2 TL |
111 | decode(state, bl); |
112 | decode(num_shards, bl); | |
31f18b77 | 113 | if (struct_v >= 2) { |
11fdf7f2 | 114 | decode(instance_id, bl); |
31f18b77 | 115 | } |
7c673cae FG |
116 | DECODE_FINISH(bl); |
117 | } | |
118 | ||
119 | void dump(Formatter *f) const { | |
20effc67 | 120 | std::string s; |
7c673cae FG |
121 | switch ((SyncState)state) { |
122 | case StateInit: | |
123 | s = "init"; | |
124 | break; | |
125 | case StateBuildingFullSyncMaps: | |
126 | s = "building-full-sync-maps"; | |
127 | break; | |
128 | case StateSync: | |
129 | s = "sync"; | |
130 | break; | |
131 | default: | |
132 | s = "unknown"; | |
133 | break; | |
134 | } | |
135 | encode_json("status", s, f); | |
136 | encode_json("num_shards", num_shards, f); | |
31f18b77 | 137 | encode_json("instance_id", instance_id, f); |
7c673cae FG |
138 | } |
139 | void decode_json(JSONObj *obj) { | |
140 | std::string s; | |
141 | JSONDecoder::decode_json("status", s, obj); | |
142 | if (s == "building-full-sync-maps") { | |
143 | state = StateBuildingFullSyncMaps; | |
144 | } else if (s == "sync") { | |
145 | state = StateSync; | |
146 | } else { | |
147 | state = StateInit; | |
148 | } | |
149 | JSONDecoder::decode_json("num_shards", num_shards, obj); | |
a8e16298 | 150 | JSONDecoder::decode_json("instance_id", instance_id, obj); |
7c673cae | 151 | } |
b32b8144 | 152 | static void generate_test_instances(std::list<rgw_data_sync_info*>& o); |
7c673cae FG |
153 | |
154 | rgw_data_sync_info() : state((int)StateInit), num_shards(0) {} | |
155 | }; | |
156 | WRITE_CLASS_ENCODER(rgw_data_sync_info) | |
157 | ||
158 | struct rgw_data_sync_marker { | |
159 | enum SyncState { | |
160 | FullSync = 0, | |
161 | IncrementalSync = 1, | |
162 | }; | |
163 | uint16_t state; | |
20effc67 TL |
164 | std::string marker; |
165 | std::string next_step_marker; | |
7c673cae FG |
166 | uint64_t total_entries; |
167 | uint64_t pos; | |
168 | real_time timestamp; | |
169 | ||
170 | rgw_data_sync_marker() : state(FullSync), total_entries(0), pos(0) {} | |
171 | ||
172 | void encode(bufferlist& bl) const { | |
173 | ENCODE_START(1, 1, bl); | |
11fdf7f2 TL |
174 | encode(state, bl); |
175 | encode(marker, bl); | |
176 | encode(next_step_marker, bl); | |
177 | encode(total_entries, bl); | |
178 | encode(pos, bl); | |
179 | encode(timestamp, bl); | |
7c673cae FG |
180 | ENCODE_FINISH(bl); |
181 | } | |
182 | ||
11fdf7f2 | 183 | void decode(bufferlist::const_iterator& bl) { |
7c673cae | 184 | DECODE_START(1, bl); |
11fdf7f2 TL |
185 | decode(state, bl); |
186 | decode(marker, bl); | |
187 | decode(next_step_marker, bl); | |
188 | decode(total_entries, bl); | |
189 | decode(pos, bl); | |
190 | decode(timestamp, bl); | |
7c673cae FG |
191 | DECODE_FINISH(bl); |
192 | } | |
193 | ||
194 | void dump(Formatter *f) const { | |
28e407b8 AA |
195 | const char *s{nullptr}; |
196 | switch ((SyncState)state) { | |
197 | case FullSync: | |
198 | s = "full-sync"; | |
199 | break; | |
200 | case IncrementalSync: | |
201 | s = "incremental-sync"; | |
202 | break; | |
203 | default: | |
204 | s = "unknown"; | |
205 | break; | |
206 | } | |
207 | encode_json("status", s, f); | |
7c673cae FG |
208 | encode_json("marker", marker, f); |
209 | encode_json("next_step_marker", next_step_marker, f); | |
210 | encode_json("total_entries", total_entries, f); | |
211 | encode_json("pos", pos, f); | |
212 | encode_json("timestamp", utime_t(timestamp), f); | |
213 | } | |
214 | void decode_json(JSONObj *obj) { | |
28e407b8 AA |
215 | std::string s; |
216 | JSONDecoder::decode_json("status", s, obj); | |
217 | if (s == "full-sync") { | |
218 | state = FullSync; | |
219 | } else if (s == "incremental-sync") { | |
220 | state = IncrementalSync; | |
221 | } | |
7c673cae FG |
222 | JSONDecoder::decode_json("marker", marker, obj); |
223 | JSONDecoder::decode_json("next_step_marker", next_step_marker, obj); | |
224 | JSONDecoder::decode_json("total_entries", total_entries, obj); | |
225 | JSONDecoder::decode_json("pos", pos, obj); | |
226 | utime_t t; | |
227 | JSONDecoder::decode_json("timestamp", t, obj); | |
228 | timestamp = t.to_real_time(); | |
229 | } | |
b32b8144 | 230 | static void generate_test_instances(std::list<rgw_data_sync_marker*>& o); |
7c673cae FG |
231 | }; |
232 | WRITE_CLASS_ENCODER(rgw_data_sync_marker) | |
233 | ||
234 | struct rgw_data_sync_status { | |
235 | rgw_data_sync_info sync_info; | |
20effc67 | 236 | std::map<uint32_t, rgw_data_sync_marker> sync_markers; |
7c673cae FG |
237 | |
238 | rgw_data_sync_status() {} | |
239 | ||
240 | void encode(bufferlist& bl) const { | |
241 | ENCODE_START(1, 1, bl); | |
11fdf7f2 | 242 | encode(sync_info, bl); |
7c673cae FG |
243 | /* sync markers are encoded separately */ |
244 | ENCODE_FINISH(bl); | |
245 | } | |
246 | ||
11fdf7f2 | 247 | void decode(bufferlist::const_iterator& bl) { |
7c673cae | 248 | DECODE_START(1, bl); |
11fdf7f2 | 249 | decode(sync_info, bl); |
7c673cae FG |
250 | /* sync markers are decoded separately */ |
251 | DECODE_FINISH(bl); | |
252 | } | |
253 | ||
254 | void dump(Formatter *f) const { | |
255 | encode_json("info", sync_info, f); | |
256 | encode_json("markers", sync_markers, f); | |
257 | } | |
258 | void decode_json(JSONObj *obj) { | |
259 | JSONDecoder::decode_json("info", sync_info, obj); | |
260 | JSONDecoder::decode_json("markers", sync_markers, obj); | |
261 | } | |
b32b8144 | 262 | static void generate_test_instances(std::list<rgw_data_sync_status*>& o); |
7c673cae FG |
263 | }; |
264 | WRITE_CLASS_ENCODER(rgw_data_sync_status) | |
265 | ||
266 | struct rgw_datalog_entry { | |
20effc67 | 267 | std::string key; |
7c673cae FG |
268 | ceph::real_time timestamp; |
269 | ||
270 | void decode_json(JSONObj *obj); | |
271 | }; | |
272 | ||
273 | struct rgw_datalog_shard_data { | |
20effc67 | 274 | std::string marker; |
7c673cae | 275 | bool truncated; |
20effc67 | 276 | std::vector<rgw_datalog_entry> entries; |
7c673cae FG |
277 | |
278 | void decode_json(JSONObj *obj); | |
279 | }; | |
280 | ||
281 | class RGWAsyncRadosProcessor; | |
282 | class RGWDataSyncControlCR; | |
283 | ||
284 | struct rgw_bucket_entry_owner { | |
20effc67 TL |
285 | std::string id; |
286 | std::string display_name; | |
7c673cae FG |
287 | |
288 | rgw_bucket_entry_owner() {} | |
20effc67 | 289 | rgw_bucket_entry_owner(const std::string& _id, const std::string& _display_name) : id(_id), display_name(_display_name) {} |
7c673cae FG |
290 | |
291 | void decode_json(JSONObj *obj); | |
292 | }; | |
293 | ||
294 | class RGWSyncErrorLogger; | |
11fdf7f2 | 295 | class RGWRESTConn; |
9f95a23c | 296 | class RGWServices; |
7c673cae FG |
297 | |
298 | struct RGWDataSyncEnv { | |
11fdf7f2 TL |
299 | const DoutPrefixProvider *dpp{nullptr}; |
300 | CephContext *cct{nullptr}; | |
20effc67 | 301 | rgw::sal::RadosStore* store{nullptr}; |
9f95a23c | 302 | RGWServices *svc{nullptr}; |
11fdf7f2 TL |
303 | RGWAsyncRadosProcessor *async_rados{nullptr}; |
304 | RGWHTTPManager *http_manager{nullptr}; | |
305 | RGWSyncErrorLogger *error_logger{nullptr}; | |
306 | RGWSyncTraceManager *sync_tracer{nullptr}; | |
11fdf7f2 | 307 | RGWSyncModuleInstanceRef sync_module{nullptr}; |
81eedcae | 308 | PerfCounters* counters{nullptr}; |
7c673cae | 309 | |
11fdf7f2 | 310 | RGWDataSyncEnv() {} |
7c673cae | 311 | |
20effc67 | 312 | void init(const DoutPrefixProvider *_dpp, CephContext *_cct, rgw::sal::RadosStore* _store, RGWServices *_svc, |
7c673cae | 313 | RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager, |
11fdf7f2 | 314 | RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer, |
9f95a23c | 315 | RGWSyncModuleInstanceRef& _sync_module, |
81eedcae | 316 | PerfCounters* _counters) { |
9f95a23c | 317 | dpp = _dpp; |
7c673cae FG |
318 | cct = _cct; |
319 | store = _store; | |
9f95a23c | 320 | svc = _svc; |
7c673cae FG |
321 | async_rados = _async_rados; |
322 | http_manager = _http_manager; | |
323 | error_logger = _error_logger; | |
11fdf7f2 | 324 | sync_tracer = _sync_tracer; |
7c673cae | 325 | sync_module = _sync_module; |
81eedcae | 326 | counters = _counters; |
7c673cae FG |
327 | } |
328 | ||
20effc67 TL |
329 | std::string shard_obj_name(int shard_id); |
330 | std::string status_oid(); | |
7c673cae FG |
331 | }; |
332 | ||
9f95a23c TL |
333 | struct RGWDataSyncCtx { |
334 | CephContext *cct{nullptr}; | |
335 | RGWDataSyncEnv *env{nullptr}; | |
336 | ||
337 | RGWRESTConn *conn{nullptr}; | |
338 | rgw_zone_id source_zone; | |
339 | ||
340 | void init(RGWDataSyncEnv *_env, | |
341 | RGWRESTConn *_conn, | |
342 | const rgw_zone_id& _source_zone) { | |
343 | cct = _env->cct; | |
344 | env = _env; | |
345 | conn = _conn; | |
346 | source_zone = _source_zone; | |
347 | } | |
348 | }; | |
349 | ||
350 | class RGWRados; | |
9f95a23c | 351 | |
7c673cae | 352 | class RGWRemoteDataLog : public RGWCoroutinesManager { |
11fdf7f2 | 353 | const DoutPrefixProvider *dpp; |
20effc67 | 354 | rgw::sal::RadosStore* store; |
9f95a23c TL |
355 | CephContext *cct; |
356 | RGWCoroutinesManagerRegistry *cr_registry; | |
7c673cae FG |
357 | RGWAsyncRadosProcessor *async_rados; |
358 | RGWHTTPManager http_manager; | |
359 | ||
360 | RGWDataSyncEnv sync_env; | |
9f95a23c | 361 | RGWDataSyncCtx sc; |
7c673cae | 362 | |
9f95a23c | 363 | ceph::shared_mutex lock = ceph::make_shared_mutex("RGWRemoteDataLog::lock"); |
7c673cae FG |
364 | RGWDataSyncControlCR *data_sync_cr; |
365 | ||
11fdf7f2 TL |
366 | RGWSyncTraceNodeRef tn; |
367 | ||
7c673cae FG |
368 | bool initialized; |
369 | ||
370 | public: | |
9f95a23c | 371 | RGWRemoteDataLog(const DoutPrefixProvider *dpp, |
20effc67 | 372 | rgw::sal::RadosStore* _store, |
9f95a23c TL |
373 | RGWAsyncRadosProcessor *async_rados); |
374 | int init(const rgw_zone_id& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, | |
81eedcae TL |
375 | RGWSyncTraceManager *_sync_tracer, RGWSyncModuleInstanceRef& module, |
376 | PerfCounters* _counters); | |
7c673cae FG |
377 | void finish(); |
378 | ||
b3b6e05e | 379 | int read_log_info(const DoutPrefixProvider *dpp, rgw_datalog_info *log_info); |
20effc67 TL |
380 | int read_source_log_shards_info(const DoutPrefixProvider *dpp, std::map<int, RGWDataChangesLogInfo> *shards_info); |
381 | int read_source_log_shards_next(const DoutPrefixProvider *dpp, std::map<int, std::string> shard_markers, std::map<int, rgw_datalog_shard_data> *result); | |
b3b6e05e | 382 | int read_sync_status(const DoutPrefixProvider *dpp, rgw_data_sync_status *sync_status); |
20effc67 TL |
383 | int read_recovering_shards(const DoutPrefixProvider *dpp, const int num_shards, std::set<int>& recovering_shards); |
384 | int read_shard_status(const DoutPrefixProvider *dpp, int shard_id, std::set<std::string>& lagging_buckets,std::set<std::string>& recovering_buckets, rgw_data_sync_marker* sync_marker, const int max_entries); | |
b3b6e05e TL |
385 | int init_sync_status(const DoutPrefixProvider *dpp, int num_shards); |
386 | int run_sync(const DoutPrefixProvider *dpp, int num_shards); | |
7c673cae | 387 | |
20effc67 | 388 | void wakeup(int shard_id, std::set<std::string>& keys); |
7c673cae FG |
389 | }; |
390 | ||
11fdf7f2 | 391 | class RGWDataSyncStatusManager : public DoutPrefixProvider { |
20effc67 | 392 | rgw::sal::RadosStore* store; |
7c673cae | 393 | |
9f95a23c | 394 | rgw_zone_id source_zone; |
7c673cae FG |
395 | RGWRESTConn *conn; |
396 | RGWSyncErrorLogger *error_logger; | |
397 | RGWSyncModuleInstanceRef sync_module; | |
81eedcae | 398 | PerfCounters* counters; |
7c673cae FG |
399 | |
400 | RGWRemoteDataLog source_log; | |
401 | ||
20effc67 TL |
402 | std::string source_status_oid; |
403 | std::string source_shard_status_oid_prefix; | |
7c673cae | 404 | |
20effc67 | 405 | std::map<int, rgw_raw_obj> shard_objs; |
7c673cae FG |
406 | |
407 | int num_shards; | |
408 | ||
409 | public: | |
20effc67 | 410 | RGWDataSyncStatusManager(rgw::sal::RadosStore* _store, RGWAsyncRadosProcessor *async_rados, |
9f95a23c | 411 | const rgw_zone_id& _source_zone, PerfCounters* counters) |
7c673cae | 412 | : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL), |
81eedcae | 413 | sync_module(nullptr), counters(counters), |
11fdf7f2 | 414 | source_log(this, store, async_rados), num_shards(0) {} |
20effc67 | 415 | RGWDataSyncStatusManager(rgw::sal::RadosStore* _store, RGWAsyncRadosProcessor *async_rados, |
9f95a23c | 416 | const rgw_zone_id& _source_zone, PerfCounters* counters, |
81eedcae | 417 | const RGWSyncModuleInstanceRef& _sync_module) |
94b18763 | 418 | : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL), |
81eedcae | 419 | sync_module(_sync_module), counters(counters), |
11fdf7f2 | 420 | source_log(this, store, async_rados), num_shards(0) {} |
7c673cae FG |
421 | ~RGWDataSyncStatusManager() { |
422 | finalize(); | |
423 | } | |
b3b6e05e | 424 | int init(const DoutPrefixProvider *dpp); |
7c673cae FG |
425 | void finalize(); |
426 | ||
20effc67 TL |
427 | static std::string shard_obj_name(const rgw_zone_id& source_zone, int shard_id); |
428 | static std::string sync_status_oid(const rgw_zone_id& source_zone); | |
7c673cae | 429 | |
b3b6e05e TL |
430 | int read_sync_status(const DoutPrefixProvider *dpp, rgw_data_sync_status *sync_status) { |
431 | return source_log.read_sync_status(dpp, sync_status); | |
7c673cae | 432 | } |
28e407b8 | 433 | |
20effc67 | 434 | int read_recovering_shards(const DoutPrefixProvider *dpp, const int num_shards, std::set<int>& recovering_shards) { |
b3b6e05e | 435 | return source_log.read_recovering_shards(dpp, num_shards, recovering_shards); |
28e407b8 AA |
436 | } |
437 | ||
20effc67 | 438 | int read_shard_status(const DoutPrefixProvider *dpp, int shard_id, std::set<std::string>& lagging_buckets, std::set<std::string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries) { |
b3b6e05e | 439 | return source_log.read_shard_status(dpp, shard_id, lagging_buckets, recovering_buckets,sync_marker, max_entries); |
28e407b8 | 440 | } |
b3b6e05e | 441 | int init_sync_status(const DoutPrefixProvider *dpp) { return source_log.init_sync_status(dpp, num_shards); } |
7c673cae | 442 | |
b3b6e05e TL |
443 | int read_log_info(const DoutPrefixProvider *dpp, rgw_datalog_info *log_info) { |
444 | return source_log.read_log_info(dpp, log_info); | |
7c673cae | 445 | } |
20effc67 | 446 | int read_source_log_shards_info(const DoutPrefixProvider *dpp, std::map<int, RGWDataChangesLogInfo> *shards_info) { |
b3b6e05e | 447 | return source_log.read_source_log_shards_info(dpp, shards_info); |
7c673cae | 448 | } |
20effc67 | 449 | int read_source_log_shards_next(const DoutPrefixProvider *dpp, std::map<int, std::string> shard_markers, std::map<int, rgw_datalog_shard_data> *result) { |
b3b6e05e | 450 | return source_log.read_source_log_shards_next(dpp, shard_markers, result); |
7c673cae FG |
451 | } |
452 | ||
b3b6e05e | 453 | int run(const DoutPrefixProvider *dpp) { return source_log.run_sync(dpp, num_shards); } |
7c673cae | 454 | |
20effc67 | 455 | void wakeup(int shard_id, std::set<std::string>& keys) { return source_log.wakeup(shard_id, keys); } |
7c673cae FG |
456 | void stop() { |
457 | source_log.finish(); | |
458 | } | |
11fdf7f2 TL |
459 | |
460 | // implements DoutPrefixProvider | |
9f95a23c | 461 | CephContext *get_cct() const override; |
11fdf7f2 TL |
462 | unsigned get_subsys() const override; |
463 | std::ostream& gen_prefix(std::ostream& out) const override; | |
7c673cae FG |
464 | }; |
465 | ||
9f95a23c | 466 | class RGWBucketPipeSyncStatusManager; |
7c673cae FG |
467 | class RGWBucketSyncCR; |
468 | ||
469 | struct rgw_bucket_shard_full_sync_marker { | |
470 | rgw_obj_key position; | |
471 | uint64_t count; | |
472 | ||
473 | rgw_bucket_shard_full_sync_marker() : count(0) {} | |
474 | ||
20effc67 | 475 | void encode_attr(std::map<std::string, bufferlist>& attrs); |
7c673cae FG |
476 | |
477 | void encode(bufferlist& bl) const { | |
478 | ENCODE_START(1, 1, bl); | |
11fdf7f2 TL |
479 | encode(position, bl); |
480 | encode(count, bl); | |
7c673cae FG |
481 | ENCODE_FINISH(bl); |
482 | } | |
483 | ||
11fdf7f2 | 484 | void decode(bufferlist::const_iterator& bl) { |
7c673cae | 485 | DECODE_START(1, bl); |
11fdf7f2 TL |
486 | decode(position, bl); |
487 | decode(count, bl); | |
7c673cae FG |
488 | DECODE_FINISH(bl); |
489 | } | |
490 | ||
b32b8144 FG |
491 | void dump(Formatter *f) const; |
492 | void decode_json(JSONObj *obj); | |
7c673cae FG |
493 | }; |
494 | WRITE_CLASS_ENCODER(rgw_bucket_shard_full_sync_marker) | |
495 | ||
496 | struct rgw_bucket_shard_inc_sync_marker { | |
20effc67 | 497 | std::string position; |
f67539c2 | 498 | ceph::real_time timestamp; |
7c673cae | 499 | |
20effc67 | 500 | void encode_attr(std::map<std::string, bufferlist>& attrs); |
7c673cae FG |
501 | |
502 | void encode(bufferlist& bl) const { | |
f67539c2 | 503 | ENCODE_START(2, 1, bl); |
11fdf7f2 | 504 | encode(position, bl); |
f67539c2 | 505 | encode(timestamp, bl); |
7c673cae FG |
506 | ENCODE_FINISH(bl); |
507 | } | |
508 | ||
11fdf7f2 | 509 | void decode(bufferlist::const_iterator& bl) { |
f67539c2 | 510 | DECODE_START(2, bl); |
11fdf7f2 | 511 | decode(position, bl); |
f67539c2 TL |
512 | if (struct_v >= 2) { |
513 | decode(timestamp, bl); | |
514 | } | |
515 | DECODE_FINISH(bl); | |
7c673cae FG |
516 | } |
517 | ||
b32b8144 FG |
518 | void dump(Formatter *f) const; |
519 | void decode_json(JSONObj *obj); | |
7c673cae FG |
520 | }; |
521 | WRITE_CLASS_ENCODER(rgw_bucket_shard_inc_sync_marker) | |
522 | ||
523 | struct rgw_bucket_shard_sync_info { | |
524 | enum SyncState { | |
525 | StateInit = 0, | |
526 | StateFullSync = 1, | |
527 | StateIncrementalSync = 2, | |
9f95a23c | 528 | StateStopped = 3, |
7c673cae FG |
529 | }; |
530 | ||
531 | uint16_t state; | |
532 | rgw_bucket_shard_full_sync_marker full_marker; | |
533 | rgw_bucket_shard_inc_sync_marker inc_marker; | |
534 | ||
20effc67 TL |
535 | void decode_from_attrs(CephContext *cct, std::map<std::string, bufferlist>& attrs); |
536 | void encode_all_attrs(std::map<std::string, bufferlist>& attrs); | |
537 | void encode_state_attr(std::map<std::string, bufferlist>& attrs); | |
7c673cae FG |
538 | |
539 | void encode(bufferlist& bl) const { | |
540 | ENCODE_START(1, 1, bl); | |
11fdf7f2 TL |
541 | encode(state, bl); |
542 | encode(full_marker, bl); | |
543 | encode(inc_marker, bl); | |
7c673cae FG |
544 | ENCODE_FINISH(bl); |
545 | } | |
546 | ||
11fdf7f2 | 547 | void decode(bufferlist::const_iterator& bl) { |
7c673cae | 548 | DECODE_START(1, bl); |
11fdf7f2 TL |
549 | decode(state, bl); |
550 | decode(full_marker, bl); | |
551 | decode(inc_marker, bl); | |
7c673cae FG |
552 | DECODE_FINISH(bl); |
553 | } | |
554 | ||
b32b8144 FG |
555 | void dump(Formatter *f) const; |
556 | void decode_json(JSONObj *obj); | |
7c673cae FG |
557 | |
558 | rgw_bucket_shard_sync_info() : state((int)StateInit) {} | |
559 | ||
560 | }; | |
561 | WRITE_CLASS_ENCODER(rgw_bucket_shard_sync_info) | |
562 | ||
28e407b8 | 563 | struct rgw_bucket_index_marker_info { |
20effc67 TL |
564 | std::string bucket_ver; |
565 | std::string master_ver; | |
566 | std::string max_marker; | |
28e407b8 AA |
567 | bool syncstopped{false}; |
568 | ||
569 | void decode_json(JSONObj *obj) { | |
570 | JSONDecoder::decode_json("bucket_ver", bucket_ver, obj); | |
571 | JSONDecoder::decode_json("master_ver", master_ver, obj); | |
572 | JSONDecoder::decode_json("max_marker", max_marker, obj); | |
573 | JSONDecoder::decode_json("syncstopped", syncstopped, obj); | |
574 | } | |
575 | }; | |
576 | ||
7c673cae | 577 | |
9f95a23c | 578 | class RGWRemoteBucketManager { |
11fdf7f2 | 579 | const DoutPrefixProvider *dpp; |
9f95a23c TL |
580 | |
581 | RGWDataSyncEnv *sync_env; | |
582 | ||
7c673cae | 583 | RGWRESTConn *conn{nullptr}; |
9f95a23c | 584 | rgw_zone_id source_zone; |
7c673cae | 585 | |
20effc67 | 586 | std::vector<rgw_bucket_sync_pair_info> sync_pairs; |
7c673cae | 587 | |
9f95a23c | 588 | RGWDataSyncCtx sc; |
7c673cae FG |
589 | rgw_bucket_shard_sync_info init_status; |
590 | ||
591 | RGWBucketSyncCR *sync_cr{nullptr}; | |
592 | ||
593 | public: | |
9f95a23c TL |
594 | RGWRemoteBucketManager(const DoutPrefixProvider *_dpp, |
595 | RGWDataSyncEnv *_sync_env, | |
596 | const rgw_zone_id& _source_zone, RGWRESTConn *_conn, | |
597 | const RGWBucketInfo& source_bucket_info, | |
598 | const rgw_bucket& dest_bucket); | |
599 | ||
600 | void init(const rgw_zone_id& _source_zone, RGWRESTConn *_conn, | |
601 | const rgw_bucket& source_bucket, int shard_id, | |
602 | const rgw_bucket& dest_bucket); | |
603 | ||
604 | RGWCoroutine *read_sync_status_cr(int num, rgw_bucket_shard_sync_info *sync_status); | |
f67539c2 | 605 | RGWCoroutine *init_sync_status_cr(int num, RGWObjVersionTracker& objv_tracker); |
9f95a23c TL |
606 | RGWCoroutine *run_sync_cr(int num); |
607 | ||
608 | int num_pipes() { | |
609 | return sync_pairs.size(); | |
610 | } | |
7c673cae FG |
611 | |
612 | void wakeup(); | |
613 | }; | |
614 | ||
f67539c2 TL |
615 | class BucketIndexShardsManager; |
616 | ||
b3b6e05e TL |
617 | int rgw_read_remote_bilog_info(const DoutPrefixProvider *dpp, |
618 | RGWRESTConn* conn, | |
f67539c2 TL |
619 | const rgw_bucket& bucket, |
620 | BucketIndexShardsManager& markers, | |
621 | optional_yield y); | |
622 | ||
9f95a23c | 623 | class RGWBucketPipeSyncStatusManager : public DoutPrefixProvider { |
20effc67 | 624 | rgw::sal::RadosStore* store; |
9f95a23c TL |
625 | |
626 | RGWDataSyncEnv sync_env; | |
7c673cae FG |
627 | |
628 | RGWCoroutinesManager cr_mgr; | |
629 | ||
630 | RGWHTTPManager http_manager; | |
631 | ||
9f95a23c TL |
632 | std::optional<rgw_zone_id> source_zone; |
633 | std::optional<rgw_bucket> source_bucket; | |
634 | ||
7c673cae FG |
635 | RGWRESTConn *conn; |
636 | RGWSyncErrorLogger *error_logger; | |
637 | RGWSyncModuleInstanceRef sync_module; | |
638 | ||
9f95a23c | 639 | rgw_bucket dest_bucket; |
7c673cae | 640 | |
20effc67 | 641 | std::vector<RGWRemoteBucketManager *> source_mgrs; |
7c673cae | 642 | |
20effc67 TL |
643 | std::string source_status_oid; |
644 | std::string source_shard_status_oid_prefix; | |
7c673cae | 645 | |
20effc67 | 646 | std::map<int, rgw_bucket_shard_sync_info> sync_status; |
7c673cae FG |
647 | rgw_raw_obj status_obj; |
648 | ||
649 | int num_shards; | |
650 | ||
651 | public: | |
20effc67 | 652 | RGWBucketPipeSyncStatusManager(rgw::sal::RadosStore* _store, |
9f95a23c TL |
653 | std::optional<rgw_zone_id> _source_zone, |
654 | std::optional<rgw_bucket> _source_bucket, | |
655 | const rgw_bucket& dest_bucket); | |
656 | ~RGWBucketPipeSyncStatusManager(); | |
7c673cae | 657 | |
b3b6e05e | 658 | int init(const DoutPrefixProvider *dpp); |
7c673cae | 659 | |
20effc67 | 660 | std::map<int, rgw_bucket_shard_sync_info>& get_sync_status() { return sync_status; } |
b3b6e05e | 661 | int init_sync_status(const DoutPrefixProvider *dpp); |
7c673cae | 662 | |
20effc67 TL |
663 | static std::string status_oid(const rgw_zone_id& source_zone, const rgw_bucket_sync_pair_info& bs); |
664 | static std::string obj_status_oid(const rgw_bucket_sync_pipe& sync_pipe, | |
665 | const rgw_zone_id& source_zone, const rgw::sal::Object* obj); /* specific source obj sync status, | |
9f95a23c | 666 | can be used by sync modules */ |
11fdf7f2 TL |
667 | |
668 | // implements DoutPrefixProvider | |
9f95a23c | 669 | CephContext *get_cct() const override; |
11fdf7f2 TL |
670 | unsigned get_subsys() const override; |
671 | std::ostream& gen_prefix(std::ostream& out) const override; | |
7c673cae | 672 | |
b3b6e05e TL |
673 | int read_sync_status(const DoutPrefixProvider *dpp); |
674 | int run(const DoutPrefixProvider *dpp); | |
7c673cae FG |
675 | }; |
676 | ||
b32b8144 | 677 | /// read the sync status of all bucket shards from the given source zone |
9f95a23c | 678 | int rgw_bucket_sync_status(const DoutPrefixProvider *dpp, |
20effc67 | 679 | rgw::sal::RadosStore* store, |
9f95a23c TL |
680 | const rgw_sync_bucket_pipe& pipe, |
681 | const RGWBucketInfo& dest_bucket_info, | |
682 | const RGWBucketInfo *psource_bucket_info, | |
b32b8144 FG |
683 | std::vector<rgw_bucket_shard_sync_info> *status); |
684 | ||
7c673cae FG |
685 | class RGWDefaultSyncModule : public RGWSyncModule { |
686 | public: | |
687 | RGWDefaultSyncModule() {} | |
11fdf7f2 | 688 | bool supports_writes() override { return true; } |
7c673cae | 689 | bool supports_data_export() override { return true; } |
20effc67 | 690 | int create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override; |
11fdf7f2 TL |
691 | }; |
692 | ||
693 | class RGWArchiveSyncModule : public RGWDefaultSyncModule { | |
694 | public: | |
695 | RGWArchiveSyncModule() {} | |
696 | bool supports_writes() override { return true; } | |
697 | bool supports_data_export() override { return false; } | |
20effc67 | 698 | int create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override; |
7c673cae FG |
699 | }; |
700 | ||
7c673cae | 701 | #endif |