1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef CEPH_RGW_SYNC_H
5 #define CEPH_RGW_SYNC_H
9 #include "include/stringify.h"
10 #include "common/RWLock.h"
12 #include "rgw_coroutine.h"
13 #include "rgw_http_client.h"
14 #include "rgw_metadata.h"
15 #include "rgw_meta_sync_status.h"
16 #include "rgw_rados.h"
17 #include "rgw_sync_trace.h"
20 #define ERROR_LOGGER_SHARDS 32
21 #define RGW_SYNC_ERROR_LOG_SHARD_PREFIX "sync.error-log"
// Summary of the master zone's metadata log, filled from a REST
// "mdlog info" response via decode_json().
// NOTE(review): fragment is incomplete -- the default ctor initializes
// num_shards, but its declaration (and the closing brace) is not
// visible here; confirm against the full header.
23 struct rgw_mdlog_info
{
25 std::string period
; //< period id of the master's oldest metadata log
26 epoch_t realm_epoch
; //< realm epoch of oldest metadata log
// Zero-initialize counters; strings default-construct empty.
28 rgw_mdlog_info() : num_shards(0), realm_epoch(0) {}
// Populate this struct from a parsed JSON object (defined out of line).
30 void decode_json(JSONObj
*obj
);
// One entry of a remote metadata log shard, as received over REST.
// NOTE(review): fragment is incomplete -- other members referenced by
// the full type (e.g. id/section/name) and the closing brace are not
// visible here.
34 struct rgw_mdlog_entry
{
38 ceph::real_time timestamp
;
39 RGWMetadataLogData log_data
;
// Populate this entry from a parsed JSON object (defined out of line).
41 void decode_json(JSONObj
*obj
);
// Convert from the cls_log on-disk entry representation.
// NOTE(review): the opening of the try block, the field copies, and the
// return statements are on lines missing from this fragment; the
// visible catch clause suggests decode failures are handled rather than
// propagated -- confirm against the full header.
43 bool convert_from(cls_log_entry
& le
) {
// cls_log stores a utime_t; convert to ceph::real_time.
47 timestamp
= le
.timestamp
.to_real_time();
// Decode the payload bufferlist into log_data.
49 auto iter
= le
.data
.cbegin();
50 decode(log_data
, iter
);
51 } catch (buffer::error
& err
) {
// Response payload for listing one metadata log shard: the entries
// plus (in the full type) paging state not visible in this fragment.
58 struct rgw_mdlog_shard_data
{
61 vector
<rgw_mdlog_entry
> entries
;
// Populate from a parsed JSON object (defined out of line).
63 void decode_json(JSONObj
*obj
);
// Forward declarations for types only used by pointer/reference below.
66 class RGWAsyncRadosProcessor
;
67 class RGWMetaSyncStatusManager
;
70 class RGWSyncTraceManager
;
// Records sync errors into a set of sharded rados log objects
// (shard oids built from a prefix + shard id, see get_shard_oid).
// NOTE(review): fragment is incomplete -- the store/oid/num_shards
// members and access specifiers are not visible here.
72 class RGWSyncErrorLogger
{
// Monotonically increasing counter; presumably used to spread error
// records across shards round-robin -- confirm against the .cc.
78 std::atomic
<int64_t> counter
= { 0 };
80 RGWSyncErrorLogger(RGWRados
*_store
, const string
&oid_prefix
, int _num_shards
);
// Returns a coroutine that appends one error record (zone, section,
// name, code, message) to the error log.
81 RGWCoroutine
*log_error_cr(const string
& source_zone
, const string
& section
, const string
& name
, uint32_t error_code
, const string
& message
);
// Compose the rados object name for a given shard.
83 static string
get_shard_oid(const string
& oid_prefix
, int shard_id
);
// A single sync error record: originating zone, numeric error code and
// a free-form message. Encoded with the cls/encoding framework so it
// can be stored in the error-log objects.
// NOTE(review): fragment is incomplete -- the member declarations
// (source_zone/error_code/message), the ENCODE_FINISH/DECODE_START/
// DECODE_FINISH lines and the closing brace are not visible here.
86 struct rgw_sync_error_info
{
91 rgw_sync_error_info() : error_code(0) {}
92 rgw_sync_error_info(const string
& _source_zone
, uint32_t _error_code
, const string
& _message
) : source_zone(_source_zone
), error_code(_error_code
), message(_message
) {}
// Serialize to a bufferlist (versioned encoding, v1/compat 1).
94 void encode(bufferlist
& bl
) const {
95 ENCODE_START(1, 1, bl
);
96 encode(source_zone
, bl
);
97 encode(error_code
, bl
);
// Deserialize; field order must mirror encode().
102 void decode(bufferlist::const_iterator
& bl
) {
104 decode(source_zone
, bl
);
105 decode(error_code
, bl
);
// Dump contents to a Formatter for admin/debug output.
110 void dump(Formatter
*f
) const;
// Generate free encode()/decode() helpers for this type.
112 WRITE_CLASS_ENCODER(rgw_sync_error_info
)
// Upper bound (seconds) for the sync retry backoff below.
114 #define DEFAULT_BACKOFF_MAX 30
// Simple bounded exponential-style backoff helper for sync retries.
// NOTE(review): fragment is incomplete -- cur_wait/max_secs member
// declarations and access specifiers are not visible here.
116 class RGWSyncBackoff
{
// Advance cur_wait to the next wait interval (capped at max_secs).
120 void update_wait_time();
122 explicit RGWSyncBackoff(int _max_secs
= DEFAULT_BACKOFF_MAX
) : cur_wait(0), max_secs(_max_secs
) {}
// Blocking sleep for the current backoff interval.
124 void backoff_sleep();
// Non-blocking variant: make the given coroutine wait instead.
129 void backoff(RGWCoroutine
*op
);
// Coroutine that repeatedly runs a child coroutine (alloc_cr) with
// backoff between failed attempts; reset_backoff lets the child signal
// success so the wait interval restarts.
// NOTE(review): fragment is incomplete -- the cr/lock/reset_backoff/
// exit_on_error members, body of get_cr()/dtor, and closing braces are
// not visible here.
132 class RGWBackoffControlCR
: public RGWCoroutine
137 RGWSyncBackoff backoff
;
// Expose the reset flag so the child coroutine can clear the backoff.
143 bool *backoff_ptr() {
144 return &reset_backoff
;
// Accessor for the currently running child coroutine.
151 RGWCoroutine
*get_cr() {
156 RGWBackoffControlCR(CephContext
*_cct
, bool _exit_on_error
) : RGWCoroutine(_cct
), cr(NULL
), lock("RGWBackoffControlCR::lock:" + stringify(this)),
157 reset_backoff(false), exit_on_error(_exit_on_error
) {
160 ~RGWBackoffControlCR() override
{
// Factory for the child coroutine to (re)run each attempt.
166 virtual RGWCoroutine
*alloc_cr() = 0;
// Optional coroutine run after success; default: none.
167 virtual RGWCoroutine
*alloc_finisher_cr() { return NULL
; }
169 int operate() override
;
// Shared, non-owning environment handed to every metadata-sync
// coroutine: store handles, REST connection, async processors, error
// logger and trace manager. All pointers are borrowed; lifetime is
// managed by the owning sync manager.
172 struct RGWMetaSyncEnv
{
173 const DoutPrefixProvider
*dpp
;
174 CephContext
*cct
{nullptr};
175 RGWRados
*store
{nullptr};
176 RGWRESTConn
*conn
{nullptr};
177 RGWAsyncRadosProcessor
*async_rados
{nullptr};
178 RGWHTTPManager
*http_manager
{nullptr};
179 RGWSyncErrorLogger
*error_logger
{nullptr};
180 RGWSyncTraceManager
*sync_tracer
{nullptr};
// Wire up all the borrowed pointers in one call (defined out of line).
184 void init(const DoutPrefixProvider
*_dpp
, CephContext
*_cct
, RGWRados
*_store
, RGWRESTConn
*_conn
,
185 RGWAsyncRadosProcessor
*_async_rados
, RGWHTTPManager
*_http_manager
,
186 RGWSyncErrorLogger
*_error_logger
, RGWSyncTraceManager
*_sync_tracer
);
// Name of the per-shard sync-status rados object.
188 string
shard_obj_name(int shard_id
);
// Drives metadata sync against the master zone's mdlog over REST:
// reads remote log/shard info, tracks local sync status, and runs the
// main sync coroutine. Owns its coroutine manager state by deriving
// from RGWCoroutinesManager.
// NOTE(review): fragment is incomplete -- several members initialized
// by the ctor (store, conn) and methods such as run_sync() referenced
// by RGWMetaSyncStatusManager are on lines not visible here.
192 class RGWRemoteMetaLog
: public RGWCoroutinesManager
{
193 const DoutPrefixProvider
*dpp
;
196 RGWAsyncRadosProcessor
*async_rados
;
198 RGWHTTPManager http_manager
;
199 RGWMetaSyncStatusManager
*status_manager
;
200 RGWSyncErrorLogger
*error_logger
{nullptr};
201 RGWSyncTraceManager
*sync_tracer
{nullptr};
// Root coroutine of the running sync; null when not running.
203 RGWMetaSyncCR
*meta_sync_cr
{nullptr};
205 RGWSyncBackoff backoff
;
207 RGWMetaSyncEnv sync_env
;
// Fill an env with this object's handles (defined out of line).
209 void init_sync_env(RGWMetaSyncEnv
*env
);
// Persist the given sync info to the status objects.
210 int store_sync_info(const rgw_meta_sync_info
& sync_info
);
// Set on shutdown so running coroutines can bail out.
212 std::atomic
<bool> going_down
= { false };
214 RGWSyncTraceNodeRef tn
;
217 RGWRemoteMetaLog(const DoutPrefixProvider
*dpp
, RGWRados
*_store
,
218 RGWAsyncRadosProcessor
*async_rados
,
219 RGWMetaSyncStatusManager
*_sm
)
220 : RGWCoroutinesManager(_store
->ctx(), _store
->get_cr_registry()),
221 dpp(dpp
), store(_store
), conn(NULL
), async_rados(async_rados
),
222 http_manager(store
->ctx(), completion_mgr
),
223 status_manager(_sm
) {}
225 ~RGWRemoteMetaLog() override
;
// Fetch the master's mdlog summary (period, epoch, shard count).
230 int read_log_info(rgw_mdlog_info
*log_info
);
// Fetch per-shard info for the given period from the master.
231 int read_master_log_shards_info(const string
& master_period
, map
<int, RGWMetadataLogInfo
> *shards_info
);
// List the next batch of entries per shard, starting at the markers.
232 int read_master_log_shards_next(const string
& period
, map
<int, string
> shard_markers
, map
<int, rgw_mdlog_shard_data
> *result
);
// Read the local sync status objects.
233 int read_sync_status(rgw_meta_sync_status
*sync_status
);
234 int init_sync_status();
// Wake the coroutine handling the given shard (new work arrived).
237 void wakeup(int shard_id
);
239 RGWMetaSyncEnv
& get_sync_env() {
// Public facade for metadata sync: owns the RGWRemoteMetaLog and
// forwards status/run/wakeup calls to it; also implements
// DoutPrefixProvider for log-message prefixes.
// NOTE(review): fragment is incomplete -- the store member, parts of
// the nested utime_shard struct (its ts/shard_id fields), access
// specifiers, and several closing braces are not visible here.
244 class RGWMetaSyncStatusManager
: public DoutPrefixProvider
{
246 librados::IoCtx ioctx
;
248 RGWRemoteMetaLog master_log
;
// Per-shard sync-status rados objects.
250 map
<int, rgw_raw_obj
> shard_objs
;
256 utime_shard() : shard_id(-1) {}
// Ordering for use as a map key; shard_id is the tiebreaker
// (comparison on the timestamp field is on lines not visible here).
258 bool operator<(const utime_shard
& rhs
) const {
260 return shard_id
< rhs
.shard_id
;
// Guards ts_to_shard below.
266 RWLock ts_to_shard_lock
;
267 map
<utime_shard
, int> ts_to_shard
;
268 vector
<string
> clone_markers
;
271 RGWMetaSyncStatusManager(RGWRados
*_store
, RGWAsyncRadosProcessor
*async_rados
)
272 : store(_store
), master_log(this, store
, async_rados
, this),
273 ts_to_shard_lock("ts_to_shard_lock") {}
// The following are thin forwarders to master_log.
276 int read_sync_status(rgw_meta_sync_status
*sync_status
) {
277 return master_log
.read_sync_status(sync_status
);
279 int init_sync_status() { return master_log
.init_sync_status(); }
280 int read_log_info(rgw_mdlog_info
*log_info
) {
281 return master_log
.read_log_info(log_info
);
283 int read_master_log_shards_info(const string
& master_period
, map
<int, RGWMetadataLogInfo
> *shards_info
) {
284 return master_log
.read_master_log_shards_info(master_period
, shards_info
);
286 int read_master_log_shards_next(const string
& period
, map
<int, string
> shard_markers
, map
<int, rgw_mdlog_shard_data
> *result
) {
287 return master_log
.read_master_log_shards_next(period
, shard_markers
, result
);
// Run the full sync loop (blocks until done/shutdown).
290 int run() { return master_log
.run_sync(); }
293 // implements DoutPrefixProvider
294 CephContext
*get_cct() const override
{ return store
->ctx(); }
295 unsigned get_subsys() const override
;
296 std::ostream
& gen_prefix(std::ostream
& out
) const override
;
// Notify the shard coroutine that new mdlog entries may be available.
298 void wakeup(int shard_id
) { return master_log
.wakeup(shard_id
); }
// Abstract coroutine that serializes calls to other coroutines;
// subclasses define the ordering policy via call_cr().
// NOTE(review): closing brace and any access specifiers are on lines
// not visible in this fragment.
304 class RGWOrderCallCR
: public RGWCoroutine
307 RGWOrderCallCR(CephContext
*cct
) : RGWCoroutine(cct
) {}
// Submit a coroutine to be run under this ordering policy.
309 virtual void call_cr(RGWCoroutine
*_cr
) = 0;
// RGWOrderCallCR policy where only the most recently submitted
// coroutine is kept/run ("last caller wins"); earlier pending ones are
// superseded.
// NOTE(review): the destructor body, call_cr body and closing braces
// are on lines not visible in this fragment.
312 class RGWLastCallerWinsCR
: public RGWOrderCallCR
// The single pending coroutine; replaced by each new call_cr().
314 RGWCoroutine
*cr
{nullptr};
317 explicit RGWLastCallerWinsCR(CephContext
*cct
) : RGWOrderCallCR(cct
) {}
318 ~RGWLastCallerWinsCR() {
324 int operate() override
;
326 void call_cr(RGWCoroutine
*_cr
) override
{
// Tracks per-shard sync progress markers of type T (keys of type K).
// Entries are registered via start(), completed via finish(); once the
// oldest pending entry completes (or enough updates accumulate), the
// highest safe marker is flushed to storage through the pure-virtual
// store_marker(), ordered through an RGWOrderCallCR.
// NOTE(review): this fragment is missing many interior lines (return
// statements, closing braces, the declaration of 'last' used in
// flush(), window_size member, etc.); comments below describe only
// what the visible lines show.
334 template <class T
, class K
>
335 class RGWSyncShardMarkerTrack
{
// Position + timestamp of one tracked log entry.
336 struct marker_entry
{
340 marker_entry() : pos(0) {}
341 marker_entry(uint64_t _p
, const real_time
& _ts
) : pos(_p
), timestamp(_ts
) {}
// Entries started but not yet finished, ordered by marker.
343 typename
std::map
<T
, marker_entry
> pending
;
// Entries finished and awaiting a flush of the high marker.
345 map
<T
, marker_entry
> finish_markers
;
// Number of finish/update events since the last flush.
348 int updates_since_flush
;
// Serializes store_marker() calls; lazily (re)allocated.
350 RGWOrderCallCR
*order_cr
{nullptr};
// Keys that must be re-processed after their current run completes.
353 typename
std::set
<K
> need_retry_set
;
// Persist the given high marker; implemented by the concrete tracker.
355 virtual RGWCoroutine
*store_marker(const T
& new_marker
, uint64_t index_pos
, const real_time
& timestamp
) = 0;
356 virtual RGWOrderCallCR
*allocate_order_control_cr() = 0;
// Hook invoked when a marker completes; default: no-op.
357 virtual void handle_finish(const T
& marker
) { }
360 RGWSyncShardMarkerTrack(int _window_size
) : window_size(_window_size
), updates_since_flush(0) {}
361 virtual ~RGWSyncShardMarkerTrack() {
// Register a new in-flight entry; visible logic rejects duplicates
// already present in 'pending'.
367 bool start(const T
& pos
, int index_pos
, const real_time
& timestamp
) {
368 if (pending
.find(pos
) != pending
.end()) {
371 pending
[pos
] = marker_entry(index_pos
, timestamp
);
// Record a completed position directly (bypassing start/finish).
375 void try_update_high_marker(const T
& pos
, int index_pos
, const real_time
& timestamp
) {
376 finish_markers
[pos
] = marker_entry(index_pos
, timestamp
);
// Mark an entry complete; may return a flush coroutine when the
// completed entry was the oldest pending one (see below).
379 RGWCoroutine
*finish(const T
& pos
) {
380 if (pending
.empty()) {
381 /* can happen, due to a bug that ended up with multiple objects with the same name and version
382 * -- which can happen when versioning is enabled an the version is 'null'.
387 typename
std::map
<T
, marker_entry
>::iterator iter
= pending
.begin();
// Is this the oldest outstanding entry? Only then can the high
// marker advance.
389 bool is_first
= (pos
== iter
->first
);
391 typename
std::map
<T
, marker_entry
>::iterator pos_iter
= pending
.find(pos
);
392 if (pos_iter
== pending
.end()) {
393 /* see pending.empty() comment */
// Move the entry from pending to the finished set.
397 finish_markers
[pos
] = pos_iter
->second
;
403 updates_since_flush
++;
// Flush when the window filled up or nothing is pending anymore.
405 if (is_first
&& (updates_since_flush
>= window_size
|| pending
.empty())) {
// Compute the highest marker that is safe to persist (everything
// below the oldest still-pending entry) and emit a store coroutine.
411 RGWCoroutine
*flush() {
412 if (finish_markers
.empty()) {
416 typename
std::map
<T
, marker_entry
>::iterator i
;
418 if (pending
.empty()) {
419 i
= finish_markers
.end();
421 i
= finish_markers
.lower_bound(pending
.begin()->first
);
423 if (i
== finish_markers
.begin()) {
426 updates_since_flush
= 0;
430 const T
& high_marker
= i
->first
;
431 marker_entry
& high_entry
= i
->second
;
432 RGWCoroutine
*cr
= order(store_marker(high_marker
, high_entry
.pos
, high_entry
.timestamp
));
// Drop everything up to (not including) 'last'; 'last' is declared
// on a line not visible in this fragment.
433 finish_markers
.erase(finish_markers
.begin(), last
);
438 * a key needs retry if it was processing when another marker that points
439 * to the same bucket shards arrives. Instead of processing it, we mark
440 * it as need_retry so that when we finish processing the original, we
441 * retry the processing on the same bucket shard, in case there are more
442 * entries to process. This closes a race that can happen.
444 bool need_retry(const K
& key
) {
445 return (need_retry_set
.find(key
) != need_retry_set
.end());
448 void set_need_retry(const K
& key
) {
449 need_retry_set
.insert(key
);
452 void reset_need_retry(const K
& key
) {
453 need_retry_set
.erase(key
);
// Route 'cr' through the ordering coroutine, allocating a fresh one
// when the previous ordering coroutine has completed.
456 RGWCoroutine
*order(RGWCoroutine
*cr
) {
457 /* either returns a new RGWLastWriteWinsCR, or update existing one, in which case it returns
458 * nothing and the existing one will call the cr
460 if (order_cr
&& order_cr
->is_done()) {
465 order_cr
= allocate_order_control_cr();
467 order_cr
->call_cr(cr
);
470 order_cr
->call_cr(cr
);
471 return nullptr; /* don't call it a second time */
// Forward declaration: concrete marker tracker used by the entry CR.
475 class RGWMetaSyncShardMarkerTrack
;
// Coroutine that syncs a single metadata log entry: applies the entry
// and reports progress to its marker tracker.
// NOTE(review): fragment is incomplete -- members such as the raw key,
// entry marker and sync-status fields implied by the ctor parameters
// are on lines not visible here.
477 class RGWMetaSyncSingleEntryCR
: public RGWCoroutine
{
478 RGWMetaSyncEnv
*sync_env
;
// Status of the originating mdlog entry (e.g. write/remove).
482 RGWMDLogStatus op_status
;
492 RGWMetaSyncShardMarkerTrack
*marker_tracker
;
// When set, failures are injected for testing.
496 bool error_injection
;
498 RGWSyncTraceNodeRef tn
;
501 RGWMetaSyncSingleEntryCR(RGWMetaSyncEnv
*_sync_env
,
502 const string
& _raw_key
, const string
& _entry_marker
,
503 const RGWMDLogStatus
& _op_status
,
504 RGWMetaSyncShardMarkerTrack
*_marker_tracker
, const RGWSyncTraceNodeRef
& _tn_parent
);
506 int operate() override
;
// Coroutine that fans out child coroutines (via spawn_next) while
// keeping at most max_concurrent of them in flight.
// NOTE(review): fragment is incomplete -- member declarations and the
// rest of the ctor initializer list are on lines not visible here.
509 class RGWShardCollectCR
: public RGWCoroutine
{
516 RGWShardCollectCR(CephContext
*_cct
, int _max_concurrent
) : RGWCoroutine(_cct
),
518 max_concurrent(_max_concurrent
),
// Spawn the next child; return false when there is no more work.
521 virtual bool spawn_next() = 0;
522 int operate() override
;
525 // MetaLogTrimCR factory function
// Build the background coroutine that periodically trims the metadata
// log across num_shards shards at the given interval.
526 RGWCoroutine
* create_meta_log_trim_cr(const DoutPrefixProvider
*dpp
, RGWRados
*store
, RGWHTTPManager
*http
,
527 int num_shards
, utime_t interval
);
529 // factory function for mdlog trim via radosgw-admin
// NOTE(review): this declaration is truncated at the end of the
// visible fragment; remaining parameters are not shown.
530 RGWCoroutine
* create_admin_meta_log_trim_cr(const DoutPrefixProvider
*dpp
, RGWRados
*store
,
531 RGWHTTPManager
*http
,