1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "NamespaceReplayer.h"
5 #include "common/Formatter.h"
6 #include "common/debug.h"
7 #include "common/errno.h"
8 #include "cls/rbd/cls_rbd_client.h"
9 #include "librbd/Utils.h"
10 #include "librbd/api/Config.h"
11 #include "librbd/api/Mirror.h"
12 #include "librbd/asio/ContextWQ.h"
13 #include "ServiceDaemon.h"
16 #define dout_context g_ceph_context
17 #define dout_subsys ceph_subsys_rbd_mirror
19 #define dout_prefix *_dout << "rbd::mirror::NamespaceReplayer: " \
20 << this << " " << __func__ << ": "
22 using librbd::util::create_async_context_callback
;
23 using librbd::util::create_context_callback
;
32 const std::string
SERVICE_DAEMON_LOCAL_COUNT_KEY("image_local_count");
33 const std::string
SERVICE_DAEMON_REMOTE_COUNT_KEY("image_remote_count");
35 } // anonymous namespace
38 NamespaceReplayer
<I
>::NamespaceReplayer(
39 const std::string
&name
,
40 librados::IoCtx
&local_io_ctx
, librados::IoCtx
&remote_io_ctx
,
41 const std::string
&local_mirror_uuid
,
42 const std::string
& local_mirror_peer_uuid
,
43 const RemotePoolMeta
& remote_pool_meta
,
45 Throttler
<I
> *image_sync_throttler
,
46 Throttler
<I
> *image_deletion_throttler
,
47 ServiceDaemon
<I
> *service_daemon
,
48 journal::CacheManagerHandler
*cache_manager_handler
,
49 PoolMetaCache
* pool_meta_cache
) :
50 m_namespace_name(name
),
51 m_local_mirror_uuid(local_mirror_uuid
),
52 m_local_mirror_peer_uuid(local_mirror_peer_uuid
),
53 m_remote_pool_meta(remote_pool_meta
),
54 m_threads(threads
), m_image_sync_throttler(image_sync_throttler
),
55 m_image_deletion_throttler(image_deletion_throttler
),
56 m_service_daemon(service_daemon
),
57 m_cache_manager_handler(cache_manager_handler
),
58 m_pool_meta_cache(pool_meta_cache
),
59 m_lock(ceph::make_mutex(librbd::util::unique_lock_name(
60 "rbd::mirror::NamespaceReplayer " + name
, this))),
61 m_local_pool_watcher_listener(this, true),
62 m_remote_pool_watcher_listener(this, false),
63 m_image_map_listener(this) {
64 dout(10) << name
<< dendl
;
66 m_local_io_ctx
.dup(local_io_ctx
);
67 m_local_io_ctx
.set_namespace(name
);
68 m_remote_io_ctx
.dup(remote_io_ctx
);
69 m_remote_io_ctx
.set_namespace(name
);
73 bool NamespaceReplayer
<I
>::is_blocklisted() const {
74 std::lock_guard locker
{m_lock
};
75 return m_instance_replayer
->is_blocklisted() ||
76 (m_local_pool_watcher
&&
77 m_local_pool_watcher
->is_blocklisted()) ||
78 (m_remote_pool_watcher
&&
79 m_remote_pool_watcher
->is_blocklisted());
83 void NamespaceReplayer
<I
>::init(Context
*on_finish
) {
86 std::lock_guard locker
{m_lock
};
88 ceph_assert(m_on_finish
== nullptr);
89 m_on_finish
= on_finish
;
91 init_local_status_updater();
96 void NamespaceReplayer
<I
>::shut_down(Context
*on_finish
) {
100 std::lock_guard locker
{m_lock
};
102 ceph_assert(m_on_finish
== nullptr);
103 m_on_finish
= on_finish
;
106 stop_instance_replayer();
111 auto ctx
= new LambdaContext(
113 std::lock_guard locker
{m_lock
};
114 stop_instance_replayer();
116 handle_release_leader(ctx
);
119 template <typename I
>
120 void NamespaceReplayer
<I
>::print_status(Formatter
*f
)
126 std::lock_guard locker
{m_lock
};
128 m_instance_replayer
->print_status(f
);
130 if (m_image_deleter
) {
131 f
->open_object_section("image_deleter");
132 m_image_deleter
->print_status(f
);
137 template <typename I
>
138 void NamespaceReplayer
<I
>::start()
142 std::lock_guard locker
{m_lock
};
144 m_instance_replayer
->start();
147 template <typename I
>
148 void NamespaceReplayer
<I
>::stop()
152 std::lock_guard locker
{m_lock
};
154 m_instance_replayer
->stop();
157 template <typename I
>
158 void NamespaceReplayer
<I
>::restart()
162 std::lock_guard locker
{m_lock
};
164 m_instance_replayer
->restart();
167 template <typename I
>
168 void NamespaceReplayer
<I
>::flush()
172 std::lock_guard locker
{m_lock
};
174 m_instance_replayer
->flush();
177 template <typename I
>
178 void NamespaceReplayer
<I
>::handle_update(const std::string
&mirror_uuid
,
179 ImageIds
&&added_image_ids
,
180 ImageIds
&&removed_image_ids
) {
181 std::lock_guard locker
{m_lock
};
184 dout(20) << "not leader" << dendl
;
188 dout(10) << "mirror_uuid=" << mirror_uuid
<< ", "
189 << "added_count=" << added_image_ids
.size() << ", "
190 << "removed_count=" << removed_image_ids
.size() << dendl
;
192 m_service_daemon
->add_or_update_namespace_attribute(
193 m_local_io_ctx
.get_id(), m_local_io_ctx
.get_namespace(),
194 SERVICE_DAEMON_LOCAL_COUNT_KEY
, m_local_pool_watcher
->get_image_count());
195 if (m_remote_pool_watcher
) {
196 m_service_daemon
->add_or_update_namespace_attribute(
197 m_local_io_ctx
.get_id(), m_local_io_ctx
.get_namespace(),
198 SERVICE_DAEMON_REMOTE_COUNT_KEY
,
199 m_remote_pool_watcher
->get_image_count());
202 std::set
<std::string
> added_global_image_ids
;
203 for (auto& image_id
: added_image_ids
) {
204 added_global_image_ids
.insert(image_id
.global_id
);
207 std::set
<std::string
> removed_global_image_ids
;
208 for (auto& image_id
: removed_image_ids
) {
209 removed_global_image_ids
.insert(image_id
.global_id
);
212 m_image_map
->update_images(mirror_uuid
,
213 std::move(added_global_image_ids
),
214 std::move(removed_global_image_ids
));
217 template <typename I
>
218 void NamespaceReplayer
<I
>::handle_acquire_leader(Context
*on_finish
) {
221 m_instance_watcher
->handle_acquire_leader();
223 init_image_map(on_finish
);
226 template <typename I
>
227 void NamespaceReplayer
<I
>::handle_release_leader(Context
*on_finish
) {
230 m_instance_watcher
->handle_release_leader();
231 shut_down_image_deleter(on_finish
);
234 template <typename I
>
235 void NamespaceReplayer
<I
>::handle_update_leader(
236 const std::string
&leader_instance_id
) {
237 dout(10) << "leader_instance_id=" << leader_instance_id
<< dendl
;
239 m_instance_watcher
->handle_update_leader(leader_instance_id
);
242 template <typename I
>
243 void NamespaceReplayer
<I
>::handle_instances_added(
244 const std::vector
<std::string
> &instance_ids
) {
245 dout(10) << "instance_ids=" << instance_ids
<< dendl
;
247 std::lock_guard locker
{m_lock
};
253 m_image_map
->update_instances_added(instance_ids
);
256 template <typename I
>
257 void NamespaceReplayer
<I
>::handle_instances_removed(
258 const std::vector
<std::string
> &instance_ids
) {
259 dout(10) << "instance_ids=" << instance_ids
<< dendl
;
261 std::lock_guard locker
{m_lock
};
267 m_image_map
->update_instances_removed(instance_ids
);
270 template <typename I
>
271 void NamespaceReplayer
<I
>::init_local_status_updater() {
274 ceph_assert(ceph_mutex_is_locked(m_lock
));
275 ceph_assert(!m_local_status_updater
);
277 m_local_status_updater
.reset(MirrorStatusUpdater
<I
>::create(
278 m_local_io_ctx
, m_threads
, ""));
279 auto ctx
= create_context_callback
<
280 NamespaceReplayer
<I
>,
281 &NamespaceReplayer
<I
>::handle_init_local_status_updater
>(this);
283 m_local_status_updater
->init(ctx
);
286 template <typename I
>
287 void NamespaceReplayer
<I
>::handle_init_local_status_updater(int r
) {
288 dout(10) << "r=" << r
<< dendl
;
290 std::lock_guard locker
{m_lock
};
293 derr
<< "error initializing local mirror status updater: "
294 << cpp_strerror(r
) << dendl
;
296 m_local_status_updater
.reset();
297 ceph_assert(m_on_finish
!= nullptr);
298 m_threads
->work_queue
->queue(m_on_finish
, r
);
299 m_on_finish
= nullptr;
303 init_remote_status_updater();
306 template <typename I
>
307 void NamespaceReplayer
<I
>::init_remote_status_updater() {
310 ceph_assert(ceph_mutex_is_locked(m_lock
));
311 ceph_assert(!m_remote_status_updater
);
313 m_remote_status_updater
.reset(MirrorStatusUpdater
<I
>::create(
314 m_remote_io_ctx
, m_threads
, m_local_mirror_uuid
));
315 auto ctx
= create_context_callback
<
316 NamespaceReplayer
<I
>,
317 &NamespaceReplayer
<I
>::handle_init_remote_status_updater
>(this);
318 m_remote_status_updater
->init(ctx
);
321 template <typename I
>
322 void NamespaceReplayer
<I
>::handle_init_remote_status_updater(int r
) {
323 dout(10) << "r=" << r
<< dendl
;
325 std::lock_guard locker
{m_lock
};
328 derr
<< "error initializing remote mirror status updater: "
329 << cpp_strerror(r
) << dendl
;
331 m_remote_status_updater
.reset();
333 shut_down_local_status_updater();
337 init_instance_replayer();
340 template <typename I
>
341 void NamespaceReplayer
<I
>::init_instance_replayer() {
344 ceph_assert(ceph_mutex_is_locked(m_lock
));
345 ceph_assert(!m_instance_replayer
);
347 m_instance_replayer
.reset(InstanceReplayer
<I
>::create(
348 m_local_io_ctx
, m_local_mirror_uuid
, m_threads
, m_service_daemon
,
349 m_local_status_updater
.get(), m_cache_manager_handler
,
351 auto ctx
= create_context_callback
<NamespaceReplayer
<I
>,
352 &NamespaceReplayer
<I
>::handle_init_instance_replayer
>(this);
354 m_instance_replayer
->init(ctx
);
357 template <typename I
>
358 void NamespaceReplayer
<I
>::handle_init_instance_replayer(int r
) {
359 dout(10) << "r=" << r
<< dendl
;
361 std::lock_guard locker
{m_lock
};
364 derr
<< "error initializing instance replayer: " << cpp_strerror(r
)
367 m_instance_replayer
.reset();
369 shut_down_remote_status_updater();
373 m_instance_replayer
->add_peer({m_local_mirror_peer_uuid
, m_remote_io_ctx
,
375 m_remote_status_updater
.get()});
377 init_instance_watcher();
380 template <typename I
>
381 void NamespaceReplayer
<I
>::init_instance_watcher() {
384 ceph_assert(ceph_mutex_is_locked(m_lock
));
385 ceph_assert(!m_instance_watcher
);
387 m_instance_watcher
.reset(InstanceWatcher
<I
>::create(
388 m_local_io_ctx
, *m_threads
->asio_engine
, m_instance_replayer
.get(),
389 m_image_sync_throttler
));
390 auto ctx
= create_context_callback
<NamespaceReplayer
<I
>,
391 &NamespaceReplayer
<I
>::handle_init_instance_watcher
>(this);
393 m_instance_watcher
->init(ctx
);
396 template <typename I
>
397 void NamespaceReplayer
<I
>::handle_init_instance_watcher(int r
) {
398 dout(10) << "r=" << r
<< dendl
;
400 std::lock_guard locker
{m_lock
};
403 derr
<< "error initializing instance watcher: " << cpp_strerror(r
)
406 m_instance_watcher
.reset();
408 shut_down_instance_replayer();
412 ceph_assert(m_on_finish
!= nullptr);
413 m_threads
->work_queue
->queue(m_on_finish
);
414 m_on_finish
= nullptr;
417 template <typename I
>
418 void NamespaceReplayer
<I
>::stop_instance_replayer() {
421 ceph_assert(ceph_mutex_is_locked(m_lock
));
423 Context
*ctx
= create_async_context_callback(
424 m_threads
->work_queue
, create_context_callback
<NamespaceReplayer
<I
>,
425 &NamespaceReplayer
<I
>::handle_stop_instance_replayer
>(this));
427 m_instance_replayer
->stop(ctx
);
430 template <typename I
>
431 void NamespaceReplayer
<I
>::handle_stop_instance_replayer(int r
) {
432 dout(10) << "r=" << r
<< dendl
;
435 derr
<< "error stopping instance replayer: " << cpp_strerror(r
) << dendl
;
438 std::lock_guard locker
{m_lock
};
440 shut_down_instance_watcher();
443 template <typename I
>
444 void NamespaceReplayer
<I
>::shut_down_instance_watcher() {
447 ceph_assert(ceph_mutex_is_locked(m_lock
));
448 ceph_assert(m_instance_watcher
);
450 Context
*ctx
= create_async_context_callback(
451 m_threads
->work_queue
, create_context_callback
<NamespaceReplayer
<I
>,
452 &NamespaceReplayer
<I
>::handle_shut_down_instance_watcher
>(this));
454 m_instance_watcher
->shut_down(ctx
);
457 template <typename I
>
458 void NamespaceReplayer
<I
>::handle_shut_down_instance_watcher(int r
) {
459 dout(10) << "r=" << r
<< dendl
;
462 derr
<< "error shutting instance watcher down: " << cpp_strerror(r
)
466 std::lock_guard locker
{m_lock
};
468 m_instance_watcher
.reset();
470 shut_down_instance_replayer();
473 template <typename I
>
474 void NamespaceReplayer
<I
>::shut_down_instance_replayer() {
477 ceph_assert(ceph_mutex_is_locked(m_lock
));
478 ceph_assert(m_instance_replayer
);
480 Context
*ctx
= create_async_context_callback(
481 m_threads
->work_queue
, create_context_callback
<NamespaceReplayer
<I
>,
482 &NamespaceReplayer
<I
>::handle_shut_down_instance_replayer
>(this));
484 m_instance_replayer
->shut_down(ctx
);
487 template <typename I
>
488 void NamespaceReplayer
<I
>::handle_shut_down_instance_replayer(int r
) {
489 dout(10) << "r=" << r
<< dendl
;
492 derr
<< "error shutting instance replayer down: " << cpp_strerror(r
)
496 std::lock_guard locker
{m_lock
};
498 m_instance_replayer
.reset();
500 shut_down_remote_status_updater();
503 template <typename I
>
504 void NamespaceReplayer
<I
>::shut_down_remote_status_updater() {
507 ceph_assert(ceph_mutex_is_locked(m_lock
));
508 ceph_assert(m_remote_status_updater
);
510 auto ctx
= create_async_context_callback(
511 m_threads
->work_queue
, create_context_callback
<
512 NamespaceReplayer
<I
>,
513 &NamespaceReplayer
<I
>::handle_shut_down_remote_status_updater
>(this));
514 m_remote_status_updater
->shut_down(ctx
);
517 template <typename I
>
518 void NamespaceReplayer
<I
>::handle_shut_down_remote_status_updater(int r
) {
519 dout(10) << "r=" << r
<< dendl
;
522 derr
<< "error shutting remote mirror status updater down: "
523 << cpp_strerror(r
) << dendl
;
526 std::lock_guard locker
{m_lock
};
527 m_remote_status_updater
.reset();
529 shut_down_local_status_updater();
532 template <typename I
>
533 void NamespaceReplayer
<I
>::shut_down_local_status_updater() {
536 ceph_assert(ceph_mutex_is_locked(m_lock
));
537 ceph_assert(m_local_status_updater
);
539 auto ctx
= create_async_context_callback(
540 m_threads
->work_queue
, create_context_callback
<
541 NamespaceReplayer
<I
>,
542 &NamespaceReplayer
<I
>::handle_shut_down_local_status_updater
>(this));
544 m_local_status_updater
->shut_down(ctx
);
547 template <typename I
>
548 void NamespaceReplayer
<I
>::handle_shut_down_local_status_updater(int r
) {
549 dout(10) << "r=" << r
<< dendl
;
552 derr
<< "error shutting local mirror status updater down: "
553 << cpp_strerror(r
) << dendl
;
556 std::lock_guard locker
{m_lock
};
558 m_local_status_updater
.reset();
560 ceph_assert(!m_image_map
);
561 ceph_assert(!m_image_deleter
);
562 ceph_assert(!m_local_pool_watcher
);
563 ceph_assert(!m_remote_pool_watcher
);
564 ceph_assert(!m_instance_watcher
);
565 ceph_assert(!m_instance_replayer
);
567 ceph_assert(m_on_finish
!= nullptr);
568 m_threads
->work_queue
->queue(m_on_finish
, m_ret_val
);
569 m_on_finish
= nullptr;
573 template <typename I
>
574 void NamespaceReplayer
<I
>::init_image_map(Context
*on_finish
) {
577 auto image_map
= ImageMap
<I
>::create(m_local_io_ctx
, m_threads
,
578 m_instance_watcher
->get_instance_id(),
579 m_image_map_listener
);
581 auto ctx
= new LambdaContext(
582 [this, image_map
, on_finish
](int r
) {
583 handle_init_image_map(r
, image_map
, on_finish
);
585 image_map
->init(create_async_context_callback(
586 m_threads
->work_queue
, ctx
));
589 template <typename I
>
590 void NamespaceReplayer
<I
>::handle_init_image_map(int r
, ImageMap
<I
> *image_map
,
591 Context
*on_finish
) {
592 dout(10) << "r=" << r
<< dendl
;
594 derr
<< "failed to init image map: " << cpp_strerror(r
) << dendl
;
595 on_finish
= new LambdaContext([image_map
, on_finish
, r
](int) {
597 on_finish
->complete(r
);
599 image_map
->shut_down(on_finish
);
603 ceph_assert(!m_image_map
);
604 m_image_map
.reset(image_map
);
606 init_local_pool_watcher(on_finish
);
609 template <typename I
>
610 void NamespaceReplayer
<I
>::init_local_pool_watcher(Context
*on_finish
) {
613 std::lock_guard locker
{m_lock
};
614 ceph_assert(!m_local_pool_watcher
);
615 m_local_pool_watcher
.reset(PoolWatcher
<I
>::create(
616 m_threads
, m_local_io_ctx
, m_local_mirror_uuid
,
617 m_local_pool_watcher_listener
));
619 // ensure the initial set of local images is up-to-date
620 // after acquiring the leader role
621 auto ctx
= new LambdaContext([this, on_finish
](int r
) {
622 handle_init_local_pool_watcher(r
, on_finish
);
624 m_local_pool_watcher
->init(create_async_context_callback(
625 m_threads
->work_queue
, ctx
));
628 template <typename I
>
629 void NamespaceReplayer
<I
>::handle_init_local_pool_watcher(
630 int r
, Context
*on_finish
) {
631 dout(10) << "r=" << r
<< dendl
;
633 derr
<< "failed to retrieve local images: " << cpp_strerror(r
) << dendl
;
634 on_finish
= new LambdaContext([on_finish
, r
](int) {
635 on_finish
->complete(r
);
637 shut_down_pool_watchers(on_finish
);
641 init_remote_pool_watcher(on_finish
);
644 template <typename I
>
645 void NamespaceReplayer
<I
>::init_remote_pool_watcher(Context
*on_finish
) {
648 std::lock_guard locker
{m_lock
};
649 ceph_assert(!m_remote_pool_watcher
);
650 m_remote_pool_watcher
.reset(PoolWatcher
<I
>::create(
651 m_threads
, m_remote_io_ctx
, m_remote_pool_meta
.mirror_uuid
,
652 m_remote_pool_watcher_listener
));
654 auto ctx
= new LambdaContext([this, on_finish
](int r
) {
655 handle_init_remote_pool_watcher(r
, on_finish
);
657 m_remote_pool_watcher
->init(create_async_context_callback(
658 m_threads
->work_queue
, ctx
));
661 template <typename I
>
662 void NamespaceReplayer
<I
>::handle_init_remote_pool_watcher(
663 int r
, Context
*on_finish
) {
664 dout(10) << "r=" << r
<< dendl
;
666 // Technically nothing to do since the other side doesn't
667 // have mirroring enabled. Eventually the remote pool watcher will
668 // detect images (if mirroring is enabled), so no point propagating
669 // an error which would just busy-spin the state machines.
670 dout(0) << "remote peer does not have mirroring configured" << dendl
;
672 derr
<< "failed to retrieve remote images: " << cpp_strerror(r
) << dendl
;
673 on_finish
= new LambdaContext([on_finish
, r
](int) {
674 on_finish
->complete(r
);
676 shut_down_pool_watchers(on_finish
);
680 init_image_deleter(on_finish
);
683 template <typename I
>
684 void NamespaceReplayer
<I
>::init_image_deleter(Context
*on_finish
) {
687 std::lock_guard locker
{m_lock
};
688 ceph_assert(!m_image_deleter
);
690 on_finish
= new LambdaContext([this, on_finish
](int r
) {
691 handle_init_image_deleter(r
, on_finish
);
693 m_image_deleter
.reset(ImageDeleter
<I
>::create(m_local_io_ctx
, m_threads
,
694 m_image_deletion_throttler
,
696 m_image_deleter
->init(create_async_context_callback(
697 m_threads
->work_queue
, on_finish
));
700 template <typename I
>
701 void NamespaceReplayer
<I
>::handle_init_image_deleter(
702 int r
, Context
*on_finish
) {
703 dout(10) << "r=" << r
<< dendl
;
705 derr
<< "failed to init image deleter: " << cpp_strerror(r
) << dendl
;
706 on_finish
= new LambdaContext([on_finish
, r
](int) {
707 on_finish
->complete(r
);
709 shut_down_image_deleter(on_finish
);
713 on_finish
->complete(0);
716 template <typename I
>
717 void NamespaceReplayer
<I
>::shut_down_image_deleter(Context
* on_finish
) {
720 std::lock_guard locker
{m_lock
};
721 if (m_image_deleter
) {
722 Context
*ctx
= new LambdaContext([this, on_finish
](int r
) {
723 handle_shut_down_image_deleter(r
, on_finish
);
725 ctx
= create_async_context_callback(m_threads
->work_queue
, ctx
);
727 m_image_deleter
->shut_down(ctx
);
731 shut_down_pool_watchers(on_finish
);
734 template <typename I
>
735 void NamespaceReplayer
<I
>::handle_shut_down_image_deleter(
736 int r
, Context
* on_finish
) {
737 dout(10) << "r=" << r
<< dendl
;
740 std::lock_guard locker
{m_lock
};
741 ceph_assert(m_image_deleter
);
742 m_image_deleter
.reset();
745 shut_down_pool_watchers(on_finish
);
748 template <typename I
>
749 void NamespaceReplayer
<I
>::shut_down_pool_watchers(Context
*on_finish
) {
753 std::lock_guard locker
{m_lock
};
754 if (m_local_pool_watcher
) {
755 Context
*ctx
= new LambdaContext([this, on_finish
](int r
) {
756 handle_shut_down_pool_watchers(r
, on_finish
);
758 ctx
= create_async_context_callback(m_threads
->work_queue
, ctx
);
760 auto gather_ctx
= new C_Gather(g_ceph_context
, ctx
);
761 m_local_pool_watcher
->shut_down(gather_ctx
->new_sub());
762 if (m_remote_pool_watcher
) {
763 m_remote_pool_watcher
->shut_down(gather_ctx
->new_sub());
765 gather_ctx
->activate();
770 on_finish
->complete(0);
773 template <typename I
>
774 void NamespaceReplayer
<I
>::handle_shut_down_pool_watchers(
775 int r
, Context
*on_finish
) {
776 dout(10) << "r=" << r
<< dendl
;
779 std::lock_guard locker
{m_lock
};
780 ceph_assert(m_local_pool_watcher
);
781 m_local_pool_watcher
.reset();
783 if (m_remote_pool_watcher
) {
784 m_remote_pool_watcher
.reset();
787 shut_down_image_map(on_finish
);
790 template <typename I
>
791 void NamespaceReplayer
<I
>::shut_down_image_map(Context
*on_finish
) {
794 std::lock_guard locker
{m_lock
};
796 on_finish
= new LambdaContext(
797 [this, on_finish
](int r
) {
798 handle_shut_down_image_map(r
, on_finish
);
800 m_image_map
->shut_down(create_async_context_callback(
801 m_threads
->work_queue
, on_finish
));
805 m_threads
->work_queue
->queue(on_finish
);
808 template <typename I
>
809 void NamespaceReplayer
<I
>::handle_shut_down_image_map(int r
, Context
*on_finish
) {
810 dout(5) << "r=" << r
<< dendl
;
811 if (r
< 0 && r
!= -EBLOCKLISTED
) {
812 derr
<< "failed to shut down image map: " << cpp_strerror(r
) << dendl
;
815 std::lock_guard locker
{m_lock
};
816 ceph_assert(m_image_map
);
819 m_instance_replayer
->release_all(create_async_context_callback(
820 m_threads
->work_queue
, on_finish
));
823 template <typename I
>
824 void NamespaceReplayer
<I
>::handle_acquire_image(const std::string
&global_image_id
,
825 const std::string
&instance_id
,
826 Context
* on_finish
) {
827 dout(5) << "global_image_id=" << global_image_id
<< ", "
828 << "instance_id=" << instance_id
<< dendl
;
830 m_instance_watcher
->notify_image_acquire(instance_id
, global_image_id
,
834 template <typename I
>
835 void NamespaceReplayer
<I
>::handle_release_image(const std::string
&global_image_id
,
836 const std::string
&instance_id
,
837 Context
* on_finish
) {
838 dout(5) << "global_image_id=" << global_image_id
<< ", "
839 << "instance_id=" << instance_id
<< dendl
;
841 m_instance_watcher
->notify_image_release(instance_id
, global_image_id
,
845 template <typename I
>
846 void NamespaceReplayer
<I
>::handle_remove_image(const std::string
&mirror_uuid
,
847 const std::string
&global_image_id
,
848 const std::string
&instance_id
,
849 Context
* on_finish
) {
850 ceph_assert(!mirror_uuid
.empty());
851 dout(5) << "mirror_uuid=" << mirror_uuid
<< ", "
852 << "global_image_id=" << global_image_id
<< ", "
853 << "instance_id=" << instance_id
<< dendl
;
855 m_instance_watcher
->notify_peer_image_removed(instance_id
, global_image_id
,
856 mirror_uuid
, on_finish
);
859 } // namespace mirror
862 template class rbd::mirror::NamespaceReplayer
<librbd::ImageCtx
>;