1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 #include "common/debug.h"
6 #include "common/errno.h"
7 #include "common/Timer.h"
8 #include "common/WorkQueue.h"
9 #include "librbd/Journal.h"
10 #include "librbd/Utils.h"
11 #include "librbd/journal/Replay.h"
12 #include "journal/Journaler.h"
13 #include "journal/JournalMetadataListener.h"
14 #include "journal/ReplayHandler.h"
15 #include "tools/rbd_mirror/Threads.h"
16 #include "tools/rbd_mirror/Types.h"
17 #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
18 #include "tools/rbd_mirror/image_replayer/ReplayerListener.h"
19 #include "tools/rbd_mirror/image_replayer/Utils.h"
20 #include "tools/rbd_mirror/image_replayer/journal/EventPreprocessor.h"
21 #include "tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h"
22 #include "tools/rbd_mirror/image_replayer/journal/StateBuilder.h"
24 #define dout_context g_ceph_context
25 #define dout_subsys ceph_subsys_rbd_mirror
27 #define dout_prefix *_dout << "rbd::mirror::image_replayer::journal::" \
28 << "Replayer: " << this << " " << __func__ << ": "
30 extern PerfCounters
*g_perf_counters
;
34 namespace image_replayer
{
39 uint32_t calculate_replay_delay(const utime_t
&event_time
,
40 int mirroring_replay_delay
) {
41 if (mirroring_replay_delay
<= 0) {
45 utime_t now
= ceph_clock_now();
46 if (event_time
+ mirroring_replay_delay
<= now
) {
50 // ensure it is rounded up when converting to integer
51 return (event_time
+ mirroring_replay_delay
- now
) + 1;
54 } // anonymous namespace
56 using librbd::util::create_async_context_callback
;
57 using librbd::util::create_context_callback
;
60 struct Replayer
<I
>::C_ReplayCommitted
: public Context
{
62 ReplayEntry replay_entry
;
63 uint64_t replay_bytes
;
64 utime_t replay_start_time
;
66 C_ReplayCommitted(Replayer
* replayer
, ReplayEntry
&&replay_entry
,
67 uint64_t replay_bytes
, const utime_t
&replay_start_time
)
68 : replayer(replayer
), replay_entry(std::move(replay_entry
)),
69 replay_bytes(replay_bytes
), replay_start_time(replay_start_time
) {
72 void finish(int r
) override
{
73 replayer
->handle_process_entry_safe(replay_entry
, replay_bytes
,
74 replay_start_time
, r
);
79 struct Replayer
<I
>::RemoteJournalerListener
80 : public ::journal::JournalMetadataListener
{
83 RemoteJournalerListener(Replayer
* replayer
) : replayer(replayer
) {}
85 void handle_update(::journal::JournalMetadata
*) override
{
86 auto ctx
= new C_TrackedOp(
87 replayer
->m_in_flight_op_tracker
,
88 new LambdaContext([this](int r
) {
89 replayer
->handle_remote_journal_metadata_updated();
91 replayer
->m_threads
->work_queue
->queue(ctx
, 0);
96 struct Replayer
<I
>::RemoteReplayHandler
: public ::journal::ReplayHandler
{
99 RemoteReplayHandler(Replayer
* replayer
) : replayer(replayer
) {}
100 ~RemoteReplayHandler() override
{};
102 void handle_entries_available() override
{
103 replayer
->handle_replay_ready();
106 void handle_complete(int r
) override
{
109 error
= "not enough memory in autotune cache";
111 error
= "replay completed with error: " + cpp_strerror(r
);
113 replayer
->handle_replay_complete(r
, error
);
117 template <typename I
>
118 struct Replayer
<I
>::LocalJournalListener
119 : public librbd::journal::Listener
{
122 LocalJournalListener(Replayer
* replayer
) : replayer(replayer
) {
125 void handle_close() override
{
126 replayer
->handle_replay_complete(0, "");
129 void handle_promoted() override
{
130 replayer
->handle_replay_complete(0, "force promoted");
133 void handle_resync() override
{
134 replayer
->handle_resync_image();
138 template <typename I
>
139 Replayer
<I
>::Replayer(
141 const std::string
& local_mirror_uuid
,
142 StateBuilder
<I
>* state_builder
,
143 ReplayerListener
* replayer_listener
)
144 : m_threads(threads
),
145 m_local_mirror_uuid(local_mirror_uuid
),
146 m_state_builder(state_builder
),
147 m_replayer_listener(replayer_listener
),
148 m_lock(ceph::make_mutex(librbd::util::unique_lock_name(
149 "rbd::mirror::image_replayer::journal::Replayer", this))) {
153 std::unique_lock locker
{m_lock
};
154 register_perf_counters();
158 template <typename I
>
159 Replayer
<I
>::~Replayer() {
163 std::unique_lock locker
{m_lock
};
164 unregister_perf_counters();
167 ceph_assert(m_remote_listener
== nullptr);
168 ceph_assert(m_local_journal_listener
== nullptr);
169 ceph_assert(m_local_journal_replay
== nullptr);
170 ceph_assert(m_remote_replay_handler
== nullptr);
171 ceph_assert(m_event_preprocessor
== nullptr);
172 ceph_assert(m_replay_status_formatter
== nullptr);
173 ceph_assert(m_delayed_preprocess_task
== nullptr);
174 ceph_assert(m_flush_local_replay_task
== nullptr);
175 ceph_assert(m_state_builder
->local_image_ctx
== nullptr);
178 template <typename I
>
179 void Replayer
<I
>::init(Context
* on_finish
) {
183 auto local_image_ctx
= m_state_builder
->local_image_ctx
;
184 std::shared_lock image_locker
{local_image_ctx
->image_lock
};
185 m_image_spec
= util::compute_image_spec(local_image_ctx
->md_ctx
,
186 local_image_ctx
->name
);
189 ceph_assert(m_on_init_shutdown
== nullptr);
190 m_on_init_shutdown
= on_finish
;
192 init_remote_journaler();
195 template <typename I
>
196 void Replayer
<I
>::shut_down(Context
* on_finish
) {
199 std::unique_lock locker
{m_lock
};
200 ceph_assert(m_on_init_shutdown
== nullptr);
201 m_on_init_shutdown
= on_finish
;
203 if (m_state
== STATE_INIT
) {
204 // raced with the last piece of the init state machine
206 } else if (m_state
== STATE_REPLAYING
) {
207 m_state
= STATE_COMPLETE
;
210 // if shutting down due to an error notification, we don't
211 // need to propagate the same error again
213 m_error_description
= "";
215 cancel_delayed_preprocess_task();
216 cancel_flush_local_replay_task();
220 template <typename I
>
221 void Replayer
<I
>::flush(Context
* on_finish
) {
224 flush_local_replay(new C_TrackedOp(m_in_flight_op_tracker
, on_finish
));
227 template <typename I
>
228 bool Replayer
<I
>::get_replay_status(std::string
* description
,
229 Context
* on_finish
) {
232 std::unique_lock locker
{m_lock
};
233 if (m_replay_status_formatter
== nullptr) {
234 derr
<< "replay not running" << dendl
;
237 on_finish
->complete(-EAGAIN
);
241 on_finish
= new C_TrackedOp(m_in_flight_op_tracker
, on_finish
);
242 return m_replay_status_formatter
->get_or_send_update(description
,
246 template <typename I
>
247 void Replayer
<I
>::init_remote_journaler() {
250 Context
*ctx
= create_context_callback
<
251 Replayer
, &Replayer
<I
>::handle_init_remote_journaler
>(this);
252 m_state_builder
->remote_journaler
->init(ctx
);
255 template <typename I
>
256 void Replayer
<I
>::handle_init_remote_journaler(int r
) {
257 dout(10) << "r=" << r
<< dendl
;
259 std::unique_lock locker
{m_lock
};
261 derr
<< "failed to initialize remote journal: " << cpp_strerror(r
) << dendl
;
262 handle_replay_complete(locker
, r
, "error initializing remote journal");
267 // listen for metadata updates to check for disconnect events
268 ceph_assert(m_remote_listener
== nullptr);
269 m_remote_listener
= new RemoteJournalerListener(this);
270 m_state_builder
->remote_journaler
->add_listener(m_remote_listener
);
272 cls::journal::Client remote_client
;
273 r
= m_state_builder
->remote_journaler
->get_cached_client(m_local_mirror_uuid
,
276 derr
<< "error retrieving remote journal client: " << cpp_strerror(r
)
278 handle_replay_complete(locker
, r
, "error retrieving remote journal client");
284 r
= validate_remote_client_state(remote_client
,
285 &m_state_builder
->remote_client_meta
,
286 &m_resync_requested
, &error
);
288 handle_replay_complete(locker
, r
, error
);
293 start_external_replay(locker
);
296 template <typename I
>
297 void Replayer
<I
>::start_external_replay(std::unique_lock
<ceph::mutex
>& locker
) {
300 auto local_image_ctx
= m_state_builder
->local_image_ctx
;
301 std::shared_lock local_image_locker
{local_image_ctx
->image_lock
};
303 ceph_assert(m_local_journal
== nullptr);
304 m_local_journal
= local_image_ctx
->journal
;
305 if (m_local_journal
== nullptr) {
306 local_image_locker
.unlock();
308 derr
<< "local image journal closed" << dendl
;
309 handle_replay_complete(locker
, -EINVAL
, "error accessing local journal");
314 // safe to hold pointer to journal after external playback starts
315 Context
*start_ctx
= create_context_callback
<
316 Replayer
, &Replayer
<I
>::handle_start_external_replay
>(this);
317 m_local_journal
->start_external_replay(&m_local_journal_replay
, start_ctx
);
320 template <typename I
>
321 void Replayer
<I
>::handle_start_external_replay(int r
) {
322 dout(10) << "r=" << r
<< dendl
;
324 std::unique_lock locker
{m_lock
};
326 ceph_assert(m_local_journal_replay
== nullptr);
327 derr
<< "error starting external replay on local image "
328 << m_state_builder
->local_image_ctx
->id
<< ": "
329 << cpp_strerror(r
) << dendl
;
331 handle_replay_complete(locker
, r
, "error starting replay on local image");
336 if (!notify_init_complete(locker
)) {
340 m_state
= STATE_REPLAYING
;
342 // check for resync/promotion state after adding listener
343 if (!add_local_journal_listener(locker
)) {
347 // start remote journal replay
348 m_event_preprocessor
= EventPreprocessor
<I
>::create(
349 *m_state_builder
->local_image_ctx
, *m_state_builder
->remote_journaler
,
350 m_local_mirror_uuid
, &m_state_builder
->remote_client_meta
,
351 m_threads
->work_queue
);
352 m_replay_status_formatter
= ReplayStatusFormatter
<I
>::create(
353 m_state_builder
->remote_journaler
, m_local_mirror_uuid
);
355 auto cct
= static_cast<CephContext
*>(m_state_builder
->local_image_ctx
->cct
);
356 double poll_seconds
= cct
->_conf
.get_val
<double>(
357 "rbd_mirror_journal_poll_age");
358 m_remote_replay_handler
= new RemoteReplayHandler(this);
359 m_state_builder
->remote_journaler
->start_live_replay(m_remote_replay_handler
,
362 notify_status_updated();
365 template <typename I
>
366 bool Replayer
<I
>::add_local_journal_listener(
367 std::unique_lock
<ceph::mutex
>& locker
) {
370 // listen for promotion and resync requests against local journal
371 ceph_assert(m_local_journal_listener
== nullptr);
372 m_local_journal_listener
= new LocalJournalListener(this);
373 m_local_journal
->add_listener(m_local_journal_listener
);
375 // verify that the local image wasn't force-promoted and that a resync hasn't
376 // been requested now that we are listening for events
377 if (m_local_journal
->is_tag_owner()) {
378 dout(10) << "local image force-promoted" << dendl
;
379 handle_replay_complete(locker
, 0, "force promoted");
383 bool resync_requested
= false;
384 int r
= m_local_journal
->is_resync_requested(&resync_requested
);
386 dout(10) << "failed to determine resync state: " << cpp_strerror(r
)
388 handle_replay_complete(locker
, r
, "error parsing resync state");
390 } else if (resync_requested
) {
391 dout(10) << "local image resync requested" << dendl
;
392 handle_replay_complete(locker
, 0, "resync requested");
399 template <typename I
>
400 bool Replayer
<I
>::notify_init_complete(std::unique_lock
<ceph::mutex
>& locker
) {
403 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
404 ceph_assert(m_state
== STATE_INIT
);
406 // notify that init has completed
407 Context
*on_finish
= nullptr;
408 std::swap(m_on_init_shutdown
, on_finish
);
411 on_finish
->complete(0);
414 if (m_on_init_shutdown
!= nullptr) {
415 // shut down requested after we notified init complete but before we
424 template <typename I
>
425 void Replayer
<I
>::wait_for_flush() {
426 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
428 // ensure that we don't have two concurrent local journal replay shut downs
430 auto ctx
= create_async_context_callback(
431 m_threads
->work_queue
, create_context_callback
<
432 Replayer
<I
>, &Replayer
<I
>::handle_wait_for_flush
>(this));
433 m_flush_tracker
.wait_for_ops(ctx
);
436 template <typename I
>
437 void Replayer
<I
>::handle_wait_for_flush(int r
) {
438 dout(10) << "r=" << r
<< dendl
;
440 shut_down_local_journal_replay();
443 template <typename I
>
444 void Replayer
<I
>::shut_down_local_journal_replay() {
445 std::unique_lock locker
{m_lock
};
447 if (m_local_journal_replay
== nullptr) {
448 wait_for_event_replay();
452 // It's required to stop the local journal replay state machine prior to
453 // waiting for the events to complete. This is to ensure that IO is properly
454 // flushed (it might be batched), wait for any running ops to complete, and
455 // to cancel any ops waiting for their associated OnFinish events.
457 auto ctx
= create_context_callback
<
458 Replayer
<I
>, &Replayer
<I
>::handle_shut_down_local_journal_replay
>(this);
459 m_local_journal_replay
->shut_down(true, ctx
);
462 template <typename I
>
463 void Replayer
<I
>::handle_shut_down_local_journal_replay(int r
) {
464 dout(10) << "r=" << r
<< dendl
;
466 std::unique_lock locker
{m_lock
};
468 derr
<< "error shutting down journal replay: " << cpp_strerror(r
) << dendl
;
469 handle_replay_error(r
, "failed to shut down local journal replay");
472 wait_for_event_replay();
475 template <typename I
>
476 void Replayer
<I
>::wait_for_event_replay() {
477 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
480 auto ctx
= create_async_context_callback(
481 m_threads
->work_queue
, create_context_callback
<
482 Replayer
<I
>, &Replayer
<I
>::handle_wait_for_event_replay
>(this));
483 m_event_replay_tracker
.wait_for_ops(ctx
);
486 template <typename I
>
487 void Replayer
<I
>::handle_wait_for_event_replay(int r
) {
488 dout(10) << "r=" << r
<< dendl
;
490 std::unique_lock locker
{m_lock
};
494 template <typename I
>
495 void Replayer
<I
>::close_local_image() {
496 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
497 if (m_state_builder
->local_image_ctx
== nullptr) {
498 stop_remote_journaler_replay();
503 if (m_local_journal_listener
!= nullptr) {
504 // blocks if listener notification is in-progress
505 m_local_journal
->remove_listener(m_local_journal_listener
);
506 delete m_local_journal_listener
;
507 m_local_journal_listener
= nullptr;
510 if (m_local_journal_replay
!= nullptr) {
511 m_local_journal
->stop_external_replay();
512 m_local_journal_replay
= nullptr;
515 if (m_event_preprocessor
!= nullptr) {
516 image_replayer::journal::EventPreprocessor
<I
>::destroy(
517 m_event_preprocessor
);
518 m_event_preprocessor
= nullptr;
521 m_local_journal
.reset();
523 // NOTE: it's important to ensure that the local image is fully
524 // closed before attempting to close the remote journal in
525 // case the remote cluster is unreachable
526 ceph_assert(m_state_builder
->local_image_ctx
!= nullptr);
527 auto ctx
= create_context_callback
<
528 Replayer
<I
>, &Replayer
<I
>::handle_close_local_image
>(this);
529 auto request
= image_replayer::CloseImageRequest
<I
>::create(
530 &m_state_builder
->local_image_ctx
, ctx
);
535 template <typename I
>
536 void Replayer
<I
>::handle_close_local_image(int r
) {
537 dout(10) << "r=" << r
<< dendl
;
539 std::unique_lock locker
{m_lock
};
541 derr
<< "error closing local iamge: " << cpp_strerror(r
) << dendl
;
542 handle_replay_error(r
, "failed to close local image");
545 ceph_assert(m_state_builder
->local_image_ctx
== nullptr);
546 stop_remote_journaler_replay();
549 template <typename I
>
550 void Replayer
<I
>::stop_remote_journaler_replay() {
551 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
553 if (m_state_builder
->remote_journaler
== nullptr) {
554 wait_for_in_flight_ops();
556 } else if (m_remote_replay_handler
== nullptr) {
557 wait_for_in_flight_ops();
562 auto ctx
= create_async_context_callback(
563 m_threads
->work_queue
, create_context_callback
<
564 Replayer
<I
>, &Replayer
<I
>::handle_stop_remote_journaler_replay
>(this));
565 m_state_builder
->remote_journaler
->stop_replay(ctx
);
568 template <typename I
>
569 void Replayer
<I
>::handle_stop_remote_journaler_replay(int r
) {
570 dout(10) << "r=" << r
<< dendl
;
572 std::unique_lock locker
{m_lock
};
574 derr
<< "failed to stop remote journaler replay : " << cpp_strerror(r
)
576 handle_replay_error(r
, "failed to stop remote journaler replay");
579 delete m_remote_replay_handler
;
580 m_remote_replay_handler
= nullptr;
582 wait_for_in_flight_ops();
585 template <typename I
>
586 void Replayer
<I
>::wait_for_in_flight_ops() {
588 if (m_remote_listener
!= nullptr) {
589 m_state_builder
->remote_journaler
->remove_listener(m_remote_listener
);
590 delete m_remote_listener
;
591 m_remote_listener
= nullptr;
594 auto ctx
= create_async_context_callback(
595 m_threads
->work_queue
, create_context_callback
<
596 Replayer
<I
>, &Replayer
<I
>::handle_wait_for_in_flight_ops
>(this));
597 m_in_flight_op_tracker
.wait_for_ops(ctx
);
600 template <typename I
>
601 void Replayer
<I
>::handle_wait_for_in_flight_ops(int r
) {
602 dout(10) << "r=" << r
<< dendl
;
604 ReplayStatusFormatter
<I
>::destroy(m_replay_status_formatter
);
605 m_replay_status_formatter
= nullptr;
607 Context
* on_init_shutdown
= nullptr;
609 std::unique_lock locker
{m_lock
};
610 ceph_assert(m_on_init_shutdown
!= nullptr);
611 std::swap(m_on_init_shutdown
, on_init_shutdown
);
612 m_state
= STATE_COMPLETE
;
614 on_init_shutdown
->complete(m_error_code
);
617 template <typename I
>
618 void Replayer
<I
>::handle_remote_journal_metadata_updated() {
621 std::unique_lock locker
{m_lock
};
622 if (m_state
!= STATE_REPLAYING
) {
626 cls::journal::Client remote_client
;
627 int r
= m_state_builder
->remote_journaler
->get_cached_client(
628 m_local_mirror_uuid
, &remote_client
);
630 derr
<< "failed to retrieve client: " << cpp_strerror(r
) << dendl
;
634 librbd::journal::MirrorPeerClientMeta remote_client_meta
;
636 r
= validate_remote_client_state(remote_client
, &remote_client_meta
,
637 &m_resync_requested
, &error
);
639 dout(0) << "client flagged disconnected, stopping image replay" << dendl
;
640 handle_replay_complete(locker
, r
, error
);
644 template <typename I
>
645 void Replayer
<I
>::schedule_flush_local_replay_task() {
646 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
648 std::unique_lock timer_locker
{m_threads
->timer_lock
};
649 if (m_state
!= STATE_REPLAYING
|| m_flush_local_replay_task
!= nullptr) {
654 m_flush_local_replay_task
= create_async_context_callback(
655 m_threads
->work_queue
, create_context_callback
<
656 Replayer
<I
>, &Replayer
<I
>::handle_flush_local_replay_task
>(this));
657 m_threads
->timer
->add_event_after(30, m_flush_local_replay_task
);
660 template <typename I
>
661 void Replayer
<I
>::cancel_flush_local_replay_task() {
662 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
664 std::unique_lock timer_locker
{m_threads
->timer_lock
};
665 if (m_flush_local_replay_task
!= nullptr) {
667 m_threads
->timer
->cancel_event(m_flush_local_replay_task
);
668 m_flush_local_replay_task
= nullptr;
672 template <typename I
>
673 void Replayer
<I
>::handle_flush_local_replay_task(int) {
676 m_in_flight_op_tracker
.start_op();
677 auto on_finish
= new LambdaContext([this](int) {
678 std::unique_lock locker
{m_lock
};
681 std::unique_lock timer_locker
{m_threads
->timer_lock
};
682 m_flush_local_replay_task
= nullptr;
685 notify_status_updated();
686 m_in_flight_op_tracker
.finish_op();
688 flush_local_replay(on_finish
);
691 template <typename I
>
692 void Replayer
<I
>::flush_local_replay(Context
* on_flush
) {
693 std::unique_lock locker
{m_lock
};
694 if (m_state
!= STATE_REPLAYING
) {
696 on_flush
->complete(0);
698 } else if (m_local_journal_replay
== nullptr) {
699 // raced w/ a tag creation stop/start, which implies that
700 // the replay is flushed
702 flush_commit_position(on_flush
);
707 auto ctx
= new LambdaContext(
708 [this, on_flush
](int r
) {
709 handle_flush_local_replay(on_flush
, r
);
711 m_local_journal_replay
->flush(ctx
);
714 template <typename I
>
715 void Replayer
<I
>::handle_flush_local_replay(Context
* on_flush
, int r
) {
716 dout(15) << "r=" << r
<< dendl
;
718 derr
<< "error flushing local replay: " << cpp_strerror(r
) << dendl
;
719 on_flush
->complete(r
);
723 flush_commit_position(on_flush
);
726 template <typename I
>
727 void Replayer
<I
>::flush_commit_position(Context
* on_flush
) {
728 std::unique_lock locker
{m_lock
};
729 if (m_state
!= STATE_REPLAYING
) {
731 on_flush
->complete(0);
736 auto ctx
= new LambdaContext(
737 [this, on_flush
](int r
) {
738 handle_flush_commit_position(on_flush
, r
);
740 m_state_builder
->remote_journaler
->flush_commit_position(ctx
);
743 template <typename I
>
744 void Replayer
<I
>::handle_flush_commit_position(Context
* on_flush
, int r
) {
745 dout(15) << "r=" << r
<< dendl
;
747 derr
<< "error flushing remote journal commit position: "
748 << cpp_strerror(r
) << dendl
;
751 on_flush
->complete(r
);
754 template <typename I
>
755 void Replayer
<I
>::handle_replay_error(int r
, const std::string
&error
) {
756 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
758 if (m_error_code
== 0) {
760 m_error_description
= error
;
764 template <typename I
>
765 bool Replayer
<I
>::is_replay_complete() const {
766 std::unique_lock locker
{m_lock
};
767 return is_replay_complete(locker
);
770 template <typename I
>
771 bool Replayer
<I
>::is_replay_complete(
772 const std::unique_lock
<ceph::mutex
>&) const {
773 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
774 return (m_state
== STATE_COMPLETE
);
777 template <typename I
>
778 void Replayer
<I
>::handle_replay_complete(int r
, const std::string
&error
) {
779 std::unique_lock locker
{m_lock
};
780 handle_replay_complete(locker
, r
, error
);
783 template <typename I
>
784 void Replayer
<I
>::handle_replay_complete(
785 const std::unique_lock
<ceph::mutex
>&, int r
, const std::string
&error
) {
786 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
788 dout(10) << "r=" << r
<< ", error=" << error
<< dendl
;
790 derr
<< "replay encountered an error: " << cpp_strerror(r
) << dendl
;
791 handle_replay_error(r
, error
);
794 if (m_state
!= STATE_REPLAYING
) {
798 m_state
= STATE_COMPLETE
;
799 notify_status_updated();
802 template <typename I
>
803 void Replayer
<I
>::handle_replay_ready() {
804 std::unique_lock locker
{m_lock
};
805 handle_replay_ready(locker
);
808 template <typename I
>
809 void Replayer
<I
>::handle_replay_ready(
810 std::unique_lock
<ceph::mutex
>& locker
) {
811 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
814 if (is_replay_complete(locker
)) {
818 if (!m_state_builder
->remote_journaler
->try_pop_front(&m_replay_entry
,
819 &m_replay_tag_tid
)) {
820 dout(20) << "no entries ready for replay" << dendl
;
824 // can safely drop lock once the entry is tracked
825 m_event_replay_tracker
.start_op();
828 dout(20) << "entry tid=" << m_replay_entry
.get_commit_tid()
829 << "tag_tid=" << m_replay_tag_tid
<< dendl
;
830 if (!m_replay_tag_valid
|| m_replay_tag
.tid
!= m_replay_tag_tid
) {
831 // must allocate a new local journal tag prior to processing
839 template <typename I
>
840 void Replayer
<I
>::replay_flush() {
842 m_flush_tracker
.start_op();
844 // shut down the replay to flush all IO and ops and create a new
845 // replayer to handle the new tag epoch
846 auto ctx
= create_context_callback
<
847 Replayer
<I
>, &Replayer
<I
>::handle_replay_flush_shut_down
>(this);
848 ceph_assert(m_local_journal_replay
!= nullptr);
849 m_local_journal_replay
->shut_down(false, ctx
);
852 template <typename I
>
853 void Replayer
<I
>::handle_replay_flush_shut_down(int r
) {
854 std::unique_lock locker
{m_lock
};
855 dout(10) << "r=" << r
<< dendl
;
857 ceph_assert(m_local_journal
!= nullptr);
858 ceph_assert(m_local_journal_listener
!= nullptr);
860 // blocks if listener notification is in-progress
861 m_local_journal
->remove_listener(m_local_journal_listener
);
862 delete m_local_journal_listener
;
863 m_local_journal_listener
= nullptr;
865 m_local_journal
->stop_external_replay();
866 m_local_journal_replay
= nullptr;
867 m_local_journal
.reset();
872 handle_replay_flush(r
);
876 // journal might have been closed now that we stopped external replay
877 auto local_image_ctx
= m_state_builder
->local_image_ctx
;
878 std::shared_lock local_image_locker
{local_image_ctx
->image_lock
};
879 m_local_journal
= local_image_ctx
->journal
;
880 if (m_local_journal
== nullptr) {
881 local_image_locker
.unlock();
884 derr
<< "local image journal closed" << dendl
;
885 handle_replay_flush(-EINVAL
);
889 auto ctx
= create_context_callback
<
890 Replayer
<I
>, &Replayer
<I
>::handle_replay_flush
>(this);
891 m_local_journal
->start_external_replay(&m_local_journal_replay
, ctx
);
894 template <typename I
>
895 void Replayer
<I
>::handle_replay_flush(int r
) {
896 std::unique_lock locker
{m_lock
};
897 dout(10) << "r=" << r
<< dendl
;
898 m_flush_tracker
.finish_op();
901 derr
<< "replay flush encountered an error: " << cpp_strerror(r
) << dendl
;
902 handle_replay_complete(locker
, r
, "replay flush encountered an error");
903 m_event_replay_tracker
.finish_op();
905 } else if (is_replay_complete(locker
)) {
906 m_event_replay_tracker
.finish_op();
910 // check for resync/promotion state after adding listener
911 if (!add_local_journal_listener(locker
)) {
912 m_event_replay_tracker
.finish_op();
920 template <typename I
>
921 void Replayer
<I
>::get_remote_tag() {
922 dout(15) << "tag_tid: " << m_replay_tag_tid
<< dendl
;
924 Context
*ctx
= create_context_callback
<
925 Replayer
, &Replayer
<I
>::handle_get_remote_tag
>(this);
926 m_state_builder
->remote_journaler
->get_tag(m_replay_tag_tid
, &m_replay_tag
,
930 template <typename I
>
931 void Replayer
<I
>::handle_get_remote_tag(int r
) {
932 dout(15) << "r=" << r
<< dendl
;
936 auto it
= m_replay_tag
.data
.cbegin();
937 decode(m_replay_tag_data
, it
);
938 } catch (const buffer::error
&err
) {
944 derr
<< "failed to retrieve remote tag " << m_replay_tag_tid
<< ": "
945 << cpp_strerror(r
) << dendl
;
946 handle_replay_complete(r
, "failed to retrieve remote tag");
947 m_event_replay_tracker
.finish_op();
951 m_replay_tag_valid
= true;
952 dout(15) << "decoded remote tag " << m_replay_tag_tid
<< ": "
953 << m_replay_tag_data
<< dendl
;
955 allocate_local_tag();
958 template <typename I
>
959 void Replayer
<I
>::allocate_local_tag() {
962 std::string mirror_uuid
= m_replay_tag_data
.mirror_uuid
;
963 if (mirror_uuid
== librbd::Journal
<>::LOCAL_MIRROR_UUID
) {
964 mirror_uuid
= m_state_builder
->remote_mirror_uuid
;
965 } else if (mirror_uuid
== m_local_mirror_uuid
) {
966 mirror_uuid
= librbd::Journal
<>::LOCAL_MIRROR_UUID
;
967 } else if (mirror_uuid
== librbd::Journal
<>::ORPHAN_MIRROR_UUID
) {
968 // handle possible edge condition where daemon can failover and
969 // the local image has already been promoted/demoted
970 auto local_tag_data
= m_local_journal
->get_tag_data();
971 if (local_tag_data
.mirror_uuid
== librbd::Journal
<>::ORPHAN_MIRROR_UUID
&&
972 (local_tag_data
.predecessor
.commit_valid
&&
973 local_tag_data
.predecessor
.mirror_uuid
==
974 librbd::Journal
<>::LOCAL_MIRROR_UUID
)) {
975 dout(15) << "skipping stale demotion event" << dendl
;
976 handle_process_entry_safe(m_replay_entry
, m_replay_bytes
,
977 m_replay_start_time
, 0);
978 handle_replay_ready();
981 dout(5) << "encountered image demotion: stopping" << dendl
;
982 handle_replay_complete(0, "");
986 librbd::journal::TagPredecessor
predecessor(m_replay_tag_data
.predecessor
);
987 if (predecessor
.mirror_uuid
== librbd::Journal
<>::LOCAL_MIRROR_UUID
) {
988 predecessor
.mirror_uuid
= m_state_builder
->remote_mirror_uuid
;
989 } else if (predecessor
.mirror_uuid
== m_local_mirror_uuid
) {
990 predecessor
.mirror_uuid
= librbd::Journal
<>::LOCAL_MIRROR_UUID
;
993 dout(15) << "mirror_uuid=" << mirror_uuid
<< ", "
994 << "predecessor=" << predecessor
<< ", "
995 << "replay_tag_tid=" << m_replay_tag_tid
<< dendl
;
996 Context
*ctx
= create_context_callback
<
997 Replayer
, &Replayer
<I
>::handle_allocate_local_tag
>(this);
998 m_local_journal
->allocate_tag(mirror_uuid
, predecessor
, ctx
);
1001 template <typename I
>
1002 void Replayer
<I
>::handle_allocate_local_tag(int r
) {
1003 dout(15) << "r=" << r
<< ", "
1004 << "tag_tid=" << m_local_journal
->get_tag_tid() << dendl
;
1006 derr
<< "failed to allocate journal tag: " << cpp_strerror(r
) << dendl
;
1007 handle_replay_complete(r
, "failed to allocate journal tag");
1008 m_event_replay_tracker
.finish_op();
1015 template <typename I
>
1016 void Replayer
<I
>::preprocess_entry() {
1017 dout(20) << "preprocessing entry tid=" << m_replay_entry
.get_commit_tid()
1020 bufferlist data
= m_replay_entry
.get_data();
1021 auto it
= data
.cbegin();
1022 int r
= m_local_journal_replay
->decode(&it
, &m_event_entry
);
1024 derr
<< "failed to decode journal event" << dendl
;
1025 handle_replay_complete(r
, "failed to decode journal event");
1026 m_event_replay_tracker
.finish_op();
1030 m_replay_bytes
= data
.length();
1031 uint32_t delay
= calculate_replay_delay(
1032 m_event_entry
.timestamp
,
1033 m_state_builder
->local_image_ctx
->mirroring_replay_delay
);
1035 handle_preprocess_entry_ready(0);
1039 std::unique_lock locker
{m_lock
};
1040 if (is_replay_complete(locker
)) {
1041 // don't schedule a delayed replay task if a shut-down is in-progress
1042 m_event_replay_tracker
.finish_op();
1046 dout(20) << "delaying replay by " << delay
<< " sec" << dendl
;
1047 std::unique_lock timer_locker
{m_threads
->timer_lock
};
1048 ceph_assert(m_delayed_preprocess_task
== nullptr);
1049 m_delayed_preprocess_task
= create_context_callback
<
1050 Replayer
<I
>, &Replayer
<I
>::handle_delayed_preprocess_task
>(this);
1051 m_threads
->timer
->add_event_after(delay
, m_delayed_preprocess_task
);
1054 template <typename I
>
1055 void Replayer
<I
>::handle_delayed_preprocess_task(int r
) {
1056 dout(20) << "r=" << r
<< dendl
;
1058 ceph_assert(ceph_mutex_is_locked_by_me(m_threads
->timer_lock
));
1059 m_delayed_preprocess_task
= nullptr;
1061 m_threads
->work_queue
->queue(create_context_callback
<
1062 Replayer
, &Replayer
<I
>::handle_preprocess_entry_ready
>(this), 0);
1065 template <typename I
>
1066 void Replayer
<I
>::handle_preprocess_entry_ready(int r
) {
1067 dout(20) << "r=" << r
<< dendl
;
1068 ceph_assert(r
== 0);
1070 m_replay_start_time
= ceph_clock_now();
1071 if (!m_event_preprocessor
->is_required(m_event_entry
)) {
1076 Context
*ctx
= create_context_callback
<
1077 Replayer
, &Replayer
<I
>::handle_preprocess_entry_safe
>(this);
1078 m_event_preprocessor
->preprocess(&m_event_entry
, ctx
);
1081 template <typename I
>
1082 void Replayer
<I
>::handle_preprocess_entry_safe(int r
) {
1083 dout(20) << "r=" << r
<< dendl
;
1086 if (r
== -ECANCELED
) {
1087 handle_replay_complete(0, "lost exclusive lock");
1089 derr
<< "failed to preprocess journal event" << dendl
;
1090 handle_replay_complete(r
, "failed to preprocess journal event");
1093 m_event_replay_tracker
.finish_op();
1100 template <typename I
>
1101 void Replayer
<I
>::process_entry() {
1102 dout(20) << "processing entry tid=" << m_replay_entry
.get_commit_tid()
1105 Context
*on_ready
= create_context_callback
<
1106 Replayer
, &Replayer
<I
>::handle_process_entry_ready
>(this);
1107 Context
*on_commit
= new C_ReplayCommitted(this, std::move(m_replay_entry
),
1109 m_replay_start_time
);
1111 m_local_journal_replay
->process(m_event_entry
, on_ready
, on_commit
);
// Runs when the on_ready context that process_entry() passes to
// m_local_journal_replay->process() completes. It takes m_lock at the top
// and ends by passing that lock to handle_replay_ready() so the next event
// can be picked up.
// NOTE(review): original lines 1117-1118, 1120, 1122, 1130-1132, 1134 and
// 1139-1140 are missing from this copy, including the braces that scope
// image_locker; confirm the shared image_lock is released before the
// perf-counter / notification work below.
1114 template <typename I
>
1115 void Replayer
<I
>::handle_process_entry_ready(int r
) {
1116 std::unique_lock locker
{m_lock
};
// The ready callback is not expected to report an error.
1119 ceph_assert(r
== 0);
1121 bool update_status
= false;
// Recompute the local image spec from md_ctx and the image name while
// holding image_lock (shared). If it differs from the cached m_image_spec,
// cache the new value and remember that a status refresh is needed.
1123 auto local_image_ctx
= m_state_builder
->local_image_ctx
;
1124 std::shared_lock image_locker
{local_image_ctx
->image_lock
};
1125 auto image_spec
= util::compute_image_spec(local_image_ctx
->md_ctx
,
1126 local_image_ctx
->name
);
1127 if (m_image_spec
!= image_spec
) {
1128 m_image_spec
= image_spec
;
1129 update_status
= true;
1133 m_replay_status_formatter
->handle_entry_processed(m_replay_bytes
);
// The per-image perf counter set is named "rbd_mirror_image_" + m_image_spec
// (see register_perf_counters). A changed spec therefore requires
// re-registering the counters before the listener is notified.
1135 if (update_status
) {
1136 unregister_perf_counters();
1137 register_perf_counters();
1138 notify_status_updated();
1141 // attempt to process the next event
1142 handle_replay_ready(locker
);
// Commit-side completion for a replayed event.
// The parameters mirror the fields stored by C_ReplayCommitted: the replay
// entry, the byte count and the replay start time. Presumably that context
// forwards them here, but its finish() is not visible in this chunk.
//
// The visible flow is:
//  1. On failure, log and complete the replay with the error.
//  2. Otherwise, acknowledge the entry to the remote journaler and record
//     global perf counters.
//  3. Defer per-image bookkeeping to the work queue.
//
// NOTE(review): original lines 1150-1152, 1155, 1158-1159, 1165-1166, 1171,
// 1176-1177, 1179 and 1181-1182 are missing from this copy, including the
// condition guarding the failure path, its else branch and the end of the
// lambda.
1145 template <typename I
>
1146 void Replayer
<I
>::handle_process_entry_safe(
1147 const ReplayEntry
&replay_entry
, uint64_t replay_bytes
,
1148 const utime_t
&replay_start_time
, int r
) {
1149 dout(20) << "commit_tid=" << replay_entry
.get_commit_tid() << ", r=" << r
1153 derr
<< "failed to commit journal event: " << cpp_strerror(r
) << dendl
;
1154 handle_replay_complete(r
, "failed to commit journal event");
// Presumably the success path: acknowledge the committed entry to the
// remote journaler, which must exist at this point.
1156 ceph_assert(m_state_builder
->remote_journaler
!= nullptr);
1157 m_state_builder
->remote_journaler
->committed(replay_entry
);
// Latency runs from replay_start_time until now. It is recorded in the
// process-wide counters only when g_perf_counters is set.
1160 auto latency
= ceph_clock_now() - replay_start_time
;
1161 if (g_perf_counters
) {
1162 g_perf_counters
->inc(l_rbd_mirror_replay
);
1163 g_perf_counters
->inc(l_rbd_mirror_replay_bytes
, replay_bytes
);
1164 g_perf_counters
->tinc(l_rbd_mirror_replay_latency
, latency
);
// Per-image bookkeeping runs later on the work queue. Under m_lock it:
//  - schedules a local replay flush,
//  - updates the per-image perf counters if they are registered,
//  - finishes the op in m_event_replay_tracker.
// The lambda's own `r` parameter shadows the outer `r` and is not used in
// the visible body.
1167 auto ctx
= new LambdaContext(
1168 [this, replay_bytes
, latency
](int r
) {
1169 std::unique_lock locker
{m_lock
};
1170 schedule_flush_local_replay_task();
1172 if (m_perf_counters
) {
1173 m_perf_counters
->inc(l_rbd_mirror_replay
);
1174 m_perf_counters
->inc(l_rbd_mirror_replay_bytes
, replay_bytes
);
1175 m_perf_counters
->tinc(l_rbd_mirror_replay_latency
, latency
);
1178 m_event_replay_tracker
.finish_op();
1180 m_threads
->work_queue
->queue(ctx
, 0);
// Resync request handler. Under m_lock it sets m_resync_requested and then
// completes the replay with r=0 and the description "resync requested",
// passing along the held lock.
// NOTE(review): original lines 1185-1186 and 1190-1191 are missing from this
// copy.
1183 template <typename I
>
1184 void Replayer
<I
>::handle_resync_image() {
1187 std::unique_lock locker
{m_lock
};
1188 m_resync_requested
= true;
1189 handle_replay_complete(locker
, 0, "resync requested");
// Asynchronously calls m_replayer_listener->handle_notification().
// The caller must hold m_lock (asserted). The notification is queued on the
// work queue inside a C_TrackedOp bound to m_in_flight_op_tracker,
// presumably so it is tracked as an in-flight op until it completes.
// NOTE(review): original lines 1195-1197, 1199, 1201 and 1203-1204 (the
// lambda header and its closing) are missing from this copy.
1192 template <typename I
>
1193 void Replayer
<I
>::notify_status_updated() {
1194 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
1198 auto ctx
= new C_TrackedOp(m_in_flight_op_tracker
, new LambdaContext(
1200 m_replayer_listener
->handle_notification();
1202 m_threads
->work_queue
->queue(ctx
, 0);
// Cancels the pending delayed-preprocess timer event, if there is one.
// The caller must hold m_lock (asserted). m_threads->timer_lock is taken
// while the timer and m_delayed_preprocess_task are touched.
// NOTE(review): original lines 1208, 1210, 1213, 1218-1220 and 1224+ (scope
// braces) are missing from this copy.
1205 template <typename I
>
1206 void Replayer
<I
>::cancel_delayed_preprocess_task() {
1207 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
1209 bool canceled_delayed_preprocess_task
= false;
1211 std::unique_lock timer_locker
{m_threads
->timer_lock
};
// A registered task is expected to still be cancellable (asserted). The
// member pointer is cleared either way.
1212 if (m_delayed_preprocess_task
!= nullptr) {
1214 canceled_delayed_preprocess_task
= m_threads
->timer
->cancel_event(
1215 m_delayed_preprocess_task
);
1216 ceph_assert(canceled_delayed_preprocess_task
);
1217 m_delayed_preprocess_task
= nullptr;
// Only when a task was actually canceled is the tracker op finished.
1221 if (canceled_delayed_preprocess_task
) {
1222 // wake up sleeping replay
1223 m_event_replay_tracker
.finish_op();
// Checks the remote journal client registration used for mirroring this
// image. The caller must hold m_lock (asserted).
//  remote_client       remote journal client record to inspect
//  remote_client_meta  [out] decoded MirrorPeerClientMeta
//  resync_requested    [out] set to true when an automatic resync after a
//                      disconnect is configured
//  error               [out] human-readable reason when validation fails
// NOTE(review): the int return statements are not visible in this copy
// (original lines 1233, 1237-1239, 1253-1254 and 1257-1263 are missing).
// Confirm the return codes against the original file.
1227 template <typename I
>
1228 int Replayer
<I
>::validate_remote_client_state(
1229 const cls::journal::Client
& remote_client
,
1230 librbd::journal::MirrorPeerClientMeta
* remote_client_meta
,
1231 bool* resync_requested
, std::string
* error
) {
1232 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
1234 if (!util::decode_client_meta(remote_client
, remote_client_meta
)) {
1235 // require operator intervention since the data is corrupt
1236 *error
= "error retrieving remote journal client";
1240 auto local_image_ctx
= m_state_builder
->local_image_ctx
;
1241 dout(5) << "image_id=" << local_image_ctx
->id
<< ", "
1242 << "remote_client_meta.image_id="
1243 << remote_client_meta
->image_id
<< ", "
1244 << "remote_client.state=" << remote_client
.state
<< dendl
;
// The remote client refers to this local image but is no longer CONNECTED.
// The rbd_mirroring_resync_after_disconnect setting on the local image
// decides whether an automatic resync is requested.
1245 if (remote_client_meta
->image_id
== local_image_ctx
->id
&&
1246 remote_client
.state
!= cls::journal::CLIENT_STATE_CONNECTED
) {
1247 dout(5) << "client flagged disconnected, stopping image replay" << dendl
;
1248 if (local_image_ctx
->config
.template get_val
<bool>(
1249 "rbd_mirroring_resync_after_disconnect")) {
1250 dout(10) << "disconnected: automatic resync" << dendl
;
1251 *resync_requested
= true;
1252 *error
= "disconnected: automatic resync";
// Presumably the else branch (not visible): disconnected without resync.
1255 dout(10) << "disconnected" << dendl
;
1256 *error
= "disconnected";
// Creates and registers the per-image perf counters, named
// "rbd_mirror_image_" + m_image_spec. The set contains a replay count,
// replayed bytes (UNIT_BYTES) and average replay latency.
// The caller must hold m_lock, and no counters may be registered yet (both
// asserted). The counter priority comes from the
// rbd_mirror_image_perf_stats_prio option on the local image's CephContext.
// NOTE(review): the priority is read from the local image's cct, but the
// builder and the perf-counter collection use g_ceph_context.
1264 template <typename I
>
1265 void Replayer
<I
>::register_perf_counters() {
1268 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
1269 ceph_assert(m_perf_counters
== nullptr);
1271 auto cct
= static_cast<CephContext
*>(m_state_builder
->local_image_ctx
->cct
);
1272 auto prio
= cct
->_conf
.get_val
<int64_t>("rbd_mirror_image_perf_stats_prio");
// The counter indices span l_rbd_mirror_first..l_rbd_mirror_last.
1273 PerfCountersBuilder
plb(g_ceph_context
, "rbd_mirror_image_" + m_image_spec
,
1274 l_rbd_mirror_first
, l_rbd_mirror_last
);
1275 plb
.add_u64_counter(l_rbd_mirror_replay
, "replay", "Replays", "r", prio
);
1276 plb
.add_u64_counter(l_rbd_mirror_replay_bytes
, "replay_bytes",
1277 "Replayed data", "rb", prio
, unit_t(UNIT_BYTES
));
1278 plb
.add_time_avg(l_rbd_mirror_replay_latency
, "replay_latency",
1279 "Replay latency", "rl", prio
);
1280 m_perf_counters
= plb
.create_perf_counters();
1281 g_ceph_context
->get_perfcounters_collection()->add(m_perf_counters
);
// Unregisters and deletes the per-image perf counters, if any are
// registered. The caller must hold m_lock (asserted).
// m_perf_counters is first swapped into a local that starts as nullptr, so
// the member is null afterwards even when nothing was registered.
1284 template <typename I
>
1285 void Replayer
<I
>::unregister_perf_counters() {
1287 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
1289 PerfCounters
*perf_counters
= nullptr;
1290 std::swap(perf_counters
, m_perf_counters
);
// Remove the counters from the global collection before freeing them.
1292 if (perf_counters
!= nullptr) {
1293 g_ceph_context
->get_perfcounters_collection()->remove(perf_counters
);
1294 delete perf_counters
;
1298 } // namespace journal
1299 } // namespace image_replayer
1300 } // namespace mirror
// Explicit instantiation of Replayer for librbd::ImageCtx, so the template
// member definitions in this file are emitted for that image context type.
1303 template class rbd::mirror::image_replayer::journal::Replayer
<librbd::ImageCtx
>;