1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef CEPH_RBD_MIRROR_IMAGE_REPLAYER_H
5 #define CEPH_RBD_MIRROR_IMAGE_REPLAYER_H
7 #include "common/AsyncOpTracker.h"
8 #include "common/Mutex.h"
9 #include "common/WorkQueue.h"
10 #include "include/rados/librados.hpp"
11 #include "cls/journal/cls_journal_types.h"
12 #include "cls/rbd/cls_rbd_types.h"
13 #include "journal/JournalMetadataListener.h"
14 #include "journal/ReplayEntry.h"
15 #include "librbd/ImageCtx.h"
16 #include "librbd/journal/Types.h"
17 #include "librbd/journal/TypeTraits.h"
18 #include "ImageDeleter.h"
19 #include "ProgressContext.h"
21 #include "tools/rbd_mirror/image_replayer/Types.h"
23 #include <boost/noncopyable.hpp>
24 #include <boost/optional.hpp>
32 class AdminSocketHook
;
44 namespace journal
{ template <typename
> class Replay
; }
51 template <typename
> struct InstanceWatcher
;
52 template <typename
> struct Threads
;
54 namespace image_replayer
{ template <typename
> class BootstrapRequest
; }
55 namespace image_replayer
{ template <typename
> class EventPreprocessor
; }
56 namespace image_replayer
{ template <typename
> class ReplayStatusFormatter
; }
59 * Replays changes from a remote cluster for a single image.
61 template <typename ImageCtxT
= librbd::ImageCtx
>
64 static ImageReplayer
*create(
65 Threads
<librbd::ImageCtx
> *threads
, ImageDeleter
<ImageCtxT
>* image_deleter
,
66 InstanceWatcher
<ImageCtxT
> *instance_watcher
,
67 RadosRef local
, const std::string
&local_mirror_uuid
, int64_t local_pool_id
,
68 const std::string
&global_image_id
) {
69 return new ImageReplayer(threads
, image_deleter
, instance_watcher
,
70 local
, local_mirror_uuid
, local_pool_id
,
77 ImageReplayer(Threads
<librbd::ImageCtx
> *threads
,
78 ImageDeleter
<ImageCtxT
>* image_deleter
,
79 InstanceWatcher
<ImageCtxT
> *instance_watcher
,
80 RadosRef local
, const std::string
&local_mirror_uuid
,
81 int64_t local_pool_id
, const std::string
&global_image_id
);
82 virtual ~ImageReplayer();
83 ImageReplayer(const ImageReplayer
&) = delete;
84 ImageReplayer
& operator=(const ImageReplayer
&) = delete;
86 bool is_stopped() { Mutex::Locker
l(m_lock
); return is_stopped_(); }
87 bool is_running() { Mutex::Locker
l(m_lock
); return is_running_(); }
88 bool is_replaying() { Mutex::Locker
l(m_lock
); return is_replaying_(); }
90 std::string
get_name() { Mutex::Locker
l(m_lock
); return m_name
; };
91 void set_state_description(int r
, const std::string
&desc
);
93 inline bool is_blacklisted() const {
94 Mutex::Locker
locker(m_lock
);
95 return (m_last_r
== -EBLACKLISTED
);
98 image_replayer::HealthState
get_health_state() const;
100 void add_remote_image(const std::string
&remote_mirror_uuid
,
101 const std::string
&remote_image_id
,
102 librados::IoCtx
&remote_io_ctx
);
103 void remove_remote_image(const std::string
&remote_mirror_uuid
,
104 const std::string
&remote_image_id
,
105 bool schedule_delete
);
106 bool remote_images_empty() const;
108 inline int64_t get_local_pool_id() const {
109 return m_local_pool_id
;
111 inline const std::string
& get_global_image_id() const {
112 return m_global_image_id
;
115 void start(Context
*on_finish
= nullptr, bool manual
= false);
116 void stop(Context
*on_finish
= nullptr, bool manual
= false,
117 int r
= 0, const std::string
& desc
= "");
118 void restart(Context
*on_finish
= nullptr);
119 void flush(Context
*on_finish
= nullptr);
121 void resync_image(Context
*on_finish
=nullptr);
123 void print_status(Formatter
*f
, stringstream
*ss
);
125 virtual void handle_replay_ready();
126 virtual void handle_replay_complete(int r
, const std::string
&error_desc
);
132 * <uninitialized> <------------------------------------ FAIL
138 * PREPARE_LOCAL_IMAGE * * * * * * * * * * * * * * * * * *
141 * BOOTSTRAP_IMAGE * * * * * * * * * * * * * * * * * * * *
144 * INIT_REMOTE_JOURNALER * * * * * * * * * * * * * * * * *
147 * START_REPLAY * * * * * * * * * * * * * * * * * * * * * *
149 * | /--------------------------------------------\
152 * REPLAYING -------------> LOCAL_REPLAY_FLUSH |
155 * | | FLUSH_COMMIT_POSITION |
157 * | | \--------------------/|
159 * | | (entries available) |
160 * | \-----------> REPLAY_READY |
163 * | v needed) (error)
164 * | REPLAY_FLUSH * * * * * * * * *
166 * | | (skip if not | *
167 * | v needed) (error) *
168 * | GET_REMOTE_TAG * * * * * * * *
170 * | | (skip if not | *
171 * | v needed) (error) *
172 * | ALLOCATE_LOCAL_TAG * * * * * *
175 * | PREPROCESS_ENTRY * * * * * * *
178 * | PROCESS_ENTRY * * * * * * * * *
180 * | \---------------------/ *
182 * REPLAY_COMPLETE < * * * * * * * * * * * * * * * * * * *
185 * JOURNAL_REPLAY_SHUT_DOWN
196 virtual void on_start_fail(int r
, const std::string
&desc
= "");
197 virtual bool on_start_interrupted();
199 virtual void on_stop_journal_replay(int r
= 0, const std::string
&desc
= "");
201 virtual void on_flush_local_replay_flush_start(Context
*on_flush
);
202 virtual void on_flush_local_replay_flush_finish(Context
*on_flush
, int r
);
203 virtual void on_flush_flush_commit_position_start(Context
*on_flush
);
204 virtual void on_flush_flush_commit_position_finish(Context
*on_flush
, int r
);
206 bool on_replay_interrupted();
209 typedef typename
librbd::journal::TypeTraits
<ImageCtxT
>::ReplayEntry ReplayEntry
;
215 STATE_REPLAY_FLUSHING
,
221 std::string mirror_uuid
;
222 std::string image_id
;
223 librados::IoCtx io_ctx
;
227 RemoteImage(const std::string
&mirror_uuid
,
228 const std::string
&image_id
)
229 : mirror_uuid(mirror_uuid
), image_id(image_id
) {
231 RemoteImage(const std::string
&mirror_uuid
,
232 const std::string
&image_id
,
233 librados::IoCtx
&io_ctx
)
234 : mirror_uuid(mirror_uuid
), image_id(image_id
), io_ctx(io_ctx
) {
237 inline bool operator<(const RemoteImage
&rhs
) const {
238 if (mirror_uuid
!= rhs
.mirror_uuid
) {
239 return mirror_uuid
< rhs
.mirror_uuid
;
241 return image_id
< rhs
.image_id
;
244 inline bool operator==(const RemoteImage
&rhs
) const {
245 return (mirror_uuid
== rhs
.mirror_uuid
&& image_id
== rhs
.image_id
);
249 typedef std::set
<RemoteImage
> RemoteImages
;
251 typedef typename
librbd::journal::TypeTraits
<ImageCtxT
>::Journaler Journaler
;
252 typedef boost::optional
<State
> OptionalState
;
253 typedef boost::optional
<cls::rbd::MirrorImageStatusState
>
254 OptionalMirrorImageStatusState
;
256 struct JournalListener
: public librbd::journal::Listener
{
257 ImageReplayer
*img_replayer
;
259 JournalListener(ImageReplayer
*img_replayer
)
260 : img_replayer(img_replayer
) {
263 void handle_close() override
{
264 img_replayer
->on_stop_journal_replay();
267 void handle_promoted() override
{
268 img_replayer
->on_stop_journal_replay(0, "force promoted");
271 void handle_resync() override
{
272 img_replayer
->resync_image();
276 class BootstrapProgressContext
: public ProgressContext
{
278 BootstrapProgressContext(ImageReplayer
<ImageCtxT
> *replayer
) :
282 void update_progress(const std::string
&description
,
283 bool flush
= true) override
;
285 ImageReplayer
<ImageCtxT
> *replayer
;
288 Threads
<librbd::ImageCtx
> *m_threads
;
289 ImageDeleter
<ImageCtxT
>* m_image_deleter
;
290 InstanceWatcher
<ImageCtxT
> *m_instance_watcher
;
292 RemoteImages m_remote_images
;
293 RemoteImage m_remote_image
;
296 std::string m_local_mirror_uuid
;
297 int64_t m_local_pool_id
;
298 std::string m_local_image_id
;
299 std::string m_global_image_id
;
301 mutable Mutex m_lock
;
302 State m_state
= STATE_STOPPED
;
303 std::string m_state_desc
;
305 OptionalMirrorImageStatusState m_mirror_image_status_state
= boost::none
;
308 BootstrapProgressContext m_progress_cxt
;
309 bool m_do_resync
{false};
310 image_replayer::EventPreprocessor
<ImageCtxT
> *m_event_preprocessor
= nullptr;
311 image_replayer::ReplayStatusFormatter
<ImageCtxT
> *m_replay_status_formatter
=
313 librados::IoCtx m_local_ioctx
;
314 ImageCtxT
*m_local_image_ctx
= nullptr;
315 std::string m_local_image_tag_owner
;
317 decltype(ImageCtxT::journal
) m_local_journal
= nullptr;
318 librbd::journal::Replay
<ImageCtxT
> *m_local_replay
= nullptr;
319 Journaler
* m_remote_journaler
= nullptr;
320 ::journal::ReplayHandler
*m_replay_handler
= nullptr;
321 librbd::journal::Listener
*m_journal_listener
;
322 bool m_stopping_for_resync
= false;
324 Context
*m_on_start_finish
= nullptr;
325 Context
*m_on_stop_finish
= nullptr;
326 Context
*m_update_status_task
= nullptr;
327 int m_update_status_interval
= 0;
328 librados::AioCompletion
*m_update_status_comp
= nullptr;
329 bool m_stop_requested
= false;
330 bool m_manual_stop
= false;
332 AdminSocketHook
*m_asok_hook
= nullptr;
334 image_replayer::BootstrapRequest
<ImageCtxT
> *m_bootstrap_request
= nullptr;
336 uint32_t m_in_flight_status_updates
= 0;
337 bool m_update_status_requested
= false;
338 Context
*m_on_update_status_finish
= nullptr;
340 librbd::journal::MirrorPeerClientMeta m_client_meta
;
342 ReplayEntry m_replay_entry
;
343 bool m_replay_tag_valid
= false;
344 uint64_t m_replay_tag_tid
= 0;
345 cls::journal::Tag m_replay_tag
;
346 librbd::journal::TagData m_replay_tag_data
;
347 librbd::journal::EventEntry m_event_entry
;
348 AsyncOpTracker m_event_replay_tracker
;
349 Context
*m_delayed_preprocess_task
= nullptr;
351 struct RemoteJournalerListener
: public ::journal::JournalMetadataListener
{
352 ImageReplayer
*replayer
;
354 RemoteJournalerListener(ImageReplayer
*replayer
) : replayer(replayer
) { }
356 void handle_update(::journal::JournalMetadata
*) override
;
359 struct C_ReplayCommitted
: public Context
{
360 ImageReplayer
*replayer
;
361 ReplayEntry replay_entry
;
363 C_ReplayCommitted(ImageReplayer
*replayer
,
364 ReplayEntry
&&replay_entry
)
365 : replayer(replayer
), replay_entry(std::move(replay_entry
)) {
367 void finish(int r
) override
{
368 replayer
->handle_process_entry_safe(replay_entry
, r
);
372 static std::string
to_string(const State state
);
374 bool is_stopped_() const {
375 return m_state
== STATE_STOPPED
;
377 bool is_running_() const {
378 return !is_stopped_() && m_state
!= STATE_STOPPING
&& !m_stop_requested
;
380 bool is_replaying_() const {
381 return (m_state
== STATE_REPLAYING
||
382 m_state
== STATE_REPLAY_FLUSHING
);
385 bool update_mirror_image_status(bool force
, const OptionalState
&state
);
386 bool start_mirror_image_status_update(bool force
, bool restarting
);
387 void finish_mirror_image_status_update();
388 void queue_mirror_image_status_update(const OptionalState
&state
);
389 void send_mirror_status_update(const OptionalState
&state
);
390 void handle_mirror_status_update(int r
);
391 void reschedule_update_status_task(int new_interval
= 0);
393 void shut_down(int r
);
394 void handle_shut_down(int r
);
395 void handle_remote_journal_metadata_updated();
397 void prepare_local_image();
398 void handle_prepare_local_image(int r
);
401 void handle_bootstrap(int r
);
403 void init_remote_journaler();
404 void handle_init_remote_journaler(int r
);
407 void handle_start_replay(int r
);
410 void handle_replay_flush(int r
);
412 void get_remote_tag();
413 void handle_get_remote_tag(int r
);
415 void allocate_local_tag();
416 void handle_allocate_local_tag(int r
);
418 void preprocess_entry();
419 void handle_preprocess_entry_ready(int r
);
420 void handle_preprocess_entry_safe(int r
);
422 void process_entry();
423 void handle_process_entry_ready(int r
);
424 void handle_process_entry_safe(const ReplayEntry
& replay_entry
, int r
);
428 } // namespace mirror
431 extern template class rbd::mirror::ImageReplayer
<librbd::ImageCtx
>;
433 #endif // CEPH_RBD_MIRROR_IMAGE_REPLAYER_H