1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/compat.h"
5 #include "common/Formatter.h"
6 #include "common/debug.h"
7 #include "common/errno.h"
8 #include "include/stringify.h"
9 #include "cls/rbd/cls_rbd_client.h"
10 #include "common/Timer.h"
11 #include "common/WorkQueue.h"
12 #include "global/global_context.h"
13 #include "journal/Journaler.h"
14 #include "journal/ReplayHandler.h"
15 #include "journal/Settings.h"
16 #include "librbd/ExclusiveLock.h"
17 #include "librbd/ImageCtx.h"
18 #include "librbd/ImageState.h"
19 #include "librbd/Journal.h"
20 #include "librbd/Operations.h"
21 #include "librbd/Utils.h"
22 #include "librbd/journal/Replay.h"
23 #include "ImageDeleter.h"
24 #include "ImageReplayer.h"
26 #include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
27 #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
28 #include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
29 #include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
30 #include "tools/rbd_mirror/image_replayer/PrepareRemoteImageRequest.h"
31 #include "tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h"
33 #define dout_context g_ceph_context
34 #define dout_subsys ceph_subsys_rbd_mirror
36 #define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
41 using std::unique_ptr
;
42 using std::shared_ptr
;
48 using librbd::util::create_context_callback
;
49 using librbd::util::create_rados_callback
;
50 using namespace rbd::mirror::image_replayer
;
53 std::ostream
&operator<<(std::ostream
&os
,
54 const typename ImageReplayer
<I
>::State
&state
);
59 struct ReplayHandler
: public ::journal::ReplayHandler
{
60 ImageReplayer
<I
> *replayer
;
61 ReplayHandler(ImageReplayer
<I
> *replayer
) : replayer(replayer
) {}
62 void get() override
{}
63 void put() override
{}
65 void handle_entries_available() override
{
66 replayer
->handle_replay_ready();
68 void handle_complete(int r
) override
{
71 ss
<< "replay completed with error: " << cpp_strerror(r
);
73 replayer
->handle_replay_complete(r
, ss
.str());
77 class ImageReplayerAdminSocketCommand
{
79 virtual ~ImageReplayerAdminSocketCommand() {}
80 virtual bool call(Formatter
*f
, stringstream
*ss
) = 0;
84 class StatusCommand
: public ImageReplayerAdminSocketCommand
{
86 explicit StatusCommand(ImageReplayer
<I
> *replayer
) : replayer(replayer
) {}
88 bool call(Formatter
*f
, stringstream
*ss
) override
{
89 replayer
->print_status(f
, ss
);
94 ImageReplayer
<I
> *replayer
;
98 class StartCommand
: public ImageReplayerAdminSocketCommand
{
100 explicit StartCommand(ImageReplayer
<I
> *replayer
) : replayer(replayer
) {}
102 bool call(Formatter
*f
, stringstream
*ss
) override
{
103 replayer
->start(nullptr, true);
108 ImageReplayer
<I
> *replayer
;
111 template <typename I
>
112 class StopCommand
: public ImageReplayerAdminSocketCommand
{
114 explicit StopCommand(ImageReplayer
<I
> *replayer
) : replayer(replayer
) {}
116 bool call(Formatter
*f
, stringstream
*ss
) override
{
117 replayer
->stop(nullptr, true);
122 ImageReplayer
<I
> *replayer
;
125 template <typename I
>
126 class RestartCommand
: public ImageReplayerAdminSocketCommand
{
128 explicit RestartCommand(ImageReplayer
<I
> *replayer
) : replayer(replayer
) {}
130 bool call(Formatter
*f
, stringstream
*ss
) override
{
136 ImageReplayer
<I
> *replayer
;
139 template <typename I
>
140 class FlushCommand
: public ImageReplayerAdminSocketCommand
{
142 explicit FlushCommand(ImageReplayer
<I
> *replayer
) : replayer(replayer
) {}
144 bool call(Formatter
*f
, stringstream
*ss
) override
{
146 replayer
->flush(&cond
);
149 *ss
<< "flush: " << cpp_strerror(r
);
156 ImageReplayer
<I
> *replayer
;
159 template <typename I
>
160 class ImageReplayerAdminSocketHook
: public AdminSocketHook
{
162 ImageReplayerAdminSocketHook(CephContext
*cct
, const std::string
&name
,
163 ImageReplayer
<I
> *replayer
)
164 : admin_socket(cct
->get_admin_socket()), name(name
), replayer(replayer
),
165 lock("ImageReplayerAdminSocketHook::lock " +
166 replayer
->get_global_image_id()) {
169 int register_commands() {
173 command
= "rbd mirror status " + name
;
174 r
= admin_socket
->register_command(command
, command
, this,
175 "get status for rbd mirror " + name
);
179 commands
[command
] = new StatusCommand
<I
>(replayer
);
181 command
= "rbd mirror start " + name
;
182 r
= admin_socket
->register_command(command
, command
, this,
183 "start rbd mirror " + name
);
187 commands
[command
] = new StartCommand
<I
>(replayer
);
189 command
= "rbd mirror stop " + name
;
190 r
= admin_socket
->register_command(command
, command
, this,
191 "stop rbd mirror " + name
);
195 commands
[command
] = new StopCommand
<I
>(replayer
);
197 command
= "rbd mirror restart " + name
;
198 r
= admin_socket
->register_command(command
, command
, this,
199 "restart rbd mirror " + name
);
203 commands
[command
] = new RestartCommand
<I
>(replayer
);
205 command
= "rbd mirror flush " + name
;
206 r
= admin_socket
->register_command(command
, command
, this,
207 "flush rbd mirror " + name
);
211 commands
[command
] = new FlushCommand
<I
>(replayer
);
216 ~ImageReplayerAdminSocketHook() override
{
217 Mutex::Locker
locker(lock
);
218 for (Commands::const_iterator i
= commands
.begin(); i
!= commands
.end();
220 (void)admin_socket
->unregister_command(i
->first
);
226 bool call(std::string command
, cmdmap_t
& cmdmap
, std::string format
,
227 bufferlist
& out
) override
{
228 Mutex::Locker
locker(lock
);
229 Commands::const_iterator i
= commands
.find(command
);
230 assert(i
!= commands
.end());
231 Formatter
*f
= Formatter::create(format
);
233 bool r
= i
->second
->call(f
, &ss
);
240 typedef std::map
<std::string
, ImageReplayerAdminSocketCommand
*> Commands
;
242 AdminSocket
*admin_socket
;
244 ImageReplayer
<I
> *replayer
;
249 uint32_t calculate_replay_delay(const utime_t
&event_time
,
250 int mirroring_replay_delay
) {
251 if (mirroring_replay_delay
<= 0) {
255 utime_t now
= ceph_clock_now();
256 if (event_time
+ mirroring_replay_delay
<= now
) {
260 // ensure it is rounded up when converting to integer
261 return (event_time
+ mirroring_replay_delay
- now
) + 1;
264 } // anonymous namespace
266 template <typename I
>
267 void ImageReplayer
<I
>::BootstrapProgressContext::update_progress(
268 const std::string
&description
, bool flush
)
270 const std::string desc
= "bootstrapping, " + description
;
271 replayer
->set_state_description(0, desc
);
273 replayer
->update_mirror_image_status(false, boost::none
);
277 template <typename I
>
278 void ImageReplayer
<I
>::RemoteJournalerListener::handle_update(
279 ::journal::JournalMetadata
*) {
280 FunctionContext
*ctx
= new FunctionContext([this](int r
) {
281 replayer
->handle_remote_journal_metadata_updated();
283 replayer
->m_threads
->work_queue
->queue(ctx
, 0);
286 template <typename I
>
287 ImageReplayer
<I
>::ImageReplayer(Threads
<I
> *threads
,
288 ImageDeleter
<I
>* image_deleter
,
289 InstanceWatcher
<I
> *instance_watcher
,
291 const std::string
&local_mirror_uuid
,
292 int64_t local_pool_id
,
293 const std::string
&global_image_id
) :
295 m_image_deleter(image_deleter
),
296 m_instance_watcher(instance_watcher
),
298 m_local_mirror_uuid(local_mirror_uuid
),
299 m_local_pool_id(local_pool_id
),
300 m_global_image_id(global_image_id
),
301 m_lock("rbd::mirror::ImageReplayer " + stringify(local_pool_id
) + " " +
303 m_progress_cxt(this),
304 m_journal_listener(new JournalListener(this)),
305 m_remote_listener(this)
307 // Register asok commands using a temporary "remote_pool_name/global_image_id"
308 // name. When the image name becomes known on start the asok commands will be
309 // re-registered using "remote_pool_name/remote_image_name" name.
311 std::string pool_name
;
312 int r
= m_local
->pool_reverse_lookup(m_local_pool_id
, &pool_name
);
314 derr
<< "error resolving local pool " << m_local_pool_id
315 << ": " << cpp_strerror(r
) << dendl
;
316 pool_name
= stringify(m_local_pool_id
);
319 m_name
= pool_name
+ "/" + m_global_image_id
;
320 register_admin_socket_hook();
323 template <typename I
>
324 ImageReplayer
<I
>::~ImageReplayer()
326 unregister_admin_socket_hook();
327 assert(m_event_preprocessor
== nullptr);
328 assert(m_replay_status_formatter
== nullptr);
329 assert(m_local_image_ctx
== nullptr);
330 assert(m_local_replay
== nullptr);
331 assert(m_remote_journaler
== nullptr);
332 assert(m_replay_handler
== nullptr);
333 assert(m_on_start_finish
== nullptr);
334 assert(m_on_stop_finish
== nullptr);
335 assert(m_bootstrap_request
== nullptr);
336 assert(m_in_flight_status_updates
== 0);
338 delete m_journal_listener
;
341 template <typename I
>
342 image_replayer::HealthState ImageReplayer
<I
>::get_health_state() const {
343 Mutex::Locker
locker(m_lock
);
345 if (!m_mirror_image_status_state
) {
346 return image_replayer::HEALTH_STATE_OK
;
347 } else if (*m_mirror_image_status_state
==
348 cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING
||
349 *m_mirror_image_status_state
==
350 cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN
) {
351 return image_replayer::HEALTH_STATE_WARNING
;
353 return image_replayer::HEALTH_STATE_ERROR
;
356 template <typename I
>
357 void ImageReplayer
<I
>::add_peer(const std::string
&peer_uuid
,
358 librados::IoCtx
&io_ctx
) {
359 Mutex::Locker
locker(m_lock
);
360 auto it
= m_peers
.find({peer_uuid
});
361 if (it
== m_peers
.end()) {
362 m_peers
.insert({peer_uuid
, io_ctx
});
366 template <typename I
>
367 void ImageReplayer
<I
>::set_state_description(int r
, const std::string
&desc
) {
368 dout(20) << r
<< " " << desc
<< dendl
;
370 Mutex::Locker
l(m_lock
);
375 template <typename I
>
376 void ImageReplayer
<I
>::start(Context
*on_finish
, bool manual
)
378 dout(20) << "on_finish=" << on_finish
<< dendl
;
382 Mutex::Locker
locker(m_lock
);
383 if (!is_stopped_()) {
384 derr
<< "already running" << dendl
;
386 } else if (m_manual_stop
&& !manual
) {
387 dout(5) << "stopped manually, ignoring start without manual flag"
391 m_state
= STATE_STARTING
;
393 m_state_desc
.clear();
394 m_manual_stop
= false;
395 m_delete_requested
= false;
397 if (on_finish
!= nullptr) {
398 assert(m_on_start_finish
== nullptr);
399 m_on_start_finish
= on_finish
;
401 assert(m_on_stop_finish
== nullptr);
407 on_finish
->complete(r
);
412 r
= m_local
->ioctx_create2(m_local_pool_id
, m_local_ioctx
);
414 derr
<< "error opening ioctx for local pool " << m_local_pool_id
415 << ": " << cpp_strerror(r
) << dendl
;
416 on_start_fail(r
, "error opening local pool");
423 template <typename I
>
424 void ImageReplayer
<I
>::wait_for_deletion() {
427 Context
*ctx
= create_context_callback
<
428 ImageReplayer
, &ImageReplayer
<I
>::handle_wait_for_deletion
>(this);
429 m_image_deleter
->wait_for_scheduled_deletion(
430 m_local_pool_id
, m_global_image_id
, ctx
, false);
433 template <typename I
>
434 void ImageReplayer
<I
>::handle_wait_for_deletion(int r
) {
435 dout(20) << "r=" << r
<< dendl
;
437 if (r
== -ECANCELED
) {
438 on_start_fail(0, "");
441 on_start_fail(r
, "error waiting for image deletion");
445 prepare_local_image();
448 template <typename I
>
449 void ImageReplayer
<I
>::prepare_local_image() {
452 m_local_image_id
= "";
453 Context
*ctx
= create_context_callback
<
454 ImageReplayer
, &ImageReplayer
<I
>::handle_prepare_local_image
>(this);
455 auto req
= PrepareLocalImageRequest
<I
>::create(
456 m_local_ioctx
, m_global_image_id
, &m_local_image_id
,
457 &m_local_image_tag_owner
, m_threads
->work_queue
, ctx
);
461 template <typename I
>
462 void ImageReplayer
<I
>::handle_prepare_local_image(int r
) {
463 dout(20) << "r=" << r
<< dendl
;
466 dout(20) << "local image does not exist" << dendl
;
468 on_start_fail(r
, "error preparing local image for replay");
470 } else if (m_local_image_tag_owner
== librbd::Journal
<>::LOCAL_MIRROR_UUID
) {
471 dout(5) << "local image is primary" << dendl
;
472 on_start_fail(0, "local image is primary");
476 // local image doesn't exist or is non-primary
477 prepare_remote_image();
480 template <typename I
>
481 void ImageReplayer
<I
>::prepare_remote_image() {
484 // TODO need to support multiple remote images
485 assert(!m_peers
.empty());
486 m_remote_image
= {*m_peers
.begin()};
488 Context
*ctx
= create_context_callback
<
489 ImageReplayer
, &ImageReplayer
<I
>::handle_prepare_remote_image
>(this);
490 auto req
= PrepareRemoteImageRequest
<I
>::create(
491 m_remote_image
.io_ctx
, m_global_image_id
, &m_remote_image
.mirror_uuid
,
492 &m_remote_image
.image_id
, ctx
);
496 template <typename I
>
497 void ImageReplayer
<I
>::handle_prepare_remote_image(int r
) {
498 dout(20) << "r=" << r
<< dendl
;
501 dout(20) << "remote image does not exist" << dendl
;
503 // TODO need to support multiple remote images
504 if (!m_local_image_id
.empty() &&
505 m_local_image_tag_owner
== m_remote_image
.mirror_uuid
) {
506 // local image exists and is non-primary and linked to the missing
509 m_delete_requested
= true;
510 on_start_fail(0, "remote image no longer exists");
512 on_start_fail(-ENOENT
, "remote image does not exist");
516 on_start_fail(r
, "error retrieving remote image id");
523 template <typename I
>
524 void ImageReplayer
<I
>::bootstrap() {
527 CephContext
*cct
= static_cast<CephContext
*>(m_local
->cct());
528 journal::Settings settings
;
529 settings
.commit_interval
= cct
->_conf
->rbd_mirror_journal_commit_age
;
530 settings
.max_fetch_bytes
= cct
->_conf
->rbd_mirror_journal_max_fetch_bytes
;
532 m_remote_journaler
= new Journaler(m_threads
->work_queue
,
534 &m_threads
->timer_lock
,
535 m_remote_image
.io_ctx
,
536 m_remote_image
.image_id
,
537 m_local_mirror_uuid
, settings
);
539 Context
*ctx
= create_context_callback
<
540 ImageReplayer
, &ImageReplayer
<I
>::handle_bootstrap
>(this);
542 BootstrapRequest
<I
> *request
= BootstrapRequest
<I
>::create(
543 m_local_ioctx
, m_remote_image
.io_ctx
, m_instance_watcher
,
544 &m_local_image_ctx
, m_local_image_id
, m_remote_image
.image_id
,
545 m_global_image_id
, m_threads
->work_queue
, m_threads
->timer
,
546 &m_threads
->timer_lock
, m_local_mirror_uuid
, m_remote_image
.mirror_uuid
,
547 m_remote_journaler
, &m_client_meta
, ctx
, &m_resync_requested
,
551 Mutex::Locker
locker(m_lock
);
553 m_bootstrap_request
= request
;
556 update_mirror_image_status(false, boost::none
);
557 reschedule_update_status_task(10);
562 template <typename I
>
563 void ImageReplayer
<I
>::handle_bootstrap(int r
) {
564 dout(20) << "r=" << r
<< dendl
;
566 Mutex::Locker
locker(m_lock
);
567 m_bootstrap_request
->put();
568 m_bootstrap_request
= nullptr;
569 if (m_local_image_ctx
) {
570 m_local_image_id
= m_local_image_ctx
->id
;
574 if (r
== -EREMOTEIO
) {
575 m_local_image_tag_owner
= "";
576 dout(5) << "remote image is non-primary" << dendl
;
577 on_start_fail(-EREMOTEIO
, "remote image is non-primary");
579 } else if (r
== -EEXIST
) {
580 m_local_image_tag_owner
= "";
581 on_start_fail(r
, "split-brain detected");
584 on_start_fail(r
, "error bootstrapping replay");
586 } else if (on_start_interrupted()) {
588 } else if (m_resync_requested
) {
589 on_start_fail(0, "resync requested");
593 assert(m_local_journal
== nullptr);
595 RWLock::RLocker
snap_locker(m_local_image_ctx
->snap_lock
);
596 if (m_local_image_ctx
->journal
!= nullptr) {
597 m_local_journal
= m_local_image_ctx
->journal
;
598 m_local_journal
->add_listener(m_journal_listener
);
602 if (m_local_journal
== nullptr) {
603 on_start_fail(-EINVAL
, "error accessing local journal");
608 Mutex::Locker
locker(m_lock
);
609 std::string name
= m_local_ioctx
.get_pool_name() + "/" +
610 m_local_image_ctx
->name
;
611 if (m_name
!= name
) {
614 // Re-register asok commands using the new name.
616 m_asok_hook
= nullptr;
619 register_admin_socket_hook();
622 update_mirror_image_status(false, boost::none
);
623 init_remote_journaler();
626 template <typename I
>
627 void ImageReplayer
<I
>::init_remote_journaler() {
630 Context
*ctx
= create_context_callback
<
631 ImageReplayer
, &ImageReplayer
<I
>::handle_init_remote_journaler
>(this);
632 m_remote_journaler
->init(ctx
);
635 template <typename I
>
636 void ImageReplayer
<I
>::handle_init_remote_journaler(int r
) {
637 dout(20) << "r=" << r
<< dendl
;
640 derr
<< "failed to initialize remote journal: " << cpp_strerror(r
) << dendl
;
641 on_start_fail(r
, "error initializing remote journal");
643 } else if (on_start_interrupted()) {
647 m_remote_journaler
->add_listener(&m_remote_listener
);
649 cls::journal::Client client
;
650 r
= m_remote_journaler
->get_cached_client(m_local_mirror_uuid
, &client
);
652 derr
<< "error retrieving remote journal client: " << cpp_strerror(r
)
654 on_start_fail(r
, "error retrieving remote journal client");
658 derr
<< "image_id=" << m_local_image_id
<< ", "
659 << "m_client_meta.image_id=" << m_client_meta
.image_id
<< ", "
660 << "client.state=" << client
.state
<< dendl
;
661 if (m_client_meta
.image_id
== m_local_image_id
&&
662 client
.state
!= cls::journal::CLIENT_STATE_CONNECTED
) {
663 dout(5) << "client flagged disconnected, stopping image replay" << dendl
;
664 if (m_local_image_ctx
->mirroring_resync_after_disconnect
) {
665 m_resync_requested
= true;
666 on_start_fail(-ENOTCONN
, "disconnected: automatic resync");
668 on_start_fail(-ENOTCONN
, "disconnected");
676 template <typename I
>
677 void ImageReplayer
<I
>::start_replay() {
680 Context
*start_ctx
= create_context_callback
<
681 ImageReplayer
, &ImageReplayer
<I
>::handle_start_replay
>(this);
682 m_local_journal
->start_external_replay(&m_local_replay
, start_ctx
);
685 template <typename I
>
686 void ImageReplayer
<I
>::handle_start_replay(int r
) {
687 dout(20) << "r=" << r
<< dendl
;
690 assert(m_local_replay
== nullptr);
691 derr
<< "error starting external replay on local image "
692 << m_local_image_id
<< ": " << cpp_strerror(r
) << dendl
;
693 on_start_fail(r
, "error starting replay on local image");
697 Context
*on_finish(nullptr);
699 Mutex::Locker
locker(m_lock
);
700 assert(m_state
== STATE_STARTING
);
701 m_state
= STATE_REPLAYING
;
702 std::swap(m_on_start_finish
, on_finish
);
705 m_event_preprocessor
= EventPreprocessor
<I
>::create(
706 *m_local_image_ctx
, *m_remote_journaler
, m_local_mirror_uuid
,
707 &m_client_meta
, m_threads
->work_queue
);
708 m_replay_status_formatter
=
709 ReplayStatusFormatter
<I
>::create(m_remote_journaler
, m_local_mirror_uuid
);
711 update_mirror_image_status(true, boost::none
);
712 reschedule_update_status_task(30);
714 if (on_replay_interrupted()) {
719 CephContext
*cct
= static_cast<CephContext
*>(m_local
->cct());
720 double poll_seconds
= cct
->_conf
->rbd_mirror_journal_poll_age
;
722 Mutex::Locker
locker(m_lock
);
723 m_replay_handler
= new ReplayHandler
<I
>(this);
724 m_remote_journaler
->start_live_replay(m_replay_handler
, poll_seconds
);
726 dout(20) << "m_remote_journaler=" << *m_remote_journaler
<< dendl
;
729 dout(20) << "start succeeded" << dendl
;
730 if (on_finish
!= nullptr) {
731 dout(20) << "on finish complete, r=" << r
<< dendl
;
732 on_finish
->complete(r
);
736 template <typename I
>
737 void ImageReplayer
<I
>::on_start_fail(int r
, const std::string
&desc
)
739 dout(20) << "r=" << r
<< dendl
;
740 Context
*ctx
= new FunctionContext([this, r
, desc
](int _r
) {
742 Mutex::Locker
locker(m_lock
);
743 assert(m_state
== STATE_STARTING
);
744 m_state
= STATE_STOPPING
;
745 if (r
< 0 && r
!= -ECANCELED
&& r
!= -EREMOTEIO
&& r
!= -ENOENT
) {
746 derr
<< "start failed: " << cpp_strerror(r
) << dendl
;
748 dout(20) << "start canceled" << dendl
;
752 set_state_description(r
, desc
);
753 update_mirror_image_status(false, boost::none
);
754 reschedule_update_status_task(-1);
757 m_threads
->work_queue
->queue(ctx
, 0);
760 template <typename I
>
761 bool ImageReplayer
<I
>::on_start_interrupted()
763 Mutex::Locker
locker(m_lock
);
764 assert(m_state
== STATE_STARTING
);
765 if (m_on_stop_finish
== nullptr) {
769 on_start_fail(-ECANCELED
);
773 template <typename I
>
774 void ImageReplayer
<I
>::stop(Context
*on_finish
, bool manual
, int r
,
775 const std::string
& desc
)
777 dout(20) << "on_finish=" << on_finish
<< ", manual=" << manual
778 << ", desc=" << desc
<< dendl
;
780 m_image_deleter
->cancel_waiter(m_local_pool_id
, m_global_image_id
);
782 image_replayer::BootstrapRequest
<I
> *bootstrap_request
= nullptr;
783 bool shut_down_replay
= false;
785 bool canceled_task
= false;
787 Mutex::Locker
locker(m_lock
);
789 if (!is_running_()) {
792 if (!is_stopped_()) {
793 if (m_state
== STATE_STARTING
) {
794 dout(20) << "canceling start" << dendl
;
795 if (m_bootstrap_request
) {
796 bootstrap_request
= m_bootstrap_request
;
797 bootstrap_request
->get();
800 dout(20) << "interrupting replay" << dendl
;
801 shut_down_replay
= true;
804 assert(m_on_stop_finish
== nullptr);
805 std::swap(m_on_stop_finish
, on_finish
);
806 m_stop_requested
= true;
807 m_manual_stop
= manual
;
809 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
810 if (m_delayed_preprocess_task
!= nullptr) {
811 canceled_task
= m_threads
->timer
->cancel_event(
812 m_delayed_preprocess_task
);
813 assert(canceled_task
);
814 m_delayed_preprocess_task
= nullptr;
820 // avoid holding lock since bootstrap request will update status
821 if (bootstrap_request
!= nullptr) {
822 bootstrap_request
->cancel();
823 bootstrap_request
->put();
827 m_event_replay_tracker
.finish_op();
828 on_replay_interrupted();
832 dout(20) << "not running" << dendl
;
834 on_finish
->complete(-EINVAL
);
839 if (shut_down_replay
) {
840 on_stop_journal_replay(r
, desc
);
841 } else if (on_finish
!= nullptr) {
842 on_finish
->complete(0);
846 template <typename I
>
847 void ImageReplayer
<I
>::on_stop_journal_replay(int r
, const std::string
&desc
)
849 dout(20) << "enter" << dendl
;
852 Mutex::Locker
locker(m_lock
);
853 if (m_state
!= STATE_REPLAYING
) {
854 // might be invoked multiple times while stopping
857 m_stop_requested
= true;
858 m_state
= STATE_STOPPING
;
861 set_state_description(r
, desc
);
862 update_mirror_image_status(false, boost::none
);
863 reschedule_update_status_task(-1);
867 template <typename I
>
868 void ImageReplayer
<I
>::handle_replay_ready()
870 dout(20) << "enter" << dendl
;
871 if (on_replay_interrupted()) {
875 if (!m_remote_journaler
->try_pop_front(&m_replay_entry
, &m_replay_tag_tid
)) {
879 m_event_replay_tracker
.start_op();
882 bool stopping
= (m_state
== STATE_STOPPING
);
886 dout(10) << "stopping event replay" << dendl
;
887 m_event_replay_tracker
.finish_op();
891 if (m_replay_tag_valid
&& m_replay_tag
.tid
== m_replay_tag_tid
) {
899 template <typename I
>
900 void ImageReplayer
<I
>::restart(Context
*on_finish
)
902 FunctionContext
*ctx
= new FunctionContext(
903 [this, on_finish
](int r
) {
907 start(on_finish
, true);
912 template <typename I
>
913 void ImageReplayer
<I
>::flush(Context
*on_finish
)
915 dout(20) << "enter" << dendl
;
918 Mutex::Locker
locker(m_lock
);
919 if (m_state
== STATE_REPLAYING
) {
920 Context
*ctx
= new FunctionContext(
922 if (on_finish
!= nullptr) {
923 on_finish
->complete(r
);
926 on_flush_local_replay_flush_start(ctx
);
932 on_finish
->complete(0);
936 template <typename I
>
937 void ImageReplayer
<I
>::on_flush_local_replay_flush_start(Context
*on_flush
)
939 dout(20) << "enter" << dendl
;
940 FunctionContext
*ctx
= new FunctionContext(
941 [this, on_flush
](int r
) {
942 on_flush_local_replay_flush_finish(on_flush
, r
);
945 assert(m_lock
.is_locked());
946 assert(m_state
== STATE_REPLAYING
);
947 m_local_replay
->flush(ctx
);
950 template <typename I
>
951 void ImageReplayer
<I
>::on_flush_local_replay_flush_finish(Context
*on_flush
,
954 dout(20) << "r=" << r
<< dendl
;
956 derr
<< "error flushing local replay: " << cpp_strerror(r
) << dendl
;
957 on_flush
->complete(r
);
961 on_flush_flush_commit_position_start(on_flush
);
964 template <typename I
>
965 void ImageReplayer
<I
>::on_flush_flush_commit_position_start(Context
*on_flush
)
967 FunctionContext
*ctx
= new FunctionContext(
968 [this, on_flush
](int r
) {
969 on_flush_flush_commit_position_finish(on_flush
, r
);
972 m_remote_journaler
->flush_commit_position(ctx
);
975 template <typename I
>
976 void ImageReplayer
<I
>::on_flush_flush_commit_position_finish(Context
*on_flush
,
980 derr
<< "error flushing remote journal commit position: "
981 << cpp_strerror(r
) << dendl
;
984 update_mirror_image_status(false, boost::none
);
986 dout(20) << "flush complete, r=" << r
<< dendl
;
987 on_flush
->complete(r
);
990 template <typename I
>
991 bool ImageReplayer
<I
>::on_replay_interrupted()
995 Mutex::Locker
locker(m_lock
);
996 shut_down
= m_stop_requested
;
1000 on_stop_journal_replay();
1005 template <typename I
>
1006 void ImageReplayer
<I
>::print_status(Formatter
*f
, stringstream
*ss
)
1008 dout(20) << "enter" << dendl
;
1010 Mutex::Locker
l(m_lock
);
1013 f
->open_object_section("image_replayer");
1014 f
->dump_string("name", m_name
);
1015 f
->dump_string("state", to_string(m_state
));
1019 *ss
<< m_name
<< ": state: " << to_string(m_state
);
1023 template <typename I
>
1024 void ImageReplayer
<I
>::handle_replay_complete(int r
, const std::string
&error_desc
)
1026 dout(20) << "r=" << r
<< dendl
;
1028 derr
<< "replay encountered an error: " << cpp_strerror(r
) << dendl
;
1029 set_state_description(r
, error_desc
);
1033 Mutex::Locker
locker(m_lock
);
1034 m_stop_requested
= true;
1036 on_replay_interrupted();
1039 template <typename I
>
1040 void ImageReplayer
<I
>::replay_flush() {
1043 bool interrupted
= false;
1045 Mutex::Locker
locker(m_lock
);
1046 if (m_state
!= STATE_REPLAYING
) {
1047 dout(20) << "replay interrupted" << dendl
;
1050 m_state
= STATE_REPLAY_FLUSHING
;
1055 m_event_replay_tracker
.finish_op();
1059 // shut down the replay to flush all IO and ops and create a new
1060 // replayer to handle the new tag epoch
1061 Context
*ctx
= create_context_callback
<
1062 ImageReplayer
<I
>, &ImageReplayer
<I
>::handle_replay_flush
>(this);
1063 ctx
= new FunctionContext([this, ctx
](int r
) {
1064 m_local_image_ctx
->journal
->stop_external_replay();
1065 m_local_replay
= nullptr;
1072 m_local_journal
->start_external_replay(&m_local_replay
, ctx
);
1074 m_local_replay
->shut_down(false, ctx
);
1077 template <typename I
>
1078 void ImageReplayer
<I
>::handle_replay_flush(int r
) {
1079 dout(20) << "r=" << r
<< dendl
;
1082 Mutex::Locker
locker(m_lock
);
1083 assert(m_state
== STATE_REPLAY_FLUSHING
);
1084 m_state
= STATE_REPLAYING
;
1088 derr
<< "replay flush encountered an error: " << cpp_strerror(r
) << dendl
;
1089 m_event_replay_tracker
.finish_op();
1090 handle_replay_complete(r
, "replay flush encountered an error");
1092 } else if (on_replay_interrupted()) {
1093 m_event_replay_tracker
.finish_op();
1100 template <typename I
>
1101 void ImageReplayer
<I
>::get_remote_tag() {
1102 dout(20) << "tag_tid: " << m_replay_tag_tid
<< dendl
;
1104 Context
*ctx
= create_context_callback
<
1105 ImageReplayer
, &ImageReplayer
<I
>::handle_get_remote_tag
>(this);
1106 m_remote_journaler
->get_tag(m_replay_tag_tid
, &m_replay_tag
, ctx
);
1109 template <typename I
>
1110 void ImageReplayer
<I
>::handle_get_remote_tag(int r
) {
1111 dout(20) << "r=" << r
<< dendl
;
1115 bufferlist::iterator it
= m_replay_tag
.data
.begin();
1116 ::decode(m_replay_tag_data
, it
);
1117 } catch (const buffer::error
&err
) {
1123 derr
<< "failed to retrieve remote tag " << m_replay_tag_tid
<< ": "
1124 << cpp_strerror(r
) << dendl
;
1125 m_event_replay_tracker
.finish_op();
1126 handle_replay_complete(r
, "failed to retrieve remote tag");
1130 m_replay_tag_valid
= true;
1131 dout(20) << "decoded remote tag " << m_replay_tag_tid
<< ": "
1132 << m_replay_tag_data
<< dendl
;
1134 allocate_local_tag();
1137 template <typename I
>
1138 void ImageReplayer
<I
>::allocate_local_tag() {
1141 std::string mirror_uuid
= m_replay_tag_data
.mirror_uuid
;
1142 if (mirror_uuid
== librbd::Journal
<>::LOCAL_MIRROR_UUID
||
1143 mirror_uuid
== m_local_mirror_uuid
) {
1144 mirror_uuid
= m_remote_image
.mirror_uuid
;
1145 } else if (mirror_uuid
== librbd::Journal
<>::ORPHAN_MIRROR_UUID
) {
1146 dout(5) << "encountered image demotion: stopping" << dendl
;
1147 Mutex::Locker
locker(m_lock
);
1148 m_stop_requested
= true;
1151 librbd::journal::TagPredecessor
predecessor(m_replay_tag_data
.predecessor
);
1152 if (predecessor
.mirror_uuid
== librbd::Journal
<>::LOCAL_MIRROR_UUID
) {
1153 predecessor
.mirror_uuid
= m_remote_image
.mirror_uuid
;
1154 } else if (predecessor
.mirror_uuid
== m_local_mirror_uuid
) {
1155 predecessor
.mirror_uuid
= librbd::Journal
<>::LOCAL_MIRROR_UUID
;
1158 dout(20) << "mirror_uuid=" << mirror_uuid
<< ", "
1159 << "predecessor_mirror_uuid=" << predecessor
.mirror_uuid
<< ", "
1160 << "replay_tag_tid=" << m_replay_tag_tid
<< ", "
1161 << "replay_tag_data=" << m_replay_tag_data
<< dendl
;
1162 Context
*ctx
= create_context_callback
<
1163 ImageReplayer
, &ImageReplayer
<I
>::handle_allocate_local_tag
>(this);
1164 m_local_journal
->allocate_tag(mirror_uuid
, predecessor
, ctx
);
1167 template <typename I
>
1168 void ImageReplayer
<I
>::handle_allocate_local_tag(int r
) {
1169 dout(20) << "r=" << r
<< dendl
;
1172 derr
<< "failed to allocate journal tag: " << cpp_strerror(r
) << dendl
;
1173 m_event_replay_tracker
.finish_op();
1174 handle_replay_complete(r
, "failed to allocate journal tag");
1181 template <typename I
>
1182 void ImageReplayer
<I
>::preprocess_entry() {
1183 dout(20) << "preprocessing entry tid=" << m_replay_entry
.get_commit_tid()
1186 bufferlist data
= m_replay_entry
.get_data();
1187 bufferlist::iterator it
= data
.begin();
1188 int r
= m_local_replay
->decode(&it
, &m_event_entry
);
1190 derr
<< "failed to decode journal event" << dendl
;
1191 m_event_replay_tracker
.finish_op();
1192 handle_replay_complete(r
, "failed to decode journal event");
1196 uint32_t delay
= calculate_replay_delay(
1197 m_event_entry
.timestamp
, m_local_image_ctx
->mirroring_replay_delay
);
1199 handle_preprocess_entry_ready(0);
1203 dout(20) << "delaying replay by " << delay
<< " sec" << dendl
;
1205 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
1206 assert(m_delayed_preprocess_task
== nullptr);
1207 m_delayed_preprocess_task
= new FunctionContext(
1209 assert(m_threads
->timer_lock
.is_locked());
1210 m_delayed_preprocess_task
= nullptr;
1211 m_threads
->work_queue
->queue(
1212 create_context_callback
<ImageReplayer
,
1213 &ImageReplayer
<I
>::handle_preprocess_entry_ready
>(this), 0);
1215 m_threads
->timer
->add_event_after(delay
, m_delayed_preprocess_task
);
1218 template <typename I
>
1219 void ImageReplayer
<I
>::handle_preprocess_entry_ready(int r
) {
1220 dout(20) << "r=" << r
<< dendl
;
1223 if (!m_event_preprocessor
->is_required(m_event_entry
)) {
1228 Context
*ctx
= create_context_callback
<
1229 ImageReplayer
, &ImageReplayer
<I
>::handle_preprocess_entry_safe
>(this);
1230 m_event_preprocessor
->preprocess(&m_event_entry
, ctx
);
1233 template <typename I
>
1234 void ImageReplayer
<I
>::handle_preprocess_entry_safe(int r
) {
1235 dout(20) << "r=" << r
<< dendl
;
1238 m_event_replay_tracker
.finish_op();
1240 if (r
== -ECANCELED
) {
1241 handle_replay_complete(0, "lost exclusive lock");
1243 derr
<< "failed to preprocess journal event" << dendl
;
1244 handle_replay_complete(r
, "failed to preprocess journal event");
1252 template <typename I
>
1253 void ImageReplayer
<I
>::process_entry() {
1254 dout(20) << "processing entry tid=" << m_replay_entry
.get_commit_tid()
1257 // stop replaying events if stop has been requested
1258 if (on_replay_interrupted()) {
1259 m_event_replay_tracker
.finish_op();
1263 Context
*on_ready
= create_context_callback
<
1264 ImageReplayer
, &ImageReplayer
<I
>::handle_process_entry_ready
>(this);
1265 Context
*on_commit
= new C_ReplayCommitted(this, std::move(m_replay_entry
));
1267 m_local_replay
->process(m_event_entry
, on_ready
, on_commit
);
1270 template <typename I
>
1271 void ImageReplayer
<I
>::handle_process_entry_ready(int r
) {
1275 // attempt to process the next event
1276 handle_replay_ready();
1279 template <typename I
>
1280 void ImageReplayer
<I
>::handle_process_entry_safe(const ReplayEntry
& replay_entry
,
1282 dout(20) << "commit_tid=" << replay_entry
.get_commit_tid() << ", r=" << r
1286 derr
<< "failed to commit journal event: " << cpp_strerror(r
) << dendl
;
1287 handle_replay_complete(r
, "failed to commit journal event");
1289 assert(m_remote_journaler
!= nullptr);
1290 m_remote_journaler
->committed(replay_entry
);
1292 m_event_replay_tracker
.finish_op();
1295 template <typename I
>
1296 bool ImageReplayer
<I
>::update_mirror_image_status(bool force
,
1297 const OptionalState
&state
) {
1300 Mutex::Locker
locker(m_lock
);
1301 if (!start_mirror_image_status_update(force
, false)) {
1306 queue_mirror_image_status_update(state
);
1310 template <typename I
>
1311 bool ImageReplayer
<I
>::start_mirror_image_status_update(bool force
,
1313 assert(m_lock
.is_locked());
1315 if (!force
&& !is_stopped_()) {
1316 if (!is_running_()) {
1317 dout(20) << "shut down in-progress: ignoring update" << dendl
;
1319 } else if (m_in_flight_status_updates
> (restarting
? 1 : 0)) {
1320 dout(20) << "already sending update" << dendl
;
1321 m_update_status_requested
= true;
1327 ++m_in_flight_status_updates
;
1331 template <typename I
>
1332 void ImageReplayer
<I
>::finish_mirror_image_status_update() {
1333 Context
*on_finish
= nullptr;
1335 Mutex::Locker
locker(m_lock
);
1336 assert(m_in_flight_status_updates
> 0);
1337 if (--m_in_flight_status_updates
> 0) {
1338 dout(20) << "waiting on " << m_in_flight_status_updates
<< " in-flight "
1339 << "updates" << dendl
;
1343 std::swap(on_finish
, m_on_update_status_finish
);
1347 if (on_finish
!= nullptr) {
1348 on_finish
->complete(0);
1352 template <typename I
>
1353 void ImageReplayer
<I
>::queue_mirror_image_status_update(const OptionalState
&state
) {
1355 FunctionContext
*ctx
= new FunctionContext(
1356 [this, state
](int r
) {
1357 send_mirror_status_update(state
);
1359 m_threads
->work_queue
->queue(ctx
, 0);
1362 template <typename I
>
1363 void ImageReplayer
<I
>::send_mirror_status_update(const OptionalState
&opt_state
) {
1365 std::string state_desc
;
1367 bool stopping_replay
;
1369 OptionalMirrorImageStatusState mirror_image_status_state
{
1370 boost::make_optional(false, cls::rbd::MirrorImageStatusState
{})};
1371 image_replayer::BootstrapRequest
<I
>* bootstrap_request
= nullptr;
1373 Mutex::Locker
locker(m_lock
);
1375 state_desc
= m_state_desc
;
1376 mirror_image_status_state
= m_mirror_image_status_state
;
1378 stopping_replay
= (m_local_image_ctx
!= nullptr);
1380 if (m_bootstrap_request
!= nullptr) {
1381 bootstrap_request
= m_bootstrap_request
;
1382 bootstrap_request
->get();
1386 bool syncing
= false;
1387 if (bootstrap_request
!= nullptr) {
1388 syncing
= bootstrap_request
->is_syncing();
1389 bootstrap_request
->put();
1390 bootstrap_request
= nullptr;
1397 cls::rbd::MirrorImageStatus status
;
1400 case STATE_STARTING
:
1402 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING
;
1403 status
.description
= state_desc
.empty() ? "syncing" : state_desc
;
1404 mirror_image_status_state
= status
.state
;
1406 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY
;
1407 status
.description
= "starting replay";
1410 case STATE_REPLAYING
:
1411 case STATE_REPLAY_FLUSHING
:
1412 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING
;
1414 Context
*on_req_finish
= new FunctionContext(
1416 dout(20) << "replay status ready: r=" << r
<< dendl
;
1418 send_mirror_status_update(boost::none
);
1419 } else if (r
== -EAGAIN
) {
1420 // decrement in-flight status update counter
1421 handle_mirror_status_update(r
);
1426 if (!m_replay_status_formatter
->get_or_send_update(&desc
,
1428 dout(20) << "waiting for replay status" << dendl
;
1431 status
.description
= "replaying, " + desc
;
1432 mirror_image_status_state
= boost::none
;
1435 case STATE_STOPPING
:
1436 if (stopping_replay
) {
1437 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY
;
1438 status
.description
= "stopping replay";
1443 if (last_r
== -EREMOTEIO
) {
1444 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN
;
1445 status
.description
= state_desc
;
1446 mirror_image_status_state
= status
.state
;
1447 } else if (last_r
< 0) {
1448 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR
;
1449 status
.description
= state_desc
;
1450 mirror_image_status_state
= status
.state
;
1452 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED
;
1453 status
.description
= state_desc
.empty() ? "stopped" : state_desc
;
1454 mirror_image_status_state
= boost::none
;
1458 assert(!"invalid state");
1462 Mutex::Locker
locker(m_lock
);
1463 m_mirror_image_status_state
= mirror_image_status_state
;
1466 // prevent the status from ping-ponging when failed replays are restarted
1467 if (mirror_image_status_state
&&
1468 *mirror_image_status_state
== cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR
) {
1469 status
.state
= *mirror_image_status_state
;
1472 dout(20) << "status=" << status
<< dendl
;
1473 librados::ObjectWriteOperation op
;
1474 librbd::cls_client::mirror_image_status_set(&op
, m_global_image_id
, status
);
1476 librados::AioCompletion
*aio_comp
= create_rados_callback
<
1477 ImageReplayer
<I
>, &ImageReplayer
<I
>::handle_mirror_status_update
>(this);
1478 int r
= m_local_ioctx
.aio_operate(RBD_MIRRORING
, aio_comp
, &op
);
1480 aio_comp
->release();
1483 template <typename I
>
1484 void ImageReplayer
<I
>::handle_mirror_status_update(int r
) {
1485 dout(20) << "r=" << r
<< dendl
;
1487 bool running
= false;
1488 bool started
= false;
1490 Mutex::Locker
locker(m_lock
);
1491 bool update_status_requested
= false;
1492 std::swap(update_status_requested
, m_update_status_requested
);
1494 running
= is_running_();
1495 if (running
&& update_status_requested
) {
1496 started
= start_mirror_image_status_update(false, true);
1500 // if a deferred update is available, send it -- otherwise reschedule
1503 queue_mirror_image_status_update(boost::none
);
1504 } else if (running
) {
1505 reschedule_update_status_task();
1508 // mark committed status update as no longer in-flight
1509 finish_mirror_image_status_update();
1512 template <typename I
>
1513 void ImageReplayer
<I
>::reschedule_update_status_task(int new_interval
) {
1516 bool canceled_task
= false;
1518 Mutex::Locker
locker(m_lock
);
1519 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
1521 if (m_update_status_task
) {
1522 canceled_task
= m_threads
->timer
->cancel_event(m_update_status_task
);
1523 m_update_status_task
= nullptr;
1526 if (new_interval
> 0) {
1527 m_update_status_interval
= new_interval
;
1530 bool restarting
= (new_interval
== 0 || canceled_task
);
1531 if (new_interval
>= 0 && is_running_() &&
1532 start_mirror_image_status_update(false, restarting
)) {
1533 m_update_status_task
= new FunctionContext(
1535 assert(m_threads
->timer_lock
.is_locked());
1536 m_update_status_task
= nullptr;
1538 queue_mirror_image_status_update(boost::none
);
1540 m_threads
->timer
->add_event_after(m_update_status_interval
,
1541 m_update_status_task
);
1545 if (canceled_task
) {
1546 dout(20) << "canceled task" << dendl
;
1547 finish_mirror_image_status_update();
1551 template <typename I
>
1552 void ImageReplayer
<I
>::shut_down(int r
) {
1553 dout(20) << "r=" << r
<< dendl
;
1555 Mutex::Locker
locker(m_lock
);
1556 assert(m_state
== STATE_STOPPING
);
1558 // if status updates are in-flight, wait for them to complete
1559 // before proceeding
1560 if (m_in_flight_status_updates
> 0) {
1561 if (m_on_update_status_finish
== nullptr) {
1562 dout(20) << "waiting for in-flight status update" << dendl
;
1563 m_on_update_status_finish
= new FunctionContext(
1572 // NOTE: it's important to ensure that the local image is fully
1573 // closed before attempting to close the remote journal in
1574 // case the remote cluster is unreachable
1576 // chain the shut down sequence (reverse order)
1577 Context
*ctx
= new FunctionContext(
1579 update_mirror_image_status(true, STATE_STOPPED
);
1580 handle_shut_down(r
);
1583 // close the remote journal
1584 if (m_remote_journaler
!= nullptr) {
1585 ctx
= new FunctionContext([this, ctx
](int r
) {
1586 delete m_remote_journaler
;
1587 m_remote_journaler
= nullptr;
1590 ctx
= new FunctionContext([this, ctx
](int r
) {
1591 m_remote_journaler
->remove_listener(&m_remote_listener
);
1592 m_remote_journaler
->shut_down(ctx
);
1596 // stop the replay of remote journal events
1597 if (m_replay_handler
!= nullptr) {
1598 ctx
= new FunctionContext([this, ctx
](int r
) {
1599 delete m_replay_handler
;
1600 m_replay_handler
= nullptr;
1602 m_event_replay_tracker
.wait_for_ops(ctx
);
1604 ctx
= new FunctionContext([this, ctx
](int r
) {
1605 m_remote_journaler
->stop_replay(ctx
);
1609 // close the local image (release exclusive lock)
1610 if (m_local_image_ctx
) {
1611 ctx
= new FunctionContext([this, ctx
](int r
) {
1612 CloseImageRequest
<I
> *request
= CloseImageRequest
<I
>::create(
1613 &m_local_image_ctx
, ctx
);
1618 // shut down event replay into the local image
1619 if (m_local_journal
!= nullptr) {
1620 ctx
= new FunctionContext([this, ctx
](int r
) {
1621 m_local_journal
= nullptr;
1624 if (m_local_replay
!= nullptr) {
1625 ctx
= new FunctionContext([this, ctx
](int r
) {
1626 m_local_journal
->stop_external_replay();
1627 m_local_replay
= nullptr;
1629 EventPreprocessor
<I
>::destroy(m_event_preprocessor
);
1630 m_event_preprocessor
= nullptr;
1634 ctx
= new FunctionContext([this, ctx
](int r
) {
1635 // blocks if listener notification is in-progress
1636 m_local_journal
->remove_listener(m_journal_listener
);
1641 // wait for all local in-flight replay events to complete
1642 ctx
= new FunctionContext([this, ctx
](int r
) {
1644 derr
<< "error shutting down journal replay: " << cpp_strerror(r
)
1648 m_event_replay_tracker
.wait_for_ops(ctx
);
1651 // flush any local in-flight replay events
1652 if (m_local_replay
!= nullptr) {
1653 ctx
= new FunctionContext([this, ctx
](int r
) {
1654 m_local_replay
->shut_down(true, ctx
);
1658 m_threads
->work_queue
->queue(ctx
, 0);
1661 template <typename I
>
1662 void ImageReplayer
<I
>::handle_shut_down(int r
) {
1663 reschedule_update_status_task(-1);
1666 Mutex::Locker
locker(m_lock
);
1668 // if status updates are in-flight, wait for them to complete
1669 // before proceeding
1670 if (m_in_flight_status_updates
> 0) {
1671 if (m_on_update_status_finish
== nullptr) {
1672 dout(20) << "waiting for in-flight status update" << dendl
;
1673 m_on_update_status_finish
= new FunctionContext(
1675 handle_shut_down(r
);
1681 bool delete_requested
= false;
1682 if (m_delete_requested
&& !m_local_image_id
.empty()) {
1683 assert(m_remote_image
.image_id
.empty());
1684 dout(0) << "remote image no longer exists: scheduling deletion" << dendl
;
1685 delete_requested
= true;
1687 if (delete_requested
|| m_resync_requested
) {
1688 m_image_deleter
->schedule_image_delete(m_local
,
1691 m_resync_requested
);
1693 m_local_image_id
= "";
1694 m_resync_requested
= false;
1695 if (m_delete_requested
) {
1696 unregister_admin_socket_hook();
1697 m_delete_requested
= false;
1699 } else if (m_last_r
== -ENOENT
&&
1700 m_local_image_id
.empty() && m_remote_image
.image_id
.empty()) {
1701 dout(0) << "mirror image no longer exists" << dendl
;
1702 unregister_admin_socket_hook();
1707 dout(20) << "stop complete" << dendl
;
1708 m_local_ioctx
.close();
1710 ReplayStatusFormatter
<I
>::destroy(m_replay_status_formatter
);
1711 m_replay_status_formatter
= nullptr;
1713 Context
*on_start
= nullptr;
1714 Context
*on_stop
= nullptr;
1716 Mutex::Locker
locker(m_lock
);
1717 std::swap(on_start
, m_on_start_finish
);
1718 std::swap(on_stop
, m_on_stop_finish
);
1719 m_stop_requested
= false;
1720 assert(m_delayed_preprocess_task
== nullptr);
1721 assert(m_state
== STATE_STOPPING
);
1722 m_state
= STATE_STOPPED
;
1725 if (on_start
!= nullptr) {
1726 dout(20) << "on start finish complete, r=" << r
<< dendl
;
1727 on_start
->complete(r
);
1730 if (on_stop
!= nullptr) {
1731 dout(20) << "on stop finish complete, r=" << r
<< dendl
;
1732 on_stop
->complete(r
);
1736 template <typename I
>
1737 void ImageReplayer
<I
>::handle_remote_journal_metadata_updated() {
1740 cls::journal::Client client
;
1742 Mutex::Locker
locker(m_lock
);
1743 if (!is_running_()) {
1747 int r
= m_remote_journaler
->get_cached_client(m_local_mirror_uuid
, &client
);
1749 derr
<< "failed to retrieve client: " << cpp_strerror(r
) << dendl
;
1754 if (client
.state
!= cls::journal::CLIENT_STATE_CONNECTED
) {
1755 dout(0) << "client flagged disconnected, stopping image replay" << dendl
;
1756 stop(nullptr, false, -ENOTCONN
, "disconnected");
1760 template <typename I
>
1761 std::string ImageReplayer
<I
>::to_string(const State state
) {
1763 case ImageReplayer
<I
>::STATE_STARTING
:
1765 case ImageReplayer
<I
>::STATE_REPLAYING
:
1767 case ImageReplayer
<I
>::STATE_REPLAY_FLUSHING
:
1768 return "ReplayFlushing";
1769 case ImageReplayer
<I
>::STATE_STOPPING
:
1771 case ImageReplayer
<I
>::STATE_STOPPED
:
1776 return "Unknown(" + stringify(state
) + ")";
1779 template <typename I
>
1780 void ImageReplayer
<I
>::resync_image(Context
*on_finish
) {
1783 m_resync_requested
= true;
1787 template <typename I
>
1788 void ImageReplayer
<I
>::register_admin_socket_hook() {
1789 if (m_asok_hook
!= nullptr) {
1793 dout(20) << "registered asok hook: " << m_name
<< dendl
;
1794 auto asok_hook
= new ImageReplayerAdminSocketHook
<I
>(g_ceph_context
, m_name
,
1796 int r
= asok_hook
->register_commands();
1798 derr
<< "error registering admin socket commands" << dendl
;
1800 asok_hook
= nullptr;
1804 m_asok_hook
= asok_hook
;
1807 template <typename I
>
1808 void ImageReplayer
<I
>::unregister_admin_socket_hook() {
1812 m_asok_hook
= nullptr;
1815 template <typename I
>
1816 std::ostream
&operator<<(std::ostream
&os
, const ImageReplayer
<I
> &replayer
)
1818 os
<< "ImageReplayer: " << &replayer
<< " [" << replayer
.get_local_pool_id()
1819 << "/" << replayer
.get_global_image_id() << "]";
1823 } // namespace mirror
1826 template class rbd::mirror::ImageReplayer
<librbd::ImageCtx
>;