// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp
#ifndef CEPH_RGW_SYNC_H
#define CEPH_RGW_SYNC_H

#include <atomic>
#include <map>
#include <set>
#include <string>
#include <vector>

#include "include/stringify.h"

#include "rgw_coroutine.h"
#include "rgw_http_client.h"
#include "rgw_metadata.h"
#include "rgw_meta_sync_status.h"
#include "rgw_sal_rados.h"
#include "rgw_sync_trace.h"
#include "rgw_mdlog.h"
20 #define ERROR_LOGGER_SHARDS 32
21 #define RGW_SYNC_ERROR_LOG_SHARD_PREFIX "sync.error-log"
23 struct rgw_mdlog_info
{
25 std::string period
; //< period id of the master's oldest metadata log
26 epoch_t realm_epoch
; //< realm epoch of oldest metadata log
28 rgw_mdlog_info() : num_shards(0), realm_epoch(0) {}
30 void decode_json(JSONObj
*obj
);
34 struct rgw_mdlog_entry
{
38 ceph::real_time timestamp
;
39 RGWMetadataLogData log_data
;
41 void decode_json(JSONObj
*obj
);
43 bool convert_from(cls_log_entry
& le
) {
47 timestamp
= le
.timestamp
.to_real_time();
49 auto iter
= le
.data
.cbegin();
50 decode(log_data
, iter
);
51 } catch (buffer::error
& err
) {
58 struct rgw_mdlog_shard_data
{
61 std::vector
<rgw_mdlog_entry
> entries
;
63 void decode_json(JSONObj
*obj
);
66 class RGWAsyncRadosProcessor
;
67 class RGWMetaSyncStatusManager
;
70 class RGWSyncTraceManager
;
72 class RGWSyncErrorLogger
{
73 rgw::sal::RadosStore
* store
;
75 std::vector
<std::string
> oids
;
78 std::atomic
<int64_t> counter
= { 0 };
80 RGWSyncErrorLogger(rgw::sal::RadosStore
* _store
, const std::string
&oid_prefix
, int _num_shards
);
81 RGWCoroutine
*log_error_cr(const DoutPrefixProvider
*dpp
, const std::string
& source_zone
, const std::string
& section
, const std::string
& name
, uint32_t error_code
, const std::string
& message
);
83 static std::string
get_shard_oid(const std::string
& oid_prefix
, int shard_id
);
86 struct rgw_sync_error_info
{
87 std::string source_zone
;
91 rgw_sync_error_info() : error_code(0) {}
92 rgw_sync_error_info(const std::string
& _source_zone
, uint32_t _error_code
, const std::string
& _message
) : source_zone(_source_zone
), error_code(_error_code
), message(_message
) {}
94 void encode(bufferlist
& bl
) const {
95 ENCODE_START(1, 1, bl
);
96 encode(source_zone
, bl
);
97 encode(error_code
, bl
);
102 void decode(bufferlist::const_iterator
& bl
) {
104 decode(source_zone
, bl
);
105 decode(error_code
, bl
);
110 void dump(Formatter
*f
) const;
112 WRITE_CLASS_ENCODER(rgw_sync_error_info
)
114 #define DEFAULT_BACKOFF_MAX 30
116 class RGWSyncBackoff
{
120 void update_wait_time();
122 explicit RGWSyncBackoff(int _max_secs
= DEFAULT_BACKOFF_MAX
) : cur_wait(0), max_secs(_max_secs
) {}
124 void backoff_sleep();
129 void backoff(RGWCoroutine
*op
);
132 class RGWBackoffControlCR
: public RGWCoroutine
137 RGWSyncBackoff backoff
;
143 bool *backoff_ptr() {
144 return &reset_backoff
;
147 ceph::mutex
& cr_lock() {
151 RGWCoroutine
*get_cr() {
156 RGWBackoffControlCR(CephContext
*_cct
, bool _exit_on_error
)
157 : RGWCoroutine(_cct
),
159 lock(ceph::make_mutex("RGWBackoffControlCR::lock:" + stringify(this))),
160 reset_backoff(false), exit_on_error(_exit_on_error
) {
163 ~RGWBackoffControlCR() override
{
169 virtual RGWCoroutine
*alloc_cr() = 0;
170 virtual RGWCoroutine
*alloc_finisher_cr() { return NULL
; }
172 int operate(const DoutPrefixProvider
*dpp
) override
;
175 struct RGWMetaSyncEnv
{
176 const DoutPrefixProvider
*dpp
;
177 CephContext
*cct
{nullptr};
178 rgw::sal::RadosStore
* store
{nullptr};
179 RGWRESTConn
*conn
{nullptr};
180 RGWAsyncRadosProcessor
*async_rados
{nullptr};
181 RGWHTTPManager
*http_manager
{nullptr};
182 RGWSyncErrorLogger
*error_logger
{nullptr};
183 RGWSyncTraceManager
*sync_tracer
{nullptr};
187 void init(const DoutPrefixProvider
*_dpp
, CephContext
*_cct
, rgw::sal::RadosStore
* _store
, RGWRESTConn
*_conn
,
188 RGWAsyncRadosProcessor
*_async_rados
, RGWHTTPManager
*_http_manager
,
189 RGWSyncErrorLogger
*_error_logger
, RGWSyncTraceManager
*_sync_tracer
);
191 std::string
shard_obj_name(int shard_id
);
192 std::string
status_oid();
195 class RGWRemoteMetaLog
: public RGWCoroutinesManager
{
196 const DoutPrefixProvider
*dpp
;
197 rgw::sal::RadosStore
* store
;
199 RGWAsyncRadosProcessor
*async_rados
;
201 RGWHTTPManager http_manager
;
202 RGWMetaSyncStatusManager
*status_manager
;
203 RGWSyncErrorLogger
*error_logger
{nullptr};
204 RGWSyncTraceManager
*sync_tracer
{nullptr};
206 RGWMetaSyncCR
*meta_sync_cr
{nullptr};
208 RGWSyncBackoff backoff
;
210 RGWMetaSyncEnv sync_env
;
212 void init_sync_env(RGWMetaSyncEnv
*env
);
213 int store_sync_info(const DoutPrefixProvider
*dpp
, const rgw_meta_sync_info
& sync_info
);
215 std::atomic
<bool> going_down
= { false };
217 RGWSyncTraceNodeRef tn
;
220 RGWRemoteMetaLog(const DoutPrefixProvider
*dpp
, rgw::sal::RadosStore
* _store
,
221 RGWAsyncRadosProcessor
*async_rados
,
222 RGWMetaSyncStatusManager
*_sm
)
223 : RGWCoroutinesManager(_store
->ctx(), _store
->getRados()->get_cr_registry()),
224 dpp(dpp
), store(_store
), conn(NULL
), async_rados(async_rados
),
225 http_manager(store
->ctx(), completion_mgr
),
226 status_manager(_sm
) {}
228 virtual ~RGWRemoteMetaLog() override
;
233 int read_log_info(const DoutPrefixProvider
*dpp
, rgw_mdlog_info
*log_info
);
234 int read_master_log_shards_info(const DoutPrefixProvider
*dpp
, const std::string
& master_period
, std::map
<int, RGWMetadataLogInfo
> *shards_info
);
235 int read_master_log_shards_next(const DoutPrefixProvider
*dpp
, const std::string
& period
, std::map
<int, std::string
> shard_markers
, std::map
<int, rgw_mdlog_shard_data
> *result
);
236 int read_sync_status(const DoutPrefixProvider
*dpp
, rgw_meta_sync_status
*sync_status
);
237 int init_sync_status(const DoutPrefixProvider
*dpp
);
238 int run_sync(const DoutPrefixProvider
*dpp
, optional_yield y
);
240 void wakeup(int shard_id
);
242 RGWMetaSyncEnv
& get_sync_env() {
247 class RGWMetaSyncStatusManager
: public DoutPrefixProvider
{
248 rgw::sal::RadosStore
* store
;
249 librados::IoCtx ioctx
;
251 RGWRemoteMetaLog master_log
;
253 std::map
<int, rgw_raw_obj
> shard_objs
;
259 utime_shard() : shard_id(-1) {}
261 bool operator<(const utime_shard
& rhs
) const {
263 return shard_id
< rhs
.shard_id
;
269 ceph::shared_mutex ts_to_shard_lock
= ceph::make_shared_mutex("ts_to_shard_lock");
270 std::map
<utime_shard
, int> ts_to_shard
;
271 std::vector
<std::string
> clone_markers
;
274 RGWMetaSyncStatusManager(rgw::sal::RadosStore
* _store
, RGWAsyncRadosProcessor
*async_rados
)
275 : store(_store
), master_log(this, store
, async_rados
, this)
278 virtual ~RGWMetaSyncStatusManager() override
;
280 int init(const DoutPrefixProvider
*dpp
);
282 int read_sync_status(const DoutPrefixProvider
*dpp
, rgw_meta_sync_status
*sync_status
) {
283 return master_log
.read_sync_status(dpp
, sync_status
);
285 int init_sync_status(const DoutPrefixProvider
*dpp
) { return master_log
.init_sync_status(dpp
); }
286 int read_log_info(const DoutPrefixProvider
*dpp
, rgw_mdlog_info
*log_info
) {
287 return master_log
.read_log_info(dpp
, log_info
);
289 int read_master_log_shards_info(const DoutPrefixProvider
*dpp
, const std::string
& master_period
, std::map
<int, RGWMetadataLogInfo
> *shards_info
) {
290 return master_log
.read_master_log_shards_info(dpp
, master_period
, shards_info
);
292 int read_master_log_shards_next(const DoutPrefixProvider
*dpp
, const std::string
& period
, std::map
<int, std::string
> shard_markers
, std::map
<int, rgw_mdlog_shard_data
> *result
) {
293 return master_log
.read_master_log_shards_next(dpp
, period
, shard_markers
, result
);
296 int run(const DoutPrefixProvider
*dpp
, optional_yield y
) { return master_log
.run_sync(dpp
, y
); }
299 // implements DoutPrefixProvider
300 CephContext
*get_cct() const override
{ return store
->ctx(); }
301 unsigned get_subsys() const override
;
302 std::ostream
& gen_prefix(std::ostream
& out
) const override
;
304 void wakeup(int shard_id
) { return master_log
.wakeup(shard_id
); }
310 class RGWOrderCallCR
: public RGWCoroutine
313 RGWOrderCallCR(CephContext
*cct
) : RGWCoroutine(cct
) {}
315 virtual void call_cr(RGWCoroutine
*_cr
) = 0;
318 class RGWLastCallerWinsCR
: public RGWOrderCallCR
320 RGWCoroutine
*cr
{nullptr};
323 explicit RGWLastCallerWinsCR(CephContext
*cct
) : RGWOrderCallCR(cct
) {}
324 ~RGWLastCallerWinsCR() {
330 int operate(const DoutPrefixProvider
*dpp
) override
;
332 void call_cr(RGWCoroutine
*_cr
) override
{
340 template <class T
, class K
>
341 class RGWSyncShardMarkerTrack
{
342 struct marker_entry
{
346 marker_entry() : pos(0) {}
347 marker_entry(uint64_t _p
, const real_time
& _ts
) : pos(_p
), timestamp(_ts
) {}
349 typename
std::map
<T
, marker_entry
> pending
;
351 std::map
<T
, marker_entry
> finish_markers
;
354 int updates_since_flush
;
356 RGWOrderCallCR
*order_cr
{nullptr};
359 typename
std::set
<K
> need_retry_set
;
361 virtual RGWCoroutine
*store_marker(const T
& new_marker
, uint64_t index_pos
, const real_time
& timestamp
) = 0;
362 virtual RGWOrderCallCR
*allocate_order_control_cr() = 0;
363 virtual void handle_finish(const T
& marker
) { }
366 RGWSyncShardMarkerTrack(int _window_size
) : window_size(_window_size
), updates_since_flush(0) {}
367 virtual ~RGWSyncShardMarkerTrack() {
373 bool start(const T
& pos
, int index_pos
, const real_time
& timestamp
) {
374 if (pending
.find(pos
) != pending
.end()) {
377 pending
[pos
] = marker_entry(index_pos
, timestamp
);
381 void try_update_high_marker(const T
& pos
, int index_pos
, const real_time
& timestamp
) {
382 finish_markers
[pos
] = marker_entry(index_pos
, timestamp
);
385 RGWCoroutine
*finish(const T
& pos
) {
386 if (pending
.empty()) {
387 /* can happen, due to a bug that ended up with multiple objects with the same name and version
388 * -- which can happen when versioning is enabled an the version is 'null'.
393 typename
std::map
<T
, marker_entry
>::iterator iter
= pending
.begin();
395 bool is_first
= (pos
== iter
->first
);
397 typename
std::map
<T
, marker_entry
>::iterator pos_iter
= pending
.find(pos
);
398 if (pos_iter
== pending
.end()) {
399 /* see pending.empty() comment */
403 finish_markers
[pos
] = pos_iter
->second
;
409 updates_since_flush
++;
411 if (is_first
&& (updates_since_flush
>= window_size
|| pending
.empty())) {
417 RGWCoroutine
*flush() {
418 if (finish_markers
.empty()) {
422 typename
std::map
<T
, marker_entry
>::iterator i
;
424 if (pending
.empty()) {
425 i
= finish_markers
.end();
427 i
= finish_markers
.lower_bound(pending
.begin()->first
);
429 if (i
== finish_markers
.begin()) {
432 updates_since_flush
= 0;
436 const T
& high_marker
= i
->first
;
437 marker_entry
& high_entry
= i
->second
;
438 RGWCoroutine
*cr
= order(store_marker(high_marker
, high_entry
.pos
, high_entry
.timestamp
));
439 finish_markers
.erase(finish_markers
.begin(), last
);
444 * a key needs retry if it was processing when another marker that points
445 * to the same bucket shards arrives. Instead of processing it, we mark
446 * it as need_retry so that when we finish processing the original, we
447 * retry the processing on the same bucket shard, in case there are more
448 * entries to process. This closes a race that can happen.
450 bool need_retry(const K
& key
) {
451 return (need_retry_set
.find(key
) != need_retry_set
.end());
454 void set_need_retry(const K
& key
) {
455 need_retry_set
.insert(key
);
458 void reset_need_retry(const K
& key
) {
459 need_retry_set
.erase(key
);
462 RGWCoroutine
*order(RGWCoroutine
*cr
) {
463 /* either returns a new RGWLastWriteWinsCR, or update existing one, in which case it returns
464 * nothing and the existing one will call the cr
466 if (order_cr
&& order_cr
->is_done()) {
471 order_cr
= allocate_order_control_cr();
473 order_cr
->call_cr(cr
);
476 order_cr
->call_cr(cr
);
477 return nullptr; /* don't call it a second time */
481 class RGWMetaSyncShardMarkerTrack
;
483 class RGWMetaSyncSingleEntryCR
: public RGWCoroutine
{
484 RGWMetaSyncEnv
*sync_env
;
487 std::string entry_marker
;
488 RGWMDLogStatus op_status
;
498 RGWMetaSyncShardMarkerTrack
*marker_tracker
;
502 bool error_injection
;
504 RGWSyncTraceNodeRef tn
;
507 RGWMetaSyncSingleEntryCR(RGWMetaSyncEnv
*_sync_env
,
508 const std::string
& _raw_key
, const std::string
& _entry_marker
,
509 const RGWMDLogStatus
& _op_status
,
510 RGWMetaSyncShardMarkerTrack
*_marker_tracker
, const RGWSyncTraceNodeRef
& _tn_parent
);
512 int operate(const DoutPrefixProvider
*dpp
) override
;
515 class RGWShardCollectCR
: public RGWCoroutine
{
522 RGWShardCollectCR(CephContext
*_cct
, int _max_concurrent
) : RGWCoroutine(_cct
),
524 max_concurrent(_max_concurrent
),
527 virtual bool spawn_next() = 0;
528 int operate(const DoutPrefixProvider
*dpp
) override
;
531 // factory functions for meta sync coroutines needed in mdlog trimming
533 RGWCoroutine
* create_read_remote_mdlog_shard_info_cr(RGWMetaSyncEnv
*env
,
534 const std::string
& period
,
536 RGWMetadataLogInfo
* info
);
538 RGWCoroutine
* create_list_remote_mdlog_shard_cr(RGWMetaSyncEnv
*env
,
539 const std::string
& period
,
541 const std::string
& marker
,
542 uint32_t max_entries
,
543 rgw_mdlog_shard_data
*result
);