1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef CEPH_RBD_MIRROR_IMAGE_REPLAYER_H
5 #define CEPH_RBD_MIRROR_IMAGE_REPLAYER_H
7 #include "common/AsyncOpTracker.h"
8 #include "common/Mutex.h"
9 #include "common/WorkQueue.h"
10 #include "include/rados/librados.hpp"
11 #include "cls/journal/cls_journal_types.h"
12 #include "cls/rbd/cls_rbd_types.h"
13 #include "journal/JournalMetadataListener.h"
14 #include "journal/ReplayEntry.h"
15 #include "librbd/ImageCtx.h"
16 #include "librbd/journal/Types.h"
17 #include "librbd/journal/TypeTraits.h"
18 #include "ImageDeleter.h"
19 #include "ProgressContext.h"
22 #include <boost/noncopyable.hpp>
23 #include <boost/optional.hpp>
// Forward declaration only: the visible part of this header refers to
// AdminSocketHook solely through a pointer (m_asok_hook), so the full
// definition is not required here.
31 class AdminSocketHook
;
// Forward declaration of the journal Replay class template; in the visible
// part of this header it is only used as a pointer type (m_local_replay).
43 namespace journal
{ template <typename
> class Replay
; }
// Forward declarations of the InstanceWatcher and Threads templates; this
// header only passes and stores them by pointer (m_instance_watcher,
// m_threads).
50 template <typename
> struct InstanceWatcher
;
51 template <typename
> struct Threads
;
// Forward declarations of the image_replayer helper templates that the class
// below holds by pointer (m_bootstrap_request, m_event_preprocessor,
// m_replay_status_formatter).
53 namespace image_replayer
{ template <typename
> class BootstrapRequest
; }
54 namespace image_replayer
{ template <typename
> class EventPreprocessor
; }
55 namespace image_replayer
{ template <typename
> class ReplayStatusFormatter
; }
58 * Replays changes from a remote cluster for a single image.
60 template <typename ImageCtxT
= librbd::ImageCtx
>
63 typedef typename
librbd::journal::TypeTraits
<ImageCtxT
>::ReplayEntry ReplayEntry
;
69 STATE_REPLAY_FLUSHING
,
74 static ImageReplayer
*create(
75 Threads
<librbd::ImageCtx
> *threads
,
76 std::shared_ptr
<ImageDeleter
> image_deleter
,
77 InstanceWatcher
<ImageCtxT
> *instance_watcher
,
78 RadosRef local
, const std::string
&local_mirror_uuid
, int64_t local_pool_id
,
79 const std::string
&global_image_id
) {
80 return new ImageReplayer(threads
, image_deleter
, instance_watcher
,
81 local
, local_mirror_uuid
, local_pool_id
,
// Constructor (declaration only). It takes the same seven arguments, in the
// same order, as the static create() factory declared above it.
88 ImageReplayer(Threads
<librbd::ImageCtx
> *threads
,
89 std::shared_ptr
<ImageDeleter
> image_deleter
,
90 InstanceWatcher
<ImageCtxT
> *instance_watcher
,
91 RadosRef local
, const std::string
&local_mirror_uuid
,
92 int64_t local_pool_id
, const std::string
&global_image_id
);
// Virtual destructor (declaration only).
93 virtual ~ImageReplayer();
// Copy construction and copy assignment are explicitly deleted, so an
// ImageReplayer cannot be copied.
94 ImageReplayer(const ImageReplayer
&) = delete;
95 ImageReplayer
& operator=(const ImageReplayer
&) = delete;
97 State
get_state() { Mutex::Locker
l(m_lock
); return get_state_(); }
98 bool is_stopped() { Mutex::Locker
l(m_lock
); return is_stopped_(); }
99 bool is_running() { Mutex::Locker
l(m_lock
); return is_running_(); }
100 bool is_replaying() { Mutex::Locker
l(m_lock
); return is_replaying_(); }
102 std::string
get_name() { Mutex::Locker
l(m_lock
); return m_name
; };
// Declaration only; the definition is not in this part of the file.
// NOTE(review): presumably records `r` and `desc` as the replayer's last
// result and state description (cf. m_last_r, m_state_desc) -- confirm
// against the implementation.
103 void set_state_description(int r
, const std::string
&desc
);
105 inline bool is_blacklisted() const {
106 Mutex::Locker
locker(m_lock
);
107 return (m_last_r
== -EBLACKLISTED
);
// Remote-image bookkeeping (declarations only). Both add and remove take
// the (remote_mirror_uuid, remote_image_id) pair; add_remote_image()
// additionally receives the librados::IoCtx associated with that image, and
// remove_remote_image() a schedule_delete flag.
// NOTE(review): these presumably operate on m_remote_images -- confirm in
// the implementation file.
110 void add_remote_image(const std::string
&remote_mirror_uuid
,
111 const std::string
&remote_image_id
,
112 librados::IoCtx
&remote_io_ctx
);
113 void remove_remote_image(const std::string
&remote_mirror_uuid
,
114 const std::string
&remote_image_id
,
115 bool schedule_delete
);
116 bool remote_images_empty() const;
118 inline int64_t get_local_pool_id() const {
119 return m_local_pool_id
;
121 inline const std::string
& get_global_image_id() const {
122 return m_global_image_id
;
// Control operations (declarations only). Every on_finish parameter
// defaults to nullptr, so the completion callback may be omitted. start()
// and stop() default `manual` to false; stop() also defaults r to 0 and desc
// to the empty string.
// NOTE(review): on_finish is presumably completed once the requested
// operation has finished -- confirm against the implementation.
125 void start(Context
*on_finish
= nullptr, bool manual
= false);
126 void stop(Context
*on_finish
= nullptr, bool manual
= false,
127 int r
= 0, const std::string
& desc
= "");
128 void restart(Context
*on_finish
= nullptr);
129 void flush(Context
*on_finish
= nullptr);
// resync_image() is also what JournalListener::handle_resync() calls (with
// the default nullptr callback).
131 void resync_image(Context
*on_finish
=nullptr);
133 void print_status(Formatter
*f
, stringstream
*ss
);
// Virtual replay callbacks (declarations only).
// NOTE(review): presumably the REPLAY_READY / REPLAY_COMPLETE transitions of
// the state diagram below, driven through m_replay_handler -- confirm in the
// implementation file.
135 virtual void handle_replay_ready();
136 virtual void handle_replay_complete(int r
, const std::string
&error_desc
);
142 * <uninitialized> <------------------------------------ FAIL
148 * PREPARE_LOCAL_IMAGE * * * * * * * * * * * * * * * * * *
151 * BOOTSTRAP_IMAGE * * * * * * * * * * * * * * * * * * * *
154 * INIT_REMOTE_JOURNALER * * * * * * * * * * * * * * * * *
157 * START_REPLAY * * * * * * * * * * * * * * * * * * * * * *
159 * | /--------------------------------------------\
162 * REPLAYING -------------> LOCAL_REPLAY_FLUSH |
165 * | | FLUSH_COMMIT_POSITION |
167 * | | \--------------------/|
169 * | | (entries available) |
170 * | \-----------> REPLAY_READY |
173 * | v needed) (error)
174 * | REPLAY_FLUSH * * * * * * * * *
176 * | | (skip if not | *
177 * | v needed) (error) *
178 * | GET_REMOTE_TAG * * * * * * * *
180 * | | (skip if not | *
181 * | v needed) (error) *
182 * | ALLOCATE_LOCAL_TAG * * * * * *
185 * | PREPROCESS_ENTRY * * * * * * *
188 * | PROCESS_ENTRY * * * * * * * * *
190 * | \---------------------/ *
192 * REPLAY_COMPLETE < * * * * * * * * * * * * * * * * * * *
195 * JOURNAL_REPLAY_SHUT_DOWN
// Hooks for the start, stop and flush paths (declarations only). All of
// them except on_replay_interrupted() are virtual.
// NOTE(review): presumably virtual so that subclasses/tests can intercept
// the individual steps -- TODO confirm.
206 virtual void on_start_fail(int r
, const std::string
&desc
= "");
207 virtual bool on_start_interrupted();
// Called by JournalListener::handle_close() with the defaults and by
// JournalListener::handle_promoted() with (0, "force promoted").
209 virtual void on_stop_journal_replay(int r
= 0, const std::string
&desc
= "");
211 virtual void on_flush_local_replay_flush_start(Context
*on_flush
);
212 virtual void on_flush_local_replay_flush_finish(Context
*on_flush
, int r
);
213 virtual void on_flush_flush_commit_position_start(Context
*on_flush
);
214 virtual void on_flush_flush_commit_position_finish(Context
*on_flush
, int r
);
216 bool on_replay_interrupted();
220 std::string mirror_uuid
;
221 std::string image_id
;
222 librados::IoCtx io_ctx
;
226 RemoteImage(const std::string
&mirror_uuid
,
227 const std::string
&image_id
)
228 : mirror_uuid(mirror_uuid
), image_id(image_id
) {
230 RemoteImage(const std::string
&mirror_uuid
,
231 const std::string
&image_id
,
232 librados::IoCtx
&io_ctx
)
233 : mirror_uuid(mirror_uuid
), image_id(image_id
), io_ctx(io_ctx
) {
236 inline bool operator<(const RemoteImage
&rhs
) const {
237 if (mirror_uuid
!= rhs
.mirror_uuid
) {
238 return mirror_uuid
< rhs
.mirror_uuid
;
240 return image_id
< rhs
.image_id
;
243 inline bool operator==(const RemoteImage
&rhs
) const {
244 return (mirror_uuid
== rhs
.mirror_uuid
&& image_id
== rhs
.image_id
);
248 typedef std::set
<RemoteImage
> RemoteImages
;
250 typedef typename
librbd::journal::TypeTraits
<ImageCtxT
>::Journaler Journaler
;
251 typedef boost::optional
<State
> OptionalState
;
253 struct JournalListener
: public librbd::journal::Listener
{
254 ImageReplayer
*img_replayer
;
256 JournalListener(ImageReplayer
*img_replayer
)
257 : img_replayer(img_replayer
) {
260 void handle_close() override
{
261 img_replayer
->on_stop_journal_replay();
264 void handle_promoted() override
{
265 img_replayer
->on_stop_journal_replay(0, "force promoted");
268 void handle_resync() override
{
269 img_replayer
->resync_image();
273 class BootstrapProgressContext
: public ProgressContext
{
275 BootstrapProgressContext(ImageReplayer
<ImageCtxT
> *replayer
) :
279 void update_progress(const std::string
&description
,
280 bool flush
= true) override
;
282 ImageReplayer
<ImageCtxT
> *replayer
;
// Collaborator handles. NOTE(review): presumably initialised from the
// constructor arguments of the same names -- confirm in the constructor.
// m_threads and m_instance_watcher are raw pointers; m_image_deleter is
// held through a std::shared_ptr.
285 Threads
<librbd::ImageCtx
> *m_threads
;
286 std::shared_ptr
<ImageDeleter
> m_image_deleter
;
287 InstanceWatcher
<ImageCtxT
> *m_instance_watcher
;
// m_remote_images is the set of RemoteImage entries; m_remote_image is a
// single entry. NOTE(review): presumably the remote image currently
// selected for replay -- confirm against the implementation.
289 RemoteImages m_remote_images
;
290 RemoteImage m_remote_image
;
// Local-side identifiers. m_local_pool_id and m_global_image_id are the
// values returned by get_local_pool_id() and get_global_image_id().
293 std::string m_local_mirror_uuid
;
294 int64_t m_local_pool_id
;
295 std::string m_local_image_id
;
296 std::string m_global_image_id
;
// Declared mutable so that const member functions (e.g. is_blacklisted())
// can still construct a Mutex::Locker on it.
298 mutable Mutex m_lock
;
// Current state; a new object starts in STATE_STOPPED.
299 State m_state
= STATE_STOPPED
;
301 std::string m_state_desc
;
302 BootstrapProgressContext m_progress_cxt
;
// Null until set elsewhere; not assigned in this header.
304 image_replayer::EventPreprocessor
<ImageCtxT
> *m_event_preprocessor
= nullptr;
305 image_replayer::ReplayStatusFormatter
<ImageCtxT
> *m_replay_status_formatter
=
// Local image / journal handles. All pointers below except
// m_journal_listener start out as nullptr.
307 librados::IoCtx m_local_ioctx
;
308 ImageCtxT
*m_local_image_ctx
= nullptr;
309 std::string m_local_image_tag_owner
;
// Same type as the `journal` member of ImageCtxT.
311 decltype(ImageCtxT::journal
) m_local_journal
= nullptr;
312 librbd::journal::Replay
<ImageCtxT
> *m_local_replay
= nullptr;
313 Journaler
* m_remote_journaler
= nullptr;
314 ::journal::ReplayHandler
*m_replay_handler
= nullptr;
// NOTE(review): unlike its neighbours this pointer has no in-class
// initializer -- confirm that the constructor always sets it.
315 librbd::journal::Listener
*m_journal_listener
;
316 bool m_stopping_for_resync
= false;
// Pending completion callbacks and status-update bookkeeping. Every member
// in this group has an in-class initializer (nullptr, 0 or false).
318 Context
*m_on_start_finish
= nullptr;
319 Context
*m_on_stop_finish
= nullptr;
320 Context
*m_update_status_task
= nullptr;
321 int m_update_status_interval
= 0;
322 librados::AioCompletion
*m_update_status_comp
= nullptr;
// Read by is_running_(): while this flag is set the replayer is reported as
// not running.
323 bool m_stop_requested
= false;
324 bool m_manual_stop
= false;
326 AdminSocketHook
*m_asok_hook
= nullptr;
328 image_replayer::BootstrapRequest
<ImageCtxT
> *m_bootstrap_request
= nullptr;
330 uint32_t m_in_flight_status_updates
= 0;
331 bool m_update_status_requested
= false;
332 Context
*m_on_update_status_finish
= nullptr;
// Per-entry replay state.
// NOTE(review): presumably the journal entry / tag currently being
// processed by the preprocess_entry() / process_entry() steps -- confirm
// against the implementation.
334 librbd::journal::MirrorPeerClientMeta m_client_meta
;
336 ReplayEntry m_replay_entry
;
337 bool m_replay_tag_valid
= false;
338 uint64_t m_replay_tag_tid
= 0;
339 cls::journal::Tag m_replay_tag
;
340 librbd::journal::TagData m_replay_tag_data
;
341 librbd::journal::EventEntry m_event_entry
;
342 AsyncOpTracker m_event_replay_tracker
;
343 Context
*m_delayed_preprocess_task
= nullptr;
345 struct RemoteJournalerListener
: public ::journal::JournalMetadataListener
{
346 ImageReplayer
*replayer
;
348 RemoteJournalerListener(ImageReplayer
*replayer
) : replayer(replayer
) { }
350 void handle_update(::journal::JournalMetadata
*) override
;
353 struct C_ReplayCommitted
: public Context
{
354 ImageReplayer
*replayer
;
355 ReplayEntry replay_entry
;
357 C_ReplayCommitted(ImageReplayer
*replayer
,
358 ReplayEntry
&&replay_entry
)
359 : replayer(replayer
), replay_entry(std::move(replay_entry
)) {
361 void finish(int r
) override
{
362 replayer
->handle_process_entry_safe(replay_entry
, r
);
// Static helper (declaration only): maps a State value to a std::string.
// The exact strings are defined in the implementation, not in this header.
366 static std::string
to_string(const State state
);
368 State
get_state_() const {
371 bool is_stopped_() const {
372 return m_state
== STATE_STOPPED
;
374 bool is_running_() const {
375 return !is_stopped_() && m_state
!= STATE_STOPPING
&& !m_stop_requested
;
377 bool is_replaying_() const {
378 return (m_state
== STATE_REPLAYING
||
379 m_state
== STATE_REPLAY_FLUSHING
);
// Mirror-image status update helpers (declarations only). The
// OptionalState arguments are boost::optional<State>, so a caller can pass
// "no state".
// NOTE(review): presumably these cooperate through
// m_in_flight_status_updates / m_update_status_requested /
// m_update_status_task -- confirm in the implementation file.
382 bool update_mirror_image_status(bool force
, const OptionalState
&state
);
383 bool start_mirror_image_status_update(bool force
, bool restarting
);
384 void finish_mirror_image_status_update();
385 void queue_mirror_image_status_update(const OptionalState
&state
);
386 void send_mirror_status_update(const OptionalState
&state
);
387 void handle_mirror_status_update(int r
);
// new_interval defaults to 0.
388 void reschedule_update_status_task(int new_interval
= 0);
// Shut-down step and its completion handler, plus the remote journal
// metadata-update notification (declarations only).
// NOTE(review): handle_remote_journal_metadata_updated() is presumably
// reached from RemoteJournalerListener::handle_update() -- confirm.
390 void shut_down(int r
);
391 void handle_shut_down(int r
);
392 void handle_remote_journal_metadata_updated();
// State-machine steps (declarations only). The names follow the states of
// the diagram earlier in this header (PREPARE_LOCAL_IMAGE, BOOTSTRAP_IMAGE,
// INIT_REMOTE_JOURNALER, START_REPLAY, REPLAY_FLUSH, GET_REMOTE_TAG,
// ALLOCATE_LOCAL_TAG, PREPROCESS_ENTRY, PROCESS_ENTRY); each handle_*()
// takes the int result `r` of its step.
394 void prepare_local_image();
395 void handle_prepare_local_image(int r
);
398 void handle_bootstrap(int r
);
400 void init_remote_journaler();
401 void handle_init_remote_journaler(int r
);
404 void handle_start_replay(int r
);
407 void handle_replay_flush(int r
);
409 void get_remote_tag();
410 void handle_get_remote_tag(int r
);
412 void allocate_local_tag();
413 void handle_allocate_local_tag(int r
);
415 void preprocess_entry();
416 void handle_preprocess_entry_ready(int r
);
417 void handle_preprocess_entry_safe(int r
);
419 void process_entry();
420 void handle_process_entry_ready(int r
);
// Invoked from C_ReplayCommitted::finish() with the stored replay entry.
421 void handle_process_entry_safe(const ReplayEntry
& replay_entry
, int r
);
425 } // namespace mirror
// Explicit instantiation declaration: translation units that include this
// header will not implicitly instantiate ImageReplayer<librbd::ImageCtx>;
// the matching explicit instantiation definition must be provided by a
// source file.
428 extern template class rbd::mirror::ImageReplayer
<librbd::ImageCtx
>;
430 #endif // CEPH_RBD_MIRROR_IMAGE_REPLAYER_H