// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

#ifndef CEPH_RGW_SYNC_H
#define CEPH_RGW_SYNC_H

#include <atomic>

#include "include/stringify.h"

#include "rgw_coroutine.h"
#include "rgw_http_client.h"
#include "rgw_metadata.h"
#include "rgw_meta_sync_status.h"
#include "rgw_sal.h"
#include "rgw_sal_rados.h"
#include "rgw_sync_trace.h"
#include "rgw_mdlog.h"

#define ERROR_LOGGER_SHARDS 32
#define RGW_SYNC_ERROR_LOG_SHARD_PREFIX "sync.error-log"

struct rgw_mdlog_info {
  uint32_t num_shards;
  std::string period; ///< period id of the master's oldest metadata log
  epoch_t realm_epoch; ///< realm epoch of the oldest metadata log

  rgw_mdlog_info() : num_shards(0), realm_epoch(0) {}

  void decode_json(JSONObj *obj);
};


struct rgw_mdlog_entry {
  std::string id;
  std::string section;
  std::string name;
  ceph::real_time timestamp;
  RGWMetadataLogData log_data;

  void decode_json(JSONObj *obj);

  bool convert_from(cls_log_entry& le) {
    id = le.id;
    section = le.section;
    name = le.name;
    timestamp = le.timestamp.to_real_time();
    try {
      auto iter = le.data.cbegin();
      decode(log_data, iter);
    } catch (buffer::error& err) {
      return false;
    }
    return true;
  }
};
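
// Conversion sketch (illustrative, not part of this header): turning a batch
// of cls_log_entry records from a shard listing into rgw_mdlog_entry values.
// The raw_entries variable is a hypothetical stand-in for the listing result.
//
//   std::vector<rgw_mdlog_entry> entries;
//   for (auto& le : raw_entries) {   // e.g. std::list<cls_log_entry>
//     rgw_mdlog_entry e;
//     if (!e.convert_from(le)) {
//       continue; // skip entries whose log_data fails to decode
//     }
//     entries.push_back(std::move(e));
//   }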

struct rgw_mdlog_shard_data {
  std::string marker;
  bool truncated;
  std::vector<rgw_mdlog_entry> entries;

  void decode_json(JSONObj *obj);
};

class RGWAsyncRadosProcessor;
class RGWMetaSyncStatusManager;
class RGWMetaSyncCR;
class RGWRESTConn;
class RGWSyncTraceManager;

class RGWSyncErrorLogger {
  rgw::sal::RadosStore* store;

  std::vector<std::string> oids;
  int num_shards;

  std::atomic<int64_t> counter = { 0 };
public:
  RGWSyncErrorLogger(rgw::sal::RadosStore* _store, const std::string &oid_prefix, int _num_shards);
  RGWCoroutine *log_error_cr(const DoutPrefixProvider *dpp, const std::string& source_zone, const std::string& section, const std::string& name, uint32_t error_code, const std::string& message);

  static std::string get_shard_oid(const std::string& oid_prefix, int shard_id);
};
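
// Usage sketch (illustrative): logging a sync error from inside a coroutine's
// operate() body. The surrounding coroutine, sync_env, key and sync_status
// are assumptions, not part of this header.
//
//   yield call(sync_env->error_logger->log_error_cr(
//       dpp, sync_env->conn->get_remote_id(), "meta", key,
//       -sync_status, std::string("failed to sync: ") + cpp_strerror(-sync_status)));
//
// Errors are spread round-robin across ERROR_LOGGER_SHARDS objects named via
// get_shard_oid(), so a burst of failures does not hammer a single RADOS object.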

struct rgw_sync_error_info {
  std::string source_zone;
  uint32_t error_code;
  std::string message;

  rgw_sync_error_info() : error_code(0) {}
  rgw_sync_error_info(const std::string& _source_zone, uint32_t _error_code, const std::string& _message) : source_zone(_source_zone), error_code(_error_code), message(_message) {}

  void encode(bufferlist& bl) const {
    ENCODE_START(1, 1, bl);
    encode(source_zone, bl);
    encode(error_code, bl);
    encode(message, bl);
    ENCODE_FINISH(bl);
  }

  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(1, bl);
    decode(source_zone, bl);
    decode(error_code, bl);
    decode(message, bl);
    DECODE_FINISH(bl);
  }

  void dump(Formatter *f) const;
};
WRITE_CLASS_ENCODER(rgw_sync_error_info)

#define DEFAULT_BACKOFF_MAX 30

class RGWSyncBackoff {
  int cur_wait;
  int max_secs;

  void update_wait_time();
public:
  explicit RGWSyncBackoff(int _max_secs = DEFAULT_BACKOFF_MAX) : cur_wait(0), max_secs(_max_secs) {}

  void backoff_sleep();
  void reset() {
    cur_wait = 0;
  }

  void backoff(RGWCoroutine *op);
};
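
// Usage sketch (illustrative): wrapping a fallible call with backoff. In
// synchronous code backoff_sleep() blocks the thread; inside a coroutine,
// backoff(op) achieves the same wait without blocking. do_work() is a
// hypothetical stand-in.
//
//   RGWSyncBackoff backoff;
//   int r;
//   while ((r = do_work()) < 0) {
//     backoff.backoff_sleep(); // wait grows per attempt, capped at max_secs
//   }
//   backoff.reset(); // success: the next failure starts from a short wait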

class RGWBackoffControlCR : public RGWCoroutine
{
  RGWCoroutine *cr;
  ceph::mutex lock;

  RGWSyncBackoff backoff;
  bool reset_backoff;

  bool exit_on_error;

protected:
  bool *backoff_ptr() {
    return &reset_backoff;
  }

  ceph::mutex& cr_lock() {
    return lock;
  }

  RGWCoroutine *get_cr() {
    return cr;
  }

public:
  RGWBackoffControlCR(CephContext *_cct, bool _exit_on_error)
    : RGWCoroutine(_cct),
      cr(nullptr),
      lock(ceph::make_mutex("RGWBackoffControlCR::lock:" + stringify(this))),
      reset_backoff(false), exit_on_error(_exit_on_error) {
  }

  ~RGWBackoffControlCR() override {
    if (cr) {
      cr->put();
    }
  }

  virtual RGWCoroutine *alloc_cr() = 0;
  virtual RGWCoroutine *alloc_finisher_cr() { return NULL; }

  int operate(const DoutPrefixProvider *dpp) override;
};
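
// Subclassing sketch (illustrative; MySyncControlCR and create_sync_cr() are
// hypothetical): RGWBackoffControlCR repeatedly runs the coroutine returned
// by alloc_cr(), backing off between failed attempts, and either retries or
// exits on error depending on the exit_on_error flag.
//
//   class MySyncControlCR : public RGWBackoffControlCR {
//   public:
//     explicit MySyncControlCR(CephContext *cct)
//       : RGWBackoffControlCR(cct, false) {} // false: keep retrying on error
//     RGWCoroutine *alloc_cr() override {
//       return create_sync_cr(); // a fresh coroutine for each attempt
//     }
//   };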

struct RGWMetaSyncEnv {
  const DoutPrefixProvider *dpp;
  CephContext *cct{nullptr};
  rgw::sal::RadosStore* store{nullptr};
  RGWRESTConn *conn{nullptr};
  RGWAsyncRadosProcessor *async_rados{nullptr};
  RGWHTTPManager *http_manager{nullptr};
  RGWSyncErrorLogger *error_logger{nullptr};
  RGWSyncTraceManager *sync_tracer{nullptr};

  RGWMetaSyncEnv() {}

  void init(const DoutPrefixProvider *_dpp, CephContext *_cct, rgw::sal::RadosStore* _store, RGWRESTConn *_conn,
            RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
            RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer);

  std::string shard_obj_name(int shard_id);
  std::string status_oid();
};
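
// Initialization sketch (illustrative): populating the environment, roughly
// as RGWRemoteMetaLog::init_sync_env() does internally. The pointers are
// assumed to be set up by the caller.
//
//   RGWMetaSyncEnv env;
//   env.init(dpp, store->ctx(), store, conn, async_rados,
//            &http_manager, error_logger, sync_tracer);
//   // per-shard status objects: env.shard_obj_name(shard_id)
//   // global sync status object: env.status_oid()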

class RGWRemoteMetaLog : public RGWCoroutinesManager {
  const DoutPrefixProvider *dpp;
  rgw::sal::RadosStore* store;
  RGWRESTConn *conn;
  RGWAsyncRadosProcessor *async_rados;

  RGWHTTPManager http_manager;
  RGWMetaSyncStatusManager *status_manager;
  RGWSyncErrorLogger *error_logger{nullptr};
  RGWSyncTraceManager *sync_tracer{nullptr};

  RGWMetaSyncCR *meta_sync_cr{nullptr};

  RGWSyncBackoff backoff;

  RGWMetaSyncEnv sync_env;

  void init_sync_env(RGWMetaSyncEnv *env);
  int store_sync_info(const DoutPrefixProvider *dpp, const rgw_meta_sync_info& sync_info);

  std::atomic<bool> going_down = { false };

  RGWSyncTraceNodeRef tn;

public:
  RGWRemoteMetaLog(const DoutPrefixProvider *dpp, rgw::sal::RadosStore* _store,
                   RGWAsyncRadosProcessor *async_rados,
                   RGWMetaSyncStatusManager *_sm)
    : RGWCoroutinesManager(_store->ctx(), _store->getRados()->get_cr_registry()),
      dpp(dpp), store(_store), conn(NULL), async_rados(async_rados),
      http_manager(store->ctx(), completion_mgr),
      status_manager(_sm) {}

  virtual ~RGWRemoteMetaLog() override;

  int init();
  void finish();

  int read_log_info(const DoutPrefixProvider *dpp, rgw_mdlog_info *log_info);
  int read_master_log_shards_info(const DoutPrefixProvider *dpp, const std::string& master_period, std::map<int, RGWMetadataLogInfo> *shards_info);
  int read_master_log_shards_next(const DoutPrefixProvider *dpp, const std::string& period, std::map<int, std::string> shard_markers, std::map<int, rgw_mdlog_shard_data> *result);
  int read_sync_status(const DoutPrefixProvider *dpp, rgw_meta_sync_status *sync_status);
  int init_sync_status(const DoutPrefixProvider *dpp);
  int run_sync(const DoutPrefixProvider *dpp, optional_yield y);

  void wakeup(int shard_id);

  RGWMetaSyncEnv& get_sync_env() {
    return sync_env;
  }
};

class RGWMetaSyncStatusManager : public DoutPrefixProvider {
  rgw::sal::RadosStore* store;
  librados::IoCtx ioctx;

  RGWRemoteMetaLog master_log;

  std::map<int, rgw_raw_obj> shard_objs;

  struct utime_shard {
    real_time ts;
    int shard_id;

    utime_shard() : shard_id(-1) {}

    bool operator<(const utime_shard& rhs) const {
      if (ts == rhs.ts) {
        return shard_id < rhs.shard_id;
      }
      return ts < rhs.ts;
    }
  };

  ceph::shared_mutex ts_to_shard_lock = ceph::make_shared_mutex("ts_to_shard_lock");
  std::map<utime_shard, int> ts_to_shard;
  std::vector<std::string> clone_markers;

public:
  RGWMetaSyncStatusManager(rgw::sal::RadosStore* _store, RGWAsyncRadosProcessor *async_rados)
    : store(_store), master_log(this, store, async_rados, this)
  {}

  virtual ~RGWMetaSyncStatusManager() override;

  int init(const DoutPrefixProvider *dpp);

  int read_sync_status(const DoutPrefixProvider *dpp, rgw_meta_sync_status *sync_status) {
    return master_log.read_sync_status(dpp, sync_status);
  }
  int init_sync_status(const DoutPrefixProvider *dpp) { return master_log.init_sync_status(dpp); }
  int read_log_info(const DoutPrefixProvider *dpp, rgw_mdlog_info *log_info) {
    return master_log.read_log_info(dpp, log_info);
  }
  int read_master_log_shards_info(const DoutPrefixProvider *dpp, const std::string& master_period, std::map<int, RGWMetadataLogInfo> *shards_info) {
    return master_log.read_master_log_shards_info(dpp, master_period, shards_info);
  }
  int read_master_log_shards_next(const DoutPrefixProvider *dpp, const std::string& period, std::map<int, std::string> shard_markers, std::map<int, rgw_mdlog_shard_data> *result) {
    return master_log.read_master_log_shards_next(dpp, period, shard_markers, result);
  }

  int run(const DoutPrefixProvider *dpp, optional_yield y) { return master_log.run_sync(dpp, y); }


  // implements DoutPrefixProvider
  CephContext *get_cct() const override { return store->ctx(); }
  unsigned get_subsys() const override;
  std::ostream& gen_prefix(std::ostream& out) const override;

  void wakeup(int shard_id) { return master_log.wakeup(shard_id); }
  void stop() {
    master_log.finish();
  }
};
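
// Usage sketch (illustrative): driving metadata sync through the status
// manager, roughly as a radosgw sync thread would. Error handling is elided
// and y stands for an optional_yield supplied by the caller.
//
//   RGWMetaSyncStatusManager mgr(store, async_rados);
//   if (mgr.init(dpp) < 0) {
//     return; // init failed
//   }
//   mgr.run(dpp, y); // runs sync until mgr.stop() is called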

class RGWOrderCallCR : public RGWCoroutine
{
public:
  RGWOrderCallCR(CephContext *cct) : RGWCoroutine(cct) {}

  virtual void call_cr(RGWCoroutine *_cr) = 0;
};

class RGWLastCallerWinsCR : public RGWOrderCallCR
{
  RGWCoroutine *cr{nullptr};

public:
  explicit RGWLastCallerWinsCR(CephContext *cct) : RGWOrderCallCR(cct) {}
  ~RGWLastCallerWinsCR() {
    if (cr) {
      cr->put();
    }
  }

  int operate(const DoutPrefixProvider *dpp) override;

  void call_cr(RGWCoroutine *_cr) override {
    if (cr) {
      cr->put();
    }
    cr = _cr;
  }
};
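
// Semantics sketch (illustrative): each call_cr() releases any coroutine
// that was queued but not yet run, so only the most recent caller's work
// executes; superseded intermediate updates are simply dropped.
//
//   auto *order = new RGWLastCallerWinsCR(cct);
//   order->call_cr(store_marker_cr_a); // superseded below, released via put()
//   order->call_cr(store_marker_cr_b); // only this one will run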

template <class T, class K>
class RGWSyncShardMarkerTrack {
  struct marker_entry {
    uint64_t pos;
    real_time timestamp;

    marker_entry() : pos(0) {}
    marker_entry(uint64_t _p, const real_time& _ts) : pos(_p), timestamp(_ts) {}
  };
  typename std::map<T, marker_entry> pending;

  std::map<T, marker_entry> finish_markers;

  int window_size;
  int updates_since_flush;

  RGWOrderCallCR *order_cr{nullptr};

protected:
  typename std::set<K> need_retry_set;

  virtual RGWCoroutine *store_marker(const T& new_marker, uint64_t index_pos, const real_time& timestamp) = 0;
  virtual RGWOrderCallCR *allocate_order_control_cr() = 0;
  virtual void handle_finish(const T& marker) { }

public:
  RGWSyncShardMarkerTrack(int _window_size) : window_size(_window_size), updates_since_flush(0) {}
  virtual ~RGWSyncShardMarkerTrack() {
    if (order_cr) {
      order_cr->put();
    }
  }

  bool start(const T& pos, int index_pos, const real_time& timestamp) {
    if (pending.find(pos) != pending.end()) {
      return false;
    }
    pending[pos] = marker_entry(index_pos, timestamp);
    return true;
  }

  void try_update_high_marker(const T& pos, int index_pos, const real_time& timestamp) {
    finish_markers[pos] = marker_entry(index_pos, timestamp);
  }

  RGWCoroutine *finish(const T& pos) {
    if (pending.empty()) {
      /* can happen, due to a bug that ended up with multiple objects with the
       * same name and version -- which can happen when versioning is enabled
       * and the version is 'null'.
       */
      return NULL;
    }

    typename std::map<T, marker_entry>::iterator iter = pending.begin();

    bool is_first = (pos == iter->first);

    typename std::map<T, marker_entry>::iterator pos_iter = pending.find(pos);
    if (pos_iter == pending.end()) {
      /* see pending.empty() comment */
      return NULL;
    }

    finish_markers[pos] = pos_iter->second;

    pending.erase(pos);

    handle_finish(pos);

    updates_since_flush++;

    if (is_first && (updates_since_flush >= window_size || pending.empty())) {
      return flush();
    }
    return NULL;
  }

  RGWCoroutine *flush() {
    if (finish_markers.empty()) {
      return NULL;
    }

    typename std::map<T, marker_entry>::iterator i;

    if (pending.empty()) {
      i = finish_markers.end();
    } else {
      i = finish_markers.lower_bound(pending.begin()->first);
    }
    if (i == finish_markers.begin()) {
      return NULL;
    }
    updates_since_flush = 0;

    auto last = i;
    --i;
    const T& high_marker = i->first;
    marker_entry& high_entry = i->second;
    RGWCoroutine *cr = order(store_marker(high_marker, high_entry.pos, high_entry.timestamp));
    finish_markers.erase(finish_markers.begin(), last);
    return cr;
  }

  /*
   * A key needs retry if another marker that points to the same bucket
   * shard arrives while the key is still being processed. Instead of
   * processing the new marker, we flag the key as need_retry so that when
   * the original finishes we reprocess the same bucket shard, in case more
   * entries arrived in the meantime. This closes a race window.
   */
  bool need_retry(const K& key) {
    return (need_retry_set.find(key) != need_retry_set.end());
  }

  void set_need_retry(const K& key) {
    need_retry_set.insert(key);
  }

  void reset_need_retry(const K& key) {
    need_retry_set.erase(key);
  }

  RGWCoroutine *order(RGWCoroutine *cr) {
    /* either returns a new order-control coroutine (e.g. RGWLastCallerWinsCR),
     * or hands cr to the existing one, in which case it returns nullptr and
     * the existing coroutine runs cr
     */
    if (order_cr && order_cr->is_done()) {
      order_cr->put();
      order_cr = nullptr;
    }
    if (!order_cr) {
      order_cr = allocate_order_control_cr();
      order_cr->get();
      order_cr->call_cr(cr);
      return order_cr;
    }
    order_cr->call_cr(cr);
    return nullptr; /* don't call it a second time */
  }
};
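
// Usage sketch (illustrative; the entry loop, sync_single_entry_cr() and the
// surrounding coroutine are hypothetical): the tracker admits up to
// window_size in-flight entries and only persists the highest marker below
// which every earlier entry has completed.
//
//   // per entry pulled from the shard log:
//   if (marker_tracker->start(entry.id, pos, entry.timestamp)) {
//     spawn(sync_single_entry_cr(entry), false);
//   }
//   // when an entry's coroutine completes:
//   RGWCoroutine *cr = marker_tracker->finish(entry.id);
//   if (cr) {
//     yield call(cr); // persists the new high-water marker
//   }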

class RGWMetaSyncShardMarkerTrack;

class RGWMetaSyncSingleEntryCR : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;

  std::string raw_key;
  std::string entry_marker;
  RGWMDLogStatus op_status;

  ssize_t pos;
  std::string section;
  std::string key;

  int sync_status;

  bufferlist md_bl;

  RGWMetaSyncShardMarkerTrack *marker_tracker;

  int tries;

  bool error_injection;

  RGWSyncTraceNodeRef tn;

public:
  RGWMetaSyncSingleEntryCR(RGWMetaSyncEnv *_sync_env,
                           const std::string& _raw_key, const std::string& _entry_marker,
                           const RGWMDLogStatus& _op_status,
                           RGWMetaSyncShardMarkerTrack *_marker_tracker, const RGWSyncTraceNodeRef& _tn_parent);

  int operate(const DoutPrefixProvider *dpp) override;
};

class RGWShardCollectCR : public RGWCoroutine {
  int cur_shard = 0;
  int current_running;
  int max_concurrent;
  int status;

public:
  RGWShardCollectCR(CephContext *_cct, int _max_concurrent)
    : RGWCoroutine(_cct),
      current_running(0),
      max_concurrent(_max_concurrent),
      status(0) {}

  virtual bool spawn_next() = 0;
  int operate(const DoutPrefixProvider *dpp) override;
};
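
// Subclassing sketch (illustrative; ReadAllShardsCR and shard_info_cr() are
// hypothetical): RGWShardCollectCR calls spawn_next() until it returns false,
// keeping at most max_concurrent child coroutines running at once.
//
//   class ReadAllShardsCR : public RGWShardCollectCR {
//     int shard = 0;
//     int num_shards;
//   public:
//     ReadAllShardsCR(CephContext *cct, int n)
//       : RGWShardCollectCR(cct, 16), num_shards(n) {} // 16 = max_concurrent
//     bool spawn_next() override {
//       if (shard >= num_shards) {
//         return false; // no more work; collector drains remaining children
//       }
//       spawn(shard_info_cr(shard++), false);
//       return true;
//     }
//   };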

// factory functions for meta sync coroutines needed in mdlog trimming

RGWCoroutine* create_read_remote_mdlog_shard_info_cr(RGWMetaSyncEnv *env,
                                                     const std::string& period,
                                                     int shard_id,
                                                     RGWMetadataLogInfo* info);

RGWCoroutine* create_list_remote_mdlog_shard_cr(RGWMetaSyncEnv *env,
                                                const std::string& period,
                                                int shard_id,
                                                const std::string& marker,
                                                uint32_t max_entries,
                                                rgw_mdlog_shard_data *result);
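
// Usage sketch (illustrative): fetching remote shard state from a trimming
// coroutine; env, period and shard come from the caller.
//
//   RGWMetadataLogInfo info;
//   yield call(create_read_remote_mdlog_shard_info_cr(env, period, shard, &info));
//   // info now reflects the remote shard's current position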

#endif