1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef CEPH_RBD_MIRROR_IMAGE_REPLAYER_H
5 #define CEPH_RBD_MIRROR_IMAGE_REPLAYER_H
7 #include "common/AsyncOpTracker.h"
8 #include "common/Mutex.h"
9 #include "common/WorkQueue.h"
10 #include "include/rados/librados.hpp"
11 #include "cls/journal/cls_journal_types.h"
12 #include "cls/rbd/cls_rbd_types.h"
13 #include "journal/JournalMetadataListener.h"
14 #include "journal/ReplayEntry.h"
15 #include "librbd/ImageCtx.h"
16 #include "librbd/journal/Types.h"
17 #include "librbd/journal/TypeTraits.h"
18 #include "ProgressContext.h"
20 #include "tools/rbd_mirror/image_replayer/Types.h"
22 #include <boost/noncopyable.hpp>
23 #include <boost/optional.hpp>
31 class AdminSocketHook
;
43 namespace journal
{ template <typename
> class Replay
; }
50 template <typename
> struct ImageDeleter
;
51 template <typename
> struct InstanceWatcher
;
52 template <typename
> struct Threads
;
54 namespace image_replayer
{ template <typename
> class BootstrapRequest
; }
55 namespace image_replayer
{ template <typename
> class EventPreprocessor
; }
56 namespace image_replayer
{ template <typename
> class ReplayStatusFormatter
; }
59 * Replays changes from a remote cluster for a single image.
61 template <typename ImageCtxT
= librbd::ImageCtx
>
64 static ImageReplayer
*create(
65 Threads
<ImageCtxT
> *threads
, ImageDeleter
<ImageCtxT
>* image_deleter
,
66 InstanceWatcher
<ImageCtxT
> *instance_watcher
,
67 RadosRef local
, const std::string
&local_mirror_uuid
, int64_t local_pool_id
,
68 const std::string
&global_image_id
) {
69 return new ImageReplayer(threads
, image_deleter
, instance_watcher
,
70 local
, local_mirror_uuid
, local_pool_id
,
77 ImageReplayer(Threads
<ImageCtxT
> *threads
,
78 ImageDeleter
<ImageCtxT
>* image_deleter
,
79 InstanceWatcher
<ImageCtxT
> *instance_watcher
,
80 RadosRef local
, const std::string
&local_mirror_uuid
,
81 int64_t local_pool_id
, const std::string
&global_image_id
);
82 virtual ~ImageReplayer();
83 ImageReplayer(const ImageReplayer
&) = delete;
84 ImageReplayer
& operator=(const ImageReplayer
&) = delete;
86 bool is_stopped() { Mutex::Locker
l(m_lock
); return is_stopped_(); }
87 bool is_running() { Mutex::Locker
l(m_lock
); return is_running_(); }
88 bool is_replaying() { Mutex::Locker
l(m_lock
); return is_replaying_(); }
90 std::string
get_name() { Mutex::Locker
l(m_lock
); return m_name
; };
91 void set_state_description(int r
, const std::string
&desc
);
93 // TODO temporary until policy handles release of image replayers
94 inline bool is_finished() const {
95 Mutex::Locker
locker(m_lock
);
98 inline void set_finished(bool finished
) {
99 Mutex::Locker
locker(m_lock
);
100 m_finished
= finished
;
103 inline bool is_blacklisted() const {
104 Mutex::Locker
locker(m_lock
);
105 return (m_last_r
== -EBLACKLISTED
);
108 image_replayer::HealthState
get_health_state() const;
110 void add_peer(const std::string
&peer_uuid
, librados::IoCtx
&remote_io_ctx
);
112 inline int64_t get_local_pool_id() const {
113 return m_local_pool_id
;
115 inline const std::string
& get_global_image_id() const {
116 return m_global_image_id
;
119 void start(Context
*on_finish
= nullptr, bool manual
= false);
120 void stop(Context
*on_finish
= nullptr, bool manual
= false,
121 int r
= 0, const std::string
& desc
= "");
122 void restart(Context
*on_finish
= nullptr);
123 void flush(Context
*on_finish
= nullptr);
125 void resync_image(Context
*on_finish
=nullptr);
127 void print_status(Formatter
*f
, stringstream
*ss
);
129 virtual void handle_replay_ready();
130 virtual void handle_replay_complete(int r
, const std::string
&error_desc
);
136 * <uninitialized> <------------------------------------ FAIL
142 * WAIT_FOR_DELETION *
145 * PREPARE_LOCAL_IMAGE * * * * * * * * * * * * * * * * * *
148 * PREPARE_REMOTE_IMAGE * * * * * * * * * * * * * * * * * *
151 * BOOTSTRAP_IMAGE * * * * * * * * * * * * * * * * * * * *
154 * INIT_REMOTE_JOURNALER * * * * * * * * * * * * * * * * *
157 * START_REPLAY * * * * * * * * * * * * * * * * * * * * * *
159 * | /--------------------------------------------\
162 * REPLAYING -------------> LOCAL_REPLAY_FLUSH |
165 * | | FLUSH_COMMIT_POSITION |
167 * | | \--------------------/|
169 * | | (entries available) |
170 * | \-----------> REPLAY_READY |
173 * | v needed) (error)
174 * | REPLAY_FLUSH * * * * * * * * *
176 * | | (skip if not | *
177 * | v needed) (error) *
178 * | GET_REMOTE_TAG * * * * * * * *
180 * | | (skip if not | *
181 * | v needed) (error) *
182 * | ALLOCATE_LOCAL_TAG * * * * * *
185 * | PREPROCESS_ENTRY * * * * * * *
188 * | PROCESS_ENTRY * * * * * * * * *
190 * | \---------------------/ *
192 * REPLAY_COMPLETE < * * * * * * * * * * * * * * * * * * *
195 * JOURNAL_REPLAY_SHUT_DOWN
206 virtual void on_start_fail(int r
, const std::string
&desc
= "");
207 virtual bool on_start_interrupted();
209 virtual void on_stop_journal_replay(int r
= 0, const std::string
&desc
= "");
211 virtual void on_flush_local_replay_flush_start(Context
*on_flush
);
212 virtual void on_flush_local_replay_flush_finish(Context
*on_flush
, int r
);
213 virtual void on_flush_flush_commit_position_start(Context
*on_flush
);
214 virtual void on_flush_flush_commit_position_finish(Context
*on_flush
, int r
);
216 bool on_replay_interrupted();
219 typedef typename
librbd::journal::TypeTraits
<ImageCtxT
>::ReplayEntry ReplayEntry
;
225 STATE_REPLAY_FLUSHING
,
231 std::string mirror_uuid
;
232 std::string image_id
;
233 librados::IoCtx io_ctx
;
237 RemoteImage(const Peer
& peer
) : io_ctx(peer
.io_ctx
) {
241 typedef typename
librbd::journal::TypeTraits
<ImageCtxT
>::Journaler Journaler
;
242 typedef boost::optional
<State
> OptionalState
;
243 typedef boost::optional
<cls::rbd::MirrorImageStatusState
>
244 OptionalMirrorImageStatusState
;
246 struct JournalListener
: public librbd::journal::Listener
{
247 ImageReplayer
*img_replayer
;
249 JournalListener(ImageReplayer
*img_replayer
)
250 : img_replayer(img_replayer
) {
253 void handle_close() override
{
254 img_replayer
->on_stop_journal_replay();
257 void handle_promoted() override
{
258 img_replayer
->on_stop_journal_replay(0, "force promoted");
261 void handle_resync() override
{
262 img_replayer
->resync_image();
266 class BootstrapProgressContext
: public ProgressContext
{
268 BootstrapProgressContext(ImageReplayer
<ImageCtxT
> *replayer
) :
272 void update_progress(const std::string
&description
,
273 bool flush
= true) override
;
275 ImageReplayer
<ImageCtxT
> *replayer
;
278 Threads
<ImageCtxT
> *m_threads
;
279 ImageDeleter
<ImageCtxT
>* m_image_deleter
;
280 InstanceWatcher
<ImageCtxT
> *m_instance_watcher
;
283 RemoteImage m_remote_image
;
286 std::string m_local_mirror_uuid
;
287 int64_t m_local_pool_id
;
288 std::string m_local_image_id
;
289 std::string m_global_image_id
;
292 mutable Mutex m_lock
;
293 State m_state
= STATE_STOPPED
;
294 std::string m_state_desc
;
296 OptionalMirrorImageStatusState m_mirror_image_status_state
= boost::none
;
299 BootstrapProgressContext m_progress_cxt
;
301 bool m_finished
= false;
302 bool m_delete_requested
= false;
303 bool m_resync_requested
= false;
305 image_replayer::EventPreprocessor
<ImageCtxT
> *m_event_preprocessor
= nullptr;
306 image_replayer::ReplayStatusFormatter
<ImageCtxT
> *m_replay_status_formatter
=
308 librados::IoCtx m_local_ioctx
;
309 ImageCtxT
*m_local_image_ctx
= nullptr;
310 std::string m_local_image_tag_owner
;
312 decltype(ImageCtxT::journal
) m_local_journal
= nullptr;
313 librbd::journal::Replay
<ImageCtxT
> *m_local_replay
= nullptr;
314 Journaler
* m_remote_journaler
= nullptr;
315 ::journal::ReplayHandler
*m_replay_handler
= nullptr;
316 librbd::journal::Listener
*m_journal_listener
;
318 Context
*m_on_start_finish
= nullptr;
319 Context
*m_on_stop_finish
= nullptr;
320 Context
*m_update_status_task
= nullptr;
321 int m_update_status_interval
= 0;
322 librados::AioCompletion
*m_update_status_comp
= nullptr;
323 bool m_stop_requested
= false;
324 bool m_manual_stop
= false;
326 AdminSocketHook
*m_asok_hook
= nullptr;
328 image_replayer::BootstrapRequest
<ImageCtxT
> *m_bootstrap_request
= nullptr;
330 uint32_t m_in_flight_status_updates
= 0;
331 bool m_update_status_requested
= false;
332 Context
*m_on_update_status_finish
= nullptr;
334 librbd::journal::MirrorPeerClientMeta m_client_meta
;
336 ReplayEntry m_replay_entry
;
337 bool m_replay_tag_valid
= false;
338 uint64_t m_replay_tag_tid
= 0;
339 cls::journal::Tag m_replay_tag
;
340 librbd::journal::TagData m_replay_tag_data
;
341 librbd::journal::EventEntry m_event_entry
;
342 AsyncOpTracker m_event_replay_tracker
;
343 Context
*m_delayed_preprocess_task
= nullptr;
345 struct RemoteJournalerListener
: public ::journal::JournalMetadataListener
{
346 ImageReplayer
*replayer
;
348 RemoteJournalerListener(ImageReplayer
*replayer
) : replayer(replayer
) { }
350 void handle_update(::journal::JournalMetadata
*) override
;
353 struct C_ReplayCommitted
: public Context
{
354 ImageReplayer
*replayer
;
355 ReplayEntry replay_entry
;
357 C_ReplayCommitted(ImageReplayer
*replayer
,
358 ReplayEntry
&&replay_entry
)
359 : replayer(replayer
), replay_entry(std::move(replay_entry
)) {
361 void finish(int r
) override
{
362 replayer
->handle_process_entry_safe(replay_entry
, r
);
366 static std::string
to_string(const State state
);
368 bool is_stopped_() const {
369 return m_state
== STATE_STOPPED
;
371 bool is_running_() const {
372 return !is_stopped_() && m_state
!= STATE_STOPPING
&& !m_stop_requested
;
374 bool is_replaying_() const {
375 return (m_state
== STATE_REPLAYING
||
376 m_state
== STATE_REPLAY_FLUSHING
);
379 bool update_mirror_image_status(bool force
, const OptionalState
&state
);
380 bool start_mirror_image_status_update(bool force
, bool restarting
);
381 void finish_mirror_image_status_update();
382 void queue_mirror_image_status_update(const OptionalState
&state
);
383 void send_mirror_status_update(const OptionalState
&state
);
384 void handle_mirror_status_update(int r
);
385 void reschedule_update_status_task(int new_interval
= 0);
387 void shut_down(int r
);
388 void handle_shut_down(int r
);
389 void handle_remote_journal_metadata_updated();
391 void wait_for_deletion();
392 void handle_wait_for_deletion(int r
);
394 void prepare_local_image();
395 void handle_prepare_local_image(int r
);
397 void prepare_remote_image();
398 void handle_prepare_remote_image(int r
);
401 void handle_bootstrap(int r
);
403 void init_remote_journaler();
404 void handle_init_remote_journaler(int r
);
407 void handle_start_replay(int r
);
410 void handle_replay_flush(int r
);
412 void get_remote_tag();
413 void handle_get_remote_tag(int r
);
415 void allocate_local_tag();
416 void handle_allocate_local_tag(int r
);
418 void preprocess_entry();
419 void handle_preprocess_entry_ready(int r
);
420 void handle_preprocess_entry_safe(int r
);
422 void process_entry();
423 void handle_process_entry_ready(int r
);
424 void handle_process_entry_safe(const ReplayEntry
& replay_entry
, int r
);
426 void register_admin_socket_hook();
427 void unregister_admin_socket_hook();
429 void on_name_changed();
432 } // namespace mirror
435 extern template class rbd::mirror::ImageReplayer
<librbd::ImageCtx
>;
437 #endif // CEPH_RBD_MIRROR_IMAGE_REPLAYER_H