1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/compat.h"
5 #include "common/Formatter.h"
6 #include "common/debug.h"
7 #include "common/errno.h"
8 #include "include/stringify.h"
9 #include "cls/rbd/cls_rbd_client.h"
10 #include "common/Timer.h"
11 #include "common/WorkQueue.h"
12 #include "global/global_context.h"
13 #include "journal/Journaler.h"
14 #include "journal/ReplayHandler.h"
15 #include "journal/Settings.h"
16 #include "librbd/ExclusiveLock.h"
17 #include "librbd/ImageCtx.h"
18 #include "librbd/ImageState.h"
19 #include "librbd/Journal.h"
20 #include "librbd/Operations.h"
21 #include "librbd/Utils.h"
22 #include "librbd/journal/Replay.h"
23 #include "ImageReplayer.h"
25 #include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
26 #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
27 #include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
28 #include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
29 #include "tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h"
31 #define dout_context g_ceph_context
32 #define dout_subsys ceph_subsys_rbd_mirror
34 #define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
39 using std::unique_ptr
;
40 using std::shared_ptr
;
46 using librbd::util::create_context_callback
;
47 using librbd::util::create_rados_callback
;
48 using namespace rbd::mirror::image_replayer
;
51 std::ostream
&operator<<(std::ostream
&os
,
52 const typename ImageReplayer
<I
>::State
&state
);
57 struct ReplayHandler
: public ::journal::ReplayHandler
{
58 ImageReplayer
<I
> *replayer
;
59 ReplayHandler(ImageReplayer
<I
> *replayer
) : replayer(replayer
) {}
60 void get() override
{}
61 void put() override
{}
63 void handle_entries_available() override
{
64 replayer
->handle_replay_ready();
66 void handle_complete(int r
) override
{
69 ss
<< "replay completed with error: " << cpp_strerror(r
);
71 replayer
->handle_replay_complete(r
, ss
.str());
75 class ImageReplayerAdminSocketCommand
{
77 virtual ~ImageReplayerAdminSocketCommand() {}
78 virtual bool call(Formatter
*f
, stringstream
*ss
) = 0;
82 class StatusCommand
: public ImageReplayerAdminSocketCommand
{
84 explicit StatusCommand(ImageReplayer
<I
> *replayer
) : replayer(replayer
) {}
86 bool call(Formatter
*f
, stringstream
*ss
) override
{
87 replayer
->print_status(f
, ss
);
92 ImageReplayer
<I
> *replayer
;
96 class StartCommand
: public ImageReplayerAdminSocketCommand
{
98 explicit StartCommand(ImageReplayer
<I
> *replayer
) : replayer(replayer
) {}
100 bool call(Formatter
*f
, stringstream
*ss
) override
{
101 replayer
->start(nullptr, true);
106 ImageReplayer
<I
> *replayer
;
109 template <typename I
>
110 class StopCommand
: public ImageReplayerAdminSocketCommand
{
112 explicit StopCommand(ImageReplayer
<I
> *replayer
) : replayer(replayer
) {}
114 bool call(Formatter
*f
, stringstream
*ss
) override
{
115 replayer
->stop(nullptr, true);
120 ImageReplayer
<I
> *replayer
;
123 template <typename I
>
124 class RestartCommand
: public ImageReplayerAdminSocketCommand
{
126 explicit RestartCommand(ImageReplayer
<I
> *replayer
) : replayer(replayer
) {}
128 bool call(Formatter
*f
, stringstream
*ss
) override
{
134 ImageReplayer
<I
> *replayer
;
137 template <typename I
>
138 class FlushCommand
: public ImageReplayerAdminSocketCommand
{
140 explicit FlushCommand(ImageReplayer
<I
> *replayer
) : replayer(replayer
) {}
142 bool call(Formatter
*f
, stringstream
*ss
) override
{
144 replayer
->flush(&cond
);
147 *ss
<< "flush: " << cpp_strerror(r
);
154 ImageReplayer
<I
> *replayer
;
157 template <typename I
>
158 class ImageReplayerAdminSocketHook
: public AdminSocketHook
{
160 ImageReplayerAdminSocketHook(CephContext
*cct
, const std::string
&name
,
161 ImageReplayer
<I
> *replayer
)
162 : admin_socket(cct
->get_admin_socket()),
163 lock("ImageReplayerAdminSocketHook::lock " +
164 replayer
->get_global_image_id()) {
168 command
= "rbd mirror status " + name
;
169 r
= admin_socket
->register_command(command
, command
, this,
170 "get status for rbd mirror " + name
);
172 commands
[command
] = new StatusCommand
<I
>(replayer
);
175 command
= "rbd mirror start " + name
;
176 r
= admin_socket
->register_command(command
, command
, this,
177 "start rbd mirror " + name
);
179 commands
[command
] = new StartCommand
<I
>(replayer
);
182 command
= "rbd mirror stop " + name
;
183 r
= admin_socket
->register_command(command
, command
, this,
184 "stop rbd mirror " + name
);
186 commands
[command
] = new StopCommand
<I
>(replayer
);
189 command
= "rbd mirror restart " + name
;
190 r
= admin_socket
->register_command(command
, command
, this,
191 "restart rbd mirror " + name
);
193 commands
[command
] = new RestartCommand
<I
>(replayer
);
196 command
= "rbd mirror flush " + name
;
197 r
= admin_socket
->register_command(command
, command
, this,
198 "flush rbd mirror " + name
);
200 commands
[command
] = new FlushCommand
<I
>(replayer
);
204 ~ImageReplayerAdminSocketHook() override
{
205 Mutex::Locker
locker(lock
);
206 for (Commands::const_iterator i
= commands
.begin(); i
!= commands
.end();
208 (void)admin_socket
->unregister_command(i
->first
);
214 bool call(std::string command
, cmdmap_t
& cmdmap
, std::string format
,
215 bufferlist
& out
) override
{
216 Mutex::Locker
locker(lock
);
217 Commands::const_iterator i
= commands
.find(command
);
218 assert(i
!= commands
.end());
219 Formatter
*f
= Formatter::create(format
);
221 bool r
= i
->second
->call(f
, &ss
);
228 typedef std::map
<std::string
, ImageReplayerAdminSocketCommand
*> Commands
;
230 AdminSocket
*admin_socket
;
235 uint32_t calculate_replay_delay(const utime_t
&event_time
,
236 int mirroring_replay_delay
) {
237 if (mirroring_replay_delay
<= 0) {
241 utime_t now
= ceph_clock_now();
242 if (event_time
+ mirroring_replay_delay
<= now
) {
246 // ensure it is rounded up when converting to integer
247 return (event_time
+ mirroring_replay_delay
- now
) + 1;
250 } // anonymous namespace
252 template <typename I
>
253 void ImageReplayer
<I
>::BootstrapProgressContext::update_progress(
254 const std::string
&description
, bool flush
)
256 const std::string desc
= "bootstrapping, " + description
;
257 replayer
->set_state_description(0, desc
);
259 replayer
->update_mirror_image_status(false, boost::none
);
263 template <typename I
>
264 void ImageReplayer
<I
>::RemoteJournalerListener::handle_update(
265 ::journal::JournalMetadata
*) {
266 FunctionContext
*ctx
= new FunctionContext([this](int r
) {
267 replayer
->handle_remote_journal_metadata_updated();
269 replayer
->m_threads
->work_queue
->queue(ctx
, 0);
272 template <typename I
>
273 ImageReplayer
<I
>::ImageReplayer(Threads
<librbd::ImageCtx
> *threads
,
274 ImageDeleter
<I
>* image_deleter
,
275 InstanceWatcher
<I
> *instance_watcher
,
277 const std::string
&local_mirror_uuid
,
278 int64_t local_pool_id
,
279 const std::string
&global_image_id
) :
281 m_image_deleter(image_deleter
),
282 m_instance_watcher(instance_watcher
),
284 m_local_mirror_uuid(local_mirror_uuid
),
285 m_local_pool_id(local_pool_id
),
286 m_global_image_id(global_image_id
),
287 m_lock("rbd::mirror::ImageReplayer " + stringify(local_pool_id
) + " " +
289 m_progress_cxt(this),
290 m_journal_listener(new JournalListener(this)),
291 m_remote_listener(this)
293 // Register asok commands using a temporary "local_pool_name/global_image_id"
294 // name. When the image name becomes known on start the asok commands will be
295 // re-registered using the "local_pool_name/local_image_name" name.
297 std::string pool_name
;
298 int r
= m_local
->pool_reverse_lookup(m_local_pool_id
, &pool_name
);
300 derr
<< "error resolving local pool " << m_local_pool_id
301 << ": " << cpp_strerror(r
) << dendl
;
302 pool_name
= stringify(m_local_pool_id
);
305 m_name
= pool_name
+ "/" + m_global_image_id
;
306 dout(20) << "registered asok hook: " << m_name
<< dendl
;
307 m_asok_hook
= new ImageReplayerAdminSocketHook
<I
>(g_ceph_context
, m_name
,
311 template <typename I
>
312 ImageReplayer
<I
>::~ImageReplayer()
314 assert(m_event_preprocessor
== nullptr);
315 assert(m_replay_status_formatter
== nullptr);
316 assert(m_local_image_ctx
== nullptr);
317 assert(m_local_replay
== nullptr);
318 assert(m_remote_journaler
== nullptr);
319 assert(m_replay_handler
== nullptr);
320 assert(m_on_start_finish
== nullptr);
321 assert(m_on_stop_finish
== nullptr);
322 assert(m_bootstrap_request
== nullptr);
323 assert(m_in_flight_status_updates
== 0);
325 delete m_journal_listener
;
329 template <typename I
>
330 image_replayer::HealthState ImageReplayer
<I
>::get_health_state() const {
331 Mutex::Locker
locker(m_lock
);
333 if (!m_mirror_image_status_state
) {
334 return image_replayer::HEALTH_STATE_OK
;
335 } else if (*m_mirror_image_status_state
==
336 cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING
||
337 *m_mirror_image_status_state
==
338 cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN
) {
339 return image_replayer::HEALTH_STATE_WARNING
;
341 return image_replayer::HEALTH_STATE_ERROR
;
344 template <typename I
>
345 void ImageReplayer
<I
>::add_remote_image(const std::string
&mirror_uuid
,
346 const std::string
&image_id
,
347 librados::IoCtx
&io_ctx
) {
348 Mutex::Locker
locker(m_lock
);
350 RemoteImage
remote_image(mirror_uuid
, image_id
, io_ctx
);
351 auto it
= m_remote_images
.find(remote_image
);
352 if (it
== m_remote_images
.end()) {
353 m_remote_images
.insert(remote_image
);
357 template <typename I
>
358 void ImageReplayer
<I
>::remove_remote_image(const std::string
&mirror_uuid
,
359 const std::string
&image_id
,
360 bool schedule_delete
) {
361 Mutex::Locker
locker(m_lock
);
362 m_remote_images
.erase({mirror_uuid
, image_id
});
365 template <typename I
>
366 bool ImageReplayer
<I
>::remote_images_empty() const {
367 Mutex::Locker
locker(m_lock
);
368 return m_remote_images
.empty();
371 template <typename I
>
372 void ImageReplayer
<I
>::set_state_description(int r
, const std::string
&desc
) {
373 dout(20) << r
<< " " << desc
<< dendl
;
375 Mutex::Locker
l(m_lock
);
380 template <typename I
>
381 void ImageReplayer
<I
>::start(Context
*on_finish
, bool manual
)
383 dout(20) << "on_finish=" << on_finish
<< dendl
;
387 Mutex::Locker
locker(m_lock
);
388 if (!is_stopped_()) {
389 derr
<< "already running" << dendl
;
391 } else if (m_manual_stop
&& !manual
) {
392 dout(5) << "stopped manually, ignoring start without manual flag"
396 m_state
= STATE_STARTING
;
398 m_state_desc
.clear();
399 m_manual_stop
= false;
401 if (on_finish
!= nullptr) {
402 assert(m_on_start_finish
== nullptr);
403 m_on_start_finish
= on_finish
;
405 assert(m_on_stop_finish
== nullptr);
411 on_finish
->complete(r
);
416 r
= m_local
->ioctx_create2(m_local_pool_id
, m_local_ioctx
);
418 derr
<< "error opening ioctx for local pool " << m_local_pool_id
419 << ": " << cpp_strerror(r
) << dendl
;
420 on_start_fail(r
, "error opening local pool");
424 prepare_local_image();
427 template <typename I
>
428 void ImageReplayer
<I
>::prepare_local_image() {
431 Context
*ctx
= create_context_callback
<
432 ImageReplayer
, &ImageReplayer
<I
>::handle_prepare_local_image
>(this);
433 auto req
= PrepareLocalImageRequest
<I
>::create(
434 m_local_ioctx
, m_global_image_id
, &m_local_image_id
,
435 &m_local_image_tag_owner
, m_threads
->work_queue
, ctx
);
439 template <typename I
>
440 void ImageReplayer
<I
>::handle_prepare_local_image(int r
) {
441 dout(20) << "r=" << r
<< dendl
;
444 dout(20) << "local image does not exist" << dendl
;
446 on_start_fail(r
, "error preparing local image for replay");
448 } else if (m_local_image_tag_owner
== librbd::Journal
<>::LOCAL_MIRROR_UUID
) {
449 dout(5) << "local image is primary" << dendl
;
450 on_start_fail(0, "local image is primary");
454 // local image doesn't exist or is non-primary
458 template <typename I
>
459 void ImageReplayer
<I
>::bootstrap() {
462 if (m_remote_images
.empty()) {
463 on_start_fail(-EREMOTEIO
, "waiting for primary remote image");
467 // TODO bootstrap will need to support multiple remote images
468 m_remote_image
= *m_remote_images
.begin();
470 CephContext
*cct
= static_cast<CephContext
*>(m_local
->cct());
471 journal::Settings settings
;
472 settings
.commit_interval
= cct
->_conf
->rbd_mirror_journal_commit_age
;
473 settings
.max_fetch_bytes
= cct
->_conf
->rbd_mirror_journal_max_fetch_bytes
;
475 m_remote_journaler
= new Journaler(m_threads
->work_queue
,
477 &m_threads
->timer_lock
,
478 m_remote_image
.io_ctx
,
479 m_remote_image
.image_id
,
480 m_local_mirror_uuid
, settings
);
482 Context
*ctx
= create_context_callback
<
483 ImageReplayer
, &ImageReplayer
<I
>::handle_bootstrap
>(this);
485 BootstrapRequest
<I
> *request
= BootstrapRequest
<I
>::create(
486 m_local_ioctx
, m_remote_image
.io_ctx
, m_instance_watcher
,
487 &m_local_image_ctx
, m_local_image_id
, m_remote_image
.image_id
,
488 m_global_image_id
, m_threads
->work_queue
, m_threads
->timer
,
489 &m_threads
->timer_lock
, m_local_mirror_uuid
, m_remote_image
.mirror_uuid
,
490 m_remote_journaler
, &m_client_meta
, ctx
, &m_do_resync
, &m_progress_cxt
);
493 Mutex::Locker
locker(m_lock
);
495 m_bootstrap_request
= request
;
498 update_mirror_image_status(false, boost::none
);
499 reschedule_update_status_task(10);
504 template <typename I
>
505 void ImageReplayer
<I
>::handle_bootstrap(int r
) {
506 dout(20) << "r=" << r
<< dendl
;
508 Mutex::Locker
locker(m_lock
);
509 m_bootstrap_request
->put();
510 m_bootstrap_request
= nullptr;
511 if (m_local_image_ctx
) {
512 m_local_image_id
= m_local_image_ctx
->id
;
516 if (r
== -EREMOTEIO
) {
517 m_local_image_tag_owner
= "";
518 dout(5) << "remote image is non-primary" << dendl
;
519 on_start_fail(-EREMOTEIO
, "remote image is non-primary");
521 } else if (r
== -EEXIST
) {
522 m_local_image_tag_owner
= "";
523 on_start_fail(r
, "split-brain detected");
526 on_start_fail(r
, "error bootstrapping replay");
528 } else if (on_start_interrupted()) {
532 assert(m_local_journal
== nullptr);
534 RWLock::RLocker
snap_locker(m_local_image_ctx
->snap_lock
);
535 if (m_local_image_ctx
->journal
!= nullptr) {
536 m_local_journal
= m_local_image_ctx
->journal
;
537 m_local_journal
->add_listener(m_journal_listener
);
541 if (m_local_journal
== nullptr) {
542 on_start_fail(-EINVAL
, "error accessing local journal");
547 Mutex::Locker
locker(m_lock
);
550 Context
*on_finish
= m_on_start_finish
;
551 m_stopping_for_resync
= true;
552 FunctionContext
*ctx
= new FunctionContext([this, on_finish
](int r
) {
555 on_finish
->complete(r
);
559 resync_image(on_finish
);
561 m_on_start_finish
= ctx
;
564 std::string name
= m_local_ioctx
.get_pool_name() + "/" +
565 m_local_image_ctx
->name
;
566 if (m_name
!= name
) {
569 // Re-register asok commands using the new name.
571 m_asok_hook
= nullptr;
575 dout(20) << "registered asok hook: " << m_name
<< dendl
;
576 m_asok_hook
= new ImageReplayerAdminSocketHook
<I
>(g_ceph_context
, m_name
,
581 update_mirror_image_status(false, boost::none
);
582 init_remote_journaler();
585 template <typename I
>
586 void ImageReplayer
<I
>::init_remote_journaler() {
589 Context
*ctx
= create_context_callback
<
590 ImageReplayer
, &ImageReplayer
<I
>::handle_init_remote_journaler
>(this);
591 m_remote_journaler
->init(ctx
);
594 template <typename I
>
595 void ImageReplayer
<I
>::handle_init_remote_journaler(int r
) {
596 dout(20) << "r=" << r
<< dendl
;
599 derr
<< "failed to initialize remote journal: " << cpp_strerror(r
) << dendl
;
600 on_start_fail(r
, "error initializing remote journal");
602 } else if (on_start_interrupted()) {
606 m_remote_journaler
->add_listener(&m_remote_listener
);
608 cls::journal::Client client
;
609 r
= m_remote_journaler
->get_cached_client(m_local_mirror_uuid
, &client
);
611 derr
<< "error retrieving remote journal client: " << cpp_strerror(r
)
613 on_start_fail(r
, "error retrieving remote journal client");
617 if (client
.state
!= cls::journal::CLIENT_STATE_CONNECTED
) {
618 dout(5) << "client flagged disconnected, stopping image replay" << dendl
;
619 if (m_local_image_ctx
->mirroring_resync_after_disconnect
) {
620 Mutex::Locker
locker(m_lock
);
621 m_stopping_for_resync
= true;
623 on_start_fail(-ENOTCONN
, "disconnected");
630 template <typename I
>
631 void ImageReplayer
<I
>::start_replay() {
634 Context
*start_ctx
= create_context_callback
<
635 ImageReplayer
, &ImageReplayer
<I
>::handle_start_replay
>(this);
636 m_local_journal
->start_external_replay(&m_local_replay
, start_ctx
);
639 template <typename I
>
640 void ImageReplayer
<I
>::handle_start_replay(int r
) {
641 dout(20) << "r=" << r
<< dendl
;
644 assert(m_local_replay
== nullptr);
645 derr
<< "error starting external replay on local image "
646 << m_local_image_id
<< ": " << cpp_strerror(r
) << dendl
;
647 on_start_fail(r
, "error starting replay on local image");
651 Context
*on_finish(nullptr);
653 Mutex::Locker
locker(m_lock
);
654 assert(m_state
== STATE_STARTING
);
655 m_state
= STATE_REPLAYING
;
656 std::swap(m_on_start_finish
, on_finish
);
659 m_event_preprocessor
= EventPreprocessor
<I
>::create(
660 *m_local_image_ctx
, *m_remote_journaler
, m_local_mirror_uuid
,
661 &m_client_meta
, m_threads
->work_queue
);
662 m_replay_status_formatter
=
663 ReplayStatusFormatter
<I
>::create(m_remote_journaler
, m_local_mirror_uuid
);
665 update_mirror_image_status(true, boost::none
);
666 reschedule_update_status_task(30);
668 dout(20) << "start succeeded" << dendl
;
669 if (on_finish
!= nullptr) {
670 dout(20) << "on finish complete, r=" << r
<< dendl
;
671 on_finish
->complete(r
);
674 if (on_replay_interrupted()) {
679 CephContext
*cct
= static_cast<CephContext
*>(m_local
->cct());
680 double poll_seconds
= cct
->_conf
->rbd_mirror_journal_poll_age
;
682 Mutex::Locker
locker(m_lock
);
683 m_replay_handler
= new ReplayHandler
<I
>(this);
684 m_remote_journaler
->start_live_replay(m_replay_handler
, poll_seconds
);
686 dout(20) << "m_remote_journaler=" << *m_remote_journaler
<< dendl
;
691 template <typename I
>
692 void ImageReplayer
<I
>::on_start_fail(int r
, const std::string
&desc
)
694 dout(20) << "r=" << r
<< dendl
;
695 Context
*ctx
= new FunctionContext([this, r
, desc
](int _r
) {
697 Mutex::Locker
locker(m_lock
);
698 assert(m_state
== STATE_STARTING
);
699 m_state
= STATE_STOPPING
;
700 if (r
< 0 && r
!= -ECANCELED
&& r
!= -EREMOTEIO
) {
701 derr
<< "start failed: " << cpp_strerror(r
) << dendl
;
703 dout(20) << "start canceled" << dendl
;
707 set_state_description(r
, desc
);
708 update_mirror_image_status(false, boost::none
);
709 reschedule_update_status_task(-1);
712 m_threads
->work_queue
->queue(ctx
, 0);
715 template <typename I
>
716 bool ImageReplayer
<I
>::on_start_interrupted()
718 Mutex::Locker
locker(m_lock
);
719 assert(m_state
== STATE_STARTING
);
720 if (m_on_stop_finish
== nullptr) {
724 on_start_fail(-ECANCELED
);
728 template <typename I
>
729 void ImageReplayer
<I
>::stop(Context
*on_finish
, bool manual
, int r
,
730 const std::string
& desc
)
732 dout(20) << "on_finish=" << on_finish
<< ", manual=" << manual
733 << ", desc=" << desc
<< dendl
;
735 image_replayer::BootstrapRequest
<I
> *bootstrap_request
= nullptr;
736 bool shut_down_replay
= false;
738 bool canceled_task
= false;
740 Mutex::Locker
locker(m_lock
);
742 if (!is_running_()) {
745 if (!is_stopped_()) {
746 if (m_state
== STATE_STARTING
) {
747 dout(20) << "canceling start" << dendl
;
748 if (m_bootstrap_request
) {
749 bootstrap_request
= m_bootstrap_request
;
750 bootstrap_request
->get();
753 dout(20) << "interrupting replay" << dendl
;
754 shut_down_replay
= true;
757 assert(m_on_stop_finish
== nullptr);
758 std::swap(m_on_stop_finish
, on_finish
);
759 m_stop_requested
= true;
760 m_manual_stop
= manual
;
762 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
763 if (m_delayed_preprocess_task
!= nullptr) {
764 canceled_task
= m_threads
->timer
->cancel_event(
765 m_delayed_preprocess_task
);
766 assert(canceled_task
);
767 m_delayed_preprocess_task
= nullptr;
773 // avoid holding lock since bootstrap request will update status
774 if (bootstrap_request
!= nullptr) {
775 bootstrap_request
->cancel();
776 bootstrap_request
->put();
780 m_event_replay_tracker
.finish_op();
781 on_replay_interrupted();
785 dout(20) << "not running" << dendl
;
787 on_finish
->complete(-EINVAL
);
792 if (shut_down_replay
) {
793 on_stop_journal_replay(r
, desc
);
794 } else if (on_finish
!= nullptr) {
795 on_finish
->complete(0);
799 template <typename I
>
800 void ImageReplayer
<I
>::on_stop_journal_replay(int r
, const std::string
&desc
)
802 dout(20) << "enter" << dendl
;
805 Mutex::Locker
locker(m_lock
);
806 if (m_state
!= STATE_REPLAYING
) {
807 // might be invoked multiple times while stopping
810 m_stop_requested
= true;
811 m_state
= STATE_STOPPING
;
814 set_state_description(r
, desc
);
815 update_mirror_image_status(false, boost::none
);
816 reschedule_update_status_task(-1);
820 template <typename I
>
821 void ImageReplayer
<I
>::handle_replay_ready()
823 dout(20) << "enter" << dendl
;
824 if (on_replay_interrupted()) {
828 if (!m_remote_journaler
->try_pop_front(&m_replay_entry
, &m_replay_tag_tid
)) {
832 m_event_replay_tracker
.start_op();
835 bool stopping
= (m_state
== STATE_STOPPING
);
839 dout(10) << "stopping event replay" << dendl
;
840 m_event_replay_tracker
.finish_op();
844 if (m_replay_tag_valid
&& m_replay_tag
.tid
== m_replay_tag_tid
) {
852 template <typename I
>
853 void ImageReplayer
<I
>::restart(Context
*on_finish
)
855 FunctionContext
*ctx
= new FunctionContext(
856 [this, on_finish
](int r
) {
860 start(on_finish
, true);
865 template <typename I
>
866 void ImageReplayer
<I
>::flush(Context
*on_finish
)
868 dout(20) << "enter" << dendl
;
871 Mutex::Locker
locker(m_lock
);
872 if (m_state
== STATE_REPLAYING
) {
873 Context
*ctx
= new FunctionContext(
875 if (on_finish
!= nullptr) {
876 on_finish
->complete(r
);
879 on_flush_local_replay_flush_start(ctx
);
885 on_finish
->complete(0);
889 template <typename I
>
890 void ImageReplayer
<I
>::on_flush_local_replay_flush_start(Context
*on_flush
)
892 dout(20) << "enter" << dendl
;
893 FunctionContext
*ctx
= new FunctionContext(
894 [this, on_flush
](int r
) {
895 on_flush_local_replay_flush_finish(on_flush
, r
);
898 assert(m_lock
.is_locked());
899 assert(m_state
== STATE_REPLAYING
);
900 m_local_replay
->flush(ctx
);
903 template <typename I
>
904 void ImageReplayer
<I
>::on_flush_local_replay_flush_finish(Context
*on_flush
,
907 dout(20) << "r=" << r
<< dendl
;
909 derr
<< "error flushing local replay: " << cpp_strerror(r
) << dendl
;
910 on_flush
->complete(r
);
914 on_flush_flush_commit_position_start(on_flush
);
917 template <typename I
>
918 void ImageReplayer
<I
>::on_flush_flush_commit_position_start(Context
*on_flush
)
920 FunctionContext
*ctx
= new FunctionContext(
921 [this, on_flush
](int r
) {
922 on_flush_flush_commit_position_finish(on_flush
, r
);
925 m_remote_journaler
->flush_commit_position(ctx
);
928 template <typename I
>
929 void ImageReplayer
<I
>::on_flush_flush_commit_position_finish(Context
*on_flush
,
933 derr
<< "error flushing remote journal commit position: "
934 << cpp_strerror(r
) << dendl
;
937 update_mirror_image_status(false, boost::none
);
939 dout(20) << "flush complete, r=" << r
<< dendl
;
940 on_flush
->complete(r
);
943 template <typename I
>
944 bool ImageReplayer
<I
>::on_replay_interrupted()
948 Mutex::Locker
locker(m_lock
);
949 shut_down
= m_stop_requested
;
953 on_stop_journal_replay();
958 template <typename I
>
959 void ImageReplayer
<I
>::print_status(Formatter
*f
, stringstream
*ss
)
961 dout(20) << "enter" << dendl
;
963 Mutex::Locker
l(m_lock
);
966 f
->open_object_section("image_replayer");
967 f
->dump_string("name", m_name
);
968 f
->dump_string("state", to_string(m_state
));
972 *ss
<< m_name
<< ": state: " << to_string(m_state
);
976 template <typename I
>
977 void ImageReplayer
<I
>::handle_replay_complete(int r
, const std::string
&error_desc
)
979 dout(20) << "r=" << r
<< dendl
;
981 derr
<< "replay encountered an error: " << cpp_strerror(r
) << dendl
;
982 set_state_description(r
, error_desc
);
986 Mutex::Locker
locker(m_lock
);
987 m_stop_requested
= true;
989 on_replay_interrupted();
992 template <typename I
>
993 void ImageReplayer
<I
>::replay_flush() {
996 bool interrupted
= false;
998 Mutex::Locker
locker(m_lock
);
999 if (m_state
!= STATE_REPLAYING
) {
1000 dout(20) << "replay interrupted" << dendl
;
1003 m_state
= STATE_REPLAY_FLUSHING
;
1008 m_event_replay_tracker
.finish_op();
1012 // shut down the replay to flush all IO and ops and create a new
1013 // replayer to handle the new tag epoch
1014 Context
*ctx
= create_context_callback
<
1015 ImageReplayer
<I
>, &ImageReplayer
<I
>::handle_replay_flush
>(this);
1016 ctx
= new FunctionContext([this, ctx
](int r
) {
1017 m_local_image_ctx
->journal
->stop_external_replay();
1018 m_local_replay
= nullptr;
1025 m_local_journal
->start_external_replay(&m_local_replay
, ctx
);
1027 m_local_replay
->shut_down(false, ctx
);
1030 template <typename I
>
1031 void ImageReplayer
<I
>::handle_replay_flush(int r
) {
1032 dout(20) << "r=" << r
<< dendl
;
1035 Mutex::Locker
locker(m_lock
);
1036 assert(m_state
== STATE_REPLAY_FLUSHING
);
1037 m_state
= STATE_REPLAYING
;
1041 derr
<< "replay flush encountered an error: " << cpp_strerror(r
) << dendl
;
1042 m_event_replay_tracker
.finish_op();
1043 handle_replay_complete(r
, "replay flush encountered an error");
1045 } else if (on_replay_interrupted()) {
1046 m_event_replay_tracker
.finish_op();
1053 template <typename I
>
1054 void ImageReplayer
<I
>::get_remote_tag() {
1055 dout(20) << "tag_tid: " << m_replay_tag_tid
<< dendl
;
1057 Context
*ctx
= create_context_callback
<
1058 ImageReplayer
, &ImageReplayer
<I
>::handle_get_remote_tag
>(this);
1059 m_remote_journaler
->get_tag(m_replay_tag_tid
, &m_replay_tag
, ctx
);
1062 template <typename I
>
1063 void ImageReplayer
<I
>::handle_get_remote_tag(int r
) {
1064 dout(20) << "r=" << r
<< dendl
;
1068 bufferlist::iterator it
= m_replay_tag
.data
.begin();
1069 ::decode(m_replay_tag_data
, it
);
1070 } catch (const buffer::error
&err
) {
1076 derr
<< "failed to retrieve remote tag " << m_replay_tag_tid
<< ": "
1077 << cpp_strerror(r
) << dendl
;
1078 m_event_replay_tracker
.finish_op();
1079 handle_replay_complete(r
, "failed to retrieve remote tag");
1083 m_replay_tag_valid
= true;
1084 dout(20) << "decoded remote tag " << m_replay_tag_tid
<< ": "
1085 << m_replay_tag_data
<< dendl
;
1087 allocate_local_tag();
1090 template <typename I
>
1091 void ImageReplayer
<I
>::allocate_local_tag() {
1094 std::string mirror_uuid
= m_replay_tag_data
.mirror_uuid
;
1095 if (mirror_uuid
== librbd::Journal
<>::LOCAL_MIRROR_UUID
||
1096 mirror_uuid
== m_local_mirror_uuid
) {
1097 mirror_uuid
= m_remote_image
.mirror_uuid
;
1098 } else if (mirror_uuid
== librbd::Journal
<>::ORPHAN_MIRROR_UUID
) {
1099 dout(5) << "encountered image demotion: stopping" << dendl
;
1100 Mutex::Locker
locker(m_lock
);
1101 m_stop_requested
= true;
1104 librbd::journal::TagPredecessor
predecessor(m_replay_tag_data
.predecessor
);
1105 if (predecessor
.mirror_uuid
== librbd::Journal
<>::LOCAL_MIRROR_UUID
) {
1106 predecessor
.mirror_uuid
= m_remote_image
.mirror_uuid
;
1107 } else if (predecessor
.mirror_uuid
== m_local_mirror_uuid
) {
1108 predecessor
.mirror_uuid
= librbd::Journal
<>::LOCAL_MIRROR_UUID
;
1111 dout(20) << "mirror_uuid=" << mirror_uuid
<< ", "
1112 << "predecessor_mirror_uuid=" << predecessor
.mirror_uuid
<< ", "
1113 << "replay_tag_tid=" << m_replay_tag_tid
<< ", "
1114 << "replay_tag_data=" << m_replay_tag_data
<< dendl
;
1115 Context
*ctx
= create_context_callback
<
1116 ImageReplayer
, &ImageReplayer
<I
>::handle_allocate_local_tag
>(this);
1117 m_local_journal
->allocate_tag(mirror_uuid
, predecessor
, ctx
);
1120 template <typename I
>
1121 void ImageReplayer
<I
>::handle_allocate_local_tag(int r
) {
1122 dout(20) << "r=" << r
<< dendl
;
1125 derr
<< "failed to allocate journal tag: " << cpp_strerror(r
) << dendl
;
1126 m_event_replay_tracker
.finish_op();
1127 handle_replay_complete(r
, "failed to allocate journal tag");
1134 template <typename I
>
1135 void ImageReplayer
<I
>::preprocess_entry() {
1136 dout(20) << "preprocessing entry tid=" << m_replay_entry
.get_commit_tid()
1139 bufferlist data
= m_replay_entry
.get_data();
1140 bufferlist::iterator it
= data
.begin();
1141 int r
= m_local_replay
->decode(&it
, &m_event_entry
);
1143 derr
<< "failed to decode journal event" << dendl
;
1144 m_event_replay_tracker
.finish_op();
1145 handle_replay_complete(r
, "failed to decode journal event");
1149 uint32_t delay
= calculate_replay_delay(
1150 m_event_entry
.timestamp
, m_local_image_ctx
->mirroring_replay_delay
);
1152 handle_preprocess_entry_ready(0);
1156 dout(20) << "delaying replay by " << delay
<< " sec" << dendl
;
1158 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
1159 assert(m_delayed_preprocess_task
== nullptr);
1160 m_delayed_preprocess_task
= new FunctionContext(
1162 assert(m_threads
->timer_lock
.is_locked());
1163 m_delayed_preprocess_task
= nullptr;
1164 m_threads
->work_queue
->queue(
1165 create_context_callback
<ImageReplayer
,
1166 &ImageReplayer
<I
>::handle_preprocess_entry_ready
>(this), 0);
1168 m_threads
->timer
->add_event_after(delay
, m_delayed_preprocess_task
);
1171 template <typename I
>
1172 void ImageReplayer
<I
>::handle_preprocess_entry_ready(int r
) {
1173 dout(20) << "r=" << r
<< dendl
;
1176 if (!m_event_preprocessor
->is_required(m_event_entry
)) {
1181 Context
*ctx
= create_context_callback
<
1182 ImageReplayer
, &ImageReplayer
<I
>::handle_preprocess_entry_safe
>(this);
1183 m_event_preprocessor
->preprocess(&m_event_entry
, ctx
);
1186 template <typename I
>
1187 void ImageReplayer
<I
>::handle_preprocess_entry_safe(int r
) {
1188 dout(20) << "r=" << r
<< dendl
;
1191 m_event_replay_tracker
.finish_op();
1193 if (r
== -ECANCELED
) {
1194 handle_replay_complete(0, "lost exclusive lock");
1196 derr
<< "failed to preprocess journal event" << dendl
;
1197 handle_replay_complete(r
, "failed to preprocess journal event");
1205 template <typename I
>
1206 void ImageReplayer
<I
>::process_entry() {
1207 dout(20) << "processing entry tid=" << m_replay_entry
.get_commit_tid()
1210 // stop replaying events if stop has been requested
1211 if (on_replay_interrupted()) {
1212 m_event_replay_tracker
.finish_op();
1216 Context
*on_ready
= create_context_callback
<
1217 ImageReplayer
, &ImageReplayer
<I
>::handle_process_entry_ready
>(this);
1218 Context
*on_commit
= new C_ReplayCommitted(this, std::move(m_replay_entry
));
1220 m_local_replay
->process(m_event_entry
, on_ready
, on_commit
);
1223 template <typename I
>
1224 void ImageReplayer
<I
>::handle_process_entry_ready(int r
) {
1228 // attempt to process the next event
1229 handle_replay_ready();
1232 template <typename I
>
1233 void ImageReplayer
<I
>::handle_process_entry_safe(const ReplayEntry
& replay_entry
,
1235 dout(20) << "commit_tid=" << replay_entry
.get_commit_tid() << ", r=" << r
1239 derr
<< "failed to commit journal event: " << cpp_strerror(r
) << dendl
;
1240 handle_replay_complete(r
, "failed to commit journal event");
1242 assert(m_remote_journaler
!= nullptr);
1243 m_remote_journaler
->committed(replay_entry
);
1245 m_event_replay_tracker
.finish_op();
1248 template <typename I
>
1249 bool ImageReplayer
<I
>::update_mirror_image_status(bool force
,
1250 const OptionalState
&state
) {
1253 Mutex::Locker
locker(m_lock
);
1254 if (!start_mirror_image_status_update(force
, false)) {
1259 queue_mirror_image_status_update(state
);
// Decides whether a new status update may start and, if so, accounts for it
// by incrementing m_in_flight_status_updates. The caller must hold m_lock
// (asserted).
//
// Unless `force` is set or the replayer is stopped:
//  - if the replayer is not running, the update is ignored (shut down is in
//    progress);
//  - if more updates are in flight than allowed (1 when `restarting`, else
//    0), the request is remembered in m_update_status_requested instead of
//    being started.
// NOTE(review): the return statements are not visible here; presumably false
// on the two ignore/defer paths and true after the increment -- confirm.
1263 template <typename I
>
1264 bool ImageReplayer
<I
>::start_mirror_image_status_update(bool force
,
1266 assert(m_lock
.is_locked());
1268 if (!force
&& !is_stopped_()) {
1269 if (!is_running_()) {
1270 dout(20) << "shut down in-progress: ignoring update" << dendl
;
1272 } else if (m_in_flight_status_updates
> (restarting
? 1 : 0)) {
1273 dout(20) << "already sending update" << dendl
;
1274 m_update_status_requested
= true;
1280 ++m_in_flight_status_updates
;
// Marks one in-flight status update as finished.
//
// Under m_lock the in-flight counter (asserted to be positive) is
// decremented. While other updates remain in flight this is only logged
// (NOTE(review): presumably followed by an early return -- confirm).
// Otherwise the context registered in m_on_update_status_finish, if any, is
// swapped out and completed with 0.
1284 template <typename I
>
1285 void ImageReplayer
<I
>::finish_mirror_image_status_update() {
1286 Context
*on_finish
= nullptr;
1288 Mutex::Locker
locker(m_lock
);
1289 assert(m_in_flight_status_updates
> 0);
1290 if (--m_in_flight_status_updates
> 0) {
1291 dout(20) << "waiting on " << m_in_flight_status_updates
<< " in-flight "
1292 << "updates" << dendl
;
// take ownership of the waiter; the swap leaves the member holding nullptr
1296 std::swap(on_finish
, m_on_update_status_finish
);
1300 if (on_finish
!= nullptr) {
1301 on_finish
->complete(0);
// Defers send_mirror_status_update(state) to the work queue.
//
// @param state  optional state; captured by value in the queued lambda, so
//               the caller's object does not need to outlive this call
1305 template <typename I
>
1306 void ImageReplayer
<I
>::queue_mirror_image_status_update(const OptionalState
&state
) {
1308 FunctionContext
*ctx
= new FunctionContext(
1309 [this, state
](int r
) {
1310 send_mirror_status_update(state
);
1312 m_threads
->work_queue
->queue(ctx
, 0);
// Builds a cls::rbd::MirrorImageStatus describing the replayer and writes it
// asynchronously to the RBD_MIRRORING object of m_local_ioctx using
// mirror_image_status_set(); the completion is routed to
// handle_mirror_status_update().
//
// Status selection visible below:
//  - STATE_STARTING: SYNCING (description is state_desc, or "syncing" when
//    that is empty) or STARTING_REPLAY ("starting replay").
//    NOTE(review): the condition choosing between the two is not visible;
//    presumably the `syncing` flag -- confirm.
//  - STATE_REPLAYING / STATE_REPLAY_FLUSHING: REPLAYING, described as
//    "replaying, " + desc, where desc comes from m_replay_status_formatter.
//    When the formatter has no description yet, this logs
//    "waiting for replay status" and relies on on_req_finish.
//  - STATE_STOPPING with stopping_replay set: STOPPING_REPLAY.
//  - by last_r: -EREMOTEIO -> UNKNOWN, other negative -> ERROR, else STOPPED
//    (description is state_desc, or "stopped" when that is empty).
//    NOTE(review): the case label guarding this group is not visible here.
//
// The computed mirror_image_status_state is stored back into
// m_mirror_image_status_state under m_lock, and a remembered ERROR state
// overrides status.state before the write.
//
// @param opt_state  optional state override
//                   (NOTE(review): its use is not visible in this excerpt)
1315 template <typename I
>
1316 void ImageReplayer
<I
>::send_mirror_status_update(const OptionalState
&opt_state
) {
1318 std::string state_desc
;
1320 bool stopping_replay
;
1322 OptionalMirrorImageStatusState mirror_image_status_state
{
1323 boost::make_optional(false, cls::rbd::MirrorImageStatusState
{})};
1324 image_replayer::BootstrapRequest
<I
>* bootstrap_request
= nullptr;
// snapshot the members needed below while holding m_lock
1326 Mutex::Locker
locker(m_lock
);
1328 state_desc
= m_state_desc
;
1329 mirror_image_status_state
= m_mirror_image_status_state
;
1331 stopping_replay
= (m_local_image_ctx
!= nullptr);
// get() is paired with the put() further down -- presumably it keeps the
// bootstrap request alive while is_syncing() is queried (TODO confirm)
1333 if (m_bootstrap_request
!= nullptr) {
1334 bootstrap_request
= m_bootstrap_request
;
1335 bootstrap_request
->get();
1339 bool syncing
= false;
1340 if (bootstrap_request
!= nullptr) {
1341 syncing
= bootstrap_request
->is_syncing();
1342 bootstrap_request
->put();
1343 bootstrap_request
= nullptr;
1350 cls::rbd::MirrorImageStatus status
;
1353 case STATE_STARTING
:
1355 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING
;
1356 status
.description
= state_desc
.empty() ? "syncing" : state_desc
;
1357 mirror_image_status_state
= status
.state
;
1359 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY
;
1360 status
.description
= "starting replay";
1363 case STATE_REPLAYING
:
1364 case STATE_REPLAY_FLUSHING
:
1365 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING
;
1367 Context
*on_req_finish
= new FunctionContext(
1369 dout(20) << "replay status ready: r=" << r
<< dendl
;
1371 send_mirror_status_update(boost::none
);
1372 } else if (r
== -EAGAIN
) {
1373 // decrement in-flight status update counter
1374 handle_mirror_status_update(r
);
1379 if (!m_replay_status_formatter
->get_or_send_update(&desc
,
1381 dout(20) << "waiting for replay status" << dendl
;
1384 status
.description
= "replaying, " + desc
;
1385 mirror_image_status_state
= boost::none
;
1388 case STATE_STOPPING
:
1389 if (stopping_replay
) {
1390 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY
;
1391 status
.description
= "stopping replay";
1396 if (last_r
== -EREMOTEIO
) {
1397 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN
;
1398 status
.description
= state_desc
;
1399 mirror_image_status_state
= status
.state
;
1400 } else if (last_r
< 0) {
1401 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR
;
1402 status
.description
= state_desc
;
1403 mirror_image_status_state
= status
.state
;
1405 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED
;
1406 status
.description
= state_desc
.empty() ? "stopped" : state_desc
;
1407 mirror_image_status_state
= boost::none
;
1411 assert(!"invalid state");
// publish the state chosen above so that later updates can see it
1415 Mutex::Locker
locker(m_lock
);
1416 m_mirror_image_status_state
= mirror_image_status_state
;
1419 // prevent the status from ping-ponging when failed replays are restarted
1420 if (mirror_image_status_state
&&
1421 *mirror_image_status_state
== cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR
) {
1422 status
.state
= *mirror_image_status_state
;
1425 dout(20) << "status=" << status
<< dendl
;
1426 librados::ObjectWriteOperation op
;
1427 librbd::cls_client::mirror_image_status_set(&op
, m_global_image_id
, status
);
1429 librados::AioCompletion
*aio_comp
= create_rados_callback
<
1430 ImageReplayer
<I
>, &ImageReplayer
<I
>::handle_mirror_status_update
>(this);
1431 int r
= m_local_ioctx
.aio_operate(RBD_MIRRORING
, aio_comp
, &op
);
1433 aio_comp
->release();
// Completion handler for a mirror status update (also called directly with
// -EAGAIN from send_mirror_status_update()'s replay-status callback).
//
// Under m_lock it consumes m_update_status_requested; if the replayer is
// still running and an update was requested meanwhile, it tries to start that
// update with restarting=true. Afterwards it either queues the deferred
// update or, when running, reschedules the periodic status task. Finally the
// just-completed update is released via finish_mirror_image_status_update().
// NOTE(review): the condition guarding the queue call is not visible here;
// presumably `started` -- confirm.
//
// @param r  result of the status update
1436 template <typename I
>
1437 void ImageReplayer
<I
>::handle_mirror_status_update(int r
) {
1438 dout(20) << "r=" << r
<< dendl
;
1440 bool running
= false;
1441 bool started
= false;
1443 Mutex::Locker
locker(m_lock
);
// the swap reads and clears the pending-request flag in one step
1444 bool update_status_requested
= false;
1445 std::swap(update_status_requested
, m_update_status_requested
);
1447 running
= is_running_();
1448 if (running
&& update_status_requested
) {
1449 started
= start_mirror_image_status_update(false, true);
1453 // if a deferred update is available, send it -- otherwise reschedule
1456 queue_mirror_image_status_update(boost::none
);
1457 } else if (running
) {
1458 reschedule_update_status_task();
1461 // mark committed status update as no longer in-flight
1462 finish_mirror_image_status_update();
// Cancels and optionally re-arms the timer task that periodically queues a
// mirror status update.
//
// While holding m_lock and the timer lock:
//  - any existing m_update_status_task is cancelled;
//  - new_interval > 0 replaces m_update_status_interval;
//  - when new_interval >= 0, the replayer is running and
//    start_mirror_image_status_update() accepts the update, a new task is
//    scheduled after m_update_status_interval; that task clears
//    m_update_status_task and queues a status update.
// A negative new_interval therefore only cancels (handle_shut_down() passes
// -1). If a task was cancelled, the in-flight slot it held is released via
// finish_mirror_image_status_update().
//
// @param new_interval  >0: new interval; 0: keep the current interval and
//                      treat as a restart; <0: cancel only
1465 template <typename I
>
1466 void ImageReplayer
<I
>::reschedule_update_status_task(int new_interval
) {
1469 bool canceled_task
= false;
1471 Mutex::Locker
locker(m_lock
);
1472 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
1474 if (m_update_status_task
) {
1475 canceled_task
= m_threads
->timer
->cancel_event(m_update_status_task
);
1476 m_update_status_task
= nullptr;
1479 if (new_interval
> 0) {
1480 m_update_status_interval
= new_interval
;
// `restarting` raises the in-flight limit checked by
// start_mirror_image_status_update() from 0 to 1
1483 bool restarting
= (new_interval
== 0 || canceled_task
);
1484 if (new_interval
>= 0 && is_running_() &&
1485 start_mirror_image_status_update(false, restarting
)) {
1486 m_update_status_task
= new FunctionContext(
1488 assert(m_threads
->timer_lock
.is_locked());
1489 m_update_status_task
= nullptr;
1491 queue_mirror_image_status_update(boost::none
);
1493 m_threads
->timer
->add_event_after(m_update_status_interval
,
1494 m_update_status_task
);
1498 if (canceled_task
) {
1499 dout(20) << "canceled task" << dendl
;
1500 finish_mirror_image_status_update();
// Builds and starts the asynchronous shut-down sequence. m_state must be
// STATE_STOPPING (asserted under m_lock).
//
// If status updates are still in flight, a continuation is registered in
// m_on_update_status_finish (NOTE(review): its body is not visible here;
// presumably it re-enters shut_down() -- confirm).
//
// The sequence is a chain of FunctionContexts built in REVERSE order: each new
// context captures the previously built `ctx`, so the context built last runs
// first. Each step is only added when the member check visible next to it
// passes. Assuming every step completes its captured ctx, the run order is:
//   1. m_local_replay->shut_down(true, ...)   (flush local replay)
//   2. m_event_replay_tracker.wait_for_ops()  (logs a replay shut-down error)
//   3. m_local_journal->remove_listener()
//   4. stop_external_replay(), drop m_local_replay, destroy event preprocessor
//   5. clear m_local_journal
//   6. CloseImageRequest on m_local_image_ctx
//   7. m_remote_journaler->stop_replay()
//   8. delete m_replay_handler, wait for replay ops
//   9. m_remote_journaler->unregister_client()  (only if m_stopping_for_resync)
//  10. remove the remote listener, m_remote_journaler->shut_down()
//  11. delete m_remote_journaler
//  12. update_mirror_image_status(true, STATE_STOPPED), handle_shut_down(r)
// The head of the chain is queued on the work queue.
//
// @param r  result to propagate to handle_shut_down()
1504 template <typename I
>
1505 void ImageReplayer
<I
>::shut_down(int r
) {
1506 dout(20) << "r=" << r
<< dendl
;
1508 Mutex::Locker
locker(m_lock
);
1509 assert(m_state
== STATE_STOPPING
);
1511 // if status updates are in-flight, wait for them to complete
1512 // before proceeding
1513 if (m_in_flight_status_updates
> 0) {
1514 if (m_on_update_status_finish
== nullptr) {
1515 dout(20) << "waiting for in-flight status update" << dendl
;
1516 m_on_update_status_finish
= new FunctionContext(
1525 // NOTE: it's important to ensure that the local image is fully
1526 // closed before attempting to close the remote journal in
1527 // case the remote cluster is unreachable
1529 // chain the shut down sequence (reverse order)
1530 Context
*ctx
= new FunctionContext(
1532 update_mirror_image_status(true, STATE_STOPPED
);
1533 handle_shut_down(r
);
1536 // close the remote journal
1537 if (m_remote_journaler
!= nullptr) {
1538 ctx
= new FunctionContext([this, ctx
](int r
) {
1539 delete m_remote_journaler
;
1540 m_remote_journaler
= nullptr;
1543 ctx
= new FunctionContext([this, ctx
](int r
) {
1544 m_remote_journaler
->remove_listener(&m_remote_listener
);
1545 m_remote_journaler
->shut_down(ctx
);
1547 if (m_stopping_for_resync
) {
1548 ctx
= new FunctionContext([this, ctx
](int r
) {
1549 m_remote_journaler
->unregister_client(ctx
);
1554 // stop the replay of remote journal events
1555 if (m_replay_handler
!= nullptr) {
1556 ctx
= new FunctionContext([this, ctx
](int r
) {
1557 delete m_replay_handler
;
1558 m_replay_handler
= nullptr;
1560 m_event_replay_tracker
.wait_for_ops(ctx
);
1562 ctx
= new FunctionContext([this, ctx
](int r
) {
1563 m_remote_journaler
->stop_replay(ctx
);
1567 // close the local image (release exclusive lock)
1568 if (m_local_image_ctx
) {
1569 ctx
= new FunctionContext([this, ctx
](int r
) {
1570 CloseImageRequest
<I
> *request
= CloseImageRequest
<I
>::create(
1571 &m_local_image_ctx
, ctx
);
1576 // shut down event replay into the local image
1577 if (m_local_journal
!= nullptr) {
1578 ctx
= new FunctionContext([this, ctx
](int r
) {
1579 m_local_journal
= nullptr;
1582 if (m_local_replay
!= nullptr) {
1583 ctx
= new FunctionContext([this, ctx
](int r
) {
1584 m_local_journal
->stop_external_replay();
1585 m_local_replay
= nullptr;
1587 EventPreprocessor
<I
>::destroy(m_event_preprocessor
);
1588 m_event_preprocessor
= nullptr;
1592 ctx
= new FunctionContext([this, ctx
](int r
) {
1593 // blocks if listener notification is in-progress
1594 m_local_journal
->remove_listener(m_journal_listener
);
1599 // wait for all local in-flight replay events to complete
1600 ctx
= new FunctionContext([this, ctx
](int r
) {
1602 derr
<< "error shutting down journal replay: " << cpp_strerror(r
)
1606 m_event_replay_tracker
.wait_for_ops(ctx
);
1609 // flush any local in-flight replay events
1610 if (m_local_replay
!= nullptr) {
1611 ctx
= new FunctionContext([this, ctx
](int r
) {
1612 m_local_replay
->shut_down(true, ctx
);
// start the chain: the context built last (the first step) runs first
1616 m_threads
->work_queue
->queue(ctx
, 0);
// Final stage of the shut-down sequence.
//
// Cancels the periodic status task (reschedule_update_status_task(-1)). If
// status updates are still in flight, registers a continuation in
// m_on_update_status_finish that calls handle_shut_down(r) again
// (NOTE(review): presumably followed by an early return -- confirm).
// When stopping for a resync, schedules deletion of the local image through
// m_image_deleter and clears m_stopping_for_resync. Then closes
// m_local_ioctx, destroys the replay status formatter, moves m_state from
// STATE_STOPPING to STATE_STOPPED under m_lock, and completes any pending
// on-start / on-stop contexts with r.
//
// @param r  result delivered to the pending start/stop contexts
1619 template <typename I
>
1620 void ImageReplayer
<I
>::handle_shut_down(int r
) {
1621 reschedule_update_status_task(-1);
1624 Mutex::Locker
locker(m_lock
);
1626 // if status updates are in-flight, wait for them to complete
1627 // before proceeding
1628 if (m_in_flight_status_updates
> 0) {
1629 if (m_on_update_status_finish
== nullptr) {
1630 dout(20) << "waiting for in-flight status update" << dendl
;
1631 m_on_update_status_finish
= new FunctionContext(
1633 handle_shut_down(r
);
1639 if (m_stopping_for_resync
) {
1640 m_image_deleter
->schedule_image_delete(m_local
,
1644 m_stopping_for_resync
= false;
1648 dout(20) << "stop complete" << dendl
;
1649 m_local_ioctx
.close();
1651 ReplayStatusFormatter
<I
>::destroy(m_replay_status_formatter
);
1652 m_replay_status_formatter
= nullptr;
1654 Context
*on_start
= nullptr;
1655 Context
*on_stop
= nullptr;
// detach the pending callbacks and flip the state while holding m_lock
1657 Mutex::Locker
locker(m_lock
);
1658 std::swap(on_start
, m_on_start_finish
);
1659 std::swap(on_stop
, m_on_stop_finish
);
1660 m_stop_requested
= false;
1661 assert(m_delayed_preprocess_task
== nullptr);
1662 assert(m_state
== STATE_STOPPING
);
1663 m_state
= STATE_STOPPED
;
1666 if (on_start
!= nullptr) {
1667 dout(20) << "on start finish complete, r=" << r
<< dendl
;
1668 on_start
->complete(r
);
1671 if (on_stop
!= nullptr) {
1672 dout(20) << "on stop finish complete, r=" << r
<< dendl
;
1673 on_stop
->complete(r
);
// Reacts to a change of the remote journal's metadata.
//
// Under m_lock, nothing is looked up unless the replayer is running. The
// cached journal client registered under m_local_mirror_uuid is fetched from
// m_remote_journaler; a lookup failure is logged. If the client's state is
// anything other than CLIENT_STATE_CONNECTED, replay is stopped with
// -ENOTCONN and the description "disconnected".
// NOTE(review): the early returns and the error condition are not visible
// here; presumably r < 0 -- confirm.
1677 template <typename I
>
1678 void ImageReplayer
<I
>::handle_remote_journal_metadata_updated() {
1681 cls::journal::Client client
;
1683 Mutex::Locker
locker(m_lock
);
1684 if (!is_running_()) {
1688 int r
= m_remote_journaler
->get_cached_client(m_local_mirror_uuid
, &client
);
1690 derr
<< "failed to retrieve client: " << cpp_strerror(r
) << dendl
;
1695 if (client
.state
!= cls::journal::CLIENT_STATE_CONNECTED
) {
1696 dout(0) << "client flagged disconnected, stopping image replay" << dendl
;
1697 stop(nullptr, false, -ENOTCONN
, "disconnected");
// Maps a State enumerator to a human-readable name (e.g. "ReplayFlushing"
// for STATE_REPLAY_FLUSHING). Values not matched by a case label are
// rendered as "Unknown(<value>)" using stringify().
//
// @param state  the state to describe
// @return the textual name of the state
1701 template <typename I
>
1702 std::string ImageReplayer
<I
>::to_string(const State state
) {
1704 case ImageReplayer
<I
>::STATE_STARTING
:
1706 case ImageReplayer
<I
>::STATE_REPLAYING
:
1708 case ImageReplayer
<I
>::STATE_REPLAY_FLUSHING
:
1709 return "ReplayFlushing";
1710 case ImageReplayer
<I
>::STATE_STOPPING
:
1712 case ImageReplayer
<I
>::STATE_STOPPED
:
1717 return "Unknown(" + stringify(state
) + ")";
// Marks the replayer as stopping for a resync by setting
// m_stopping_for_resync under m_lock; shut_down() and handle_shut_down()
// consult that flag.
// NOTE(review): how on_finish is used is not visible in this excerpt;
// presumably it is passed on to a stop request -- confirm.
//
// @param on_finish  completion context for the resync request
1720 template <typename I
>
1721 void ImageReplayer
<I
>::resync_image(Context
*on_finish
) {
1725 Mutex::Locker
l(m_lock
);
1726 m_stopping_for_resync
= true;
// Streams a short identification of the replayer in the form
// "ImageReplayer: <address> [<local pool id>/<global image id>]".
//
// @param os        output stream to write to
// @param replayer  the replayer to describe
1732 template <typename I
>
1733 std::ostream
&operator<<(std::ostream
&os
, const ImageReplayer
<I
> &replayer
)
1735 os
<< "ImageReplayer: " << &replayer
<< " [" << replayer
.get_local_pool_id()
1736 << "/" << replayer
.get_global_image_id() << "]";
1740 } // namespace mirror
// Explicit instantiation of the ImageReplayer template for librbd::ImageCtx.
1743 template class rbd::mirror::ImageReplayer
<librbd::ImageCtx
>;