#ifndef CEPH_RGW_SYNC_H
#define CEPH_RGW_SYNC_H

#include "rgw_coroutine.h"
#include "rgw_http_client.h"
#include "rgw_meta_sync_status.h"

#include "include/stringify.h"
#include "common/RWLock.h"

#include <atomic>

#define ERROR_LOGGER_SHARDS 32
#define RGW_SYNC_ERROR_LOG_SHARD_PREFIX "sync.error-log"

struct rgw_mdlog_info {
  uint32_t num_shards;
  std::string period; //< period id of the master's oldest metadata log
  epoch_t realm_epoch; //< realm epoch of oldest metadata log

  rgw_mdlog_info() : num_shards(0), realm_epoch(0) {}

  void decode_json(JSONObj *obj);
};


struct rgw_mdlog_entry {
  string id;
  string section;
  string name;
  ceph::real_time timestamp;
  RGWMetadataLogData log_data;

  void decode_json(JSONObj *obj);

  bool convert_from(cls_log_entry& le) {
    id = le.id;
    section = le.section;
    name = le.name;
    timestamp = le.timestamp.to_real_time();
    try {
      bufferlist::iterator iter = le.data.begin();
      ::decode(log_data, iter);
    } catch (buffer::error& err) {
      return false;
    }
    return true;
  }
};

struct rgw_mdlog_shard_data {
  string marker;
  bool truncated;
  vector<rgw_mdlog_entry> entries;

  void decode_json(JSONObj *obj);
};
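
/* Usage sketch (illustrative, not part of this header): these structs are
 * filled from the master zone's REST responses via the generic ceph_json
 * decoder, roughly as follows ('body' is a hypothetical response buffer):
 *
 *   JSONParser p;
 *   if (p.parse(body.c_str(), body.length())) {
 *     rgw_mdlog_shard_data data;
 *     try {
 *       decode_json_obj(data, &p);  // drives rgw_mdlog_shard_data::decode_json()
 *     } catch (JSONDecoder::err& e) {
 *       // handle a malformed response
 *     }
 *   }
 */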

class RGWAsyncRadosProcessor;
class RGWMetaSyncStatusManager;
class RGWMetaSyncCR;
class RGWRESTConn;

class RGWSyncErrorLogger {
  RGWRados *store;

  vector<string> oids;
  int num_shards;

  std::atomic<int64_t> counter = { 0 };
public:
  RGWSyncErrorLogger(RGWRados *_store, const string &oid_prefix, int _num_shards);
  RGWCoroutine *log_error_cr(const string& source_zone, const string& section, const string& name, uint32_t error_code, const string& message);

  static string get_shard_oid(const string& oid_prefix, int shard_id);
};
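
/* Sharding sketch (illustrative; the exact oid format lives in rgw_sync.cc):
 * error entries are spread across ERROR_LOGGER_SHARDS rados objects, with the
 * atomic counter picking a shard round-robin, roughly:
 *
 *   int shard = ++counter % num_shards;
 *   string oid = RGWSyncErrorLogger::get_shard_oid(RGW_SYNC_ERROR_LOG_SHARD_PREFIX, shard);
 *   // -> something like "sync.error-log.<shard>"
 */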

struct rgw_sync_error_info {
  string source_zone;
  uint32_t error_code;
  string message;

  rgw_sync_error_info() : error_code(0) {}
  rgw_sync_error_info(const string& _source_zone, uint32_t _error_code, const string& _message) : source_zone(_source_zone), error_code(_error_code), message(_message) {}

  void encode(bufferlist& bl) const {
    ENCODE_START(1, 1, bl);
    ::encode(source_zone, bl);
    ::encode(error_code, bl);
    ::encode(message, bl);
    ENCODE_FINISH(bl);
  }

  void decode(bufferlist::iterator& bl) {
    DECODE_START(1, bl);
    ::decode(source_zone, bl);
    ::decode(error_code, bl);
    ::decode(message, bl);
    DECODE_FINISH(bl);
  }

  void dump(Formatter *f) const;
};
WRITE_CLASS_ENCODER(rgw_sync_error_info)
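
/* WRITE_CLASS_ENCODER generates the free-function encode()/decode() wrappers,
 * so the struct round-trips through a bufferlist like any other Ceph type:
 *
 *   bufferlist bl;
 *   ::encode(info, bl);             // calls info.encode(bl)
 *   bufferlist::iterator it = bl.begin();
 *   ::decode(info2, it);            // calls info2.decode(it)
 */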

#define DEFAULT_BACKOFF_MAX 30

class RGWSyncBackoff {
  int cur_wait;
  int max_secs;

  void update_wait_time();
public:
  RGWSyncBackoff(int _max_secs = DEFAULT_BACKOFF_MAX) : cur_wait(0), max_secs(_max_secs) {}

  void backoff_sleep();
  void reset() {
    cur_wait = 0;
  }

  void backoff(RGWCoroutine *op);
};
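
/* Usage sketch (illustrative; assumes the rgw_sync.cc behavior of growing
 * cur_wait on each call, capped at max_secs):
 *
 *   RGWSyncBackoff backoff;
 *   while ((ret = try_operation()) < 0) {   // try_operation() is hypothetical
 *     backoff.backoff_sleep();  // blocking variant; backoff(op) is the CR-friendly one
 *   }
 *   backoff.reset();            // success: start from a short wait next time
 */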

class RGWBackoffControlCR : public RGWCoroutine
{
  RGWCoroutine *cr;
  Mutex lock;

  RGWSyncBackoff backoff;
  bool reset_backoff;

  bool exit_on_error;

protected:
  bool *backoff_ptr() {
    return &reset_backoff;
  }

  Mutex& cr_lock() {
    return lock;
  }

  RGWCoroutine *get_cr() {
    return cr;
  }

public:
  RGWBackoffControlCR(CephContext *_cct, bool _exit_on_error) : RGWCoroutine(_cct), cr(NULL), lock("RGWBackoffControlCR::lock:" + stringify(this)),
                                                                reset_backoff(false), exit_on_error(_exit_on_error) {
  }

  ~RGWBackoffControlCR() override {
    if (cr) {
      cr->put();
    }
  }

  virtual RGWCoroutine *alloc_cr() = 0;
  virtual RGWCoroutine *alloc_finisher_cr() { return NULL; }

  int operate() override;
};
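
/* Subclassing sketch (illustrative): operate() repeatedly spawns alloc_cr()
 * with exponential backoff between failed runs; the spawned CR can set
 * *backoff_ptr() to true to reset the backoff once it makes progress.
 *
 *   class MyPollCR : public RGWBackoffControlCR {   // hypothetical
 *   public:
 *     explicit MyPollCR(CephContext *cct)
 *       : RGWBackoffControlCR(cct, false) {}  // false: keep retrying on error
 *     RGWCoroutine *alloc_cr() override {
 *       return new MyWorkerCR(backoff_ptr()); // MyWorkerCR is hypothetical
 *     }
 *   };
 */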

struct RGWMetaSyncEnv {
  CephContext *cct;
  RGWRados *store;
  RGWRESTConn *conn;
  RGWAsyncRadosProcessor *async_rados;
  RGWHTTPManager *http_manager;
  RGWSyncErrorLogger *error_logger;

  RGWMetaSyncEnv() : cct(NULL), store(NULL), conn(NULL), async_rados(NULL), http_manager(NULL), error_logger(NULL) {}

  void init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
            RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
            RGWSyncErrorLogger *_error_logger);

  string shard_obj_name(int shard_id);
  string status_oid();
};
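
/* Wiring sketch (illustrative): one env is shared by all metadata-sync
 * coroutines so they use the same connection, HTTP manager and error logger:
 *
 *   RGWMetaSyncEnv env;
 *   env.init(store->ctx(), store, conn, async_rados, &http_manager, error_logger);
 *   // env.status_oid() / env.shard_obj_name(shard_id) name the rados objects
 *   // holding the overall and per-shard sync status markers.
 */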

class RGWRemoteMetaLog : public RGWCoroutinesManager {
  RGWRados *store;
  RGWRESTConn *conn;
  RGWAsyncRadosProcessor *async_rados;

  RGWHTTPManager http_manager;
  RGWMetaSyncStatusManager *status_manager;
  RGWSyncErrorLogger *error_logger;

  RGWMetaSyncCR *meta_sync_cr;

  RGWSyncBackoff backoff;

  RGWMetaSyncEnv sync_env;

  void init_sync_env(RGWMetaSyncEnv *env);
  int store_sync_info(const rgw_meta_sync_info& sync_info);

  std::atomic<bool> going_down = { false };

public:
  RGWRemoteMetaLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
                   RGWMetaSyncStatusManager *_sm)
    : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()),
      store(_store), conn(NULL), async_rados(async_rados),
      http_manager(store->ctx(), completion_mgr),
      status_manager(_sm), error_logger(NULL), meta_sync_cr(NULL) {}

  ~RGWRemoteMetaLog() override;

  int init();
  void finish();

  int read_log_info(rgw_mdlog_info *log_info);
  int read_master_log_shards_info(const string& master_period, map<int, RGWMetadataLogInfo> *shards_info);
  int read_master_log_shards_next(const string& period, map<int, string> shard_markers, map<int, rgw_mdlog_shard_data> *result);
  int read_sync_status(rgw_meta_sync_status *sync_status);
  int init_sync_status();
  int run_sync();

  void wakeup(int shard_id);

  RGWMetaSyncEnv& get_sync_env() {
    return sync_env;
  }
};

class RGWMetaSyncStatusManager {
  RGWRados *store;
  librados::IoCtx ioctx;

  RGWRemoteMetaLog master_log;

  map<int, rgw_raw_obj> shard_objs;

  struct utime_shard {
    real_time ts;
    int shard_id;

    utime_shard() : shard_id(-1) {}

    bool operator<(const utime_shard& rhs) const {
      if (ts == rhs.ts) {
        return shard_id < rhs.shard_id;
      }
      return ts < rhs.ts;
    }
  };

  RWLock ts_to_shard_lock;
  map<utime_shard, int> ts_to_shard;
  vector<string> clone_markers;

public:
  RGWMetaSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados)
    : store(_store), master_log(store, async_rados, this),
      ts_to_shard_lock("ts_to_shard_lock") {}
  int init();

  int read_sync_status(rgw_meta_sync_status *sync_status) {
    return master_log.read_sync_status(sync_status);
  }
  int init_sync_status() { return master_log.init_sync_status(); }
  int read_log_info(rgw_mdlog_info *log_info) {
    return master_log.read_log_info(log_info);
  }
  int read_master_log_shards_info(const string& master_period, map<int, RGWMetadataLogInfo> *shards_info) {
    return master_log.read_master_log_shards_info(master_period, shards_info);
  }
  int read_master_log_shards_next(const string& period, map<int, string> shard_markers, map<int, rgw_mdlog_shard_data> *result) {
    return master_log.read_master_log_shards_next(period, shard_markers, result);
  }

  int run() { return master_log.run_sync(); }

  void wakeup(int shard_id) { return master_log.wakeup(shard_id); }
  void stop() {
    master_log.finish();
  }
};
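
/* Lifecycle sketch (illustrative): this is the facade the rest of radosgw
 * drives metadata sync through.
 *
 *   RGWMetaSyncStatusManager mgr(store, async_rados);
 *   int r = mgr.init();
 *   if (r < 0) { return r; }
 *   r = mgr.run();       // runs the sync coroutines until shutdown
 *   // ... from another thread, on shutdown:
 *   mgr.stop();
 */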

class RGWOrderCallCR : public RGWCoroutine
{
public:
  RGWOrderCallCR(CephContext *cct) : RGWCoroutine(cct) {}

  virtual void call_cr(RGWCoroutine *_cr) = 0;
};

class RGWLastCallerWinsCR : public RGWOrderCallCR
{
  RGWCoroutine *cr{nullptr};

public:
  RGWLastCallerWinsCR(CephContext *cct) : RGWOrderCallCR(cct) {}
  ~RGWLastCallerWinsCR() {
    if (cr) {
      cr->put();
    }
  }

  int operate() override;

  void call_cr(RGWCoroutine *_cr) override {
    if (cr) {
      cr->put();
    }
    cr = _cr;
  }
};
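
/* Semantics note (illustrative): "last caller wins" coalesces repeated calls.
 * If call_cr() is invoked again before the previously handed-in CR has run,
 * the older CR is dropped (put()) and only the most recent one executes --
 * which is what marker flushing below needs: only the newest high-water
 * marker has to be persisted.
 */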

template <class T, class K>
class RGWSyncShardMarkerTrack {
  struct marker_entry {
    uint64_t pos;
    real_time timestamp;

    marker_entry() : pos(0) {}
    marker_entry(uint64_t _p, const real_time& _ts) : pos(_p), timestamp(_ts) {}
  };
  typename std::map<T, marker_entry> pending;

  map<T, marker_entry> finish_markers;

  int window_size;
  int updates_since_flush;

  RGWOrderCallCR *order_cr{nullptr};

protected:
  typename std::set<K> need_retry_set;

  virtual RGWCoroutine *store_marker(const T& new_marker, uint64_t index_pos, const real_time& timestamp) = 0;
  virtual RGWOrderCallCR *allocate_order_control_cr() = 0;
  virtual void handle_finish(const T& marker) { }

public:
  RGWSyncShardMarkerTrack(int _window_size) : window_size(_window_size), updates_since_flush(0) {}
  virtual ~RGWSyncShardMarkerTrack() {
    if (order_cr) {
      order_cr->put();
    }
  }

  bool start(const T& pos, int index_pos, const real_time& timestamp) {
    if (pending.find(pos) != pending.end()) {
      return false;
    }
    pending[pos] = marker_entry(index_pos, timestamp);
    return true;
  }

  void try_update_high_marker(const T& pos, int index_pos, const real_time& timestamp) {
    finish_markers[pos] = marker_entry(index_pos, timestamp);
  }

  RGWCoroutine *finish(const T& pos) {
    if (pending.empty()) {
      /* This can happen due to a bug that ended up creating multiple objects
       * with the same name and version -- possible when versioning is enabled
       * and the version is 'null'.
       */
      return NULL;
    }

    typename std::map<T, marker_entry>::iterator iter = pending.begin();

    bool is_first = (pos == iter->first);

    typename std::map<T, marker_entry>::iterator pos_iter = pending.find(pos);
    if (pos_iter == pending.end()) {
      /* see pending.empty() comment */
      return NULL;
    }

    finish_markers[pos] = pos_iter->second;

    pending.erase(pos);

    handle_finish(pos);

    updates_since_flush++;

    if (is_first && (updates_since_flush >= window_size || pending.empty())) {
      return flush();
    }
    return NULL;
  }

  RGWCoroutine *flush() {
    if (finish_markers.empty()) {
      return NULL;
    }

    typename std::map<T, marker_entry>::iterator i;

    if (pending.empty()) {
      i = finish_markers.end();
    } else {
      i = finish_markers.lower_bound(pending.begin()->first);
    }
    if (i == finish_markers.begin()) {
      return NULL;
    }
    updates_since_flush = 0;

    auto last = i;
    --i;
    const T& high_marker = i->first;
    marker_entry& high_entry = i->second;
    RGWCoroutine *cr = order(store_marker(high_marker, high_entry.pos, high_entry.timestamp));
    finish_markers.erase(finish_markers.begin(), last);
    return cr;
  }

  /*
   * A key needs retry if it was being processed when another entry pointing
   * at the same bucket shard arrived. Instead of processing the new entry, we
   * mark the key as need_retry so that when we finish processing the original
   * entry we reprocess the same bucket shard, in case more entries arrived in
   * the meantime. This closes a race that could otherwise drop updates.
   */
  bool need_retry(const K& key) {
    return (need_retry_set.find(key) != need_retry_set.end());
  }

  void set_need_retry(const K& key) {
    need_retry_set.insert(key);
  }

  void reset_need_retry(const K& key) {
    need_retry_set.erase(key);
  }

  RGWCoroutine *order(RGWCoroutine *cr) {
    /* Either returns a newly allocated order-control CR (whatever
     * allocate_order_control_cr() produces, e.g. an RGWLastCallerWinsCR), or
     * hands cr to the existing one, in which case it returns nullptr and the
     * existing CR will call cr.
     */
    if (order_cr && order_cr->is_done()) {
      order_cr->put();
      order_cr = nullptr;
    }
    if (!order_cr) {
      order_cr = allocate_order_control_cr();
      order_cr->get();
      order_cr->call_cr(cr);
      return order_cr;
    }
    order_cr->call_cr(cr);
    return nullptr; /* don't call it a second time */
  }
};
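
/* Marker-window sketch (illustrative): entries on a shard complete out of
 * order, so the tracker only persists the highest marker that has no earlier
 * entry still pending:
 *
 *   if (tracker.start(marker, pos, timestamp)) {
 *     // ... spawn the CR that processes this log entry ...
 *   }
 *   // later, when that entry completes:
 *   RGWCoroutine *cr = tracker.finish(marker);
 *   if (cr) {
 *     spawn(cr, false);  // persists the new high-water marker
 *   }
 */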

class RGWMetaSyncShardMarkerTrack;

class RGWMetaSyncSingleEntryCR : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;

  string raw_key;
  string entry_marker;
  RGWMDLogStatus op_status;

  ssize_t pos;
  string section;
  string key;

  int sync_status;

  bufferlist md_bl;

  RGWMetaSyncShardMarkerTrack *marker_tracker;

  int tries;

  bool error_injection;

public:
  RGWMetaSyncSingleEntryCR(RGWMetaSyncEnv *_sync_env,
                           const string& _raw_key, const string& _entry_marker,
                           const RGWMDLogStatus& _op_status,
                           RGWMetaSyncShardMarkerTrack *_marker_tracker) : RGWCoroutine(_sync_env->cct),
                                                                           sync_env(_sync_env),
                                                                           raw_key(_raw_key), entry_marker(_entry_marker),
                                                                           op_status(_op_status),
                                                                           pos(0), sync_status(0),
                                                                           marker_tracker(_marker_tracker), tries(0) {
    error_injection = (sync_env->cct->_conf->rgw_sync_meta_inject_err_probability > 0);
  }

  int operate() override;
};

class RGWShardCollectCR : public RGWCoroutine {
  int cur_shard;
  int current_running;
  int max_concurrent;
  int status;

public:
  RGWShardCollectCR(CephContext *_cct, int _max_concurrent) : RGWCoroutine(_cct),
                                                              current_running(0),
                                                              max_concurrent(_max_concurrent),
                                                              status(0) {}

  virtual bool spawn_next() = 0;
  int operate() override;
};
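
/* Fan-out sketch (illustrative): a subclass implements spawn_next() to spawn
 * one child CR per shard and return false when there are no more; operate()
 * keeps at most max_concurrent children in flight and collects their return
 * codes.
 *
 *   class TrimAllShardsCR : public RGWShardCollectCR {  // hypothetical
 *     int shard{0}, num_shards;
 *   public:
 *     TrimAllShardsCR(CephContext *cct, int n)
 *       : RGWShardCollectCR(cct, 16), num_shards(n) {}
 *     bool spawn_next() override {
 *       if (shard >= num_shards) return false;
 *       spawn(new TrimShardCR(shard++), false);  // TrimShardCR is hypothetical
 *       return true;
 *     }
 *   };
 */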

// MetaLogTrimCR factory function
RGWCoroutine* create_meta_log_trim_cr(RGWRados *store, RGWHTTPManager *http,
                                      int num_shards, utime_t interval);

// factory function for mdlog trim via radosgw-admin
RGWCoroutine* create_admin_meta_log_trim_cr(RGWRados *store,
                                            RGWHTTPManager *http,
                                            int num_shards);

#endif