1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/compat.h"
5 #include "common/Formatter.h"
6 #include "common/admin_socket.h"
7 #include "common/debug.h"
8 #include "common/errno.h"
9 #include "include/stringify.h"
10 #include "cls/rbd/cls_rbd_client.h"
11 #include "common/Timer.h"
12 #include "global/global_context.h"
13 #include "journal/Journaler.h"
14 #include "librbd/ExclusiveLock.h"
15 #include "librbd/ImageCtx.h"
16 #include "librbd/ImageState.h"
17 #include "librbd/Journal.h"
18 #include "librbd/Operations.h"
19 #include "librbd/Utils.h"
20 #include "librbd/asio/ContextWQ.h"
21 #include "ImageDeleter.h"
22 #include "ImageReplayer.h"
23 #include "MirrorStatusUpdater.h"
25 #include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
26 #include "tools/rbd_mirror/image_replayer/ReplayerListener.h"
27 #include "tools/rbd_mirror/image_replayer/StateBuilder.h"
28 #include "tools/rbd_mirror/image_replayer/Utils.h"
29 #include "tools/rbd_mirror/image_replayer/journal/Replayer.h"
30 #include "tools/rbd_mirror/image_replayer/journal/StateBuilder.h"
33 #define dout_context g_ceph_context
34 #define dout_subsys ceph_subsys_rbd_mirror
36 #define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
42 using librbd::util::create_context_callback
;
45 std::ostream
&operator<<(std::ostream
&os
,
46 const typename ImageReplayer
<I
>::State
&state
);
51 class ImageReplayerAdminSocketCommand
{
53 ImageReplayerAdminSocketCommand(const std::string
&desc
,
54 ImageReplayer
<I
> *replayer
)
55 : desc(desc
), replayer(replayer
) {
57 virtual ~ImageReplayerAdminSocketCommand() {}
58 virtual int call(Formatter
*f
) = 0;
61 ImageReplayer
<I
> *replayer
;
62 bool registered
= false;
66 class StatusCommand
: public ImageReplayerAdminSocketCommand
<I
> {
68 explicit StatusCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
69 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
72 int call(Formatter
*f
) override
{
73 this->replayer
->print_status(f
);
79 class StartCommand
: public ImageReplayerAdminSocketCommand
<I
> {
81 explicit StartCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
82 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
85 int call(Formatter
*f
) override
{
86 this->replayer
->start(nullptr, true);
92 class StopCommand
: public ImageReplayerAdminSocketCommand
<I
> {
94 explicit StopCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
95 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
98 int call(Formatter
*f
) override
{
99 this->replayer
->stop(nullptr, true);
104 template <typename I
>
105 class RestartCommand
: public ImageReplayerAdminSocketCommand
<I
> {
107 explicit RestartCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
108 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
111 int call(Formatter
*f
) override
{
112 this->replayer
->restart();
117 template <typename I
>
118 class FlushCommand
: public ImageReplayerAdminSocketCommand
<I
> {
120 explicit FlushCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
121 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
124 int call(Formatter
*f
) override
{
125 this->replayer
->flush();
130 template <typename I
>
131 class ImageReplayerAdminSocketHook
: public AdminSocketHook
{
133 ImageReplayerAdminSocketHook(CephContext
*cct
, const std::string
&name
,
134 ImageReplayer
<I
> *replayer
)
135 : admin_socket(cct
->get_admin_socket()),
136 commands
{{"rbd mirror flush " + name
,
137 new FlushCommand
<I
>("flush rbd mirror " + name
, replayer
)},
138 {"rbd mirror restart " + name
,
139 new RestartCommand
<I
>("restart rbd mirror " + name
, replayer
)},
140 {"rbd mirror start " + name
,
141 new StartCommand
<I
>("start rbd mirror " + name
, replayer
)},
142 {"rbd mirror status " + name
,
143 new StatusCommand
<I
>("get status for rbd mirror " + name
, replayer
)},
144 {"rbd mirror stop " + name
,
145 new StopCommand
<I
>("stop rbd mirror " + name
, replayer
)}} {
148 int register_commands() {
149 for (auto &it
: commands
) {
150 int r
= admin_socket
->register_command(it
.first
, this,
155 it
.second
->registered
= true;
160 ~ImageReplayerAdminSocketHook() override
{
161 admin_socket
->unregister_commands(this);
162 for (auto &it
: commands
) {
168 int call(std::string_view command
, const cmdmap_t
& cmdmap
,
171 bufferlist
& out
) override
{
172 auto i
= commands
.find(command
);
173 ceph_assert(i
!= commands
.end());
174 return i
->second
->call(f
);
178 typedef std::map
<std::string
, ImageReplayerAdminSocketCommand
<I
>*,
179 std::less
<>> Commands
;
181 AdminSocket
*admin_socket
;
185 } // anonymous namespace
187 template <typename I
>
188 void ImageReplayer
<I
>::BootstrapProgressContext::update_progress(
189 const std::string
&description
, bool flush
)
191 const std::string desc
= "bootstrapping, " + description
;
192 replayer
->set_state_description(0, desc
);
194 replayer
->update_mirror_image_status(false, boost::none
);
198 template <typename I
>
199 struct ImageReplayer
<I
>::ReplayerListener
200 : public image_replayer::ReplayerListener
{
201 ImageReplayer
<I
>* image_replayer
;
203 ReplayerListener(ImageReplayer
<I
>* image_replayer
)
204 : image_replayer(image_replayer
) {
207 void handle_notification() override
{
208 image_replayer
->handle_replayer_notification();
212 template <typename I
>
213 ImageReplayer
<I
>::ImageReplayer(
214 librados::IoCtx
&local_io_ctx
, const std::string
&local_mirror_uuid
,
215 const std::string
&global_image_id
, Threads
<I
> *threads
,
216 InstanceWatcher
<I
> *instance_watcher
,
217 MirrorStatusUpdater
<I
>* local_status_updater
,
218 journal::CacheManagerHandler
*cache_manager_handler
,
219 PoolMetaCache
* pool_meta_cache
) :
220 m_local_io_ctx(local_io_ctx
), m_local_mirror_uuid(local_mirror_uuid
),
221 m_global_image_id(global_image_id
), m_threads(threads
),
222 m_instance_watcher(instance_watcher
),
223 m_local_status_updater(local_status_updater
),
224 m_cache_manager_handler(cache_manager_handler
),
225 m_pool_meta_cache(pool_meta_cache
),
226 m_local_image_name(global_image_id
),
227 m_lock(ceph::make_mutex("rbd::mirror::ImageReplayer " +
228 stringify(local_io_ctx
.get_id()) + " " + global_image_id
)),
229 m_progress_cxt(this),
230 m_replayer_listener(new ReplayerListener(this))
232 // Register asok commands using a temporary "remote_pool_name/global_image_id"
233 // name. When the image name becomes known on start the asok commands will be
234 // re-registered using "remote_pool_name/remote_image_name" name.
236 m_image_spec
= image_replayer::util::compute_image_spec(
237 local_io_ctx
, global_image_id
);
238 register_admin_socket_hook();
241 template <typename I
>
242 ImageReplayer
<I
>::~ImageReplayer()
244 unregister_admin_socket_hook();
245 ceph_assert(m_state_builder
== nullptr);
246 ceph_assert(m_on_start_finish
== nullptr);
247 ceph_assert(m_on_stop_contexts
.empty());
248 ceph_assert(m_bootstrap_request
== nullptr);
249 ceph_assert(m_update_status_task
== nullptr);
250 delete m_replayer_listener
;
253 template <typename I
>
254 image_replayer::HealthState ImageReplayer
<I
>::get_health_state() const {
255 std::lock_guard locker
{m_lock
};
257 if (!m_mirror_image_status_state
) {
258 return image_replayer::HEALTH_STATE_OK
;
259 } else if (*m_mirror_image_status_state
==
260 cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING
||
261 *m_mirror_image_status_state
==
262 cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN
) {
263 return image_replayer::HEALTH_STATE_WARNING
;
265 return image_replayer::HEALTH_STATE_ERROR
;
268 template <typename I
>
269 void ImageReplayer
<I
>::add_peer(const Peer
<I
>& peer
) {
270 dout(10) << "peer=" << peer
<< dendl
;
272 std::lock_guard locker
{m_lock
};
273 auto it
= m_peers
.find(peer
);
274 if (it
== m_peers
.end()) {
275 m_peers
.insert(peer
);
279 template <typename I
>
280 void ImageReplayer
<I
>::set_state_description(int r
, const std::string
&desc
) {
281 dout(10) << "r=" << r
<< ", desc=" << desc
<< dendl
;
283 std::lock_guard l
{m_lock
};
288 template <typename I
>
289 void ImageReplayer
<I
>::start(Context
*on_finish
, bool manual
, bool restart
)
291 dout(10) << "on_finish=" << on_finish
<< dendl
;
295 std::lock_guard locker
{m_lock
};
296 if (!is_stopped_()) {
297 derr
<< "already running" << dendl
;
299 } else if (m_manual_stop
&& !manual
) {
300 dout(5) << "stopped manually, ignoring start without manual flag"
303 } else if (restart
&& !m_restart_requested
) {
304 dout(10) << "canceled restart" << dendl
;
307 m_state
= STATE_STARTING
;
309 m_state_desc
.clear();
310 m_manual_stop
= false;
311 m_delete_requested
= false;
312 m_restart_requested
= false;
313 m_status_removed
= false;
315 if (on_finish
!= nullptr) {
316 ceph_assert(m_on_start_finish
== nullptr);
317 m_on_start_finish
= on_finish
;
319 ceph_assert(m_on_stop_contexts
.empty());
325 on_finish
->complete(r
);
333 template <typename I
>
334 void ImageReplayer
<I
>::bootstrap() {
337 std::unique_lock locker
{m_lock
};
338 if (m_peers
.empty()) {
341 dout(5) << "no peer clusters" << dendl
;
342 on_start_fail(-ENOENT
, "no peer clusters");
346 // TODO need to support multiple remote images
347 ceph_assert(!m_peers
.empty());
348 m_remote_image_peer
= *m_peers
.begin();
350 if (on_start_interrupted(m_lock
)) {
354 ceph_assert(m_state_builder
== nullptr);
355 auto ctx
= create_context_callback
<
356 ImageReplayer
, &ImageReplayer
<I
>::handle_bootstrap
>(this);
357 auto request
= image_replayer::BootstrapRequest
<I
>::create(
358 m_threads
, m_local_io_ctx
, m_remote_image_peer
.io_ctx
, m_instance_watcher
,
359 m_global_image_id
, m_local_mirror_uuid
,
360 m_remote_image_peer
.remote_pool_meta
, m_cache_manager_handler
,
361 m_pool_meta_cache
, &m_progress_cxt
, &m_state_builder
, &m_resync_requested
,
365 m_bootstrap_request
= request
;
368 update_mirror_image_status(false, boost::none
);
372 template <typename I
>
373 void ImageReplayer
<I
>::handle_bootstrap(int r
) {
374 dout(10) << "r=" << r
<< dendl
;
376 std::lock_guard locker
{m_lock
};
377 m_bootstrap_request
->put();
378 m_bootstrap_request
= nullptr;
381 if (on_start_interrupted()) {
383 } else if (r
== -ENOMSG
) {
384 dout(5) << "local image is primary" << dendl
;
385 on_start_fail(0, "local image is primary");
387 } else if (r
== -EREMOTEIO
) {
388 dout(5) << "remote image is not primary" << dendl
;
389 on_start_fail(-EREMOTEIO
, "remote image is not primary");
391 } else if (r
== -EEXIST
) {
392 on_start_fail(r
, "split-brain detected");
394 } else if (r
== -ENOLINK
) {
395 m_delete_requested
= true;
396 on_start_fail(0, "remote image no longer exists");
398 } else if (r
== -ERESTART
) {
399 on_start_fail(r
, "image in transient state, try again");
402 on_start_fail(r
, "error bootstrapping replay");
404 } else if (m_resync_requested
) {
405 on_start_fail(0, "resync requested");
412 template <typename I
>
413 void ImageReplayer
<I
>::start_replay() {
416 std::unique_lock locker
{m_lock
};
417 ceph_assert(m_replayer
== nullptr);
418 m_replayer
= m_state_builder
->create_replayer(m_threads
, m_instance_watcher
,
421 m_replayer_listener
);
423 auto ctx
= create_context_callback
<
424 ImageReplayer
<I
>, &ImageReplayer
<I
>::handle_start_replay
>(this);
425 m_replayer
->init(ctx
);
428 template <typename I
>
429 void ImageReplayer
<I
>::handle_start_replay(int r
) {
430 dout(10) << "r=" << r
<< dendl
;
432 if (on_start_interrupted()) {
435 std::string error_description
= m_replayer
->get_error_description();
436 if (r
== -ENOTCONN
&& m_replayer
->is_resync_requested()) {
437 std::unique_lock locker
{m_lock
};
438 m_resync_requested
= true;
441 // shut down not required if init failed
442 m_replayer
->destroy();
443 m_replayer
= nullptr;
445 derr
<< "error starting replay: " << cpp_strerror(r
) << dendl
;
446 on_start_fail(r
, error_description
);
450 Context
*on_finish
= nullptr;
452 std::unique_lock locker
{m_lock
};
453 ceph_assert(m_state
== STATE_STARTING
);
454 m_state
= STATE_REPLAYING
;
455 std::swap(m_on_start_finish
, on_finish
);
457 std::unique_lock timer_locker
{m_threads
->timer_lock
};
458 schedule_update_mirror_image_replay_status();
461 update_mirror_image_status(true, boost::none
);
462 if (on_replay_interrupted()) {
463 if (on_finish
!= nullptr) {
464 on_finish
->complete(r
);
469 dout(10) << "start succeeded" << dendl
;
470 if (on_finish
!= nullptr) {
471 dout(10) << "on finish complete, r=" << r
<< dendl
;
472 on_finish
->complete(r
);
476 template <typename I
>
477 void ImageReplayer
<I
>::on_start_fail(int r
, const std::string
&desc
)
479 dout(10) << "r=" << r
<< ", desc=" << desc
<< dendl
;
480 Context
*ctx
= new LambdaContext([this, r
, desc
](int _r
) {
482 std::lock_guard locker
{m_lock
};
483 ceph_assert(m_state
== STATE_STARTING
);
484 m_state
= STATE_STOPPING
;
485 if (r
< 0 && r
!= -ECANCELED
&& r
!= -EREMOTEIO
&& r
!= -ENOENT
) {
486 derr
<< "start failed: " << cpp_strerror(r
) << dendl
;
488 dout(10) << "start canceled" << dendl
;
492 set_state_description(r
, desc
);
493 update_mirror_image_status(false, boost::none
);
496 m_threads
->work_queue
->queue(ctx
, 0);
499 template <typename I
>
500 bool ImageReplayer
<I
>::on_start_interrupted() {
501 std::lock_guard locker
{m_lock
};
502 return on_start_interrupted(m_lock
);
505 template <typename I
>
506 bool ImageReplayer
<I
>::on_start_interrupted(ceph::mutex
& lock
) {
507 ceph_assert(ceph_mutex_is_locked(m_lock
));
508 ceph_assert(m_state
== STATE_STARTING
);
509 if (!m_stop_requested
) {
513 on_start_fail(-ECANCELED
, "");
517 template <typename I
>
518 void ImageReplayer
<I
>::stop(Context
*on_finish
, bool manual
, bool restart
)
520 dout(10) << "on_finish=" << on_finish
<< ", manual=" << manual
521 << ", restart=" << restart
<< dendl
;
523 image_replayer::BootstrapRequest
<I
> *bootstrap_request
= nullptr;
524 bool shut_down_replay
= false;
525 bool is_stopped
= false;
527 std::lock_guard locker
{m_lock
};
529 if (!is_running_()) {
530 if (manual
&& !m_manual_stop
) {
531 dout(10) << "marking manual" << dendl
;
532 m_manual_stop
= true;
534 if (!restart
&& m_restart_requested
) {
535 dout(10) << "canceling restart" << dendl
;
536 m_restart_requested
= false;
539 dout(10) << "already stopped" << dendl
;
542 dout(10) << "joining in-flight stop" << dendl
;
543 if (on_finish
!= nullptr) {
544 m_on_stop_contexts
.push_back(on_finish
);
548 if (m_state
== STATE_STARTING
) {
549 dout(10) << "canceling start" << dendl
;
550 if (m_bootstrap_request
!= nullptr) {
551 bootstrap_request
= m_bootstrap_request
;
552 bootstrap_request
->get();
555 dout(10) << "interrupting replay" << dendl
;
556 shut_down_replay
= true;
559 ceph_assert(m_on_stop_contexts
.empty());
560 if (on_finish
!= nullptr) {
561 m_on_stop_contexts
.push_back(on_finish
);
563 m_stop_requested
= true;
564 m_manual_stop
= manual
;
570 on_finish
->complete(-EINVAL
);
575 // avoid holding lock since bootstrap request will update status
576 if (bootstrap_request
!= nullptr) {
577 dout(10) << "canceling bootstrap" << dendl
;
578 bootstrap_request
->cancel();
579 bootstrap_request
->put();
582 if (shut_down_replay
) {
583 on_stop_journal_replay();
587 template <typename I
>
588 void ImageReplayer
<I
>::on_stop_journal_replay(int r
, const std::string
&desc
)
593 std::lock_guard locker
{m_lock
};
594 if (m_state
!= STATE_REPLAYING
) {
595 // might be invoked multiple times while stopping
599 m_stop_requested
= true;
600 m_state
= STATE_STOPPING
;
603 cancel_update_mirror_image_replay_status();
604 set_state_description(r
, desc
);
605 update_mirror_image_status(true, boost::none
);
609 template <typename I
>
610 void ImageReplayer
<I
>::restart(Context
*on_finish
)
613 std::lock_guard locker
{m_lock
};
614 m_restart_requested
= true;
617 auto ctx
= new LambdaContext(
618 [this, on_finish
](int r
) {
622 start(on_finish
, true, true);
624 stop(ctx
, false, true);
627 template <typename I
>
628 void ImageReplayer
<I
>::flush()
633 std::unique_lock locker
{m_lock
};
634 if (m_state
!= STATE_REPLAYING
) {
639 ceph_assert(m_replayer
!= nullptr);
640 m_replayer
->flush(&ctx
);
645 update_mirror_image_status(false, boost::none
);
649 template <typename I
>
650 bool ImageReplayer
<I
>::on_replay_interrupted()
654 std::lock_guard locker
{m_lock
};
655 shut_down
= m_stop_requested
;
659 on_stop_journal_replay();
664 template <typename I
>
665 void ImageReplayer
<I
>::print_status(Formatter
*f
)
669 std::lock_guard l
{m_lock
};
671 f
->open_object_section("image_replayer");
672 f
->dump_string("name", m_image_spec
);
673 f
->dump_string("state", to_string(m_state
));
677 template <typename I
>
678 void ImageReplayer
<I
>::schedule_update_mirror_image_replay_status() {
679 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
680 ceph_assert(ceph_mutex_is_locked_by_me(m_threads
->timer_lock
));
681 if (m_state
!= STATE_REPLAYING
) {
687 // periodically update the replaying status even if nothing changes
688 // so that we can adjust our performance stats
689 ceph_assert(m_update_status_task
== nullptr);
690 m_update_status_task
= create_context_callback
<
692 &ImageReplayer
<I
>::handle_update_mirror_image_replay_status
>(this);
693 m_threads
->timer
->add_event_after(10, m_update_status_task
);
696 template <typename I
>
697 void ImageReplayer
<I
>::handle_update_mirror_image_replay_status(int r
) {
700 ceph_assert(ceph_mutex_is_locked_by_me(m_threads
->timer_lock
));
702 ceph_assert(m_update_status_task
!= nullptr);
703 m_update_status_task
= nullptr;
705 auto ctx
= new LambdaContext([this](int) {
706 update_mirror_image_status(false, boost::none
);
708 std::unique_lock locker
{m_lock
};
709 std::unique_lock timer_locker
{m_threads
->timer_lock
};
711 schedule_update_mirror_image_replay_status();
712 m_in_flight_op_tracker
.finish_op();
715 m_in_flight_op_tracker
.start_op();
716 m_threads
->work_queue
->queue(ctx
, 0);
719 template <typename I
>
720 void ImageReplayer
<I
>::cancel_update_mirror_image_replay_status() {
721 std::unique_lock timer_locker
{m_threads
->timer_lock
};
722 if (m_update_status_task
!= nullptr) {
725 if (m_threads
->timer
->cancel_event(m_update_status_task
)) {
726 m_update_status_task
= nullptr;
731 template <typename I
>
732 void ImageReplayer
<I
>::update_mirror_image_status(
733 bool force
, const OptionalState
&opt_state
) {
734 dout(15) << "force=" << force
<< ", "
735 << "state=" << opt_state
<< dendl
;
738 std::lock_guard locker
{m_lock
};
739 if (!force
&& !is_stopped_() && !is_running_()) {
740 dout(15) << "shut down in-progress: ignoring update" << dendl
;
745 m_in_flight_op_tracker
.start_op();
746 auto ctx
= new LambdaContext(
747 [this, force
, opt_state
](int r
) {
748 set_mirror_image_status_update(force
, opt_state
);
750 m_threads
->work_queue
->queue(ctx
, 0);
753 template <typename I
>
754 void ImageReplayer
<I
>::set_mirror_image_status_update(
755 bool force
, const OptionalState
&opt_state
) {
756 dout(15) << "force=" << force
<< ", "
757 << "state=" << opt_state
<< dendl
;
759 reregister_admin_socket_hook();
762 std::string state_desc
;
764 bool stopping_replay
;
766 auto mirror_image_status_state
= boost::make_optional(
767 false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN
);
768 image_replayer::BootstrapRequest
<I
>* bootstrap_request
= nullptr;
770 std::lock_guard locker
{m_lock
};
772 state_desc
= m_state_desc
;
773 mirror_image_status_state
= m_mirror_image_status_state
;
775 stopping_replay
= (m_replayer
!= nullptr);
777 if (m_bootstrap_request
!= nullptr) {
778 bootstrap_request
= m_bootstrap_request
;
779 bootstrap_request
->get();
783 bool syncing
= false;
784 if (bootstrap_request
!= nullptr) {
785 syncing
= bootstrap_request
->is_syncing();
786 bootstrap_request
->put();
787 bootstrap_request
= nullptr;
794 cls::rbd::MirrorImageSiteStatus status
;
799 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING
;
800 status
.description
= state_desc
.empty() ? "syncing" : state_desc
;
801 mirror_image_status_state
= status
.state
;
803 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY
;
804 status
.description
= "starting replay";
807 case STATE_REPLAYING
:
808 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING
;
811 auto on_req_finish
= new LambdaContext(
812 [this, force
](int r
) {
813 dout(15) << "replay status ready: r=" << r
<< dendl
;
815 set_mirror_image_status_update(force
, boost::none
);
816 } else if (r
== -EAGAIN
) {
817 m_in_flight_op_tracker
.finish_op();
821 ceph_assert(m_replayer
!= nullptr);
822 if (!m_replayer
->get_replay_status(&desc
, on_req_finish
)) {
823 dout(15) << "waiting for replay status" << dendl
;
827 status
.description
= "replaying, " + desc
;
828 mirror_image_status_state
= boost::make_optional(
829 false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN
);
833 if (stopping_replay
) {
834 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY
;
835 status
.description
= state_desc
.empty() ? "stopping replay" : state_desc
;
840 if (last_r
== -EREMOTEIO
) {
841 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN
;
842 status
.description
= state_desc
;
843 mirror_image_status_state
= status
.state
;
844 } else if (last_r
< 0 && last_r
!= -ECANCELED
) {
845 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR
;
846 status
.description
= state_desc
;
847 mirror_image_status_state
= status
.state
;
849 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED
;
850 status
.description
= state_desc
.empty() ? "stopped" : state_desc
;
851 mirror_image_status_state
= boost::none
;
855 ceph_assert(!"invalid state");
859 std::lock_guard locker
{m_lock
};
860 m_mirror_image_status_state
= mirror_image_status_state
;
863 // prevent the status from ping-ponging when failed replays are restarted
864 if (mirror_image_status_state
&&
865 *mirror_image_status_state
== cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR
) {
866 status
.state
= *mirror_image_status_state
;
869 dout(15) << "status=" << status
<< dendl
;
870 m_local_status_updater
->set_mirror_image_status(m_global_image_id
, status
,
872 if (m_remote_image_peer
.mirror_status_updater
!= nullptr) {
873 m_remote_image_peer
.mirror_status_updater
->set_mirror_image_status(
874 m_global_image_id
, status
, force
);
877 m_in_flight_op_tracker
.finish_op();
880 template <typename I
>
881 void ImageReplayer
<I
>::shut_down(int r
) {
882 dout(10) << "r=" << r
<< dendl
;
885 std::lock_guard locker
{m_lock
};
886 ceph_assert(m_state
== STATE_STOPPING
);
889 if (!m_in_flight_op_tracker
.empty()) {
890 dout(15) << "waiting for in-flight operations to complete" << dendl
;
891 m_in_flight_op_tracker
.wait_for_ops(new LambdaContext([this, r
](int) {
897 // chain the shut down sequence (reverse order)
898 Context
*ctx
= new LambdaContext(
900 update_mirror_image_status(true, STATE_STOPPED
);
904 // destruct the state builder
905 if (m_state_builder
!= nullptr) {
906 ctx
= new LambdaContext([this, ctx
](int r
) {
907 m_state_builder
->close(ctx
);
911 // close the replayer
912 if (m_replayer
!= nullptr) {
913 ctx
= new LambdaContext([this, ctx
](int r
) {
914 m_replayer
->destroy();
915 m_replayer
= nullptr;
918 ctx
= new LambdaContext([this, ctx
](int r
) {
919 m_replayer
->shut_down(ctx
);
923 m_threads
->work_queue
->queue(ctx
, 0);
926 template <typename I
>
927 void ImageReplayer
<I
>::handle_shut_down(int r
) {
928 bool resync_requested
= false;
929 bool delete_requested
= false;
930 bool unregister_asok_hook
= false;
932 std::lock_guard locker
{m_lock
};
934 if (m_delete_requested
&& m_state_builder
!= nullptr &&
935 !m_state_builder
->local_image_id
.empty()) {
936 ceph_assert(m_state_builder
->remote_image_id
.empty());
937 dout(0) << "remote image no longer exists: scheduling deletion" << dendl
;
938 unregister_asok_hook
= true;
939 std::swap(delete_requested
, m_delete_requested
);
940 m_delete_in_progress
= true;
943 std::swap(resync_requested
, m_resync_requested
);
944 if (!delete_requested
&& !resync_requested
&& m_last_r
== -ENOENT
&&
945 ((m_state_builder
== nullptr) ||
946 (m_state_builder
->local_image_id
.empty() &&
947 m_state_builder
->remote_image_id
.empty()))) {
948 dout(0) << "mirror image no longer exists" << dendl
;
949 unregister_asok_hook
= true;
954 if (unregister_asok_hook
) {
955 unregister_admin_socket_hook();
958 if (delete_requested
|| resync_requested
) {
959 dout(5) << "moving image to trash" << dendl
;
960 auto ctx
= new LambdaContext([this, r
](int) {
963 ImageDeleter
<I
>::trash_move(m_local_io_ctx
, m_global_image_id
,
964 resync_requested
, m_threads
->work_queue
, ctx
);
968 if (!m_in_flight_op_tracker
.empty()) {
969 dout(15) << "waiting for in-flight operations to complete" << dendl
;
970 m_in_flight_op_tracker
.wait_for_ops(new LambdaContext([this, r
](int) {
976 if (!m_status_removed
) {
977 auto ctx
= new LambdaContext([this, r
](int) {
978 m_status_removed
= true;
981 remove_image_status(m_delete_in_progress
, ctx
);
985 if (m_state_builder
!= nullptr) {
986 m_state_builder
->destroy();
987 m_state_builder
= nullptr;
990 dout(10) << "stop complete" << dendl
;
991 Context
*on_start
= nullptr;
992 Contexts on_stop_contexts
;
994 std::lock_guard locker
{m_lock
};
995 std::swap(on_start
, m_on_start_finish
);
996 on_stop_contexts
= std::move(m_on_stop_contexts
);
997 m_stop_requested
= false;
998 ceph_assert(m_state
== STATE_STOPPING
);
999 m_state
= STATE_STOPPED
;
1002 if (on_start
!= nullptr) {
1003 dout(10) << "on start finish complete, r=" << r
<< dendl
;
1004 on_start
->complete(r
);
1007 for (auto ctx
: on_stop_contexts
) {
1008 dout(10) << "on stop finish " << ctx
<< " complete, r=" << r
<< dendl
;
1013 template <typename I
>
1014 void ImageReplayer
<I
>::handle_replayer_notification() {
1017 std::unique_lock locker
{m_lock
};
1018 if (m_state
!= STATE_REPLAYING
) {
1019 // might be attempting to shut down
1024 // detect a rename of the local image
1025 ceph_assert(m_state_builder
!= nullptr &&
1026 m_state_builder
->local_image_ctx
!= nullptr);
1027 std::shared_lock image_locker
{m_state_builder
->local_image_ctx
->image_lock
};
1028 if (m_local_image_name
!= m_state_builder
->local_image_ctx
->name
) {
1029 // will re-register with new name after next status update
1030 dout(10) << "image renamed" << dendl
;
1031 m_local_image_name
= m_state_builder
->local_image_ctx
->name
;
1035 // replayer cannot be shut down while notification is in-flight
1036 ceph_assert(m_replayer
!= nullptr);
1039 if (m_replayer
->is_resync_requested()) {
1040 dout(10) << "resync requested" << dendl
;
1041 m_resync_requested
= true;
1042 on_stop_journal_replay(0, "resync requested");
1046 if (!m_replayer
->is_replaying()) {
1047 auto error_code
= m_replayer
->get_error_code();
1048 auto error_description
= m_replayer
->get_error_description();
1049 dout(10) << "replay interrupted: "
1050 << "r=" << error_code
<< ", "
1051 << "error=" << error_description
<< dendl
;
1052 on_stop_journal_replay(error_code
, error_description
);
1056 update_mirror_image_status(false, {});
1059 template <typename I
>
1060 std::string ImageReplayer
<I
>::to_string(const State state
) {
1062 case ImageReplayer
<I
>::STATE_STARTING
:
1064 case ImageReplayer
<I
>::STATE_REPLAYING
:
1066 case ImageReplayer
<I
>::STATE_STOPPING
:
1068 case ImageReplayer
<I
>::STATE_STOPPED
:
1073 return "Unknown(" + stringify(state
) + ")";
1076 template <typename I
>
1077 void ImageReplayer
<I
>::register_admin_socket_hook() {
1078 ImageReplayerAdminSocketHook
<I
> *asok_hook
;
1080 std::lock_guard locker
{m_lock
};
1081 if (m_asok_hook
!= nullptr) {
1085 dout(15) << "registered asok hook: " << m_image_spec
<< dendl
;
1086 asok_hook
= new ImageReplayerAdminSocketHook
<I
>(
1087 g_ceph_context
, m_image_spec
, this);
1088 int r
= asok_hook
->register_commands();
1090 m_asok_hook
= asok_hook
;
1093 derr
<< "error registering admin socket commands" << dendl
;
1098 template <typename I
>
1099 void ImageReplayer
<I
>::unregister_admin_socket_hook() {
1102 AdminSocketHook
*asok_hook
= nullptr;
1104 std::lock_guard locker
{m_lock
};
1105 std::swap(asok_hook
, m_asok_hook
);
1110 template <typename I
>
1111 void ImageReplayer
<I
>::reregister_admin_socket_hook() {
1112 std::unique_lock locker
{m_lock
};
1113 if (m_state
== STATE_STARTING
&& m_bootstrap_request
!= nullptr) {
1114 m_local_image_name
= m_bootstrap_request
->get_local_image_name();
1117 auto image_spec
= image_replayer::util::compute_image_spec(
1118 m_local_io_ctx
, m_local_image_name
);
1119 if (m_asok_hook
!= nullptr && m_image_spec
== image_spec
) {
1123 dout(15) << "old_image_spec=" << m_image_spec
<< ", "
1124 << "new_image_spec=" << image_spec
<< dendl
;
1125 m_image_spec
= image_spec
;
1127 if (m_state
== STATE_STOPPING
|| m_state
== STATE_STOPPED
) {
1128 // no need to re-register if stopping
1133 unregister_admin_socket_hook();
1134 register_admin_socket_hook();
1137 template <typename I
>
1138 void ImageReplayer
<I
>::remove_image_status(bool force
, Context
*on_finish
)
1140 auto ctx
= new LambdaContext([this, force
, on_finish
](int) {
1141 remove_image_status_remote(force
, on_finish
);
1144 if (m_local_status_updater
->exists(m_global_image_id
)) {
1145 dout(15) << "removing local mirror image status" << dendl
;
1147 m_local_status_updater
->remove_mirror_image_status(
1148 m_global_image_id
, true, ctx
);
1150 m_local_status_updater
->remove_refresh_mirror_image_status(
1151 m_global_image_id
, ctx
);
1159 template <typename I
>
1160 void ImageReplayer
<I
>::remove_image_status_remote(bool force
, Context
*on_finish
)
1162 if (m_remote_image_peer
.mirror_status_updater
!= nullptr &&
1163 m_remote_image_peer
.mirror_status_updater
->exists(m_global_image_id
)) {
1164 dout(15) << "removing remote mirror image status" << dendl
;
1166 m_remote_image_peer
.mirror_status_updater
->remove_mirror_image_status(
1167 m_global_image_id
, true, on_finish
);
1169 m_remote_image_peer
.mirror_status_updater
->remove_refresh_mirror_image_status(
1170 m_global_image_id
, on_finish
);
1175 on_finish
->complete(0);
1179 template <typename I
>
1180 std::ostream
&operator<<(std::ostream
&os
, const ImageReplayer
<I
> &replayer
)
1182 os
<< "ImageReplayer: " << &replayer
<< " [" << replayer
.get_local_pool_id()
1183 << "/" << replayer
.get_global_image_id() << "]";
1187 } // namespace mirror
1190 template class rbd::mirror::ImageReplayer
<librbd::ImageCtx
>;