1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/compat.h"
5 #include "common/Formatter.h"
6 #include "common/admin_socket.h"
7 #include "common/debug.h"
8 #include "common/errno.h"
9 #include "include/stringify.h"
10 #include "cls/rbd/cls_rbd_client.h"
11 #include "common/Timer.h"
12 #include "common/WorkQueue.h"
13 #include "global/global_context.h"
14 #include "journal/Journaler.h"
15 #include "journal/ReplayHandler.h"
16 #include "journal/Settings.h"
17 #include "librbd/ExclusiveLock.h"
18 #include "librbd/ImageCtx.h"
19 #include "librbd/ImageState.h"
20 #include "librbd/Journal.h"
21 #include "librbd/Operations.h"
22 #include "librbd/Utils.h"
23 #include "librbd/journal/Replay.h"
24 #include "ImageDeleter.h"
25 #include "ImageReplayer.h"
27 #include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
28 #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
29 #include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
30 #include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
31 #include "tools/rbd_mirror/image_replayer/PrepareRemoteImageRequest.h"
32 #include "tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h"
34 #define dout_context g_ceph_context
35 #define dout_subsys ceph_subsys_rbd_mirror
37 #define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
42 using std::unique_ptr
;
43 using std::shared_ptr
;
46 extern PerfCounters
*g_perf_counters
;
51 using librbd::util::create_context_callback
;
52 using librbd::util::create_rados_callback
;
53 using namespace rbd::mirror::image_replayer
;
56 std::ostream
&operator<<(std::ostream
&os
,
57 const typename ImageReplayer
<I
>::State
&state
);
62 struct ReplayHandler
: public ::journal::ReplayHandler
{
63 ImageReplayer
<I
> *replayer
;
64 ReplayHandler(ImageReplayer
<I
> *replayer
) : replayer(replayer
) {}
65 void get() override
{}
66 void put() override
{}
68 void handle_entries_available() override
{
69 replayer
->handle_replay_ready();
71 void handle_complete(int r
) override
{
74 ss
<< "replay completed with error: " << cpp_strerror(r
);
76 replayer
->handle_replay_complete(r
, ss
.str());
81 class ImageReplayerAdminSocketCommand
{
83 ImageReplayerAdminSocketCommand(const std::string
&desc
,
84 ImageReplayer
<I
> *replayer
)
85 : desc(desc
), replayer(replayer
) {
87 virtual ~ImageReplayerAdminSocketCommand() {}
88 virtual bool call(Formatter
*f
, stringstream
*ss
) = 0;
91 ImageReplayer
<I
> *replayer
;
92 bool registered
= false;
96 class StatusCommand
: public ImageReplayerAdminSocketCommand
<I
> {
98 explicit StatusCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
99 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
102 bool call(Formatter
*f
, stringstream
*ss
) override
{
103 this->replayer
->print_status(f
, ss
);
108 template <typename I
>
109 class StartCommand
: public ImageReplayerAdminSocketCommand
<I
> {
111 explicit StartCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
112 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
115 bool call(Formatter
*f
, stringstream
*ss
) override
{
116 this->replayer
->start(nullptr, true);
121 template <typename I
>
122 class StopCommand
: public ImageReplayerAdminSocketCommand
<I
> {
124 explicit StopCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
125 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
128 bool call(Formatter
*f
, stringstream
*ss
) override
{
129 this->replayer
->stop(nullptr, true);
134 template <typename I
>
135 class RestartCommand
: public ImageReplayerAdminSocketCommand
<I
> {
137 explicit RestartCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
138 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
141 bool call(Formatter
*f
, stringstream
*ss
) override
{
142 this->replayer
->restart();
147 template <typename I
>
148 class FlushCommand
: public ImageReplayerAdminSocketCommand
<I
> {
150 explicit FlushCommand(const std::string
&desc
, ImageReplayer
<I
> *replayer
)
151 : ImageReplayerAdminSocketCommand
<I
>(desc
, replayer
) {
154 bool call(Formatter
*f
, stringstream
*ss
) override
{
155 this->replayer
->flush();
160 template <typename I
>
161 class ImageReplayerAdminSocketHook
: public AdminSocketHook
{
163 ImageReplayerAdminSocketHook(CephContext
*cct
, const std::string
&name
,
164 ImageReplayer
<I
> *replayer
)
165 : admin_socket(cct
->get_admin_socket()),
166 commands
{{"rbd mirror flush " + name
,
167 new FlushCommand
<I
>("flush rbd mirror " + name
, replayer
)},
168 {"rbd mirror restart " + name
,
169 new RestartCommand
<I
>("restart rbd mirror " + name
, replayer
)},
170 {"rbd mirror start " + name
,
171 new StartCommand
<I
>("start rbd mirror " + name
, replayer
)},
172 {"rbd mirror status " + name
,
173 new StatusCommand
<I
>("get status for rbd mirror " + name
, replayer
)},
174 {"rbd mirror stop " + name
,
175 new StopCommand
<I
>("stop rbd mirror " + name
, replayer
)}} {
178 int register_commands() {
179 for (auto &it
: commands
) {
180 int r
= admin_socket
->register_command(it
.first
, it
.first
, this,
185 it
.second
->registered
= true;
190 ~ImageReplayerAdminSocketHook() override
{
191 for (auto &it
: commands
) {
192 if (it
.second
->registered
) {
193 admin_socket
->unregister_command(it
.first
);
200 bool call(std::string_view command
, const cmdmap_t
& cmdmap
,
201 std::string_view format
, bufferlist
& out
) override
{
202 auto i
= commands
.find(command
);
203 ceph_assert(i
!= commands
.end());
204 Formatter
*f
= Formatter::create(format
);
206 bool r
= i
->second
->call(f
, &ss
);
213 typedef std::map
<std::string
, ImageReplayerAdminSocketCommand
<I
>*,
214 std::less
<>> Commands
;
216 AdminSocket
*admin_socket
;
220 uint32_t calculate_replay_delay(const utime_t
&event_time
,
221 int mirroring_replay_delay
) {
222 if (mirroring_replay_delay
<= 0) {
226 utime_t now
= ceph_clock_now();
227 if (event_time
+ mirroring_replay_delay
<= now
) {
231 // ensure it is rounded up when converting to integer
232 return (event_time
+ mirroring_replay_delay
- now
) + 1;
235 } // anonymous namespace
237 template <typename I
>
238 void ImageReplayer
<I
>::BootstrapProgressContext::update_progress(
239 const std::string
&description
, bool flush
)
241 const std::string desc
= "bootstrapping, " + description
;
242 replayer
->set_state_description(0, desc
);
244 replayer
->update_mirror_image_status(false, boost::none
);
248 template <typename I
>
249 void ImageReplayer
<I
>::RemoteJournalerListener::handle_update(
250 ::journal::JournalMetadata
*) {
251 FunctionContext
*ctx
= new FunctionContext([this](int r
) {
252 replayer
->handle_remote_journal_metadata_updated();
254 replayer
->m_threads
->work_queue
->queue(ctx
, 0);
257 template <typename I
>
258 ImageReplayer
<I
>::ImageReplayer(Threads
<I
> *threads
,
259 InstanceWatcher
<I
> *instance_watcher
,
261 const std::string
&local_mirror_uuid
,
262 int64_t local_pool_id
,
263 const std::string
&global_image_id
) :
265 m_instance_watcher(instance_watcher
),
267 m_local_mirror_uuid(local_mirror_uuid
),
268 m_local_pool_id(local_pool_id
),
269 m_global_image_id(global_image_id
), m_local_image_name(global_image_id
),
270 m_lock("rbd::mirror::ImageReplayer " + stringify(local_pool_id
) + " " +
272 m_progress_cxt(this),
273 m_journal_listener(new JournalListener(this)),
274 m_remote_listener(this)
276 // Register asok commands using a temporary "remote_pool_name/global_image_id"
277 // name. When the image name becomes known on start the asok commands will be
278 // re-registered using "remote_pool_name/remote_image_name" name.
280 std::string pool_name
;
281 int r
= m_local
->pool_reverse_lookup(m_local_pool_id
, &pool_name
);
283 derr
<< "error resolving local pool " << m_local_pool_id
284 << ": " << cpp_strerror(r
) << dendl
;
285 pool_name
= stringify(m_local_pool_id
);
288 m_name
= pool_name
+ "/" + m_global_image_id
;
289 register_admin_socket_hook();
292 template <typename I
>
293 ImageReplayer
<I
>::~ImageReplayer()
295 unregister_admin_socket_hook();
296 ceph_assert(m_event_preprocessor
== nullptr);
297 ceph_assert(m_replay_status_formatter
== nullptr);
298 ceph_assert(m_local_image_ctx
== nullptr);
299 ceph_assert(m_local_replay
== nullptr);
300 ceph_assert(m_remote_journaler
== nullptr);
301 ceph_assert(m_replay_handler
== nullptr);
302 ceph_assert(m_on_start_finish
== nullptr);
303 ceph_assert(m_on_stop_finish
== nullptr);
304 ceph_assert(m_bootstrap_request
== nullptr);
305 ceph_assert(m_in_flight_status_updates
== 0);
307 delete m_journal_listener
;
310 template <typename I
>
311 image_replayer::HealthState ImageReplayer
<I
>::get_health_state() const {
312 Mutex::Locker
locker(m_lock
);
314 if (!m_mirror_image_status_state
) {
315 return image_replayer::HEALTH_STATE_OK
;
316 } else if (*m_mirror_image_status_state
==
317 cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING
||
318 *m_mirror_image_status_state
==
319 cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN
) {
320 return image_replayer::HEALTH_STATE_WARNING
;
322 return image_replayer::HEALTH_STATE_ERROR
;
325 template <typename I
>
326 void ImageReplayer
<I
>::add_peer(const std::string
&peer_uuid
,
327 librados::IoCtx
&io_ctx
) {
328 Mutex::Locker
locker(m_lock
);
329 auto it
= m_peers
.find({peer_uuid
});
330 if (it
== m_peers
.end()) {
331 m_peers
.insert({peer_uuid
, io_ctx
});
335 template <typename I
>
336 void ImageReplayer
<I
>::set_state_description(int r
, const std::string
&desc
) {
337 dout(10) << r
<< " " << desc
<< dendl
;
339 Mutex::Locker
l(m_lock
);
344 template <typename I
>
345 void ImageReplayer
<I
>::start(Context
*on_finish
, bool manual
)
347 dout(10) << "on_finish=" << on_finish
<< dendl
;
351 Mutex::Locker
locker(m_lock
);
352 if (!is_stopped_()) {
353 derr
<< "already running" << dendl
;
355 } else if (m_manual_stop
&& !manual
) {
356 dout(5) << "stopped manually, ignoring start without manual flag"
360 m_state
= STATE_STARTING
;
362 m_state_desc
.clear();
363 m_manual_stop
= false;
364 m_delete_requested
= false;
366 if (on_finish
!= nullptr) {
367 ceph_assert(m_on_start_finish
== nullptr);
368 m_on_start_finish
= on_finish
;
370 ceph_assert(m_on_stop_finish
== nullptr);
376 on_finish
->complete(r
);
381 m_local_ioctx
.reset(new librados::IoCtx
{});
382 r
= m_local
->ioctx_create2(m_local_pool_id
, *m_local_ioctx
);
384 m_local_ioctx
.reset();
386 derr
<< "error opening ioctx for local pool " << m_local_pool_id
387 << ": " << cpp_strerror(r
) << dendl
;
388 on_start_fail(r
, "error opening local pool");
392 prepare_local_image();
395 template <typename I
>
396 void ImageReplayer
<I
>::prepare_local_image() {
399 m_local_image_id
= "";
400 Context
*ctx
= create_context_callback
<
401 ImageReplayer
, &ImageReplayer
<I
>::handle_prepare_local_image
>(this);
402 auto req
= PrepareLocalImageRequest
<I
>::create(
403 *m_local_ioctx
, m_global_image_id
, &m_local_image_id
, &m_local_image_name
,
404 &m_local_image_tag_owner
, m_threads
->work_queue
, ctx
);
408 template <typename I
>
409 void ImageReplayer
<I
>::handle_prepare_local_image(int r
) {
410 dout(10) << "r=" << r
<< dendl
;
413 dout(10) << "local image does not exist" << dendl
;
415 on_start_fail(r
, "error preparing local image for replay");
418 reregister_admin_socket_hook();
421 // local image doesn't exist or is non-primary
422 prepare_remote_image();
425 template <typename I
>
426 void ImageReplayer
<I
>::prepare_remote_image() {
428 if (m_peers
.empty()) {
429 // technically nothing to bootstrap, but it handles the status update
434 // TODO need to support multiple remote images
435 ceph_assert(!m_peers
.empty());
436 m_remote_image
= {*m_peers
.begin()};
438 auto cct
= static_cast<CephContext
*>(m_local
->cct());
439 journal::Settings journal_settings
;
440 journal_settings
.commit_interval
= cct
->_conf
.get_val
<double>(
441 "rbd_mirror_journal_commit_age");
442 journal_settings
.max_fetch_bytes
= cct
->_conf
.get_val
<Option::size_t>(
443 "rbd_mirror_journal_max_fetch_bytes");
445 Context
*ctx
= create_context_callback
<
446 ImageReplayer
, &ImageReplayer
<I
>::handle_prepare_remote_image
>(this);
447 auto req
= PrepareRemoteImageRequest
<I
>::create(
448 m_threads
, m_remote_image
.io_ctx
, m_global_image_id
, m_local_mirror_uuid
,
449 m_local_image_id
, journal_settings
, &m_remote_image
.mirror_uuid
,
450 &m_remote_image
.image_id
, &m_remote_journaler
, &m_client_state
,
451 &m_client_meta
, ctx
);
455 template <typename I
>
456 void ImageReplayer
<I
>::handle_prepare_remote_image(int r
) {
457 dout(10) << "r=" << r
<< dendl
;
459 ceph_assert(r
< 0 ? m_remote_journaler
== nullptr : m_remote_journaler
!= nullptr);
460 if (r
< 0 && !m_local_image_id
.empty() &&
461 m_local_image_tag_owner
== librbd::Journal
<>::LOCAL_MIRROR_UUID
) {
462 // local image is primary -- fall-through
463 } else if (r
== -ENOENT
) {
464 dout(10) << "remote image does not exist" << dendl
;
466 // TODO need to support multiple remote images
467 if (m_remote_image
.image_id
.empty() && !m_local_image_id
.empty() &&
468 m_local_image_tag_owner
== m_remote_image
.mirror_uuid
) {
469 // local image exists and is non-primary and linked to the missing
472 m_delete_requested
= true;
473 on_start_fail(0, "remote image no longer exists");
475 on_start_fail(-ENOENT
, "remote image does not exist");
479 on_start_fail(r
, "error retrieving remote image id");
486 template <typename I
>
487 void ImageReplayer
<I
>::bootstrap() {
490 if (!m_local_image_id
.empty() &&
491 m_local_image_tag_owner
== librbd::Journal
<>::LOCAL_MIRROR_UUID
) {
492 dout(5) << "local image is primary" << dendl
;
493 on_start_fail(0, "local image is primary");
495 } else if (m_peers
.empty()) {
496 dout(5) << "no peer clusters" << dendl
;
497 on_start_fail(-ENOENT
, "no peer clusters");
501 BootstrapRequest
<I
> *request
= nullptr;
503 Mutex::Locker
locker(m_lock
);
504 if (on_start_interrupted(m_lock
)) {
508 auto ctx
= create_context_callback
<
509 ImageReplayer
, &ImageReplayer
<I
>::handle_bootstrap
>(this);
510 request
= BootstrapRequest
<I
>::create(
511 m_threads
, *m_local_ioctx
, m_remote_image
.io_ctx
, m_instance_watcher
,
512 &m_local_image_ctx
, m_local_image_id
, m_remote_image
.image_id
,
513 m_global_image_id
, m_local_mirror_uuid
, m_remote_image
.mirror_uuid
,
514 m_remote_journaler
, &m_client_state
, &m_client_meta
, ctx
,
515 &m_resync_requested
, &m_progress_cxt
);
517 m_bootstrap_request
= request
;
520 update_mirror_image_status(false, boost::none
);
521 reschedule_update_status_task(10);
526 template <typename I
>
527 void ImageReplayer
<I
>::handle_bootstrap(int r
) {
528 dout(10) << "r=" << r
<< dendl
;
530 Mutex::Locker
locker(m_lock
);
531 m_bootstrap_request
->put();
532 m_bootstrap_request
= nullptr;
533 if (m_local_image_ctx
) {
534 m_local_image_id
= m_local_image_ctx
->id
;
538 if (on_start_interrupted()) {
540 } else if (r
== -EREMOTEIO
) {
541 m_local_image_tag_owner
= "";
542 dout(5) << "remote image is non-primary" << dendl
;
543 on_start_fail(-EREMOTEIO
, "remote image is non-primary");
545 } else if (r
== -EEXIST
) {
546 m_local_image_tag_owner
= "";
547 on_start_fail(r
, "split-brain detected");
550 on_start_fail(r
, "error bootstrapping replay");
552 } else if (m_resync_requested
) {
553 on_start_fail(0, "resync requested");
557 ceph_assert(m_local_journal
== nullptr);
559 RWLock::RLocker
snap_locker(m_local_image_ctx
->snap_lock
);
560 if (m_local_image_ctx
->journal
!= nullptr) {
561 m_local_journal
= m_local_image_ctx
->journal
;
562 m_local_journal
->add_listener(m_journal_listener
);
566 if (m_local_journal
== nullptr) {
567 on_start_fail(-EINVAL
, "error accessing local journal");
571 update_mirror_image_status(false, boost::none
);
572 init_remote_journaler();
575 template <typename I
>
576 void ImageReplayer
<I
>::init_remote_journaler() {
579 Context
*ctx
= create_context_callback
<
580 ImageReplayer
, &ImageReplayer
<I
>::handle_init_remote_journaler
>(this);
581 m_remote_journaler
->init(ctx
);
584 template <typename I
>
585 void ImageReplayer
<I
>::handle_init_remote_journaler(int r
) {
586 dout(10) << "r=" << r
<< dendl
;
588 if (on_start_interrupted()) {
591 derr
<< "failed to initialize remote journal: " << cpp_strerror(r
) << dendl
;
592 on_start_fail(r
, "error initializing remote journal");
596 m_remote_journaler
->add_listener(&m_remote_listener
);
598 cls::journal::Client client
;
599 r
= m_remote_journaler
->get_cached_client(m_local_mirror_uuid
, &client
);
601 derr
<< "error retrieving remote journal client: " << cpp_strerror(r
)
603 on_start_fail(r
, "error retrieving remote journal client");
607 dout(5) << "image_id=" << m_local_image_id
<< ", "
608 << "client_meta.image_id=" << m_client_meta
.image_id
<< ", "
609 << "client.state=" << client
.state
<< dendl
;
610 if (m_client_meta
.image_id
== m_local_image_id
&&
611 client
.state
!= cls::journal::CLIENT_STATE_CONNECTED
) {
612 dout(5) << "client flagged disconnected, stopping image replay" << dendl
;
613 if (m_local_image_ctx
->config
.template get_val
<bool>("rbd_mirroring_resync_after_disconnect")) {
614 m_resync_requested
= true;
615 on_start_fail(-ENOTCONN
, "disconnected: automatic resync");
617 on_start_fail(-ENOTCONN
, "disconnected");
625 template <typename I
>
626 void ImageReplayer
<I
>::start_replay() {
629 Context
*start_ctx
= create_context_callback
<
630 ImageReplayer
, &ImageReplayer
<I
>::handle_start_replay
>(this);
631 m_local_journal
->start_external_replay(&m_local_replay
, start_ctx
);
634 template <typename I
>
635 void ImageReplayer
<I
>::handle_start_replay(int r
) {
636 dout(10) << "r=" << r
<< dendl
;
639 ceph_assert(m_local_replay
== nullptr);
640 derr
<< "error starting external replay on local image "
641 << m_local_image_id
<< ": " << cpp_strerror(r
) << dendl
;
642 on_start_fail(r
, "error starting replay on local image");
646 m_replay_status_formatter
=
647 ReplayStatusFormatter
<I
>::create(m_remote_journaler
, m_local_mirror_uuid
);
649 Context
*on_finish(nullptr);
651 Mutex::Locker
locker(m_lock
);
652 ceph_assert(m_state
== STATE_STARTING
);
653 m_state
= STATE_REPLAYING
;
654 std::swap(m_on_start_finish
, on_finish
);
657 m_event_preprocessor
= EventPreprocessor
<I
>::create(
658 *m_local_image_ctx
, *m_remote_journaler
, m_local_mirror_uuid
,
659 &m_client_meta
, m_threads
->work_queue
);
661 update_mirror_image_status(true, boost::none
);
662 reschedule_update_status_task(30);
664 if (on_replay_interrupted()) {
669 CephContext
*cct
= static_cast<CephContext
*>(m_local
->cct());
670 double poll_seconds
= cct
->_conf
.get_val
<double>(
671 "rbd_mirror_journal_poll_age");
673 Mutex::Locker
locker(m_lock
);
674 m_replay_handler
= new ReplayHandler
<I
>(this);
675 m_remote_journaler
->start_live_replay(m_replay_handler
, poll_seconds
);
677 dout(10) << "m_remote_journaler=" << *m_remote_journaler
<< dendl
;
680 dout(10) << "start succeeded" << dendl
;
681 if (on_finish
!= nullptr) {
682 dout(10) << "on finish complete, r=" << r
<< dendl
;
683 on_finish
->complete(r
);
687 template <typename I
>
688 void ImageReplayer
<I
>::on_start_fail(int r
, const std::string
&desc
)
690 dout(10) << "r=" << r
<< dendl
;
691 Context
*ctx
= new FunctionContext([this, r
, desc
](int _r
) {
693 Mutex::Locker
locker(m_lock
);
694 ceph_assert(m_state
== STATE_STARTING
);
695 m_state
= STATE_STOPPING
;
696 if (r
< 0 && r
!= -ECANCELED
&& r
!= -EREMOTEIO
&& r
!= -ENOENT
) {
697 derr
<< "start failed: " << cpp_strerror(r
) << dendl
;
699 dout(10) << "start canceled" << dendl
;
703 set_state_description(r
, desc
);
705 update_mirror_image_status(false, boost::none
);
707 reschedule_update_status_task(-1);
710 m_threads
->work_queue
->queue(ctx
, 0);
713 template <typename I
>
714 bool ImageReplayer
<I
>::on_start_interrupted() {
715 Mutex::Locker
locker(m_lock
);
716 return on_start_interrupted(m_lock
);
719 template <typename I
>
720 bool ImageReplayer
<I
>::on_start_interrupted(Mutex
& lock
) {
721 ceph_assert(m_lock
.is_locked());
722 ceph_assert(m_state
== STATE_STARTING
);
723 if (!m_stop_requested
) {
727 on_start_fail(-ECANCELED
, "");
731 template <typename I
>
732 void ImageReplayer
<I
>::stop(Context
*on_finish
, bool manual
, int r
,
733 const std::string
& desc
)
735 dout(10) << "on_finish=" << on_finish
<< ", manual=" << manual
736 << ", desc=" << desc
<< dendl
;
738 image_replayer::BootstrapRequest
<I
> *bootstrap_request
= nullptr;
739 bool shut_down_replay
= false;
742 Mutex::Locker
locker(m_lock
);
744 if (!is_running_()) {
747 if (!is_stopped_()) {
748 if (m_state
== STATE_STARTING
) {
749 dout(10) << "canceling start" << dendl
;
750 if (m_bootstrap_request
!= nullptr) {
751 bootstrap_request
= m_bootstrap_request
;
752 bootstrap_request
->get();
755 dout(10) << "interrupting replay" << dendl
;
756 shut_down_replay
= true;
759 ceph_assert(m_on_stop_finish
== nullptr);
760 std::swap(m_on_stop_finish
, on_finish
);
761 m_stop_requested
= true;
762 m_manual_stop
= manual
;
767 // avoid holding lock since bootstrap request will update status
768 if (bootstrap_request
!= nullptr) {
769 dout(10) << "canceling bootstrap" << dendl
;
770 bootstrap_request
->cancel();
771 bootstrap_request
->put();
775 dout(20) << "not running" << dendl
;
777 on_finish
->complete(-EINVAL
);
782 if (shut_down_replay
) {
783 on_stop_journal_replay(r
, desc
);
784 } else if (on_finish
!= nullptr) {
785 on_finish
->complete(0);
789 template <typename I
>
790 void ImageReplayer
<I
>::on_stop_journal_replay(int r
, const std::string
&desc
)
795 Mutex::Locker
locker(m_lock
);
796 if (m_state
!= STATE_REPLAYING
) {
797 // might be invoked multiple times while stopping
800 m_stop_requested
= true;
801 m_state
= STATE_STOPPING
;
804 set_state_description(r
, desc
);
805 update_mirror_image_status(true, boost::none
);
806 reschedule_update_status_task(-1);
810 template <typename I
>
811 void ImageReplayer
<I
>::handle_replay_ready()
814 if (on_replay_interrupted()) {
818 if (!m_remote_journaler
->try_pop_front(&m_replay_entry
, &m_replay_tag_tid
)) {
822 m_event_replay_tracker
.start_op();
825 bool stopping
= (m_state
== STATE_STOPPING
);
829 dout(10) << "stopping event replay" << dendl
;
830 m_event_replay_tracker
.finish_op();
834 if (m_replay_tag_valid
&& m_replay_tag
.tid
== m_replay_tag_tid
) {
842 template <typename I
>
843 void ImageReplayer
<I
>::restart(Context
*on_finish
)
845 FunctionContext
*ctx
= new FunctionContext(
846 [this, on_finish
](int r
) {
850 start(on_finish
, true);
855 template <typename I
>
856 void ImageReplayer
<I
>::flush(Context
*on_finish
)
861 Mutex::Locker
locker(m_lock
);
862 if (m_state
== STATE_REPLAYING
) {
863 Context
*ctx
= new FunctionContext(
865 if (on_finish
!= nullptr) {
866 on_finish
->complete(r
);
869 on_flush_local_replay_flush_start(ctx
);
875 on_finish
->complete(0);
879 template <typename I
>
880 void ImageReplayer
<I
>::on_flush_local_replay_flush_start(Context
*on_flush
)
883 FunctionContext
*ctx
= new FunctionContext(
884 [this, on_flush
](int r
) {
885 on_flush_local_replay_flush_finish(on_flush
, r
);
888 ceph_assert(m_lock
.is_locked());
889 ceph_assert(m_state
== STATE_REPLAYING
);
890 m_local_replay
->flush(ctx
);
893 template <typename I
>
894 void ImageReplayer
<I
>::on_flush_local_replay_flush_finish(Context
*on_flush
,
897 dout(10) << "r=" << r
<< dendl
;
899 derr
<< "error flushing local replay: " << cpp_strerror(r
) << dendl
;
900 on_flush
->complete(r
);
904 on_flush_flush_commit_position_start(on_flush
);
907 template <typename I
>
908 void ImageReplayer
<I
>::on_flush_flush_commit_position_start(Context
*on_flush
)
910 FunctionContext
*ctx
= new FunctionContext(
911 [this, on_flush
](int r
) {
912 on_flush_flush_commit_position_finish(on_flush
, r
);
915 m_remote_journaler
->flush_commit_position(ctx
);
918 template <typename I
>
919 void ImageReplayer
<I
>::on_flush_flush_commit_position_finish(Context
*on_flush
,
923 derr
<< "error flushing remote journal commit position: "
924 << cpp_strerror(r
) << dendl
;
927 update_mirror_image_status(false, boost::none
);
929 dout(20) << "flush complete, r=" << r
<< dendl
;
930 on_flush
->complete(r
);
933 template <typename I
>
934 bool ImageReplayer
<I
>::on_replay_interrupted()
938 Mutex::Locker
locker(m_lock
);
939 shut_down
= m_stop_requested
;
943 on_stop_journal_replay();
948 template <typename I
>
949 void ImageReplayer
<I
>::print_status(Formatter
*f
, stringstream
*ss
)
953 Mutex::Locker
l(m_lock
);
956 f
->open_object_section("image_replayer");
957 f
->dump_string("name", m_name
);
958 f
->dump_string("state", to_string(m_state
));
962 *ss
<< m_name
<< ": state: " << to_string(m_state
);
966 template <typename I
>
967 void ImageReplayer
<I
>::handle_replay_complete(int r
, const std::string
&error_desc
)
969 dout(10) << "r=" << r
<< dendl
;
971 derr
<< "replay encountered an error: " << cpp_strerror(r
) << dendl
;
972 set_state_description(r
, error_desc
);
976 Mutex::Locker
locker(m_lock
);
977 m_stop_requested
= true;
979 on_replay_interrupted();
982 template <typename I
>
983 void ImageReplayer
<I
>::replay_flush() {
986 bool interrupted
= false;
988 Mutex::Locker
locker(m_lock
);
989 if (m_state
!= STATE_REPLAYING
) {
990 dout(10) << "replay interrupted" << dendl
;
993 m_state
= STATE_REPLAY_FLUSHING
;
998 m_event_replay_tracker
.finish_op();
1002 // shut down the replay to flush all IO and ops and create a new
1003 // replayer to handle the new tag epoch
1004 Context
*ctx
= create_context_callback
<
1005 ImageReplayer
<I
>, &ImageReplayer
<I
>::handle_replay_flush
>(this);
1006 ctx
= new FunctionContext([this, ctx
](int r
) {
1007 m_local_image_ctx
->journal
->stop_external_replay();
1008 m_local_replay
= nullptr;
1015 m_local_journal
->start_external_replay(&m_local_replay
, ctx
);
1017 m_local_replay
->shut_down(false, ctx
);
1020 template <typename I
>
1021 void ImageReplayer
<I
>::handle_replay_flush(int r
) {
1022 dout(10) << "r=" << r
<< dendl
;
1025 Mutex::Locker
locker(m_lock
);
1026 ceph_assert(m_state
== STATE_REPLAY_FLUSHING
);
1027 m_state
= STATE_REPLAYING
;
1031 derr
<< "replay flush encountered an error: " << cpp_strerror(r
) << dendl
;
1032 m_event_replay_tracker
.finish_op();
1033 handle_replay_complete(r
, "replay flush encountered an error");
1035 } else if (on_replay_interrupted()) {
1036 m_event_replay_tracker
.finish_op();
1043 template <typename I
>
1044 void ImageReplayer
<I
>::get_remote_tag() {
1045 dout(15) << "tag_tid: " << m_replay_tag_tid
<< dendl
;
1047 Context
*ctx
= create_context_callback
<
1048 ImageReplayer
, &ImageReplayer
<I
>::handle_get_remote_tag
>(this);
1049 m_remote_journaler
->get_tag(m_replay_tag_tid
, &m_replay_tag
, ctx
);
1052 template <typename I
>
1053 void ImageReplayer
<I
>::handle_get_remote_tag(int r
) {
1054 dout(15) << "r=" << r
<< dendl
;
1058 auto it
= m_replay_tag
.data
.cbegin();
1059 decode(m_replay_tag_data
, it
);
1060 } catch (const buffer::error
&err
) {
1066 derr
<< "failed to retrieve remote tag " << m_replay_tag_tid
<< ": "
1067 << cpp_strerror(r
) << dendl
;
1068 m_event_replay_tracker
.finish_op();
1069 handle_replay_complete(r
, "failed to retrieve remote tag");
1073 m_replay_tag_valid
= true;
1074 dout(15) << "decoded remote tag " << m_replay_tag_tid
<< ": "
1075 << m_replay_tag_data
<< dendl
;
1077 allocate_local_tag();
1080 template <typename I
>
1081 void ImageReplayer
<I
>::allocate_local_tag() {
1084 std::string mirror_uuid
= m_replay_tag_data
.mirror_uuid
;
1085 if (mirror_uuid
== librbd::Journal
<>::LOCAL_MIRROR_UUID
) {
1086 mirror_uuid
= m_remote_image
.mirror_uuid
;
1087 } else if (mirror_uuid
== m_local_mirror_uuid
) {
1088 mirror_uuid
= librbd::Journal
<>::LOCAL_MIRROR_UUID
;
1089 } else if (mirror_uuid
== librbd::Journal
<>::ORPHAN_MIRROR_UUID
) {
1090 // handle possible edge condition where daemon can failover and
1091 // the local image has already been promoted/demoted
1092 auto local_tag_data
= m_local_journal
->get_tag_data();
1093 if (local_tag_data
.mirror_uuid
== librbd::Journal
<>::ORPHAN_MIRROR_UUID
&&
1094 (local_tag_data
.predecessor
.commit_valid
&&
1095 local_tag_data
.predecessor
.mirror_uuid
==
1096 librbd::Journal
<>::LOCAL_MIRROR_UUID
)) {
1097 dout(15) << "skipping stale demotion event" << dendl
;
1098 handle_process_entry_safe(m_replay_entry
, m_replay_start_time
, 0);
1099 handle_replay_ready();
1102 dout(5) << "encountered image demotion: stopping" << dendl
;
1103 Mutex::Locker
locker(m_lock
);
1104 m_stop_requested
= true;
1108 librbd::journal::TagPredecessor
predecessor(m_replay_tag_data
.predecessor
);
1109 if (predecessor
.mirror_uuid
== librbd::Journal
<>::LOCAL_MIRROR_UUID
) {
1110 predecessor
.mirror_uuid
= m_remote_image
.mirror_uuid
;
1111 } else if (predecessor
.mirror_uuid
== m_local_mirror_uuid
) {
1112 predecessor
.mirror_uuid
= librbd::Journal
<>::LOCAL_MIRROR_UUID
;
1115 dout(15) << "mirror_uuid=" << mirror_uuid
<< ", "
1116 << "predecessor=" << predecessor
<< ", "
1117 << "replay_tag_tid=" << m_replay_tag_tid
<< dendl
;
1118 Context
*ctx
= create_context_callback
<
1119 ImageReplayer
, &ImageReplayer
<I
>::handle_allocate_local_tag
>(this);
1120 m_local_journal
->allocate_tag(mirror_uuid
, predecessor
, ctx
);
1123 template <typename I
>
1124 void ImageReplayer
<I
>::handle_allocate_local_tag(int r
) {
1125 dout(15) << "r=" << r
<< ", "
1126 << "tag_tid=" << m_local_journal
->get_tag_tid() << dendl
;
1129 derr
<< "failed to allocate journal tag: " << cpp_strerror(r
) << dendl
;
1130 m_event_replay_tracker
.finish_op();
1131 handle_replay_complete(r
, "failed to allocate journal tag");
1138 template <typename I
>
1139 void ImageReplayer
<I
>::preprocess_entry() {
1140 dout(20) << "preprocessing entry tid=" << m_replay_entry
.get_commit_tid()
1143 bufferlist data
= m_replay_entry
.get_data();
1144 auto it
= data
.cbegin();
1145 int r
= m_local_replay
->decode(&it
, &m_event_entry
);
1147 derr
<< "failed to decode journal event" << dendl
;
1148 m_event_replay_tracker
.finish_op();
1149 handle_replay_complete(r
, "failed to decode journal event");
1153 uint32_t delay
= calculate_replay_delay(
1154 m_event_entry
.timestamp
, m_local_image_ctx
->mirroring_replay_delay
);
1156 handle_preprocess_entry_ready(0);
1160 dout(20) << "delaying replay by " << delay
<< " sec" << dendl
;
1162 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
1163 ceph_assert(m_delayed_preprocess_task
== nullptr);
1164 m_delayed_preprocess_task
= new FunctionContext(
1166 ceph_assert(m_threads
->timer_lock
.is_locked());
1167 m_delayed_preprocess_task
= nullptr;
1168 m_threads
->work_queue
->queue(
1169 create_context_callback
<ImageReplayer
,
1170 &ImageReplayer
<I
>::handle_preprocess_entry_ready
>(this), 0);
1172 m_threads
->timer
->add_event_after(delay
, m_delayed_preprocess_task
);
1175 template <typename I
>
1176 void ImageReplayer
<I
>::handle_preprocess_entry_ready(int r
) {
1177 dout(20) << "r=" << r
<< dendl
;
1178 ceph_assert(r
== 0);
1180 m_replay_start_time
= ceph_clock_now();
1181 if (!m_event_preprocessor
->is_required(m_event_entry
)) {
1186 Context
*ctx
= create_context_callback
<
1187 ImageReplayer
, &ImageReplayer
<I
>::handle_preprocess_entry_safe
>(this);
1188 m_event_preprocessor
->preprocess(&m_event_entry
, ctx
);
1191 template <typename I
>
1192 void ImageReplayer
<I
>::handle_preprocess_entry_safe(int r
) {
1193 dout(20) << "r=" << r
<< dendl
;
1196 m_event_replay_tracker
.finish_op();
1198 if (r
== -ECANCELED
) {
1199 handle_replay_complete(0, "lost exclusive lock");
1201 derr
<< "failed to preprocess journal event" << dendl
;
1202 handle_replay_complete(r
, "failed to preprocess journal event");
// Hands the decoded event (m_event_entry) to the local replay.
// If on_replay_interrupted() reports an interruption the tracked op is
// finished (presumably followed by an early return -- not visible in this
// extract). Otherwise two callbacks are created:
//  - on_ready  -> handle_process_entry_ready()
//  - on_commit -> C_ReplayCommitted, which receives m_replay_entry (moved
//    out of the member) and the recorded m_replay_start_time.
1210 template <typename I
>
1211 void ImageReplayer
<I
>::process_entry() {
1212 dout(20) << "processing entry tid=" << m_replay_entry
.get_commit_tid()
1215 // stop replaying events if stop has been requested
1216 if (on_replay_interrupted()) {
1217 m_event_replay_tracker
.finish_op();
1221 Context
*on_ready
= create_context_callback
<
1222 ImageReplayer
, &ImageReplayer
<I
>::handle_process_entry_ready
>(this);
// m_replay_entry is moved from here; it must not be read afterwards
1223 Context
*on_commit
= new C_ReplayCommitted(this, std::move(m_replay_entry
),
1224 m_replay_start_time
);
1226 m_local_replay
->process(m_event_entry
, on_ready
, on_commit
);
// on_ready callback registered by process_entry(). Asserts r == 0, picks up
// a changed local image name while holding the image's snap_lock for read,
// reschedules the status update task if the name changed, and then calls
// handle_replay_ready() to continue with the next event.
1229 template <typename I
>
1230 void ImageReplayer
<I
>::handle_process_entry_ready(int r
) {
1232 ceph_assert(r
== 0);
1234 bool update_status
= false;
// m_local_image_ctx->name is read under snap_lock
1236 RWLock::RLocker
snap_locker(m_local_image_ctx
->snap_lock
);
1237 if (m_local_image_name
!= m_local_image_ctx
->name
) {
1238 m_local_image_name
= m_local_image_ctx
->name
;
1239 update_status
= true;
// 0 keeps the current update interval (only values > 0 replace it)
1243 if (update_status
) {
1244 reschedule_update_status_task(0);
1247 // attempt to process the next event
1248 handle_replay_ready();
// Records the outcome of committing a replayed journal entry.
// Visible behaviour: a failure is logged and passed to
// handle_replay_complete(); the other visible path acknowledges the entry to
// the remote journaler via committed(). Afterwards the replay count, byte
// count and latency are added to the global perf counters (if set) and, from
// a work-queue context that takes m_lock, to the per-image perf counters (if
// set); that context also finishes the tracked replay op.
// NOTE(review): the trailing parameter declaration (the code uses `r`) and
// the if/else lines around the error handling are not present in this
// extract -- confirm against the full file.
1251 template <typename I
>
1252 void ImageReplayer
<I
>::handle_process_entry_safe(const ReplayEntry
&replay_entry
,
1253 const utime_t
&replay_start_time
,
1255 dout(20) << "commit_tid=" << replay_entry
.get_commit_tid() << ", r=" << r
1259 derr
<< "failed to commit journal event: " << cpp_strerror(r
) << dendl
;
1260 handle_replay_complete(r
, "failed to commit journal event");
1262 ceph_assert(m_remote_journaler
!= nullptr);
1263 m_remote_journaler
->committed(replay_entry
);
// values are computed here and captured by value by the lambda below
1266 auto bytes
= replay_entry
.get_data().length();
1267 auto latency
= ceph_clock_now() - replay_start_time
;
1269 if (g_perf_counters
) {
1270 g_perf_counters
->inc(l_rbd_mirror_replay
);
1271 g_perf_counters
->inc(l_rbd_mirror_replay_bytes
, bytes
);
1272 g_perf_counters
->tinc(l_rbd_mirror_replay_latency
, latency
);
// per-image counters are touched under m_lock from the work queue
1275 auto ctx
= new FunctionContext(
1276 [this, bytes
, latency
](int r
) {
1277 Mutex::Locker
locker(m_lock
);
1278 if (m_perf_counters
) {
1279 m_perf_counters
->inc(l_rbd_mirror_replay
);
1280 m_perf_counters
->inc(l_rbd_mirror_replay_bytes
, bytes
);
1281 m_perf_counters
->tinc(l_rbd_mirror_replay_latency
, latency
);
1283 m_event_replay_tracker
.finish_op();
1285 m_threads
->work_queue
->queue(ctx
, 0);
// Requests a mirror image status update.
// Takes m_lock, asks start_mirror_image_status_update(force, false) whether
// an update may start, and queues the update with the supplied optional
// state.
// NOTE(review): the return statements (bool result) are not present in this
// extract; presumably false when the update could not be started.
1289 template <typename I
>
1290 bool ImageReplayer
<I
>::update_mirror_image_status(bool force
,
1291 const OptionalState
&state
) {
1293 Mutex::Locker
locker(m_lock
);
1294 if (!start_mirror_image_status_update(force
, false)) {
1299 queue_mirror_image_status_update(state
);
// Decides whether a new status update may start and, if so, accounts for it
// by incrementing m_in_flight_status_updates. The caller must hold m_lock
// (asserted).
// When not forced and the replayer is not stopped:
//  - if it is not running, the update is ignored (shut down in progress);
//  - if more updates are in flight than allowed (1 when `restarting`, else
//    0), m_update_status_requested is set so the update is deferred.
// NOTE(review): the second parameter declaration (the code uses
// `restarting`) and the return statements are not present in this extract.
1303 template <typename I
>
1304 bool ImageReplayer
<I
>::start_mirror_image_status_update(bool force
,
1306 ceph_assert(m_lock
.is_locked());
1308 if (!force
&& !is_stopped_()) {
1309 if (!is_running_()) {
1310 dout(15) << "shut down in-progress: ignoring update" << dendl
;
1312 } else if (m_in_flight_status_updates
> (restarting
? 1 : 0)) {
1313 dout(15) << "already sending update" << dendl
;
1314 m_update_status_requested
= true;
1319 ++m_in_flight_status_updates
;
1320 dout(15) << "in-flight updates=" << m_in_flight_status_updates
<< dendl
;
// Marks one status update as no longer in flight.
// Re-registers the admin socket hook first, then (under m_lock) decrements
// m_in_flight_status_updates. If other updates remain in flight this is only
// logged (presumably followed by an early return -- not visible in this
// extract). Otherwise the pending m_on_update_status_finish context is taken
// (the member is left nullptr by the swap) and completed with 0 if set.
1324 template <typename I
>
1325 void ImageReplayer
<I
>::finish_mirror_image_status_update() {
1326 reregister_admin_socket_hook();
1328 Context
*on_finish
= nullptr;
1330 Mutex::Locker
locker(m_lock
);
1331 ceph_assert(m_in_flight_status_updates
> 0);
1332 if (--m_in_flight_status_updates
> 0) {
1333 dout(15) << "waiting on " << m_in_flight_status_updates
<< " in-flight "
1334 << "updates" << dendl
;
// take ownership of the waiter; member becomes nullptr
1338 std::swap(on_finish
, m_on_update_status_finish
);
1342 if (on_finish
!= nullptr) {
1343 on_finish
->complete(0);
// Queues a context on the work queue that calls
// send_mirror_status_update(state). `state` is captured by value, so the
// caller's argument does not need to outlive this call.
1347 template <typename I
>
1348 void ImageReplayer
<I
>::queue_mirror_image_status_update(const OptionalState
&state
) {
1350 FunctionContext
*ctx
= new FunctionContext(
1351 [this, state
](int r
) {
1352 send_mirror_status_update(state
);
1354 m_threads
->work_queue
->queue(ctx
, 0);
// Builds a cls::rbd::MirrorImageStatus for the current (or overriding)
// replayer state and writes it to the RBD_MIRRORING object asynchronously;
// the AIO completion is routed to handle_mirror_status_update().
//
// Visible steps:
//  1. snapshot member state (description, cached status state, whether a
//     local image is open, bootstrap request) with m_lock taken;
//  2. query the bootstrap request for is_syncing() using a temporary ref;
//  3. map the replayer state to a status state / description;
//  4. store the resulting cached status state back under m_lock;
//  5. keep reporting ERROR if the cached status state is ERROR;
//  6. issue the mirror_image_status_set write.
// NOTE(review): several lines are not present in this extract (declarations
// of the local state / last_r / desc, the switch header, break statements,
// some branch conditions, default label) -- confirm against the full file.
1357 template <typename I
>
1358 void ImageReplayer
<I
>::send_mirror_status_update(const OptionalState
&opt_state
) {
1360 std::string state_desc
;
1362 bool stopping_replay
;
// make_optional(false, ...) yields an unset optional of the status type
1364 OptionalMirrorImageStatusState mirror_image_status_state
=
1365 boost::make_optional(false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN
);
1366 image_replayer::BootstrapRequest
<I
>* bootstrap_request
= nullptr;
1368 Mutex::Locker
locker(m_lock
);
1370 state_desc
= m_state_desc
;
1371 mirror_image_status_state
= m_mirror_image_status_state
;
// a still-open local image means replay is being stopped
1373 stopping_replay
= (m_local_image_ctx
!= nullptr);
// hold a reference so the request can be queried below
1375 if (m_bootstrap_request
!= nullptr) {
1376 bootstrap_request
= m_bootstrap_request
;
1377 bootstrap_request
->get();
1381 bool syncing
= false;
1382 if (bootstrap_request
!= nullptr) {
1383 syncing
= bootstrap_request
->is_syncing();
// drop the temporary reference taken above
1384 bootstrap_request
->put();
1385 bootstrap_request
= nullptr;
1392 cls::rbd::MirrorImageStatus status
;
1395 case STATE_STARTING
:
1397 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING
;
1398 status
.description
= state_desc
.empty() ? "syncing" : state_desc
;
1399 mirror_image_status_state
= status
.state
;
1401 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY
;
1402 status
.description
= "starting replay";
1405 case STATE_REPLAYING
:
1406 case STATE_REPLAY_FLUSHING
:
1407 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING
;
// callback used when the replay status description is not yet available
1409 Context
*on_req_finish
= new FunctionContext(
1411 dout(15) << "replay status ready: r=" << r
<< dendl
;
1413 send_mirror_status_update(boost::none
);
1414 } else if (r
== -EAGAIN
) {
1415 // decrement in-flight status update counter
1416 handle_mirror_status_update(r
);
1421 ceph_assert(m_replay_status_formatter
!= nullptr);
// a false result means the description will be delivered asynchronously
1422 if (!m_replay_status_formatter
->get_or_send_update(&desc
,
1424 dout(15) << "waiting for replay status" << dendl
;
1427 status
.description
= "replaying, " + desc
;
// reset the cached status state to "unset" while replaying
1428 mirror_image_status_state
= boost::make_optional(
1429 false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN
);
1432 case STATE_STOPPING
:
1433 if (stopping_replay
) {
1434 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY
;
1435 status
.description
= state_desc
.empty() ? "stopping replay" : state_desc
;
// -EREMOTEIO is reported as UNKNOWN, other negative results as ERROR
1440 if (last_r
== -EREMOTEIO
) {
1441 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN
;
1442 status
.description
= state_desc
;
1443 mirror_image_status_state
= status
.state
;
1444 } else if (last_r
< 0) {
1445 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR
;
1446 status
.description
= state_desc
;
1447 mirror_image_status_state
= status
.state
;
1449 status
.state
= cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED
;
1450 status
.description
= state_desc
.empty() ? "stopped" : state_desc
;
1451 mirror_image_status_state
= boost::none
;
1455 ceph_assert(!"invalid state");
// publish the cached status state for subsequent updates
1459 Mutex::Locker
locker(m_lock
);
1460 m_mirror_image_status_state
= mirror_image_status_state
;
1463 // prevent the status from ping-ponging when failed replays are restarted
1464 if (mirror_image_status_state
&&
1465 *mirror_image_status_state
== cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR
) {
1466 status
.state
= *mirror_image_status_state
;
1469 dout(15) << "status=" << status
<< dendl
;
1470 librados::ObjectWriteOperation op
;
1471 librbd::cls_client::mirror_image_status_set(&op
, m_global_image_id
, status
);
1473 ceph_assert(m_local_ioctx
);
1474 librados::AioCompletion
*aio_comp
= create_rados_callback
<
1475 ImageReplayer
<I
>, &ImageReplayer
<I
>::handle_mirror_status_update
>(this);
1476 int r
= m_local_ioctx
->aio_operate(RBD_MIRRORING
, aio_comp
, &op
);
1477 ceph_assert(r
== 0);
// release the local handle on the completion after submitting the op
1478 aio_comp
->release();
// Completion handler for a mirror status update (also invoked directly with
// -EAGAIN from send_mirror_status_update's replay-status callback).
// Under m_lock it consumes m_update_status_requested (reset to false by the
// swap) and, if the replayer is running and an update was deferred, tries to
// start it as a restart (start_mirror_image_status_update(false, true)).
// Depending on the outcome, a new update is queued or the periodic task is
// rescheduled; finally this update is marked as finished.
// NOTE(review): the condition guarding the queue_... call (presumably
// `started`) is not present in this extract.
1481 template <typename I
>
1482 void ImageReplayer
<I
>::handle_mirror_status_update(int r
) {
1483 dout(15) << "r=" << r
<< dendl
;
1485 bool running
= false;
1486 bool started
= false;
1488 Mutex::Locker
locker(m_lock
);
// read-and-clear the deferred update request
1489 bool update_status_requested
= false;
1490 std::swap(update_status_requested
, m_update_status_requested
);
1492 running
= is_running_();
1493 if (running
&& update_status_requested
) {
1494 started
= start_mirror_image_status_update(false, true);
1498 // if a deferred update is available, send it -- otherwise reschedule
1501 queue_mirror_image_status_update(boost::none
);
1502 } else if (running
) {
1503 reschedule_update_status_task(0);
1506 // mark committed status update as no longer in-flight
1507 finish_mirror_image_status_update();
// Cancels any pending periodic status update task and optionally schedules
// a new one.
//   new_interval > 0  : replaces m_update_status_interval, then schedules
//   new_interval == 0 : schedules using the existing interval
//   new_interval < 0  : only cancels (no new task is scheduled)
// A new task is only scheduled while the replayer is running and
// start_mirror_image_status_update(true, false) succeeds; when the timer
// fires, the task clears m_update_status_task and queues a status update.
// Takes m_lock and then the timer lock. If an existing task was canceled,
// its in-flight accounting is released via
// finish_mirror_image_status_update().
// NOTE(review): the lambda header of the task is not present in this
// extract.
1510 template <typename I
>
1511 void ImageReplayer
<I
>::reschedule_update_status_task(int new_interval
) {
1512 bool canceled_task
= false;
1514 Mutex::Locker
locker(m_lock
);
1515 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
1517 if (m_update_status_task
) {
1518 dout(15) << "canceling existing status update task" << dendl
;
1520 canceled_task
= m_threads
->timer
->cancel_event(m_update_status_task
);
1521 m_update_status_task
= nullptr;
1524 if (new_interval
> 0) {
1525 m_update_status_interval
= new_interval
;
1528 if (new_interval
>= 0 && is_running_() &&
1529 start_mirror_image_status_update(true, false)) {
1530 m_update_status_task
= new FunctionContext(
// timer callbacks are expected to run with the timer lock held
1532 ceph_assert(m_threads
->timer_lock
.is_locked());
1533 m_update_status_task
= nullptr;
1535 queue_mirror_image_status_update(boost::none
);
1537 dout(15) << "scheduling status update task after "
1538 << m_update_status_interval
<< " seconds" << dendl
;
1539 m_threads
->timer
->add_event_after(m_update_status_interval
,
1540 m_update_status_task
);
1544 if (canceled_task
) {
1545 // decrement in-flight status update counter for canceled task
1546 finish_mirror_image_status_update();
// Starts the asynchronous shut down of the replayer.
//
// Visible steps:
//  1. cancel a pending delayed preprocess task (under the timer lock) and,
//     if one was canceled, finish its tracked replay op;
//  2. cancel the periodic status update task
//     (reschedule_update_status_task(-1));
//  3. assert the state is STATE_STOPPING and, if status updates are still
//     in flight, register m_on_update_status_finish to wait for them;
//  4. build a chain of FunctionContexts in reverse execution order -- each
//     new context captures the previously built `ctx` as its continuation --
//     and queue the head of the chain on the work queue.
// Because the chain is built in reverse, the steps that appear LAST in the
// code run FIRST.
// NOTE(review): lambda bodies are only partially present in this extract
// (continuation calls, closing braces and some lambda headers are missing);
// confirm details against the full file.
1550 template <typename I
>
1551 void ImageReplayer
<I
>::shut_down(int r
) {
1552 dout(10) << "r=" << r
<< dendl
;
1554 bool canceled_delayed_preprocess_task
= false;
1556 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
1557 if (m_delayed_preprocess_task
!= nullptr) {
1558 canceled_delayed_preprocess_task
= m_threads
->timer
->cancel_event(
1559 m_delayed_preprocess_task
);
1560 ceph_assert(canceled_delayed_preprocess_task
);
1561 m_delayed_preprocess_task
= nullptr;
1564 if (canceled_delayed_preprocess_task
) {
1565 // wake up sleeping replay
1566 m_event_replay_tracker
.finish_op();
// negative interval: cancel the periodic status task without rescheduling
1569 reschedule_update_status_task(-1);
1572 Mutex::Locker
locker(m_lock
);
1573 ceph_assert(m_state
== STATE_STOPPING
);
1575 // if status updates are in-flight, wait for them to complete
1576 // before proceeding
1577 if (m_in_flight_status_updates
> 0) {
1578 if (m_on_update_status_finish
== nullptr) {
1579 dout(15) << "waiting for in-flight status update" << dendl
;
1580 m_on_update_status_finish
= new FunctionContext(
1589 // NOTE: it's important to ensure that the local image is fully
1590 // closed before attempting to close the remote journal in
1591 // case the remote cluster is unreachable
1593 // chain the shut down sequence (reverse order)
// final step: publish STATE_STOPPED (if a local ioctx exists) and finish
1594 Context
*ctx
= new FunctionContext(
1596 if (m_local_ioctx
) {
1597 update_mirror_image_status(true, STATE_STOPPED
);
1599 handle_shut_down(r
);
1602 // close the remote journal
1603 if (m_remote_journaler
!= nullptr) {
1604 ctx
= new FunctionContext([this, ctx
](int r
) {
1605 delete m_remote_journaler
;
1606 m_remote_journaler
= nullptr;
1609 ctx
= new FunctionContext([this, ctx
](int r
) {
1610 m_remote_journaler
->remove_listener(&m_remote_listener
);
1611 m_remote_journaler
->shut_down(ctx
);
1615 // stop the replay of remote journal events
1616 if (m_replay_handler
!= nullptr) {
1617 ctx
= new FunctionContext([this, ctx
](int r
) {
1618 delete m_replay_handler
;
1619 m_replay_handler
= nullptr;
1621 m_event_replay_tracker
.wait_for_ops(ctx
);
1623 ctx
= new FunctionContext([this, ctx
](int r
) {
1624 m_remote_journaler
->stop_replay(ctx
);
1628 // close the local image (release exclusive lock)
1629 if (m_local_image_ctx
) {
1630 ctx
= new FunctionContext([this, ctx
](int r
) {
1631 CloseImageRequest
<I
> *request
= CloseImageRequest
<I
>::create(
1632 &m_local_image_ctx
, ctx
);
1637 // shut down event replay into the local image
1638 if (m_local_journal
!= nullptr) {
1639 ctx
= new FunctionContext([this, ctx
](int r
) {
1640 m_local_journal
= nullptr;
1643 if (m_local_replay
!= nullptr) {
1644 ctx
= new FunctionContext([this, ctx
](int r
) {
1645 m_local_journal
->stop_external_replay();
1646 m_local_replay
= nullptr;
1648 EventPreprocessor
<I
>::destroy(m_event_preprocessor
);
1649 m_event_preprocessor
= nullptr;
1653 ctx
= new FunctionContext([this, ctx
](int r
) {
1654 // blocks if listener notification is in-progress
1655 m_local_journal
->remove_listener(m_journal_listener
);
1660 // wait for all local in-flight replay events to complete
1661 ctx
= new FunctionContext([this, ctx
](int r
) {
1663 derr
<< "error shutting down journal replay: " << cpp_strerror(r
)
1667 m_event_replay_tracker
.wait_for_ops(ctx
);
1670 // flush any local in-flight replay events
1671 if (m_local_replay
!= nullptr) {
1672 ctx
= new FunctionContext([this, ctx
](int r
) {
1673 m_local_replay
->shut_down(true, ctx
);
// kick off the chain from the work queue
1677 m_threads
->work_queue
->queue(ctx
, 0);
// Final stage of the shut down sequence (invoked from the tail of the chain
// built in shut_down(), and re-invoked from the contexts created below).
//
// Visible steps:
//  1. cancel the periodic status update task;
//  2. under m_lock: if status updates are still in flight, register a
//     waiter that re-enters handle_shut_down(r); otherwise evaluate pending
//     delete / resync requests and decide whether the admin socket hook must
//     be unregistered;
//  3. if a delete or resync was requested, move the local image to the
//     trash via ImageDeleter and re-enter handle_shut_down(r) on completion;
//  4. otherwise destroy the replay status formatter, switch the state from
//     STATE_STOPPING to STATE_STOPPED and complete the pending on-start /
//     on-stop contexts with r.
// NOTE(review): return statements, closing braces and the lambda header of
// the in-flight waiter are not present in this extract -- confirm against
// the full file.
1680 template <typename I
>
1681 void ImageReplayer
<I
>::handle_shut_down(int r
) {
1682 reschedule_update_status_task(-1);
1684 bool resync_requested
= false;
1685 bool delete_requested
= false;
1686 bool unregister_asok_hook
= false;
1688 Mutex::Locker
locker(m_lock
);
1690 // if status updates are in-flight, wait for them to complete
1691 // before proceeding
1692 if (m_in_flight_status_updates
> 0) {
1693 if (m_on_update_status_finish
== nullptr) {
1694 dout(15) << "waiting for in-flight status update" << dendl
;
1695 m_on_update_status_finish
= new FunctionContext(
1697 handle_shut_down(r
);
1703 if (m_delete_requested
&& !m_local_image_id
.empty()) {
1704 ceph_assert(m_remote_image
.image_id
.empty());
1705 dout(0) << "remote image no longer exists: scheduling deletion" << dendl
;
1706 unregister_asok_hook
= true;
// consume the request: the member is reset to false by the swap
1707 std::swap(delete_requested
, m_delete_requested
);
1710 std::swap(resync_requested
, m_resync_requested
);
1711 if (delete_requested
|| resync_requested
) {
// clearing the id keeps the re-entrant call from requesting deletion again
1712 m_local_image_id
= "";
1713 } else if (m_last_r
== -ENOENT
&&
1714 m_local_image_id
.empty() && m_remote_image
.image_id
.empty()) {
1715 dout(0) << "mirror image no longer exists" << dendl
;
1716 unregister_asok_hook
= true;
1721 if (unregister_asok_hook
) {
1722 unregister_admin_socket_hook();
1725 if (delete_requested
|| resync_requested
) {
1726 dout(5) << "moving image to trash" << dendl
;
// the trash-move result is ignored; the original r is propagated
1727 auto ctx
= new FunctionContext([this, r
](int) {
1728 handle_shut_down(r
);
1730 ImageDeleter
<I
>::trash_move(*m_local_ioctx
, m_global_image_id
,
1731 resync_requested
, m_threads
->work_queue
, ctx
);
1735 dout(10) << "stop complete" << dendl
;
1736 ReplayStatusFormatter
<I
>::destroy(m_replay_status_formatter
);
1737 m_replay_status_formatter
= nullptr;
1739 Context
*on_start
= nullptr;
1740 Context
*on_stop
= nullptr;
1742 Mutex::Locker
locker(m_lock
);
// take the pending callbacks; the members are left nullptr
1743 std::swap(on_start
, m_on_start_finish
);
1744 std::swap(on_stop
, m_on_stop_finish
);
1745 m_stop_requested
= false;
1746 ceph_assert(m_delayed_preprocess_task
== nullptr);
1747 ceph_assert(m_state
== STATE_STOPPING
);
1748 m_state
= STATE_STOPPED
;
1751 if (on_start
!= nullptr) {
1752 dout(10) << "on start finish complete, r=" << r
<< dendl
;
1753 on_start
->complete(r
);
1756 if (on_stop
!= nullptr) {
1757 dout(10) << "on stop finish complete, r=" << r
<< dendl
;
1758 on_stop
->complete(r
);
// Reacts to a metadata update of the remote journal.
// If the replayer is running, looks up this mirror's client record
// (m_local_mirror_uuid) in the remote journaler's cache; a client that is no
// longer in CLIENT_STATE_CONNECTED stops the image replay with -ENOTCONN.
// NOTE(review): the early returns and the error check around
// get_cached_client() are not present in this extract; is_running_() and
// get_cached_client() follow the m_lock locker.
1762 template <typename I
>
1763 void ImageReplayer
<I
>::handle_remote_journal_metadata_updated() {
1766 cls::journal::Client client
;
1768 Mutex::Locker
locker(m_lock
);
1769 if (!is_running_()) {
1773 int r
= m_remote_journaler
->get_cached_client(m_local_mirror_uuid
, &client
);
1775 derr
<< "failed to retrieve client: " << cpp_strerror(r
) << dendl
;
1780 if (client
.state
!= cls::journal::CLIENT_STATE_CONNECTED
) {
1781 dout(0) << "client flagged disconnected, stopping image replay" << dendl
;
1782 stop(nullptr, false, -ENOTCONN
, "disconnected");
// Returns a human-readable name for a replayer State value.
// Values without a dedicated case yield "Unknown(<state>)", where <state>
// is produced by stringify().
// NOTE(review): the switch header and most of the per-case return
// statements are not present in this extract (only "ReplayFlushing" is
// visible).
1786 template <typename I
>
1787 std::string ImageReplayer
<I
>::to_string(const State state
) {
1789 case ImageReplayer
<I
>::STATE_STARTING
:
1791 case ImageReplayer
<I
>::STATE_REPLAYING
:
1793 case ImageReplayer
<I
>::STATE_REPLAY_FLUSHING
:
1794 return "ReplayFlushing";
1795 case ImageReplayer
<I
>::STATE_STOPPING
:
1797 case ImageReplayer
<I
>::STATE_STOPPED
:
1802 return "Unknown(" + stringify(state
) + ")";
// Flags that a resync of the image has been requested
// (m_resync_requested is consumed later by handle_shut_down()).
// NOTE(review): only the flag assignment is present in this extract; how
// on_finish is used is not visible -- confirm against the full file.
1805 template <typename I
>
1806 void ImageReplayer
<I
>::resync_image(Context
*on_finish
) {
1809 m_resync_requested
= true;
// Registers the admin socket hook and the per-image perf counters for this
// replayer.
// Visible steps: checks m_asok_hook (the body of that check is not visible;
// presumably it returns when a hook already exists), asserts no perf
// counters exist yet, creates an ImageReplayerAdminSocketHook named m_name
// and registers its commands. The hook is stored in m_asok_hook and perf
// counters named "rbd_mirror_<m_name>" (replay count, replayed bytes, replay
// latency) are created with the priority taken from the
// "rbd_mirror_perf_stats_prio" option and added to the global collection.
// An error path logs "error registering admin socket commands".
// NOTE(review): the branch on `r` and the failure cleanup are not present in
// this extract.
1813 template <typename I
>
1814 void ImageReplayer
<I
>::register_admin_socket_hook() {
1815 ImageReplayerAdminSocketHook
<I
> *asok_hook
;
1817 Mutex::Locker
locker(m_lock
);
1818 if (m_asok_hook
!= nullptr) {
1822 ceph_assert(m_perf_counters
== nullptr);
1824 dout(15) << "registered asok hook: " << m_name
<< dendl
;
1825 asok_hook
= new ImageReplayerAdminSocketHook
<I
>(g_ceph_context
, m_name
,
1827 int r
= asok_hook
->register_commands();
1829 m_asok_hook
= asok_hook
;
// priority comes from the local cluster's configuration
1831 CephContext
*cct
= static_cast<CephContext
*>(m_local
->cct());
1832 auto prio
= cct
->_conf
.get_val
<int64_t>("rbd_mirror_perf_stats_prio");
1833 PerfCountersBuilder
plb(g_ceph_context
, "rbd_mirror_" + m_name
,
1834 l_rbd_mirror_first
, l_rbd_mirror_last
);
1835 plb
.add_u64_counter(l_rbd_mirror_replay
, "replay", "Replays", "r", prio
);
1836 plb
.add_u64_counter(l_rbd_mirror_replay_bytes
, "replay_bytes",
1837 "Replayed data", "rb", prio
, unit_t(UNIT_BYTES
));
1838 plb
.add_time_avg(l_rbd_mirror_replay_latency
, "replay_latency",
1839 "Replay latency", "rl", prio
);
1840 m_perf_counters
= plb
.create_perf_counters();
1841 g_ceph_context
->get_perfcounters_collection()->add(m_perf_counters
);
1845 derr
<< "error registering admin socket commands" << dendl
;
// Detaches the admin socket hook and the per-image perf counters.
// Both members are swapped out with m_lock taken (leaving them nullptr);
// the perf counters are then removed from the global collection and
// deleted.
// NOTE(review): the disposal of the swapped-out asok_hook is not present in
// this extract -- confirm against the full file.
1850 template <typename I
>
1851 void ImageReplayer
<I
>::unregister_admin_socket_hook() {
1854 AdminSocketHook
*asok_hook
= nullptr;
1855 PerfCounters
*perf_counters
= nullptr;
1857 Mutex::Locker
locker(m_lock
);
1858 std::swap(asok_hook
, m_asok_hook
);
1859 std::swap(perf_counters
, m_perf_counters
);
1862 if (perf_counters
!= nullptr) {
1863 g_ceph_context
->get_perfcounters_collection()->remove(perf_counters
);
1864 delete perf_counters
;
// Re-creates the admin socket hook when the replayer's display name
// ("<local pool name>/<local image name>") may have changed.
// The expected name is computed with m_lock taken and compared with m_name
// while a hook exists; the hook is then unregistered and registered again.
// NOTE(review): the body of the name comparison branch (presumably an early
// return when nothing changed, and the update of m_name) is not present in
// this extract.
1868 template <typename I
>
1869 void ImageReplayer
<I
>::reregister_admin_socket_hook() {
1871 Mutex::Locker
locker(m_lock
);
1872 auto name
= m_local_ioctx
->get_pool_name() + "/" + m_local_image_name
;
1873 if (m_asok_hook
!= nullptr && m_name
== name
) {
1878 unregister_admin_socket_hook();
1879 register_admin_socket_hook();
// Streams a short identification of an ImageReplayer:
// "ImageReplayer: <address> [<local pool id>/<global image id>]".
// This is the form used by the file's dout_prefix (`*this`).
// NOTE(review): the return statement is not present in this extract.
1882 template <typename I
>
1883 std::ostream
&operator<<(std::ostream
&os
, const ImageReplayer
<I
> &replayer
)
1885 os
<< "ImageReplayer: " << &replayer
<< " [" << replayer
.get_local_pool_id()
1886 << "/" << replayer
.get_global_image_id() << "]";
1890 } // namespace mirror
// Explicit instantiation of ImageReplayer for librbd::ImageCtx.
1893 template class rbd::mirror::ImageReplayer
<librbd::ImageCtx
>;