]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_sync.h
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rgw / rgw_sync.h
CommitLineData
11fdf7f2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
7c673cae
FG
4#ifndef CEPH_RGW_SYNC_H
5#define CEPH_RGW_SYNC_H
6
11fdf7f2 7#include <atomic>
7c673cae
FG
8
9#include "include/stringify.h"
10#include "common/RWLock.h"
11
11fdf7f2
TL
12#include "rgw_coroutine.h"
13#include "rgw_http_client.h"
14#include "rgw_metadata.h"
15#include "rgw_meta_sync_status.h"
16#include "rgw_rados.h"
17#include "rgw_sync_trace.h"
18
7c673cae
FG
19
// Shard count and object-name prefix for the sync error log
// (see RGWSyncErrorLogger::get_shard_oid for how the two are combined).
#define ERROR_LOGGER_SHARDS 32
#define RGW_SYNC_ERROR_LOG_SHARD_PREFIX "sync.error-log"
22
23struct rgw_mdlog_info {
24 uint32_t num_shards;
25 std::string period; //< period id of the master's oldest metadata log
26 epoch_t realm_epoch; //< realm epoch of oldest metadata log
27
28 rgw_mdlog_info() : num_shards(0), realm_epoch(0) {}
29
30 void decode_json(JSONObj *obj);
31};
32
33
34struct rgw_mdlog_entry {
35 string id;
36 string section;
37 string name;
38 ceph::real_time timestamp;
39 RGWMetadataLogData log_data;
40
41 void decode_json(JSONObj *obj);
42
43 bool convert_from(cls_log_entry& le) {
44 id = le.id;
45 section = le.section;
46 name = le.name;
47 timestamp = le.timestamp.to_real_time();
48 try {
11fdf7f2
TL
49 auto iter = le.data.cbegin();
50 decode(log_data, iter);
7c673cae
FG
51 } catch (buffer::error& err) {
52 return false;
53 }
54 return true;
55 }
56};
57
58struct rgw_mdlog_shard_data {
59 string marker;
60 bool truncated;
61 vector<rgw_mdlog_entry> entries;
62
63 void decode_json(JSONObj *obj);
64};
65
// Types used below only through pointers; their definitions live elsewhere.
class RGWAsyncRadosProcessor;
class RGWMetaSyncCR;
class RGWMetaSyncStatusManager;
class RGWRESTConn;
class RGWSyncTraceManager;
71
72class RGWSyncErrorLogger {
73 RGWRados *store;
74
75 vector<string> oids;
76 int num_shards;
77
78 std::atomic<int64_t> counter = { 0 };
79public:
80 RGWSyncErrorLogger(RGWRados *_store, const string &oid_prefix, int _num_shards);
81 RGWCoroutine *log_error_cr(const string& source_zone, const string& section, const string& name, uint32_t error_code, const string& message);
82
83 static string get_shard_oid(const string& oid_prefix, int shard_id);
84};
85
86struct rgw_sync_error_info {
87 string source_zone;
88 uint32_t error_code;
89 string message;
90
91 rgw_sync_error_info() : error_code(0) {}
92 rgw_sync_error_info(const string& _source_zone, uint32_t _error_code, const string& _message) : source_zone(_source_zone), error_code(_error_code), message(_message) {}
93
94 void encode(bufferlist& bl) const {
95 ENCODE_START(1, 1, bl);
11fdf7f2
TL
96 encode(source_zone, bl);
97 encode(error_code, bl);
98 encode(message, bl);
7c673cae
FG
99 ENCODE_FINISH(bl);
100 }
101
11fdf7f2 102 void decode(bufferlist::const_iterator& bl) {
7c673cae 103 DECODE_START(1, bl);
11fdf7f2
TL
104 decode(source_zone, bl);
105 decode(error_code, bl);
106 decode(message, bl);
7c673cae
FG
107 DECODE_FINISH(bl);
108 }
109
110 void dump(Formatter *f) const;
111};
112WRITE_CLASS_ENCODER(rgw_sync_error_info)
113
114#define DEFAULT_BACKOFF_MAX 30
115
116class RGWSyncBackoff {
117 int cur_wait;
118 int max_secs;
119
120 void update_wait_time();
121public:
11fdf7f2 122 explicit RGWSyncBackoff(int _max_secs = DEFAULT_BACKOFF_MAX) : cur_wait(0), max_secs(_max_secs) {}
7c673cae
FG
123
124 void backoff_sleep();
125 void reset() {
126 cur_wait = 0;
127 }
128
129 void backoff(RGWCoroutine *op);
130};
131
132class RGWBackoffControlCR : public RGWCoroutine
133{
134 RGWCoroutine *cr;
135 Mutex lock;
136
137 RGWSyncBackoff backoff;
138 bool reset_backoff;
139
140 bool exit_on_error;
141
142protected:
143 bool *backoff_ptr() {
144 return &reset_backoff;
145 }
146
147 Mutex& cr_lock() {
148 return lock;
149 }
150
151 RGWCoroutine *get_cr() {
152 return cr;
153 }
154
155public:
156 RGWBackoffControlCR(CephContext *_cct, bool _exit_on_error) : RGWCoroutine(_cct), cr(NULL), lock("RGWBackoffControlCR::lock:" + stringify(this)),
157 reset_backoff(false), exit_on_error(_exit_on_error) {
158 }
159
160 ~RGWBackoffControlCR() override {
161 if (cr) {
162 cr->put();
163 }
164 }
165
166 virtual RGWCoroutine *alloc_cr() = 0;
167 virtual RGWCoroutine *alloc_finisher_cr() { return NULL; }
168
169 int operate() override;
170};
171
172struct RGWMetaSyncEnv {
11fdf7f2
TL
173 const DoutPrefixProvider *dpp;
174 CephContext *cct{nullptr};
175 RGWRados *store{nullptr};
176 RGWRESTConn *conn{nullptr};
177 RGWAsyncRadosProcessor *async_rados{nullptr};
178 RGWHTTPManager *http_manager{nullptr};
179 RGWSyncErrorLogger *error_logger{nullptr};
180 RGWSyncTraceManager *sync_tracer{nullptr};
181
182 RGWMetaSyncEnv() {}
183
184 void init(const DoutPrefixProvider *_dpp, CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
7c673cae 185 RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
11fdf7f2 186 RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer);
7c673cae
FG
187
188 string shard_obj_name(int shard_id);
189 string status_oid();
190};
191
192class RGWRemoteMetaLog : public RGWCoroutinesManager {
11fdf7f2 193 const DoutPrefixProvider *dpp;
7c673cae
FG
194 RGWRados *store;
195 RGWRESTConn *conn;
196 RGWAsyncRadosProcessor *async_rados;
197
198 RGWHTTPManager http_manager;
199 RGWMetaSyncStatusManager *status_manager;
11fdf7f2
TL
200 RGWSyncErrorLogger *error_logger{nullptr};
201 RGWSyncTraceManager *sync_tracer{nullptr};
7c673cae 202
11fdf7f2 203 RGWMetaSyncCR *meta_sync_cr{nullptr};
7c673cae
FG
204
205 RGWSyncBackoff backoff;
206
207 RGWMetaSyncEnv sync_env;
208
209 void init_sync_env(RGWMetaSyncEnv *env);
210 int store_sync_info(const rgw_meta_sync_info& sync_info);
211
212 std::atomic<bool> going_down = { false };
213
11fdf7f2
TL
214 RGWSyncTraceNodeRef tn;
215
7c673cae 216public:
11fdf7f2
TL
217 RGWRemoteMetaLog(const DoutPrefixProvider *dpp, RGWRados *_store,
218 RGWAsyncRadosProcessor *async_rados,
7c673cae
FG
219 RGWMetaSyncStatusManager *_sm)
220 : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()),
11fdf7f2 221 dpp(dpp), store(_store), conn(NULL), async_rados(async_rados),
7c673cae 222 http_manager(store->ctx(), completion_mgr),
11fdf7f2 223 status_manager(_sm) {}
7c673cae
FG
224
225 ~RGWRemoteMetaLog() override;
226
227 int init();
228 void finish();
229
230 int read_log_info(rgw_mdlog_info *log_info);
231 int read_master_log_shards_info(const string& master_period, map<int, RGWMetadataLogInfo> *shards_info);
232 int read_master_log_shards_next(const string& period, map<int, string> shard_markers, map<int, rgw_mdlog_shard_data> *result);
233 int read_sync_status(rgw_meta_sync_status *sync_status);
234 int init_sync_status();
235 int run_sync();
236
237 void wakeup(int shard_id);
238
239 RGWMetaSyncEnv& get_sync_env() {
240 return sync_env;
241 }
242};
243
11fdf7f2 244class RGWMetaSyncStatusManager : public DoutPrefixProvider {
7c673cae
FG
245 RGWRados *store;
246 librados::IoCtx ioctx;
247
248 RGWRemoteMetaLog master_log;
249
250 map<int, rgw_raw_obj> shard_objs;
251
252 struct utime_shard {
253 real_time ts;
254 int shard_id;
255
256 utime_shard() : shard_id(-1) {}
257
258 bool operator<(const utime_shard& rhs) const {
259 if (ts == rhs.ts) {
260 return shard_id < rhs.shard_id;
261 }
262 return ts < rhs.ts;
263 }
264 };
265
266 RWLock ts_to_shard_lock;
267 map<utime_shard, int> ts_to_shard;
268 vector<string> clone_markers;
269
270public:
271 RGWMetaSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados)
11fdf7f2 272 : store(_store), master_log(this, store, async_rados, this),
7c673cae
FG
273 ts_to_shard_lock("ts_to_shard_lock") {}
274 int init();
275
276 int read_sync_status(rgw_meta_sync_status *sync_status) {
277 return master_log.read_sync_status(sync_status);
278 }
279 int init_sync_status() { return master_log.init_sync_status(); }
280 int read_log_info(rgw_mdlog_info *log_info) {
281 return master_log.read_log_info(log_info);
282 }
283 int read_master_log_shards_info(const string& master_period, map<int, RGWMetadataLogInfo> *shards_info) {
284 return master_log.read_master_log_shards_info(master_period, shards_info);
285 }
286 int read_master_log_shards_next(const string& period, map<int, string> shard_markers, map<int, rgw_mdlog_shard_data> *result) {
287 return master_log.read_master_log_shards_next(period, shard_markers, result);
288 }
289
290 int run() { return master_log.run_sync(); }
291
11fdf7f2
TL
292
293 // implements DoutPrefixProvider
294 CephContext *get_cct() const override { return store->ctx(); }
295 unsigned get_subsys() const override;
296 std::ostream& gen_prefix(std::ostream& out) const override;
297
7c673cae
FG
298 void wakeup(int shard_id) { return master_log.wakeup(shard_id); }
299 void stop() {
300 master_log.finish();
301 }
302};
303
91327a77
AA
304class RGWOrderCallCR : public RGWCoroutine
305{
306public:
307 RGWOrderCallCR(CephContext *cct) : RGWCoroutine(cct) {}
308
309 virtual void call_cr(RGWCoroutine *_cr) = 0;
310};
311
312class RGWLastCallerWinsCR : public RGWOrderCallCR
313{
314 RGWCoroutine *cr{nullptr};
315
316public:
11fdf7f2 317 explicit RGWLastCallerWinsCR(CephContext *cct) : RGWOrderCallCR(cct) {}
91327a77
AA
318 ~RGWLastCallerWinsCR() {
319 if (cr) {
320 cr->put();
321 }
322 }
323
324 int operate() override;
325
11fdf7f2 326 void call_cr(RGWCoroutine *_cr) override {
91327a77
AA
327 if (cr) {
328 cr->put();
329 }
330 cr = _cr;
331 }
332};
333
7c673cae
FG
334template <class T, class K>
335class RGWSyncShardMarkerTrack {
336 struct marker_entry {
337 uint64_t pos;
338 real_time timestamp;
339
340 marker_entry() : pos(0) {}
341 marker_entry(uint64_t _p, const real_time& _ts) : pos(_p), timestamp(_ts) {}
342 };
343 typename std::map<T, marker_entry> pending;
344
345 map<T, marker_entry> finish_markers;
346
347 int window_size;
348 int updates_since_flush;
349
91327a77 350 RGWOrderCallCR *order_cr{nullptr};
7c673cae
FG
351
352protected:
353 typename std::set<K> need_retry_set;
354
355 virtual RGWCoroutine *store_marker(const T& new_marker, uint64_t index_pos, const real_time& timestamp) = 0;
91327a77 356 virtual RGWOrderCallCR *allocate_order_control_cr() = 0;
7c673cae
FG
357 virtual void handle_finish(const T& marker) { }
358
359public:
360 RGWSyncShardMarkerTrack(int _window_size) : window_size(_window_size), updates_since_flush(0) {}
91327a77
AA
361 virtual ~RGWSyncShardMarkerTrack() {
362 if (order_cr) {
363 order_cr->put();
364 }
365 }
7c673cae
FG
366
367 bool start(const T& pos, int index_pos, const real_time& timestamp) {
368 if (pending.find(pos) != pending.end()) {
369 return false;
370 }
371 pending[pos] = marker_entry(index_pos, timestamp);
372 return true;
373 }
374
375 void try_update_high_marker(const T& pos, int index_pos, const real_time& timestamp) {
376 finish_markers[pos] = marker_entry(index_pos, timestamp);
377 }
378
379 RGWCoroutine *finish(const T& pos) {
380 if (pending.empty()) {
381 /* can happen, due to a bug that ended up with multiple objects with the same name and version
382 * -- which can happen when versioning is enabled an the version is 'null'.
383 */
384 return NULL;
385 }
386
387 typename std::map<T, marker_entry>::iterator iter = pending.begin();
388
389 bool is_first = (pos == iter->first);
390
391 typename std::map<T, marker_entry>::iterator pos_iter = pending.find(pos);
392 if (pos_iter == pending.end()) {
393 /* see pending.empty() comment */
394 return NULL;
395 }
396
397 finish_markers[pos] = pos_iter->second;
398
399 pending.erase(pos);
400
401 handle_finish(pos);
402
403 updates_since_flush++;
404
405 if (is_first && (updates_since_flush >= window_size || pending.empty())) {
406 return flush();
407 }
408 return NULL;
409 }
410
411 RGWCoroutine *flush() {
412 if (finish_markers.empty()) {
413 return NULL;
414 }
415
416 typename std::map<T, marker_entry>::iterator i;
417
418 if (pending.empty()) {
419 i = finish_markers.end();
420 } else {
421 i = finish_markers.lower_bound(pending.begin()->first);
422 }
423 if (i == finish_markers.begin()) {
424 return NULL;
425 }
426 updates_since_flush = 0;
427
428 auto last = i;
429 --i;
430 const T& high_marker = i->first;
431 marker_entry& high_entry = i->second;
91327a77 432 RGWCoroutine *cr = order(store_marker(high_marker, high_entry.pos, high_entry.timestamp));
7c673cae
FG
433 finish_markers.erase(finish_markers.begin(), last);
434 return cr;
435 }
436
437 /*
438 * a key needs retry if it was processing when another marker that points
439 * to the same bucket shards arrives. Instead of processing it, we mark
440 * it as need_retry so that when we finish processing the original, we
441 * retry the processing on the same bucket shard, in case there are more
442 * entries to process. This closes a race that can happen.
443 */
444 bool need_retry(const K& key) {
445 return (need_retry_set.find(key) != need_retry_set.end());
446 }
447
448 void set_need_retry(const K& key) {
449 need_retry_set.insert(key);
450 }
451
452 void reset_need_retry(const K& key) {
453 need_retry_set.erase(key);
454 }
91327a77
AA
455
456 RGWCoroutine *order(RGWCoroutine *cr) {
457 /* either returns a new RGWLastWriteWinsCR, or update existing one, in which case it returns
458 * nothing and the existing one will call the cr
459 */
460 if (order_cr && order_cr->is_done()) {
461 order_cr->put();
462 order_cr = nullptr;
463 }
464 if (!order_cr) {
465 order_cr = allocate_order_control_cr();
466 order_cr->get();
467 order_cr->call_cr(cr);
468 return order_cr;
469 }
470 order_cr->call_cr(cr);
471 return nullptr; /* don't call it a second time */
472 }
7c673cae
FG
473};
474
475class RGWMetaSyncShardMarkerTrack;
476
477class RGWMetaSyncSingleEntryCR : public RGWCoroutine {
478 RGWMetaSyncEnv *sync_env;
479
480 string raw_key;
481 string entry_marker;
482 RGWMDLogStatus op_status;
483
484 ssize_t pos;
485 string section;
486 string key;
487
488 int sync_status;
489
490 bufferlist md_bl;
491
492 RGWMetaSyncShardMarkerTrack *marker_tracker;
493
494 int tries;
495
496 bool error_injection;
497
11fdf7f2
TL
498 RGWSyncTraceNodeRef tn;
499
7c673cae
FG
500public:
501 RGWMetaSyncSingleEntryCR(RGWMetaSyncEnv *_sync_env,
11fdf7f2 502 const string& _raw_key, const string& _entry_marker,
7c673cae 503 const RGWMDLogStatus& _op_status,
11fdf7f2 504 RGWMetaSyncShardMarkerTrack *_marker_tracker, const RGWSyncTraceNodeRef& _tn_parent);
7c673cae
FG
505
506 int operate() override;
507};
508
509class RGWShardCollectCR : public RGWCoroutine {
510 int cur_shard;
511 int current_running;
512 int max_concurrent;
513 int status;
514
515public:
516 RGWShardCollectCR(CephContext *_cct, int _max_concurrent) : RGWCoroutine(_cct),
517 current_running(0),
518 max_concurrent(_max_concurrent),
519 status(0) {}
520
521 virtual bool spawn_next() = 0;
522 int operate() override;
523};
524
525// MetaLogTrimCR factory function
11fdf7f2 526RGWCoroutine* create_meta_log_trim_cr(const DoutPrefixProvider *dpp, RGWRados *store, RGWHTTPManager *http,
7c673cae
FG
527 int num_shards, utime_t interval);
528
529// factory function for mdlog trim via radosgw-admin
11fdf7f2 530RGWCoroutine* create_admin_meta_log_trim_cr(const DoutPrefixProvider *dpp, RGWRados *store,
7c673cae
FG
531 RGWHTTPManager *http,
532 int num_shards);
533
534#endif