1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #ifndef CEPH_RGW_SYNC_H
5 #define CEPH_RGW_SYNC_H
9 #include "include/stringify.h"
10 #include "common/RWLock.h"
12 #include "rgw_coroutine.h"
13 #include "rgw_http_client.h"
14 #include "rgw_metadata.h"
15 #include "rgw_meta_sync_status.h"
17 #include "rgw_sync_trace.h"
18 #include "rgw_mdlog.h"
20 namespace rgw
{ namespace sal
{
24 #define ERROR_LOGGER_SHARDS 32
25 #define RGW_SYNC_ERROR_LOG_SHARD_PREFIX "sync.error-log"
// Description of the master zone's metadata log, as returned by the
// remote "mdlog info" REST call and filled in via decode_json().
// NOTE(review): this extract is missing interior lines — the ctor
// initializes a num_shards member that is not visible here, and the
// closing brace/semicolon are absent; confirm against the full header.
27 struct rgw_mdlog_info
{
29 std::string period
; //< period id of the master's oldest metadata log
30 epoch_t realm_epoch
; //< realm epoch of oldest metadata log
32 rgw_mdlog_info() : num_shards(0), realm_epoch(0) {}
// Populate this struct from the JSON body of the remote response.
34 void decode_json(JSONObj
*obj
);
// A single metadata-log entry, either parsed from remote JSON
// (decode_json) or converted from a local cls_log_entry (convert_from).
// NOTE(review): this extract drops interior lines — other members
// (e.g. id/section/name fields implied by the full header), the
// try-block opener inside convert_from, and the closing braces are
// not visible; confirm against the original file.
38 struct rgw_mdlog_entry
{
42 ceph::real_time timestamp
;
43 RGWMetadataLogData log_data
;
45 void decode_json(JSONObj
*obj
);
// Convert from the on-disk cls log representation; decodes the
// opaque payload into log_data and returns false on decode failure
// (the catch clause below — its body is outside this extract).
47 bool convert_from(cls_log_entry
& le
) {
51 timestamp
= le
.timestamp
.to_real_time();
// Decode the bufferlist payload; throws buffer::error on corruption.
53 auto iter
= le
.data
.cbegin();
54 decode(log_data
, iter
);
55 } catch (buffer::error
& err
) {
// One shard's worth of metadata-log entries fetched from the master
// zone (the "list mdlog shard" response body).
// NOTE(review): extract is missing interior lines (e.g. marker/
// truncated fields in the full header) and the closing brace.
62 struct rgw_mdlog_shard_data
{
65 vector
<rgw_mdlog_entry
> entries
;
// Populate from the remote JSON response.
67 void decode_json(JSONObj
*obj
);
70 class RGWAsyncRadosProcessor
;
71 class RGWMetaSyncStatusManager
;
74 class RGWSyncTraceManager
;
// Writes sync-error records into a set of sharded RADOS log objects
// (prefix RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS shards).
// NOTE(review): extract is missing interior lines (shard-oid vector,
// num_shards member, access specifiers, closing brace).
76 class RGWSyncErrorLogger
{
77 rgw::sal::RGWRadosStore
*store
;
// Round-robin counter used to pick the next shard for a new record.
82 std::atomic
<int64_t> counter
= { 0 };
84 RGWSyncErrorLogger(rgw::sal::RGWRadosStore
*_store
, const string
&oid_prefix
, int _num_shards
);
// Returns a coroutine that appends one error record; caller runs it
// inside its coroutine stack.
85 RGWCoroutine
*log_error_cr(const string
& source_zone
, const string
& section
, const string
& name
, uint32_t error_code
, const string
& message
);
// Compose the RADOS object name for a given shard id.
87 static string
get_shard_oid(const string
& oid_prefix
, int shard_id
);
// One serialized sync-error record (source zone + error code + message)
// as stored by RGWSyncErrorLogger.
// NOTE(review): extract drops interior lines — the member declarations,
// the encode of `message`, ENCODE_FINISH/DECODE_START/DECODE_FINISH,
// and the closing brace are not visible here.
90 struct rgw_sync_error_info
{
95 rgw_sync_error_info() : error_code(0) {}
96 rgw_sync_error_info(const string
& _source_zone
, uint32_t _error_code
, const string
& _message
) : source_zone(_source_zone
), error_code(_error_code
), message(_message
) {}
// Serialize into a bufferlist (versioned encoding, v1).
98 void encode(bufferlist
& bl
) const {
99 ENCODE_START(1, 1, bl
);
100 encode(source_zone
, bl
);
101 encode(error_code
, bl
);
// Deserialize; must mirror encode() field order.
106 void decode(bufferlist::const_iterator
& bl
) {
108 decode(source_zone
, bl
);
109 decode(error_code
, bl
);
// Emit the record to a ceph Formatter (for admin/REST display).
114 void dump(Formatter
*f
) const;
116 WRITE_CLASS_ENCODER(rgw_sync_error_info
)
118 #define DEFAULT_BACKOFF_MAX 30
// Simple exponential-backoff helper: cur_wait grows via
// update_wait_time() up to max_secs (default DEFAULT_BACKOFF_MAX).
// NOTE(review): extract is missing interior lines (cur_wait/max_secs
// declarations, reset(), closing brace).
120 class RGWSyncBackoff
{
124 void update_wait_time();
126 explicit RGWSyncBackoff(int _max_secs
= DEFAULT_BACKOFF_MAX
) : cur_wait(0), max_secs(_max_secs
) {}
// Blocking sleep for the current backoff interval (non-coroutine path).
128 void backoff_sleep();
// Coroutine-friendly backoff: yields via the given coroutine op.
133 void backoff(RGWCoroutine
*op
);
// Coroutine wrapper that repeatedly runs alloc_cr() with exponential
// backoff between failures; reset_backoff lets the child signal success.
// NOTE(review): extract is missing interior lines (cr/finisher_cr
// members, access specifiers, bodies of cr_lock()/get_cr(), the dtor
// body, and the closing brace).
136 class RGWBackoffControlCR
: public RGWCoroutine
141 RGWSyncBackoff backoff
;
// Child coroutines flip *backoff_ptr() to true to reset the backoff
// window after a successful iteration.
147 bool *backoff_ptr() {
148 return &reset_backoff
;
// Lock guarding access to the managed child coroutine pointer.
151 ceph::mutex
& cr_lock() {
155 RGWCoroutine
*get_cr() {
160 RGWBackoffControlCR(CephContext
*_cct
, bool _exit_on_error
)
161 : RGWCoroutine(_cct
),
163 lock(ceph::make_mutex("RGWBackoffControlCR::lock:" + stringify(this))),
164 reset_backoff(false), exit_on_error(_exit_on_error
) {
167 ~RGWBackoffControlCR() override
{
// Factory for the coroutine to (re)run under backoff; pure virtual.
173 virtual RGWCoroutine
*alloc_cr() = 0;
// Optional coroutine run after a successful pass; default: none.
174 virtual RGWCoroutine
*alloc_finisher_cr() { return NULL
; }
176 int operate() override
;
// Shared environment handed to every metadata-sync coroutine: store,
// remote connection, async rados processor, http manager, error logger
// and trace manager. All pointers are non-owning; init() wires them up.
// NOTE(review): extract is missing interior lines (default ctor,
// status_oid()/shard_obj_name context, closing brace).
179 struct RGWMetaSyncEnv
{
180 const DoutPrefixProvider
*dpp
;
181 CephContext
*cct
{nullptr};
182 rgw::sal::RGWRadosStore
*store
{nullptr};
183 RGWRESTConn
*conn
{nullptr};
184 RGWAsyncRadosProcessor
*async_rados
{nullptr};
185 RGWHTTPManager
*http_manager
{nullptr};
186 RGWSyncErrorLogger
*error_logger
{nullptr};
187 RGWSyncTraceManager
*sync_tracer
{nullptr};
// Wire all the non-owning pointers; call once before any sync CR runs.
191 void init(const DoutPrefixProvider
*_dpp
, CephContext
*_cct
, rgw::sal::RGWRadosStore
*_store
, RGWRESTConn
*_conn
,
192 RGWAsyncRadosProcessor
*_async_rados
, RGWHTTPManager
*_http_manager
,
193 RGWSyncErrorLogger
*_error_logger
, RGWSyncTraceManager
*_sync_tracer
);
// Object name holding sync status for a given mdlog shard.
195 string
shard_obj_name(int shard_id
);
// Drives metadata sync against the master zone's metadata log: reads
// remote log/shard info over REST, tracks status, and runs the main
// sync coroutine. Owns its coroutine manager (base class) and an
// RGWSyncErrorLogger; conn/status_manager are non-owning.
// NOTE(review): extract is missing interior lines (conn member,
// init()/finish()/run_sync() declarations, access specifiers, body of
// get_sync_env(), closing brace).
199 class RGWRemoteMetaLog
: public RGWCoroutinesManager
{
200 const DoutPrefixProvider
*dpp
;
201 rgw::sal::RGWRadosStore
*store
;
203 RGWAsyncRadosProcessor
*async_rados
;
205 RGWHTTPManager http_manager
;
206 RGWMetaSyncStatusManager
*status_manager
;
207 RGWSyncErrorLogger
*error_logger
{nullptr};
208 RGWSyncTraceManager
*sync_tracer
{nullptr};
// Root coroutine of the running sync; set while sync is active.
210 RGWMetaSyncCR
*meta_sync_cr
{nullptr};
212 RGWSyncBackoff backoff
;
214 RGWMetaSyncEnv sync_env
;
215 void init_sync_env(RGWMetaSyncEnv
216 void init_sync_env(RGWMetaSyncEnv
// Facade over RGWRemoteMetaLog: owns the master_log instance and the
// per-shard status objects, and forwards most operations to it. Also
// implements DoutPrefixProvider for log-prefixing.
// NOTE(review): extract is missing interior lines (global_status_obj/
// shard_obj members, utime_shard's ts field, init()/stop(), dtor,
// access specifiers, and closing braces).
251 class RGWMetaSyncStatusManager
: public DoutPrefixProvider
{
252 rgw::sal::RGWRadosStore
*store
;
253 librados::IoCtx ioctx
;
255 RGWRemoteMetaLog master_log
;
// Raw objects that hold per-shard sync markers.
257 map
<int, rgw_raw_obj
> shard_objs
;
// Key ordering shards by timestamp (then shard id) — used for the
// ts_to_shard index below. NOTE(review): the timestamp member and the
// primary comparison on it are outside this extract.
263 utime_shard() : shard_id(-1) {}
265 bool operator<(const utime_shard
& rhs
) const {
267 return shard_id
< rhs
.shard_id
;
// Guards ts_to_shard.
273 ceph::shared_mutex ts_to_shard_lock
= ceph::make_shared_mutex("ts_to_shard_lock");
274 map
<utime_shard
, int> ts_to_shard
;
275 vector
<string
> clone_markers
;
278 RGWMetaSyncStatusManager(rgw::sal::RGWRadosStore
*_store
, RGWAsyncRadosProcessor
*async_rados
)
279 : store(_store
), master_log(this, store
, async_rados
, this)
// Thin forwarding wrappers over master_log:
283 int read_sync_status(rgw_meta_sync_status
*sync_status
) {
284 return master_log
.read_sync_status(sync_status
);
286 int init_sync_status() { return master_log
.init_sync_status(); }
287 int read_log_info(rgw_mdlog_info
*log_info
) {
288 return master_log
.read_log_info(log_info
);
290 int read_master_log_shards_info(const string
& master_period
, map
<int, RGWMetadataLogInfo
> *shards_info
) {
291 return master_log
.read_master_log_shards_info(master_period
, shards_info
);
293 int read_master_log_shards_next(const string
& period
, map
<int, string
> shard_markers
, map
<int, rgw_mdlog_shard_data
> *result
) {
294 return master_log
.read_master_log_shards_next(period
, shard_markers
, result
);
// Run the full sync loop (blocks until sync stops).
297 int run() { return master_log
.run_sync(); }
300 // implements DoutPrefixProvider
301 CephContext
*get_cct() const override
{ return store
->ctx(); }
302 unsigned get_subsys() const override
;
303 std::ostream
& gen_prefix(std::ostream
& out
) const override
;
// Wake the coroutine handling the given shard (new log entries).
305 void wakeup(int shard_id
) { return master_log
.wakeup(shard_id
); }
// Abstract coroutine used to serialize calls: implementations decide
// how queued coroutines passed to call_cr() get executed.
// NOTE(review): extract is missing the class body's access specifiers
// and closing brace.
311 class RGWOrderCallCR
: public RGWCoroutine
314 RGWOrderCallCR(CephContext
*cct
) : RGWCoroutine(cct
) {}
// Hand a coroutine to this ordering CR for (eventual) execution.
316 virtual void call_cr(RGWCoroutine
*_cr
) = 0;
// RGWOrderCallCR policy where only the most recently supplied coroutine
// is kept/run — earlier pending ones are superseded ("last caller wins").
// NOTE(review): extract is missing the dtor body, call_cr() body and
// closing brace.
319 class RGWLastCallerWinsCR
: public RGWOrderCallCR
// Currently pending coroutine (replaced by each call_cr()).
321 RGWCoroutine
*cr
{nullptr};
324 explicit RGWLastCallerWinsCR(CephContext
*cct
) : RGWOrderCallCR(cct
) {}
325 ~RGWLastCallerWinsCR() {
331 int operate() override
;
// Replace any pending coroutine with the new one.
333 void call_cr(RGWCoroutine
*_cr
) override
{
// Tracks in-flight sync positions for one shard so the persisted
// marker only advances past positions whose predecessors have all
// completed (a sliding "high-water mark" window of size window_size).
// T = marker type, K = retry-key type (e.g. bucket shard id).
// NOTE(review): extract is missing many interior lines (window_size
// member, access specifiers, several braces, parts of finish()/flush());
// comments below describe only the visible logic.
341 template <class T
, class K
>
342 class RGWSyncShardMarkerTrack
{
// Position + timestamp for a single tracked marker.
343 struct marker_entry
{
347 marker_entry() : pos(0) {}
348 marker_entry(uint64_t _p
, const real_time
& _ts
) : pos(_p
), timestamp(_ts
) {}
// Markers currently being processed, in marker order.
350 typename
std::map
<T
, marker_entry
> pending
;
// Markers whose processing completed but are not yet flushed.
352 map
<T
, marker_entry
> finish_markers
;
// Completions since the last flush; compared against window_size.
355 int updates_since_flush
;
// Serializes store_marker() calls so they are applied in order.
357 RGWOrderCallCR
*order_cr
{nullptr};
// Keys that must be reprocessed after their current pass (see the
// need_retry comment further down).
360 typename
std::set
<K
> need_retry_set
;
// Persist the given marker; implemented by the concrete tracker.
362 virtual RGWCoroutine
*store_marker(const T
& new_marker
, uint64_t index_pos
, const real_time
& timestamp
) = 0;
363 virtual RGWOrderCallCR
*allocate_order_control_cr() = 0;
// Hook invoked when a marker finishes; default: no-op.
364 virtual void handle_finish(const T
& marker
) { }
367 RGWSyncShardMarkerTrack(int _window_size
) : window_size(_window_size
), updates_since_flush(0) {}
368 virtual ~RGWSyncShardMarkerTrack() {
// Register a marker as in-flight; rejects duplicates already pending.
374 bool start(const T
& pos
, int index_pos
, const real_time
& timestamp
) {
375 if (pending
.find(pos
) != pending
.end()) {
378 pending
[pos
] = marker_entry(index_pos
, timestamp
);
// Record a completed position directly (bypassing start/finish),
// e.g. for entries that needed no processing.
382 void try_update_high_marker(const T
& pos
, int index_pos
, const real_time
& timestamp
) {
383 finish_markers
[pos
] = marker_entry(index_pos
, timestamp
);
// Mark a pending position complete; may return a flush coroutine when
// the completed entry was the oldest pending one and the window filled.
386 RGWCoroutine
*finish(const T
& pos
) {
387 if (pending
.empty()) {
388 /* can happen, due to a bug that ended up with multiple objects with the same name and version
389 * -- which can happen when versioning is enabled an the version is 'null'.
394 typename
std::map
<T
, marker_entry
>::iterator iter
= pending
.begin();
// True when pos is the oldest in-flight marker — only then can the
// persisted high marker move forward.
396 bool is_first
= (pos
== iter
->first
);
398 typename
std::map
<T
, marker_entry
>::iterator pos_iter
= pending
.find(pos
);
399 if (pos_iter
== pending
.end()) {
400 /* see pending.empty() comment */
404 finish_markers
[pos
] = pos_iter
->second
;
410 updates_since_flush
++;
412 if (is_first
&& (updates_since_flush
>= window_size
|| pending
.empty())) {
// Persist the highest marker that is safe to store (every position up
// to it has finished), then drop the flushed entries.
418 RGWCoroutine
*flush() {
419 if (finish_markers
.empty()) {
423 typename
std::map
<T
, marker_entry
>::iterator i
;
// If nothing is pending, everything finished is flushable; otherwise
// stop just below the oldest still-pending marker.
425 if (pending
.empty()) {
426 i
= finish_markers
.end();
428 i
= finish_markers
.lower_bound(pending
.begin()->first
);
430 if (i
== finish_markers
.begin()) {
433 updates_since_flush
= 0;
437 const T
& high_marker
= i
->first
;
438 marker_entry
& high_entry
= i
->second
;
// Route the store through order() so concurrent flushes apply in order.
439 RGWCoroutine
*cr
= order(store_marker(high_marker
, high_entry
.pos
, high_entry
.timestamp
));
440 finish_markers
.erase(finish_markers
.begin(), last
);
445 * a key needs retry if it was processing when another marker that points
446 * to the same bucket shards arrives. Instead of processing it, we mark
447 * it as need_retry so that when we finish processing the original, we
448 * retry the processing on the same bucket shard, in case there are more
449 * entries to process. This closes a race that can happen.
451 bool need_retry(const K
& key
) {
452 return (need_retry_set
.find(key
) != need_retry_set
.end());
455 void set_need_retry(const K
& key
) {
456 need_retry_set
.insert(key
);
459 void reset_need_retry(const K
& key
) {
460 need_retry_set
.erase(key
);
// Serialize cr behind the order-control coroutine. Returns the new
// control CR when one had to be (re)allocated; returns nullptr when an
// existing live control CR absorbed the call.
463 RGWCoroutine
*order(RGWCoroutine
*cr
) {
464 /* either returns a new RGWLastWriteWinsCR, or update existing one, in which case it returns
465 * nothing and the existing one will call the cr
467 if (order_cr
&& order_cr
->is_done()) {
472 order_cr
= allocate_order_control_cr();
474 order_cr
->call_cr(cr
);
477 order_cr
->call_cr(cr
);
478 return nullptr; /* don't call it a second time */
482 class RGWMetaSyncShardMarkerTrack
;
// Coroutine that syncs one metadata-log entry: applies the entry and
// reports progress to the shard's marker tracker.
// NOTE(review): extract is missing interior members (raw_key,
// entry_marker, sync/tries state per the full header), access
// specifiers and the closing brace.
484 class RGWMetaSyncSingleEntryCR
: public RGWCoroutine
{
485 RGWMetaSyncEnv
*sync_env
;
489 RGWMDLogStatus op_status
;
// Non-owning; marks this entry started/finished in the window tracker.
499 RGWMetaSyncShardMarkerTrack
*marker_tracker
;
// When set, faults are injected for testing the error path.
503 bool error_injection
;
505 RGWSyncTraceNodeRef tn
;
508 RGWMetaSyncSingleEntryCR(RGWMetaSyncEnv
*_sync_env
,
509 const string
& _raw_key
, const string
& _entry_marker
,
510 const RGWMDLogStatus
& _op_status
,
511 RGWMetaSyncShardMarkerTrack
*_marker_tracker
, const RGWSyncTraceNodeRef
& _tn_parent
);
513 int operate() override
;
// Base coroutine that fans out per-shard work with a bounded number of
// concurrent children: operate() keeps calling spawn_next() until it
// returns false, never exceeding max_concurrent in flight.
// NOTE(review): extract is missing interior members (current counters,
// status field), the rest of the ctor init-list, and the closing brace.
516 class RGWShardCollectCR
: public RGWCoroutine
{
523 RGWShardCollectCR(CephContext
*_cct
, int _max_concurrent
) : RGWCoroutine(_cct
),
525 max_concurrent(_max_concurrent
),
// Spawn the next child CR; return false when there is no more work.
528 virtual bool spawn_next() = 0;
529 int operate() override
;
532 // factory functions for meta sync coroutines needed in mdlog trimming
// Returns a coroutine that fetches one remote mdlog shard's info
// (marker/timestamps) for the given period into *info.
// NOTE(review): the shard-id parameter visible in the full header is
// missing from this extract.
534 RGWCoroutine
* create_read_remote_mdlog_shard_info_cr(RGWMetaSyncEnv
*env
,
535 const std::string
& period
,
537 RGWMetadataLogInfo
* info
);
// Returns a coroutine that lists up to max_entries remote mdlog
// entries starting at marker, storing them into *result.
539 RGWCoroutine
* create_list_remote_mdlog_shard_cr(RGWMetaSyncEnv
*env
,
540 const std::string
& period
,
542 const std::string
& marker
,
543 uint32_t max_entries
,
544 rgw_mdlog_shard_data
*result
);