1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/compat.h"
5 #include "common/Formatter.h"
6 #include "common/debug.h"
7 #include "common/errno.h"
8 #include "include/stringify.h"
9 #include "cls/rbd/cls_rbd_client.h"
10 #include "common/Timer.h"
11 #include "common/WorkQueue.h"
12 #include "global/global_context.h"
13 #include "journal/Journaler.h"
14 #include "journal/ReplayHandler.h"
15 #include "journal/Settings.h"
16 #include "librbd/ExclusiveLock.h"
17 #include "librbd/ImageCtx.h"
18 #include "librbd/ImageState.h"
19 #include "librbd/Journal.h"
20 #include "librbd/Operations.h"
21 #include "librbd/Utils.h"
22 #include "librbd/journal/Replay.h"
23 #include "ImageDeleter.h"
24 #include "ImageReplayer.h"
26 #include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
27 #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
28 #include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
29 #include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
30 #include "tools/rbd_mirror/image_replayer/PrepareRemoteImageRequest.h"
31 #include "tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h"
33 #define dout_context g_ceph_context
34 #define dout_subsys ceph_subsys_rbd_mirror
36 #define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
41 using std::unique_ptr
;
42 using std::shared_ptr
;
48 using librbd::util::create_context_callback
;
49 using librbd::util::create_rados_callback
;
50 using namespace rbd::mirror::image_replayer
;
53 std::ostream
&operator<<(std::ostream
&os
,
54 const typename ImageReplayer
<I
>::State
&state
);
59 struct ReplayHandler
: public ::journal::ReplayHandler
{
60 ImageReplayer
<I
> *replayer
;
61 ReplayHandler(ImageReplayer
<I
> *replayer
) : replayer(replayer
) {}
62 void get() override
{}
63 void put() override
{}
65 void handle_entries_available() override
{
66 replayer
->handle_replay_ready();
68 void handle_complete(int r
) override
{
71 ss
<< "replay completed with error: " << cpp_strerror(r
);
73 replayer
->handle_replay_complete(r
, ss
.str());
78 class ImageReplayerAdminSocketCommand
{
80 ImageReplayerAdminSocketCommand(const std::string
&desc
,
81 ImageReplayer
<I
> *replayer
)
82 : desc(desc
), replayer(replayer
) {
84 virtual ~ImageReplayerAdminSocketCommand() {}
85 virtual bool call(Formatter
*f
, stringstream
*ss
) = 0;
88 ImageReplayer
<I
> *replayer
;
89 bool registered
= false;
93 class StatusCommand
: public ImageReplayerAdminSocketCommand
<I
> {
95 explicit StatusCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
96 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
99 bool call(Formatter
*f
, stringstream
*ss
) override
{
100 this->replayer
->print_status(f
, ss
);
105 template <typename I
>
106 class StartCommand
: public ImageReplayerAdminSocketCommand
<I
> {
108 explicit StartCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
109 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
112 bool call(Formatter
*f
, stringstream
*ss
) override
{
113 this->replayer
->start(nullptr, true);
118 template <typename I
>
119 class StopCommand
: public ImageReplayerAdminSocketCommand
<I
> {
121 explicit StopCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
122 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
125 bool call(Formatter
*f
, stringstream
*ss
) override
{
126 this->replayer
->stop(nullptr, true);
131 template <typename I
>
132 class RestartCommand
: public ImageReplayerAdminSocketCommand
<I
> {
134 explicit RestartCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
135 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
138 bool call(Formatter
*f
, stringstream
*ss
) override
{
139 this->replayer
->restart();
144 template <typename I
>
145 class FlushCommand
: public ImageReplayerAdminSocketCommand
<I
> {
147 explicit FlushCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
148 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
151 bool call(Formatter
*f
, stringstream
*ss
) override
{
153 this->replayer
->flush(&cond
);
156 *ss
<< "flush: " << cpp_strerror(r
);
163 template <typename I
>
164 class ImageReplayerAdminSocketHook
: public AdminSocketHook
{
166 ImageReplayerAdminSocketHook(CephContext
*cct
, const std::string
&name
,
167 ImageReplayer
<I
> *replayer
)
168 : admin_socket(cct
->get_admin_socket()),
169 commands
{{"rbd mirror flush " + name
,
170 new FlushCommand
<I
>("flush rbd mirror " + name
, replayer
)},
171 {"rbd mirror restart " + name
,
172 new RestartCommand
<I
>("restart rbd mirror " + name
, replayer
)},
173 {"rbd mirror start " + name
,
174 new StartCommand
<I
>("start rbd mirror " + name
, replayer
)},
175 {"rbd mirror status " + name
,
176 new StatusCommand
<I
>("get status for rbd mirror " + name
, replayer
)},
177 {"rbd mirror stop " + name
,
178 new StopCommand
<I
>("stop rbd mirror " + name
, replayer
)}} {
181 int register_commands() {
182 for (auto &it
: commands
) {
183 int r
= admin_socket
->register_command(it
.first
, it
.first
, this,
188 it
.second
->registered
= true;
193 ~ImageReplayerAdminSocketHook() override
{
194 for (auto &it
: commands
) {
195 if (it
.second
->registered
) {
196 admin_socket
->unregister_command(it
.first
);
203 bool call(std::string command
, cmdmap_t
& cmdmap
, std::string format
,
204 bufferlist
& out
) override
{
205 auto i
= commands
.find(command
);
206 assert(i
!= commands
.end());
207 Formatter
*f
= Formatter::create(format
);
209 bool r
= i
->second
->call(f
, &ss
);
216 typedef std::map
<std::string
, ImageReplayerAdminSocketCommand
<I
> *> Commands
;
218 AdminSocket
*admin_socket
;
222 uint32_t calculate_replay_delay(const utime_t
&event_time
,
223 int mirroring_replay_delay
) {
224 if (mirroring_replay_delay
<= 0) {
228 utime_t now
= ceph_clock_now();
229 if (event_time
+ mirroring_replay_delay
<= now
) {
233 // ensure it is rounded up when converting to integer
234 return (event_time
+ mirroring_replay_delay
- now
) + 1;
237 } // anonymous namespace
239 template <typename I
>
240 void ImageReplayer
<I
>::BootstrapProgressContext::update_progress(
241 const std::string
&description
, bool flush
)
243 const std::string desc
= "bootstrapping, " + description
;
244 replayer
->set_state_description(0, desc
);
246 replayer
->update_mirror_image_status(false, boost::none
);
250 template <typename I
>
251 void ImageReplayer
<I
>::RemoteJournalerListener::handle_update(
252 ::journal::JournalMetadata
*) {
253 FunctionContext
*ctx
= new FunctionContext([this](int r
) {
254 replayer
->handle_remote_journal_metadata_updated();
256 replayer
->m_threads
->work_queue
->queue(ctx
, 0);
259 template <typename I
>
260 ImageReplayer
<I
>::ImageReplayer(Threads
<I
> *threads
,
261 ImageDeleter
<I
>* image_deleter
,
262 InstanceWatcher
<I
> *instance_watcher
,
264 const std::string
&local_mirror_uuid
,
265 int64_t local_pool_id
,
266 const std::string
&global_image_id
) :
268 m_image_deleter(image_deleter
),
269 m_instance_watcher(instance_watcher
),
271 m_local_mirror_uuid(local_mirror_uuid
),
272 m_local_pool_id(local_pool_id
),
273 m_global_image_id(global_image_id
),
274 m_lock("rbd::mirror::ImageReplayer " + stringify(local_pool_id
) + " " +
276 m_progress_cxt(this),
277 m_journal_listener(new JournalListener(this)),
278 m_remote_listener(this)
280 // Register asok commands using a temporary "remote_pool_name/global_image_id"
281 // name. When the image name becomes known on start the asok commands will be
282 // re-registered using "remote_pool_name/remote_image_name" name.
284 std::string pool_name
;
285 int r
= m_local
->pool_reverse_lookup(m_local_pool_id
, &pool_name
);
287 derr
<< "error resolving local pool " << m_local_pool_id
288 << ": " << cpp_strerror(r
) << dendl
;
289 pool_name
= stringify(m_local_pool_id
);
292 m_name
= pool_name
+ "/" + m_global_image_id
;
293 register_admin_socket_hook();
296 template <typename I
>
297 ImageReplayer
<I
>::~ImageReplayer()
299 unregister_admin_socket_hook();
300 assert(m_event_preprocessor
== nullptr);
301 assert(m_replay_status_formatter
== nullptr);
302 assert(m_local_image_ctx
== nullptr);
303 assert(m_local_replay
== nullptr);
304 assert(m_remote_journaler
== nullptr);
305 assert(m_replay_handler
== nullptr);
306 assert(m_on_start_finish
== nullptr);
307 assert(m_on_stop_finish
== nullptr);
308 assert(m_bootstrap_request
== nullptr);
309 assert(m_in_flight_status_updates
== 0);
311 delete m_journal_listener
;
314 template <typename I
>
315 image_replayer::HealthState ImageReplayer
<I
>::get_health_state() const {
316 Mutex::Locker
locker(m_lock
);
318 if (!m_mirror_image_status_state
) {
319 return image_replayer::HEALTH_STATE_OK
;
320 } else if (*m_mirror_image_status_state
==
321 cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING
||
322 *m_mirror_image_status_state
==
323 cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN
) {
324 return image_replayer::HEALTH_STATE_WARNING
;
326 return image_replayer::HEALTH_STATE_ERROR
;
329 template <typename I
>
330 void ImageReplayer
<I
>::add_peer(const std::string
&peer_uuid
,
331 librados::IoCtx
&io_ctx
) {
332 Mutex::Locker
locker(m_lock
);
333 auto it
= m_peers
.find({peer_uuid
});
334 if (it
== m_peers
.end()) {
335 m_peers
.insert({peer_uuid
, io_ctx
});
339 template <typename I
>
340 void ImageReplayer
<I
>::set_state_description(int r
, const std::string
&desc
) {
341 dout(20) << r
<< " " << desc
<< dendl
;
343 Mutex::Locker
l(m_lock
);
348 template <typename I
>
349 void ImageReplayer
<I
>::start(Context
*on_finish
, bool manual
)
351 dout(20) << "on_finish=" << on_finish
<< dendl
;
355 Mutex::Locker
locker(m_lock
);
356 if (!is_stopped_()) {
357 derr
<< "already running" << dendl
;
359 } else if (m_manual_stop
&& !manual
) {
360 dout(5) << "stopped manually, ignoring start without manual flag"
364 m_state
= STATE_STARTING
;
366 m_state_desc
.clear();
367 m_manual_stop
= false;
368 m_delete_requested
= false;
370 if (on_finish
!= nullptr) {
371 assert(m_on_start_finish
== nullptr);
372 m_on_start_finish
= on_finish
;
374 assert(m_on_stop_finish
== nullptr);
380 on_finish
->complete(r
);
385 r
= m_local
->ioctx_create2(m_local_pool_id
, m_local_ioctx
);
387 derr
<< "error opening ioctx for local pool " << m_local_pool_id
388 << ": " << cpp_strerror(r
) << dendl
;
389 on_start_fail(r
, "error opening local pool");
396 template <typename I
>
397 void ImageReplayer
<I
>::wait_for_deletion() {
400 Context
*ctx
= create_context_callback
<
401 ImageReplayer
, &ImageReplayer
<I
>::handle_wait_for_deletion
>(this);
402 m_image_deleter
->wait_for_scheduled_deletion(
403 m_local_pool_id
, m_global_image_id
, ctx
, false);
406 template <typename I
>
407 void ImageReplayer
<I
>::handle_wait_for_deletion(int r
) {
408 dout(20) << "r=" << r
<< dendl
;
410 if (r
== -ECANCELED
) {
411 on_start_fail(0, "");
414 on_start_fail(r
, "error waiting for image deletion");
418 prepare_local_image();
421 template <typename I
>
422 void ImageReplayer
<I
>::prepare_local_image() {
425 m_local_image_id
= "";
426 Context
*ctx
= create_context_callback
<
427 ImageReplayer
, &ImageReplayer
<I
>::handle_prepare_local_image
>(this);
428 auto req
= PrepareLocalImageRequest
<I
>::create(
429 m_local_ioctx
, m_global_image_id
, &m_local_image_id
,
430 &m_local_image_tag_owner
, m_threads
->work_queue
, ctx
);
434 template <typename I
>
435 void ImageReplayer
<I
>::handle_prepare_local_image(int r
) {
436 dout(20) << "r=" << r
<< dendl
;
439 dout(20) << "local image does not exist" << dendl
;
441 on_start_fail(r
, "error preparing local image for replay");
443 } else if (m_local_image_tag_owner
== librbd::Journal
<>::LOCAL_MIRROR_UUID
) {
444 dout(5) << "local image is primary" << dendl
;
445 on_start_fail(0, "local image is primary");
449 // local image doesn't exist or is non-primary
450 prepare_remote_image();
453 template <typename I
>
454 void ImageReplayer
<I
>::prepare_remote_image() {
457 // TODO need to support multiple remote images
458 assert(!m_peers
.empty());
459 m_remote_image
= {*m_peers
.begin()};
461 Context
*ctx
= create_context_callback
<
462 ImageReplayer
, &ImageReplayer
<I
>::handle_prepare_remote_image
>(this);
463 auto req
= PrepareRemoteImageRequest
<I
>::create(
464 m_remote_image
.io_ctx
, m_global_image_id
, &m_remote_image
.mirror_uuid
,
465 &m_remote_image
.image_id
, ctx
);
469 template <typename I
>
470 void ImageReplayer
<I
>::handle_prepare_remote_image(int r
) {
471 dout(20) << "r=" << r
<< dendl
;
474 dout(20) << "remote image does not exist" << dendl
;
476 // TODO need to support multiple remote images
477 if (!m_local_image_id
.empty() &&
478 m_local_image_tag_owner
== m_remote_image
.mirror_uuid
) {
479 // local image exists and is non-primary and linked to the missing
482 m_delete_requested
= true;
483 on_start_fail(0, "remote image no longer exists");
485 on_start_fail(-ENOENT
, "remote image does not exist");
489 on_start_fail(r
, "error retrieving remote image id");
496 template <typename I
>
497 void ImageReplayer
<I
>::bootstrap() {
500 CephContext
*cct
= static_cast<CephContext
*>(m_local
->cct());
501 journal::Settings settings
;
502 settings
.commit_interval
= cct
->_conf
->get_val
<double>(
503 "rbd_mirror_journal_commit_age");
504 settings
.max_fetch_bytes
= cct
->_conf
->get_val
<uint64_t>(
505 "rbd_mirror_journal_max_fetch_bytes");
507 m_remote_journaler
= new Journaler(m_threads
->work_queue
,
509 &m_threads
->timer_lock
,
510 m_remote_image
.io_ctx
,
511 m_remote_image
.image_id
,
512 m_local_mirror_uuid
, settings
);
514 Context
*ctx
= create_context_callback
<
515 ImageReplayer
, &ImageReplayer
<I
>::handle_bootstrap
>(this);
517 BootstrapRequest
<I
> *request
= BootstrapRequest
<I
>::create(
518 m_local_ioctx
, m_remote_image
.io_ctx
, m_instance_watcher
,
519 &m_local_image_ctx
, m_local_image_id
, m_remote_image
.image_id
,
520 m_global_image_id
, m_threads
->work_queue
, m_threads
->timer
,
521 &m_threads
->timer_lock
, m_local_mirror_uuid
, m_remote_image
.mirror_uuid
,
522 m_remote_journaler
, &m_client_meta
, ctx
, &m_resync_requested
,
526 Mutex::Locker
locker(m_lock
);
528 m_bootstrap_request
= request
;
531 update_mirror_image_status(false, boost::none
);
532 reschedule_update_status_task(10);
537 template <typename I
>
538 void ImageReplayer
<I
>::handle_bootstrap(int r
) {
539 dout(20) << "r=" << r
<< dendl
;
541 Mutex::Locker
locker(m_lock
);
542 m_bootstrap_request
->put();
543 m_bootstrap_request
= nullptr;
544 if (m_local_image_ctx
) {
545 m_local_image_id
= m_local_image_ctx
->id
;
549 if (r
== -EREMOTEIO
) {
550 m_local_image_tag_owner
= "";
551 dout(5) << "remote image is non-primary" << dendl
;
552 on_start_fail(-EREMOTEIO
, "remote image is non-primary");
554 } else if (r
== -EEXIST
) {
555 m_local_image_tag_owner
= "";
556 on_start_fail(r
, "split-brain detected");
559 on_start_fail(r
, "error bootstrapping replay");
561 } else if (on_start_interrupted()) {
563 } else if (m_resync_requested
) {
564 on_start_fail(0, "resync requested");
568 assert(m_local_journal
== nullptr);
570 RWLock::RLocker
snap_locker(m_local_image_ctx
->snap_lock
);
571 if (m_local_image_ctx
->journal
!= nullptr) {
572 m_local_journal
= m_local_image_ctx
->journal
;
573 m_local_journal
->add_listener(m_journal_listener
);
577 if (m_local_journal
== nullptr) {
578 on_start_fail(-EINVAL
, "error accessing local journal");
584 update_mirror_image_status(false, boost::none
);
585 init_remote_journaler();
588 template <typename I
>
589 void ImageReplayer
<I
>::init_remote_journaler() {
592 Context
*ctx
= create_context_callback
<
593 ImageReplayer
, &ImageReplayer
<I
>::handle_init_remote_journaler
>(this);
594 m_remote_journaler
->init(ctx
);
597 template <typename I
>
598 void ImageReplayer
<I
>::handle_init_remote_journaler(int r
) {
599 dout(20) << "r=" << r
<< dendl
;
602 derr
<< "failed to initialize remote journal: " << cpp_strerror(r
) << dendl
;
603 on_start_fail(r
, "error initializing remote journal");
605 } else if (on_start_interrupted()) {
609 m_remote_journaler
->add_listener(&m_remote_listener
);
611 cls::journal::Client client
;
612 r
= m_remote_journaler
->get_cached_client(m_local_mirror_uuid
, &client
);
614 derr
<< "error retrieving remote journal client: " << cpp_strerror(r
)
616 on_start_fail(r
, "error retrieving remote journal client");
620 derr
<< "image_id=" << m_local_image_id
<< ", "
621 << "m_client_meta.image_id=" << m_client_meta
.image_id
<< ", "
622 << "client.state=" << client
.state
<< dendl
;
623 if (m_client_meta
.image_id
== m_local_image_id
&&
624 client
.state
!= cls::journal::CLIENT_STATE_CONNECTED
) {
625 dout(5) << "client flagged disconnected, stopping image replay" << dendl
;
626 if (m_local_image_ctx
->mirroring_resync_after_disconnect
) {
627 m_resync_requested
= true;
628 on_start_fail(-ENOTCONN
, "disconnected: automatic resync");
630 on_start_fail(-ENOTCONN
, "disconnected");
638 template <typename I
>
639 void ImageReplayer
<I
>::start_replay() {
642 Context
*start_ctx
= create_context_callback
<
643 ImageReplayer
, &ImageReplayer
<I
>::handle_start_replay
>(this);
644 m_local_journal
->start_external_replay(&m_local_replay
, start_ctx
);
647 template <typename I
>
648 void ImageReplayer
<I
>::handle_start_replay(int r
) {
649 dout(20) << "r=" << r
<< dendl
;
652 assert(m_local_replay
== nullptr);
653 derr
<< "error starting external replay on local image "
654 << m_local_image_id
<< ": " << cpp_strerror(r
) << dendl
;
655 on_start_fail(r
, "error starting replay on local image");
659 Context
*on_finish(nullptr);
661 Mutex::Locker
locker(m_lock
);
662 assert(m_state
== STATE_STARTING
);
663 m_state
= STATE_REPLAYING
;
664 std::swap(m_on_start_finish
, on_finish
);
667 m_event_preprocessor
= EventPreprocessor
<I
>::create(
668 *m_local_image_ctx
, *m_remote_journaler
, m_local_mirror_uuid
,
669 &m_client_meta
, m_threads
->work_queue
);
670 m_replay_status_formatter
=
671 ReplayStatusFormatter
<I
>::create(m_remote_journaler
, m_local_mirror_uuid
);
673 update_mirror_image_status(true, boost::none
);
674 reschedule_update_status_task(30);
676 if (on_replay_interrupted()) {
681 CephContext
*cct
= static_cast<CephContext
*>(m_local
->cct());
682 double poll_seconds
= cct
->_conf
->get_val
<double>(
683 "rbd_mirror_journal_poll_age");
685 Mutex::Locker
locker(m_lock
);
686 m_replay_handler
= new ReplayHandler
<I
>(this);
687 m_remote_journaler
->start_live_replay(m_replay_handler
, poll_seconds
);
689 dout(20) << "m_remote_journaler=" << *m_remote_journaler
<< dendl
;
692 dout(20) << "start succeeded" << dendl
;
693 if (on_finish
!= nullptr) {
694 dout(20) << "on finish complete, r=" << r
<< dendl
;
695 on_finish
->complete(r
);
699 template <typename I
>
700 void ImageReplayer
<I
>::on_start_fail(int r
, const std::string
&desc
)
702 dout(20) << "r=" << r
<< dendl
;
703 Context
*ctx
= new FunctionContext([this, r
, desc
](int _r
) {
705 Mutex::Locker
locker(m_lock
);
706 assert(m_state
== STATE_STARTING
);
707 m_state
= STATE_STOPPING
;
708 if (r
< 0 && r
!= -ECANCELED
&& r
!= -EREMOTEIO
&& r
!= -ENOENT
) {
709 derr
<< "start failed: " << cpp_strerror(r
) << dendl
;
711 dout(20) << "start canceled" << dendl
;
715 set_state_description(r
, desc
);
716 update_mirror_image_status(false, boost::none
);
717 reschedule_update_status_task(-1);
720 m_threads
->work_queue
->queue(ctx
, 0);
723 template <typename I
>
724 bool ImageReplayer
<I
>::on_start_interrupted()
726 Mutex::Locker
locker(m_lock
);
727 assert(m_state
== STATE_STARTING
);
728 if (m_on_stop_finish
== nullptr) {
732 on_start_fail(-ECANCELED
);
736 template <typename I
>
737 void ImageReplayer
<I
>::stop(Context
*on_finish
, bool manual
, int r
,
738 const std::string
& desc
)
740 dout(20) << "on_finish=" << on_finish
<< ", manual=" << manual
741 << ", desc=" << desc
<< dendl
;
743 m_image_deleter
->cancel_waiter(m_local_pool_id
, m_global_image_id
);
745 image_replayer::BootstrapRequest
<I
> *bootstrap_request
= nullptr;
746 bool shut_down_replay
= false;
749 Mutex::Locker
locker(m_lock
);
751 if (!is_running_()) {
754 if (!is_stopped_()) {
755 if (m_state
== STATE_STARTING
) {
756 dout(20) << "canceling start" << dendl
;
757 if (m_bootstrap_request
) {
758 bootstrap_request
= m_bootstrap_request
;
759 bootstrap_request
->get();
762 dout(20) << "interrupting replay" << dendl
;
763 shut_down_replay
= true;
766 assert(m_on_stop_finish
== nullptr);
767 std::swap(m_on_stop_finish
, on_finish
);
768 m_stop_requested
= true;
769 m_manual_stop
= manual
;
774 // avoid holding lock since bootstrap request will update status
775 if (bootstrap_request
!= nullptr) {
776 bootstrap_request
->cancel();
777 bootstrap_request
->put();
781 dout(20) << "not running" << dendl
;
783 on_finish
->complete(-EINVAL
);
788 if (shut_down_replay
) {
789 on_stop_journal_replay(r
, desc
);
790 } else if (on_finish
!= nullptr) {
791 on_finish
->complete(0);
795 template <typename I
>
796 void ImageReplayer
<I
>::on_stop_journal_replay(int r
, const std::string
&desc
)
798 dout(20) << "enter" << dendl
;
801 Mutex::Locker
locker(m_lock
);
802 if (m_state
!= STATE_REPLAYING
) {
803 // might be invoked multiple times while stopping
806 m_stop_requested
= true;
807 m_state
= STATE_STOPPING
;
810 set_state_description(r
, desc
);
811 update_mirror_image_status(false, boost::none
);
812 reschedule_update_status_task(-1);
816 template <typename I
>
817 void ImageReplayer
<I
>::handle_replay_ready()
819 dout(20) << "enter" << dendl
;
820 if (on_replay_interrupted()) {
824 if (!m_remote_journaler
->try_pop_front(&m_replay_entry
, &m_replay_tag_tid
)) {
828 m_event_replay_tracker
.start_op();
831 bool stopping
= (m_state
== STATE_STOPPING
);
835 dout(10) << "stopping event replay" << dendl
;
836 m_event_replay_tracker
.finish_op();
840 if (m_replay_tag_valid
&& m_replay_tag
.tid
== m_replay_tag_tid
) {
848 template <typename I
>
849 void ImageReplayer
<I
>::restart(Context
*on_finish
)
851 FunctionContext
*ctx
= new FunctionContext(
852 [this, on_finish
](int r
) {
856 start(on_finish
, true);
861 template <typename I
>
862 void ImageReplayer
<I
>::flush(Context
*on_finish
)
864 dout(20) << "enter" << dendl
;
867 Mutex::Locker
locker(m_lock
);
868 if (m_state
== STATE_REPLAYING
) {
869 Context
*ctx
= new FunctionContext(
871 if (on_finish
!= nullptr) {
872 on_finish
->complete(r
);
875 on_flush_local_replay_flush_start(ctx
);
881 on_finish
->complete(0);
885 template <typename I
>
886 void ImageReplayer
<I
>::on_flush_local_replay_flush_start(Context
*on_flush
)
888 dout(20) << "enter" << dendl
;
889 FunctionContext
*ctx
= new FunctionContext(
890 [this, on_flush
](int r
) {
891 on_flush_local_replay_flush_finish(on_flush
, r
);
894 assert(m_lock
.is_locked());
895 assert(m_state
== STATE_REPLAYING
);
896 m_local_replay
->flush(ctx
);
899 template <typename I
>
900 void ImageReplayer
<I
>::on_flush_local_replay_flush_finish(Context
*on_flush
,
903 dout(20) << "r=" << r
<< dendl
;
905 derr
<< "error flushing local replay: " << cpp_strerror(r
) << dendl
;
906 on_flush
->complete(r
);
910 on_flush_flush_commit_position_start(on_flush
);
913 template <typename I
>
914 void ImageReplayer
<I
>::on_flush_flush_commit_position_start(Context
*on_flush
)
916 FunctionContext
*ctx
= new FunctionContext(
917 [this, on_flush
](int r
) {
918 on_flush_flush_commit_position_finish(on_flush
, r
);
921 m_remote_journaler
->flush_commit_position(ctx
);
924 template <typename I
>
925 void ImageReplayer
<I
>::on_flush_flush_commit_position_finish(Context
*on_flush
,
929 derr
<< "error flushing remote journal commit position: "
930 << cpp_strerror(r
) << dendl
;
933 update_mirror_image_status(false, boost::none
);
935 dout(20) << "flush complete, r=" << r
<< dendl
;
936 on_flush
->complete(r
);
939 template <typename I
>
940 bool ImageReplayer
<I
>::on_replay_interrupted()
944 Mutex::Locker
locker(m_lock
);
945 shut_down
= m_stop_requested
;
949 on_stop_journal_replay();
954 template <typename I
>
955 void ImageReplayer
<I
>::print_status(Formatter
*f
, stringstream
*ss
)
957 dout(20) << "enter" << dendl
;
959 Mutex::Locker
l(m_lock
);
962 f
->open_object_section("image_replayer");
963 f
->dump_string("name", m_name
);
964 f
->dump_string("state", to_string(m_state
));
968 *ss
<< m_name
<< ": state: " << to_string(m_state
);
972 template <typename I
>
973 void ImageReplayer
<I
>::handle_replay_complete(int r
, const std::string
&error_desc
)
975 dout(20) << "r=" << r
<< dendl
;
977 derr
<< "replay encountered an error: " << cpp_strerror(r
) << dendl
;
978 set_state_description(r
, error_desc
);
982 Mutex::Locker
locker(m_lock
);
983 m_stop_requested
= true;
985 on_replay_interrupted();
988 template <typename I
>
989 void ImageReplayer
<I
>::replay_flush() {
992 bool interrupted
= false;
994 Mutex::Locker
locker(m_lock
);
995 if (m_state
!= STATE_REPLAYING
) {
996 dout(20) << "replay interrupted" << dendl
;
999 m_state
= STATE_REPLAY_FLUSHING
;
1004 m_event_replay_tracker
.finish_op();
1008 // shut down the replay to flush all IO and ops and create a new
1009 // replayer to handle the new tag epoch
1010 Context
*ctx
= create_context_callback
<
1011 ImageReplayer
<I
>, &ImageReplayer
<I
>::handle_replay_flush
>(this);
1012 ctx
= new FunctionContext([this, ctx
](int r
) {
1013 m_local_image_ctx
->journal
->stop_external_replay();
1014 m_local_replay
= nullptr;
1021 m_local_journal
->start_external_replay(&m_local_replay
, ctx
);
1023 m_local_replay
->shut_down(false, ctx
);
1026 template <typename I
>
1027 void ImageReplayer
<I
>::handle_replay_flush(int r
) {
1028 dout(20) << "r=" << r
<< dendl
;
1031 Mutex::Locker
locker(m_lock
);
1032 assert(m_state
== STATE_REPLAY_FLUSHING
);
1033 m_state
= STATE_REPLAYING
;
1037 derr
<< "replay flush encountered an error: " << cpp_strerror(r
) << dendl
;
1038 m_event_replay_tracker
.finish_op();
1039 handle_replay_complete(r
, "replay flush encountered an error");
1041 } else if (on_replay_interrupted()) {
1042 m_event_replay_tracker
.finish_op();
1049 template <typename I
>
1050 void ImageReplayer
<I
>::get_remote_tag() {
1051 dout(20) << "tag_tid: " << m_replay_tag_tid
<< dendl
;
1053 Context
*ctx
= create_context_callback
<
1054 ImageReplayer
, &ImageReplayer
<I
>::handle_get_remote_tag
>(this);
1055 m_remote_journaler
->get_tag(m_replay_tag_tid
, &m_replay_tag
, ctx
);
1058 template <typename I
>
1059 void ImageReplayer
<I
>::handle_get_remote_tag(int r
) {
1060 dout(20) << "r=" << r
<< dendl
;
1064 bufferlist::iterator it
= m_replay_tag
.data
.begin();
1065 ::decode(m_replay_tag_data
, it
);
1066 } catch (const buffer::error
&err
) {
1072 derr
<< "failed to retrieve remote tag " << m_replay_tag_tid
<< ": "
1073 << cpp_strerror(r
) << dendl
;
1074 m_event_replay_tracker
.finish_op();
1075 handle_replay_complete(r
, "failed to retrieve remote tag");
1079 m_replay_tag_valid
= true;
1080 dout(20) << "decoded remote tag " << m_replay_tag_tid
<< ": "
1081 << m_replay_tag_data
<< dendl
;
1083 allocate_local_tag();
1086 template <typename I
>
1087 void ImageReplayer
<I
>::allocate_local_tag() {
1090 std::string mirror_uuid
= m_replay_tag_data
.mirror_uuid
;
1091 if (mirror_uuid
== librbd::Journal
<>::LOCAL_MIRROR_UUID
||
1092 mirror_uuid
== m_local_mirror_uuid
) {
1093 mirror_uuid
= m_remote_image
.mirror_uuid
;
1094 } else if (mirror_uuid
== librbd::Journal
<>::ORPHAN_MIRROR_UUID
) {
1095 dout(5) << "encountered image demotion: stopping" << dendl
;
1096 Mutex::Locker
locker(m_lock
);
1097 m_stop_requested
= true;
1100 librbd::journal::TagPredecessor
predecessor(m_replay_tag_data
.predecessor
);
1101 if (predecessor
.mirror_uuid
== librbd::Journal
<>::LOCAL_MIRROR_UUID
) {
1102 predecessor
.mirror_uuid
= m_remote_image
.mirror_uuid
;
1103 } else if (predecessor
.mirror_uuid
== m_local_mirror_uuid
) {
1104 predecessor
.mirror_uuid
= librbd::Journal
<>::LOCAL_MIRROR_UUID
;
1107 dout(20) << "mirror_uuid=" << mirror_uuid
<< ", "
1108 << "predecessor_mirror_uuid=" << predecessor
.mirror_uuid
<< ", "
1109 << "replay_tag_tid=" << m_replay_tag_tid
<< ", "
1110 << "replay_tag_data=" << m_replay_tag_data
<< dendl
;
1111 Context
*ctx
= create_context_callback
<
1112 ImageReplayer
, &ImageReplayer
<I
>::handle_allocate_local_tag
>(this);
1113 m_local_journal
->allocate_tag(mirror_uuid
, predecessor
, ctx
);
1116 template <typename I
>
1117 void ImageReplayer
<I
>::handle_allocate_local_tag(int r
) {
1118 dout(20) << "r=" << r
<< dendl
;
1121 derr
<< "failed to allocate journal tag: " << cpp_strerror(r
) << dendl
;
1122 m_event_replay_tracker
.finish_op();
1123 handle_replay_complete(r
, "failed to allocate journal tag");
1130 template <typename I
>
1131 void ImageReplayer
<I
>::preprocess_entry() {
1132 dout(20) << "preprocessing entry tid=" << m_replay_entry
.get_commit_tid()
1135 bufferlist data
= m_replay_entry
.get_data();
1136 bufferlist::iterator it
= data
.begin();
1137 int r
= m_local_replay
->decode(&it
, &m_event_entry
);
1139 derr
<< "failed to decode journal event" << dendl
;
1140 m_event_replay_tracker
.finish_op();
1141 handle_replay_complete(r
, "failed to decode journal event");
1145 uint32_t delay
= calculate_replay_delay(
1146 m_event_entry
.timestamp
, m_local_image_ctx
->mirroring_replay_delay
);
1148 handle_preprocess_entry_ready(0);
1152 dout(20) << "delaying replay by " << delay
<< " sec" << dendl
;
1154 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
1155 assert(m_delayed_preprocess_task
== nullptr);
1156 m_delayed_preprocess_task
= new FunctionContext(
1158 assert(m_threads
->timer_lock
.is_locked());
1159 m_delayed_preprocess_task
= nullptr;
1160 m_threads
->work_queue
->queue(
1161 create_context_callback
<ImageReplayer
,
1162 &ImageReplayer
<I
>::handle_preprocess_entry_ready
>(this), 0);
1164 m_threads
->timer
->add_event_after(delay
, m_delayed_preprocess_task
);
1167 template <typename I
>
1168 void ImageReplayer
<I
>::handle_preprocess_entry_ready(int r
) {
1169 dout(20) << "r=" << r
<< dendl
;
1172 if (!m_event_preprocessor
->is_required(m_event_entry
)) {
1177 Context
*ctx
= create_context_callback
<
1178 ImageReplayer
, &ImageReplayer
<I
>::handle_preprocess_entry_safe
>(this);
1179 m_event_preprocessor
->preprocess(&m_event_entry
, ctx
);
1182 template <typename I
>
1183 void ImageReplayer
<I
>::handle_preprocess_entry_safe(int r
) {
1184 dout(20) << "r=" << r
<< dendl
;
1187 m_event_replay_tracker
.finish_op();
1189 if (r
== -ECANCELED
) {
1190 handle_replay_complete(0, "lost exclusive lock");
1192 derr
<< "failed to preprocess journal event" << dendl
;
1193 handle_replay_complete(r
, "failed to preprocess journal event");
1201 template <typename I
>
1202 void ImageReplayer
<I
>::process_entry() {
1203 dout(20) << "processing entry tid=" << m_replay_entry
.get_commit_tid()
1206 // stop replaying events if stop has been requested
1207 if (on_replay_interrupted()) {
1208 m_event_replay_tracker
.finish_op();
1212 Context
*on_ready
= create_context_callback
<
1213 ImageReplayer
, &ImageReplayer
<I
>::handle_process_entry_ready
>(this);
1214 Context
*on_commit
= new C_ReplayCommitted(this, std::move(m_replay_entry
));
1216 m_local_replay
->process(m_event_entry
, on_ready
, on_commit
);
1219 template <typename I
>
1220 void ImageReplayer
<I
>::handle_process_entry_ready(int r
) {
1226 // attempt to process the next event
1227 handle_replay_ready();
1230 template <typename I
>
1231 void ImageReplayer
<I
>::handle_process_entry_safe(const ReplayEntry
& replay_entry
,
1233 dout(20) << "commit_tid=" << replay_entry
.get_commit_tid() << ", r=" << r
1237 derr
<< "failed to commit journal event: " << cpp_strerror(r
) << dendl
;
1238 handle_replay_complete(r
, "failed to commit journal event");
1240 assert(m_remote_journaler
!= nullptr);
1241 m_remote_journaler
->committed(replay_entry
);
1243 m_event_replay_tracker
.finish_op();
1246 template <typename I
>
1247 bool ImageReplayer
<I
>::update_mirror_image_status(bool force
,
1248 const OptionalState
&state
) {
1251 Mutex::Locker
locker(m_lock
);
1252 if (!start_mirror_image_status_update(force
, false)) {
1257 queue_mirror_image_status_update(state
);
1261 template <typename I
>
1262 bool ImageReplayer
<I
>::start_mirror_image_status_update(bool force
,
1264 assert(m_lock
.is_locked());
1266 if (!force
&& !is_stopped_()) {
1267 if (!is_running_()) {
1268 dout(20) << "shut down in-progress: ignoring update" << dendl
;
1270 } else if (m_in_flight_status_updates
> (restarting
? 1 : 0)) {
1271 dout(20) << "already sending update" << dendl
;
1272 m_update_status_requested
= true;
1278 ++m_in_flight_status_updates
;
1282 template <typename I
>
1283 void ImageReplayer
<I
>::finish_mirror_image_status_update() {
1284 Context
*on_finish
= nullptr;
1286 Mutex::Locker
locker(m_lock
);
1287 assert(m_in_flight_status_updates
> 0);
1288 if (--m_in_flight_status_updates
> 0) {
1289 dout(20) << "waiting on " << m_in_flight_status_updates
<< " in-flight "
1290 << "updates" << dendl
;
1294 std::swap(on_finish
, m_on_update_status_finish
);
1298 if (on_finish
!= nullptr) {
1299 on_finish
->complete(0);
1303 template <typename I
>
1304 void ImageReplayer
<I
>::queue_mirror_image_status_update(const OptionalState
&state
) {
1306 FunctionContext
*ctx
= new FunctionContext(
1307 [this, state
](int r
) {
1308 send_mirror_status_update(state
);
1310 m_threads
->work_queue
->queue(ctx
, 0);
1313 template <typename I
>
1314 void ImageReplayer
<I
>::send_mirror_status_update(const OptionalState
&opt_state
) {
1316 std::string state_desc
;
1318 bool stopping_replay
;
1320 OptionalMirrorImageStatusState mirror_image_status_state
{
1321 boost::make_optional(false, cls::rbd::MirrorImageStatusState
{})};
1322 image_replayer::BootstrapRequest
<I
>* bootstrap_request
= nullptr;
1324 Mutex::Locker
locker(m_lock
);
1326 state_desc
= m_state_desc
;
1327 mirror_image_status_state
= m_mirror_image_status_state
;
1329 stopping_replay
= (m_local_image_ctx
!= nullptr);
1331 if (m_bootstrap_request
!= nullptr) {
1332 bootstrap_request
= m_bootstrap_request
;
1333 bootstrap_request
->get();
1337 bool syncing
= false;
1338 if (bootstrap_request
!= nullptr) {
1339 syncing
= bootstrap_request
->is_syncing();
1340 bootstrap_request
->put();
1341 bootstrap_request
= nullptr;
1348 cls::rbd::MirrorImageStatus status
;
1351 case STATE_STARTING
:
1353 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING
;
1354 status
.description
= state_desc
.empty() ? "syncing" : state_desc
;
1355 mirror_image_status_state
= status
.state
;
1357 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY
;
1358 status
.description
= "starting replay";
1361 case STATE_REPLAYING
:
1362 case STATE_REPLAY_FLUSHING
:
1363 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING
;
1365 Context
*on_req_finish
= new FunctionContext(
1367 dout(20) << "replay status ready: r=" << r
<< dendl
;
1369 send_mirror_status_update(boost::none
);
1370 } else if (r
== -EAGAIN
) {
1371 // decrement in-flight status update counter
1372 handle_mirror_status_update(r
);
1377 if (!m_replay_status_formatter
->get_or_send_update(&desc
,
1379 dout(20) << "waiting for replay status" << dendl
;
1382 status
.description
= "replaying, " + desc
;
1383 mirror_image_status_state
= boost::none
;
1386 case STATE_STOPPING
:
1387 if (stopping_replay
) {
1388 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY
;
1389 status
.description
= "stopping replay";
1394 if (last_r
== -EREMOTEIO
) {
1395 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN
;
1396 status
.description
= state_desc
;
1397 mirror_image_status_state
= status
.state
;
1398 } else if (last_r
< 0) {
1399 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR
;
1400 status
.description
= state_desc
;
1401 mirror_image_status_state
= status
.state
;
1403 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED
;
1404 status
.description
= state_desc
.empty() ? "stopped" : state_desc
;
1405 mirror_image_status_state
= boost::none
;
1409 assert(!"invalid state");
1413 Mutex::Locker
locker(m_lock
);
1414 m_mirror_image_status_state
= mirror_image_status_state
;
1417 // prevent the status from ping-ponging when failed replays are restarted
1418 if (mirror_image_status_state
&&
1419 *mirror_image_status_state
== cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR
) {
1420 status
.state
= *mirror_image_status_state
;
1423 dout(20) << "status=" << status
<< dendl
;
1424 librados::ObjectWriteOperation op
;
1425 librbd::cls_client::mirror_image_status_set(&op
, m_global_image_id
, status
);
1427 librados::AioCompletion
*aio_comp
= create_rados_callback
<
1428 ImageReplayer
<I
>, &ImageReplayer
<I
>::handle_mirror_status_update
>(this);
1429 int r
= m_local_ioctx
.aio_operate(RBD_MIRRORING
, aio_comp
, &op
);
1431 aio_comp
->release();
1434 template <typename I
>
1435 void ImageReplayer
<I
>::handle_mirror_status_update(int r
) {
1436 dout(20) << "r=" << r
<< dendl
;
1438 bool running
= false;
1439 bool started
= false;
1441 Mutex::Locker
locker(m_lock
);
1442 bool update_status_requested
= false;
1443 std::swap(update_status_requested
, m_update_status_requested
);
1445 running
= is_running_();
1446 if (running
&& update_status_requested
) {
1447 started
= start_mirror_image_status_update(false, true);
1451 // if a deferred update is available, send it -- otherwise reschedule
1454 queue_mirror_image_status_update(boost::none
);
1455 } else if (running
) {
1456 reschedule_update_status_task();
1459 // mark committed status update as no longer in-flight
1460 finish_mirror_image_status_update();
1463 template <typename I
>
1464 void ImageReplayer
<I
>::reschedule_update_status_task(int new_interval
) {
1467 bool canceled_task
= false;
1469 Mutex::Locker
locker(m_lock
);
1470 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
1472 if (m_update_status_task
) {
1473 canceled_task
= m_threads
->timer
->cancel_event(m_update_status_task
);
1474 m_update_status_task
= nullptr;
1477 if (new_interval
> 0) {
1478 m_update_status_interval
= new_interval
;
1481 bool restarting
= (new_interval
== 0 || canceled_task
);
1482 if (new_interval
>= 0 && is_running_() &&
1483 start_mirror_image_status_update(false, restarting
)) {
1484 m_update_status_task
= new FunctionContext(
1486 assert(m_threads
->timer_lock
.is_locked());
1487 m_update_status_task
= nullptr;
1489 queue_mirror_image_status_update(boost::none
);
1491 m_threads
->timer
->add_event_after(m_update_status_interval
,
1492 m_update_status_task
);
1496 if (canceled_task
) {
1497 dout(20) << "canceled task" << dendl
;
1498 finish_mirror_image_status_update();
1502 template <typename I
>
1503 void ImageReplayer
<I
>::shut_down(int r
) {
1504 dout(20) << "r=" << r
<< dendl
;
1506 bool canceled_delayed_preprocess_task
= false;
1508 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
1509 if (m_delayed_preprocess_task
!= nullptr) {
1510 canceled_delayed_preprocess_task
= m_threads
->timer
->cancel_event(
1511 m_delayed_preprocess_task
);
1512 assert(canceled_delayed_preprocess_task
);
1513 m_delayed_preprocess_task
= nullptr;
1516 if (canceled_delayed_preprocess_task
) {
1517 // wake up sleeping replay
1518 m_event_replay_tracker
.finish_op();
1522 Mutex::Locker
locker(m_lock
);
1523 assert(m_state
== STATE_STOPPING
);
1525 // if status updates are in-flight, wait for them to complete
1526 // before proceeding
1527 if (m_in_flight_status_updates
> 0) {
1528 if (m_on_update_status_finish
== nullptr) {
1529 dout(20) << "waiting for in-flight status update" << dendl
;
1530 m_on_update_status_finish
= new FunctionContext(
1539 // NOTE: it's important to ensure that the local image is fully
1540 // closed before attempting to close the remote journal in
1541 // case the remote cluster is unreachable
1543 // chain the shut down sequence (reverse order)
1544 Context
*ctx
= new FunctionContext(
1546 update_mirror_image_status(true, STATE_STOPPED
);
1547 handle_shut_down(r
);
1550 // close the remote journal
1551 if (m_remote_journaler
!= nullptr) {
1552 ctx
= new FunctionContext([this, ctx
](int r
) {
1553 delete m_remote_journaler
;
1554 m_remote_journaler
= nullptr;
1557 ctx
= new FunctionContext([this, ctx
](int r
) {
1558 m_remote_journaler
->remove_listener(&m_remote_listener
);
1559 m_remote_journaler
->shut_down(ctx
);
1563 // stop the replay of remote journal events
1564 if (m_replay_handler
!= nullptr) {
1565 ctx
= new FunctionContext([this, ctx
](int r
) {
1566 delete m_replay_handler
;
1567 m_replay_handler
= nullptr;
1569 m_event_replay_tracker
.wait_for_ops(ctx
);
1571 ctx
= new FunctionContext([this, ctx
](int r
) {
1572 m_remote_journaler
->stop_replay(ctx
);
1576 // close the local image (release exclusive lock)
1577 if (m_local_image_ctx
) {
1578 ctx
= new FunctionContext([this, ctx
](int r
) {
1579 CloseImageRequest
<I
> *request
= CloseImageRequest
<I
>::create(
1580 &m_local_image_ctx
, ctx
);
1585 // shut down event replay into the local image
1586 if (m_local_journal
!= nullptr) {
1587 ctx
= new FunctionContext([this, ctx
](int r
) {
1588 m_local_journal
= nullptr;
1591 if (m_local_replay
!= nullptr) {
1592 ctx
= new FunctionContext([this, ctx
](int r
) {
1593 m_local_journal
->stop_external_replay();
1594 m_local_replay
= nullptr;
1596 EventPreprocessor
<I
>::destroy(m_event_preprocessor
);
1597 m_event_preprocessor
= nullptr;
1601 ctx
= new FunctionContext([this, ctx
](int r
) {
1602 // blocks if listener notification is in-progress
1603 m_local_journal
->remove_listener(m_journal_listener
);
1608 // wait for all local in-flight replay events to complete
1609 ctx
= new FunctionContext([this, ctx
](int r
) {
1611 derr
<< "error shutting down journal replay: " << cpp_strerror(r
)
1615 m_event_replay_tracker
.wait_for_ops(ctx
);
1618 // flush any local in-flight replay events
1619 if (m_local_replay
!= nullptr) {
1620 ctx
= new FunctionContext([this, ctx
](int r
) {
1621 m_local_replay
->shut_down(true, ctx
);
1625 m_threads
->work_queue
->queue(ctx
, 0);
1628 template <typename I
>
1629 void ImageReplayer
<I
>::handle_shut_down(int r
) {
1630 reschedule_update_status_task(-1);
1632 bool unregister_asok_hook
= false;
1634 Mutex::Locker
locker(m_lock
);
1636 // if status updates are in-flight, wait for them to complete
1637 // before proceeding
1638 if (m_in_flight_status_updates
> 0) {
1639 if (m_on_update_status_finish
== nullptr) {
1640 dout(20) << "waiting for in-flight status update" << dendl
;
1641 m_on_update_status_finish
= new FunctionContext(
1643 handle_shut_down(r
);
1649 bool delete_requested
= false;
1650 if (m_delete_requested
&& !m_local_image_id
.empty()) {
1651 assert(m_remote_image
.image_id
.empty());
1652 dout(0) << "remote image no longer exists: scheduling deletion" << dendl
;
1653 delete_requested
= true;
1655 if (delete_requested
|| m_resync_requested
) {
1656 m_image_deleter
->schedule_image_delete(m_local
,
1659 m_resync_requested
);
1661 m_local_image_id
= "";
1662 m_resync_requested
= false;
1663 if (m_delete_requested
) {
1664 unregister_asok_hook
= true;
1665 m_delete_requested
= false;
1667 } else if (m_last_r
== -ENOENT
&&
1668 m_local_image_id
.empty() && m_remote_image
.image_id
.empty()) {
1669 dout(0) << "mirror image no longer exists" << dendl
;
1670 unregister_asok_hook
= true;
1675 if (unregister_asok_hook
) {
1676 unregister_admin_socket_hook();
1679 dout(20) << "stop complete" << dendl
;
1680 m_local_ioctx
.close();
1682 ReplayStatusFormatter
<I
>::destroy(m_replay_status_formatter
);
1683 m_replay_status_formatter
= nullptr;
1685 Context
*on_start
= nullptr;
1686 Context
*on_stop
= nullptr;
1688 Mutex::Locker
locker(m_lock
);
1689 std::swap(on_start
, m_on_start_finish
);
1690 std::swap(on_stop
, m_on_stop_finish
);
1691 m_stop_requested
= false;
1692 assert(m_delayed_preprocess_task
== nullptr);
1693 assert(m_state
== STATE_STOPPING
);
1694 m_state
= STATE_STOPPED
;
1697 if (on_start
!= nullptr) {
1698 dout(20) << "on start finish complete, r=" << r
<< dendl
;
1699 on_start
->complete(r
);
1702 if (on_stop
!= nullptr) {
1703 dout(20) << "on stop finish complete, r=" << r
<< dendl
;
1704 on_stop
->complete(r
);
1708 template <typename I
>
1709 void ImageReplayer
<I
>::handle_remote_journal_metadata_updated() {
1712 cls::journal::Client client
;
1714 Mutex::Locker
locker(m_lock
);
1715 if (!is_running_()) {
1719 int r
= m_remote_journaler
->get_cached_client(m_local_mirror_uuid
, &client
);
1721 derr
<< "failed to retrieve client: " << cpp_strerror(r
) << dendl
;
1726 if (client
.state
!= cls::journal::CLIENT_STATE_CONNECTED
) {
1727 dout(0) << "client flagged disconnected, stopping image replay" << dendl
;
1728 stop(nullptr, false, -ENOTCONN
, "disconnected");
1732 template <typename I
>
1733 std::string ImageReplayer
<I
>::to_string(const State state
) {
1735 case ImageReplayer
<I
>::STATE_STARTING
:
1737 case ImageReplayer
<I
>::STATE_REPLAYING
:
1739 case ImageReplayer
<I
>::STATE_REPLAY_FLUSHING
:
1740 return "ReplayFlushing";
1741 case ImageReplayer
<I
>::STATE_STOPPING
:
1743 case ImageReplayer
<I
>::STATE_STOPPED
:
1748 return "Unknown(" + stringify(state
) + ")";
1751 template <typename I
>
1752 void ImageReplayer
<I
>::resync_image(Context
*on_finish
) {
1755 m_resync_requested
= true;
1759 template <typename I
>
1760 void ImageReplayer
<I
>::register_admin_socket_hook() {
1761 ImageReplayerAdminSocketHook
<I
> *asok_hook
;
1763 Mutex::Locker
locker(m_lock
);
1764 if (m_asok_hook
!= nullptr) {
1768 dout(20) << "registered asok hook: " << m_name
<< dendl
;
1769 asok_hook
= new ImageReplayerAdminSocketHook
<I
>(g_ceph_context
, m_name
,
1771 int r
= asok_hook
->register_commands();
1773 m_asok_hook
= asok_hook
;
1776 derr
<< "error registering admin socket commands" << dendl
;
1781 template <typename I
>
1782 void ImageReplayer
<I
>::unregister_admin_socket_hook() {
1785 AdminSocketHook
*asok_hook
= nullptr;
1787 Mutex::Locker
locker(m_lock
);
1788 std::swap(asok_hook
, m_asok_hook
);
1793 template <typename I
>
1794 void ImageReplayer
<I
>::on_name_changed() {
1796 Mutex::Locker
locker(m_lock
);
1797 std::string name
= m_local_ioctx
.get_pool_name() + "/" +
1798 m_local_image_ctx
->name
;
1799 if (m_name
== name
) {
1804 unregister_admin_socket_hook();
1805 register_admin_socket_hook();
1808 template <typename I
>
1809 std::ostream
&operator<<(std::ostream
&os
, const ImageReplayer
<I
> &replayer
)
1811 os
<< "ImageReplayer: " << &replayer
<< " [" << replayer
.get_local_pool_id()
1812 << "/" << replayer
.get_global_image_id() << "]";
1816 } // namespace mirror
1819 template class rbd::mirror::ImageReplayer
<librbd::ImageCtx
>;