1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H
5 #define RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H
7 #include "tools/rbd_mirror/image_replayer/Replayer.h"
8 #include "include/utime.h"
9 #include "common/AsyncOpTracker.h"
10 #include "common/ceph_mutex.h"
11 #include "common/RefCountedObj.h"
12 #include "cls/journal/cls_journal_types.h"
13 #include "journal/ReplayEntry.h"
14 #include "librbd/ImageCtx.h"
15 #include "librbd/journal/Types.h"
16 #include "librbd/journal/TypeTraits.h"
18 #include <type_traits>
// Forward declaration of class Journaler inside a namespace named journal.
20 namespace journal
{ class Journaler
; }
// Forward declaration of class template Replay (one type parameter, I) inside
// a namespace named journal.
// NOTE(review): members further down spell this type as
// librbd::journal::Replay, so this declaration is presumably nested inside
// namespace librbd; that enclosing line is not visible here -- confirm.
24 namespace journal
{ template <typename I
> class Replay
; }
// Forward declaration of struct template Threads (one unnamed type
// parameter). Only pointers to Threads<ImageCtxT> are used in this header.
31 template <typename
> struct Threads
;
// Opens namespace image_replayer and forward-declares the collaborator types
// that the Replayer class below refers to by pointer.
33 namespace image_replayer
{
// Listener type; Replayer stores a ReplayerListener* (m_replayer_listener).
35 struct ReplayerListener
;
// NOTE(review): the end of this header closes a namespace labelled "journal",
// so a nested namespace is presumably opened around here; the opening line is
// not visible in this excerpt -- confirm.
// Class templates with one unnamed type parameter each; Replayer uses them
// instantiated with ImageCtxT.
39 template <typename
> class EventPreprocessor
;
40 template <typename
> class ReplayStatusFormatter
;
41 template <typename
> class StateBuilder
;
// Replayer class template, parameterised on the image context type
// ImageCtxT. It derives publicly from image_replayer::Replayer; the member
// functions below marked `override` override virtuals of that base.
// NOTE(review): access specifiers are not visible in this excerpt.
43 template <typename ImageCtxT
>
44 class Replayer
: public image_replayer::Replayer
{
// Journaler: alias for the Journaler type chosen by
// librbd::journal::TypeTraits<ImageCtxT>.
46 typedef typename
librbd::journal::TypeTraits
<ImageCtxT
>::Journaler Journaler
;
// Static factory: allocates a Replayer with `new` and returns the raw
// pointer, passing threads, local_mirror_uuid and state_builder through to
// the constructor.
// NOTE(review): the excerpt ends after "state_builder ,"; the remaining
// argument (presumably replayer_listener) and the closing brace are not
// visible here -- confirm against the full file.
// NOTE(review): ownership of the returned pointer is not expressed in the
// type; presumably it is released through destroy() -- confirm.
48 static Replayer
* create(
49 Threads
<ImageCtxT
>* threads
,
50 const std::string
& local_mirror_uuid
,
51 StateBuilder
<ImageCtxT
>* state_builder
,
52 ReplayerListener
* replayer_listener
) {
53 return new Replayer(threads
, local_mirror_uuid
, state_builder
,
// Parameter list of a declaration taking the same four parameters, in the
// same order, as create() above.
// NOTE(review): the line naming the function is not visible in this excerpt;
// this is presumably the Replayer constructor that create() invokes --
// confirm.
58 Threads
<ImageCtxT
>* threads
,
59 const std::string
& local_mirror_uuid
,
60 StateBuilder
<ImageCtxT
>* state_builder
,
61 ReplayerListener
* replayer_listener
);
// Override of the base class's destroy(), defined inline.
// NOTE(review): the body and closing brace are not visible in this excerpt.
64 void destroy() override
{
// Override of init(); declared here, defined out of line.
// on_finish: Context pointer supplied by the caller.
// NOTE(review): presumably completed once initialisation has finished or
// failed -- confirm in the implementation.
68 void init(Context
* on_finish
) override
;
// Override of shut_down(); declared here, defined out of line.
// on_finish: Context pointer supplied by the caller.
// NOTE(review): presumably completed when shut-down is done -- confirm in
// the implementation.
69 void shut_down(Context
* on_finish
) override
;
// Override of flush(); declared here, defined out of line.
// on_finish: Context pointer supplied by the caller.
// NOTE(review): presumably completed when the flush is done -- confirm in
// the implementation.
71 void flush(Context
* on_finish
) override
;
// Override of get_replay_status(); declared here, defined out of line.
// description: out-parameter (pointer to std::string).
// on_finish:   Context pointer supplied by the caller.
// Returns bool.
// NOTE(review): the meaning of the return value and when on_finish fires are
// not determinable from this header -- see the implementation.
73 bool get_replay_status(std::string
* description
, Context
* on_finish
) override
;
// Returns true when m_state equals STATE_REPLAYING. m_lock is held for the
// duration of the read; the function is const, which works because m_lock
// is declared mutable.
// NOTE(review): the closing brace is not visible in this excerpt.
75 bool is_replaying() const override
{
76 std::unique_lock locker
{m_lock
};
77 return (m_state
== STATE_REPLAYING
);
// Returns the current value of m_resync_requested, read while holding
// m_lock.
// NOTE(review): the closing brace is not visible in this excerpt.
80 bool is_resync_requested() const override
{
81 std::unique_lock
locker(m_lock
);
82 return m_resync_requested
;
// Override returning an int; acquires m_lock first.
// NOTE(review): the return statement and closing brace are not visible in
// this excerpt; presumably it returns m_error_code -- confirm.
85 int get_error_code() const override
{
86 std::unique_lock
locker(m_lock
);
// Returns a copy of m_error_description, taken while holding m_lock.
// Returning by value means the caller's string does not alias the member
// once the lock is released.
// NOTE(review): the closing brace is not visible in this excerpt.
90 std::string
get_error_description() const override
{
91 std::unique_lock
locker(m_lock
);
92 return m_error_description
;
// Const accessor returning std::string; acquires m_lock first. Unlike the
// accessors above, it is not marked `override`.
// NOTE(review): the return statement and closing brace are not visible in
// this excerpt; presumably it returns m_image_spec -- confirm.
95 std::string
get_image_spec() const {
96 std::unique_lock
locker(m_lock
);
107 * INIT_REMOTE_JOURNALER * * * * * * * * * * * * * * * * * * *
110 * START_EXTERNAL_REPLAY * * * * * * * * * * * * * * * * * * *
112 * | /--------------------------------------------\ *
114 * v v (asok flush) | *
115 * REPLAYING -------------> LOCAL_REPLAY_FLUSH | *
118 * | | FLUSH_COMMIT_POSITION | *
120 * | | \--------------------/| *
122 * | | (entries available) | *
123 * | \-----------> REPLAY_READY | *
125 * | | (skip if not | *
126 * | v needed) (error) *
127 * | REPLAY_FLUSH * * * * * * * * * *
129 * | | (skip if not | * *
130 * | v needed) (error) * *
131 * | GET_REMOTE_TAG * * * * * * * * *
133 * | | (skip if not | * *
134 * | v needed) (error) * *
135 * | ALLOCATE_LOCAL_TAG * * * * * * *
138 * | PREPROCESS_ENTRY * * * * * * * *
141 * | PROCESS_ENTRY * * * * * * * * * *
143 * | \---------------------/ * *
145 * REPLAY_COMPLETE < * * * * * * * * * * * * * * * * * * * *
148 * SHUT_DOWN_LOCAL_JOURNAL_REPLAY *
154 * CLOSE_LOCAL_IMAGE < * * * * * * * * * * * * * * * * * * * *
156 * v (skip if not started)
157 * STOP_REMOTE_JOURNALER_REPLAY
160 * WAIT_FOR_IN_FLIGHT_OPS
// ReplayEntry: alias for the ReplayEntry type chosen by
// librbd::journal::TypeTraits<ImageCtxT>; used for m_replay_entry and in
// handle_process_entry_safe().
168 typedef typename
librbd::journal::TypeTraits
<ImageCtxT
>::ReplayEntry ReplayEntry
;
// Nested helper structs, declared here and defined elsewhere. Three of them
// are the pointee types of members declared below (m_remote_listener,
// m_remote_replay_handler, m_local_journal_listener).
176 struct C_ReplayCommitted
;
177 struct RemoteJournalerListener
;
178 struct RemoteReplayHandler
;
179 struct LocalJournalListener
;
// Members whose types mirror the four create()/constructor parameters. They
// have no default member initialisers.
// NOTE(review): presumably set from the constructor arguments; the three raw
// pointers are presumably non-owning -- confirm in the implementation.
181 Threads
<ImageCtxT
>* m_threads
;
182 std::string m_local_mirror_uuid
;
183 StateBuilder
<ImageCtxT
>* m_state_builder
;
184 ReplayerListener
* m_replayer_listener
;
// Mutex taken by the const accessors above (is_replaying,
// is_resync_requested, get_error_code, get_error_description,
// get_image_spec); declared mutable so that const member functions can
// lock it.
186 mutable ceph::mutex m_lock
;
188 std::string m_image_spec
;
// Pending Context pointer; starts out null.
// NOTE(review): going by the name, shared between init() and shut_down() --
// confirm.
189 Context
* m_on_init_shutdown
= nullptr;
// Current state; starts at STATE_INIT and is compared against
// STATE_REPLAYING in is_replaying().
// NOTE(review): the State type's definition is not visible in this excerpt.
191 State m_state
= STATE_INIT
;
// Error/status fields; default to 0, empty string and false respectively.
192 int m_error_code
= 0;
193 std::string m_error_description
;
194 bool m_resync_requested
= false;
// Type expression: ceph::ref_t wrapping the pointee type of
// ImageCtxT::journal (std::remove_pointer strips the pointer from the
// declared type of that member).
// NOTE(review): the declarator (member name) that this type belongs to is
// not visible in this excerpt.
196 ceph::ref_t
<typename
std::remove_pointer
<decltype(ImageCtxT::journal
)>::type
>
// Raw pointers to helper objects; every one defaults to nullptr.
// NOTE(review): this header does not show where they are created or
// released, so ownership cannot be determined from here -- confirm in the
// implementation.
198 RemoteJournalerListener
* m_remote_listener
= nullptr;
200 librbd::journal::Replay
<ImageCtxT
>* m_local_journal_replay
= nullptr;
201 EventPreprocessor
<ImageCtxT
>* m_event_preprocessor
= nullptr;
202 ReplayStatusFormatter
<ImageCtxT
>* m_replay_status_formatter
= nullptr;
203 RemoteReplayHandler
* m_remote_replay_handler
= nullptr;
204 LocalJournalListener
* m_local_journal_listener
= nullptr;
// NOTE(review): PerfCounters is not declared in the visible part of this
// header; presumably it comes from one of the includes -- confirm.
206 PerfCounters
*m_perf_counters
= nullptr;
// Bookkeeping for a replay entry: the entry itself, a byte count (starts at
// 0) and a start timestamp. The same three kinds of value are parameters of
// handle_process_entry_safe().
208 ReplayEntry m_replay_entry
;
209 uint64_t m_replay_bytes
= 0;
210 utime_t m_replay_start_time
;
// Tag bookkeeping: a validity flag (starts false), a tag tid (starts 0), the
// cls::journal::Tag and its librbd::journal::TagData.
// NOTE(review): presumably TagData is decoded from the Tag -- confirm.
211 bool m_replay_tag_valid
= false;
212 uint64_t m_replay_tag_tid
= 0;
213 cls::journal::Tag m_replay_tag
;
214 librbd::journal::TagData m_replay_tag_data
;
215 librbd::journal::EventEntry m_event_entry
;
// Async-op tracker plus a Context pointer (starts null) for the delayed
// preprocess task; see handle_delayed_preprocess_task() and
// cancel_delayed_preprocess_task().
217 AsyncOpTracker m_event_replay_tracker
;
218 Context
*m_delayed_preprocess_task
= nullptr;
// Second async-op tracker plus a Context pointer (starts null) for the
// flush-local-replay task; see schedule_/cancel_flush_local_replay_task().
220 AsyncOpTracker m_in_flight_op_tracker
;
221 Context
*m_flush_local_replay_task
= nullptr;
// Member function declarations (defined out of line).
// NOTE(review): presumably the callback for remote journal metadata updates
// -- confirm.
223 void handle_remote_journal_metadata_updated();
// Scheduling, cancelling and handling of the flush-local-replay task (the
// task pointer is m_flush_local_replay_task). r is an int result code.
225 void schedule_flush_local_replay_task();
226 void cancel_flush_local_replay_task();
227 void handle_flush_local_replay_task(int r
);
// Request/handler pairs: each request takes the caller's on_flush Context
// and the matching handle_* receives the same Context plus an int result.
229 void flush_local_replay(Context
* on_flush
);
230 void handle_flush_local_replay(Context
* on_flush
, int r
);
232 void flush_commit_position(Context
* on_flush
);
233 void handle_flush_commit_position(Context
* on_flush
, int r
);
// Request/handler pairs named after the INIT_REMOTE_JOURNALER and
// START_EXTERNAL_REPLAY states in the diagram comment above. Each handle_*
// receives an int result code.
235 void init_remote_journaler();
236 void handle_init_remote_journaler(int r
);
238 void start_external_replay();
239 void handle_start_external_replay(int r
);
// Takes the caller's unique_lock on a ceph::mutex by non-const reference and
// returns bool.
// NOTE(review): the non-const reference suggests the lock may be released
// inside; the meaning of the return value is not visible here -- confirm.
241 bool notify_init_complete(std::unique_lock
<ceph::mutex
>& locker
);
// Request/handler pairs for individual steps; several names match states in
// the diagram comment above (SHUT_DOWN_LOCAL_JOURNAL_REPLAY,
// CLOSE_LOCAL_IMAGE, STOP_REMOTE_JOURNALER_REPLAY, WAIT_FOR_IN_FLIGHT_OPS).
// Each handle_* receives an int result code.
// NOTE(review): the order in which these run is defined by the
// implementation, not by this declaration order -- confirm there.
243 void shut_down_local_journal_replay();
244 void handle_shut_down_local_journal_replay(int r
);
246 void wait_for_event_replay();
247 void handle_wait_for_event_replay(int r
);
249 void close_local_image();
250 void handle_close_local_image(int r
);
252 void stop_remote_journaler_replay();
253 void handle_stop_remote_journaler_replay(int r
);
255 void wait_for_in_flight_ops();
256 void handle_wait_for_in_flight_ops(int r
);
// Handlers for the REPLAY_FLUSH step named in the diagram comment above.
// NOTE(review): no initiating replay_flush() declaration is visible in this
// excerpt; presumably it sits on a line that is not shown -- confirm.
259 void handle_replay_flush_shut_down(int r
);
260 void handle_replay_flush(int r
);
// Request/handler pairs named after the GET_REMOTE_TAG and
// ALLOCATE_LOCAL_TAG states in the diagram comment above.
262 void get_remote_tag();
263 void handle_get_remote_tag(int r
);
265 void allocate_local_tag();
266 void handle_allocate_local_tag(int r
);
// Takes an int result code and an error string by const reference.
268 void handle_replay_error(int r
, const std::string
&error
);
// Each operation below comes as an overload pair: one form without a lock
// argument and one that takes the caller's std::unique_lock<ceph::mutex>.
// NOTE(review): presumably the lock-taking form expects m_lock to be held
// already and the other form acquires it -- confirm in the implementation.
270 bool is_replay_complete() const;
271 bool is_replay_complete(const std::unique_lock
<ceph::mutex
>& locker
) const;
// r is an int result code; error_desc is a string passed by const reference.
273 void handle_replay_complete(int r
, const std::string
&error_desc
);
274 void handle_replay_complete(const std::unique_lock
<ceph::mutex
>&,
275 int r
, const std::string
&error_desc
);
// Unlike the two lock-taking overloads above, this one takes the lock by
// non-const reference.
// NOTE(review): that suggests it may unlock or relock it -- confirm.
276 void handle_replay_ready();
277 void handle_replay_ready(std::unique_lock
<ceph::mutex
>& locker
);
// Declarations named after the PREPROCESS_ENTRY state in the diagram comment
// above. Each handle_* receives an int result code.
279 void preprocess_entry();
280 void handle_delayed_preprocess_task(int r
);
281 void handle_preprocess_entry_ready(int r
);
282 void handle_preprocess_entry_safe(int r
);
// Declarations named after the PROCESS_ENTRY state in the diagram comment
// above.
284 void process_entry();
285 void handle_process_entry_ready(int r
);
// Receives the entry, a byte count and a start time by parameter; these
// match the kinds of the members m_replay_entry, m_replay_bytes and
// m_replay_start_time.
// NOTE(review): the parameter name "relay_bytes" looks like a typo for
// "replay_bytes" (compare m_replay_bytes). A name in a declaration does not
// affect callers, so it can be renamed together with the definition.
286 void handle_process_entry_safe(const ReplayEntry
& replay_entry
,
287 uint64_t relay_bytes
,
288 const utime_t
&replay_start_time
, int r
);
290 void handle_resync_image();
292 void notify_status_updated();
// Counterpart to handle_delayed_preprocess_task(); the task pointer is
// m_delayed_preprocess_task.
294 void cancel_delayed_preprocess_task();
// Declared here, defined out of line.
// remote_client:      input, cls::journal::Client passed by const reference.
// remote_client_meta: out-parameter (pointer to MirrorPeerClientMeta).
// resync_requested:   out-parameter (pointer to bool).
// error:              out-parameter (pointer to std::string).
// Returns int.
// NOTE(review): presumably 0 on success and a negative errno on failure, as
// elsewhere in this code base -- confirm in the implementation.
296 int validate_remote_client_state(
297 const cls::journal::Client
& remote_client
,
298 librbd::journal::MirrorPeerClientMeta
* remote_client_meta
,
299 bool* resync_requested
, std::string
* error
);
// Register/unregister pair for performance counters.
// NOTE(review): presumably these manage m_perf_counters -- confirm.
301 void register_perf_counters();
302 void unregister_perf_counters();
306 } // namespace journal
307 } // namespace image_replayer
308 } // namespace mirror
// Explicit instantiation declaration: suppresses implicit instantiation of
// Replayer<librbd::ImageCtx> in every translation unit that includes this
// header. A matching explicit instantiation definition must exist in exactly
// one source file.
311 extern template class rbd::mirror::image_replayer::journal::Replayer
<librbd::ImageCtx
>;
313 #endif // RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H