1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 #include "common/debug.h"
6 #include "common/errno.h"
7 #include "common/Timer.h"
8 #include "common/WorkQueue.h"
9 #include "librbd/Journal.h"
10 #include "librbd/Utils.h"
11 #include "librbd/journal/Replay.h"
12 #include "journal/Journaler.h"
13 #include "journal/JournalMetadataListener.h"
14 #include "journal/ReplayHandler.h"
15 #include "tools/rbd_mirror/Threads.h"
16 #include "tools/rbd_mirror/Types.h"
17 #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
18 #include "tools/rbd_mirror/image_replayer/ReplayerListener.h"
19 #include "tools/rbd_mirror/image_replayer/Utils.h"
20 #include "tools/rbd_mirror/image_replayer/journal/EventPreprocessor.h"
21 #include "tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h"
22 #include "tools/rbd_mirror/image_replayer/journal/StateBuilder.h"
24 #define dout_context g_ceph_context
25 #define dout_subsys ceph_subsys_rbd_mirror
27 #define dout_prefix *_dout << "rbd::mirror::image_replayer::journal::" \
28 << "Replayer: " << this << " " << __func__ << ": "
30 extern PerfCounters
*g_perf_counters
;
34 namespace image_replayer
{
// Computes how long replay of an event should be postponed: compares
// event_time + mirroring_replay_delay against the current clock and, when the
// deadline is still ahead, returns the remaining time plus one.
// NOTE(review): the bodies of the two guards below (non-positive delay, and
// deadline already reached) are not visible in this extract -- presumably
// both return 0; confirm against the full file.
39 uint32_t calculate_replay_delay(const utime_t
&event_time
,
40 int mirroring_replay_delay
) {
// a zero or negative configured delay is handled separately
41 if (mirroring_replay_delay
<= 0) {
45 utime_t now
= ceph_clock_now();
// the event's delay window has already elapsed
46 if (event_time
+ mirroring_replay_delay
<= now
) {
50 // ensure it is rounded up when converting to integer
51 return (event_time
+ mirroring_replay_delay
- now
) + 1;
54 } // anonymous namespace
56 using librbd::util::create_async_context_callback
;
57 using librbd::util::create_context_callback
;
// Completion context that carries a replay entry, its byte count and the
// replay start time, and hands them (together with the result code) to
// Replayer::handle_process_entry_safe() when it finishes.
60 struct Replayer
<I
>::C_ReplayCommitted
: public Context
{
62 ReplayEntry replay_entry
;
63 uint64_t replay_bytes
;
64 utime_t replay_start_time
;
// the replay entry is taken by rvalue reference and moved into the member
66 C_ReplayCommitted(Replayer
* replayer
, ReplayEntry
&&replay_entry
,
67 uint64_t replay_bytes
, const utime_t
&replay_start_time
)
68 : replayer(replayer
), replay_entry(std::move(replay_entry
)),
69 replay_bytes(replay_bytes
), replay_start_time(replay_start_time
) {
// forwards the stored state plus the completion code r to the replayer
72 void finish(int r
) override
{
73 replayer
->handle_process_entry_safe(replay_entry
, replay_bytes
,
74 replay_start_time
, r
);
// Context wrapper that registers an operation with the replayer's
// m_in_flight_op_tracker when constructed (start_op) and releases it from
// finish() (finish_op).
// NOTE(review): the statement that completes the wrapped ctx is not visible
// in this extract -- presumably finish() completes it before finish_op().
79 struct Replayer
<I
>::C_TrackedOp
: public Context
{
83 C_TrackedOp(Replayer
* replayer
, Context
* ctx
)
84 : replayer(replayer
), ctx(ctx
) {
// the op is counted as in-flight from the moment the wrapper exists
85 replayer
->m_in_flight_op_tracker
.start_op();
88 void finish(int r
) override
{
90 replayer
->m_in_flight_op_tracker
.finish_op();
// Journal metadata listener: on every metadata update it queues a tracked
// callback on the replayer's work queue, and that callback invokes
// Replayer::handle_remote_journal_metadata_updated().
95 struct Replayer
<I
>::RemoteJournalerListener
96 : public ::journal::JournalMetadataListener
{
99 RemoteJournalerListener(Replayer
* replayer
) : replayer(replayer
) {}
// the metadata argument itself is unused; the handling is deferred to the
// work queue and wrapped in C_TrackedOp so it counts as an in-flight op
101 void handle_update(::journal::JournalMetadata
*) override
{
102 auto ctx
= new C_TrackedOp(replayer
, new LambdaContext([this](int r
) {
103 replayer
->handle_remote_journal_metadata_updated();
105 replayer
->m_threads
->work_queue
->queue(ctx
, 0);
// Replay handler for the remote journaler: entry availability is forwarded to
// Replayer::handle_replay_ready() and completion is forwarded to
// Replayer::handle_replay_complete() along with an error description.
109 template <typename I
>
110 struct Replayer
<I
>::RemoteReplayHandler
: public ::journal::ReplayHandler
{
113 RemoteReplayHandler(Replayer
* replayer
) : replayer(replayer
) {}
114 ~RemoteReplayHandler() override
{};
116 void handle_entries_available() override
{
117 replayer
->handle_replay_ready();
// NOTE(review): the conditions that choose between the two messages below
// (and the declaration of `error`) are not visible in this extract.
120 void handle_complete(int r
) override
{
123 error
= "not enough memory in autotune cache";
125 error
= "replay completed with error: " + cpp_strerror(r
);
127 replayer
->handle_replay_complete(r
, error
);
// Listener for local journal events. Close and promotion notifications both
// end the replay through handle_replay_complete(0, ...); a resync
// notification is forwarded to handle_resync_image().
131 template <typename I
>
132 struct Replayer
<I
>::LocalJournalListener
133 : public librbd::journal::Listener
{
136 LocalJournalListener(Replayer
* replayer
) : replayer(replayer
) {
// journal closed: complete the replay without an error description
139 void handle_close() override
{
140 replayer
->handle_replay_complete(0, "");
// image promoted: complete the replay with result 0 and a description
143 void handle_promoted() override
{
144 replayer
->handle_replay_complete(0, "force promoted");
147 void handle_resync() override
{
148 replayer
->handle_resync_image();
// Constructor: stores the collaborators, creates m_lock with a name made
// unique per instance, and registers the perf counters while holding m_lock.
// NOTE(review): the declaration of the `threads` parameter used to
// initialise m_threads is not visible in this extract.
152 template <typename I
>
153 Replayer
<I
>::Replayer(
155 const std::string
& local_mirror_uuid
,
156 StateBuilder
<I
>* state_builder
,
157 ReplayerListener
* replayer_listener
)
158 : m_threads(threads
),
159 m_local_mirror_uuid(local_mirror_uuid
),
160 m_state_builder(state_builder
),
161 m_replayer_listener(replayer_listener
),
162 m_lock(ceph::make_mutex(librbd::util::unique_lock_name(
163 "rbd::mirror::image_replayer::journal::Replayer", this))) {
// register_perf_counters() asserts that m_lock is held by the caller
167 std::unique_lock locker
{m_lock
};
168 register_perf_counters();
// Destructor: unregisters the perf counters under m_lock, then asserts that
// every listener, handler, helper object and timer task -- as well as the
// local image context -- has already been released (is nullptr).
172 template <typename I
>
173 Replayer
<I
>::~Replayer() {
// unregister_perf_counters() asserts that m_lock is held by the caller
177 std::unique_lock locker
{m_lock
};
178 unregister_perf_counters();
// everything below must already have been torn down before destruction
181 ceph_assert(m_remote_listener
== nullptr);
182 ceph_assert(m_local_journal_listener
== nullptr);
183 ceph_assert(m_local_journal_replay
== nullptr);
184 ceph_assert(m_remote_replay_handler
== nullptr);
185 ceph_assert(m_event_preprocessor
== nullptr);
186 ceph_assert(m_replay_status_formatter
== nullptr);
187 ceph_assert(m_delayed_preprocess_task
== nullptr);
188 ceph_assert(m_flush_local_replay_task
== nullptr);
189 ceph_assert(m_state_builder
->local_image_ctx
== nullptr);
// Starts initialisation. Captures the image spec and the local journal from
// the local image context (under its image_lock), records on_finish as the
// pending init/shut-down callback, and then initialises the remote journaler.
// If the local image has no journal, the replay is completed with -EINVAL.
// NOTE(review): the remainder of the no-journal branch (after
// handle_replay_complete) is not visible in this extract.
192 template <typename I
>
193 void Replayer
<I
>::init(Context
* on_finish
) {
196 ceph_assert(m_local_journal
== nullptr);
198 auto local_image_ctx
= m_state_builder
->local_image_ctx
;
// image spec and journal are read while holding the image lock (shared)
199 std::shared_lock image_locker
{local_image_ctx
->image_lock
};
200 m_image_spec
= util::compute_image_spec(local_image_ctx
->md_ctx
,
201 local_image_ctx
->name
);
202 m_local_journal
= local_image_ctx
->journal
;
// only one init/shut-down callback may be pending at a time
205 ceph_assert(m_on_init_shutdown
== nullptr);
206 m_on_init_shutdown
= on_finish
;
// without a local journal replay cannot proceed: mark complete, drop the
// remote journaler pointer and report the error
208 if (m_local_journal
== nullptr) {
209 std::unique_lock locker
{m_lock
};
210 m_state
= STATE_COMPLETE
;
211 m_state_builder
->remote_journaler
= nullptr;
213 handle_replay_complete(locker
, -EINVAL
, "error accessing local journal");
218 init_remote_journaler();
// Begins the shut-down sequence. Records on_finish as the pending callback,
// moves a replaying instance to STATE_COMPLETE, clears the stored error
// description, cancels the delayed-preprocess and flush timer tasks, and
// starts shutting down the local journal replay.
// NOTE(review): the body of the STATE_INIT branch is not visible in this
// extract -- per the comment it defers to the tail of the init state machine.
221 template <typename I
>
222 void Replayer
<I
>::shut_down(Context
* on_finish
) {
225 std::unique_lock locker
{m_lock
};
226 ceph_assert(m_on_init_shutdown
== nullptr);
227 m_on_init_shutdown
= on_finish
;
229 if (m_state
== STATE_INIT
) {
230 // raced with the last piece of the init state machine
232 } else if (m_state
== STATE_REPLAYING
) {
233 m_state
= STATE_COMPLETE
;
236 // if shutting down due to an error notification, we don't
237 // need to propagate the same error again
239 m_error_description
= "";
// stop pending timers first, then tear down the replay itself
241 cancel_delayed_preprocess_task();
242 cancel_flush_local_replay_task();
243 shut_down_local_journal_replay();
// Flushes the local replay. The caller's on_finish is wrapped in a
// C_TrackedOp so the flush is counted by the in-flight op tracker.
246 template <typename I
>
247 void Replayer
<I
>::flush(Context
* on_finish
) {
250 flush_local_replay(new C_TrackedOp(this, on_finish
));
// Retrieves the replay status description via the replay status formatter.
// When no formatter exists (replay not running) on_finish is completed with
// -EAGAIN; otherwise on_finish is wrapped in a C_TrackedOp and the request is
// delegated to ReplayStatusFormatter::get_or_send_update(), whose result is
// returned.
// NOTE(review): the return statement of the "not running" branch is not
// visible in this extract.
253 template <typename I
>
254 bool Replayer
<I
>::get_replay_status(std::string
* description
,
255 Context
* on_finish
) {
258 std::unique_lock locker
{m_lock
};
259 if (m_replay_status_formatter
== nullptr) {
260 derr
<< "replay not running" << dendl
;
263 on_finish
->complete(-EAGAIN
);
// track the asynchronous status update as an in-flight op
267 on_finish
= new C_TrackedOp(this, on_finish
);
268 return m_replay_status_formatter
->get_or_send_update(description
,
// Asynchronously initialises the remote journaler; the result is delivered
// to handle_init_remote_journaler().
272 template <typename I
>
273 void Replayer
<I
>::init_remote_journaler() {
276 Context
*ctx
= create_context_callback
<
277 Replayer
, &Replayer
<I
>::handle_init_remote_journaler
>(this);
278 m_state_builder
->remote_journaler
->init(ctx
);
// Completion of the remote journaler init. Registers a metadata listener on
// the remote journaler, looks up this peer's cached client registration
// (keyed by m_local_mirror_uuid), validates the client state, and then starts
// external replay on the local journal. Each failure is logged and reported
// through handle_replay_complete().
// NOTE(review): the `r < 0` guards, the early returns and the declaration of
// `error` are not visible in this extract.
281 template <typename I
>
282 void Replayer
<I
>::handle_init_remote_journaler(int r
) {
283 dout(10) << "r=" << r
<< dendl
;
285 std::unique_lock locker
{m_lock
};
287 derr
<< "failed to initialize remote journal: " << cpp_strerror(r
) << dendl
;
288 handle_replay_complete(locker
, r
, "error initializing remote journal");
293 // listen for metadata updates to check for disconnect events
294 ceph_assert(m_remote_listener
== nullptr);
295 m_remote_listener
= new RemoteJournalerListener(this);
296 m_state_builder
->remote_journaler
->add_listener(m_remote_listener
);
// fetch the registration this mirror peer holds on the remote journal
298 cls::journal::Client remote_client
;
299 r
= m_state_builder
->remote_journaler
->get_cached_client(m_local_mirror_uuid
,
302 derr
<< "error retrieving remote journal client: " << cpp_strerror(r
)
304 handle_replay_complete(locker
, r
, "error retrieving remote journal client");
// decodes the client meta into the state builder and may set
// m_resync_requested
310 r
= validate_remote_client_state(remote_client
,
311 &m_state_builder
->remote_client_meta
,
312 &m_resync_requested
, &error
);
314 handle_replay_complete(locker
, r
, error
);
319 start_external_replay();
// Asks the local journal to start an external replay; the replay handle is
// stored into m_local_journal_replay and the result is delivered to
// handle_start_external_replay().
322 template <typename I
>
323 void Replayer
<I
>::start_external_replay() {
326 Context
*start_ctx
= create_context_callback
<
327 Replayer
, &Replayer
<I
>::handle_start_external_replay
>(this);
328 m_local_journal
->start_external_replay(&m_local_journal_replay
, start_ctx
);
// Completion of the local journal's external replay start. On success it
// notifies that init finished, switches to STATE_REPLAYING, attaches the
// local journal listener, re-checks for force-promotion and resync requests,
// creates the event preprocessor and status formatter, and starts live
// replay of the remote journal.
// NOTE(review): the `r < 0` guards and the early returns after each
// handle_replay_complete() call are not visible in this extract.
331 template <typename I
>
332 void Replayer
<I
>::handle_start_external_replay(int r
) {
333 dout(10) << "r=" << r
<< dendl
;
335 std::unique_lock locker
{m_lock
};
// error path: no replay handle may have been handed out
337 ceph_assert(m_local_journal_replay
== nullptr);
338 derr
<< "error starting external replay on local image "
339 << m_state_builder
->local_image_ctx
->id
<< ": "
340 << cpp_strerror(r
) << dendl
;
342 handle_replay_complete(locker
, r
, "error starting replay on local image");
// notify_init_complete() returns false when a shut-down was requested
// while the init callback was being delivered
347 if (!notify_init_complete(locker
)) {
351 m_state
= STATE_REPLAYING
;
353 // listen for promotion and resync requests against local journal
354 m_local_journal_listener
= new LocalJournalListener(this);
355 m_local_journal
->add_listener(m_local_journal_listener
);
357 // verify that the local image wasn't force-promoted and that a resync hasn't
358 // been requested now that we are listening for events
359 if (m_local_journal
->is_tag_owner()) {
360 dout(10) << "local image force-promoted" << dendl
;
361 handle_replay_complete(locker
, 0, "force promoted");
365 bool resync_requested
= false;
366 r
= m_local_journal
->is_resync_requested(&resync_requested
);
368 dout(10) << "failed to determine resync state: " << cpp_strerror(r
)
370 handle_replay_complete(locker
, r
, "error parsing resync state");
372 } else if (resync_requested
) {
373 dout(10) << "local image resync requested" << dendl
;
374 handle_replay_complete(locker
, 0, "resync requested");
378 // start remote journal replay
379 m_event_preprocessor
= EventPreprocessor
<I
>::create(
380 *m_state_builder
->local_image_ctx
, *m_state_builder
->remote_journaler
,
381 m_local_mirror_uuid
, &m_state_builder
->remote_client_meta
,
382 m_threads
->work_queue
);
383 m_replay_status_formatter
= ReplayStatusFormatter
<I
>::create(
384 m_state_builder
->remote_journaler
, m_local_mirror_uuid
);
// polling interval comes from the "rbd_mirror_journal_poll_age" option
386 auto cct
= static_cast<CephContext
*>(m_state_builder
->local_image_ctx
->cct
);
387 double poll_seconds
= cct
->_conf
.get_val
<double>(
388 "rbd_mirror_journal_poll_age");
389 m_remote_replay_handler
= new RemoteReplayHandler(this);
390 m_state_builder
->remote_journaler
->start_live_replay(m_remote_replay_handler
,
393 notify_status_updated();
// Delivers the pending init callback with result 0. Must be called with
// m_lock held and while still in STATE_INIT. Afterwards it checks whether a
// shut-down request arrived in the meantime (m_on_init_shutdown set again).
// NOTE(review): the unlock/relock around complete(), the handling of the
// shut-down race and the return statements are not visible in this extract;
// the caller treats a false return as "do not continue".
396 template <typename I
>
397 bool Replayer
<I
>::notify_init_complete(std::unique_lock
<ceph::mutex
>& locker
) {
400 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
401 ceph_assert(m_state
== STATE_INIT
);
403 // notify that init has completed
404 Context
*on_finish
= nullptr;
// take ownership of the callback and clear the member in one step
405 std::swap(m_on_init_shutdown
, on_finish
);
408 on_finish
->complete(0);
411 if (m_on_init_shutdown
!= nullptr) {
412 // shut down requested after we notified init complete but before we
// Shut-down step: stops the local journal replay. Requires m_lock. With no
// replay handle it proceeds directly to wait_for_event_replay(); otherwise it
// shuts the replay down (first argument true) and continues in
// handle_shut_down_local_journal_replay().
// NOTE(review): the return after the early wait_for_event_replay() call is
// not visible in this extract.
421 template <typename I
>
422 void Replayer
<I
>::shut_down_local_journal_replay() {
423 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
425 if (m_local_journal_replay
== nullptr) {
426 wait_for_event_replay();
431 auto ctx
= create_context_callback
<
432 Replayer
<I
>, &Replayer
<I
>::handle_shut_down_local_journal_replay
>(this);
433 m_local_journal_replay
->shut_down(true, ctx
);
// Completion of the local journal replay shut-down. A failure is logged and
// recorded through handle_replay_error(); the sequence then continues with
// wait_for_event_replay().
// NOTE(review): the `r < 0` guard around the error handling is not visible
// in this extract.
436 template <typename I
>
437 void Replayer
<I
>::handle_shut_down_local_journal_replay(int r
) {
438 dout(10) << "r=" << r
<< dendl
;
440 std::unique_lock locker
{m_lock
};
442 derr
<< "error shutting down journal replay: " << cpp_strerror(r
) << dendl
;
443 handle_replay_error(r
, "failed to shut down local journal replay");
446 wait_for_event_replay();
// Shut-down step: waits until m_event_replay_tracker has no outstanding ops.
// Requires m_lock. The completion is bounced through the work queue and
// delivered to handle_wait_for_event_replay().
449 template <typename I
>
450 void Replayer
<I
>::wait_for_event_replay() {
451 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
454 auto ctx
= create_async_context_callback(
455 m_threads
->work_queue
, create_context_callback
<
456 Replayer
<I
>, &Replayer
<I
>::handle_wait_for_event_replay
>(this));
457 m_event_replay_tracker
.wait_for_ops(ctx
);
// Completion of the wait on m_event_replay_tracker; logs the result and
// takes m_lock.
// NOTE(review): the statement that follows the lock acquisition is not
// visible in this extract -- presumably it continues the shut-down sequence
// with close_local_image(); confirm against the full file.
460 template <typename I
>
461 void Replayer
<I
>::handle_wait_for_event_replay(int r
) {
462 dout(10) << "r=" << r
<< dendl
;
464 std::unique_lock locker
{m_lock
};
// Shut-down step: releases everything tied to the local image and closes it.
// Requires m_lock. Removes and deletes the local journal listener, stops the
// external replay, destroys the event preprocessor, drops the local journal
// reference and issues a CloseImageRequest that completes in
// handle_close_local_image(). With no local image context it skips straight
// to stop_remote_journaler_replay().
// NOTE(review): the early return of the first branch and the send of the
// created request are not visible in this extract.
468 template <typename I
>
469 void Replayer
<I
>::close_local_image() {
470 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
471 if (m_state_builder
->local_image_ctx
== nullptr) {
472 stop_remote_journaler_replay();
477 if (m_local_journal_listener
!= nullptr) {
478 // blocks if listener notification is in-progress
479 m_local_journal
->remove_listener(m_local_journal_listener
);
480 delete m_local_journal_listener
;
481 m_local_journal_listener
= nullptr;
// the replay handle is only cleared here; stopping is done by the journal
484 if (m_local_journal_replay
!= nullptr) {
485 m_local_journal
->stop_external_replay();
486 m_local_journal_replay
= nullptr;
489 if (m_event_preprocessor
!= nullptr) {
490 image_replayer::journal::EventPreprocessor
<I
>::destroy(
491 m_event_preprocessor
);
492 m_event_preprocessor
= nullptr;
495 m_local_journal
.reset();
497 // NOTE: it's important to ensure that the local image is fully
498 // closed before attempting to close the remote journal in
499 // case the remote cluster is unreachable
500 ceph_assert(m_state_builder
->local_image_ctx
!= nullptr);
501 auto ctx
= create_context_callback
<
502 Replayer
<I
>, &Replayer
<I
>::handle_close_local_image
>(this);
// the request receives the address of the image context pointer
503 auto request
= image_replayer::CloseImageRequest
<I
>::create(
504 &m_state_builder
->local_image_ctx
, ctx
);
// Completion of the local image close. A failure is logged and recorded via
// handle_replay_error(); the local image context is then expected to be
// nullptr and the sequence continues with stop_remote_journaler_replay().
// NOTE(review): the `r < 0` guard is not visible in this extract. The log
// message below misspells "image" as "iamge" -- worth fixing in a code change.
509 template <typename I
>
510 void Replayer
<I
>::handle_close_local_image(int r
) {
511 dout(10) << "r=" << r
<< dendl
;
513 std::unique_lock locker
{m_lock
};
515 derr
<< "error closing local iamge: " << cpp_strerror(r
) << dendl
;
516 handle_replay_error(r
, "failed to close local image");
519 ceph_assert(m_state_builder
->local_image_ctx
== nullptr);
520 stop_remote_journaler_replay();
// Shut-down step: stops replay on the remote journaler. Requires m_lock.
// When there is no remote journaler, or no remote replay handler was ever
// installed, it proceeds directly to wait_for_in_flight_ops(); otherwise it
// asks the journaler to stop replay and continues (via the work queue) in
// handle_stop_remote_journaler_replay().
// NOTE(review): the returns inside the two early branches are not visible
// in this extract.
523 template <typename I
>
524 void Replayer
<I
>::stop_remote_journaler_replay() {
525 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
527 if (m_state_builder
->remote_journaler
== nullptr) {
528 wait_for_in_flight_ops();
530 } else if (m_remote_replay_handler
== nullptr) {
531 wait_for_in_flight_ops();
536 auto ctx
= create_async_context_callback(
537 m_threads
->work_queue
, create_context_callback
<
538 Replayer
<I
>, &Replayer
<I
>::handle_stop_remote_journaler_replay
>(this));
539 m_state_builder
->remote_journaler
->stop_replay(ctx
);
// Completion of the remote journaler stop_replay() request. A failure is
// logged and recorded via handle_replay_error(); the remote replay handler
// is then deleted and the sequence continues with wait_for_in_flight_ops().
// NOTE(review): the `r < 0` guard is not visible in this extract.
542 template <typename I
>
543 void Replayer
<I
>::handle_stop_remote_journaler_replay(int r
) {
544 dout(10) << "r=" << r
<< dendl
;
546 std::unique_lock locker
{m_lock
};
548 derr
<< "failed to stop remote journaler replay : " << cpp_strerror(r
)
550 handle_replay_error(r
, "failed to stop remote journaler replay");
553 delete m_remote_replay_handler
;
554 m_remote_replay_handler
= nullptr;
556 wait_for_in_flight_ops();
// Shut-down step: detaches and deletes the remote metadata listener (if
// any), then waits for m_in_flight_op_tracker to drain. The completion is
// bounced through the work queue to handle_wait_for_in_flight_ops().
559 template <typename I
>
560 void Replayer
<I
>::wait_for_in_flight_ops() {
// removing the listener first stops new metadata-update callbacks from
// being queued while waiting
562 if (m_remote_listener
!= nullptr) {
563 m_state_builder
->remote_journaler
->remove_listener(m_remote_listener
);
564 delete m_remote_listener
;
565 m_remote_listener
= nullptr;
568 auto ctx
= create_async_context_callback(
569 m_threads
->work_queue
, create_context_callback
<
570 Replayer
<I
>, &Replayer
<I
>::handle_wait_for_in_flight_ops
>(this));
571 m_in_flight_op_tracker
.wait_for_ops(ctx
);
// Final shut-down step: destroys the replay status formatter, takes the
// pending init/shut-down callback under m_lock, sets STATE_COMPLETE and
// completes the callback with m_error_code.
// NOTE(review): the scope in which m_lock is held is not visible in this
// extract -- presumably the lock is released before complete() is called.
574 template <typename I
>
575 void Replayer
<I
>::handle_wait_for_in_flight_ops(int r
) {
576 dout(10) << "r=" << r
<< dendl
;
578 ReplayStatusFormatter
<I
>::destroy(m_replay_status_formatter
);
579 m_replay_status_formatter
= nullptr;
581 Context
* on_init_shutdown
= nullptr;
583 std::unique_lock locker
{m_lock
};
// a callback must be pending; move it into the local and clear the member
584 ceph_assert(m_on_init_shutdown
!= nullptr);
585 std::swap(m_on_init_shutdown
, on_init_shutdown
);
586 m_state
= STATE_COMPLETE
;
588 on_init_shutdown
->complete(m_error_code
);
// Reacts to a remote journal metadata update while replaying: re-reads this
// peer's cached client registration and re-validates its state; when the
// validation fails the replay is completed with the returned error.
// NOTE(review): the early return for the non-replaying state, the `r < 0`
// guards and the declaration of `error` are not visible in this extract.
591 template <typename I
>
592 void Replayer
<I
>::handle_remote_journal_metadata_updated() {
595 std::unique_lock locker
{m_lock
};
596 if (m_state
!= STATE_REPLAYING
) {
600 cls::journal::Client remote_client
;
601 int r
= m_state_builder
->remote_journaler
->get_cached_client(
602 m_local_mirror_uuid
, &remote_client
);
604 derr
<< "failed to retrieve client: " << cpp_strerror(r
) << dendl
;
// decoded into a local copy; only m_resync_requested is updated on the
// replayer itself
608 librbd::journal::MirrorPeerClientMeta remote_client_meta
;
610 r
= validate_remote_client_state(remote_client
, &remote_client_meta
,
611 &m_resync_requested
, &error
);
613 dout(0) << "client flagged disconnected, stopping image replay" << dendl
;
614 handle_replay_complete(locker
, r
, error
);
// Schedules a flush of the local replay on the timer, 30 units
// (add_event_after) from now. Requires m_lock; also takes the timer lock.
// The condition below covers "not replaying" and "a flush task is already
// pending".
// NOTE(review): the body of that guard is not visible in this extract --
// presumably an early return.
618 template <typename I
>
619 void Replayer
<I
>::schedule_flush_local_replay_task() {
620 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
622 std::unique_lock timer_locker
{m_threads
->timer_lock
};
623 if (m_state
!= STATE_REPLAYING
|| m_flush_local_replay_task
!= nullptr) {
// the timer callback is re-dispatched onto the work queue before it
// reaches handle_flush_local_replay_task()
628 m_flush_local_replay_task
= create_async_context_callback(
629 m_threads
->work_queue
, create_context_callback
<
630 Replayer
<I
>, &Replayer
<I
>::handle_flush_local_replay_task
>(this));
631 m_threads
->timer
->add_event_after(30, m_flush_local_replay_task
);
// Cancels a pending flush timer task, if one exists, and clears the member.
// Requires m_lock; also takes the timer lock.
634 template <typename I
>
635 void Replayer
<I
>::cancel_flush_local_replay_task() {
636 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
638 std::unique_lock timer_locker
{m_threads
->timer_lock
};
639 if (m_flush_local_replay_task
!= nullptr) {
641 m_threads
->timer
->cancel_event(m_flush_local_replay_task
);
642 m_flush_local_replay_task
= nullptr;
// Timer callback for the periodic flush. Registers an in-flight op, then
// flushes the local replay; when the flush completes the lambda clears
// m_flush_local_replay_task (under m_lock and the timer lock), sends a
// status notification and releases the in-flight op.
646 template <typename I
>
647 void Replayer
<I
>::handle_flush_local_replay_task(int) {
// tracked manually here (start_op/finish_op) rather than via C_TrackedOp
650 m_in_flight_op_tracker
.start_op();
651 auto on_finish
= new LambdaContext([this](int) {
652 std::unique_lock locker
{m_lock
};
655 std::unique_lock timer_locker
{m_threads
->timer_lock
};
656 m_flush_local_replay_task
= nullptr;
659 notify_status_updated();
660 m_in_flight_op_tracker
.finish_op();
662 flush_local_replay(on_finish
);
// Flushes the local journal replay and then the commit position. When not
// replaying, on_flush is completed with 0; when there is no replay handle it
// goes straight to flush_commit_position(); otherwise it flushes the local
// replay and continues in handle_flush_local_replay().
// NOTE(review): the returns inside the two early branches are not visible
// in this extract.
665 template <typename I
>
666 void Replayer
<I
>::flush_local_replay(Context
* on_flush
) {
667 std::unique_lock locker
{m_lock
};
668 if (m_state
!= STATE_REPLAYING
) {
670 on_flush
->complete(0);
672 } else if (m_local_journal_replay
== nullptr) {
673 // raced w/ a tag creation stop/start, which implies that
674 // the replay is flushed
676 flush_commit_position(on_flush
);
681 auto ctx
= new LambdaContext(
682 [this, on_flush
](int r
) {
683 handle_flush_local_replay(on_flush
, r
);
685 m_local_journal_replay
->flush(ctx
);
// Completion of the local replay flush. On failure the error is logged and
// passed to on_flush; otherwise the commit position is flushed next.
// NOTE(review): the `r < 0` guard and its return are not visible in this
// extract.
688 template <typename I
>
689 void Replayer
<I
>::handle_flush_local_replay(Context
* on_flush
, int r
) {
690 dout(15) << "r=" << r
<< dendl
;
692 derr
<< "error flushing local replay: " << cpp_strerror(r
) << dendl
;
693 on_flush
->complete(r
);
697 flush_commit_position(on_flush
);
// Flushes the remote journaler's commit position. When not replaying,
// on_flush is completed with 0; otherwise the flush result is delivered to
// handle_flush_commit_position().
// NOTE(review): the return inside the early branch is not visible in this
// extract.
700 template <typename I
>
701 void Replayer
<I
>::flush_commit_position(Context
* on_flush
) {
702 std::unique_lock locker
{m_lock
};
703 if (m_state
!= STATE_REPLAYING
) {
705 on_flush
->complete(0);
710 auto ctx
= new LambdaContext(
711 [this, on_flush
](int r
) {
712 handle_flush_commit_position(on_flush
, r
);
714 m_state_builder
->remote_journaler
->flush_commit_position(ctx
);
// Completion of the commit position flush: logs a failure and completes
// on_flush with the result code.
// NOTE(review): the `r < 0` guard around the error log is not visible in
// this extract.
717 template <typename I
>
718 void Replayer
<I
>::handle_flush_commit_position(Context
* on_flush
, int r
) {
719 dout(15) << "r=" << r
<< dendl
;
721 derr
<< "error flushing remote journal commit position: "
722 << cpp_strerror(r
) << dendl
;
725 on_flush
->complete(r
);
// Records an error description, but only when no error code has been
// recorded yet (m_error_code == 0), so the first failure wins. Requires
// m_lock.
// NOTE(review): the assignment of m_error_code is not visible in this
// extract -- presumably it is set to r alongside the description.
728 template <typename I
>
729 void Replayer
<I
>::handle_replay_error(int r
, const std::string
&error
) {
730 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
732 if (m_error_code
== 0) {
734 m_error_description
= error
;
// Convenience overload: takes m_lock and delegates to the overload that
// expects the lock to be held.
738 template <typename I
>
739 bool Replayer
<I
>::is_replay_complete() const {
740 std::unique_lock locker
{m_lock
};
741 return is_replay_complete(locker
);
// Returns true when the state machine is in STATE_COMPLETE. The lock
// argument is unnamed and only documents that the caller holds m_lock, which
// is also asserted.
744 template <typename I
>
745 bool Replayer
<I
>::is_replay_complete(
746 const std::unique_lock
<ceph::mutex
>&) const {
747 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
748 return (m_state
== STATE_COMPLETE
);
// Convenience overload: takes m_lock and delegates to the overload that
// expects the lock to be held.
751 template <typename I
>
752 void Replayer
<I
>::handle_replay_complete(int r
, const std::string
&error
) {
753 std::unique_lock locker
{m_lock
};
754 handle_replay_complete(locker
, r
, error
);
// Marks the replay as complete. Requires m_lock. Logs the result, records an
// error through handle_replay_error(), and -- when currently replaying --
// switches to STATE_COMPLETE and sends a status notification.
// NOTE(review): the `r < 0` guard around the error handling and the body of
// the non-replaying branch (presumably an early return) are not visible in
// this extract.
757 template <typename I
>
758 void Replayer
<I
>::handle_replay_complete(
759 const std::unique_lock
<ceph::mutex
>&, int r
, const std::string
&error
) {
760 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
762 dout(10) << "r=" << r
<< ", error=" << error
<< dendl
;
764 derr
<< "replay encountered an error: " << cpp_strerror(r
) << dendl
;
765 handle_replay_error(r
, error
);
// only a replaying instance transitions to complete and notifies
768 if (m_state
!= STATE_REPLAYING
) {
772 m_state
= STATE_COMPLETE
;
773 notify_status_updated();
// Convenience overload: takes m_lock and delegates to the overload that
// expects the lock to be held.
776 template <typename I
>
777 void Replayer
<I
>::handle_replay_ready() {
778 std::unique_lock locker
{m_lock
};
779 handle_replay_ready(locker
);
// Pops the next entry from the remote journaler (into m_replay_entry and
// m_replay_tag_tid) and starts processing it. Requires m_lock. The entry is
// registered with m_event_replay_tracker; if no valid tag is cached or the
// tag tid changed, a new local journal tag has to be allocated first.
// NOTE(review): the early returns, the lock release and the calls that
// dispatch to the next step are not visible in this extract. The log line
// below lacks a separator before "tag_tid=", so the two values run together
// in the output.
782 template <typename I
>
783 void Replayer
<I
>::handle_replay_ready(
784 std::unique_lock
<ceph::mutex
>& locker
) {
785 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
788 if (is_replay_complete(locker
)) {
792 if (!m_state_builder
->remote_journaler
->try_pop_front(&m_replay_entry
,
793 &m_replay_tag_tid
)) {
794 dout(20) << "no entries ready for replay" << dendl
;
798 // can safely drop lock once the entry is tracked
799 m_event_replay_tracker
.start_op();
802 dout(20) << "entry tid=" << m_replay_entry
.get_commit_tid()
803 << "tag_tid=" << m_replay_tag_tid
<< dendl
;
804 if (!m_replay_tag_valid
|| m_replay_tag
.tid
!= m_replay_tag_tid
) {
805 // must allocate a new local journal tag prior to processing
// Shuts down the current local journal replay (first argument false) so a
// fresh one can be created for the new tag; continues in
// handle_replay_flush_shut_down(). A replay handle must exist.
813 template <typename I
>
814 void Replayer
<I
>::replay_flush() {
817 // shut down the replay to flush all IO and ops and create a new
818 // replayer to handle the new tag epoch
819 auto ctx
= create_context_callback
<
820 Replayer
<I
>, &Replayer
<I
>::handle_replay_flush_shut_down
>(this);
821 ceph_assert(m_local_journal_replay
!= nullptr);
822 m_local_journal_replay
->shut_down(false, ctx
);
// Completion of the replay shut-down issued by replay_flush(). Stops the
// external replay on the local journal and clears the handle (under m_lock),
// then either forwards a result to handle_replay_flush() or restarts
// external replay, which also completes in handle_replay_flush().
// NOTE(review): the lock scope and the condition selecting between the
// direct handle_replay_flush(r) call and the restart are not visible in this
// extract -- presumably the direct call is the `r < 0` path.
825 template <typename I
>
826 void Replayer
<I
>::handle_replay_flush_shut_down(int r
) {
828 std::unique_lock locker
{m_lock
};
829 ceph_assert(m_local_journal
!= nullptr);
830 m_local_journal
->stop_external_replay();
831 m_local_journal_replay
= nullptr;
834 dout(10) << "r=" << r
<< dendl
;
836 handle_replay_flush(r
);
840 auto ctx
= create_context_callback
<
841 Replayer
<I
>, &Replayer
<I
>::handle_replay_flush
>(this);
842 m_local_journal
->start_external_replay(&m_local_journal_replay
, ctx
);
// Completion of the replay flush/restart. On failure the replay is completed
// with the error and the tracked event op is released; the op is also
// released when the replay has meanwhile been marked complete.
// NOTE(review): the `r < 0` guard, the returns and the success-path
// continuation are not visible in this extract.
845 template <typename I
>
846 void Replayer
<I
>::handle_replay_flush(int r
) {
847 dout(10) << "r=" << r
<< dendl
;
849 derr
<< "replay flush encountered an error: " << cpp_strerror(r
) << dendl
;
850 handle_replay_complete(r
, "replay flush encountered an error");
851 m_event_replay_tracker
.finish_op();
853 } else if (is_replay_complete()) {
854 m_event_replay_tracker
.finish_op();
// Fetches the remote journal tag identified by m_replay_tag_tid into
// m_replay_tag; the result is delivered to handle_get_remote_tag().
// NOTE(review): the last argument of the get_tag() call is not visible in
// this extract -- presumably the ctx created above.
861 template <typename I
>
862 void Replayer
<I
>::get_remote_tag() {
863 dout(15) << "tag_tid: " << m_replay_tag_tid
<< dendl
;
865 Context
*ctx
= create_context_callback
<
866 Replayer
, &Replayer
<I
>::handle_get_remote_tag
>(this);
867 m_state_builder
->remote_journaler
->get_tag(m_replay_tag_tid
, &m_replay_tag
,
// Completion of the remote tag fetch. Decodes the tag payload into
// m_replay_tag_data; on failure the replay is completed with the error and
// the tracked event op is released, otherwise the tag is marked valid and a
// matching local tag is allocated.
// NOTE(review): the `try` opener, the handling inside the catch block and
// the `r < 0` guard are not visible in this extract.
871 template <typename I
>
872 void Replayer
<I
>::handle_get_remote_tag(int r
) {
873 dout(15) << "r=" << r
<< dendl
;
877 auto it
= m_replay_tag
.data
.cbegin();
878 decode(m_replay_tag_data
, it
);
879 } catch (const buffer::error
&err
) {
885 derr
<< "failed to retrieve remote tag " << m_replay_tag_tid
<< ": "
886 << cpp_strerror(r
) << dendl
;
887 handle_replay_complete(r
, "failed to retrieve remote tag");
888 m_event_replay_tracker
.finish_op();
892 m_replay_tag_valid
= true;
893 dout(15) << "decoded remote tag " << m_replay_tag_tid
<< ": "
894 << m_replay_tag_data
<< dendl
;
896 allocate_local_tag();
// Allocates a local journal tag mirroring the remote tag. The mirror uuids
// of the tag and of its predecessor are translated into the local point of
// view: the remote side's LOCAL_MIRROR_UUID becomes the remote mirror uuid,
// and this cluster's mirror uuid becomes LOCAL_MIRROR_UUID. An orphan tag
// (ORPHAN_MIRROR_UUID) denotes a demotion and gets dedicated handling. The
// allocation result is delivered to handle_allocate_local_tag().
// NOTE(review): the returns and closing braces of the orphan branch are not
// visible in this extract, so it is unclear from here exactly which
// statements are reached in the "stale demotion" vs. "stopping" cases.
899 template <typename I
>
900 void Replayer
<I
>::allocate_local_tag() {
903 std::string mirror_uuid
= m_replay_tag_data
.mirror_uuid
;
904 if (mirror_uuid
== librbd::Journal
<>::LOCAL_MIRROR_UUID
) {
905 mirror_uuid
= m_state_builder
->remote_mirror_uuid
;
906 } else if (mirror_uuid
== m_local_mirror_uuid
) {
907 mirror_uuid
= librbd::Journal
<>::LOCAL_MIRROR_UUID
;
908 } else if (mirror_uuid
== librbd::Journal
<>::ORPHAN_MIRROR_UUID
) {
909 // handle possible edge condition where daemon can failover and
910 // the local image has already been promoted/demoted
911 auto local_tag_data
= m_local_journal
->get_tag_data();
// local tag is itself an orphan whose (commit-valid) predecessor was
// owned locally
912 if (local_tag_data
.mirror_uuid
== librbd::Journal
<>::ORPHAN_MIRROR_UUID
&&
913 (local_tag_data
.predecessor
.commit_valid
&&
914 local_tag_data
.predecessor
.mirror_uuid
==
915 librbd::Journal
<>::LOCAL_MIRROR_UUID
)) {
916 dout(15) << "skipping stale demotion event" << dendl
;
// treat the entry as committed (r=0) and move on to the next one
917 handle_process_entry_safe(m_replay_entry
, m_replay_bytes
,
918 m_replay_start_time
, 0);
919 handle_replay_ready();
922 dout(5) << "encountered image demotion: stopping" << dendl
;
923 handle_replay_complete(0, "");
// apply the same uuid translation to a copy of the predecessor
927 librbd::journal::TagPredecessor
predecessor(m_replay_tag_data
.predecessor
);
928 if (predecessor
.mirror_uuid
== librbd::Journal
<>::LOCAL_MIRROR_UUID
) {
929 predecessor
.mirror_uuid
= m_state_builder
->remote_mirror_uuid
;
930 } else if (predecessor
.mirror_uuid
== m_local_mirror_uuid
) {
931 predecessor
.mirror_uuid
= librbd::Journal
<>::LOCAL_MIRROR_UUID
;
934 dout(15) << "mirror_uuid=" << mirror_uuid
<< ", "
935 << "predecessor=" << predecessor
<< ", "
936 << "replay_tag_tid=" << m_replay_tag_tid
<< dendl
;
937 Context
*ctx
= create_context_callback
<
938 Replayer
, &Replayer
<I
>::handle_allocate_local_tag
>(this);
939 m_local_journal
->allocate_tag(mirror_uuid
, predecessor
, ctx
);
// Completion of the local tag allocation. On failure the replay is completed
// with the error and the tracked event op is released.
// NOTE(review): the `r < 0` guard, its return and the success-path
// continuation are not visible in this extract.
942 template <typename I
>
943 void Replayer
<I
>::handle_allocate_local_tag(int r
) {
944 dout(15) << "r=" << r
<< ", "
945 << "tag_tid=" << m_local_journal
->get_tag_tid() << dendl
;
947 derr
<< "failed to allocate journal tag: " << cpp_strerror(r
) << dendl
;
948 handle_replay_complete(r
, "failed to allocate journal tag");
949 m_event_replay_tracker
.finish_op();
// Decodes the popped replay entry into m_event_entry, records its size in
// m_replay_bytes and computes the configured replay delay. The entry is
// either handed to handle_preprocess_entry_ready(0) directly or, when a
// delay applies, a timer task is scheduled for `delay` seconds -- unless the
// replay has completed in the meantime, in which case the tracked op is
// released.
// NOTE(review): the `r < 0` guard, the `delay == 0` test and the returns
// are not visible in this extract.
956 template <typename I
>
957 void Replayer
<I
>::preprocess_entry() {
958 dout(20) << "preprocessing entry tid=" << m_replay_entry
.get_commit_tid()
961 bufferlist data
= m_replay_entry
.get_data();
962 auto it
= data
.cbegin();
963 int r
= m_local_journal_replay
->decode(&it
, &m_event_entry
);
965 derr
<< "failed to decode journal event" << dendl
;
966 handle_replay_complete(r
, "failed to decode journal event");
967 m_event_replay_tracker
.finish_op();
971 m_replay_bytes
= data
.length();
// delay is derived from the event timestamp and the image's
// mirroring_replay_delay setting
972 uint32_t delay
= calculate_replay_delay(
973 m_event_entry
.timestamp
,
974 m_state_builder
->local_image_ctx
->mirroring_replay_delay
);
976 handle_preprocess_entry_ready(0);
980 std::unique_lock locker
{m_lock
};
981 if (is_replay_complete(locker
)) {
982 // don't schedule a delayed replay task if a shut-down is in-progress
983 m_event_replay_tracker
.finish_op();
987 dout(20) << "delaying replay by " << delay
<< " sec" << dendl
;
988 std::unique_lock timer_locker
{m_threads
->timer_lock
};
// at most one delayed task may be outstanding
989 ceph_assert(m_delayed_preprocess_task
== nullptr);
990 m_delayed_preprocess_task
= create_context_callback
<
991 Replayer
<I
>, &Replayer
<I
>::handle_delayed_preprocess_task
>(this);
992 m_threads
->timer
->add_event_after(delay
, m_delayed_preprocess_task
);
// Timer callback for a delayed entry. Runs with the timer lock held
// (asserted), clears the pending task pointer and re-dispatches the work to
// handle_preprocess_entry_ready() on the work queue with result 0.
995 template <typename I
>
996 void Replayer
<I
>::handle_delayed_preprocess_task(int r
) {
997 dout(20) << "r=" << r
<< dendl
;
999 ceph_assert(ceph_mutex_is_locked_by_me(m_threads
->timer_lock
));
1000 m_delayed_preprocess_task
= nullptr;
1002 m_threads
->work_queue
->queue(create_context_callback
<
1003 Replayer
, &Replayer
<I
>::handle_preprocess_entry_ready
>(this), 0);
// Entry is ready to be replayed (r must be 0). Records the replay start time
// and, when the event preprocessor reports that preprocessing is required,
// runs it asynchronously with the result delivered to
// handle_preprocess_entry_safe().
// NOTE(review): the body of the "not required" branch is not visible in
// this extract -- presumably it proceeds directly to processing the entry.
1006 template <typename I
>
1007 void Replayer
<I
>::handle_preprocess_entry_ready(int r
) {
1008 dout(20) << "r=" << r
<< dendl
;
1009 ceph_assert(r
== 0);
1011 m_replay_start_time
= ceph_clock_now();
1012 if (!m_event_preprocessor
->is_required(m_event_entry
)) {
1017 Context
*ctx
= create_context_callback
<
1018 Replayer
, &Replayer
<I
>::handle_preprocess_entry_safe
>(this);
1019 m_event_preprocessor
->preprocess(&m_event_entry
, ctx
);
// Completion of event preprocessing. -ECANCELED is reported as a lost
// exclusive lock with result 0; other failures complete the replay with the
// error. On the failure path the tracked event op is released.
// NOTE(review): the outer `r < 0` guard, the else keyword between the two
// reports, the return and the success-path continuation are not visible in
// this extract.
1022 template <typename I
>
1023 void Replayer
<I
>::handle_preprocess_entry_safe(int r
) {
1024 dout(20) << "r=" << r
<< dendl
;
1027 if (r
== -ECANCELED
) {
1028 handle_replay_complete(0, "lost exclusive lock");
1030 derr
<< "failed to preprocess journal event" << dendl
;
1031 handle_replay_complete(r
, "failed to preprocess journal event");
1034 m_event_replay_tracker
.finish_op();
// Submits the decoded event to the local journal replay. Two callbacks are
// supplied: on_ready -> handle_process_entry_ready(), and on_commit -> a
// C_ReplayCommitted that takes ownership of m_replay_entry (moved) together
// with the replay start time.
// NOTE(review): the middle constructor argument of C_ReplayCommitted is not
// visible in this extract -- by the constructor signature it is the replay
// byte count.
1041 template <typename I
>
1042 void Replayer
<I
>::process_entry() {
1043 dout(20) << "processing entry tid=" << m_replay_entry
.get_commit_tid()
1046 Context
*on_ready
= create_context_callback
<
1047 Replayer
, &Replayer
<I
>::handle_process_entry_ready
>(this);
1048 Context
*on_commit
= new C_ReplayCommitted(this, std::move(m_replay_entry
),
1050 m_replay_start_time
);
1052 m_local_journal_replay
->process(m_event_entry
, on_ready
, on_commit
);
// Called when the local replay is ready for the next event (r must be 0).
// Recomputes the image spec from the local image context; if it differs from
// the cached m_image_spec, the perf counters are re-registered (their name
// embeds the spec) and a status notification is sent. Finally attempts to
// replay the next entry while still holding m_lock.
// NOTE(review): the scope of image_locker is not visible in this extract.
1055 template <typename I
>
1056 void Replayer
<I
>::handle_process_entry_ready(int r
) {
1057 std::unique_lock locker
{m_lock
};
1060 ceph_assert(r
== 0);
1062 bool update_status
= false;
1064 auto local_image_ctx
= m_state_builder
->local_image_ctx
;
1065 std::shared_lock image_locker
{local_image_ctx
->image_lock
};
1066 auto image_spec
= util::compute_image_spec(local_image_ctx
->md_ctx
,
1067 local_image_ctx
->name
);
1068 if (m_image_spec
!= image_spec
) {
1069 m_image_spec
= image_spec
;
1070 update_status
= true;
1074 if (update_status
) {
1075 unregister_perf_counters();
1076 register_perf_counters();
1077 notify_status_updated();
1080 // attempt to process the next event
1081 handle_replay_ready(locker
);
// Called once an event has been committed locally. On failure the replay is
// completed with the error; otherwise the entry is marked committed on the
// remote journaler. It then computes the replay latency, updates the global
// perf counters (when present) and queues a work-queue callback that
// schedules the periodic flush, updates the per-image perf counters and
// releases the tracked event op.
// NOTE(review): the `r < 0` guard and the `else` separating the failure path
// from committed() are not visible in this extract.
1084 template <typename I
>
1085 void Replayer
<I
>::handle_process_entry_safe(
1086 const ReplayEntry
&replay_entry
, uint64_t replay_bytes
,
1087 const utime_t
&replay_start_time
, int r
) {
1088 dout(20) << "commit_tid=" << replay_entry
.get_commit_tid() << ", r=" << r
1092 derr
<< "failed to commit journal event: " << cpp_strerror(r
) << dendl
;
1093 handle_replay_complete(r
, "failed to commit journal event");
1095 ceph_assert(m_state_builder
->remote_journaler
!= nullptr);
1096 m_state_builder
->remote_journaler
->committed(replay_entry
);
// latency is measured from the recorded replay start time until now
1099 auto latency
= ceph_clock_now() - replay_start_time
;
1100 if (g_perf_counters
) {
1101 g_perf_counters
->inc(l_rbd_mirror_replay
);
1102 g_perf_counters
->inc(l_rbd_mirror_replay_bytes
, replay_bytes
);
1103 g_perf_counters
->tinc(l_rbd_mirror_replay_latency
, latency
);
// replay_bytes and latency are captured by value for the deferred update
1106 auto ctx
= new LambdaContext(
1107 [this, replay_bytes
, latency
](int r
) {
1108 std::unique_lock locker
{m_lock
};
1109 schedule_flush_local_replay_task();
1111 if (m_perf_counters
) {
1112 m_perf_counters
->inc(l_rbd_mirror_replay
);
1113 m_perf_counters
->inc(l_rbd_mirror_replay_bytes
, replay_bytes
);
1114 m_perf_counters
->tinc(l_rbd_mirror_replay_latency
, latency
);
1117 m_event_replay_tracker
.finish_op();
1119 m_threads
->work_queue
->queue(ctx
, 0);
// Handles a resync request from the local journal: sets m_resync_requested
// under m_lock and completes the replay with result 0.
1122 template <typename I
>
1123 void Replayer
<I
>::handle_resync_image() {
1126 std::unique_lock locker
{m_lock
};
1127 m_resync_requested
= true;
1128 handle_replay_complete(locker
, 0, "resync requested");
// Queues a tracked work-queue callback that invokes
// m_replayer_listener->handle_notification(). Requires m_lock; the listener
// itself is called later from the work queue rather than inline.
// NOTE(review): the lambda's capture list and signature are not visible in
// this extract.
1131 template <typename I
>
1132 void Replayer
<I
>::notify_status_updated() {
1133 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
1137 auto ctx
= new C_TrackedOp(this, new LambdaContext(
1139 m_replayer_listener
->handle_notification();
1141 m_threads
->work_queue
->queue(ctx
, 0);
// Cancels a pending delayed-preprocess timer task. Requires m_lock; the
// timer lock is taken for the cancellation. The cancellation is asserted to
// succeed, and when a task was cancelled the event op it was holding is
// released so that waiters on m_event_replay_tracker can proceed.
// NOTE(review): the scope of timer_locker is not visible in this extract.
1144 template <typename I
>
1145 void Replayer
<I
>::cancel_delayed_preprocess_task() {
1146 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
1148 bool canceled_delayed_preprocess_task
= false;
1150 std::unique_lock timer_locker
{m_threads
->timer_lock
};
1151 if (m_delayed_preprocess_task
!= nullptr) {
1153 canceled_delayed_preprocess_task
= m_threads
->timer
->cancel_event(
1154 m_delayed_preprocess_task
);
1155 ceph_assert(canceled_delayed_preprocess_task
);
1156 m_delayed_preprocess_task
= nullptr;
1160 if (canceled_delayed_preprocess_task
) {
1161 // wake up sleeping replay
1162 m_event_replay_tracker
.finish_op();
// Validates this peer's client registration on the remote journal. Requires
// m_lock. Decodes the client meta into *remote_client_meta; a decode failure
// sets *error. If the registration refers to the local image id and the
// client state is anything other than CLIENT_STATE_CONNECTED, the client is
// treated as disconnected: with "rbd_mirroring_resync_after_disconnect"
// enabled *resync_requested is set, and *error describes the condition in
// either case.
// NOTE(review): none of the return statements are visible in this extract;
// callers treat a negative return as failure.
1166 template <typename I
>
1167 int Replayer
<I
>::validate_remote_client_state(
1168 const cls::journal::Client
& remote_client
,
1169 librbd::journal::MirrorPeerClientMeta
* remote_client_meta
,
1170 bool* resync_requested
, std::string
* error
) {
1171 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
1173 if (!util::decode_client_meta(remote_client
, remote_client_meta
)) {
1174 // require operator intervention since the data is corrupt
1175 *error
= "error retrieving remote journal client";
1179 auto local_image_ctx
= m_state_builder
->local_image_ctx
;
1180 dout(5) << "image_id=" << local_image_ctx
->id
<< ", "
1181 << "remote_client_meta.image_id="
1182 << remote_client_meta
->image_id
<< ", "
1183 << "remote_client.state=" << remote_client
.state
<< dendl
;
// only a registration for this very image can flag a disconnect
1184 if (remote_client_meta
->image_id
== local_image_ctx
->id
&&
1185 remote_client
.state
!= cls::journal::CLIENT_STATE_CONNECTED
) {
1186 dout(5) << "client flagged disconnected, stopping image replay" << dendl
;
1187 if (local_image_ctx
->config
.template get_val
<bool>(
1188 "rbd_mirroring_resync_after_disconnect")) {
1189 dout(10) << "disconnected: automatic resync" << dendl
;
1190 *resync_requested
= true;
1191 *error
= "disconnected: automatic resync";
1194 dout(10) << "disconnected" << dendl
;
1195 *error
= "disconnected";
// Creates the per-image perf counters and adds them to the global perf
// counter collection. Requires m_lock and that no counters are currently
// registered. The counter set is named "rbd_mirror_image_" + m_image_spec
// and contains a replay counter, a replayed-bytes counter and a replay
// latency average, all using the priority read from
// "rbd_mirror_image_perf_stats_prio".
1203 template <typename I
>
1204 void Replayer
<I
>::register_perf_counters() {
1207 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
1208 ceph_assert(m_perf_counters
== nullptr);
// priority is read from the local image's CephContext configuration
1210 auto cct
= static_cast<CephContext
*>(m_state_builder
->local_image_ctx
->cct
);
1211 auto prio
= cct
->_conf
.get_val
<int64_t>("rbd_mirror_image_perf_stats_prio");
1212 PerfCountersBuilder
plb(g_ceph_context
, "rbd_mirror_image_" + m_image_spec
,
1213 l_rbd_mirror_first
, l_rbd_mirror_last
);
1214 plb
.add_u64_counter(l_rbd_mirror_replay
, "replay", "Replays", "r", prio
);
1215 plb
.add_u64_counter(l_rbd_mirror_replay_bytes
, "replay_bytes",
1216 "Replayed data", "rb", prio
, unit_t(UNIT_BYTES
));
1217 plb
.add_time_avg(l_rbd_mirror_replay_latency
, "replay_latency",
1218 "Replay latency", "rl", prio
);
1219 m_perf_counters
= plb
.create_perf_counters();
1220 g_ceph_context
->get_perfcounters_collection()->add(m_perf_counters
);
// Removes the per-image perf counters from the global collection and deletes
// them. Requires m_lock. Safe to call when no counters are registered.
1223 template <typename I
>
1224 void Replayer
<I
>::unregister_perf_counters() {
1226 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
// clear the member first so it never points at a removed or deleted object
1228 PerfCounters
*perf_counters
= nullptr;
1229 std::swap(perf_counters
, m_perf_counters
);
1231 if (perf_counters
!= nullptr) {
1232 g_ceph_context
->get_perfcounters_collection()->remove(perf_counters
);
1233 delete perf_counters
;
1237 } // namespace journal
1238 } // namespace image_replayer
1239 } // namespace mirror
1242 template class rbd::mirror::image_replayer::journal::Replayer
<librbd::ImageCtx
>;