1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/compat.h"
5 #include "common/Formatter.h"
6 #include "common/debug.h"
7 #include "common/errno.h"
8 #include "include/stringify.h"
9 #include "cls/rbd/cls_rbd_client.h"
10 #include "common/Timer.h"
11 #include "common/WorkQueue.h"
12 #include "global/global_context.h"
13 #include "journal/Journaler.h"
14 #include "journal/ReplayHandler.h"
15 #include "journal/Settings.h"
16 #include "librbd/ExclusiveLock.h"
17 #include "librbd/ImageCtx.h"
18 #include "librbd/ImageState.h"
19 #include "librbd/Journal.h"
20 #include "librbd/Operations.h"
21 #include "librbd/Utils.h"
22 #include "librbd/journal/Replay.h"
23 #include "ImageDeleter.h"
24 #include "ImageReplayer.h"
26 #include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
27 #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
28 #include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
29 #include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
30 #include "tools/rbd_mirror/image_replayer/PrepareRemoteImageRequest.h"
31 #include "tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h"
33 #define dout_context g_ceph_context
34 #define dout_subsys ceph_subsys_rbd_mirror
36 #define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
41 using std::unique_ptr
;
42 using std::shared_ptr
;
48 using librbd::util::create_context_callback
;
49 using librbd::util::create_rados_callback
;
50 using namespace rbd::mirror::image_replayer
;
53 std::ostream
&operator<<(std::ostream
&os
,
54 const typename ImageReplayer
<I
>::State
&state
);
59 struct ReplayHandler
: public ::journal::ReplayHandler
{
60 ImageReplayer
<I
> *replayer
;
61 ReplayHandler(ImageReplayer
<I
> *replayer
) : replayer(replayer
) {}
62 void get() override
{}
63 void put() override
{}
65 void handle_entries_available() override
{
66 replayer
->handle_replay_ready();
68 void handle_complete(int r
) override
{
71 ss
<< "replay completed with error: " << cpp_strerror(r
);
73 replayer
->handle_replay_complete(r
, ss
.str());
78 class ImageReplayerAdminSocketCommand
{
80 ImageReplayerAdminSocketCommand(const std::string
&desc
,
81 ImageReplayer
<I
> *replayer
)
82 : desc(desc
), replayer(replayer
) {
84 virtual ~ImageReplayerAdminSocketCommand() {}
85 virtual bool call(Formatter
*f
, stringstream
*ss
) = 0;
88 ImageReplayer
<I
> *replayer
;
89 bool registered
= false;
93 class StatusCommand
: public ImageReplayerAdminSocketCommand
<I
> {
95 explicit StatusCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
96 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
99 bool call(Formatter
*f
, stringstream
*ss
) override
{
100 this->replayer
->print_status(f
, ss
);
105 template <typename I
>
106 class StartCommand
: public ImageReplayerAdminSocketCommand
<I
> {
108 explicit StartCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
109 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
112 bool call(Formatter
*f
, stringstream
*ss
) override
{
113 this->replayer
->start(nullptr, true);
118 template <typename I
>
119 class StopCommand
: public ImageReplayerAdminSocketCommand
<I
> {
121 explicit StopCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
122 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
125 bool call(Formatter
*f
, stringstream
*ss
) override
{
126 this->replayer
->stop(nullptr, true);
131 template <typename I
>
132 class RestartCommand
: public ImageReplayerAdminSocketCommand
<I
> {
134 explicit RestartCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
135 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
138 bool call(Formatter
*f
, stringstream
*ss
) override
{
139 this->replayer
->restart();
144 template <typename I
>
145 class FlushCommand
: public ImageReplayerAdminSocketCommand
<I
> {
147 explicit FlushCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
148 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
151 bool call(Formatter
*f
, stringstream
*ss
) override
{
152 this->replayer
->flush();
157 template <typename I
>
158 class ImageReplayerAdminSocketHook
: public AdminSocketHook
{
160 ImageReplayerAdminSocketHook(CephContext
*cct
, const std::string
&name
,
161 ImageReplayer
<I
> *replayer
)
162 : admin_socket(cct
->get_admin_socket()),
163 commands
{{"rbd mirror flush " + name
,
164 new FlushCommand
<I
>("flush rbd mirror " + name
, replayer
)},
165 {"rbd mirror restart " + name
,
166 new RestartCommand
<I
>("restart rbd mirror " + name
, replayer
)},
167 {"rbd mirror start " + name
,
168 new StartCommand
<I
>("start rbd mirror " + name
, replayer
)},
169 {"rbd mirror status " + name
,
170 new StatusCommand
<I
>("get status for rbd mirror " + name
, replayer
)},
171 {"rbd mirror stop " + name
,
172 new StopCommand
<I
>("stop rbd mirror " + name
, replayer
)}} {
175 int register_commands() {
176 for (auto &it
: commands
) {
177 int r
= admin_socket
->register_command(it
.first
, it
.first
, this,
182 it
.second
->registered
= true;
187 ~ImageReplayerAdminSocketHook() override
{
188 for (auto &it
: commands
) {
189 if (it
.second
->registered
) {
190 admin_socket
->unregister_command(it
.first
);
197 bool call(std::string command
, cmdmap_t
& cmdmap
, std::string format
,
198 bufferlist
& out
) override
{
199 auto i
= commands
.find(command
);
200 assert(i
!= commands
.end());
201 Formatter
*f
= Formatter::create(format
);
203 bool r
= i
->second
->call(f
, &ss
);
210 typedef std::map
<std::string
, ImageReplayerAdminSocketCommand
<I
> *> Commands
;
212 AdminSocket
*admin_socket
;
216 uint32_t calculate_replay_delay(const utime_t
&event_time
,
217 int mirroring_replay_delay
) {
218 if (mirroring_replay_delay
<= 0) {
222 utime_t now
= ceph_clock_now();
223 if (event_time
+ mirroring_replay_delay
<= now
) {
227 // ensure it is rounded up when converting to integer
228 return (event_time
+ mirroring_replay_delay
- now
) + 1;
231 } // anonymous namespace
233 template <typename I
>
234 void ImageReplayer
<I
>::BootstrapProgressContext::update_progress(
235 const std::string
&description
, bool flush
)
237 const std::string desc
= "bootstrapping, " + description
;
238 replayer
->set_state_description(0, desc
);
240 replayer
->update_mirror_image_status(false, boost::none
);
244 template <typename I
>
245 void ImageReplayer
<I
>::RemoteJournalerListener::handle_update(
246 ::journal::JournalMetadata
*) {
247 FunctionContext
*ctx
= new FunctionContext([this](int r
) {
248 replayer
->handle_remote_journal_metadata_updated();
250 replayer
->m_threads
->work_queue
->queue(ctx
, 0);
253 template <typename I
>
254 ImageReplayer
<I
>::ImageReplayer(Threads
<I
> *threads
,
255 ImageDeleter
<I
>* image_deleter
,
256 InstanceWatcher
<I
> *instance_watcher
,
258 const std::string
&local_mirror_uuid
,
259 int64_t local_pool_id
,
260 const std::string
&global_image_id
) :
262 m_image_deleter(image_deleter
),
263 m_instance_watcher(instance_watcher
),
265 m_local_mirror_uuid(local_mirror_uuid
),
266 m_local_pool_id(local_pool_id
),
267 m_global_image_id(global_image_id
), m_local_image_name(global_image_id
),
268 m_lock("rbd::mirror::ImageReplayer " + stringify(local_pool_id
) + " " +
270 m_progress_cxt(this),
271 m_journal_listener(new JournalListener(this)),
272 m_remote_listener(this)
274 // Register asok commands using a temporary "remote_pool_name/global_image_id"
275 // name. When the image name becomes known on start the asok commands will be
276 // re-registered using "remote_pool_name/remote_image_name" name.
278 std::string pool_name
;
279 int r
= m_local
->pool_reverse_lookup(m_local_pool_id
, &pool_name
);
281 derr
<< "error resolving local pool " << m_local_pool_id
282 << ": " << cpp_strerror(r
) << dendl
;
283 pool_name
= stringify(m_local_pool_id
);
286 m_name
= pool_name
+ "/" + m_global_image_id
;
287 register_admin_socket_hook();
290 template <typename I
>
291 ImageReplayer
<I
>::~ImageReplayer()
293 unregister_admin_socket_hook();
294 assert(m_event_preprocessor
== nullptr);
295 assert(m_replay_status_formatter
== nullptr);
296 assert(m_local_image_ctx
== nullptr);
297 assert(m_local_replay
== nullptr);
298 assert(m_remote_journaler
== nullptr);
299 assert(m_replay_handler
== nullptr);
300 assert(m_on_start_finish
== nullptr);
301 assert(m_on_stop_finish
== nullptr);
302 assert(m_bootstrap_request
== nullptr);
303 assert(m_in_flight_status_updates
== 0);
305 delete m_journal_listener
;
308 template <typename I
>
309 image_replayer::HealthState ImageReplayer
<I
>::get_health_state() const {
310 Mutex::Locker
locker(m_lock
);
312 if (!m_mirror_image_status_state
) {
313 return image_replayer::HEALTH_STATE_OK
;
314 } else if (*m_mirror_image_status_state
==
315 cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING
||
316 *m_mirror_image_status_state
==
317 cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN
) {
318 return image_replayer::HEALTH_STATE_WARNING
;
320 return image_replayer::HEALTH_STATE_ERROR
;
323 template <typename I
>
324 void ImageReplayer
<I
>::add_peer(const std::string
&peer_uuid
,
325 librados::IoCtx
&io_ctx
) {
326 Mutex::Locker
locker(m_lock
);
327 auto it
= m_peers
.find({peer_uuid
});
328 if (it
== m_peers
.end()) {
329 m_peers
.insert({peer_uuid
, io_ctx
});
333 template <typename I
>
334 void ImageReplayer
<I
>::set_state_description(int r
, const std::string
&desc
) {
335 dout(20) << r
<< " " << desc
<< dendl
;
337 Mutex::Locker
l(m_lock
);
342 template <typename I
>
343 void ImageReplayer
<I
>::start(Context
*on_finish
, bool manual
)
345 dout(20) << "on_finish=" << on_finish
<< dendl
;
349 Mutex::Locker
locker(m_lock
);
350 if (!is_stopped_()) {
351 derr
<< "already running" << dendl
;
353 } else if (m_manual_stop
&& !manual
) {
354 dout(5) << "stopped manually, ignoring start without manual flag"
358 m_state
= STATE_STARTING
;
360 m_state_desc
.clear();
361 m_manual_stop
= false;
362 m_delete_requested
= false;
364 if (on_finish
!= nullptr) {
365 assert(m_on_start_finish
== nullptr);
366 m_on_start_finish
= on_finish
;
368 assert(m_on_stop_finish
== nullptr);
374 on_finish
->complete(r
);
379 r
= m_local
->ioctx_create2(m_local_pool_id
, m_local_ioctx
);
381 derr
<< "error opening ioctx for local pool " << m_local_pool_id
382 << ": " << cpp_strerror(r
) << dendl
;
383 on_start_fail(r
, "error opening local pool");
390 template <typename I
>
391 void ImageReplayer
<I
>::wait_for_deletion() {
394 Context
*ctx
= create_context_callback
<
395 ImageReplayer
, &ImageReplayer
<I
>::handle_wait_for_deletion
>(this);
396 m_image_deleter
->wait_for_scheduled_deletion(
397 m_local_pool_id
, m_global_image_id
, ctx
, false);
400 template <typename I
>
401 void ImageReplayer
<I
>::handle_wait_for_deletion(int r
) {
402 dout(20) << "r=" << r
<< dendl
;
404 if (r
== -ECANCELED
) {
405 on_start_fail(0, "");
408 on_start_fail(r
, "error waiting for image deletion");
412 prepare_local_image();
415 template <typename I
>
416 void ImageReplayer
<I
>::prepare_local_image() {
419 m_local_image_id
= "";
420 Context
*ctx
= create_context_callback
<
421 ImageReplayer
, &ImageReplayer
<I
>::handle_prepare_local_image
>(this);
422 auto req
= PrepareLocalImageRequest
<I
>::create(
423 m_local_ioctx
, m_global_image_id
, &m_local_image_id
, &m_local_image_name
,
424 &m_local_image_tag_owner
, m_threads
->work_queue
, ctx
);
428 template <typename I
>
429 void ImageReplayer
<I
>::handle_prepare_local_image(int r
) {
430 dout(20) << "r=" << r
<< dendl
;
433 dout(20) << "local image does not exist" << dendl
;
435 on_start_fail(r
, "error preparing local image for replay");
438 reregister_admin_socket_hook();
441 // local image doesn't exist or is non-primary
442 prepare_remote_image();
445 template <typename I
>
446 void ImageReplayer
<I
>::prepare_remote_image() {
448 if (m_peers
.empty()) {
449 // technically nothing to bootstrap, but it handles the status update
454 // TODO need to support multiple remote images
455 assert(!m_peers
.empty());
456 m_remote_image
= {*m_peers
.begin()};
458 Context
*ctx
= create_context_callback
<
459 ImageReplayer
, &ImageReplayer
<I
>::handle_prepare_remote_image
>(this);
460 auto req
= PrepareRemoteImageRequest
<I
>::create(
461 m_threads
, m_remote_image
.io_ctx
, m_global_image_id
, m_local_mirror_uuid
,
462 m_local_image_id
, &m_remote_image
.mirror_uuid
, &m_remote_image
.image_id
,
463 &m_remote_journaler
, &m_client_state
, &m_client_meta
, ctx
);
467 template <typename I
>
468 void ImageReplayer
<I
>::handle_prepare_remote_image(int r
) {
469 dout(20) << "r=" << r
<< dendl
;
471 assert(r
< 0 ? m_remote_journaler
== nullptr : m_remote_journaler
!= nullptr);
472 if (r
< 0 && !m_local_image_id
.empty() &&
473 m_local_image_tag_owner
== librbd::Journal
<>::LOCAL_MIRROR_UUID
) {
474 // local image is primary -- fall-through
475 } else if (r
== -ENOENT
) {
476 dout(20) << "remote image does not exist" << dendl
;
478 // TODO need to support multiple remote images
479 if (!m_local_image_id
.empty() &&
480 m_local_image_tag_owner
== m_remote_image
.mirror_uuid
) {
481 // local image exists and is non-primary and linked to the missing
484 m_delete_requested
= true;
485 on_start_fail(0, "remote image no longer exists");
487 on_start_fail(-ENOENT
, "remote image does not exist");
491 on_start_fail(r
, "error retrieving remote image id");
498 template <typename I
>
499 void ImageReplayer
<I
>::bootstrap() {
502 if (!m_local_image_id
.empty() &&
503 m_local_image_tag_owner
== librbd::Journal
<>::LOCAL_MIRROR_UUID
) {
504 dout(5) << "local image is primary" << dendl
;
505 on_start_fail(0, "local image is primary");
507 } else if (m_peers
.empty()) {
508 dout(5) << "no peer clusters" << dendl
;
509 on_start_fail(-ENOENT
, "no peer clusters");
513 Context
*ctx
= create_context_callback
<
514 ImageReplayer
, &ImageReplayer
<I
>::handle_bootstrap
>(this);
516 BootstrapRequest
<I
> *request
= BootstrapRequest
<I
>::create(
517 m_local_ioctx
, m_remote_image
.io_ctx
, m_instance_watcher
,
518 &m_local_image_ctx
, m_local_image_id
, m_remote_image
.image_id
,
519 m_global_image_id
, m_threads
->work_queue
, m_threads
->timer
,
520 &m_threads
->timer_lock
, m_local_mirror_uuid
, m_remote_image
.mirror_uuid
,
521 m_remote_journaler
, &m_client_state
, &m_client_meta
, ctx
,
522 &m_resync_requested
, &m_progress_cxt
);
525 Mutex::Locker
locker(m_lock
);
527 m_bootstrap_request
= request
;
530 update_mirror_image_status(false, boost::none
);
531 reschedule_update_status_task(10);
536 template <typename I
>
537 void ImageReplayer
<I
>::handle_bootstrap(int r
) {
538 dout(20) << "r=" << r
<< dendl
;
540 Mutex::Locker
locker(m_lock
);
541 m_bootstrap_request
->put();
542 m_bootstrap_request
= nullptr;
543 if (m_local_image_ctx
) {
544 m_local_image_id
= m_local_image_ctx
->id
;
548 if (r
== -EREMOTEIO
) {
549 m_local_image_tag_owner
= "";
550 dout(5) << "remote image is non-primary" << dendl
;
551 on_start_fail(-EREMOTEIO
, "remote image is non-primary");
553 } else if (r
== -EEXIST
) {
554 m_local_image_tag_owner
= "";
555 on_start_fail(r
, "split-brain detected");
558 on_start_fail(r
, "error bootstrapping replay");
560 } else if (on_start_interrupted()) {
562 } else if (m_resync_requested
) {
563 on_start_fail(0, "resync requested");
567 assert(m_local_journal
== nullptr);
569 RWLock::RLocker
snap_locker(m_local_image_ctx
->snap_lock
);
570 if (m_local_image_ctx
->journal
!= nullptr) {
571 m_local_journal
= m_local_image_ctx
->journal
;
572 m_local_journal
->add_listener(m_journal_listener
);
576 if (m_local_journal
== nullptr) {
577 on_start_fail(-EINVAL
, "error accessing local journal");
581 update_mirror_image_status(false, boost::none
);
582 init_remote_journaler();
585 template <typename I
>
586 void ImageReplayer
<I
>::init_remote_journaler() {
589 Context
*ctx
= create_context_callback
<
590 ImageReplayer
, &ImageReplayer
<I
>::handle_init_remote_journaler
>(this);
591 m_remote_journaler
->init(ctx
);
594 template <typename I
>
595 void ImageReplayer
<I
>::handle_init_remote_journaler(int r
) {
596 dout(20) << "r=" << r
<< dendl
;
599 derr
<< "failed to initialize remote journal: " << cpp_strerror(r
) << dendl
;
600 on_start_fail(r
, "error initializing remote journal");
602 } else if (on_start_interrupted()) {
606 m_remote_journaler
->add_listener(&m_remote_listener
);
608 cls::journal::Client client
;
609 r
= m_remote_journaler
->get_cached_client(m_local_mirror_uuid
, &client
);
611 derr
<< "error retrieving remote journal client: " << cpp_strerror(r
)
613 on_start_fail(r
, "error retrieving remote journal client");
617 dout(5) << "image_id=" << m_local_image_id
<< ", "
618 << "client_meta.image_id=" << m_client_meta
.image_id
<< ", "
619 << "client.state=" << client
.state
<< dendl
;
620 if (m_client_meta
.image_id
== m_local_image_id
&&
621 client
.state
!= cls::journal::CLIENT_STATE_CONNECTED
) {
622 dout(5) << "client flagged disconnected, stopping image replay" << dendl
;
623 if (m_local_image_ctx
->mirroring_resync_after_disconnect
) {
624 m_resync_requested
= true;
625 on_start_fail(-ENOTCONN
, "disconnected: automatic resync");
627 on_start_fail(-ENOTCONN
, "disconnected");
635 template <typename I
>
636 void ImageReplayer
<I
>::start_replay() {
639 Context
*start_ctx
= create_context_callback
<
640 ImageReplayer
, &ImageReplayer
<I
>::handle_start_replay
>(this);
641 m_local_journal
->start_external_replay(&m_local_replay
, start_ctx
);
644 template <typename I
>
645 void ImageReplayer
<I
>::handle_start_replay(int r
) {
646 dout(20) << "r=" << r
<< dendl
;
649 assert(m_local_replay
== nullptr);
650 derr
<< "error starting external replay on local image "
651 << m_local_image_id
<< ": " << cpp_strerror(r
) << dendl
;
652 on_start_fail(r
, "error starting replay on local image");
656 Context
*on_finish(nullptr);
658 Mutex::Locker
locker(m_lock
);
659 assert(m_state
== STATE_STARTING
);
660 m_state
= STATE_REPLAYING
;
661 std::swap(m_on_start_finish
, on_finish
);
664 m_event_preprocessor
= EventPreprocessor
<I
>::create(
665 *m_local_image_ctx
, *m_remote_journaler
, m_local_mirror_uuid
,
666 &m_client_meta
, m_threads
->work_queue
);
667 m_replay_status_formatter
=
668 ReplayStatusFormatter
<I
>::create(m_remote_journaler
, m_local_mirror_uuid
);
670 update_mirror_image_status(true, boost::none
);
671 reschedule_update_status_task(30);
673 if (on_replay_interrupted()) {
678 CephContext
*cct
= static_cast<CephContext
*>(m_local
->cct());
679 double poll_seconds
= cct
->_conf
->get_val
<double>(
680 "rbd_mirror_journal_poll_age");
682 Mutex::Locker
locker(m_lock
);
683 m_replay_handler
= new ReplayHandler
<I
>(this);
684 m_remote_journaler
->start_live_replay(m_replay_handler
, poll_seconds
);
686 dout(20) << "m_remote_journaler=" << *m_remote_journaler
<< dendl
;
689 dout(20) << "start succeeded" << dendl
;
690 if (on_finish
!= nullptr) {
691 dout(20) << "on finish complete, r=" << r
<< dendl
;
692 on_finish
->complete(r
);
696 template <typename I
>
697 void ImageReplayer
<I
>::on_start_fail(int r
, const std::string
&desc
)
699 dout(20) << "r=" << r
<< dendl
;
700 Context
*ctx
= new FunctionContext([this, r
, desc
](int _r
) {
702 Mutex::Locker
locker(m_lock
);
703 assert(m_state
== STATE_STARTING
);
704 m_state
= STATE_STOPPING
;
705 if (r
< 0 && r
!= -ECANCELED
&& r
!= -EREMOTEIO
&& r
!= -ENOENT
) {
706 derr
<< "start failed: " << cpp_strerror(r
) << dendl
;
708 dout(20) << "start canceled" << dendl
;
712 set_state_description(r
, desc
);
713 update_mirror_image_status(false, boost::none
);
714 reschedule_update_status_task(-1);
717 m_threads
->work_queue
->queue(ctx
, 0);
720 template <typename I
>
721 bool ImageReplayer
<I
>::on_start_interrupted()
723 Mutex::Locker
locker(m_lock
);
724 assert(m_state
== STATE_STARTING
);
725 if (m_on_stop_finish
== nullptr) {
729 on_start_fail(-ECANCELED
);
733 template <typename I
>
734 void ImageReplayer
<I
>::stop(Context
*on_finish
, bool manual
, int r
,
735 const std::string
& desc
)
737 dout(20) << "on_finish=" << on_finish
<< ", manual=" << manual
738 << ", desc=" << desc
<< dendl
;
740 m_image_deleter
->cancel_waiter(m_local_pool_id
, m_global_image_id
);
742 image_replayer::BootstrapRequest
<I
> *bootstrap_request
= nullptr;
743 bool shut_down_replay
= false;
746 Mutex::Locker
locker(m_lock
);
748 if (!is_running_()) {
751 if (!is_stopped_()) {
752 if (m_state
== STATE_STARTING
) {
753 dout(20) << "canceling start" << dendl
;
754 if (m_bootstrap_request
) {
755 bootstrap_request
= m_bootstrap_request
;
756 bootstrap_request
->get();
759 dout(20) << "interrupting replay" << dendl
;
760 shut_down_replay
= true;
763 assert(m_on_stop_finish
== nullptr);
764 std::swap(m_on_stop_finish
, on_finish
);
765 m_stop_requested
= true;
766 m_manual_stop
= manual
;
771 // avoid holding lock since bootstrap request will update status
772 if (bootstrap_request
!= nullptr) {
773 bootstrap_request
->cancel();
774 bootstrap_request
->put();
778 dout(20) << "not running" << dendl
;
780 on_finish
->complete(-EINVAL
);
785 if (shut_down_replay
) {
786 on_stop_journal_replay(r
, desc
);
787 } else if (on_finish
!= nullptr) {
788 on_finish
->complete(0);
792 template <typename I
>
793 void ImageReplayer
<I
>::on_stop_journal_replay(int r
, const std::string
&desc
)
795 dout(20) << "enter" << dendl
;
798 Mutex::Locker
locker(m_lock
);
799 if (m_state
!= STATE_REPLAYING
) {
800 // might be invoked multiple times while stopping
803 m_stop_requested
= true;
804 m_state
= STATE_STOPPING
;
807 set_state_description(r
, desc
);
808 update_mirror_image_status(false, boost::none
);
809 reschedule_update_status_task(-1);
813 template <typename I
>
814 void ImageReplayer
<I
>::handle_replay_ready()
816 dout(20) << "enter" << dendl
;
817 if (on_replay_interrupted()) {
821 if (!m_remote_journaler
->try_pop_front(&m_replay_entry
, &m_replay_tag_tid
)) {
825 m_event_replay_tracker
.start_op();
828 bool stopping
= (m_state
== STATE_STOPPING
);
832 dout(10) << "stopping event replay" << dendl
;
833 m_event_replay_tracker
.finish_op();
837 if (m_replay_tag_valid
&& m_replay_tag
.tid
== m_replay_tag_tid
) {
845 template <typename I
>
846 void ImageReplayer
<I
>::restart(Context
*on_finish
)
848 FunctionContext
*ctx
= new FunctionContext(
849 [this, on_finish
](int r
) {
853 start(on_finish
, true);
858 template <typename I
>
859 void ImageReplayer
<I
>::flush(Context
*on_finish
)
861 dout(20) << "enter" << dendl
;
864 Mutex::Locker
locker(m_lock
);
865 if (m_state
== STATE_REPLAYING
) {
866 Context
*ctx
= new FunctionContext(
868 if (on_finish
!= nullptr) {
869 on_finish
->complete(r
);
872 on_flush_local_replay_flush_start(ctx
);
878 on_finish
->complete(0);
882 template <typename I
>
883 void ImageReplayer
<I
>::on_flush_local_replay_flush_start(Context
*on_flush
)
885 dout(20) << "enter" << dendl
;
886 FunctionContext
*ctx
= new FunctionContext(
887 [this, on_flush
](int r
) {
888 on_flush_local_replay_flush_finish(on_flush
, r
);
891 assert(m_lock
.is_locked());
892 assert(m_state
== STATE_REPLAYING
);
893 m_local_replay
->flush(ctx
);
896 template <typename I
>
897 void ImageReplayer
<I
>::on_flush_local_replay_flush_finish(Context
*on_flush
,
900 dout(20) << "r=" << r
<< dendl
;
902 derr
<< "error flushing local replay: " << cpp_strerror(r
) << dendl
;
903 on_flush
->complete(r
);
907 on_flush_flush_commit_position_start(on_flush
);
910 template <typename I
>
911 void ImageReplayer
<I
>::on_flush_flush_commit_position_start(Context
*on_flush
)
913 FunctionContext
*ctx
= new FunctionContext(
914 [this, on_flush
](int r
) {
915 on_flush_flush_commit_position_finish(on_flush
, r
);
918 m_remote_journaler
->flush_commit_position(ctx
);
921 template <typename I
>
922 void ImageReplayer
<I
>::on_flush_flush_commit_position_finish(Context
*on_flush
,
926 derr
<< "error flushing remote journal commit position: "
927 << cpp_strerror(r
) << dendl
;
930 update_mirror_image_status(false, boost::none
);
932 dout(20) << "flush complete, r=" << r
<< dendl
;
933 on_flush
->complete(r
);
936 template <typename I
>
937 bool ImageReplayer
<I
>::on_replay_interrupted()
941 Mutex::Locker
locker(m_lock
);
942 shut_down
= m_stop_requested
;
946 on_stop_journal_replay();
951 template <typename I
>
952 void ImageReplayer
<I
>::print_status(Formatter
*f
, stringstream
*ss
)
954 dout(20) << "enter" << dendl
;
956 Mutex::Locker
l(m_lock
);
959 f
->open_object_section("image_replayer");
960 f
->dump_string("name", m_name
);
961 f
->dump_string("state", to_string(m_state
));
965 *ss
<< m_name
<< ": state: " << to_string(m_state
);
969 template <typename I
>
970 void ImageReplayer
<I
>::handle_replay_complete(int r
, const std::string
&error_desc
)
972 dout(20) << "r=" << r
<< dendl
;
974 derr
<< "replay encountered an error: " << cpp_strerror(r
) << dendl
;
975 set_state_description(r
, error_desc
);
979 Mutex::Locker
locker(m_lock
);
980 m_stop_requested
= true;
982 on_replay_interrupted();
985 template <typename I
>
986 void ImageReplayer
<I
>::replay_flush() {
989 bool interrupted
= false;
991 Mutex::Locker
locker(m_lock
);
992 if (m_state
!= STATE_REPLAYING
) {
993 dout(20) << "replay interrupted" << dendl
;
996 m_state
= STATE_REPLAY_FLUSHING
;
1001 m_event_replay_tracker
.finish_op();
1005 // shut down the replay to flush all IO and ops and create a new
1006 // replayer to handle the new tag epoch
1007 Context
*ctx
= create_context_callback
<
1008 ImageReplayer
<I
>, &ImageReplayer
<I
>::handle_replay_flush
>(this);
1009 ctx
= new FunctionContext([this, ctx
](int r
) {
1010 m_local_image_ctx
->journal
->stop_external_replay();
1011 m_local_replay
= nullptr;
1018 m_local_journal
->start_external_replay(&m_local_replay
, ctx
);
1020 m_local_replay
->shut_down(false, ctx
);
1023 template <typename I
>
1024 void ImageReplayer
<I
>::handle_replay_flush(int r
) {
1025 dout(20) << "r=" << r
<< dendl
;
1028 Mutex::Locker
locker(m_lock
);
1029 assert(m_state
== STATE_REPLAY_FLUSHING
);
1030 m_state
= STATE_REPLAYING
;
1034 derr
<< "replay flush encountered an error: " << cpp_strerror(r
) << dendl
;
1035 m_event_replay_tracker
.finish_op();
1036 handle_replay_complete(r
, "replay flush encountered an error");
1038 } else if (on_replay_interrupted()) {
1039 m_event_replay_tracker
.finish_op();
1046 template <typename I
>
1047 void ImageReplayer
<I
>::get_remote_tag() {
1048 dout(20) << "tag_tid: " << m_replay_tag_tid
<< dendl
;
1050 Context
*ctx
= create_context_callback
<
1051 ImageReplayer
, &ImageReplayer
<I
>::handle_get_remote_tag
>(this);
1052 m_remote_journaler
->get_tag(m_replay_tag_tid
, &m_replay_tag
, ctx
);
1055 template <typename I
>
1056 void ImageReplayer
<I
>::handle_get_remote_tag(int r
) {
1057 dout(20) << "r=" << r
<< dendl
;
1061 bufferlist::iterator it
= m_replay_tag
.data
.begin();
1062 ::decode(m_replay_tag_data
, it
);
1063 } catch (const buffer::error
&err
) {
1069 derr
<< "failed to retrieve remote tag " << m_replay_tag_tid
<< ": "
1070 << cpp_strerror(r
) << dendl
;
1071 m_event_replay_tracker
.finish_op();
1072 handle_replay_complete(r
, "failed to retrieve remote tag");
1076 m_replay_tag_valid
= true;
1077 dout(20) << "decoded remote tag " << m_replay_tag_tid
<< ": "
1078 << m_replay_tag_data
<< dendl
;
1080 allocate_local_tag();
1083 template <typename I
>
1084 void ImageReplayer
<I
>::allocate_local_tag() {
1087 std::string mirror_uuid
= m_replay_tag_data
.mirror_uuid
;
1088 if (mirror_uuid
== librbd::Journal
<>::LOCAL_MIRROR_UUID
) {
1089 mirror_uuid
= m_remote_image
.mirror_uuid
;
1090 } else if (mirror_uuid
== m_local_mirror_uuid
) {
1091 mirror_uuid
= librbd::Journal
<>::LOCAL_MIRROR_UUID
;
1092 } else if (mirror_uuid
== librbd::Journal
<>::ORPHAN_MIRROR_UUID
) {
1093 // handle possible edge condition where daemon can failover and
1094 // the local image has already been promoted/demoted
1095 auto local_tag_data
= m_local_journal
->get_tag_data();
1096 if (local_tag_data
.mirror_uuid
== librbd::Journal
<>::ORPHAN_MIRROR_UUID
&&
1097 (local_tag_data
.predecessor
.commit_valid
&&
1098 local_tag_data
.predecessor
.mirror_uuid
==
1099 librbd::Journal
<>::LOCAL_MIRROR_UUID
)) {
1100 dout(15) << "skipping stale demotion event" << dendl
;
1101 handle_process_entry_safe(m_replay_entry
, 0);
1102 handle_replay_ready();
1105 dout(5) << "encountered image demotion: stopping" << dendl
;
1106 Mutex::Locker
locker(m_lock
);
1107 m_stop_requested
= true;
1111 librbd::journal::TagPredecessor
predecessor(m_replay_tag_data
.predecessor
);
1112 if (predecessor
.mirror_uuid
== librbd::Journal
<>::LOCAL_MIRROR_UUID
) {
1113 predecessor
.mirror_uuid
= m_remote_image
.mirror_uuid
;
1114 } else if (predecessor
.mirror_uuid
== m_local_mirror_uuid
) {
1115 predecessor
.mirror_uuid
= librbd::Journal
<>::LOCAL_MIRROR_UUID
;
1118 dout(15) << "mirror_uuid=" << mirror_uuid
<< ", "
1119 << "predecessor=" << predecessor
<< ", "
1120 << "replay_tag_tid=" << m_replay_tag_tid
<< dendl
;
1121 Context
*ctx
= create_context_callback
<
1122 ImageReplayer
, &ImageReplayer
<I
>::handle_allocate_local_tag
>(this);
1123 m_local_journal
->allocate_tag(mirror_uuid
, predecessor
, ctx
);
1126 template <typename I
>
1127 void ImageReplayer
<I
>::handle_allocate_local_tag(int r
) {
1128 dout(15) << "r=" << r
<< ", "
1129 << "tag_tid=" << m_local_journal
->get_tag_tid() << dendl
;
1132 derr
<< "failed to allocate journal tag: " << cpp_strerror(r
) << dendl
;
1133 m_event_replay_tracker
.finish_op();
1134 handle_replay_complete(r
, "failed to allocate journal tag");
1141 template <typename I
>
1142 void ImageReplayer
<I
>::preprocess_entry() {
1143 dout(20) << "preprocessing entry tid=" << m_replay_entry
.get_commit_tid()
1146 bufferlist data
= m_replay_entry
.get_data();
1147 bufferlist::iterator it
= data
.begin();
1148 int r
= m_local_replay
->decode(&it
, &m_event_entry
);
1150 derr
<< "failed to decode journal event" << dendl
;
1151 m_event_replay_tracker
.finish_op();
1152 handle_replay_complete(r
, "failed to decode journal event");
1156 uint32_t delay
= calculate_replay_delay(
1157 m_event_entry
.timestamp
, m_local_image_ctx
->mirroring_replay_delay
);
1159 handle_preprocess_entry_ready(0);
1163 dout(20) << "delaying replay by " << delay
<< " sec" << dendl
;
1165 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
1166 assert(m_delayed_preprocess_task
== nullptr);
1167 m_delayed_preprocess_task
= new FunctionContext(
1169 assert(m_threads
->timer_lock
.is_locked());
1170 m_delayed_preprocess_task
= nullptr;
1171 m_threads
->work_queue
->queue(
1172 create_context_callback
<ImageReplayer
,
1173 &ImageReplayer
<I
>::handle_preprocess_entry_ready
>(this), 0);
1175 m_threads
->timer
->add_event_after(delay
, m_delayed_preprocess_task
);
1178 template <typename I
>
1179 void ImageReplayer
<I
>::handle_preprocess_entry_ready(int r
) {
1180 dout(20) << "r=" << r
<< dendl
;
1183 if (!m_event_preprocessor
->is_required(m_event_entry
)) {
1188 Context
*ctx
= create_context_callback
<
1189 ImageReplayer
, &ImageReplayer
<I
>::handle_preprocess_entry_safe
>(this);
1190 m_event_preprocessor
->preprocess(&m_event_entry
, ctx
);
1193 template <typename I
>
1194 void ImageReplayer
<I
>::handle_preprocess_entry_safe(int r
) {
1195 dout(20) << "r=" << r
<< dendl
;
1198 m_event_replay_tracker
.finish_op();
1200 if (r
== -ECANCELED
) {
1201 handle_replay_complete(0, "lost exclusive lock");
1203 derr
<< "failed to preprocess journal event" << dendl
;
1204 handle_replay_complete(r
, "failed to preprocess journal event");
1212 template <typename I
>
1213 void ImageReplayer
<I
>::process_entry() {
1214 dout(20) << "processing entry tid=" << m_replay_entry
.get_commit_tid()
1217 // stop replaying events if stop has been requested
1218 if (on_replay_interrupted()) {
1219 m_event_replay_tracker
.finish_op();
1223 Context
*on_ready
= create_context_callback
<
1224 ImageReplayer
, &ImageReplayer
<I
>::handle_process_entry_ready
>(this);
1225 Context
*on_commit
= new C_ReplayCommitted(this, std::move(m_replay_entry
));
1227 m_local_replay
->process(m_event_entry
, on_ready
, on_commit
);
1230 template <typename I
>
1231 void ImageReplayer
<I
>::handle_process_entry_ready(int r
) {
1235 bool update_status
= false;
1237 RWLock::RLocker
snap_locker(m_local_image_ctx
->snap_lock
);
1238 if (m_local_image_name
!= m_local_image_ctx
->name
) {
1239 m_local_image_name
= m_local_image_ctx
->name
;
1240 update_status
= true;
1244 if (update_status
) {
1245 reschedule_update_status_task(0);
1248 // attempt to process the next event
1249 handle_replay_ready();
1252 template <typename I
>
1253 void ImageReplayer
<I
>::handle_process_entry_safe(const ReplayEntry
& replay_entry
,
1255 dout(20) << "commit_tid=" << replay_entry
.get_commit_tid() << ", r=" << r
1259 derr
<< "failed to commit journal event: " << cpp_strerror(r
) << dendl
;
1260 handle_replay_complete(r
, "failed to commit journal event");
1262 assert(m_remote_journaler
!= nullptr);
1263 m_remote_journaler
->committed(replay_entry
);
1265 m_event_replay_tracker
.finish_op();
1268 template <typename I
>
1269 bool ImageReplayer
<I
>::update_mirror_image_status(bool force
,
1270 const OptionalState
&state
) {
1273 Mutex::Locker
locker(m_lock
);
1274 if (!start_mirror_image_status_update(force
, false)) {
1279 queue_mirror_image_status_update(state
);
1283 template <typename I
>
1284 bool ImageReplayer
<I
>::start_mirror_image_status_update(bool force
,
1286 assert(m_lock
.is_locked());
1288 if (!force
&& !is_stopped_()) {
1289 if (!is_running_()) {
1290 dout(20) << "shut down in-progress: ignoring update" << dendl
;
1292 } else if (m_in_flight_status_updates
> (restarting
? 1 : 0)) {
1293 dout(20) << "already sending update" << dendl
;
1294 m_update_status_requested
= true;
1300 ++m_in_flight_status_updates
;
1304 template <typename I
>
1305 void ImageReplayer
<I
>::finish_mirror_image_status_update() {
1306 reregister_admin_socket_hook();
1308 Context
*on_finish
= nullptr;
1310 Mutex::Locker
locker(m_lock
);
1311 assert(m_in_flight_status_updates
> 0);
1312 if (--m_in_flight_status_updates
> 0) {
1313 dout(20) << "waiting on " << m_in_flight_status_updates
<< " in-flight "
1314 << "updates" << dendl
;
1318 std::swap(on_finish
, m_on_update_status_finish
);
1322 if (on_finish
!= nullptr) {
1323 on_finish
->complete(0);
1327 template <typename I
>
1328 void ImageReplayer
<I
>::queue_mirror_image_status_update(const OptionalState
&state
) {
1330 FunctionContext
*ctx
= new FunctionContext(
1331 [this, state
](int r
) {
1332 send_mirror_status_update(state
);
1334 m_threads
->work_queue
->queue(ctx
, 0);
1337 template <typename I
>
1338 void ImageReplayer
<I
>::send_mirror_status_update(const OptionalState
&opt_state
) {
1340 std::string state_desc
;
1342 bool stopping_replay
;
1344 OptionalMirrorImageStatusState mirror_image_status_state
{
1345 boost::make_optional(false, cls::rbd::MirrorImageStatusState
{})};
1346 image_replayer::BootstrapRequest
<I
>* bootstrap_request
= nullptr;
1348 Mutex::Locker
locker(m_lock
);
1350 state_desc
= m_state_desc
;
1351 mirror_image_status_state
= m_mirror_image_status_state
;
1353 stopping_replay
= (m_local_image_ctx
!= nullptr);
1355 if (m_bootstrap_request
!= nullptr) {
1356 bootstrap_request
= m_bootstrap_request
;
1357 bootstrap_request
->get();
1361 bool syncing
= false;
1362 if (bootstrap_request
!= nullptr) {
1363 syncing
= bootstrap_request
->is_syncing();
1364 bootstrap_request
->put();
1365 bootstrap_request
= nullptr;
1372 cls::rbd::MirrorImageStatus status
;
1375 case STATE_STARTING
:
1377 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING
;
1378 status
.description
= state_desc
.empty() ? "syncing" : state_desc
;
1379 mirror_image_status_state
= status
.state
;
1381 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY
;
1382 status
.description
= "starting replay";
1385 case STATE_REPLAYING
:
1386 case STATE_REPLAY_FLUSHING
:
1387 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING
;
1389 Context
*on_req_finish
= new FunctionContext(
1391 dout(20) << "replay status ready: r=" << r
<< dendl
;
1393 send_mirror_status_update(boost::none
);
1394 } else if (r
== -EAGAIN
) {
1395 // decrement in-flight status update counter
1396 handle_mirror_status_update(r
);
1401 if (!m_replay_status_formatter
->get_or_send_update(&desc
,
1403 dout(20) << "waiting for replay status" << dendl
;
1406 status
.description
= "replaying, " + desc
;
1407 mirror_image_status_state
= boost::none
;
1410 case STATE_STOPPING
:
1411 if (stopping_replay
) {
1412 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY
;
1413 status
.description
= "stopping replay";
1418 if (last_r
== -EREMOTEIO
) {
1419 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN
;
1420 status
.description
= state_desc
;
1421 mirror_image_status_state
= status
.state
;
1422 } else if (last_r
< 0) {
1423 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR
;
1424 status
.description
= state_desc
;
1425 mirror_image_status_state
= status
.state
;
1427 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED
;
1428 status
.description
= state_desc
.empty() ? "stopped" : state_desc
;
1429 mirror_image_status_state
= boost::none
;
1433 assert(!"invalid state");
1437 Mutex::Locker
locker(m_lock
);
1438 m_mirror_image_status_state
= mirror_image_status_state
;
1441 // prevent the status from ping-ponging when failed replays are restarted
1442 if (mirror_image_status_state
&&
1443 *mirror_image_status_state
== cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR
) {
1444 status
.state
= *mirror_image_status_state
;
1447 dout(20) << "status=" << status
<< dendl
;
1448 librados::ObjectWriteOperation op
;
1449 librbd::cls_client::mirror_image_status_set(&op
, m_global_image_id
, status
);
1451 librados::AioCompletion
*aio_comp
= create_rados_callback
<
1452 ImageReplayer
<I
>, &ImageReplayer
<I
>::handle_mirror_status_update
>(this);
1453 int r
= m_local_ioctx
.aio_operate(RBD_MIRRORING
, aio_comp
, &op
);
1455 aio_comp
->release();
1458 template <typename I
>
1459 void ImageReplayer
<I
>::handle_mirror_status_update(int r
) {
1460 dout(20) << "r=" << r
<< dendl
;
1462 bool running
= false;
1463 bool started
= false;
1465 Mutex::Locker
locker(m_lock
);
1466 bool update_status_requested
= false;
1467 std::swap(update_status_requested
, m_update_status_requested
);
1469 running
= is_running_();
1470 if (running
&& update_status_requested
) {
1471 started
= start_mirror_image_status_update(false, true);
1475 // if a deferred update is available, send it -- otherwise reschedule
1478 queue_mirror_image_status_update(boost::none
);
1479 } else if (running
) {
1480 reschedule_update_status_task();
1483 // mark committed status update as no longer in-flight
1484 finish_mirror_image_status_update();
1487 template <typename I
>
1488 void ImageReplayer
<I
>::reschedule_update_status_task(int new_interval
) {
1491 bool canceled_task
= false;
1493 Mutex::Locker
locker(m_lock
);
1494 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
1496 if (m_update_status_task
) {
1497 canceled_task
= m_threads
->timer
->cancel_event(m_update_status_task
);
1498 m_update_status_task
= nullptr;
1501 if (new_interval
> 0) {
1502 m_update_status_interval
= new_interval
;
1505 bool restarting
= (new_interval
== 0 || canceled_task
);
1506 if (new_interval
>= 0 && is_running_() &&
1507 start_mirror_image_status_update(false, restarting
)) {
1508 m_update_status_task
= new FunctionContext(
1510 assert(m_threads
->timer_lock
.is_locked());
1511 m_update_status_task
= nullptr;
1513 queue_mirror_image_status_update(boost::none
);
1515 m_threads
->timer
->add_event_after(m_update_status_interval
,
1516 m_update_status_task
);
1520 if (canceled_task
) {
1521 dout(20) << "canceled task" << dendl
;
1522 finish_mirror_image_status_update();
1526 template <typename I
>
1527 void ImageReplayer
<I
>::shut_down(int r
) {
1528 dout(20) << "r=" << r
<< dendl
;
1530 bool canceled_delayed_preprocess_task
= false;
1532 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
1533 if (m_delayed_preprocess_task
!= nullptr) {
1534 canceled_delayed_preprocess_task
= m_threads
->timer
->cancel_event(
1535 m_delayed_preprocess_task
);
1536 assert(canceled_delayed_preprocess_task
);
1537 m_delayed_preprocess_task
= nullptr;
1540 if (canceled_delayed_preprocess_task
) {
1541 // wake up sleeping replay
1542 m_event_replay_tracker
.finish_op();
1546 Mutex::Locker
locker(m_lock
);
1547 assert(m_state
== STATE_STOPPING
);
1549 // if status updates are in-flight, wait for them to complete
1550 // before proceeding
1551 if (m_in_flight_status_updates
> 0) {
1552 if (m_on_update_status_finish
== nullptr) {
1553 dout(20) << "waiting for in-flight status update" << dendl
;
1554 m_on_update_status_finish
= new FunctionContext(
1563 // NOTE: it's important to ensure that the local image is fully
1564 // closed before attempting to close the remote journal in
1565 // case the remote cluster is unreachable
1567 // chain the shut down sequence (reverse order)
1568 Context
*ctx
= new FunctionContext(
1570 update_mirror_image_status(true, STATE_STOPPED
);
1571 handle_shut_down(r
);
1574 // close the remote journal
1575 if (m_remote_journaler
!= nullptr) {
1576 ctx
= new FunctionContext([this, ctx
](int r
) {
1577 delete m_remote_journaler
;
1578 m_remote_journaler
= nullptr;
1581 ctx
= new FunctionContext([this, ctx
](int r
) {
1582 m_remote_journaler
->remove_listener(&m_remote_listener
);
1583 m_remote_journaler
->shut_down(ctx
);
1587 // stop the replay of remote journal events
1588 if (m_replay_handler
!= nullptr) {
1589 ctx
= new FunctionContext([this, ctx
](int r
) {
1590 delete m_replay_handler
;
1591 m_replay_handler
= nullptr;
1593 m_event_replay_tracker
.wait_for_ops(ctx
);
1595 ctx
= new FunctionContext([this, ctx
](int r
) {
1596 m_remote_journaler
->stop_replay(ctx
);
1600 // close the local image (release exclusive lock)
1601 if (m_local_image_ctx
) {
1602 ctx
= new FunctionContext([this, ctx
](int r
) {
1603 CloseImageRequest
<I
> *request
= CloseImageRequest
<I
>::create(
1604 &m_local_image_ctx
, ctx
);
1609 // shut down event replay into the local image
1610 if (m_local_journal
!= nullptr) {
1611 ctx
= new FunctionContext([this, ctx
](int r
) {
1612 m_local_journal
= nullptr;
1615 if (m_local_replay
!= nullptr) {
1616 ctx
= new FunctionContext([this, ctx
](int r
) {
1617 m_local_journal
->stop_external_replay();
1618 m_local_replay
= nullptr;
1620 EventPreprocessor
<I
>::destroy(m_event_preprocessor
);
1621 m_event_preprocessor
= nullptr;
1625 ctx
= new FunctionContext([this, ctx
](int r
) {
1626 // blocks if listener notification is in-progress
1627 m_local_journal
->remove_listener(m_journal_listener
);
1632 // wait for all local in-flight replay events to complete
1633 ctx
= new FunctionContext([this, ctx
](int r
) {
1635 derr
<< "error shutting down journal replay: " << cpp_strerror(r
)
1639 m_event_replay_tracker
.wait_for_ops(ctx
);
1642 // flush any local in-flight replay events
1643 if (m_local_replay
!= nullptr) {
1644 ctx
= new FunctionContext([this, ctx
](int r
) {
1645 m_local_replay
->shut_down(true, ctx
);
1649 m_threads
->work_queue
->queue(ctx
, 0);
1652 template <typename I
>
1653 void ImageReplayer
<I
>::handle_shut_down(int r
) {
1654 reschedule_update_status_task(-1);
1656 bool unregister_asok_hook
= false;
1658 Mutex::Locker
locker(m_lock
);
1660 // if status updates are in-flight, wait for them to complete
1661 // before proceeding
1662 if (m_in_flight_status_updates
> 0) {
1663 if (m_on_update_status_finish
== nullptr) {
1664 dout(20) << "waiting for in-flight status update" << dendl
;
1665 m_on_update_status_finish
= new FunctionContext(
1667 handle_shut_down(r
);
1673 bool delete_requested
= false;
1674 if (m_delete_requested
&& !m_local_image_id
.empty()) {
1675 assert(m_remote_image
.image_id
.empty());
1676 dout(0) << "remote image no longer exists: scheduling deletion" << dendl
;
1677 delete_requested
= true;
1679 if (delete_requested
|| m_resync_requested
) {
1680 m_image_deleter
->schedule_image_delete(m_local
,
1683 m_resync_requested
);
1685 m_local_image_id
= "";
1686 m_resync_requested
= false;
1687 if (m_delete_requested
) {
1688 unregister_asok_hook
= true;
1689 m_delete_requested
= false;
1691 } else if (m_last_r
== -ENOENT
&&
1692 m_local_image_id
.empty() && m_remote_image
.image_id
.empty()) {
1693 dout(0) << "mirror image no longer exists" << dendl
;
1694 unregister_asok_hook
= true;
1699 if (unregister_asok_hook
) {
1700 unregister_admin_socket_hook();
1703 dout(20) << "stop complete" << dendl
;
1704 m_local_ioctx
.close();
1706 ReplayStatusFormatter
<I
>::destroy(m_replay_status_formatter
);
1707 m_replay_status_formatter
= nullptr;
1709 Context
*on_start
= nullptr;
1710 Context
*on_stop
= nullptr;
1712 Mutex::Locker
locker(m_lock
);
1713 std::swap(on_start
, m_on_start_finish
);
1714 std::swap(on_stop
, m_on_stop_finish
);
1715 m_stop_requested
= false;
1716 assert(m_delayed_preprocess_task
== nullptr);
1717 assert(m_state
== STATE_STOPPING
);
1718 m_state
= STATE_STOPPED
;
1721 if (on_start
!= nullptr) {
1722 dout(20) << "on start finish complete, r=" << r
<< dendl
;
1723 on_start
->complete(r
);
1726 if (on_stop
!= nullptr) {
1727 dout(20) << "on stop finish complete, r=" << r
<< dendl
;
1728 on_stop
->complete(r
);
1732 template <typename I
>
1733 void ImageReplayer
<I
>::handle_remote_journal_metadata_updated() {
1736 cls::journal::Client client
;
1738 Mutex::Locker
locker(m_lock
);
1739 if (!is_running_()) {
1743 int r
= m_remote_journaler
->get_cached_client(m_local_mirror_uuid
, &client
);
1745 derr
<< "failed to retrieve client: " << cpp_strerror(r
) << dendl
;
1750 if (client
.state
!= cls::journal::CLIENT_STATE_CONNECTED
) {
1751 dout(0) << "client flagged disconnected, stopping image replay" << dendl
;
1752 stop(nullptr, false, -ENOTCONN
, "disconnected");
1756 template <typename I
>
1757 std::string ImageReplayer
<I
>::to_string(const State state
) {
1759 case ImageReplayer
<I
>::STATE_STARTING
:
1761 case ImageReplayer
<I
>::STATE_REPLAYING
:
1763 case ImageReplayer
<I
>::STATE_REPLAY_FLUSHING
:
1764 return "ReplayFlushing";
1765 case ImageReplayer
<I
>::STATE_STOPPING
:
1767 case ImageReplayer
<I
>::STATE_STOPPED
:
1772 return "Unknown(" + stringify(state
) + ")";
1775 template <typename I
>
1776 void ImageReplayer
<I
>::resync_image(Context
*on_finish
) {
1779 m_resync_requested
= true;
1783 template <typename I
>
1784 void ImageReplayer
<I
>::register_admin_socket_hook() {
1785 ImageReplayerAdminSocketHook
<I
> *asok_hook
;
1787 Mutex::Locker
locker(m_lock
);
1788 if (m_asok_hook
!= nullptr) {
1792 dout(15) << "registered asok hook: " << m_name
<< dendl
;
1793 asok_hook
= new ImageReplayerAdminSocketHook
<I
>(g_ceph_context
, m_name
,
1795 int r
= asok_hook
->register_commands();
1797 m_asok_hook
= asok_hook
;
1800 derr
<< "error registering admin socket commands" << dendl
;
1805 template <typename I
>
1806 void ImageReplayer
<I
>::unregister_admin_socket_hook() {
1809 AdminSocketHook
*asok_hook
= nullptr;
1811 Mutex::Locker
locker(m_lock
);
1812 std::swap(asok_hook
, m_asok_hook
);
1817 template <typename I
>
1818 void ImageReplayer
<I
>::reregister_admin_socket_hook() {
1820 Mutex::Locker
locker(m_lock
);
1821 auto name
= m_local_ioctx
.get_pool_name() + "/" + m_local_image_name
;
1822 if (m_name
== name
) {
1827 unregister_admin_socket_hook();
1828 register_admin_socket_hook();
1831 template <typename I
>
1832 std::ostream
&operator<<(std::ostream
&os
, const ImageReplayer
<I
> &replayer
)
1834 os
<< "ImageReplayer: " << &replayer
<< " [" << replayer
.get_local_pool_id()
1835 << "/" << replayer
.get_global_image_id() << "]";
1839 } // namespace mirror
1842 template class rbd::mirror::ImageReplayer
<librbd::ImageCtx
>;