1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef CEPH_RBD_MIRROR_IMAGE_REPLAYER_H
5 #define CEPH_RBD_MIRROR_IMAGE_REPLAYER_H
7 #include "common/AsyncOpTracker.h"
8 #include "common/Mutex.h"
9 #include "common/WorkQueue.h"
10 #include "include/rados/librados.hpp"
11 #include "cls/journal/cls_journal_types.h"
12 #include "cls/rbd/cls_rbd_types.h"
13 #include "journal/JournalMetadataListener.h"
14 #include "journal/ReplayEntry.h"
15 #include "librbd/ImageCtx.h"
16 #include "librbd/journal/Types.h"
17 #include "librbd/journal/TypeTraits.h"
18 #include "ImageDeleter.h"
19 #include "ProgressContext.h"
22 #include <boost/noncopyable.hpp>
23 #include <boost/optional.hpp>
// Forward declaration: this header only stores a pointer to the hook
// (m_asok_hook), so the complete type is not needed here.
class AdminSocketHook;
// Forward declaration of the journal replay template; this header only holds
// a pointer to an instantiation of it (m_local_replay).
namespace journal {
template <typename> class Replay;
}  // namespace journal
// Forward declaration: ImageReplayer keeps a Threads<librbd::ImageCtx> pointer
// (m_threads) and never needs the complete type in this header.
template <typename> struct Threads;
// Forward declarations of the image_replayer helpers that ImageReplayer refers
// to by pointer (m_bootstrap_request, m_event_preprocessor and
// m_replay_status_formatter).
namespace image_replayer {
template <typename> class BootstrapRequest;
template <typename> class EventPreprocessor;
template <typename> class ReplayStatusFormatter;
}  // namespace image_replayer
57 * Replays changes from a remote cluster for a single image.
59 template <typename ImageCtxT
= librbd::ImageCtx
>
62 typedef typename
librbd::journal::TypeTraits
<ImageCtxT
>::ReplayEntry ReplayEntry
;
68 STATE_REPLAY_FLUSHING
,
73 static ImageReplayer
*create(
74 Threads
<librbd::ImageCtx
> *threads
,
75 std::shared_ptr
<ImageDeleter
> image_deleter
,
76 ImageSyncThrottlerRef
<ImageCtxT
> image_sync_throttler
,
77 RadosRef local
, const std::string
&local_mirror_uuid
, int64_t local_pool_id
,
78 const std::string
&global_image_id
) {
79 return new ImageReplayer(threads
, image_deleter
, image_sync_throttler
,
80 local
, local_mirror_uuid
, local_pool_id
,
87 ImageReplayer(Threads
<librbd::ImageCtx
> *threads
,
88 std::shared_ptr
<ImageDeleter
> image_deleter
,
89 ImageSyncThrottlerRef
<ImageCtxT
> image_sync_throttler
,
90 RadosRef local
, const std::string
&local_mirror_uuid
,
91 int64_t local_pool_id
, const std::string
&global_image_id
);
92 virtual ~ImageReplayer();
93 ImageReplayer(const ImageReplayer
&) = delete;
94 ImageReplayer
& operator=(const ImageReplayer
&) = delete;
96 State
get_state() { Mutex::Locker
l(m_lock
); return get_state_(); }
97 bool is_stopped() { Mutex::Locker
l(m_lock
); return is_stopped_(); }
98 bool is_running() { Mutex::Locker
l(m_lock
); return is_running_(); }
99 bool is_replaying() { Mutex::Locker
l(m_lock
); return is_replaying_(); }
101 std::string
get_name() { Mutex::Locker
l(m_lock
); return m_name
; };
// Declared here, defined out of line. Takes a result code and a human-readable
// description.
// NOTE(review): presumably stores them in m_last_r / m_state_desc - confirm
// against the definition.
void set_state_description(int r, const std::string &desc);
// Returns true when the last recorded result code (m_last_r) equals
// -EBLACKLISTED. m_lock is held while m_last_r is read; this works from a
// const method because m_lock is declared mutable.
104 inline bool is_blacklisted() const {
105 Mutex::Locker
locker(m_lock
);
106 return (m_last_r
== -EBLACKLISTED
);
109 void add_remote_image(const std::string
&remote_mirror_uuid
,
110 const std::string
&remote_image_id
,
111 librados::IoCtx
&remote_io_ctx
);
112 void remove_remote_image(const std::string
&remote_mirror_uuid
,
113 const std::string
&remote_image_id
,
114 bool schedule_delete
);
115 bool remote_images_empty() const;
// Returns m_local_pool_id. No lock is taken here.
117 inline int64_t get_local_pool_id() const {
118 return m_local_pool_id
;
// Returns a const reference to m_global_image_id. No lock is taken here; the
// reference is only valid for the lifetime of this replayer.
120 inline const std::string
& get_global_image_id() const {
121 return m_global_image_id
;
124 void start(Context
*on_finish
= nullptr, bool manual
= false);
125 void stop(Context
*on_finish
= nullptr, bool manual
= false,
126 int r
= 0, const std::string
& desc
= "");
127 void restart(Context
*on_finish
= nullptr);
128 void flush(Context
*on_finish
= nullptr);
130 void resync_image(Context
*on_finish
=nullptr);
132 void print_status(Formatter
*f
, stringstream
*ss
);
134 virtual void handle_replay_ready();
135 virtual void handle_replay_complete(int r
, const std::string
&error_desc
);
141 * <uninitialized> <------------------------------------ FAIL
147 * PREPARE_LOCAL_IMAGE * * * * * * * * * * * * * * * * * *
150 * BOOTSTRAP_IMAGE * * * * * * * * * * * * * * * * * * * *
153 * INIT_REMOTE_JOURNALER * * * * * * * * * * * * * * * * *
156 * START_REPLAY * * * * * * * * * * * * * * * * * * * * * *
158 * | /--------------------------------------------\
161 * REPLAYING -------------> LOCAL_REPLAY_FLUSH |
164 * | | FLUSH_COMMIT_POSITION |
166 * | | \--------------------/|
168 * | | (entries available) |
169 * | \-----------> REPLAY_READY |
172 * | v needed) (error)
173 * | REPLAY_FLUSH * * * * * * * * *
175 * | | (skip if not | *
176 * | v needed) (error) *
177 * | GET_REMOTE_TAG * * * * * * * *
179 * | | (skip if not | *
180 * | v needed) (error) *
181 * | ALLOCATE_LOCAL_TAG * * * * * *
184 * | PREPROCESS_ENTRY * * * * * * *
187 * | PROCESS_ENTRY * * * * * * * * *
189 * | \---------------------/ *
191 * REPLAY_COMPLETE < * * * * * * * * * * * * * * * * * * *
194 * JOURNAL_REPLAY_SHUT_DOWN
205 virtual void on_start_fail(int r
, const std::string
&desc
= "");
206 virtual bool on_start_interrupted();
208 virtual void on_stop_journal_replay(int r
= 0, const std::string
&desc
= "");
210 virtual void on_flush_local_replay_flush_start(Context
*on_flush
);
211 virtual void on_flush_local_replay_flush_finish(Context
*on_flush
, int r
);
212 virtual void on_flush_flush_commit_position_start(Context
*on_flush
);
213 virtual void on_flush_flush_commit_position_finish(Context
*on_flush
, int r
);
215 bool on_replay_interrupted();
219 std::string mirror_uuid
;
220 std::string image_id
;
221 librados::IoCtx io_ctx
;
225 RemoteImage(const std::string
&mirror_uuid
,
226 const std::string
&image_id
)
227 : mirror_uuid(mirror_uuid
), image_id(image_id
) {
229 RemoteImage(const std::string
&mirror_uuid
,
230 const std::string
&image_id
,
231 librados::IoCtx
&io_ctx
)
232 : mirror_uuid(mirror_uuid
), image_id(image_id
), io_ctx(io_ctx
) {
// Ordering used when RemoteImage is stored in a std::set (see RemoteImages):
// compares mirror_uuid first and falls back to image_id when the uuids are
// equal. io_ctx does not take part in the ordering.
235 inline bool operator<(const RemoteImage
&rhs
) const {
236 if (mirror_uuid
!= rhs
.mirror_uuid
) {
237 return mirror_uuid
< rhs
.mirror_uuid
;
239 return image_id
< rhs
.image_id
;
// Two RemoteImage values are equal when both mirror_uuid and image_id match;
// io_ctx is deliberately not compared, matching operator<.
242 inline bool operator==(const RemoteImage
&rhs
) const {
243 return (mirror_uuid
== rhs
.mirror_uuid
&& image_id
== rhs
.image_id
);
247 typedef std::set
<RemoteImage
> RemoteImages
;
249 typedef typename
librbd::journal::TypeTraits
<ImageCtxT
>::Journaler Journaler
;
250 typedef boost::optional
<State
> OptionalState
;
// Adapter that forwards librbd journal listener events to the owning
// ImageReplayer:
//  - handle_close()    -> on_stop_journal_replay() with its default arguments
//  - handle_promoted() -> on_stop_journal_replay(0, "force promoted")
//  - handle_resync()   -> resync_image() with its default argument
// img_replayer is a non-owning back pointer.
// NOTE(review): the single-argument constructor is not marked explicit.
252 struct JournalListener
: public librbd::journal::Listener
{
253 ImageReplayer
*img_replayer
;
255 JournalListener(ImageReplayer
*img_replayer
)
256 : img_replayer(img_replayer
) {
259 void handle_close() override
{
260 img_replayer
->on_stop_journal_replay();
263 void handle_promoted() override
{
264 img_replayer
->on_stop_journal_replay(0, "force promoted");
267 void handle_resync() override
{
268 img_replayer
->resync_image();
272 class BootstrapProgressContext
: public ProgressContext
{
274 BootstrapProgressContext(ImageReplayer
<ImageCtxT
> *replayer
) :
278 void update_progress(const std::string
&description
,
279 bool flush
= true) override
;
281 ImageReplayer
<ImageCtxT
> *replayer
;
284 Threads
<librbd::ImageCtx
> *m_threads
;
285 std::shared_ptr
<ImageDeleter
> m_image_deleter
;
286 ImageSyncThrottlerRef
<ImageCtxT
> m_image_sync_throttler
;
288 RemoteImages m_remote_images
;
289 RemoteImage m_remote_image
;
292 std::string m_local_mirror_uuid
;
293 int64_t m_local_pool_id
;
294 std::string m_local_image_id
;
295 std::string m_global_image_id
;
297 mutable Mutex m_lock
;
298 State m_state
= STATE_STOPPED
;
300 std::string m_state_desc
;
301 BootstrapProgressContext m_progress_cxt
;
303 image_replayer::EventPreprocessor
<ImageCtxT
> *m_event_preprocessor
= nullptr;
304 image_replayer::ReplayStatusFormatter
<ImageCtxT
> *m_replay_status_formatter
=
306 librados::IoCtx m_local_ioctx
;
307 ImageCtxT
*m_local_image_ctx
= nullptr;
308 std::string m_local_image_tag_owner
;
310 decltype(ImageCtxT::journal
) m_local_journal
= nullptr;
311 librbd::journal::Replay
<ImageCtxT
> *m_local_replay
= nullptr;
312 Journaler
* m_remote_journaler
= nullptr;
313 ::journal::ReplayHandler
*m_replay_handler
= nullptr;
// NOTE(review): unlike the neighbouring pointer members, this one has no
// in-class initializer; presumably it is set in the constructor's initializer
// list - confirm, otherwise it is read uninitialized.
314 librbd::journal::Listener
*m_journal_listener
;
315 bool m_stopping_for_resync
= false;
317 Context
*m_on_start_finish
= nullptr;
318 Context
*m_on_stop_finish
= nullptr;
319 Context
*m_update_status_task
= nullptr;
320 int m_update_status_interval
= 0;
321 librados::AioCompletion
*m_update_status_comp
= nullptr;
322 bool m_stop_requested
= false;
323 bool m_manual_stop
= false;
325 AdminSocketHook
*m_asok_hook
= nullptr;
327 image_replayer::BootstrapRequest
<ImageCtxT
> *m_bootstrap_request
= nullptr;
329 uint32_t m_in_flight_status_updates
= 0;
330 bool m_update_status_requested
= false;
331 Context
*m_on_update_status_finish
= nullptr;
333 librbd::journal::MirrorPeerClientMeta m_client_meta
;
335 ReplayEntry m_replay_entry
;
336 bool m_replay_tag_valid
= false;
337 uint64_t m_replay_tag_tid
= 0;
338 cls::journal::Tag m_replay_tag
;
339 librbd::journal::TagData m_replay_tag_data
;
340 librbd::journal::EventEntry m_event_entry
;
341 AsyncOpTracker m_event_replay_tracker
;
342 Context
*m_delayed_preprocess_task
= nullptr;
// Journal metadata listener holding a non-owning back pointer to the
// ImageReplayer. handle_update() is only declared here and defined elsewhere.
// NOTE(review): presumably it forwards to
// ImageReplayer::handle_remote_journal_metadata_updated() - confirm.
// NOTE(review): the single-argument constructor is not marked explicit.
344 struct RemoteJournalerListener
: public ::journal::JournalMetadataListener
{
345 ImageReplayer
*replayer
;
347 RemoteJournalerListener(ImageReplayer
*replayer
) : replayer(replayer
) { }
349 void handle_update(::journal::JournalMetadata
*) override
;
// Completion context that carries a replay entry. The entry is moved into the
// context at construction; finish(r) passes it together with the result code
// to ImageReplayer::handle_process_entry_safe(). replayer is a non-owning back
// pointer.
352 struct C_ReplayCommitted
: public Context
{
353 ImageReplayer
*replayer
;
354 ReplayEntry replay_entry
;
356 C_ReplayCommitted(ImageReplayer
*replayer
,
357 ReplayEntry
&&replay_entry
)
358 : replayer(replayer
), replay_entry(std::move(replay_entry
)) {
360 void finish(int r
) override
{
361 replayer
->handle_process_entry_safe(replay_entry
, r
);
365 static std::string
to_string(const State state
);
367 State
get_state_() const {
// Unlocked helper: true when m_state == STATE_STOPPED. Takes no lock itself;
// the public is_stopped() acquires m_lock before calling it.
370 bool is_stopped_() const {
371 return m_state
== STATE_STOPPED
;
// Unlocked helper: true when the replayer is not stopped, not in
// STATE_STOPPING, and no stop has been requested (m_stop_requested is false).
// Takes no lock itself; the public is_running() acquires m_lock before
// calling it.
373 bool is_running_() const {
374 return !is_stopped_() && m_state
!= STATE_STOPPING
&& !m_stop_requested
;
// Unlocked helper: true when m_state is STATE_REPLAYING or
// STATE_REPLAY_FLUSHING. Takes no lock itself; the public is_replaying()
// acquires m_lock before calling it.
376 bool is_replaying_() const {
377 return (m_state
== STATE_REPLAYING
||
378 m_state
== STATE_REPLAY_FLUSHING
);
381 bool update_mirror_image_status(bool force
, const OptionalState
&state
);
382 bool start_mirror_image_status_update(bool force
, bool restarting
);
383 void finish_mirror_image_status_update();
384 void queue_mirror_image_status_update(const OptionalState
&state
);
385 void send_mirror_status_update(const OptionalState
&state
);
386 void handle_mirror_status_update(int r
);
387 void reschedule_update_status_task(int new_interval
= 0);
// Shutdown path and remote journal metadata notification, defined out of
// line. shut_down() and handle_shut_down() each receive a result code.
void shut_down(int r);
void handle_shut_down(int r);
void handle_remote_journal_metadata_updated();
393 void prepare_local_image();
394 void handle_prepare_local_image(int r
);
397 void handle_bootstrap(int r
);
399 void init_remote_journaler();
400 void handle_init_remote_journaler(int r
);
403 void handle_start_replay(int r
);
406 void handle_replay_flush(int r
);
408 void get_remote_tag();
409 void handle_get_remote_tag(int r
);
411 void allocate_local_tag();
412 void handle_allocate_local_tag(int r
);
414 void preprocess_entry();
415 void handle_preprocess_entry_ready(int r
);
416 void handle_preprocess_entry_safe(int r
);
418 void process_entry();
419 void handle_process_entry_ready(int r
);
420 void handle_process_entry_safe(const ReplayEntry
& replay_entry
, int r
);
424 } // namespace mirror
427 extern template class rbd::mirror::ImageReplayer
<librbd::ImageCtx
>;
429 #endif // CEPH_RBD_MIRROR_IMAGE_REPLAYER_H