1 #ifndef CEPH_RGW_SYNC_H
2 #define CEPH_RGW_SYNC_H
4 #include "rgw_coroutine.h"
5 #include "rgw_http_client.h"
6 #include "rgw_meta_sync_status.h"
8 #include "include/stringify.h"
9 #include "common/RWLock.h"
// Shard count for the sync error log. NOTE(review): presumably passed as
// _num_shards to RGWSyncErrorLogger by callers elsewhere — confirm.
13 #define ERROR_LOGGER_SHARDS 32
// Object-name prefix for the sync error log shards. NOTE(review): presumably
// the oid_prefix handed to RGWSyncErrorLogger / get_shard_oid() — confirm.
14 #define RGW_SYNC_ERROR_LOG_SHARD_PREFIX "sync.error-log"
// Summary of the master zone's metadata log. This is the output type of
// RGWRemoteMetaLog::read_log_info(); it can be populated from JSON via
// decode_json().
16 struct rgw_mdlog_info
{
18 std::string period
; //< period id of the master's oldest metadata log
19 epoch_t realm_epoch
; //< realm epoch of oldest metadata log
// Default state: num_shards and realm_epoch are zero, period is empty.
21 rgw_mdlog_info() : num_shards(0), realm_epoch(0) {}
// Fill the fields from a parsed JSON object.
23 void decode_json(JSONObj
*obj
);
// A single metadata log entry: its timestamp plus the decoded
// RGWMetadataLogData payload. Built either from JSON (decode_json()) or from
// a raw cls_log_entry (convert_from()).
27 struct rgw_mdlog_entry
{
31 ceph::real_time timestamp
;
// Payload decoded from the log entry's data buffer (see convert_from()).
32 RGWMetadataLogData log_data
;
34 void decode_json(JSONObj
*obj
);
// Convert a cls_log_entry into this entry: copy its timestamp (converted to
// real_time) and decode its data bufferlist into log_data.
// A buffer::error thrown by the decode is caught below.
// NOTE(review): the handler body and the success return are not visible
// here; presumably false on decode failure and true otherwise — confirm.
36 bool convert_from(cls_log_entry
& le
) {
40 timestamp
= le
.timestamp
.to_real_time();
42 bufferlist::iterator iter
= le
.data
.begin();
43 ::decode(log_data
, iter
);
44 } catch (buffer::error
& err
) {
// A list of metadata log entries that can be decoded from JSON.
// RGWRemoteMetaLog::read_master_log_shards_next() returns these in a map
// keyed by int (presumably the shard id — confirm).
51 struct rgw_mdlog_shard_data
{
54 vector
<rgw_mdlog_entry
> entries
;
56 void decode_json(JSONObj
*obj
);
// Forward declarations. Both types are referenced only through pointers
// below; RGWMetaSyncStatusManager is defined further down in this header.
59 class RGWAsyncRadosProcessor
;
60 class RGWMetaSyncStatusManager
;
// Logs sync errors to a set of sharded objects. The constructor takes an
// object-name prefix and a shard count; get_shard_oid() maps
// (prefix, shard id) to an object name.
// NOTE(review): the write path itself lives in the .cc, not in this header.
64 class RGWSyncErrorLogger
{
// Atomic counter. NOTE(review): presumably used to spread writes across
// shards — confirm in the implementation.
70 std::atomic
<int64_t> counter
= { 0 };
72 RGWSyncErrorLogger(RGWRados
*_store
, const string
&oid_prefix
, int _num_shards
);
// Returns a coroutine that records one sync error described by the source
// zone, section, name, numeric error code and message.
73 RGWCoroutine
*log_error_cr(const string
& source_zone
, const string
& section
, const string
& name
, uint32_t error_code
, const string
& message
);
// Name of the log object for shard_id under oid_prefix.
75 static string
get_shard_oid(const string
& oid_prefix
, int shard_id
);
// Record describing one sync error: the source zone, a numeric error code and
// a message. Serialized with a versioned encoding (ENCODE_START(1, 1, ...)).
78 struct rgw_sync_error_info
{
// Default: error_code is 0; the string fields are empty.
83 rgw_sync_error_info() : error_code(0) {}
84 rgw_sync_error_info(const string
& _source_zone
, uint32_t _error_code
, const string
& _message
) : source_zone(_source_zone
), error_code(_error_code
), message(_message
) {}
// Encodes source_zone, error_code, message in that order; decode() must read
// the fields back in the same order.
86 void encode(bufferlist
& bl
) const {
87 ENCODE_START(1, 1, bl
);
88 ::encode(source_zone
, bl
);
89 ::encode(error_code
, bl
);
90 ::encode(message
, bl
);
// Inverse of encode(): reads source_zone, error_code, message in order.
94 void decode(bufferlist::iterator
& bl
) {
96 ::decode(source_zone
, bl
);
97 ::decode(error_code
, bl
);
98 ::decode(message
, bl
);
// Dumps the fields to a Formatter (implementation not in this header).
102 void dump(Formatter
*f
) const;
// Ceph encoding macro; presumably declares the free ::encode/::decode
// wrappers that forward to the members above.
104 WRITE_CLASS_ENCODER(rgw_sync_error_info
)
// Default cap, in seconds, on RGWSyncBackoff's wait time. It is the default
// value of the constructor's _max_secs argument.
106 #define DEFAULT_BACKOFF_MAX 30
// Retry backoff state: a current wait time (cur_wait, starting at 0) bounded
// by max_secs.
108 class RGWSyncBackoff
{
// Advance cur_wait for the next retry.
// NOTE(review): the growth policy is in the .cc — confirm.
112 void update_wait_time();
114 RGWSyncBackoff(int _max_secs
= DEFAULT_BACKOFF_MAX
) : cur_wait(0), max_secs(_max_secs
) {}
// Blocking variant. NOTE(review): presumably sleeps the calling thread for
// the current wait — confirm.
116 void backoff_sleep();
// Coroutine variant. NOTE(review): presumably makes `op` wait rather than
// blocking a thread — confirm.
121 void backoff(RGWCoroutine
*op
);
// Base coroutine that holds a child coroutine (`cr`, created by alloc_cr())
// and an RGWSyncBackoff. The retry loop itself is in operate(), which is not
// in this header.
// Subclasses must implement alloc_cr(); alloc_finisher_cr() is optional and
// returns NULL by default.
124 class RGWBackoffControlCR
: public RGWCoroutine
129 RGWSyncBackoff backoff
;
// Exposes the address of the reset_backoff flag.
// NOTE(review): presumably lets the child coroutine request a backoff reset —
// confirm in operate().
135 bool *backoff_ptr() {
136 return &reset_backoff
;
// Accessor for the current child coroutine (`cr`); the body is only partly
// visible here.
143 RGWCoroutine
*get_cr() {
// The lock name embeds stringify(this), so each instance gets a distinct
// lock name.
// NOTE(review): exit_on_error presumably stops retrying after a failure —
// confirm in operate().
148 RGWBackoffControlCR(CephContext
*_cct
, bool _exit_on_error
) : RGWCoroutine(_cct
), cr(NULL
), lock("RGWBackoffControlCR::lock:" + stringify(this)),
149 reset_backoff(false), exit_on_error(_exit_on_error
) {
152 ~RGWBackoffControlCR() override
{
// Subclass hook: create a fresh child coroutine for the next attempt.
158 virtual RGWCoroutine
*alloc_cr() = 0;
// Optional subclass hook; NULL means no finisher coroutine.
159 virtual RGWCoroutine
*alloc_finisher_cr() { return NULL
; }
161 int operate() override
;
// Shared context handed to metadata sync coroutines (e.g.
// RGWMetaSyncSingleEntryCR takes an RGWMetaSyncEnv*). It holds the
// CephContext, store, REST connection, async RADOS processor, HTTP manager
// and error logger.
// NOTE(review): these are plain pointers; ownership is not expressed here.
164 struct RGWMetaSyncEnv
{
168 RGWAsyncRadosProcessor
*async_rados
;
169 RGWHTTPManager
*http_manager
;
170 RGWSyncErrorLogger
*error_logger
;
// Every pointer starts as NULL until init() is called.
172 RGWMetaSyncEnv() : cct(NULL
), store(NULL
), conn(NULL
), async_rados(NULL
), http_manager(NULL
), error_logger(NULL
) {}
// Presumably assigns each pointer member from the matching argument (the body
// is in the .cc).
174 void init(CephContext
*_cct
, RGWRados
*_store
, RGWRESTConn
*_conn
,
175 RGWAsyncRadosProcessor
*_async_rados
, RGWHTTPManager
*_http_manager
,
176 RGWSyncErrorLogger
*_error_logger
);
// Object name associated with shard_id.
// NOTE(review): presumably the shard's sync-status object — confirm.
178 string
shard_obj_name(int shard_id
);
// Coroutine manager that reads the master zone's metadata log and drives
// metadata sync. It holds, by value:
//   - its own RGWHTTPManager, built on completion_mgr;
//   - an RGWSyncBackoff;
//   - the RGWMetaSyncEnv used by sync coroutines.
182 class RGWRemoteMetaLog
: public RGWCoroutinesManager
{
185 RGWAsyncRadosProcessor
*async_rados
;
187 RGWHTTPManager http_manager
;
// Back-pointer to the owning status manager (passed in the constructor).
188 RGWMetaSyncStatusManager
*status_manager
;
189 RGWSyncErrorLogger
*error_logger
;
191 RGWMetaSyncCR
*meta_sync_cr
;
193 RGWSyncBackoff backoff
;
195 RGWMetaSyncEnv sync_env
;
197 void init_sync_env(RGWMetaSyncEnv
*env
);
198 int store_sync_info(const rgw_meta_sync_info
& sync_info
);
// Shutdown flag; it is atomic, so it may be accessed concurrently.
// NOTE(review): the readers and writers are not visible here — confirm.
200 std::atomic
<bool> going_down
= { false };
// conn, error_logger and meta_sync_cr start as NULL and are presumably set
// later (e.g. by init(), outside this view).
// NOTE(review): http_manager is built from the `store` member, not `_store`,
// so `store` must be declared before http_manager.
203 RGWRemoteMetaLog(RGWRados
*_store
, RGWAsyncRadosProcessor
*async_rados
,
204 RGWMetaSyncStatusManager
*_sm
)
205 : RGWCoroutinesManager(_store
->ctx(), _store
->get_cr_registry()),
206 store(_store
), conn(NULL
), async_rados(async_rados
),
207 http_manager(store
->ctx(), completion_mgr
),
208 status_manager(_sm
), error_logger(NULL
), meta_sync_cr(NULL
) {}
210 ~RGWRemoteMetaLog() override
;
// Reads the master's metadata log summary into *log_info.
// Returns an int status.
215 int read_log_info(rgw_mdlog_info
*log_info
);
// Per-shard RGWMetadataLogInfo for master_period, keyed by int (presumably
// the shard id).
216 int read_master_log_shards_info(const string
& master_period
, map
<int, RGWMetadataLogInfo
> *shards_info
);
// For the given period, reads log entries following each marker in
// shard_markers into *result.
// NOTE(review): shard_markers is taken by value, so the map is copied on
// every call.
217 int read_master_log_shards_next(const string
& period
, map
<int, string
> shard_markers
, map
<int, rgw_mdlog_shard_data
> *result
);
218 int read_sync_status(rgw_meta_sync_status
*sync_status
);
219 int init_sync_status();
// Wakes up sync processing for shard_id.
222 void wakeup(int shard_id
);
// Access to the sync environment shared with the sync coroutines.
224 RGWMetaSyncEnv
& get_sync_env() {
// Front end for metadata sync. It owns an RGWRemoteMetaLog (master_log, held
// by value and constructed with `this` as its status manager) and forwards
// the read/init/run/wakeup calls below to it.
229 class RGWMetaSyncStatusManager
{
231 librados::IoCtx ioctx
;
233 RGWRemoteMetaLog master_log
;
// Per-shard raw objects keyed by int (presumably the shard id).
235 map
<int, rgw_raw_obj
> shard_objs
;
// A default-constructed utime_shard has shard_id -1.
241 utime_shard() : shard_id(-1) {}
// Ordering so utime_shard can be a map key.
// NOTE(review): only the shard_id comparison is visible; lines not shown may
// also compare a timestamp field first.
243 bool operator<(const utime_shard
& rhs
) const {
245 return shard_id
< rhs
.shard_id
;
// NOTE(review): presumably guards ts_to_shard — confirm at the use sites.
251 RWLock ts_to_shard_lock
;
252 map
<utime_shard
, int> ts_to_shard
;
253 vector
<string
> clone_markers
;
// NOTE(review): master_log is initialized from the `store` member, not
// `_store`, so `store` must be declared before master_log.
256 RGWMetaSyncStatusManager(RGWRados
*_store
, RGWAsyncRadosProcessor
*async_rados
)
257 : store(_store
), master_log(store
, async_rados
, this),
258 ts_to_shard_lock("ts_to_shard_lock") {}
// The methods below forward to master_log and pass its results through
// unchanged.
261 int read_sync_status(rgw_meta_sync_status
*sync_status
) {
262 return master_log
.read_sync_status(sync_status
);
264 int init_sync_status() { return master_log
.init_sync_status(); }
265 int read_log_info(rgw_mdlog_info
*log_info
) {
266 return master_log
.read_log_info(log_info
);
268 int read_master_log_shards_info(const string
& master_period
, map
<int, RGWMetadataLogInfo
> *shards_info
) {
269 return master_log
.read_master_log_shards_info(master_period
, shards_info
);
271 int read_master_log_shards_next(const string
& period
, map
<int, string
> shard_markers
, map
<int, rgw_mdlog_shard_data
> *result
) {
272 return master_log
.read_master_log_shards_next(period
, shard_markers
, result
);
// Runs metadata sync via master_log.run_sync().
275 int run() { return master_log
.run_sync(); }
277 void wakeup(int shard_id
) { return master_log
.wakeup(shard_id
); }
// Tracks progress markers for one sync shard.
//   - start() registers a position as pending.
//   - finish() copies the position's entry into finish_markers.
//   - flush() picks a finished marker bounded by the lowest still-pending
//     marker and hands it to store_marker().
// T is the marker type; K is the key type used for the need-retry set.
283 template <class T
, class K
>
284 class RGWSyncShardMarkerTrack
{
// Per-marker data: index position and timestamp.
285 struct marker_entry
{
289 marker_entry() : pos(0) {}
290 marker_entry(uint64_t _p
, const real_time
& _ts
) : pos(_p
), timestamp(_ts
) {}
// Markers that were started but not yet finished, ordered by T.
292 typename
std::map
<T
, marker_entry
> pending
;
// Markers that finished but have not yet been passed to store_marker().
294 map
<T
, marker_entry
> finish_markers
;
// Incremented by finish(); reset to 0 by flush().
297 int updates_since_flush
;
// Keys flagged by set_need_retry().
301 typename
std::set
<K
> need_retry_set
;
// Subclass hook: return a coroutine that persists new_marker.
303 virtual RGWCoroutine
*store_marker(const T
& new_marker
, uint64_t index_pos
, const real_time
& timestamp
) = 0;
// Subclass hook, a no-op by default.
// NOTE(review): presumably invoked when `marker` finishes — confirm.
304 virtual void handle_finish(const T
& marker
) { }
// window_size: the number of finish() updates after which finish() may
// trigger a flush.
307 RGWSyncShardMarkerTrack(int _window_size
) : window_size(_window_size
), updates_since_flush(0) {}
308 virtual ~RGWSyncShardMarkerTrack() {}
// Registers pos as pending with its index position and timestamp.
// The find() check detects a position that is already pending; that branch's
// body is not visible here (presumably it returns false).
310 bool start(const T
& pos
, int index_pos
, const real_time
& timestamp
) {
311 if (pending
.find(pos
) != pending
.end()) {
314 pending
[pos
] = marker_entry(index_pos
, timestamp
);
// Records pos directly as finished, without going through pending.
318 void try_update_high_marker(const T
& pos
, int index_pos
, const real_time
& timestamp
) {
319 finish_markers
[pos
] = marker_entry(index_pos
, timestamp
);
// Marks pos as finished: its pending entry is copied into finish_markers and
// updates_since_flush is incremented.
// is_first records whether pos was the lowest pending marker. Only in that
// case, and only once window_size updates have accumulated or nothing remains
// pending, does the final branch run (presumably it flushes).
// NOTE(review): the removal of pos from `pending` is on lines not shown here.
322 RGWCoroutine
*finish(const T
& pos
) {
323 if (pending
.empty()) {
324 /* can happen, due to a bug that ended up with multiple objects with the same name and version
325 * -- which can happen when versioning is enabled and the version is 'null'.
330 typename
std::map
<T
, marker_entry
>::iterator iter
= pending
.begin();
332 bool is_first
= (pos
== iter
->first
);
334 typename
std::map
<T
, marker_entry
>::iterator pos_iter
= pending
.find(pos
);
335 if (pos_iter
== pending
.end()) {
336 /* see pending.empty() comment */
340 finish_markers
[pos
] = pos_iter
->second
;
346 updates_since_flush
++;
348 if (is_first
&& (updates_since_flush
>= window_size
|| pending
.empty())) {
// Chooses a finished marker and returns store_marker()'s coroutine for it.
//   - With no pending markers, every finished marker is eligible
//     (i = end()).
//   - Otherwise only finished markers ordered before the lowest pending one
//     are eligible (lower_bound).
// NOTE(review): lines not shown between the begin() check and the
// dereference of `i` presumably save `last` and step `i` back one entry;
// otherwise `i` could be end() when it is dereferenced.
354 RGWCoroutine
*flush() {
355 if (finish_markers
.empty()) {
359 typename
std::map
<T
, marker_entry
>::iterator i
;
361 if (pending
.empty()) {
362 i
= finish_markers
.end();
364 i
= finish_markers
.lower_bound(pending
.begin()->first
);
366 if (i
== finish_markers
.begin()) {
369 updates_since_flush
= 0;
// store_marker() is called before erase(), so high_marker and high_entry
// still refer to a live map element at the time of the call.
373 const T
& high_marker
= i
->first
;
374 marker_entry
& high_entry
= i
->second
;
375 RGWCoroutine
*cr
= store_marker(high_marker
, high_entry
.pos
, high_entry
.timestamp
);
// Drops every finished marker before `last` (range erase excludes `last`).
376 finish_markers
.erase(finish_markers
.begin(), last
);
381 * a key needs retry if it was processing when another marker that points
382 * to the same bucket shards arrives. Instead of processing it, we mark
383 * it as need_retry so that when we finish processing the original, we
384 * retry the processing on the same bucket shard, in case there are more
385 * entries to process. This closes a race that can happen.
// True if key was flagged by set_need_retry() and not yet cleared.
387 bool need_retry(const K
& key
) {
388 return (need_retry_set
.find(key
) != need_retry_set
.end());
// Flags key for a retry.
391 void set_need_retry(const K
& key
) {
392 need_retry_set
.insert(key
);
// Clears the retry flag for key.
395 void reset_need_retry(const K
& key
) {
396 need_retry_set
.erase(key
);
// Forward declaration; this type is only used through a pointer below.
400 class RGWMetaSyncShardMarkerTrack
;
// Coroutine for a single metadata log entry, identified by raw_key and
// entry_marker, with its RGWMDLogStatus. It reports progress through
// marker_tracker.
// NOTE(review): presumably it syncs that one entry; the work is in operate(),
// which is not in this header.
402 class RGWMetaSyncSingleEntryCR
: public RGWCoroutine
{
403 RGWMetaSyncEnv
*sync_env
;
407 RGWMDLogStatus op_status
;
417 RGWMetaSyncShardMarkerTrack
*marker_tracker
;
// Enabled whenever the rgw_sync_meta_inject_err_probability config value is
// positive (see the constructor). NOTE(review): presumably used to inject
// test failures.
421 bool error_injection
;
// pos, sync_status and tries start at 0.
// NOTE(review): the body reads sync_env, which must be initialized on a line
// not shown here (presumably sync_env(_sync_env) in the initializer list).
424 RGWMetaSyncSingleEntryCR(RGWMetaSyncEnv
*_sync_env
,
425 const string
& _raw_key
, const string
& _entry_marker
,
426 const RGWMDLogStatus
& _op_status
,
427 RGWMetaSyncShardMarkerTrack
*_marker_tracker
) : RGWCoroutine(_sync_env
->cct
),
429 raw_key(_raw_key
), entry_marker(_entry_marker
),
430 op_status(_op_status
),
431 pos(0), sync_status(0),
432 marker_tracker(_marker_tracker
), tries(0) {
433 error_injection
= (sync_env
->cct
->_conf
->rgw_sync_meta_inject_err_probability
> 0);
436 int operate() override
;
// Abstract coroutine that spawns child coroutines through spawn_next().
// NOTE(review): presumably operate() collects their results and keeps at most
// max_concurrent children in flight — confirm in the implementation.
439 class RGWShardCollectCR
: public RGWCoroutine
{
446 RGWShardCollectCR(CephContext
*_cct
, int _max_concurrent
) : RGWCoroutine(_cct
),
448 max_concurrent(_max_concurrent
),
// Subclass hook: spawn the next child coroutine.
// NOTE(review): presumably returns false when nothing is left to spawn.
451 virtual bool spawn_next() = 0;
452 int operate() override
;
455 // MetaLogTrimCR factory function
// Returns a new coroutine that trims the metadata log, using the given store
// and HTTP manager.
// NOTE(review): num_shards is presumably the number of mdlog shards and
// interval the delay between trim passes — confirm against the
// implementation.
456 RGWCoroutine
* create_meta_log_trim_cr(RGWRados
*store
, RGWHTTPManager
*http
,
457 int num_shards
, utime_t interval
);
459 // factory function for mdlog trim via radosgw-admin
460 RGWCoroutine
* create_admin_meta_log_trim_cr(RGWRados
*store
,
461 RGWHTTPManager
*http
,