1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H
5 #define RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H
7 #include "tools/rbd_mirror/image_replayer/Replayer.h"
8 #include "include/utime.h"
9 #include "common/AsyncOpTracker.h"
10 #include "common/ceph_mutex.h"
11 #include "common/RefCountedObj.h"
12 #include "cls/journal/cls_journal_types.h"
13 #include "journal/ReplayEntry.h"
14 #include "librbd/ImageCtx.h"
15 #include "librbd/journal/Types.h"
16 #include "librbd/journal/TypeTraits.h"
18 #include <type_traits>
// Forward declaration of the non-template class `journal::Journaler`.
20 namespace journal
{ class Journaler
; }
// Forward declaration of the class template `Replay` (one type parameter) in
// a namespace named `journal`.
// NOTE(review): the member below is spelled `librbd::journal::Replay`, so an
// enclosing `librbd` namespace is presumably open here -- it is not visible
// in this excerpt; confirm against the full header.
24 namespace journal
{ template <typename I
> class Replay
; }
// Forward declaration of the `Threads` struct template (one type parameter).
// This header only uses it through pointers (`Threads<ImageCtxT>*`).
31 template <typename
> struct Threads
;
33 namespace image_replayer
{
// Forward declaration; this header only passes and stores
// `ReplayerListener*`.
35 struct ReplayerListener
;
// Forward declarations of three class templates, each with a single type
// parameter. The Replayer class below refers to them only through pointers
// instantiated on `ImageCtxT`.
39 template <typename
> class EventPreprocessor
;
40 template <typename
> class ReplayStatusFormatter
;
41 template <typename
> class StateBuilder
;
// Journal-based image replayer, templated on the image context type
// `ImageCtxT`. Publicly derives from `image_replayer::Replayer` and overrides
// its interface (init, shut_down, flush, status/error accessors) below.
// An explicit instantiation for `librbd::ImageCtx` is declared at the end of
// this header.
43 template <typename ImageCtxT
>
44 class Replayer
: public image_replayer::Replayer
{
// Journaler type chosen for `ImageCtxT` via `librbd::journal::TypeTraits`.
46 typedef typename
librbd::journal::TypeTraits
<ImageCtxT
>::Journaler Journaler
;
// Static factory: heap-allocates a Replayer with `new` and returns the raw
// pointer, forwarding `threads`, `local_mirror_uuid` and `state_builder`.
// NOTE(review): the rest of the `new Replayer(...)` argument list and the
// closing brace are not visible in this excerpt -- presumably
// `replayer_listener` is forwarded as the final argument; confirm against
// the full header.
48 static Replayer
* create(
49 Threads
<ImageCtxT
>* threads
,
50 const std::string
& local_mirror_uuid
,
51 StateBuilder
<ImageCtxT
>* state_builder
,
52 ReplayerListener
* replayer_listener
) {
53 return new Replayer(threads
, local_mirror_uuid
, state_builder
,
// Parameter list with the same four parameters (types and names) as create().
// NOTE(review): the declarator this list belongs to is not visible in this
// excerpt -- presumably the Replayer constructor; confirm against the full
// header.
58 Threads
<ImageCtxT
>* threads
,
59 const std::string
& local_mirror_uuid
,
60 StateBuilder
<ImageCtxT
>* state_builder
,
61 ReplayerListener
* replayer_listener
);
// Override of the base-class destroy().
// NOTE(review): the body is not visible in this excerpt. Since create()
// allocates with `new`, this presumably releases the instance -- confirm
// against the full header.
64 void destroy() override
{
// Override of init(); takes a `Context*` named `on_finish`.
// NOTE(review): presumably `on_finish` is completed once start-up (the
// INIT_REMOTE_JOURNALER / START_EXTERNAL_REPLAY states of the diagram in this
// header) succeeds or fails -- confirm in the implementation file.
68 void init(Context
* on_finish
) override
;
// Override of shut_down(); takes a `Context*` named `on_finish`.
// NOTE(review): presumably `on_finish` is completed after the shut-down
// states of the diagram in this header have run -- confirm in the
// implementation file.
69 void shut_down(Context
* on_finish
) override
;
// Override of flush(); takes a `Context*` named `on_finish`.
// NOTE(review): likely corresponds to the "(asok flush)" edge into
// LOCAL_REPLAY_FLUSH in the state diagram -- confirm in the implementation
// file.
71 void flush(Context
* on_finish
) override
;
// Override of get_replay_status(). `description` is an output string
// pointer and `on_finish` is a `Context*`.
// NOTE(review): the meaning of the bool result and when `on_finish` is
// completed are not visible in this header -- confirm in the implementation
// file.
73 bool get_replay_status(std::string
* description
, Context
* on_finish
) override
;
// Returns true when m_state equals STATE_REPLAYING. The read is performed
// while holding m_lock; m_lock is declared `mutable`, which is what allows
// locking it from this const method.
// NOTE(review): the closing brace of this inline body is not visible in this
// excerpt.
75 bool is_replaying() const override
{
76 std::unique_lock locker
{m_lock
};
77 return (m_state
== STATE_REPLAYING
);
// Returns m_resync_requested, read while holding m_lock.
// NOTE(review): the closing brace of this inline body is not visible in this
// excerpt.
80 bool is_resync_requested() const override
{
81 std::unique_lock
locker(m_lock
);
82 return m_resync_requested
;
// Acquires m_lock before producing the result.
// NOTE(review): the return statement and closing brace are not visible in
// this excerpt -- presumably returns m_error_code; confirm against the full
// header.
85 int get_error_code() const override
{
86 std::unique_lock
locker(m_lock
);
// Returns a copy of m_error_description (returned by value), taken while
// holding m_lock.
// NOTE(review): the closing brace of this inline body is not visible in this
// excerpt.
90 std::string
get_error_description() const override
{
91 std::unique_lock
locker(m_lock
);
92 return m_error_description
;
// Non-override accessor returning a std::string by value; acquires m_lock
// first.
// NOTE(review): the return statement and closing brace are not visible in
// this excerpt -- presumably returns m_image_spec; confirm against the full
// header.
95 std::string
get_image_spec() const {
96 std::unique_lock
locker(m_lock
);
107 * INIT_REMOTE_JOURNALER * * * * * * * * * * * * * * * * * * *
110 * START_EXTERNAL_REPLAY * * * * * * * * * * * * * * * * * * *
112 * | /--------------------------------------------\ *
114 * v v (asok flush) | *
115 * REPLAYING -------------> LOCAL_REPLAY_FLUSH | *
118 * | | FLUSH_COMMIT_POSITION | *
120 * | | \--------------------/| *
122 * | | (entries available) | *
123 * | \-----------> REPLAY_READY | *
125 * | | (skip if not | *
126 * | v needed) (error) *
127 * | REPLAY_FLUSH * * * * * * * * * *
129 * | | (skip if not | * *
130 * | v needed) (error) * *
131 * | GET_REMOTE_TAG * * * * * * * * *
133 * | | (skip if not | * *
134 * | v needed) (error) * *
135 * | ALLOCATE_LOCAL_TAG * * * * * * *
138 * | PREPROCESS_ENTRY * * * * * * * *
141 * | PROCESS_ENTRY * * * * * * * * * *
143 * | \---------------------/ * *
145 * REPLAY_COMPLETE < * * * * * * * * * * * * * * * * * * * *
151 * SHUT_DOWN_LOCAL_JOURNAL_REPLAY *
157 * CLOSE_LOCAL_IMAGE < * * * * * * * * * * * * * * * * * * * *
159 * v (skip if not started)
160 * STOP_REMOTE_JOURNALER_REPLAY
163 * WAIT_FOR_IN_FLIGHT_OPS
// Replay entry type chosen for `ImageCtxT` via `librbd::journal::TypeTraits`;
// used for m_replay_entry and handle_process_entry_safe() below.
171 typedef typename
librbd::journal::TypeTraits
<ImageCtxT
>::ReplayEntry ReplayEntry
;
// Forward declarations of helper structs nested in Replayer; their
// definitions are not part of this excerpt. Three of them are held through
// the pointer members m_remote_listener, m_remote_replay_handler and
// m_local_journal_listener declared below.
179 struct C_ReplayCommitted
;
180 struct RemoteJournalerListener
;
181 struct RemoteReplayHandler
;
182 struct LocalJournalListener
;
// Members whose types and names mirror the four create() parameters.
// NOTE(review): presumably initialized from those arguments by the
// constructor; ownership of the raw pointers is not visible in this header.
184 Threads
<ImageCtxT
>* m_threads
;
185 std::string m_local_mirror_uuid
;
186 StateBuilder
<ImageCtxT
>* m_state_builder
;
187 ReplayerListener
* m_replayer_listener
;
// Mutex taken by the inline accessors above (is_replaying(),
// is_resync_requested(), get_error_code(), get_error_description(),
// get_image_spec()). Declared `mutable` so those const methods can lock it.
189 mutable ceph::mutex m_lock
;
// State read by the locked accessors above, with in-class defaults:
// m_state starts at STATE_INIT, m_error_code at 0, m_resync_requested at
// false and m_on_init_shutdown at nullptr.
// NOTE(review): the `State` type is declared outside this excerpt.
// NOTE(review): m_on_init_shutdown presumably holds the pending
// init()/shut_down() completion -- confirm in the implementation file.
191 std::string m_image_spec
;
192 Context
* m_on_init_shutdown
= nullptr;
194 State m_state
= STATE_INIT
;
195 int m_error_code
= 0;
196 std::string m_error_description
;
197 bool m_resync_requested
= false;
// Type expression: a `ceph::ref_t` of the pointee type of
// `ImageCtxT::journal` (std::remove_pointer applied to the decltype of that
// member).
// NOTE(review): the declarator (member name) that follows this type is not
// visible in this excerpt -- confirm against the full header.
199 ceph::ref_t
<typename
std::remove_pointer
<decltype(ImageCtxT::journal
)>::type
>
// Raw pointers to helper objects; every one defaults to nullptr.
// NOTE(review): where these are created and destroyed (and who owns them) is
// not visible in this header -- confirm in the implementation file.
201 RemoteJournalerListener
* m_remote_listener
= nullptr;
203 librbd::journal::Replay
<ImageCtxT
>* m_local_journal_replay
= nullptr;
204 EventPreprocessor
<ImageCtxT
>* m_event_preprocessor
= nullptr;
205 ReplayStatusFormatter
<ImageCtxT
>* m_replay_status_formatter
= nullptr;
206 RemoteReplayHandler
* m_remote_replay_handler
= nullptr;
207 LocalJournalListener
* m_local_journal_listener
= nullptr;
209 PerfCounters
*m_perf_counters
= nullptr;
// Replay bookkeeping members. In-class defaults: m_replay_bytes = 0,
// m_replay_tag_valid = false, m_replay_tag_tid = 0; the remaining members are
// default-constructed.
// NOTE(review): presumably these describe the entry currently being replayed
// and its journal tag -- confirm in the implementation file.
211 ReplayEntry m_replay_entry
;
212 uint64_t m_replay_bytes
= 0;
213 utime_t m_replay_start_time
;
214 bool m_replay_tag_valid
= false;
215 uint64_t m_replay_tag_tid
= 0;
216 cls::journal::Tag m_replay_tag
;
217 librbd::journal::TagData m_replay_tag_data
;
218 librbd::journal::EventEntry m_event_entry
;
// Three AsyncOpTracker instances and two task pointers that default to
// nullptr.
// NOTE(review): judging by the names, the trackers back wait_for_flush(),
// wait_for_event_replay() and wait_for_in_flight_ops(), and the Context
// pointers hold the tasks managed by cancel_delayed_preprocess_task() and
// schedule/cancel_flush_local_replay_task() -- confirm in the implementation
// file.
220 AsyncOpTracker m_flush_tracker
;
222 AsyncOpTracker m_event_replay_tracker
;
223 Context
*m_delayed_preprocess_task
= nullptr;
225 AsyncOpTracker m_in_flight_op_tracker
;
226 Context
*m_flush_local_replay_task
= nullptr;
// NOTE(review): presumably invoked (via RemoteJournalerListener) when the
// remote journal's metadata changes -- confirm in the implementation file.
228 void handle_remote_journal_metadata_updated();
// Schedule / cancel / handler triple for the local-replay flush task.
// The handler receives an int `r`.
// NOTE(review): presumably these manage m_flush_local_replay_task and `r` is
// the task's result code -- confirm in the implementation file.
230 void schedule_flush_local_replay_task();
231 void cancel_flush_local_replay_task();
232 void handle_flush_local_replay_task(int r
);
// Request/handler pair named after the LOCAL_REPLAY_FLUSH state of the
// diagram in this header. Both take the `Context*` `on_flush`; the handler
// additionally receives an int `r`.
// NOTE(review): `r` is presumably the result of the flush -- confirm in the
// implementation file.
234 void flush_local_replay(Context
* on_flush
);
235 void handle_flush_local_replay(Context
* on_flush
, int r
);
// Request/handler pair named after the FLUSH_COMMIT_POSITION state of the
// diagram in this header. Both take the `Context*` `on_flush`; the handler
// additionally receives an int `r`.
// NOTE(review): `r` is presumably the result of the commit-position flush --
// confirm in the implementation file.
237 void flush_commit_position(Context
* on_flush
);
238 void handle_flush_commit_position(Context
* on_flush
, int r
);
// Request/handler pair named after the INIT_REMOTE_JOURNALER state of the
// diagram in this header; the handler receives an int `r`.
// NOTE(review): `r` is presumably the initialization result code.
240 void init_remote_journaler();
241 void handle_init_remote_journaler(int r
);
// Request/handler pair named after the START_EXTERNAL_REPLAY state of the
// diagram in this header. The request takes the caller's
// `std::unique_lock<ceph::mutex>` by non-const reference.
// NOTE(review): presumably the lock is m_lock, already held by the caller,
// and may be released inside -- confirm in the implementation file.
243 void start_external_replay(std::unique_lock
<ceph::mutex
>& locker
);
244 void handle_start_external_replay(int r
);
// Both helpers take the caller's `std::unique_lock<ceph::mutex>` by
// non-const reference and return bool.
// NOTE(review): the meaning of the bool results and whether the lock is
// dropped inside are not visible in this header -- confirm in the
// implementation file.
246 bool add_local_journal_listener(std::unique_lock
<ceph::mutex
>& locker
);
248 bool notify_init_complete(std::unique_lock
<ceph::mutex
>& locker
);
// Request/handler pair; the handler receives an int `r`.
// NOTE(review): presumably waits on m_flush_tracker as part of shut-down --
// confirm in the implementation file.
250 void wait_for_flush();
251 void handle_wait_for_flush(int r
);
// Request/handler pair named after the SHUT_DOWN_LOCAL_JOURNAL_REPLAY state
// of the diagram in this header; the handler receives an int `r`.
253 void shut_down_local_journal_replay();
254 void handle_shut_down_local_journal_replay(int r
);
// Request/handler pair; the handler receives an int `r`.
// NOTE(review): presumably waits on m_event_replay_tracker -- confirm in the
// implementation file.
256 void wait_for_event_replay();
257 void handle_wait_for_event_replay(int r
);
// Request/handler pair named after the CLOSE_LOCAL_IMAGE state of the
// diagram in this header; the handler receives an int `r`.
259 void close_local_image();
260 void handle_close_local_image(int r
);
// Request/handler pair named after the STOP_REMOTE_JOURNALER_REPLAY state,
// which the diagram in this header marks "(skip if not started)"; the
// handler receives an int `r`.
262 void stop_remote_journaler_replay();
263 void handle_stop_remote_journaler_replay(int r
);
// Request/handler pair named after the WAIT_FOR_IN_FLIGHT_OPS state of the
// diagram in this header; the handler receives an int `r`.
// NOTE(review): presumably waits on m_in_flight_op_tracker -- confirm in the
// implementation file.
265 void wait_for_in_flight_ops();
266 void handle_wait_for_in_flight_ops(int r
);
// Two handlers named after the REPLAY_FLUSH state of the diagram in this
// header; each receives an int `r`.
// NOTE(review): the matching request declaration is not visible in this
// excerpt -- confirm against the full header.
269 void handle_replay_flush_shut_down(int r
);
270 void handle_replay_flush(int r
);
// Request/handler pairs named after the GET_REMOTE_TAG and
// ALLOCATE_LOCAL_TAG states, both marked "(skip if not needed)" in the
// diagram in this header; each handler receives an int `r`.
272 void get_remote_tag();
273 void handle_get_remote_tag(int r
);
275 void allocate_local_tag();
276 void handle_allocate_local_tag(int r
);
// Takes an int `r` and an error string passed by const reference.
// NOTE(review): presumably records them in m_error_code /
// m_error_description -- confirm in the implementation file.
278 void handle_replay_error(int r
, const std::string
&error
);
// Two const overloads: one without arguments and one taking a const
// reference to a `std::unique_lock<ceph::mutex>`.
// NOTE(review): presumably the argument-less overload acquires m_lock itself
// while the other expects the caller to hold it -- confirm in the
// implementation file.
280 bool is_replay_complete() const;
281 bool is_replay_complete(const std::unique_lock
<ceph::mutex
>& locker
) const;
// Two overloads named after the REPLAY_COMPLETE state of the diagram in this
// header. Both take an int `r` and an error description; the second also
// takes an unnamed const reference to a `std::unique_lock<ceph::mutex>`.
// NOTE(review): presumably the lock-taking overload is for callers that
// already hold m_lock -- confirm in the implementation file.
283 void handle_replay_complete(int r
, const std::string
&error_desc
);
284 void handle_replay_complete(const std::unique_lock
<ceph::mutex
>&,
285 int r
, const std::string
&error_desc
);
// Two overloads named after the REPLAY_READY state ("entries available" in
// the diagram in this header); the second takes the caller's
// `std::unique_lock<ceph::mutex>` by non-const reference.
// NOTE(review): presumably the argument-less overload acquires m_lock and
// delegates to the other -- confirm in the implementation file.
286 void handle_replay_ready();
287 void handle_replay_ready(std::unique_lock
<ceph::mutex
>& locker
);
// Request plus three handlers named after the PREPROCESS_ENTRY state of the
// diagram in this header; each handler receives an int `r`.
// NOTE(review): handle_delayed_preprocess_task presumably runs the task held
// in m_delayed_preprocess_task -- confirm in the implementation file.
289 void preprocess_entry();
290 void handle_delayed_preprocess_task(int r
);
291 void handle_preprocess_entry_ready(int r
);
292 void handle_preprocess_entry_safe(int r
);
// Request plus two handlers named after the PROCESS_ENTRY state of the
// diagram in this header. The "safe" handler takes the replay entry, a byte
// count and a start time by value/const reference in addition to the int
// `r`, rather than reading the m_replay_* members.
// NOTE(review): the parameter name `relay_bytes` looks like a typo for
// `replay_bytes` (compare the member m_replay_bytes); it is harmless in a
// declaration but worth aligning with the definition.
294 void process_entry();
295 void handle_process_entry_ready(int r
);
296 void handle_process_entry_safe(const ReplayEntry
& replay_entry
,
297 uint64_t relay_bytes
,
298 const utime_t
&replay_start_time
, int r
);
// Argument-less helpers.
// NOTE(review): judging by the names, handle_resync_image relates to
// m_resync_requested, notify_status_updated notifies m_replayer_listener and
// cancel_delayed_preprocess_task cancels m_delayed_preprocess_task --
// confirm in the implementation file.
300 void handle_resync_image();
302 void notify_status_updated();
304 void cancel_delayed_preprocess_task();
// Takes the remote journal client by const reference and writes results
// through three pointer parameters: `remote_client_meta`,
// `resync_requested` and `error`. Returns an int.
// NOTE(review): the return value is presumably 0 on success and a negative
// errno on failure -- confirm in the implementation file.
306 int validate_remote_client_state(
307 const cls::journal::Client
& remote_client
,
308 librbd::journal::MirrorPeerClientMeta
* remote_client_meta
,
309 bool* resync_requested
, std::string
* error
);
// Register / unregister pair for performance counters.
// NOTE(review): presumably these create and release m_perf_counters --
// confirm in the implementation file.
311 void register_perf_counters();
312 void unregister_perf_counters();
316 } // namespace journal
317 } // namespace image_replayer
318 } // namespace mirror
// Explicit instantiation declaration: translation units including this
// header do not implicitly instantiate Replayer<librbd::ImageCtx>. A matching
// explicit instantiation definition must exist in exactly one translation
// unit (presumably the corresponding .cc file).
321 extern template class rbd::mirror::image_replayer::journal::Replayer
<librbd::ImageCtx
>;
323 #endif // RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H