1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/compat.h"
5 #include "common/Formatter.h"
6 #include "common/admin_socket.h"
7 #include "common/debug.h"
8 #include "common/errno.h"
9 #include "include/stringify.h"
10 #include "cls/rbd/cls_rbd_client.h"
11 #include "common/Timer.h"
12 #include "global/global_context.h"
13 #include "journal/Journaler.h"
14 #include "librbd/ExclusiveLock.h"
15 #include "librbd/ImageCtx.h"
16 #include "librbd/ImageState.h"
17 #include "librbd/Journal.h"
18 #include "librbd/Operations.h"
19 #include "librbd/Utils.h"
20 #include "librbd/asio/ContextWQ.h"
21 #include "ImageDeleter.h"
22 #include "ImageReplayer.h"
23 #include "MirrorStatusUpdater.h"
25 #include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
26 #include "tools/rbd_mirror/image_replayer/ReplayerListener.h"
27 #include "tools/rbd_mirror/image_replayer/StateBuilder.h"
28 #include "tools/rbd_mirror/image_replayer/Utils.h"
29 #include "tools/rbd_mirror/image_replayer/journal/Replayer.h"
30 #include "tools/rbd_mirror/image_replayer/journal/StateBuilder.h"
33 #define dout_context g_ceph_context
34 #define dout_subsys ceph_subsys_rbd_mirror
36 #define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
39 extern PerfCounters
*g_perf_counters
;
44 using librbd::util::create_context_callback
;
47 std::ostream
&operator<<(std::ostream
&os
,
48 const typename ImageReplayer
<I
>::State
&state
);
53 class ImageReplayerAdminSocketCommand
{
55 ImageReplayerAdminSocketCommand(const std::string
&desc
,
56 ImageReplayer
<I
> *replayer
)
57 : desc(desc
), replayer(replayer
) {
59 virtual ~ImageReplayerAdminSocketCommand() {}
60 virtual int call(Formatter
*f
) = 0;
63 ImageReplayer
<I
> *replayer
;
64 bool registered
= false;
68 class StatusCommand
: public ImageReplayerAdminSocketCommand
<I
> {
70 explicit StatusCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
71 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
74 int call(Formatter
*f
) override
{
75 this->replayer
->print_status(f
);
81 class StartCommand
: public ImageReplayerAdminSocketCommand
<I
> {
83 explicit StartCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
84 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
87 int call(Formatter
*f
) override
{
88 this->replayer
->start(nullptr, true);
94 class StopCommand
: public ImageReplayerAdminSocketCommand
<I
> {
96 explicit StopCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
97 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
100 int call(Formatter
*f
) override
{
101 this->replayer
->stop(nullptr, true);
106 template <typename I
>
107 class RestartCommand
: public ImageReplayerAdminSocketCommand
<I
> {
109 explicit RestartCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
110 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
113 int call(Formatter
*f
) override
{
114 this->replayer
->restart();
119 template <typename I
>
120 class FlushCommand
: public ImageReplayerAdminSocketCommand
<I
> {
122 explicit FlushCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
123 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
126 int call(Formatter
*f
) override
{
127 this->replayer
->flush();
132 template <typename I
>
133 class ImageReplayerAdminSocketHook
: public AdminSocketHook
{
135 ImageReplayerAdminSocketHook(CephContext
*cct
, const std::string
&name
,
136 ImageReplayer
<I
> *replayer
)
137 : admin_socket(cct
->get_admin_socket()),
138 commands
{{"rbd mirror flush " + name
,
139 new FlushCommand
<I
>("flush rbd mirror " + name
, replayer
)},
140 {"rbd mirror restart " + name
,
141 new RestartCommand
<I
>("restart rbd mirror " + name
, replayer
)},
142 {"rbd mirror start " + name
,
143 new StartCommand
<I
>("start rbd mirror " + name
, replayer
)},
144 {"rbd mirror status " + name
,
145 new StatusCommand
<I
>("get status for rbd mirror " + name
, replayer
)},
146 {"rbd mirror stop " + name
,
147 new StopCommand
<I
>("stop rbd mirror " + name
, replayer
)}} {
150 int register_commands() {
151 for (auto &it
: commands
) {
152 int r
= admin_socket
->register_command(it
.first
, this,
157 it
.second
->registered
= true;
162 ~ImageReplayerAdminSocketHook() override
{
163 admin_socket
->unregister_commands(this);
164 for (auto &it
: commands
) {
170 int call(std::string_view command
, const cmdmap_t
& cmdmap
,
173 bufferlist
& out
) override
{
174 auto i
= commands
.find(command
);
175 ceph_assert(i
!= commands
.end());
176 return i
->second
->call(f
);
180 typedef std::map
<std::string
, ImageReplayerAdminSocketCommand
<I
>*,
181 std::less
<>> Commands
;
183 AdminSocket
*admin_socket
;
187 } // anonymous namespace
189 template <typename I
>
190 void ImageReplayer
<I
>::BootstrapProgressContext::update_progress(
191 const std::string
&description
, bool flush
)
193 const std::string desc
= "bootstrapping, " + description
;
194 replayer
->set_state_description(0, desc
);
196 replayer
->update_mirror_image_status(false, boost::none
);
200 template <typename I
>
201 struct ImageReplayer
<I
>::ReplayerListener
202 : public image_replayer::ReplayerListener
{
203 ImageReplayer
<I
>* image_replayer
;
205 ReplayerListener(ImageReplayer
<I
>* image_replayer
)
206 : image_replayer(image_replayer
) {
209 void handle_notification() override
{
210 image_replayer
->handle_replayer_notification();
214 template <typename I
>
215 ImageReplayer
<I
>::ImageReplayer(
216 librados::IoCtx
&local_io_ctx
, const std::string
&local_mirror_uuid
,
217 const std::string
&global_image_id
, Threads
<I
> *threads
,
218 InstanceWatcher
<I
> *instance_watcher
,
219 MirrorStatusUpdater
<I
>* local_status_updater
,
220 journal::CacheManagerHandler
*cache_manager_handler
,
221 PoolMetaCache
* pool_meta_cache
) :
222 m_local_io_ctx(local_io_ctx
), m_local_mirror_uuid(local_mirror_uuid
),
223 m_global_image_id(global_image_id
), m_threads(threads
),
224 m_instance_watcher(instance_watcher
),
225 m_local_status_updater(local_status_updater
),
226 m_cache_manager_handler(cache_manager_handler
),
227 m_pool_meta_cache(pool_meta_cache
),
228 m_local_image_name(global_image_id
),
229 m_lock(ceph::make_mutex("rbd::mirror::ImageReplayer " +
230 stringify(local_io_ctx
.get_id()) + " " + global_image_id
)),
231 m_progress_cxt(this),
232 m_replayer_listener(new ReplayerListener(this))
234 // Register asok commands using a temporary "remote_pool_name/global_image_id"
235 // name. When the image name becomes known on start the asok commands will be
236 // re-registered using "remote_pool_name/remote_image_name" name.
238 m_image_spec
= image_replayer::util::compute_image_spec(
239 local_io_ctx
, global_image_id
);
240 register_admin_socket_hook();
243 template <typename I
>
244 ImageReplayer
<I
>::~ImageReplayer()
246 unregister_admin_socket_hook();
247 ceph_assert(m_state_builder
== nullptr);
248 ceph_assert(m_on_start_finish
== nullptr);
249 ceph_assert(m_on_stop_finish
== nullptr);
250 ceph_assert(m_bootstrap_request
== nullptr);
251 ceph_assert(m_update_status_task
== nullptr);
252 delete m_replayer_listener
;
255 template <typename I
>
256 image_replayer::HealthState ImageReplayer
<I
>::get_health_state() const {
257 std::lock_guard locker
{m_lock
};
259 if (!m_mirror_image_status_state
) {
260 return image_replayer::HEALTH_STATE_OK
;
261 } else if (*m_mirror_image_status_state
==
262 cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING
||
263 *m_mirror_image_status_state
==
264 cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN
) {
265 return image_replayer::HEALTH_STATE_WARNING
;
267 return image_replayer::HEALTH_STATE_ERROR
;
270 template <typename I
>
271 void ImageReplayer
<I
>::add_peer(const Peer
<I
>& peer
) {
272 dout(10) << "peer=" << peer
<< dendl
;
274 std::lock_guard locker
{m_lock
};
275 auto it
= m_peers
.find(peer
);
276 if (it
== m_peers
.end()) {
277 m_peers
.insert(peer
);
281 template <typename I
>
282 void ImageReplayer
<I
>::set_state_description(int r
, const std::string
&desc
) {
283 dout(10) << "r=" << r
<< ", desc=" << desc
<< dendl
;
285 std::lock_guard l
{m_lock
};
290 template <typename I
>
291 void ImageReplayer
<I
>::start(Context
*on_finish
, bool manual
, bool restart
)
293 dout(10) << "on_finish=" << on_finish
<< dendl
;
297 std::lock_guard locker
{m_lock
};
298 if (!is_stopped_()) {
299 derr
<< "already running" << dendl
;
301 } else if (m_manual_stop
&& !manual
) {
302 dout(5) << "stopped manually, ignoring start without manual flag"
305 } else if (restart
&& !m_restart_requested
) {
306 dout(10) << "canceled restart" << dendl
;
309 m_state
= STATE_STARTING
;
311 m_state_desc
.clear();
312 m_manual_stop
= false;
313 m_delete_requested
= false;
314 m_restart_requested
= false;
316 if (on_finish
!= nullptr) {
317 ceph_assert(m_on_start_finish
== nullptr);
318 m_on_start_finish
= on_finish
;
320 ceph_assert(m_on_stop_finish
== nullptr);
326 on_finish
->complete(r
);
334 template <typename I
>
335 void ImageReplayer
<I
>::bootstrap() {
338 std::unique_lock locker
{m_lock
};
339 if (m_peers
.empty()) {
342 dout(5) << "no peer clusters" << dendl
;
343 on_start_fail(-ENOENT
, "no peer clusters");
347 // TODO need to support multiple remote images
348 ceph_assert(!m_peers
.empty());
349 m_remote_image_peer
= *m_peers
.begin();
351 if (on_start_interrupted(m_lock
)) {
355 ceph_assert(m_state_builder
== nullptr);
356 auto ctx
= create_context_callback
<
357 ImageReplayer
, &ImageReplayer
<I
>::handle_bootstrap
>(this);
358 auto request
= image_replayer::BootstrapRequest
<I
>::create(
359 m_threads
, m_local_io_ctx
, m_remote_image_peer
.io_ctx
, m_instance_watcher
,
360 m_global_image_id
, m_local_mirror_uuid
,
361 m_remote_image_peer
.remote_pool_meta
, m_cache_manager_handler
,
362 m_pool_meta_cache
, &m_progress_cxt
, &m_state_builder
, &m_resync_requested
,
366 m_bootstrap_request
= request
;
369 update_mirror_image_status(false, boost::none
);
373 template <typename I
>
374 void ImageReplayer
<I
>::handle_bootstrap(int r
) {
375 dout(10) << "r=" << r
<< dendl
;
377 std::lock_guard locker
{m_lock
};
378 m_bootstrap_request
->put();
379 m_bootstrap_request
= nullptr;
382 if (on_start_interrupted()) {
384 } else if (r
== -ENOMSG
) {
385 dout(5) << "local image is primary" << dendl
;
386 on_start_fail(0, "local image is primary");
388 } else if (r
== -EREMOTEIO
) {
389 dout(5) << "remote image is non-primary" << dendl
;
390 on_start_fail(-EREMOTEIO
, "remote image is non-primary");
392 } else if (r
== -EEXIST
) {
393 on_start_fail(r
, "split-brain detected");
395 } else if (r
== -ENOLINK
) {
396 m_delete_requested
= true;
397 on_start_fail(0, "remote image no longer exists");
400 on_start_fail(r
, "error bootstrapping replay");
402 } else if (m_resync_requested
) {
403 on_start_fail(0, "resync requested");
410 template <typename I
>
411 void ImageReplayer
<I
>::start_replay() {
414 std::unique_lock locker
{m_lock
};
415 ceph_assert(m_replayer
== nullptr);
416 m_replayer
= m_state_builder
->create_replayer(m_threads
, m_instance_watcher
,
419 m_replayer_listener
);
421 auto ctx
= create_context_callback
<
422 ImageReplayer
<I
>, &ImageReplayer
<I
>::handle_start_replay
>(this);
423 m_replayer
->init(ctx
);
426 template <typename I
>
427 void ImageReplayer
<I
>::handle_start_replay(int r
) {
428 dout(10) << "r=" << r
<< dendl
;
430 if (on_start_interrupted()) {
433 std::string error_description
= m_replayer
->get_error_description();
434 if (r
== -ENOTCONN
&& m_replayer
->is_resync_requested()) {
435 std::unique_lock locker
{m_lock
};
436 m_resync_requested
= true;
439 // shut down not required if init failed
440 m_replayer
->destroy();
441 m_replayer
= nullptr;
443 derr
<< "error starting replay: " << cpp_strerror(r
) << dendl
;
444 on_start_fail(r
, error_description
);
448 Context
*on_finish
= nullptr;
450 std::unique_lock locker
{m_lock
};
451 ceph_assert(m_state
== STATE_STARTING
);
452 m_state
= STATE_REPLAYING
;
453 std::swap(m_on_start_finish
, on_finish
);
455 std::unique_lock timer_locker
{m_threads
->timer_lock
};
456 schedule_update_mirror_image_replay_status();
459 update_mirror_image_status(true, boost::none
);
460 if (on_replay_interrupted()) {
461 if (on_finish
!= nullptr) {
462 on_finish
->complete(r
);
467 dout(10) << "start succeeded" << dendl
;
468 if (on_finish
!= nullptr) {
469 dout(10) << "on finish complete, r=" << r
<< dendl
;
470 on_finish
->complete(r
);
474 template <typename I
>
475 void ImageReplayer
<I
>::on_start_fail(int r
, const std::string
&desc
)
477 dout(10) << "r=" << r
<< ", desc=" << desc
<< dendl
;
478 Context
*ctx
= new LambdaContext([this, r
, desc
](int _r
) {
480 std::lock_guard locker
{m_lock
};
481 ceph_assert(m_state
== STATE_STARTING
);
482 m_state
= STATE_STOPPING
;
483 if (r
< 0 && r
!= -ECANCELED
&& r
!= -EREMOTEIO
&& r
!= -ENOENT
) {
484 derr
<< "start failed: " << cpp_strerror(r
) << dendl
;
486 dout(10) << "start canceled" << dendl
;
490 set_state_description(r
, desc
);
491 update_mirror_image_status(false, boost::none
);
494 m_threads
->work_queue
->queue(ctx
, 0);
497 template <typename I
>
498 bool ImageReplayer
<I
>::on_start_interrupted() {
499 std::lock_guard locker
{m_lock
};
500 return on_start_interrupted(m_lock
);
503 template <typename I
>
504 bool ImageReplayer
<I
>::on_start_interrupted(ceph::mutex
& lock
) {
505 ceph_assert(ceph_mutex_is_locked(m_lock
));
506 ceph_assert(m_state
== STATE_STARTING
);
507 if (!m_stop_requested
) {
511 on_start_fail(-ECANCELED
, "");
515 template <typename I
>
516 void ImageReplayer
<I
>::stop(Context
*on_finish
, bool manual
, bool restart
)
518 dout(10) << "on_finish=" << on_finish
<< ", manual=" << manual
519 << ", restart=" << restart
<< dendl
;
521 image_replayer::BootstrapRequest
<I
> *bootstrap_request
= nullptr;
522 bool shut_down_replay
= false;
525 std::lock_guard locker
{m_lock
};
528 m_restart_requested
= true;
531 if (!is_running_()) {
533 if (!restart
&& m_restart_requested
) {
534 dout(10) << "canceling restart" << dendl
;
535 m_restart_requested
= false;
538 if (!is_stopped_()) {
539 if (m_state
== STATE_STARTING
) {
540 dout(10) << "canceling start" << dendl
;
541 if (m_bootstrap_request
!= nullptr) {
542 bootstrap_request
= m_bootstrap_request
;
543 bootstrap_request
->get();
546 dout(10) << "interrupting replay" << dendl
;
547 shut_down_replay
= true;
550 ceph_assert(m_on_stop_finish
== nullptr);
551 std::swap(m_on_stop_finish
, on_finish
);
552 m_stop_requested
= true;
553 m_manual_stop
= manual
;
558 // avoid holding lock since bootstrap request will update status
559 if (bootstrap_request
!= nullptr) {
560 dout(10) << "canceling bootstrap" << dendl
;
561 bootstrap_request
->cancel();
562 bootstrap_request
->put();
566 dout(20) << "not running" << dendl
;
568 on_finish
->complete(-EINVAL
);
573 if (shut_down_replay
) {
574 on_stop_journal_replay();
575 } else if (on_finish
!= nullptr) {
576 on_finish
->complete(0);
580 template <typename I
>
581 void ImageReplayer
<I
>::on_stop_journal_replay(int r
, const std::string
&desc
)
586 std::lock_guard locker
{m_lock
};
587 if (m_state
!= STATE_REPLAYING
) {
588 // might be invoked multiple times while stopping
592 m_stop_requested
= true;
593 m_state
= STATE_STOPPING
;
596 cancel_update_mirror_image_replay_status();
597 set_state_description(r
, desc
);
598 update_mirror_image_status(true, boost::none
);
602 template <typename I
>
603 void ImageReplayer
<I
>::restart(Context
*on_finish
)
606 std::lock_guard locker
{m_lock
};
607 m_restart_requested
= true;
610 auto ctx
= new LambdaContext(
611 [this, on_finish
](int r
) {
615 start(on_finish
, true, true);
617 stop(ctx
, false, true);
620 template <typename I
>
621 void ImageReplayer
<I
>::flush()
626 std::unique_lock locker
{m_lock
};
627 if (m_state
!= STATE_REPLAYING
) {
632 ceph_assert(m_replayer
!= nullptr);
633 m_replayer
->flush(&ctx
);
638 update_mirror_image_status(false, boost::none
);
642 template <typename I
>
643 bool ImageReplayer
<I
>::on_replay_interrupted()
647 std::lock_guard locker
{m_lock
};
648 shut_down
= m_stop_requested
;
652 on_stop_journal_replay();
657 template <typename I
>
658 void ImageReplayer
<I
>::print_status(Formatter
*f
)
662 std::lock_guard l
{m_lock
};
664 f
->open_object_section("image_replayer");
665 f
->dump_string("name", m_image_spec
);
666 f
->dump_string("state", to_string(m_state
));
670 template <typename I
>
671 void ImageReplayer
<I
>::schedule_update_mirror_image_replay_status() {
672 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
673 ceph_assert(ceph_mutex_is_locked_by_me(m_threads
->timer_lock
));
674 if (m_state
!= STATE_REPLAYING
) {
680 // periodically update the replaying status even if nothing changes
681 // so that we can adjust our performance stats
682 ceph_assert(m_update_status_task
== nullptr);
683 m_update_status_task
= create_context_callback
<
685 &ImageReplayer
<I
>::handle_update_mirror_image_replay_status
>(this);
686 m_threads
->timer
->add_event_after(10, m_update_status_task
);
689 template <typename I
>
690 void ImageReplayer
<I
>::handle_update_mirror_image_replay_status(int r
) {
693 ceph_assert(ceph_mutex_is_locked_by_me(m_threads
->timer_lock
));
695 ceph_assert(m_update_status_task
!= nullptr);
696 m_update_status_task
= nullptr;
698 auto ctx
= new LambdaContext([this](int) {
699 update_mirror_image_status(false, boost::none
);
701 std::unique_lock locker
{m_lock
};
702 std::unique_lock timer_locker
{m_threads
->timer_lock
};
704 schedule_update_mirror_image_replay_status();
705 m_in_flight_op_tracker
.finish_op();
708 m_in_flight_op_tracker
.start_op();
709 m_threads
->work_queue
->queue(ctx
, 0);
712 template <typename I
>
713 void ImageReplayer
<I
>::cancel_update_mirror_image_replay_status() {
714 std::unique_lock timer_locker
{m_threads
->timer_lock
};
715 if (m_update_status_task
!= nullptr) {
718 if (m_threads
->timer
->cancel_event(m_update_status_task
)) {
719 m_update_status_task
= nullptr;
724 template <typename I
>
725 void ImageReplayer
<I
>::update_mirror_image_status(
726 bool force
, const OptionalState
&opt_state
) {
727 dout(15) << "force=" << force
<< ", "
728 << "state=" << opt_state
<< dendl
;
731 std::lock_guard locker
{m_lock
};
732 if (!force
&& !is_stopped_() && !is_running_()) {
733 dout(15) << "shut down in-progress: ignoring update" << dendl
;
738 m_in_flight_op_tracker
.start_op();
739 auto ctx
= new LambdaContext(
740 [this, force
, opt_state
](int r
) {
741 set_mirror_image_status_update(force
, opt_state
);
743 m_threads
->work_queue
->queue(ctx
, 0);
746 template <typename I
>
747 void ImageReplayer
<I
>::set_mirror_image_status_update(
748 bool force
, const OptionalState
&opt_state
) {
749 dout(15) << "force=" << force
<< ", "
750 << "state=" << opt_state
<< dendl
;
752 reregister_admin_socket_hook();
755 std::string state_desc
;
757 bool stopping_replay
;
759 auto mirror_image_status_state
= boost::make_optional(
760 false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN
);
761 image_replayer::BootstrapRequest
<I
>* bootstrap_request
= nullptr;
763 std::lock_guard locker
{m_lock
};
765 state_desc
= m_state_desc
;
766 mirror_image_status_state
= m_mirror_image_status_state
;
768 stopping_replay
= (m_replayer
!= nullptr);
770 if (m_bootstrap_request
!= nullptr) {
771 bootstrap_request
= m_bootstrap_request
;
772 bootstrap_request
->get();
776 bool syncing
= false;
777 if (bootstrap_request
!= nullptr) {
778 syncing
= bootstrap_request
->is_syncing();
779 bootstrap_request
->put();
780 bootstrap_request
= nullptr;
787 cls::rbd::MirrorImageSiteStatus status
;
792 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING
;
793 status
.description
= state_desc
.empty() ? "syncing" : state_desc
;
794 mirror_image_status_state
= status
.state
;
796 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY
;
797 status
.description
= "starting replay";
800 case STATE_REPLAYING
:
801 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING
;
804 auto on_req_finish
= new LambdaContext(
805 [this, force
](int r
) {
806 dout(15) << "replay status ready: r=" << r
<< dendl
;
808 set_mirror_image_status_update(force
, boost::none
);
809 } else if (r
== -EAGAIN
) {
810 m_in_flight_op_tracker
.finish_op();
814 ceph_assert(m_replayer
!= nullptr);
815 if (!m_replayer
->get_replay_status(&desc
, on_req_finish
)) {
816 dout(15) << "waiting for replay status" << dendl
;
820 status
.description
= "replaying, " + desc
;
821 mirror_image_status_state
= boost::make_optional(
822 false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN
);
826 if (stopping_replay
) {
827 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY
;
828 status
.description
= state_desc
.empty() ? "stopping replay" : state_desc
;
833 if (last_r
== -EREMOTEIO
) {
834 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN
;
835 status
.description
= state_desc
;
836 mirror_image_status_state
= status
.state
;
837 } else if (last_r
< 0 && last_r
!= -ECANCELED
) {
838 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR
;
839 status
.description
= state_desc
;
840 mirror_image_status_state
= status
.state
;
842 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED
;
843 status
.description
= state_desc
.empty() ? "stopped" : state_desc
;
844 mirror_image_status_state
= boost::none
;
848 ceph_assert(!"invalid state");
852 std::lock_guard locker
{m_lock
};
853 m_mirror_image_status_state
= mirror_image_status_state
;
856 // prevent the status from ping-ponging when failed replays are restarted
857 if (mirror_image_status_state
&&
858 *mirror_image_status_state
== cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR
) {
859 status
.state
= *mirror_image_status_state
;
862 dout(15) << "status=" << status
<< dendl
;
863 m_local_status_updater
->set_mirror_image_status(m_global_image_id
, status
,
865 if (m_remote_image_peer
.mirror_status_updater
!= nullptr) {
866 m_remote_image_peer
.mirror_status_updater
->set_mirror_image_status(
867 m_global_image_id
, status
, force
);
870 m_in_flight_op_tracker
.finish_op();
873 template <typename I
>
874 void ImageReplayer
<I
>::shut_down(int r
) {
875 dout(10) << "r=" << r
<< dendl
;
878 std::lock_guard locker
{m_lock
};
879 ceph_assert(m_state
== STATE_STOPPING
);
882 if (!m_in_flight_op_tracker
.empty()) {
883 dout(15) << "waiting for in-flight operations to complete" << dendl
;
884 m_in_flight_op_tracker
.wait_for_ops(new LambdaContext([this, r
](int) {
890 // chain the shut down sequence (reverse order)
891 Context
*ctx
= new LambdaContext(
893 update_mirror_image_status(true, STATE_STOPPED
);
897 // destruct the state builder
898 if (m_state_builder
!= nullptr) {
899 ctx
= new LambdaContext([this, ctx
](int r
) {
900 m_state_builder
->close(ctx
);
904 // close the replayer
905 if (m_replayer
!= nullptr) {
906 ctx
= new LambdaContext([this, ctx
](int r
) {
907 m_replayer
->destroy();
908 m_replayer
= nullptr;
911 ctx
= new LambdaContext([this, ctx
](int r
) {
912 m_replayer
->shut_down(ctx
);
916 m_threads
->work_queue
->queue(ctx
, 0);
919 template <typename I
>
920 void ImageReplayer
<I
>::handle_shut_down(int r
) {
921 bool resync_requested
= false;
922 bool delete_requested
= false;
923 bool unregister_asok_hook
= false;
925 std::lock_guard locker
{m_lock
};
927 if (m_delete_requested
&& m_state_builder
!= nullptr &&
928 !m_state_builder
->local_image_id
.empty()) {
929 ceph_assert(m_state_builder
->remote_image_id
.empty());
930 dout(0) << "remote image no longer exists: scheduling deletion" << dendl
;
931 unregister_asok_hook
= true;
932 std::swap(delete_requested
, m_delete_requested
);
935 std::swap(resync_requested
, m_resync_requested
);
936 if (!delete_requested
&& !resync_requested
&& m_last_r
== -ENOENT
&&
937 ((m_state_builder
== nullptr) ||
938 (m_state_builder
->local_image_id
.empty() &&
939 m_state_builder
->remote_image_id
.empty()))) {
940 dout(0) << "mirror image no longer exists" << dendl
;
941 unregister_asok_hook
= true;
946 if (unregister_asok_hook
) {
947 unregister_admin_socket_hook();
950 if (delete_requested
|| resync_requested
) {
951 dout(5) << "moving image to trash" << dendl
;
952 auto ctx
= new LambdaContext([this, r
](int) {
955 ImageDeleter
<I
>::trash_move(m_local_io_ctx
, m_global_image_id
,
956 resync_requested
, m_threads
->work_queue
, ctx
);
960 if (!m_in_flight_op_tracker
.empty()) {
961 dout(15) << "waiting for in-flight operations to complete" << dendl
;
962 m_in_flight_op_tracker
.wait_for_ops(new LambdaContext([this, r
](int) {
968 if (m_local_status_updater
->exists(m_global_image_id
)) {
969 dout(15) << "removing local mirror image status" << dendl
;
970 auto ctx
= new LambdaContext([this, r
](int) {
973 m_local_status_updater
->remove_mirror_image_status(m_global_image_id
, ctx
);
977 if (m_remote_image_peer
.mirror_status_updater
!= nullptr &&
978 m_remote_image_peer
.mirror_status_updater
->exists(m_global_image_id
)) {
979 dout(15) << "removing remote mirror image status" << dendl
;
980 auto ctx
= new LambdaContext([this, r
](int) {
983 m_remote_image_peer
.mirror_status_updater
->remove_mirror_image_status(
984 m_global_image_id
, ctx
);
988 if (m_state_builder
!= nullptr) {
989 m_state_builder
->destroy();
990 m_state_builder
= nullptr;
993 dout(10) << "stop complete" << dendl
;
994 Context
*on_start
= nullptr;
995 Context
*on_stop
= nullptr;
997 std::lock_guard locker
{m_lock
};
998 std::swap(on_start
, m_on_start_finish
);
999 std::swap(on_stop
, m_on_stop_finish
);
1000 m_stop_requested
= false;
1001 ceph_assert(m_state
== STATE_STOPPING
);
1002 m_state
= STATE_STOPPED
;
1005 if (on_start
!= nullptr) {
1006 dout(10) << "on start finish complete, r=" << r
<< dendl
;
1007 on_start
->complete(r
);
1010 if (on_stop
!= nullptr) {
1011 dout(10) << "on stop finish complete, r=" << r
<< dendl
;
1012 on_stop
->complete(r
);
1016 template <typename I
>
1017 void ImageReplayer
<I
>::handle_replayer_notification() {
1020 std::unique_lock locker
{m_lock
};
1021 if (m_state
!= STATE_REPLAYING
) {
1022 // might be attempting to shut down
1027 // detect a rename of the local image
1028 ceph_assert(m_state_builder
!= nullptr &&
1029 m_state_builder
->local_image_ctx
!= nullptr);
1030 std::shared_lock image_locker
{m_state_builder
->local_image_ctx
->image_lock
};
1031 if (m_local_image_name
!= m_state_builder
->local_image_ctx
->name
) {
1032 // will re-register with new name after next status update
1033 dout(10) << "image renamed" << dendl
;
1034 m_local_image_name
= m_state_builder
->local_image_ctx
->name
;
1038 // replayer cannot be shut down while notification is in-flight
1039 ceph_assert(m_replayer
!= nullptr);
1042 if (m_replayer
->is_resync_requested()) {
1043 dout(10) << "resync requested" << dendl
;
1044 m_resync_requested
= true;
1045 on_stop_journal_replay(0, "resync requested");
1049 if (!m_replayer
->is_replaying()) {
1050 auto error_code
= m_replayer
->get_error_code();
1051 auto error_description
= m_replayer
->get_error_description();
1052 dout(10) << "replay interrupted: "
1053 << "r=" << error_code
<< ", "
1054 << "error=" << error_description
<< dendl
;
1055 on_stop_journal_replay(error_code
, error_description
);
1059 update_mirror_image_status(false, {});
1062 template <typename I
>
1063 std::string ImageReplayer
<I
>::to_string(const State state
) {
1065 case ImageReplayer
<I
>::STATE_STARTING
:
1067 case ImageReplayer
<I
>::STATE_REPLAYING
:
1069 case ImageReplayer
<I
>::STATE_STOPPING
:
1071 case ImageReplayer
<I
>::STATE_STOPPED
:
1076 return "Unknown(" + stringify(state
) + ")";
1079 template <typename I
>
1080 void ImageReplayer
<I
>::register_admin_socket_hook() {
1081 ImageReplayerAdminSocketHook
<I
> *asok_hook
;
1083 std::lock_guard locker
{m_lock
};
1084 if (m_asok_hook
!= nullptr) {
1088 dout(15) << "registered asok hook: " << m_image_spec
<< dendl
;
1089 asok_hook
= new ImageReplayerAdminSocketHook
<I
>(
1090 g_ceph_context
, m_image_spec
, this);
1091 int r
= asok_hook
->register_commands();
1093 m_asok_hook
= asok_hook
;
1096 derr
<< "error registering admin socket commands" << dendl
;
1101 template <typename I
>
1102 void ImageReplayer
<I
>::unregister_admin_socket_hook() {
1105 AdminSocketHook
*asok_hook
= nullptr;
1107 std::lock_guard locker
{m_lock
};
1108 std::swap(asok_hook
, m_asok_hook
);
1113 template <typename I
>
1114 void ImageReplayer
<I
>::reregister_admin_socket_hook() {
1115 std::unique_lock locker
{m_lock
};
1116 if (m_state
== STATE_STARTING
&& m_bootstrap_request
!= nullptr) {
1117 m_local_image_name
= m_bootstrap_request
->get_local_image_name();
1120 auto image_spec
= image_replayer::util::compute_image_spec(
1121 m_local_io_ctx
, m_local_image_name
);
1122 if (m_asok_hook
!= nullptr && m_image_spec
== image_spec
) {
1126 dout(15) << "old_image_spec=" << m_image_spec
<< ", "
1127 << "new_image_spec=" << image_spec
<< dendl
;
1128 m_image_spec
= image_spec
;
1130 if (m_state
== STATE_STOPPING
|| m_state
== STATE_STOPPED
) {
1131 // no need to re-register if stopping
1136 unregister_admin_socket_hook();
1137 register_admin_socket_hook();
1140 template <typename I
>
1141 std::ostream
&operator<<(std::ostream
&os
, const ImageReplayer
<I
> &replayer
)
1143 os
<< "ImageReplayer: " << &replayer
<< " [" << replayer
.get_local_pool_id()
1144 << "/" << replayer
.get_global_image_id() << "]";
1148 } // namespace mirror
1151 template class rbd::mirror::ImageReplayer
<librbd::ImageCtx
>;