// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

#ifndef CEPH_RGW_SYNC_H
#define CEPH_RGW_SYNC_H

#include <atomic>

#include "include/stringify.h"
#include "common/RWLock.h"

#include "rgw_coroutine.h"
#include "rgw_http_client.h"
#include "rgw_metadata.h"
#include "rgw_meta_sync_status.h"
#include "rgw_sal.h"
#include "rgw_sync_trace.h"
#include "rgw_mdlog.h"

namespace rgw { namespace sal {
  class RGWRadosStore;
} }

#define ERROR_LOGGER_SHARDS 32
#define RGW_SYNC_ERROR_LOG_SHARD_PREFIX "sync.error-log"

struct rgw_mdlog_info {
  uint32_t num_shards;
  std::string period; //< period id of the master's oldest metadata log
  epoch_t realm_epoch; //< realm epoch of oldest metadata log

  rgw_mdlog_info() : num_shards(0), realm_epoch(0) {}

  void decode_json(JSONObj *obj);
};


struct rgw_mdlog_entry {
  string id;
  string section;
  string name;
  ceph::real_time timestamp;
  RGWMetadataLogData log_data;

  void decode_json(JSONObj *obj);

  bool convert_from(cls_log_entry& le) {
    id = le.id;
    section = le.section;
    name = le.name;
    timestamp = le.timestamp.to_real_time();
    try {
      auto iter = le.data.cbegin();
      decode(log_data, iter);
    } catch (buffer::error& err) {
      return false;
    }
    return true;
  }
};

struct rgw_mdlog_shard_data {
  string marker;
  bool truncated;
  vector<rgw_mdlog_entry> entries;

  void decode_json(JSONObj *obj);
};

class RGWAsyncRadosProcessor;
class RGWMetaSyncStatusManager;
class RGWMetaSyncCR;
class RGWRESTConn;
class RGWSyncTraceManager;

class RGWSyncErrorLogger {
  rgw::sal::RGWRadosStore *store;

  vector<string> oids;
  int num_shards;

  std::atomic<int64_t> counter = { 0 };
public:
  RGWSyncErrorLogger(rgw::sal::RGWRadosStore *_store, const string &oid_prefix, int _num_shards);
  RGWCoroutine *log_error_cr(const string& source_zone, const string& section, const string& name, uint32_t error_code, const string& message);

  static string get_shard_oid(const string& oid_prefix, int shard_id);
};

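// Illustrative sketch, kept as a comment so it is not part of the interface:
// inside a sync coroutine, an error is typically recorded by calling the
// coroutine returned by log_error_cr(). The surrounding variables (sync_env,
// source_zone, key, ret) are hypothetical.
//
//   yield call(sync_env->error_logger->log_error_cr(source_zone, "meta", key,
//                                                   -ret, "failed to sync metadata entry"));
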
struct rgw_sync_error_info {
  string source_zone;
  uint32_t error_code;
  string message;

  rgw_sync_error_info() : error_code(0) {}
  rgw_sync_error_info(const string& _source_zone, uint32_t _error_code, const string& _message) : source_zone(_source_zone), error_code(_error_code), message(_message) {}

  void encode(bufferlist& bl) const {
    ENCODE_START(1, 1, bl);
    encode(source_zone, bl);
    encode(error_code, bl);
    encode(message, bl);
    ENCODE_FINISH(bl);
  }

  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(1, bl);
    decode(source_zone, bl);
    decode(error_code, bl);
    decode(message, bl);
    DECODE_FINISH(bl);
  }

  void dump(Formatter *f) const;
};
WRITE_CLASS_ENCODER(rgw_sync_error_info)

#define DEFAULT_BACKOFF_MAX 30

class RGWSyncBackoff {
  int cur_wait;
  int max_secs;

  void update_wait_time();
public:
  explicit RGWSyncBackoff(int _max_secs = DEFAULT_BACKOFF_MAX) : cur_wait(0), max_secs(_max_secs) {}

  void backoff_sleep();
  void reset() {
    cur_wait = 0;
  }

  void backoff(RGWCoroutine *op);
};

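// Illustrative sketch (comment only): RGWSyncBackoff is meant to wrap a
// retriable step, sleeping between attempts with a wait that grows and is
// bounded by max_secs, and being reset once the step succeeds. The operation
// name below is hypothetical.
//
//   RGWSyncBackoff bo;                   // wait capped at DEFAULT_BACKOFF_MAX seconds
//   int r;
//   while ((r = try_sync_step()) < 0) {  // hypothetical retriable call
//     bo.backoff_sleep();                // blocking variant; backoff(op) is the coroutine variant
//   }
//   bo.reset();                          // next failure starts from a short wait again
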
class RGWBackoffControlCR : public RGWCoroutine
{
  RGWCoroutine *cr;
  ceph::mutex lock;

  RGWSyncBackoff backoff;
  bool reset_backoff;

  bool exit_on_error;

protected:
  bool *backoff_ptr() {
    return &reset_backoff;
  }

  ceph::mutex& cr_lock() {
    return lock;
  }

  RGWCoroutine *get_cr() {
    return cr;
  }

public:
  RGWBackoffControlCR(CephContext *_cct, bool _exit_on_error)
    : RGWCoroutine(_cct),
      cr(nullptr),
      lock(ceph::make_mutex("RGWBackoffControlCR::lock:" + stringify(this))),
      reset_backoff(false), exit_on_error(_exit_on_error) {
  }

  ~RGWBackoffControlCR() override {
    if (cr) {
      cr->put();
    }
  }

  virtual RGWCoroutine *alloc_cr() = 0;
  virtual RGWCoroutine *alloc_finisher_cr() { return NULL; }

  int operate() override;
};

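// Illustrative sketch (comment only): a concrete RGWBackoffControlCR just
// supplies the coroutine to run under backoff control via alloc_cr(); the
// subclass and worker coroutine names below are hypothetical.
//
//   class MySyncControlCR : public RGWBackoffControlCR {
//   public:
//     explicit MySyncControlCR(CephContext *cct)
//       : RGWBackoffControlCR(cct, false /* exit_on_error */) {}
//     RGWCoroutine *alloc_cr() override {
//       return new MySyncWorkerCR(cct);   // hypothetical coroutine to (re)run
//     }
//   };
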
struct RGWMetaSyncEnv {
  const DoutPrefixProvider *dpp;
  CephContext *cct{nullptr};
  rgw::sal::RGWRadosStore *store{nullptr};
  RGWRESTConn *conn{nullptr};
  RGWAsyncRadosProcessor *async_rados{nullptr};
  RGWHTTPManager *http_manager{nullptr};
  RGWSyncErrorLogger *error_logger{nullptr};
  RGWSyncTraceManager *sync_tracer{nullptr};

  RGWMetaSyncEnv() {}

  void init(const DoutPrefixProvider *_dpp, CephContext *_cct, rgw::sal::RGWRadosStore *_store, RGWRESTConn *_conn,
            RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
            RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer);

  string shard_obj_name(int shard_id);
  string status_oid();
};

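// Illustrative sketch (comment only): the environment is populated once via
// init() and then shared by the meta sync coroutines; every pointer argument
// below stands for an object owned elsewhere.
//
//   RGWMetaSyncEnv env;
//   env.init(dpp, store->ctx(), store, conn, async_rados,
//            &http_manager, error_logger, sync_tracer);
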
class RGWRemoteMetaLog : public RGWCoroutinesManager {
  const DoutPrefixProvider *dpp;
  rgw::sal::RGWRadosStore *store;
  RGWRESTConn *conn;
  RGWAsyncRadosProcessor *async_rados;

  RGWHTTPManager http_manager;
  RGWMetaSyncStatusManager *status_manager;
  RGWSyncErrorLogger *error_logger{nullptr};
  RGWSyncTraceManager *sync_tracer{nullptr};

  RGWMetaSyncCR *meta_sync_cr{nullptr};

  RGWSyncBackoff backoff;

  RGWMetaSyncEnv sync_env;

  void init_sync_env(RGWMetaSyncEnv *env);
  int store_sync_info(const rgw_meta_sync_info& sync_info);

  std::atomic<bool> going_down = { false };

  RGWSyncTraceNodeRef tn;

public:
  RGWRemoteMetaLog(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *_store,
                   RGWAsyncRadosProcessor *async_rados,
                   RGWMetaSyncStatusManager *_sm)
    : RGWCoroutinesManager(_store->ctx(), _store->getRados()->get_cr_registry()),
      dpp(dpp), store(_store), conn(NULL), async_rados(async_rados),
      http_manager(store->ctx(), completion_mgr),
      status_manager(_sm) {}

  ~RGWRemoteMetaLog() override;

  int init();
  void finish();

  int read_log_info(rgw_mdlog_info *log_info);
  int read_master_log_shards_info(const string& master_period, map<int, RGWMetadataLogInfo> *shards_info);
  int read_master_log_shards_next(const string& period, map<int, string> shard_markers, map<int, rgw_mdlog_shard_data> *result);
  int read_sync_status(rgw_meta_sync_status *sync_status);
  int init_sync_status();
  int run_sync();

  void wakeup(int shard_id);

  RGWMetaSyncEnv& get_sync_env() {
    return sync_env;
  }
};

class RGWMetaSyncStatusManager : public DoutPrefixProvider {
  rgw::sal::RGWRadosStore *store;
  librados::IoCtx ioctx;

  RGWRemoteMetaLog master_log;

  map<int, rgw_raw_obj> shard_objs;

  struct utime_shard {
    real_time ts;
    int shard_id;

    utime_shard() : shard_id(-1) {}

    bool operator<(const utime_shard& rhs) const {
      if (ts == rhs.ts) {
        return shard_id < rhs.shard_id;
      }
      return ts < rhs.ts;
    }
  };

  ceph::shared_mutex ts_to_shard_lock = ceph::make_shared_mutex("ts_to_shard_lock");
  map<utime_shard, int> ts_to_shard;
  vector<string> clone_markers;

public:
  RGWMetaSyncStatusManager(rgw::sal::RGWRadosStore *_store, RGWAsyncRadosProcessor *async_rados)
    : store(_store), master_log(this, store, async_rados, this)
  {}
  int init();

  int read_sync_status(rgw_meta_sync_status *sync_status) {
    return master_log.read_sync_status(sync_status);
  }
  int init_sync_status() { return master_log.init_sync_status(); }
  int read_log_info(rgw_mdlog_info *log_info) {
    return master_log.read_log_info(log_info);
  }
  int read_master_log_shards_info(const string& master_period, map<int, RGWMetadataLogInfo> *shards_info) {
    return master_log.read_master_log_shards_info(master_period, shards_info);
  }
  int read_master_log_shards_next(const string& period, map<int, string> shard_markers, map<int, rgw_mdlog_shard_data> *result) {
    return master_log.read_master_log_shards_next(period, shard_markers, result);
  }

  int run() { return master_log.run_sync(); }


  // implements DoutPrefixProvider
  CephContext *get_cct() const override { return store->ctx(); }
  unsigned get_subsys() const override;
  std::ostream& gen_prefix(std::ostream& out) const override;

  void wakeup(int shard_id) { return master_log.wakeup(shard_id); }
  void stop() {
    master_log.finish();
  }
};

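// Illustrative sketch (comment only): typical callers construct the status
// manager, initialize it, and then either query sync status or drive a full
// metadata sync run. Error handling is omitted; variable names are
// hypothetical.
//
//   RGWMetaSyncStatusManager mgr(store, async_rados);
//   int r = mgr.init();
//   if (r >= 0) {
//     rgw_meta_sync_status status;
//     mgr.read_sync_status(&status);   // snapshot of the current sync state
//     // mgr.run();                    // or: run the metadata sync loop
//   }
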
class RGWOrderCallCR : public RGWCoroutine
{
public:
  RGWOrderCallCR(CephContext *cct) : RGWCoroutine(cct) {}

  virtual void call_cr(RGWCoroutine *_cr) = 0;
};

class RGWLastCallerWinsCR : public RGWOrderCallCR
{
  RGWCoroutine *cr{nullptr};

public:
  explicit RGWLastCallerWinsCR(CephContext *cct) : RGWOrderCallCR(cct) {}
  ~RGWLastCallerWinsCR() {
    if (cr) {
      cr->put();
    }
  }

  int operate() override;

  void call_cr(RGWCoroutine *_cr) override {
    if (cr) {
      cr->put();
    }
    cr = _cr;
  }
};

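// Note (an assumption based on the class name and call_cr() above): when
// several coroutines are submitted before the previous one has run, only the
// most recently submitted one is kept and earlier, superseded submissions are
// dropped via put(). This is what lets the marker tracker below keep handing
// it newer "store marker" coroutines.
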
template <class T, class K>
class RGWSyncShardMarkerTrack {
  struct marker_entry {
    uint64_t pos;
    real_time timestamp;

    marker_entry() : pos(0) {}
    marker_entry(uint64_t _p, const real_time& _ts) : pos(_p), timestamp(_ts) {}
  };
  typename std::map<T, marker_entry> pending;

  map<T, marker_entry> finish_markers;

  int window_size;
  int updates_since_flush;

  RGWOrderCallCR *order_cr{nullptr};

protected:
  typename std::set<K> need_retry_set;

  virtual RGWCoroutine *store_marker(const T& new_marker, uint64_t index_pos, const real_time& timestamp) = 0;
  virtual RGWOrderCallCR *allocate_order_control_cr() = 0;
  virtual void handle_finish(const T& marker) { }

public:
  RGWSyncShardMarkerTrack(int _window_size) : window_size(_window_size), updates_since_flush(0) {}
  virtual ~RGWSyncShardMarkerTrack() {
    if (order_cr) {
      order_cr->put();
    }
  }

  bool start(const T& pos, int index_pos, const real_time& timestamp) {
    if (pending.find(pos) != pending.end()) {
      return false;
    }
    pending[pos] = marker_entry(index_pos, timestamp);
    return true;
  }

  void try_update_high_marker(const T& pos, int index_pos, const real_time& timestamp) {
    finish_markers[pos] = marker_entry(index_pos, timestamp);
  }

  RGWCoroutine *finish(const T& pos) {
    if (pending.empty()) {
      /* can happen, due to a bug that ended up with multiple objects with the same name and version
       * -- which can happen when versioning is enabled and the version is 'null'.
       */
      return NULL;
    }

    typename std::map<T, marker_entry>::iterator iter = pending.begin();

    bool is_first = (pos == iter->first);

    typename std::map<T, marker_entry>::iterator pos_iter = pending.find(pos);
    if (pos_iter == pending.end()) {
      /* see pending.empty() comment */
      return NULL;
    }

    finish_markers[pos] = pos_iter->second;

    pending.erase(pos);

    handle_finish(pos);

    updates_since_flush++;

    if (is_first && (updates_since_flush >= window_size || pending.empty())) {
      return flush();
    }
    return NULL;
  }

  RGWCoroutine *flush() {
    if (finish_markers.empty()) {
      return NULL;
    }

    typename std::map<T, marker_entry>::iterator i;

    if (pending.empty()) {
      i = finish_markers.end();
    } else {
      i = finish_markers.lower_bound(pending.begin()->first);
    }
    if (i == finish_markers.begin()) {
      return NULL;
    }
    updates_since_flush = 0;

    auto last = i;
    --i;
    const T& high_marker = i->first;
    marker_entry& high_entry = i->second;
    RGWCoroutine *cr = order(store_marker(high_marker, high_entry.pos, high_entry.timestamp));
    finish_markers.erase(finish_markers.begin(), last);
    return cr;
  }

  /*
   * A key needs retry if it was being processed when another marker that
   * points to the same bucket shard arrived. Instead of processing the new
   * entry, we mark the key as need_retry so that when we finish processing
   * the original, we retry processing on the same bucket shard, in case
   * there are more entries to process. This closes a race that could
   * otherwise occur.
   */
  bool need_retry(const K& key) {
    return (need_retry_set.find(key) != need_retry_set.end());
  }

  void set_need_retry(const K& key) {
    need_retry_set.insert(key);
  }

  void reset_need_retry(const K& key) {
    need_retry_set.erase(key);
  }

  RGWCoroutine *order(RGWCoroutine *cr) {
    /* Either returns a new order-control CR (e.g. an RGWLastCallerWinsCR), or updates the
     * existing one, in which case it returns nullptr and the existing one will call the cr.
     */
    if (order_cr && order_cr->is_done()) {
      order_cr->put();
      order_cr = nullptr;
    }
    if (!order_cr) {
      order_cr = allocate_order_control_cr();
      order_cr->get();
      order_cr->call_cr(cr);
      return order_cr;
    }
    order_cr->call_cr(cr);
    return nullptr; /* don't call it a second time */
  }
};

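// Illustrative sketch (comment only): the expected calling pattern for the
// marker tracker inside a shard-processing coroutine. A concrete subclass
// implements store_marker() and allocate_order_control_cr(); the names below
// (marker_tracker, entry, pos) are hypothetical.
//
//   if (marker_tracker->start(entry.id, pos, entry.timestamp)) {
//     // ... process the entry (usually by spawning a per-entry coroutine) ...
//     RGWCoroutine *cr = marker_tracker->finish(entry.id);
//     if (cr) {
//       yield call(cr);   // persists the highest contiguous finished marker
//     }
//   }
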
class RGWMetaSyncShardMarkerTrack;

class RGWMetaSyncSingleEntryCR : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;

  string raw_key;
  string entry_marker;
  RGWMDLogStatus op_status;

  ssize_t pos;
  string section;
  string key;

  int sync_status;

  bufferlist md_bl;

  RGWMetaSyncShardMarkerTrack *marker_tracker;

  int tries;

  bool error_injection;

  RGWSyncTraceNodeRef tn;

public:
  RGWMetaSyncSingleEntryCR(RGWMetaSyncEnv *_sync_env,
                           const string& _raw_key, const string& _entry_marker,
                           const RGWMDLogStatus& _op_status,
                           RGWMetaSyncShardMarkerTrack *_marker_tracker, const RGWSyncTraceNodeRef& _tn_parent);

  int operate() override;
};

class RGWShardCollectCR : public RGWCoroutine {
  int cur_shard = 0;
  int current_running;
  int max_concurrent;
  int status;

public:
  RGWShardCollectCR(CephContext *_cct, int _max_concurrent) : RGWCoroutine(_cct),
                                                              current_running(0),
                                                              max_concurrent(_max_concurrent),
                                                              status(0) {}

  virtual bool spawn_next() = 0;
  int operate() override;
};

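// Illustrative sketch (comment only): a concrete RGWShardCollectCR overrides
// spawn_next() to launch one child coroutine per shard until there is nothing
// left to spawn; operate() (defined elsewhere) limits how many run at once via
// max_concurrent. The names below are hypothetical.
//
//   class ReadAllShardsCR : public RGWShardCollectCR {
//     int shard = 0;
//     int num_shards;
//   public:
//     ReadAllShardsCR(CephContext *cct, int num_shards)
//       : RGWShardCollectCR(cct, 8 /* max_concurrent */), num_shards(num_shards) {}
//     bool spawn_next() override {
//       if (shard >= num_shards) {
//         return false;                                     // nothing more to spawn
//       }
//       spawn(new ReadOneShardCR(cct, shard++), false);     // hypothetical per-shard CR
//       return true;
//     }
//   };
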
// factory functions for meta sync coroutines needed in mdlog trimming

RGWCoroutine* create_read_remote_mdlog_shard_info_cr(RGWMetaSyncEnv *env,
                                                      const std::string& period,
                                                      int shard_id,
                                                      RGWMetadataLogInfo* info);

RGWCoroutine* create_list_remote_mdlog_shard_cr(RGWMetaSyncEnv *env,
                                                const std::string& period,
                                                int shard_id,
                                                const std::string& marker,
                                                uint32_t max_entries,
                                                rgw_mdlog_shard_data *result);

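// Illustrative sketch (comment only): these factories are intended to be
// invoked from within another coroutine, e.g. by the mdlog trim logic; the
// surrounding variables are hypothetical.
//
//   yield call(create_read_remote_mdlog_shard_info_cr(&env, period_id, shard_id, &shard_info));
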
#endif