1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/compat.h"
5 #include "common/Formatter.h"
6 #include "common/admin_socket.h"
7 #include "common/debug.h"
8 #include "common/errno.h"
9 #include "include/stringify.h"
10 #include "cls/rbd/cls_rbd_client.h"
11 #include "common/Timer.h"
12 #include "global/global_context.h"
13 #include "journal/Journaler.h"
14 #include "librbd/ExclusiveLock.h"
15 #include "librbd/ImageCtx.h"
16 #include "librbd/ImageState.h"
17 #include "librbd/Journal.h"
18 #include "librbd/Operations.h"
19 #include "librbd/Utils.h"
20 #include "librbd/asio/ContextWQ.h"
21 #include "ImageDeleter.h"
22 #include "ImageReplayer.h"
23 #include "MirrorStatusUpdater.h"
25 #include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
26 #include "tools/rbd_mirror/image_replayer/ReplayerListener.h"
27 #include "tools/rbd_mirror/image_replayer/StateBuilder.h"
28 #include "tools/rbd_mirror/image_replayer/Utils.h"
29 #include "tools/rbd_mirror/image_replayer/journal/Replayer.h"
30 #include "tools/rbd_mirror/image_replayer/journal/StateBuilder.h"
// Logging setup for this file: debug output is attributed to the
// rbd_mirror subsystem of g_ceph_context, and dout_prefix prepends
// "rbd::mirror::" followed by the streamed `*this` to every message.
// NOTE(review): in this copy the dout_prefix line ends with a line
// continuation, so the following `using` line is joined into the macro
// body; the rest of the macro appears to be missing — confirm against
// upstream before relying on it.
33 #define dout_context g_ceph_context
34 #define dout_subsys ceph_subsys_rbd_mirror
36 #define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
42 using librbd::util::create_context_callback
;
// Declaration of the stream insertion operator for ImageReplayer<I>::State.
// NOTE(review): the template parameter list that introduces `I` is not
// visible in this copy of the declaration — confirm against upstream.
45 std::ostream
&operator<<(std::ostream
&os
,
46 const typename ImageReplayer
<I
>::State
&state
);
// Abstract base class for an admin-socket command bound to one
// ImageReplayer. It stores the command description and the target replayer.
// ImageReplayerAdminSocketHook sets `registered` to true after registering
// the command with the admin socket. Subclasses implement call().
51 class ImageReplayerAdminSocketCommand
{
53 ImageReplayerAdminSocketCommand(const std::string
&desc
,
54 ImageReplayer
<I
> *replayer
)
55 : desc(desc
), replayer(replayer
) {
57 virtual ~ImageReplayerAdminSocketCommand() {}
// Runs the command. `f` is the formatter handed over by the admin socket
// hook's call().
58 virtual int call(Formatter
*f
) = 0;
61 ImageReplayer
<I
> *replayer
;
62 bool registered
= false;
// Admin-socket command that dumps the replayer's status into the formatter
// by calling ImageReplayer::print_status().
66 class StatusCommand
: public ImageReplayerAdminSocketCommand
<I
> {
68 explicit StatusCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
69 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
72 int call(Formatter
*f
) override
{
73 this->replayer
->print_status(f
);
// Admin-socket command that starts the replayer. It passes no completion
// context (nullptr) and flags the start as manual (second argument `true`).
79 class StartCommand
: public ImageReplayerAdminSocketCommand
<I
> {
81 explicit StartCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
82 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
85 int call(Formatter
*f
) override
{
86 this->replayer
->start(nullptr, true);
// Admin-socket command that stops the replayer. It passes no completion
// context (nullptr) and flags the stop as manual (second argument `true`).
92 class StopCommand
: public ImageReplayerAdminSocketCommand
<I
> {
94 explicit StopCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
95 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
98 int call(Formatter
*f
) override
{
99 this->replayer
->stop(nullptr, true);
// Admin-socket command that restarts the replayer. restart() is called
// without an explicit completion context.
104 template <typename I
>
105 class RestartCommand
: public ImageReplayerAdminSocketCommand
<I
> {
107 explicit RestartCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
108 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
111 int call(Formatter
*f
) override
{
112 this->replayer
->restart();
// Admin-socket command that asks the replayer to flush
// (ImageReplayer::flush()).
117 template <typename I
>
118 class FlushCommand
: public ImageReplayerAdminSocketCommand
<I
> {
120 explicit FlushCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
121 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
124 int call(Formatter
*f
) override
{
125 this->replayer
->flush();
// Admin-socket hook for one ImageReplayer. It maps each command string
// "rbd mirror {flush,restart,start,status,stop} <name>" to a command object
// that forwards to the replayer.
// NOTE(review): the command objects are allocated with `new` and held as raw
// pointers in the map; std::unique_ptr values would make ownership explicit.
130 template <typename I
>
131 class ImageReplayerAdminSocketHook
: public AdminSocketHook
{
133 ImageReplayerAdminSocketHook(CephContext
*cct
, const std::string
&name
,
134 ImageReplayer
<I
> *replayer
)
135 : admin_socket(cct
->get_admin_socket()),
136 commands
{{"rbd mirror flush " + name
,
137 new FlushCommand
<I
>("flush rbd mirror " + name
, replayer
)},
138 {"rbd mirror restart " + name
,
139 new RestartCommand
<I
>("restart rbd mirror " + name
, replayer
)},
140 {"rbd mirror start " + name
,
141 new StartCommand
<I
>("start rbd mirror " + name
, replayer
)},
142 {"rbd mirror status " + name
,
143 new StatusCommand
<I
>("get status for rbd mirror " + name
, replayer
)},
144 {"rbd mirror stop " + name
,
145 new StopCommand
<I
>("stop rbd mirror " + name
, replayer
)}} {
// Registers every command string with the admin socket (this hook is the
// handler) and flags each command `registered`. Presumably the flag is set
// only when register_command() succeeds; that check is not visible here.
148 int register_commands() {
149 for (auto &it
: commands
) {
150 int r
= admin_socket
->register_command(it
.first
, this,
155 it
.second
->registered
= true;
// Unregisters all of this hook's commands, then walks the command map.
// The loop body is not visible here; presumably it frees the command
// objects — confirm.
160 ~ImageReplayerAdminSocketHook() override
{
161 admin_socket
->unregister_commands(this);
162 for (auto &it
: commands
) {
// Dispatches a request by its exact command string. An unknown string
// trips ceph_assert.
// NOTE(review): `f` is not among the parameters visible in this copy; it is
// presumably declared on a line not shown here.
168 int call(std::string_view command
, const cmdmap_t
& cmdmap
,
172 bufferlist
& out
) override
{
173 auto i
= commands
.find(command
);
174 ceph_assert(i
!= commands
.end());
175 return i
->second
->call(f
);
// std::less<> is a transparent comparator. It lets commands.find() take the
// std::string_view directly, without building a temporary std::string.
179 typedef std::map
<std::string
, ImageReplayerAdminSocketCommand
<I
>*,
180 std::less
<>> Commands
;
182 AdminSocket
*admin_socket
;
186 } // anonymous namespace
// Bootstrap progress callback. It records "bootstrapping, <description>" as
// the replayer's state description (r = 0) and requests a non-forced mirror
// image status update.
// NOTE(review): `flush` is not referenced in the visible lines, so the status
// update looks unconditional here. Confirm whether it should be gated on
// `flush`; a line may be missing from this copy.
188 template <typename I
>
189 void ImageReplayer
<I
>::BootstrapProgressContext::update_progress(
190 const std::string
&description
, bool flush
)
192 const std::string desc
= "bootstrapping, " + description
;
193 replayer
->set_state_description(0, desc
);
195 replayer
->update_mirror_image_status(false, boost::none
);
// Adapter that forwards image_replayer::ReplayerListener notifications to the
// owning ImageReplayer's handle_replayer_notification().
199 template <typename I
>
200 struct ImageReplayer
<I
>::ReplayerListener
201 : public image_replayer::ReplayerListener
{
202 ImageReplayer
<I
>* image_replayer
;
204 ReplayerListener(ImageReplayer
<I
>* image_replayer
)
205 : image_replayer(image_replayer
) {
208 void handle_notification() override
{
209 image_replayer
->handle_replayer_notification();
// Constructor. It stores the collaborators, seeds m_local_image_name with the
// global image id, and names m_lock after the local pool id and global image
// id. It also creates the replayer listener (freed in the destructor) and
// registers the admin-socket commands under an initial image spec.
213 template <typename I
>
214 ImageReplayer
<I
>::ImageReplayer(
215 librados::IoCtx
&local_io_ctx
, const std::string
&local_mirror_uuid
,
216 const std::string
&global_image_id
, Threads
<I
> *threads
,
217 InstanceWatcher
<I
> *instance_watcher
,
218 MirrorStatusUpdater
<I
>* local_status_updater
,
219 journal::CacheManagerHandler
*cache_manager_handler
,
220 PoolMetaCache
* pool_meta_cache
) :
221 m_local_io_ctx(local_io_ctx
), m_local_mirror_uuid(local_mirror_uuid
),
222 m_global_image_id(global_image_id
), m_threads(threads
),
223 m_instance_watcher(instance_watcher
),
224 m_local_status_updater(local_status_updater
),
225 m_cache_manager_handler(cache_manager_handler
),
226 m_pool_meta_cache(pool_meta_cache
),
227 m_local_image_name(global_image_id
),
228 m_lock(ceph::make_mutex("rbd::mirror::ImageReplayer " +
229 stringify(local_io_ctx
.get_id()) + " " + global_image_id
)),
230 m_progress_cxt(this),
231 m_replayer_listener(new ReplayerListener(this))
233 // Register asok commands using a temporary spec computed from the local
234 // pool (local_io_ctx) and global_image_id. Once the local image name is
235 // known the commands are re-registered under a spec computed from it.
237 m_image_spec
= image_replayer::util::compute_image_spec(
238 local_io_ctx
, global_image_id
);
239 register_admin_socket_hook();
// Destructor. It removes the admin-socket hook, then asserts that all async
// state has been released: state builder, start callback, stop callbacks,
// bootstrap request and periodic status task. Finally it frees the replayer
// listener allocated in the constructor.
242 template <typename I
>
243 ImageReplayer
<I
>::~ImageReplayer()
245 unregister_admin_socket_hook();
246 ceph_assert(m_state_builder
== nullptr);
247 ceph_assert(m_on_start_finish
== nullptr);
248 ceph_assert(m_on_stop_contexts
.empty());
249 ceph_assert(m_bootstrap_request
== nullptr);
250 ceph_assert(m_update_status_task
== nullptr);
251 delete m_replayer_listener
;
// Maps the last recorded mirror image status state to a health state:
//   no recorded state  -> HEALTH_STATE_OK
//   SYNCING or UNKNOWN -> HEALTH_STATE_WARNING
//   anything else      -> HEALTH_STATE_ERROR
// Reads m_mirror_image_status_state under m_lock.
254 template <typename I
>
255 image_replayer::HealthState ImageReplayer
<I
>::get_health_state() const {
256 std::lock_guard locker
{m_lock
};
258 if (!m_mirror_image_status_state
) {
259 return image_replayer::HEALTH_STATE_OK
;
260 } else if (*m_mirror_image_status_state
==
261 cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING
||
262 *m_mirror_image_status_state
==
263 cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN
) {
264 return image_replayer::HEALTH_STATE_WARNING
;
266 return image_replayer::HEALTH_STATE_ERROR
;
// Adds `peer` to m_peers under m_lock if it is not already present.
// bootstrap() uses the first entry of m_peers as the remote peer.
269 template <typename I
>
270 void ImageReplayer
<I
>::add_peer(const Peer
<I
>& peer
) {
271 dout(10) << "peer=" << peer
<< dendl
;
273 std::lock_guard locker
{m_lock
};
274 auto it
= m_peers
.find(peer
);
275 if (it
== m_peers
.end()) {
276 m_peers
.insert(peer
);
// Logs the result code and description, then takes m_lock. The assignment
// lines are not visible in this copy. Presumably they store `r` and `desc`
// into m_last_r / m_state_desc, which set_mirror_image_status_update()
// reads — confirm.
// Takes m_lock itself, so callers must not hold m_lock when calling it.
280 template <typename I
>
281 void ImageReplayer
<I
>::set_state_description(int r
, const std::string
&desc
) {
282 dout(10) << "r=" << r
<< ", desc=" << desc
<< dendl
;
284 std::lock_guard l
{m_lock
};
// Starts replication. The request is ignored (and logged) when:
//   - the replayer is not stopped ("already running"),
//   - it was stopped manually and this start is not `manual`, or
//   - `restart` is set but the pending restart was cancelled.
// Otherwise the state moves to STATE_STARTING. The manual-stop, delete,
// restart and status-removed flags are cleared, and a non-null `on_finish`
// is kept as the start completion.
// NOTE(review): the declaration/assignment of `r` and the remaining start
// steps are not visible in this copy — confirm against upstream.
289 template <typename I
>
290 void ImageReplayer
<I
>::start(Context
*on_finish
, bool manual
, bool restart
)
292 dout(10) << "on_finish=" << on_finish
<< dendl
;
296 std::lock_guard locker
{m_lock
};
297 if (!is_stopped_()) {
298 derr
<< "already running" << dendl
;
300 } else if (m_manual_stop
&& !manual
) {
301 dout(5) << "stopped manually, ignoring start without manual flag"
304 } else if (restart
&& !m_restart_requested
) {
305 dout(10) << "canceled restart" << dendl
;
308 m_state
= STATE_STARTING
;
310 m_state_desc
.clear();
311 m_manual_stop
= false;
312 m_delete_requested
= false;
313 m_restart_requested
= false;
314 m_status_removed
= false;
316 if (on_finish
!= nullptr) {
317 ceph_assert(m_on_start_finish
== nullptr);
318 m_on_start_finish
= on_finish
;
320 ceph_assert(m_on_stop_contexts
.empty());
326 on_finish
->complete(r
);
// Begins bootstrapping against the first known peer. With no peers the start
// fails with -ENOENT. Otherwise it creates a BootstrapRequest that reports
// progress through m_progress_cxt and writes its results into m_state_builder
// and m_resync_requested; handle_bootstrap() is its completion. The request
// is kept in m_bootstrap_request so stop() can cancel it.
334 template <typename I
>
335 void ImageReplayer
<I
>::bootstrap() {
338 std::unique_lock locker
{m_lock
};
339 if (m_peers
.empty()) {
342 dout(5) << "no peer clusters" << dendl
;
343 on_start_fail(-ENOENT
, "no peer clusters");
347 // TODO need to support multiple remote images
348 ceph_assert(!m_peers
.empty());
349 m_remote_image_peer
= *m_peers
.begin();
351 ceph_assert(m_state_builder
== nullptr);
352 auto ctx
= create_context_callback
<
353 ImageReplayer
, &ImageReplayer
<I
>::handle_bootstrap
>(this);
354 auto request
= image_replayer::BootstrapRequest
<I
>::create(
355 m_threads
, m_local_io_ctx
, m_remote_image_peer
.io_ctx
, m_instance_watcher
,
356 m_global_image_id
, m_local_mirror_uuid
,
357 m_remote_image_peer
.remote_pool_meta
, m_cache_manager_handler
,
358 m_pool_meta_cache
, &m_progress_cxt
, &m_state_builder
, &m_resync_requested
,
362 m_bootstrap_request
= request
;
364 // proceed even if stop was requested to allow for m_delete_requested
365 // to get set; cancel() would prevent BootstrapRequest from going into
367 if (m_stop_requested
) {
372 update_mirror_image_status(false, boost::none
);
// Completion of the BootstrapRequest. It drops the request reference, then
// maps the outcome to the next step. A pending stop (on_start_interrupted)
// is checked first; after that:
//   -ENOMSG    -> fail start with r=0 ("local image is primary")
//   -EREMOTEIO -> fail start with -EREMOTEIO ("remote image is not primary")
//   -EEXIST    -> fail start with r ("split-brain detected")
//   -ENOLINK   -> fail start with r=0 ("remote image no longer exists")
//   -ERESTART  -> fail start with r ("image in transient state, try again")
//   other error (condition not visible here) -> "error bootstrapping replay"
//   resync requested -> fail start with r=0 ("resync requested")
// The guard for setting m_delete_requested early is not visible in this
// copy; per the comment below, it covers the case where the remote image no
// longer exists.
376 template <typename I
>
377 void ImageReplayer
<I
>::handle_bootstrap(int r
) {
378 dout(10) << "r=" << r
<< dendl
;
380 std::lock_guard locker
{m_lock
};
381 m_bootstrap_request
->put();
382 m_bootstrap_request
= nullptr;
385 // set m_delete_requested early to ensure that in case remote
386 // image no longer exists local image gets deleted even if start
389 dout(5) << "remote image no longer exists" << dendl
;
390 m_delete_requested
= true;
393 if (on_start_interrupted()) {
395 } else if (r
== -ENOMSG
) {
396 dout(5) << "local image is primary" << dendl
;
397 on_start_fail(0, "local image is primary");
399 } else if (r
== -EREMOTEIO
) {
400 dout(5) << "remote image is not primary" << dendl
;
401 on_start_fail(-EREMOTEIO
, "remote image is not primary");
403 } else if (r
== -EEXIST
) {
404 on_start_fail(r
, "split-brain detected");
406 } else if (r
== -ENOLINK
) {
407 on_start_fail(0, "remote image no longer exists");
409 } else if (r
== -ERESTART
) {
410 on_start_fail(r
, "image in transient state, try again");
413 on_start_fail(r
, "error bootstrapping replay");
415 } else if (m_resync_requested
) {
416 on_start_fail(0, "resync requested");
// Creates the replayer from the bootstrap's state builder, wires in the
// replayer listener, and initializes it asynchronously.
// handle_start_replay() receives the init result.
423 template <typename I
>
424 void ImageReplayer
<I
>::start_replay() {
427 std::unique_lock locker
{m_lock
};
428 ceph_assert(m_replayer
== nullptr);
429 m_replayer
= m_state_builder
->create_replayer(m_threads
, m_instance_watcher
,
432 m_replayer_listener
);
434 auto ctx
= create_context_callback
<
435 ImageReplayer
<I
>, &ImageReplayer
<I
>::handle_start_replay
>(this);
436 m_replayer
->init(ctx
);
// Completion of replayer init.
// On failure: if init failed with -ENOTCONN and the replayer reports a resync
// request, record m_resync_requested. Then destroy the replayer (no shut down
// is needed after a failed init) and fail the start with the replayer's
// error description.
// On success: move STATE_STARTING -> STATE_REPLAYING, take ownership of the
// start completion, and schedule the periodic status update (m_lock and
// timer_lock held). Then force a status update. If a stop arrived meanwhile
// (on_replay_interrupted), or otherwise on the normal path, complete the
// start completion with `r`.
439 template <typename I
>
440 void ImageReplayer
<I
>::handle_start_replay(int r
) {
441 dout(10) << "r=" << r
<< dendl
;
443 if (on_start_interrupted()) {
446 std::string error_description
= m_replayer
->get_error_description();
447 if (r
== -ENOTCONN
&& m_replayer
->is_resync_requested()) {
448 std::unique_lock locker
{m_lock
};
449 m_resync_requested
= true;
452 // shut down not required if init failed
453 m_replayer
->destroy();
454 m_replayer
= nullptr;
456 derr
<< "error starting replay: " << cpp_strerror(r
) << dendl
;
457 on_start_fail(r
, error_description
);
461 Context
*on_finish
= nullptr;
463 std::unique_lock locker
{m_lock
};
464 ceph_assert(m_state
== STATE_STARTING
);
465 m_state
= STATE_REPLAYING
;
466 std::swap(m_on_start_finish
, on_finish
);
468 std::unique_lock timer_locker
{m_threads
->timer_lock
};
469 schedule_update_mirror_image_replay_status();
472 update_mirror_image_status(true, boost::none
);
473 if (on_replay_interrupted()) {
474 if (on_finish
!= nullptr) {
475 on_finish
->complete(r
);
480 dout(10) << "start succeeded" << dendl
;
481 if (on_finish
!= nullptr) {
482 dout(10) << "on finish complete, r=" << r
<< dendl
;
483 on_finish
->complete(r
);
// Aborts a start attempt asynchronously. It queues work on the work queue
// that, under m_lock, moves STATE_STARTING -> STATE_STOPPING and logs the
// outcome: r < 0 other than -ECANCELED, -EREMOTEIO or -ENOENT is logged as a
// failure, anything else as a cancel (the else branch is not visible here).
// The work then records (r, desc) as the state description and requests a
// non-forced status update.
// NOTE(review): set_state_description() takes m_lock itself, so the
// lock_guard above it presumably goes out of scope first; the closing brace
// is not visible in this copy.
487 template <typename I
>
488 void ImageReplayer
<I
>::on_start_fail(int r
, const std::string
&desc
)
490 dout(10) << "r=" << r
<< ", desc=" << desc
<< dendl
;
491 Context
*ctx
= new LambdaContext([this, r
, desc
](int _r
) {
493 std::lock_guard locker
{m_lock
};
494 ceph_assert(m_state
== STATE_STARTING
);
495 m_state
= STATE_STOPPING
;
496 if (r
< 0 && r
!= -ECANCELED
&& r
!= -EREMOTEIO
&& r
!= -ENOENT
) {
497 derr
<< "start failed: " << cpp_strerror(r
) << dendl
;
499 dout(10) << "start canceled" << dendl
;
503 set_state_description(r
, desc
);
504 update_mirror_image_status(false, boost::none
);
507 m_threads
->work_queue
->queue(ctx
, 0);
// Locking wrapper: takes m_lock and defers to
// on_start_interrupted(ceph::mutex&).
510 template <typename I
>
511 bool ImageReplayer
<I
>::on_start_interrupted() {
512 std::lock_guard locker
{m_lock
};
513 return on_start_interrupted(m_lock
);
// Checks, with m_lock held and while STATE_STARTING, whether a stop was
// requested during start. If none was requested the start continues
// (presumably returning false). Otherwise the start is failed with
// -ECANCELED (presumably returning true); the return statements are not
// visible in this copy.
// NOTE(review): the `lock` parameter is not used in the visible lines; the
// assertion checks m_lock directly.
516 template <typename I
>
517 bool ImageReplayer
<I
>::on_start_interrupted(ceph::mutex
& lock
) {
518 ceph_assert(ceph_mutex_is_locked(m_lock
));
519 ceph_assert(m_state
== STATE_STARTING
);
520 if (!m_stop_requested
) {
524 on_start_fail(-ECANCELED
, "");
// Requests that the replayer stop.
// When not running, it may flag a manual stop and/or cancel a pending restart
// (when `restart` is false). It then either reports "already stopped" or
// queues `on_finish` to join a stop that is already in flight.
// When running, it queues `on_finish`, sets m_stop_requested and records
// `manual`. A start in progress is cancelled through its BootstrapRequest,
// which is ref-counted so it can be cancelled after m_lock is released. A
// replay in progress is interrupted via on_stop_journal_replay().
// `on_finish` is completed with -EINVAL on a path whose condition is not
// visible here (presumably when already stopped) — confirm.
528 template <typename I
>
529 void ImageReplayer
<I
>::stop(Context
*on_finish
, bool manual
, bool restart
)
531 dout(10) << "on_finish=" << on_finish
<< ", manual=" << manual
532 << ", restart=" << restart
<< dendl
;
534 image_replayer::BootstrapRequest
<I
> *bootstrap_request
= nullptr;
535 bool shut_down_replay
= false;
536 bool is_stopped
= false;
538 std::lock_guard locker
{m_lock
};
540 if (!is_running_()) {
541 if (manual
&& !m_manual_stop
) {
542 dout(10) << "marking manual" << dendl
;
543 m_manual_stop
= true;
545 if (!restart
&& m_restart_requested
) {
546 dout(10) << "canceling restart" << dendl
;
547 m_restart_requested
= false;
550 dout(10) << "already stopped" << dendl
;
553 dout(10) << "joining in-flight stop" << dendl
;
554 if (on_finish
!= nullptr) {
555 m_on_stop_contexts
.push_back(on_finish
);
559 if (m_state
== STATE_STARTING
) {
560 dout(10) << "canceling start" << dendl
;
561 if (m_bootstrap_request
!= nullptr) {
562 bootstrap_request
= m_bootstrap_request
;
563 bootstrap_request
->get();
566 dout(10) << "interrupting replay" << dendl
;
567 shut_down_replay
= true;
570 ceph_assert(m_on_stop_contexts
.empty());
571 if (on_finish
!= nullptr) {
572 m_on_stop_contexts
.push_back(on_finish
);
574 m_stop_requested
= true;
575 m_manual_stop
= manual
;
581 on_finish
->complete(-EINVAL
);
586 // avoid holding lock since bootstrap request will update status
587 if (bootstrap_request
!= nullptr) {
588 dout(10) << "canceling bootstrap" << dendl
;
589 bootstrap_request
->cancel();
590 bootstrap_request
->put();
593 if (shut_down_replay
) {
594 on_stop_journal_replay();
// Moves an active replay (STATE_REPLAYING) to STATE_STOPPING and marks the
// stop as requested. If the state is not REPLAYING the call is expected to
// be a repeat and is presumably ignored. It then cancels the periodic status
// update, records (r, desc) as the state description, and forces a status
// update.
// Some callers pass no arguments, so the declaration presumably supplies
// defaults for `r` and `desc` — confirm in the header.
598 template <typename I
>
599 void ImageReplayer
<I
>::on_stop_journal_replay(int r
, const std::string
&desc
)
604 std::lock_guard locker
{m_lock
};
605 if (m_state
!= STATE_REPLAYING
) {
606 // might be invoked multiple times while stopping
610 m_stop_requested
= true;
611 m_state
= STATE_STOPPING
;
614 cancel_update_mirror_image_replay_status();
615 set_state_description(r
, desc
);
616 update_mirror_image_status(true, boost::none
);
// Restarts the replayer. It flags m_restart_requested and then issues a
// non-manual stop with restart=true. When that stop completes, start() is
// called with `on_finish`, manual=true and restart=true. A stop() that
// clears m_restart_requested in between cancels the restart.
620 template <typename I
>
621 void ImageReplayer
<I
>::restart(Context
*on_finish
)
624 std::lock_guard locker
{m_lock
};
625 m_restart_requested
= true;
628 auto ctx
= new LambdaContext(
629 [this, on_finish
](int r
) {
633 start(on_finish
, true, true);
635 stop(ctx
, false, true);
// Flushes the active replayer when in STATE_REPLAYING, then posts a
// non-forced status update. `ctx` is declared on a line not visible here;
// presumably it is a waitable completion — confirm.
638 template <typename I
>
639 void ImageReplayer
<I
>::flush()
644 std::unique_lock locker
{m_lock
};
645 if (m_state
!= STATE_REPLAYING
) {
650 ceph_assert(m_replayer
!= nullptr);
651 m_replayer
->flush(&ctx
);
656 update_mirror_image_status(false, boost::none
);
// Reports whether a stop was requested while replay was starting; if so, it
// stops the replay. `shut_down` appears to be a local flag declared on a line
// not visible here (it would shadow the shut_down() member), and is
// presumably the return value — confirm.
660 template <typename I
>
661 bool ImageReplayer
<I
>::on_replay_interrupted()
665 std::lock_guard locker
{m_lock
};
666 shut_down
= m_stop_requested
;
670 on_stop_journal_replay();
// Writes an "image_replayer" object section containing the current image spec
// ("name") and the textual replayer state ("state"), read under m_lock.
// The section close is not visible in this copy.
675 template <typename I
>
676 void ImageReplayer
<I
>::print_status(Formatter
*f
)
680 std::lock_guard l
{m_lock
};
682 f
->open_object_section("image_replayer");
683 f
->dump_string("name", m_image_spec
);
684 f
->dump_string("state", to_string(m_state
));
// Schedules the next periodic status update on the shared timer. The caller
// must hold both m_lock and the timer lock. Nothing is scheduled unless the
// replayer is in STATE_REPLAYING. Only one task may be outstanding at a time
// (asserted). The delay argument is 10; its unit is presumably seconds —
// confirm against SafeTimer.
688 template <typename I
>
689 void ImageReplayer
<I
>::schedule_update_mirror_image_replay_status() {
690 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
691 ceph_assert(ceph_mutex_is_locked_by_me(m_threads
->timer_lock
));
692 if (m_state
!= STATE_REPLAYING
) {
698 // periodically update the replaying status even if nothing changes
699 // so that we can adjust our performance stats
700 ceph_assert(m_update_status_task
== nullptr);
701 m_update_status_task
= create_context_callback
<
703 &ImageReplayer
<I
>::handle_update_mirror_image_replay_status
>(this);
704 m_threads
->timer
->add_event_after(10, m_update_status_task
);
// Timer callback for the periodic status update; it runs with the timer lock
// held. It clears the task pointer and moves the real work onto the work
// queue. That work posts a status update and reschedules the next one (under
// m_lock and the timer lock). The op is tracked in m_in_flight_op_tracker so
// shut down can wait for it.
707 template <typename I
>
708 void ImageReplayer
<I
>::handle_update_mirror_image_replay_status(int r
) {
711 ceph_assert(ceph_mutex_is_locked_by_me(m_threads
->timer_lock
));
713 ceph_assert(m_update_status_task
!= nullptr);
714 m_update_status_task
= nullptr;
716 auto ctx
= new LambdaContext([this](int) {
717 update_mirror_image_status(false, boost::none
);
719 std::unique_lock locker
{m_lock
};
720 std::unique_lock timer_locker
{m_threads
->timer_lock
};
722 schedule_update_mirror_image_replay_status();
723 m_in_flight_op_tracker
.finish_op();
726 m_in_flight_op_tracker
.start_op();
727 m_threads
->work_queue
->queue(ctx
, 0);
// Cancels a pending periodic status update, if one is scheduled, under the
// timer lock. The task pointer is cleared only when the timer confirms the
// cancellation. If cancel_event() fails, the callback is presumably already
// running and clears the pointer itself.
730 template <typename I
>
731 void ImageReplayer
<I
>::cancel_update_mirror_image_replay_status() {
732 std::unique_lock timer_locker
{m_threads
->timer_lock
};
733 if (m_update_status_task
!= nullptr) {
736 if (m_threads
->timer
->cancel_event(m_update_status_task
)) {
737 m_update_status_task
= nullptr;
// Requests an asynchronous mirror image status update.
// A non-forced request is dropped while a shut down is in progress (neither
// stopped nor running). Otherwise the request is queued on the work queue as
// set_mirror_image_status_update(force, opt_state) and tracked in
// m_in_flight_op_tracker.
742 template <typename I
>
743 void ImageReplayer
<I
>::update_mirror_image_status(
744 bool force
, const OptionalState
&opt_state
) {
745 dout(15) << "force=" << force
<< ", "
746 << "state=" << opt_state
<< dendl
;
749 std::lock_guard locker
{m_lock
};
750 if (!force
&& !is_stopped_() && !is_running_()) {
751 dout(15) << "shut down in-progress: ignoring update" << dendl
;
756 m_in_flight_op_tracker
.start_op();
757 auto ctx
= new LambdaContext(
758 [this, force
, opt_state
](int r
) {
759 set_mirror_image_status_update(force
, opt_state
);
761 m_threads
->work_queue
->queue(ctx
, 0);
// Builds and publishes the mirror image site status.
// 1. Re-registers the asok hook in case the local image was renamed.
// 2. Snapshots the description, last status state, whether a replayer
//    exists, and the bootstrap request's syncing flag, all under m_lock.
// 3. Maps the replayer state to a cls::rbd::MirrorImageSiteStatus. The
//    switch header and some branch conditions are not visible in this copy:
//      syncing          -> SYNCING
//      starting         -> STARTING_REPLAY
//      STATE_REPLAYING  -> REPLAYING, described by the replayer's
//                          get_replay_status(). If that result is not ready,
//                          the update is retried via on_req_finish.
//      stopping replay  -> STOPPING_REPLAY
//      stopped          -> UNKNOWN for -EREMOTEIO, ERROR for other errors
//                          except -ECANCELED, otherwise STOPPED
// 4. Stores the resulting state (used by get_health_state()). It then
//    publishes to the local status updater and, if present, the remote one.
//    The in-flight op started by update_mirror_image_status() is finished
//    at the end.
// Note: boost::make_optional(false, ...) yields an empty optional.
764 template <typename I
>
765 void ImageReplayer
<I
>::set_mirror_image_status_update(
766 bool force
, const OptionalState
&opt_state
) {
767 dout(15) << "force=" << force
<< ", "
768 << "state=" << opt_state
<< dendl
;
770 reregister_admin_socket_hook();
773 std::string state_desc
;
775 bool stopping_replay
;
777 auto mirror_image_status_state
= boost::make_optional(
778 false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN
);
779 image_replayer::BootstrapRequest
<I
>* bootstrap_request
= nullptr;
781 std::lock_guard locker
{m_lock
};
783 state_desc
= m_state_desc
;
784 mirror_image_status_state
= m_mirror_image_status_state
;
786 stopping_replay
= (m_replayer
!= nullptr);
788 if (m_bootstrap_request
!= nullptr) {
789 bootstrap_request
= m_bootstrap_request
;
790 bootstrap_request
->get();
794 bool syncing
= false;
795 if (bootstrap_request
!= nullptr) {
796 syncing
= bootstrap_request
->is_syncing();
797 bootstrap_request
->put();
798 bootstrap_request
= nullptr;
805 cls::rbd::MirrorImageSiteStatus status
;
810 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING
;
811 status
.description
= state_desc
.empty() ? "syncing" : state_desc
;
812 mirror_image_status_state
= status
.state
;
814 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY
;
815 status
.description
= "starting replay";
818 case STATE_REPLAYING
:
819 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING
;
822 auto on_req_finish
= new LambdaContext(
823 [this, force
](int r
) {
824 dout(15) << "replay status ready: r=" << r
<< dendl
;
826 set_mirror_image_status_update(force
, boost::none
);
827 } else if (r
== -EAGAIN
) {
828 m_in_flight_op_tracker
.finish_op();
832 ceph_assert(m_replayer
!= nullptr);
833 if (!m_replayer
->get_replay_status(&desc
, on_req_finish
)) {
834 dout(15) << "waiting for replay status" << dendl
;
838 status
.description
= "replaying, " + desc
;
839 mirror_image_status_state
= boost::make_optional(
840 false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN
);
844 if (stopping_replay
) {
845 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY
;
846 status
.description
= state_desc
.empty() ? "stopping replay" : state_desc
;
851 if (last_r
== -EREMOTEIO
) {
852 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN
;
853 status
.description
= state_desc
;
854 mirror_image_status_state
= status
.state
;
855 } else if (last_r
< 0 && last_r
!= -ECANCELED
) {
856 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR
;
857 status
.description
= state_desc
;
858 mirror_image_status_state
= status
.state
;
860 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED
;
861 status
.description
= state_desc
.empty() ? "stopped" : state_desc
;
862 mirror_image_status_state
= boost::none
;
866 ceph_assert(!"invalid state");
870 std::lock_guard locker
{m_lock
};
871 m_mirror_image_status_state
= mirror_image_status_state
;
874 // prevent the status from ping-ponging when failed replays are restarted
875 if (mirror_image_status_state
&&
876 *mirror_image_status_state
== cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR
) {
877 status
.state
= *mirror_image_status_state
;
880 dout(15) << "status=" << status
<< dendl
;
881 m_local_status_updater
->set_mirror_image_status(m_global_image_id
, status
,
883 if (m_remote_image_peer
.mirror_status_updater
!= nullptr) {
884 m_remote_image_peer
.mirror_status_updater
->set_mirror_image_status(
885 m_global_image_id
, status
, force
);
888 m_in_flight_op_tracker
.finish_op();
// Tears down a stopping replayer (STATE_STOPPING is asserted). If operations
// are still in flight, it first waits for them through
// m_in_flight_op_tracker. The teardown is a chain of contexts built in
// reverse, so it executes as:
//   replayer->shut_down() -> replayer->destroy() -> state_builder->close()
//   -> forced status update with STATE_STOPPED -> ...
// The chain's tail after the status update is not visible here (presumably
// handle_shut_down). Steps for a missing replayer or state builder are
// skipped.
891 template <typename I
>
892 void ImageReplayer
<I
>::shut_down(int r
) {
893 dout(10) << "r=" << r
<< dendl
;
896 std::lock_guard locker
{m_lock
};
897 ceph_assert(m_state
== STATE_STOPPING
);
900 if (!m_in_flight_op_tracker
.empty()) {
901 dout(15) << "waiting for in-flight operations to complete" << dendl
;
902 m_in_flight_op_tracker
.wait_for_ops(new LambdaContext([this, r
](int) {
908 // chain the shut down sequence (reverse order)
909 Context
*ctx
= new LambdaContext(
911 update_mirror_image_status(true, STATE_STOPPED
);
915 // destruct the state builder
916 if (m_state_builder
!= nullptr) {
917 ctx
= new LambdaContext([this, ctx
](int r
) {
918 m_state_builder
->close(ctx
);
922 // close the replayer
923 if (m_replayer
!= nullptr) {
924 ctx
= new LambdaContext([this, ctx
](int r
) {
925 m_replayer
->destroy();
926 m_replayer
= nullptr;
929 ctx
= new LambdaContext([this, ctx
](int r
) {
930 m_replayer
->shut_down(ctx
);
934 m_threads
->work_queue
->queue(ctx
, 0);
// Final stage of a stop.
// - If deletion was requested and a local image id is known (the remote id
//   must then be empty), it schedules deletion and marks
//   m_delete_in_progress.
// - It consumes any resync request.
// - If the last error was -ENOENT and neither image id is known, the mirror
//   image is treated as gone.
// - In both "gone" cases the admin-socket hook is unregistered.
// - If delete or resync was requested, the local image is moved to trash
//   (ImageDeleter::trash_move, passing `resync_requested`).
// - It waits for in-flight ops, then removes the image status once
//   (m_status_removed; forced when a delete is in progress).
// - It destroys the state builder.
// - Under m_lock it moves STATE_STOPPING -> STATE_STOPPED and completes any
//   pending start callback and every queued stop callback with `r`.
937 template <typename I
>
938 void ImageReplayer
<I
>::handle_shut_down(int r
) {
939 bool resync_requested
= false;
940 bool delete_requested
= false;
941 bool unregister_asok_hook
= false;
943 std::lock_guard locker
{m_lock
};
945 if (m_delete_requested
&& m_state_builder
!= nullptr &&
946 !m_state_builder
->local_image_id
.empty()) {
947 ceph_assert(m_state_builder
->remote_image_id
.empty());
948 dout(0) << "remote image no longer exists: scheduling deletion" << dendl
;
949 unregister_asok_hook
= true;
950 std::swap(delete_requested
, m_delete_requested
);
951 m_delete_in_progress
= true;
954 std::swap(resync_requested
, m_resync_requested
);
955 if (!delete_requested
&& !resync_requested
&& m_last_r
== -ENOENT
&&
956 ((m_state_builder
== nullptr) ||
957 (m_state_builder
->local_image_id
.empty() &&
958 m_state_builder
->remote_image_id
.empty()))) {
959 dout(0) << "mirror image no longer exists" << dendl
;
960 unregister_asok_hook
= true;
965 if (unregister_asok_hook
) {
966 unregister_admin_socket_hook();
969 if (delete_requested
|| resync_requested
) {
970 dout(5) << "moving image to trash" << dendl
;
971 auto ctx
= new LambdaContext([this, r
](int) {
974 ImageDeleter
<I
>::trash_move(m_local_io_ctx
, m_global_image_id
,
975 resync_requested
, m_threads
->work_queue
, ctx
);
979 if (!m_in_flight_op_tracker
.empty()) {
980 dout(15) << "waiting for in-flight operations to complete" << dendl
;
981 m_in_flight_op_tracker
.wait_for_ops(new LambdaContext([this, r
](int) {
987 if (!m_status_removed
) {
988 auto ctx
= new LambdaContext([this, r
](int) {
989 m_status_removed
= true;
992 remove_image_status(m_delete_in_progress
, ctx
);
996 if (m_state_builder
!= nullptr) {
997 m_state_builder
->destroy();
998 m_state_builder
= nullptr;
1001 dout(10) << "stop complete" << dendl
;
1002 Context
*on_start
= nullptr;
1003 Contexts on_stop_contexts
;
1005 std::lock_guard locker
{m_lock
};
1006 std::swap(on_start
, m_on_start_finish
);
1007 on_stop_contexts
= std::move(m_on_stop_contexts
);
1008 m_stop_requested
= false;
1009 ceph_assert(m_state
== STATE_STOPPING
);
1010 m_state
= STATE_STOPPED
;
1013 if (on_start
!= nullptr) {
1014 dout(10) << "on start finish complete, r=" << r
<< dendl
;
1015 on_start
->complete(r
);
1018 for (auto ctx
: on_stop_contexts
) {
1019 dout(10) << "on stop finish " << ctx
<< " complete, r=" << r
<< dendl
;
// Reacts to a replayer notification while in STATE_REPLAYING; other states
// are presumably ignored because a shut down may be underway.
// - It detects a rename of the local image (read under the image's
//   image_lock) and records the new name. The asok hook is re-registered on
//   the next status update.
// - If the replayer requests a resync, it records the request and stops
//   replay.
// - If the replayer is no longer replaying, it stops replay with the
//   replayer's error code and description.
// - Otherwise it posts a non-forced status update; `{}` is an empty
//   OptionalState.
// The calls to on_stop_journal_replay() take m_lock themselves, so
// `locker` is presumably released first on lines not visible here.
1024 template <typename I
>
1025 void ImageReplayer
<I
>::handle_replayer_notification() {
1028 std::unique_lock locker
{m_lock
};
1029 if (m_state
!= STATE_REPLAYING
) {
1030 // might be attempting to shut down
1035 // detect a rename of the local image
1036 ceph_assert(m_state_builder
!= nullptr &&
1037 m_state_builder
->local_image_ctx
!= nullptr);
1038 std::shared_lock image_locker
{m_state_builder
->local_image_ctx
->image_lock
};
1039 if (m_local_image_name
!= m_state_builder
->local_image_ctx
->name
) {
1040 // will re-register with new name after next status update
1041 dout(10) << "image renamed" << dendl
;
1042 m_local_image_name
= m_state_builder
->local_image_ctx
->name
;
1046 // replayer cannot be shut down while notification is in-flight
1047 ceph_assert(m_replayer
!= nullptr);
1050 if (m_replayer
->is_resync_requested()) {
1051 dout(10) << "resync requested" << dendl
;
1052 m_resync_requested
= true;
1053 on_stop_journal_replay(0, "resync requested");
1057 if (!m_replayer
->is_replaying()) {
1058 auto error_code
= m_replayer
->get_error_code();
1059 auto error_description
= m_replayer
->get_error_description();
1060 dout(10) << "replay interrupted: "
1061 << "r=" << error_code
<< ", "
1062 << "error=" << error_description
<< dendl
;
1063 on_stop_journal_replay(error_code
, error_description
);
1067 update_mirror_image_status(false, {});
// Returns a human-readable name for a replayer State. Values outside the
// listed cases render as "Unknown(<numeric value>)".
// NOTE(review): the switch header and the return statement for each case
// label are not visible in this copy. As shown, the labels have no
// statements — confirm against upstream.
1070 template <typename I
>
1071 std::string ImageReplayer
<I
>::to_string(const State state
) {
1073 case ImageReplayer
<I
>::STATE_STARTING
:
1075 case ImageReplayer
<I
>::STATE_REPLAYING
:
1077 case ImageReplayer
<I
>::STATE_STOPPING
:
1079 case ImageReplayer
<I
>::STATE_STOPPED
:
1084 return "Unknown(" + stringify(state
) + ")";
// Registers the admin-socket commands for this replayer under the current
// m_image_spec, unless a hook already exists. A new hook is created and its
// commands are registered. It is stored in m_asok_hook on success (the
// condition is not visible here). A registration error is logged;
// presumably the hook is freed on that path — confirm.
1087 template <typename I
>
1088 void ImageReplayer
<I
>::register_admin_socket_hook() {
1089 ImageReplayerAdminSocketHook
<I
> *asok_hook
;
1091 std::lock_guard locker
{m_lock
};
1092 if (m_asok_hook
!= nullptr) {
1096 dout(15) << "registered asok hook: " << m_image_spec
<< dendl
;
1097 asok_hook
= new ImageReplayerAdminSocketHook
<I
>(
1098 g_ceph_context
, m_image_spec
, this);
1099 int r
= asok_hook
->register_commands();
1101 m_asok_hook
= asok_hook
;
1104 derr
<< "error registering admin socket commands" << dendl
;
// Detaches the current admin-socket hook (if any) from m_asok_hook under
// m_lock. The remaining lines are not visible here; presumably the hook is
// destroyed outside the lock, and its destructor unregisters the commands.
1109 template <typename I
>
1110 void ImageReplayer
<I
>::unregister_admin_socket_hook() {
1113 AdminSocketHook
*asok_hook
= nullptr;
1115 std::lock_guard locker
{m_lock
};
1116 std::swap(asok_hook
, m_asok_hook
);
// Keeps the admin-socket command names in sync with the local image name.
// While starting, the name is refreshed from the bootstrap request. The image
// spec is then recomputed from the local io ctx and name. If a hook already
// exists and the spec is unchanged, the visible guard suggests nothing is
// done. Otherwise the new spec is recorded, and the hook is re-registered
// unless the replayer is stopping or stopped.
1121 template <typename I
>
1122 void ImageReplayer
<I
>::reregister_admin_socket_hook() {
1123 std::unique_lock locker
{m_lock
};
1124 if (m_state
== STATE_STARTING
&& m_bootstrap_request
!= nullptr) {
1125 m_local_image_name
= m_bootstrap_request
->get_local_image_name();
1128 auto image_spec
= image_replayer::util::compute_image_spec(
1129 m_local_io_ctx
, m_local_image_name
);
1130 if (m_asok_hook
!= nullptr && m_image_spec
== image_spec
) {
1134 dout(15) << "old_image_spec=" << m_image_spec
<< ", "
1135 << "new_image_spec=" << image_spec
<< dendl
;
1136 m_image_spec
= image_spec
;
1138 if (m_state
== STATE_STOPPING
|| m_state
== STATE_STOPPED
) {
1139 // no need to re-register if stopping
1144 unregister_admin_socket_hook();
1145 register_admin_socket_hook();
// Removes this image's mirror status from the local status updater, then
// chains to remove_image_status_remote(force, on_finish). If a local status
// exists it calls either remove_mirror_image_status(...) or
// remove_refresh_mirror_image_status(...). The condition choosing between
// them is not visible here; presumably it is `force` — confirm. The path
// with no local status is also not visible.
1148 template <typename I
>
1149 void ImageReplayer
<I
>::remove_image_status(bool force
, Context
*on_finish
)
1151 auto ctx
= new LambdaContext([this, force
, on_finish
](int) {
1152 remove_image_status_remote(force
, on_finish
);
1155 if (m_local_status_updater
->exists(m_global_image_id
)) {
1156 dout(15) << "removing local mirror image status" << dendl
;
1158 m_local_status_updater
->remove_mirror_image_status(
1159 m_global_image_id
, true, ctx
);
1161 m_local_status_updater
->remove_refresh_mirror_image_status(
1162 m_global_image_id
, ctx
);
// Removes this image's mirror status from the remote peer's status updater,
// when one exists and holds a status for the image. It uses
// remove_mirror_image_status(...) or remove_refresh_mirror_image_status(...);
// the selecting condition is not visible here (presumably `force`). When
// there is nothing to remove, `on_finish` is completed with 0.
1170 template <typename I
>
1171 void ImageReplayer
<I
>::remove_image_status_remote(bool force
, Context
*on_finish
)
1173 if (m_remote_image_peer
.mirror_status_updater
!= nullptr &&
1174 m_remote_image_peer
.mirror_status_updater
->exists(m_global_image_id
)) {
1175 dout(15) << "removing remote mirror image status" << dendl
;
1177 m_remote_image_peer
.mirror_status_updater
->remove_mirror_image_status(
1178 m_global_image_id
, true, on_finish
);
1180 m_remote_image_peer
.mirror_status_updater
->remove_refresh_mirror_image_status(
1181 m_global_image_id
, on_finish
);
1186 on_finish
->complete(0);
// Streams a short identifier for a replayer:
// "ImageReplayer: <address> [<local pool id>/<global image id>]".
// dout_prefix in this file streams `*this`.
1190 template <typename I
>
1191 std::ostream
&operator<<(std::ostream
&os
, const ImageReplayer
<I
> &replayer
)
1193 os
<< "ImageReplayer: " << &replayer
<< " [" << replayer
.get_local_pool_id()
1194 << "/" << replayer
.get_global_image_id() << "]";
1198 } // namespace mirror
// Explicit instantiation of ImageReplayer for librbd::ImageCtx, so the
// template member definitions in this file are emitted for that type.
1201 template class rbd::mirror::ImageReplayer
<librbd::ImageCtx
>;