1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/compat.h"
5 #include "common/Formatter.h"
6 #include "common/admin_socket.h"
7 #include "common/debug.h"
8 #include "common/errno.h"
9 #include "include/stringify.h"
10 #include "cls/rbd/cls_rbd_client.h"
11 #include "common/Timer.h"
12 #include "common/WorkQueue.h"
13 #include "global/global_context.h"
14 #include "journal/Journaler.h"
15 #include "librbd/ExclusiveLock.h"
16 #include "librbd/ImageCtx.h"
17 #include "librbd/ImageState.h"
18 #include "librbd/Journal.h"
19 #include "librbd/Operations.h"
20 #include "librbd/Utils.h"
21 #include "ImageDeleter.h"
22 #include "ImageReplayer.h"
23 #include "MirrorStatusUpdater.h"
25 #include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
26 #include "tools/rbd_mirror/image_replayer/ReplayerListener.h"
27 #include "tools/rbd_mirror/image_replayer/StateBuilder.h"
28 #include "tools/rbd_mirror/image_replayer/Utils.h"
29 #include "tools/rbd_mirror/image_replayer/journal/Replayer.h"
30 #include "tools/rbd_mirror/image_replayer/journal/StateBuilder.h"
33 #define dout_context g_ceph_context
34 #define dout_subsys ceph_subsys_rbd_mirror
36 #define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
39 extern PerfCounters
*g_perf_counters
;
44 using librbd::util::create_context_callback
;
47 std::ostream
&operator<<(std::ostream
&os
,
48 const typename ImageReplayer
<I
>::State
&state
);
53 class ImageReplayerAdminSocketCommand
{
55 ImageReplayerAdminSocketCommand(const std::string
&desc
,
56 ImageReplayer
<I
> *replayer
)
57 : desc(desc
), replayer(replayer
) {
59 virtual ~ImageReplayerAdminSocketCommand() {}
60 virtual int call(Formatter
*f
) = 0;
63 ImageReplayer
<I
> *replayer
;
64 bool registered
= false;
68 class StatusCommand
: public ImageReplayerAdminSocketCommand
<I
> {
70 explicit StatusCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
71 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
74 int call(Formatter
*f
) override
{
75 this->replayer
->print_status(f
);
81 class StartCommand
: public ImageReplayerAdminSocketCommand
<I
> {
83 explicit StartCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
84 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
87 int call(Formatter
*f
) override
{
88 this->replayer
->start(nullptr, true);
94 class StopCommand
: public ImageReplayerAdminSocketCommand
<I
> {
96 explicit StopCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
97 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
100 int call(Formatter
*f
) override
{
101 this->replayer
->stop(nullptr, true);
106 template <typename I
>
107 class RestartCommand
: public ImageReplayerAdminSocketCommand
<I
> {
109 explicit RestartCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
110 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
113 int call(Formatter
*f
) override
{
114 this->replayer
->restart();
119 template <typename I
>
120 class FlushCommand
: public ImageReplayerAdminSocketCommand
<I
> {
122 explicit FlushCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
123 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
126 int call(Formatter
*f
) override
{
127 this->replayer
->flush();
132 template <typename I
>
133 class ImageReplayerAdminSocketHook
: public AdminSocketHook
{
135 ImageReplayerAdminSocketHook(CephContext
*cct
, const std::string
&name
,
136 ImageReplayer
<I
> *replayer
)
137 : admin_socket(cct
->get_admin_socket()),
138 commands
{{"rbd mirror flush " + name
,
139 new FlushCommand
<I
>("flush rbd mirror " + name
, replayer
)},
140 {"rbd mirror restart " + name
,
141 new RestartCommand
<I
>("restart rbd mirror " + name
, replayer
)},
142 {"rbd mirror start " + name
,
143 new StartCommand
<I
>("start rbd mirror " + name
, replayer
)},
144 {"rbd mirror status " + name
,
145 new StatusCommand
<I
>("get status for rbd mirror " + name
, replayer
)},
146 {"rbd mirror stop " + name
,
147 new StopCommand
<I
>("stop rbd mirror " + name
, replayer
)}} {
150 int register_commands() {
151 for (auto &it
: commands
) {
152 int r
= admin_socket
->register_command(it
.first
, this,
157 it
.second
->registered
= true;
162 ~ImageReplayerAdminSocketHook() override
{
163 admin_socket
->unregister_commands(this);
164 for (auto &it
: commands
) {
170 int call(std::string_view command
, const cmdmap_t
& cmdmap
,
173 bufferlist
& out
) override
{
174 auto i
= commands
.find(command
);
175 ceph_assert(i
!= commands
.end());
176 return i
->second
->call(f
);
180 typedef std::map
<std::string
, ImageReplayerAdminSocketCommand
<I
>*,
181 std::less
<>> Commands
;
183 AdminSocket
*admin_socket
;
187 } // anonymous namespace
189 template <typename I
>
190 void ImageReplayer
<I
>::BootstrapProgressContext::update_progress(
191 const std::string
&description
, bool flush
)
193 const std::string desc
= "bootstrapping, " + description
;
194 replayer
->set_state_description(0, desc
);
196 replayer
->update_mirror_image_status(false, boost::none
);
200 template <typename I
>
201 struct ImageReplayer
<I
>::ReplayerListener
202 : public image_replayer::ReplayerListener
{
203 ImageReplayer
<I
>* image_replayer
;
205 ReplayerListener(ImageReplayer
<I
>* image_replayer
)
206 : image_replayer(image_replayer
) {
209 void handle_notification() override
{
210 image_replayer
->handle_replayer_notification();
214 template <typename I
>
215 ImageReplayer
<I
>::ImageReplayer(
216 librados::IoCtx
&local_io_ctx
, const std::string
&local_mirror_uuid
,
217 const std::string
&global_image_id
, Threads
<I
> *threads
,
218 InstanceWatcher
<I
> *instance_watcher
,
219 MirrorStatusUpdater
<I
>* local_status_updater
,
220 journal::CacheManagerHandler
*cache_manager_handler
,
221 PoolMetaCache
* pool_meta_cache
) :
222 m_local_io_ctx(local_io_ctx
), m_local_mirror_uuid(local_mirror_uuid
),
223 m_global_image_id(global_image_id
), m_threads(threads
),
224 m_instance_watcher(instance_watcher
),
225 m_local_status_updater(local_status_updater
),
226 m_cache_manager_handler(cache_manager_handler
),
227 m_pool_meta_cache(pool_meta_cache
),
228 m_local_image_name(global_image_id
),
229 m_lock(ceph::make_mutex("rbd::mirror::ImageReplayer " +
230 stringify(local_io_ctx
.get_id()) + " " + global_image_id
)),
231 m_progress_cxt(this),
232 m_replayer_listener(new ReplayerListener(this))
234 // Register asok commands using a temporary "remote_pool_name/global_image_id"
235 // name. When the image name becomes known on start the asok commands will be
236 // re-registered using "remote_pool_name/remote_image_name" name.
238 m_image_spec
= image_replayer::util::compute_image_spec(
239 local_io_ctx
, global_image_id
);
240 register_admin_socket_hook();
243 template <typename I
>
244 ImageReplayer
<I
>::~ImageReplayer()
246 unregister_admin_socket_hook();
247 ceph_assert(m_state_builder
== nullptr);
248 ceph_assert(m_on_start_finish
== nullptr);
249 ceph_assert(m_on_stop_finish
== nullptr);
250 ceph_assert(m_bootstrap_request
== nullptr);
251 delete m_replayer_listener
;
254 template <typename I
>
255 image_replayer::HealthState ImageReplayer
<I
>::get_health_state() const {
256 std::lock_guard locker
{m_lock
};
258 if (!m_mirror_image_status_state
) {
259 return image_replayer::HEALTH_STATE_OK
;
260 } else if (*m_mirror_image_status_state
==
261 cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING
||
262 *m_mirror_image_status_state
==
263 cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN
) {
264 return image_replayer::HEALTH_STATE_WARNING
;
266 return image_replayer::HEALTH_STATE_ERROR
;
269 template <typename I
>
270 void ImageReplayer
<I
>::add_peer(const Peer
<I
>& peer
) {
271 dout(10) << "peer=" << peer
<< dendl
;
273 std::lock_guard locker
{m_lock
};
274 auto it
= m_peers
.find(peer
);
275 if (it
== m_peers
.end()) {
276 m_peers
.insert(peer
);
280 template <typename I
>
281 void ImageReplayer
<I
>::set_state_description(int r
, const std::string
&desc
) {
282 dout(10) << "r=" << r
<< ", desc=" << desc
<< dendl
;
284 std::lock_guard l
{m_lock
};
289 template <typename I
>
290 void ImageReplayer
<I
>::start(Context
*on_finish
, bool manual
)
292 dout(10) << "on_finish=" << on_finish
<< dendl
;
296 std::lock_guard locker
{m_lock
};
297 if (!is_stopped_()) {
298 derr
<< "already running" << dendl
;
300 } else if (m_manual_stop
&& !manual
) {
301 dout(5) << "stopped manually, ignoring start without manual flag"
305 m_state
= STATE_STARTING
;
307 m_state_desc
.clear();
308 m_manual_stop
= false;
309 m_delete_requested
= false;
311 if (on_finish
!= nullptr) {
312 ceph_assert(m_on_start_finish
== nullptr);
313 m_on_start_finish
= on_finish
;
315 ceph_assert(m_on_stop_finish
== nullptr);
321 on_finish
->complete(r
);
329 template <typename I
>
330 void ImageReplayer
<I
>::bootstrap() {
333 std::unique_lock locker
{m_lock
};
334 if (m_peers
.empty()) {
337 dout(5) << "no peer clusters" << dendl
;
338 on_start_fail(-ENOENT
, "no peer clusters");
342 // TODO need to support multiple remote images
343 ceph_assert(!m_peers
.empty());
344 m_remote_image_peer
= *m_peers
.begin();
346 if (on_start_interrupted(m_lock
)) {
350 ceph_assert(m_state_builder
== nullptr);
351 auto ctx
= create_context_callback
<
352 ImageReplayer
, &ImageReplayer
<I
>::handle_bootstrap
>(this);
353 auto request
= image_replayer::BootstrapRequest
<I
>::create(
354 m_threads
, m_local_io_ctx
, m_remote_image_peer
.io_ctx
, m_instance_watcher
,
355 m_global_image_id
, m_local_mirror_uuid
,
356 m_remote_image_peer
.remote_pool_meta
, m_cache_manager_handler
,
357 m_pool_meta_cache
, &m_progress_cxt
, &m_state_builder
, &m_resync_requested
,
361 m_bootstrap_request
= request
;
364 update_mirror_image_status(false, boost::none
);
368 template <typename I
>
369 void ImageReplayer
<I
>::handle_bootstrap(int r
) {
370 dout(10) << "r=" << r
<< dendl
;
372 std::lock_guard locker
{m_lock
};
373 m_bootstrap_request
->put();
374 m_bootstrap_request
= nullptr;
377 if (on_start_interrupted()) {
379 } else if (r
== -ENOMSG
) {
380 dout(5) << "local image is primary" << dendl
;
381 on_start_fail(0, "local image is primary");
383 } else if (r
== -EREMOTEIO
) {
384 dout(5) << "remote image is non-primary" << dendl
;
385 on_start_fail(-EREMOTEIO
, "remote image is non-primary");
387 } else if (r
== -EEXIST
) {
388 on_start_fail(r
, "split-brain detected");
390 } else if (r
== -ENOLINK
) {
391 m_delete_requested
= true;
392 on_start_fail(0, "remote image no longer exists");
395 on_start_fail(r
, "error bootstrapping replay");
397 } else if (m_resync_requested
) {
398 on_start_fail(0, "resync requested");
405 template <typename I
>
406 void ImageReplayer
<I
>::start_replay() {
409 std::unique_lock locker
{m_lock
};
410 ceph_assert(m_replayer
== nullptr);
411 m_replayer
= m_state_builder
->create_replayer(m_threads
, m_instance_watcher
,
414 m_replayer_listener
);
416 auto ctx
= create_context_callback
<
417 ImageReplayer
<I
>, &ImageReplayer
<I
>::handle_start_replay
>(this);
418 m_replayer
->init(ctx
);
421 template <typename I
>
422 void ImageReplayer
<I
>::handle_start_replay(int r
) {
423 dout(10) << "r=" << r
<< dendl
;
425 if (on_start_interrupted()) {
428 std::string error_description
= m_replayer
->get_error_description();
429 if (r
== -ENOTCONN
&& m_replayer
->is_resync_requested()) {
430 std::unique_lock locker
{m_lock
};
431 m_resync_requested
= true;
434 // shut down not required if init failed
435 m_replayer
->destroy();
436 m_replayer
= nullptr;
438 derr
<< "error starting replay: " << cpp_strerror(r
) << dendl
;
439 on_start_fail(r
, error_description
);
443 Context
*on_finish
= nullptr;
445 std::unique_lock locker
{m_lock
};
446 ceph_assert(m_state
== STATE_STARTING
);
447 m_state
= STATE_REPLAYING
;
448 std::swap(m_on_start_finish
, on_finish
);
451 update_mirror_image_status(true, boost::none
);
452 if (on_replay_interrupted()) {
453 if (on_finish
!= nullptr) {
454 on_finish
->complete(r
);
459 dout(10) << "start succeeded" << dendl
;
460 if (on_finish
!= nullptr) {
461 dout(10) << "on finish complete, r=" << r
<< dendl
;
462 on_finish
->complete(r
);
466 template <typename I
>
467 void ImageReplayer
<I
>::on_start_fail(int r
, const std::string
&desc
)
469 dout(10) << "r=" << r
<< ", desc=" << desc
<< dendl
;
470 Context
*ctx
= new LambdaContext([this, r
, desc
](int _r
) {
472 std::lock_guard locker
{m_lock
};
473 ceph_assert(m_state
== STATE_STARTING
);
474 m_state
= STATE_STOPPING
;
475 if (r
< 0 && r
!= -ECANCELED
&& r
!= -EREMOTEIO
&& r
!= -ENOENT
) {
476 derr
<< "start failed: " << cpp_strerror(r
) << dendl
;
478 dout(10) << "start canceled" << dendl
;
482 set_state_description(r
, desc
);
483 update_mirror_image_status(false, boost::none
);
486 m_threads
->work_queue
->queue(ctx
, 0);
489 template <typename I
>
490 bool ImageReplayer
<I
>::on_start_interrupted() {
491 std::lock_guard locker
{m_lock
};
492 return on_start_interrupted(m_lock
);
495 template <typename I
>
496 bool ImageReplayer
<I
>::on_start_interrupted(ceph::mutex
& lock
) {
497 ceph_assert(ceph_mutex_is_locked(m_lock
));
498 ceph_assert(m_state
== STATE_STARTING
);
499 if (!m_stop_requested
) {
503 on_start_fail(-ECANCELED
, "");
507 template <typename I
>
508 void ImageReplayer
<I
>::stop(Context
*on_finish
, bool manual
, int r
,
509 const std::string
& desc
)
511 dout(10) << "on_finish=" << on_finish
<< ", manual=" << manual
512 << ", desc=" << desc
<< dendl
;
514 image_replayer::BootstrapRequest
<I
> *bootstrap_request
= nullptr;
515 bool shut_down_replay
= false;
518 std::lock_guard locker
{m_lock
};
520 if (!is_running_()) {
523 if (!is_stopped_()) {
524 if (m_state
== STATE_STARTING
) {
525 dout(10) << "canceling start" << dendl
;
526 if (m_bootstrap_request
!= nullptr) {
527 bootstrap_request
= m_bootstrap_request
;
528 bootstrap_request
->get();
531 dout(10) << "interrupting replay" << dendl
;
532 shut_down_replay
= true;
535 ceph_assert(m_on_stop_finish
== nullptr);
536 std::swap(m_on_stop_finish
, on_finish
);
537 m_stop_requested
= true;
538 m_manual_stop
= manual
;
543 // avoid holding lock since bootstrap request will update status
544 if (bootstrap_request
!= nullptr) {
545 dout(10) << "canceling bootstrap" << dendl
;
546 bootstrap_request
->cancel();
547 bootstrap_request
->put();
551 dout(20) << "not running" << dendl
;
553 on_finish
->complete(-EINVAL
);
558 if (shut_down_replay
) {
559 on_stop_journal_replay(r
, desc
);
560 } else if (on_finish
!= nullptr) {
561 on_finish
->complete(0);
565 template <typename I
>
566 void ImageReplayer
<I
>::on_stop_journal_replay(int r
, const std::string
&desc
)
571 std::lock_guard locker
{m_lock
};
572 if (m_state
!= STATE_REPLAYING
) {
573 // might be invoked multiple times while stopping
577 m_stop_requested
= true;
578 m_state
= STATE_STOPPING
;
581 set_state_description(r
, desc
);
582 update_mirror_image_status(true, boost::none
);
586 template <typename I
>
587 void ImageReplayer
<I
>::restart(Context
*on_finish
)
589 auto ctx
= new LambdaContext(
590 [this, on_finish
](int r
) {
594 start(on_finish
, true);
599 template <typename I
>
600 void ImageReplayer
<I
>::flush()
605 std::unique_lock locker
{m_lock
};
606 if (m_state
!= STATE_REPLAYING
) {
611 ceph_assert(m_replayer
!= nullptr);
612 m_replayer
->flush(&ctx
);
617 update_mirror_image_status(false, boost::none
);
621 template <typename I
>
622 bool ImageReplayer
<I
>::on_replay_interrupted()
626 std::lock_guard locker
{m_lock
};
627 shut_down
= m_stop_requested
;
631 on_stop_journal_replay();
636 template <typename I
>
637 void ImageReplayer
<I
>::print_status(Formatter
*f
)
641 std::lock_guard l
{m_lock
};
643 f
->open_object_section("image_replayer");
644 f
->dump_string("name", m_image_spec
);
645 f
->dump_string("state", to_string(m_state
));
649 template <typename I
>
650 void ImageReplayer
<I
>::update_mirror_image_status(
651 bool force
, const OptionalState
&opt_state
) {
652 dout(15) << "force=" << force
<< ", "
653 << "state=" << opt_state
<< dendl
;
656 std::lock_guard locker
{m_lock
};
657 if (!force
&& !is_stopped_() && !is_running_()) {
658 dout(15) << "shut down in-progress: ignoring update" << dendl
;
663 m_in_flight_op_tracker
.start_op();
664 auto ctx
= new LambdaContext(
665 [this, force
, opt_state
](int r
) {
666 set_mirror_image_status_update(force
, opt_state
);
668 m_threads
->work_queue
->queue(ctx
, 0);
671 template <typename I
>
672 void ImageReplayer
<I
>::set_mirror_image_status_update(
673 bool force
, const OptionalState
&opt_state
) {
674 dout(15) << "force=" << force
<< ", "
675 << "state=" << opt_state
<< dendl
;
677 reregister_admin_socket_hook();
680 std::string state_desc
;
682 bool stopping_replay
;
684 auto mirror_image_status_state
= boost::make_optional(
685 false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN
);
686 image_replayer::BootstrapRequest
<I
>* bootstrap_request
= nullptr;
688 std::lock_guard locker
{m_lock
};
690 state_desc
= m_state_desc
;
691 mirror_image_status_state
= m_mirror_image_status_state
;
693 stopping_replay
= (m_replayer
!= nullptr);
695 if (m_bootstrap_request
!= nullptr) {
696 bootstrap_request
= m_bootstrap_request
;
697 bootstrap_request
->get();
701 bool syncing
= false;
702 if (bootstrap_request
!= nullptr) {
703 syncing
= bootstrap_request
->is_syncing();
704 bootstrap_request
->put();
705 bootstrap_request
= nullptr;
712 cls::rbd::MirrorImageSiteStatus status
;
717 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING
;
718 status
.description
= state_desc
.empty() ? "syncing" : state_desc
;
719 mirror_image_status_state
= status
.state
;
721 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY
;
722 status
.description
= "starting replay";
725 case STATE_REPLAYING
:
726 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING
;
729 auto on_req_finish
= new LambdaContext(
730 [this, force
](int r
) {
731 dout(15) << "replay status ready: r=" << r
<< dendl
;
733 set_mirror_image_status_update(force
, boost::none
);
734 } else if (r
== -EAGAIN
) {
735 m_in_flight_op_tracker
.finish_op();
739 ceph_assert(m_replayer
!= nullptr);
740 if (!m_replayer
->get_replay_status(&desc
, on_req_finish
)) {
741 dout(15) << "waiting for replay status" << dendl
;
745 status
.description
= "replaying, " + desc
;
746 mirror_image_status_state
= boost::make_optional(
747 false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN
);
751 if (stopping_replay
) {
752 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY
;
753 status
.description
= state_desc
.empty() ? "stopping replay" : state_desc
;
758 if (last_r
== -EREMOTEIO
) {
759 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN
;
760 status
.description
= state_desc
;
761 mirror_image_status_state
= status
.state
;
762 } else if (last_r
< 0 && last_r
!= -ECANCELED
) {
763 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR
;
764 status
.description
= state_desc
;
765 mirror_image_status_state
= status
.state
;
767 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED
;
768 status
.description
= state_desc
.empty() ? "stopped" : state_desc
;
769 mirror_image_status_state
= boost::none
;
773 ceph_assert(!"invalid state");
777 std::lock_guard locker
{m_lock
};
778 m_mirror_image_status_state
= mirror_image_status_state
;
781 // prevent the status from ping-ponging when failed replays are restarted
782 if (mirror_image_status_state
&&
783 *mirror_image_status_state
== cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR
) {
784 status
.state
= *mirror_image_status_state
;
787 dout(15) << "status=" << status
<< dendl
;
788 m_local_status_updater
->set_mirror_image_status(m_global_image_id
, status
,
790 if (m_remote_image_peer
.mirror_status_updater
!= nullptr) {
791 m_remote_image_peer
.mirror_status_updater
->set_mirror_image_status(
792 m_global_image_id
, status
, force
);
795 m_in_flight_op_tracker
.finish_op();
798 template <typename I
>
799 void ImageReplayer
<I
>::shut_down(int r
) {
800 dout(10) << "r=" << r
<< dendl
;
803 std::lock_guard locker
{m_lock
};
804 ceph_assert(m_state
== STATE_STOPPING
);
807 if (!m_in_flight_op_tracker
.empty()) {
808 dout(15) << "waiting for in-flight operations to complete" << dendl
;
809 m_in_flight_op_tracker
.wait_for_ops(new LambdaContext([this, r
](int) {
815 // chain the shut down sequence (reverse order)
816 Context
*ctx
= new LambdaContext(
818 update_mirror_image_status(true, STATE_STOPPED
);
822 // destruct the state builder
823 if (m_state_builder
!= nullptr) {
824 ctx
= new LambdaContext([this, ctx
](int r
) {
825 m_state_builder
->close(ctx
);
829 // close the replayer
830 if (m_replayer
!= nullptr) {
831 ctx
= new LambdaContext([this, ctx
](int r
) {
832 m_replayer
->destroy();
833 m_replayer
= nullptr;
836 ctx
= new LambdaContext([this, ctx
](int r
) {
837 m_replayer
->shut_down(ctx
);
841 m_threads
->work_queue
->queue(ctx
, 0);
844 template <typename I
>
845 void ImageReplayer
<I
>::handle_shut_down(int r
) {
846 bool resync_requested
= false;
847 bool delete_requested
= false;
848 bool unregister_asok_hook
= false;
850 std::lock_guard locker
{m_lock
};
852 if (m_delete_requested
&& m_state_builder
!= nullptr &&
853 !m_state_builder
->local_image_id
.empty()) {
854 ceph_assert(m_state_builder
->remote_image_id
.empty());
855 dout(0) << "remote image no longer exists: scheduling deletion" << dendl
;
856 unregister_asok_hook
= true;
857 std::swap(delete_requested
, m_delete_requested
);
860 std::swap(resync_requested
, m_resync_requested
);
861 if (!delete_requested
&& !resync_requested
&& m_last_r
== -ENOENT
&&
862 ((m_state_builder
== nullptr) ||
863 (m_state_builder
->local_image_id
.empty() &&
864 m_state_builder
->remote_image_id
.empty()))) {
865 dout(0) << "mirror image no longer exists" << dendl
;
866 unregister_asok_hook
= true;
871 if (unregister_asok_hook
) {
872 unregister_admin_socket_hook();
875 if (delete_requested
|| resync_requested
) {
876 dout(5) << "moving image to trash" << dendl
;
877 auto ctx
= new LambdaContext([this, r
](int) {
880 ImageDeleter
<I
>::trash_move(m_local_io_ctx
, m_global_image_id
,
881 resync_requested
, m_threads
->work_queue
, ctx
);
885 if (!m_in_flight_op_tracker
.empty()) {
886 dout(15) << "waiting for in-flight operations to complete" << dendl
;
887 m_in_flight_op_tracker
.wait_for_ops(new LambdaContext([this, r
](int) {
893 if (m_local_status_updater
->exists(m_global_image_id
)) {
894 dout(15) << "removing local mirror image status" << dendl
;
895 auto ctx
= new LambdaContext([this, r
](int) {
898 m_local_status_updater
->remove_mirror_image_status(m_global_image_id
, ctx
);
902 if (m_remote_image_peer
.mirror_status_updater
!= nullptr &&
903 m_remote_image_peer
.mirror_status_updater
->exists(m_global_image_id
)) {
904 dout(15) << "removing remote mirror image status" << dendl
;
905 auto ctx
= new LambdaContext([this, r
](int) {
908 m_remote_image_peer
.mirror_status_updater
->remove_mirror_image_status(
909 m_global_image_id
, ctx
);
913 if (m_state_builder
!= nullptr) {
914 m_state_builder
->destroy();
915 m_state_builder
= nullptr;
918 dout(10) << "stop complete" << dendl
;
919 Context
*on_start
= nullptr;
920 Context
*on_stop
= nullptr;
922 std::lock_guard locker
{m_lock
};
923 std::swap(on_start
, m_on_start_finish
);
924 std::swap(on_stop
, m_on_stop_finish
);
925 m_stop_requested
= false;
926 ceph_assert(m_state
== STATE_STOPPING
);
927 m_state
= STATE_STOPPED
;
930 if (on_start
!= nullptr) {
931 dout(10) << "on start finish complete, r=" << r
<< dendl
;
932 on_start
->complete(r
);
935 if (on_stop
!= nullptr) {
936 dout(10) << "on stop finish complete, r=" << r
<< dendl
;
937 on_stop
->complete(r
);
941 template <typename I
>
942 void ImageReplayer
<I
>::handle_replayer_notification() {
945 std::unique_lock locker
{m_lock
};
946 if (m_state
!= STATE_REPLAYING
) {
947 // might be attempting to shut down
952 // detect a rename of the local image
953 ceph_assert(m_state_builder
!= nullptr &&
954 m_state_builder
->local_image_ctx
!= nullptr);
955 std::shared_lock image_locker
{m_state_builder
->local_image_ctx
->image_lock
};
956 if (m_local_image_name
!= m_state_builder
->local_image_ctx
->name
) {
957 // will re-register with new name after next status update
958 dout(10) << "image renamed" << dendl
;
959 m_local_image_name
= m_state_builder
->local_image_ctx
->name
;
963 // replayer cannot be shut down while notification is in-flight
964 ceph_assert(m_replayer
!= nullptr);
967 if (m_replayer
->is_resync_requested()) {
968 dout(10) << "resync requested" << dendl
;
969 m_resync_requested
= true;
970 on_stop_journal_replay(0, "resync requested");
974 if (!m_replayer
->is_replaying()) {
975 auto error_code
= m_replayer
->get_error_code();
976 auto error_description
= m_replayer
->get_error_description();
977 dout(10) << "replay interrupted: "
978 << "r=" << error_code
<< ", "
979 << "error=" << error_description
<< dendl
;
980 on_stop_journal_replay(error_code
, error_description
);
984 update_mirror_image_status(false, {});
987 template <typename I
>
988 std::string ImageReplayer
<I
>::to_string(const State state
) {
990 case ImageReplayer
<I
>::STATE_STARTING
:
992 case ImageReplayer
<I
>::STATE_REPLAYING
:
994 case ImageReplayer
<I
>::STATE_STOPPING
:
996 case ImageReplayer
<I
>::STATE_STOPPED
:
1001 return "Unknown(" + stringify(state
) + ")";
1004 template <typename I
>
1005 void ImageReplayer
<I
>::register_admin_socket_hook() {
1006 ImageReplayerAdminSocketHook
<I
> *asok_hook
;
1008 std::lock_guard locker
{m_lock
};
1009 if (m_asok_hook
!= nullptr) {
1013 dout(15) << "registered asok hook: " << m_image_spec
<< dendl
;
1014 asok_hook
= new ImageReplayerAdminSocketHook
<I
>(
1015 g_ceph_context
, m_image_spec
, this);
1016 int r
= asok_hook
->register_commands();
1018 m_asok_hook
= asok_hook
;
1021 derr
<< "error registering admin socket commands" << dendl
;
1026 template <typename I
>
1027 void ImageReplayer
<I
>::unregister_admin_socket_hook() {
1030 AdminSocketHook
*asok_hook
= nullptr;
1032 std::lock_guard locker
{m_lock
};
1033 std::swap(asok_hook
, m_asok_hook
);
1038 template <typename I
>
1039 void ImageReplayer
<I
>::reregister_admin_socket_hook() {
1040 std::unique_lock locker
{m_lock
};
1041 if (m_state
== STATE_STARTING
&& m_bootstrap_request
!= nullptr) {
1042 m_local_image_name
= m_bootstrap_request
->get_local_image_name();
1045 auto image_spec
= image_replayer::util::compute_image_spec(
1046 m_local_io_ctx
, m_local_image_name
);
1047 if (m_asok_hook
!= nullptr && m_image_spec
== image_spec
) {
1051 dout(15) << "old_image_spec=" << m_image_spec
<< ", "
1052 << "new_image_spec=" << image_spec
<< dendl
;
1053 m_image_spec
= image_spec
;
1055 if (m_state
== STATE_STOPPING
|| m_state
== STATE_STOPPED
) {
1056 // no need to re-register if stopping
1061 unregister_admin_socket_hook();
1062 register_admin_socket_hook();
1065 template <typename I
>
1066 std::ostream
&operator<<(std::ostream
&os
, const ImageReplayer
<I
> &replayer
)
1068 os
<< "ImageReplayer: " << &replayer
<< " [" << replayer
.get_local_pool_id()
1069 << "/" << replayer
.get_global_image_id() << "]";
1073 } // namespace mirror
1076 template class rbd::mirror::ImageReplayer
<librbd::ImageCtx
>;