]> git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd_mirror/NamespaceReplayer.cc
import ceph quincy 17.2.4
[ceph.git] / ceph / src / tools / rbd_mirror / NamespaceReplayer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "NamespaceReplayer.h"
5 #include "common/Formatter.h"
6 #include "common/debug.h"
7 #include "common/errno.h"
8 #include "cls/rbd/cls_rbd_client.h"
9 #include "librbd/Utils.h"
10 #include "librbd/api/Config.h"
11 #include "librbd/api/Mirror.h"
12 #include "librbd/asio/ContextWQ.h"
13 #include "ServiceDaemon.h"
14 #include "Threads.h"
15
16 #define dout_context g_ceph_context
17 #define dout_subsys ceph_subsys_rbd_mirror
18 #undef dout_prefix
19 #define dout_prefix *_dout << "rbd::mirror::NamespaceReplayer: " \
20 << this << " " << __func__ << ": "
21
22 using librbd::util::create_async_context_callback;
23 using librbd::util::create_context_callback;
24
25 namespace rbd {
26 namespace mirror {
27
28 using ::operator<<;
29
30 namespace {
31
32 const std::string SERVICE_DAEMON_LOCAL_COUNT_KEY("image_local_count");
33 const std::string SERVICE_DAEMON_REMOTE_COUNT_KEY("image_remote_count");
34
35 } // anonymous namespace
36
37 template <typename I>
38 NamespaceReplayer<I>::NamespaceReplayer(
39 const std::string &name,
40 librados::IoCtx &local_io_ctx, librados::IoCtx &remote_io_ctx,
41 const std::string &local_mirror_uuid,
42 const std::string& local_mirror_peer_uuid,
43 const RemotePoolMeta& remote_pool_meta,
44 Threads<I> *threads,
45 Throttler<I> *image_sync_throttler,
46 Throttler<I> *image_deletion_throttler,
47 ServiceDaemon<I> *service_daemon,
48 journal::CacheManagerHandler *cache_manager_handler,
49 PoolMetaCache* pool_meta_cache) :
50 m_namespace_name(name),
51 m_local_mirror_uuid(local_mirror_uuid),
52 m_local_mirror_peer_uuid(local_mirror_peer_uuid),
53 m_remote_pool_meta(remote_pool_meta),
54 m_threads(threads), m_image_sync_throttler(image_sync_throttler),
55 m_image_deletion_throttler(image_deletion_throttler),
56 m_service_daemon(service_daemon),
57 m_cache_manager_handler(cache_manager_handler),
58 m_pool_meta_cache(pool_meta_cache),
59 m_lock(ceph::make_mutex(librbd::util::unique_lock_name(
60 "rbd::mirror::NamespaceReplayer " + name, this))),
61 m_local_pool_watcher_listener(this, true),
62 m_remote_pool_watcher_listener(this, false),
63 m_image_map_listener(this) {
64 dout(10) << name << dendl;
65
66 m_local_io_ctx.dup(local_io_ctx);
67 m_local_io_ctx.set_namespace(name);
68 m_remote_io_ctx.dup(remote_io_ctx);
69 m_remote_io_ctx.set_namespace(name);
70 }
71
72 template <typename I>
73 bool NamespaceReplayer<I>::is_blocklisted() const {
74 std::lock_guard locker{m_lock};
75 return m_instance_replayer->is_blocklisted() ||
76 (m_local_pool_watcher &&
77 m_local_pool_watcher->is_blocklisted()) ||
78 (m_remote_pool_watcher &&
79 m_remote_pool_watcher->is_blocklisted());
80 }
81
82 template <typename I>
83 void NamespaceReplayer<I>::init(Context *on_finish) {
84 dout(20) << dendl;
85
86 std::lock_guard locker{m_lock};
87
88 ceph_assert(m_on_finish == nullptr);
89 m_on_finish = on_finish;
90
91 init_local_status_updater();
92 }
93
94
95 template <typename I>
96 void NamespaceReplayer<I>::shut_down(Context *on_finish) {
97 dout(20) << dendl;
98
99 {
100 std::lock_guard locker{m_lock};
101
102 ceph_assert(m_on_finish == nullptr);
103 m_on_finish = on_finish;
104
105 if (!m_image_map) {
106 stop_instance_replayer();
107 return;
108 }
109 }
110
111 auto ctx = new LambdaContext(
112 [this] (int r) {
113 std::lock_guard locker{m_lock};
114 stop_instance_replayer();
115 });
116 handle_release_leader(ctx);
117 }
118
119 template <typename I>
120 void NamespaceReplayer<I>::print_status(Formatter *f)
121 {
122 dout(20) << dendl;
123
124 ceph_assert(f);
125
126 std::lock_guard locker{m_lock};
127
128 m_instance_replayer->print_status(f);
129
130 if (m_image_deleter) {
131 f->open_object_section("image_deleter");
132 m_image_deleter->print_status(f);
133 f->close_section();
134 }
135 }
136
137 template <typename I>
138 void NamespaceReplayer<I>::start()
139 {
140 dout(20) << dendl;
141
142 std::lock_guard locker{m_lock};
143
144 m_instance_replayer->start();
145 }
146
147 template <typename I>
148 void NamespaceReplayer<I>::stop()
149 {
150 dout(20) << dendl;
151
152 std::lock_guard locker{m_lock};
153
154 m_instance_replayer->stop();
155 }
156
157 template <typename I>
158 void NamespaceReplayer<I>::restart()
159 {
160 dout(20) << dendl;
161
162 std::lock_guard locker{m_lock};
163
164 m_instance_replayer->restart();
165 }
166
167 template <typename I>
168 void NamespaceReplayer<I>::flush()
169 {
170 dout(20) << dendl;
171
172 std::lock_guard locker{m_lock};
173
174 m_instance_replayer->flush();
175 }
176
177 template <typename I>
178 void NamespaceReplayer<I>::handle_update(const std::string &mirror_uuid,
179 ImageIds &&added_image_ids,
180 ImageIds &&removed_image_ids) {
181 std::lock_guard locker{m_lock};
182
183 if (!m_image_map) {
184 dout(20) << "not leader" << dendl;
185 return;
186 }
187
188 dout(10) << "mirror_uuid=" << mirror_uuid << ", "
189 << "added_count=" << added_image_ids.size() << ", "
190 << "removed_count=" << removed_image_ids.size() << dendl;
191
192 m_service_daemon->add_or_update_namespace_attribute(
193 m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
194 SERVICE_DAEMON_LOCAL_COUNT_KEY, m_local_pool_watcher->get_image_count());
195 if (m_remote_pool_watcher) {
196 m_service_daemon->add_or_update_namespace_attribute(
197 m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
198 SERVICE_DAEMON_REMOTE_COUNT_KEY,
199 m_remote_pool_watcher->get_image_count());
200 }
201
202 std::set<std::string> added_global_image_ids;
203 for (auto& image_id : added_image_ids) {
204 added_global_image_ids.insert(image_id.global_id);
205 }
206
207 std::set<std::string> removed_global_image_ids;
208 for (auto& image_id : removed_image_ids) {
209 removed_global_image_ids.insert(image_id.global_id);
210 }
211
212 m_image_map->update_images(mirror_uuid,
213 std::move(added_global_image_ids),
214 std::move(removed_global_image_ids));
215 }
216
217 template <typename I>
218 void NamespaceReplayer<I>::handle_acquire_leader(Context *on_finish) {
219 dout(10) << dendl;
220
221 m_instance_watcher->handle_acquire_leader();
222
223 init_image_map(on_finish);
224 }
225
226 template <typename I>
227 void NamespaceReplayer<I>::handle_release_leader(Context *on_finish) {
228 dout(10) << dendl;
229
230 m_instance_watcher->handle_release_leader();
231 shut_down_image_deleter(on_finish);
232 }
233
234 template <typename I>
235 void NamespaceReplayer<I>::handle_update_leader(
236 const std::string &leader_instance_id) {
237 dout(10) << "leader_instance_id=" << leader_instance_id << dendl;
238
239 m_instance_watcher->handle_update_leader(leader_instance_id);
240 }
241
242 template <typename I>
243 void NamespaceReplayer<I>::handle_instances_added(
244 const std::vector<std::string> &instance_ids) {
245 dout(10) << "instance_ids=" << instance_ids << dendl;
246
247 std::lock_guard locker{m_lock};
248
249 if (!m_image_map) {
250 return;
251 }
252
253 m_image_map->update_instances_added(instance_ids);
254 }
255
256 template <typename I>
257 void NamespaceReplayer<I>::handle_instances_removed(
258 const std::vector<std::string> &instance_ids) {
259 dout(10) << "instance_ids=" << instance_ids << dendl;
260
261 std::lock_guard locker{m_lock};
262
263 if (!m_image_map) {
264 return;
265 }
266
267 m_image_map->update_instances_removed(instance_ids);
268 }
269
270 template <typename I>
271 void NamespaceReplayer<I>::init_local_status_updater() {
272 dout(10) << dendl;
273
274 ceph_assert(ceph_mutex_is_locked(m_lock));
275 ceph_assert(!m_local_status_updater);
276
277 m_local_status_updater.reset(MirrorStatusUpdater<I>::create(
278 m_local_io_ctx, m_threads, ""));
279 auto ctx = create_context_callback<
280 NamespaceReplayer<I>,
281 &NamespaceReplayer<I>::handle_init_local_status_updater>(this);
282
283 m_local_status_updater->init(ctx);
284 }
285
286 template <typename I>
287 void NamespaceReplayer<I>::handle_init_local_status_updater(int r) {
288 dout(10) << "r=" << r << dendl;
289
290 std::lock_guard locker{m_lock};
291
292 if (r < 0) {
293 derr << "error initializing local mirror status updater: "
294 << cpp_strerror(r) << dendl;
295
296 m_local_status_updater.reset();
297 ceph_assert(m_on_finish != nullptr);
298 m_threads->work_queue->queue(m_on_finish, r);
299 m_on_finish = nullptr;
300 return;
301 }
302
303 init_remote_status_updater();
304 }
305
306 template <typename I>
307 void NamespaceReplayer<I>::init_remote_status_updater() {
308 dout(10) << dendl;
309
310 ceph_assert(ceph_mutex_is_locked(m_lock));
311 ceph_assert(!m_remote_status_updater);
312
313 m_remote_status_updater.reset(MirrorStatusUpdater<I>::create(
314 m_remote_io_ctx, m_threads, m_local_mirror_uuid));
315 auto ctx = create_context_callback<
316 NamespaceReplayer<I>,
317 &NamespaceReplayer<I>::handle_init_remote_status_updater>(this);
318 m_remote_status_updater->init(ctx);
319 }
320
321 template <typename I>
322 void NamespaceReplayer<I>::handle_init_remote_status_updater(int r) {
323 dout(10) << "r=" << r << dendl;
324
325 std::lock_guard locker{m_lock};
326
327 if (r < 0) {
328 derr << "error initializing remote mirror status updater: "
329 << cpp_strerror(r) << dendl;
330
331 m_remote_status_updater.reset();
332 m_ret_val = r;
333 shut_down_local_status_updater();
334 return;
335 }
336
337 init_instance_replayer();
338 }
339
340 template <typename I>
341 void NamespaceReplayer<I>::init_instance_replayer() {
342 dout(10) << dendl;
343
344 ceph_assert(ceph_mutex_is_locked(m_lock));
345 ceph_assert(!m_instance_replayer);
346
347 m_instance_replayer.reset(InstanceReplayer<I>::create(
348 m_local_io_ctx, m_local_mirror_uuid, m_threads, m_service_daemon,
349 m_local_status_updater.get(), m_cache_manager_handler,
350 m_pool_meta_cache));
351 auto ctx = create_context_callback<NamespaceReplayer<I>,
352 &NamespaceReplayer<I>::handle_init_instance_replayer>(this);
353
354 m_instance_replayer->init(ctx);
355 }
356
357 template <typename I>
358 void NamespaceReplayer<I>::handle_init_instance_replayer(int r) {
359 dout(10) << "r=" << r << dendl;
360
361 std::lock_guard locker{m_lock};
362
363 if (r < 0) {
364 derr << "error initializing instance replayer: " << cpp_strerror(r)
365 << dendl;
366
367 m_instance_replayer.reset();
368 m_ret_val = r;
369 shut_down_remote_status_updater();
370 return;
371 }
372
373 m_instance_replayer->add_peer({m_local_mirror_peer_uuid, m_remote_io_ctx,
374 m_remote_pool_meta,
375 m_remote_status_updater.get()});
376
377 init_instance_watcher();
378 }
379
380 template <typename I>
381 void NamespaceReplayer<I>::init_instance_watcher() {
382 dout(10) << dendl;
383
384 ceph_assert(ceph_mutex_is_locked(m_lock));
385 ceph_assert(!m_instance_watcher);
386
387 m_instance_watcher.reset(InstanceWatcher<I>::create(
388 m_local_io_ctx, *m_threads->asio_engine, m_instance_replayer.get(),
389 m_image_sync_throttler));
390 auto ctx = create_context_callback<NamespaceReplayer<I>,
391 &NamespaceReplayer<I>::handle_init_instance_watcher>(this);
392
393 m_instance_watcher->init(ctx);
394 }
395
396 template <typename I>
397 void NamespaceReplayer<I>::handle_init_instance_watcher(int r) {
398 dout(10) << "r=" << r << dendl;
399
400 std::lock_guard locker{m_lock};
401
402 if (r < 0) {
403 derr << "error initializing instance watcher: " << cpp_strerror(r)
404 << dendl;
405
406 m_instance_watcher.reset();
407 m_ret_val = r;
408 shut_down_instance_replayer();
409 return;
410 }
411
412 ceph_assert(m_on_finish != nullptr);
413 m_threads->work_queue->queue(m_on_finish);
414 m_on_finish = nullptr;
415 }
416
417 template <typename I>
418 void NamespaceReplayer<I>::stop_instance_replayer() {
419 dout(10) << dendl;
420
421 ceph_assert(ceph_mutex_is_locked(m_lock));
422
423 Context *ctx = create_async_context_callback(
424 m_threads->work_queue, create_context_callback<NamespaceReplayer<I>,
425 &NamespaceReplayer<I>::handle_stop_instance_replayer>(this));
426
427 m_instance_replayer->stop(ctx);
428 }
429
430 template <typename I>
431 void NamespaceReplayer<I>::handle_stop_instance_replayer(int r) {
432 dout(10) << "r=" << r << dendl;
433
434 if (r < 0) {
435 derr << "error stopping instance replayer: " << cpp_strerror(r) << dendl;
436 }
437
438 std::lock_guard locker{m_lock};
439
440 shut_down_instance_watcher();
441 }
442
443 template <typename I>
444 void NamespaceReplayer<I>::shut_down_instance_watcher() {
445 dout(10) << dendl;
446
447 ceph_assert(ceph_mutex_is_locked(m_lock));
448 ceph_assert(m_instance_watcher);
449
450 Context *ctx = create_async_context_callback(
451 m_threads->work_queue, create_context_callback<NamespaceReplayer<I>,
452 &NamespaceReplayer<I>::handle_shut_down_instance_watcher>(this));
453
454 m_instance_watcher->shut_down(ctx);
455 }
456
457 template <typename I>
458 void NamespaceReplayer<I>::handle_shut_down_instance_watcher(int r) {
459 dout(10) << "r=" << r << dendl;
460
461 if (r < 0) {
462 derr << "error shutting instance watcher down: " << cpp_strerror(r)
463 << dendl;
464 }
465
466 std::lock_guard locker{m_lock};
467
468 m_instance_watcher.reset();
469
470 shut_down_instance_replayer();
471 }
472
473 template <typename I>
474 void NamespaceReplayer<I>::shut_down_instance_replayer() {
475 dout(10) << dendl;
476
477 ceph_assert(ceph_mutex_is_locked(m_lock));
478 ceph_assert(m_instance_replayer);
479
480 Context *ctx = create_async_context_callback(
481 m_threads->work_queue, create_context_callback<NamespaceReplayer<I>,
482 &NamespaceReplayer<I>::handle_shut_down_instance_replayer>(this));
483
484 m_instance_replayer->shut_down(ctx);
485 }
486
487 template <typename I>
488 void NamespaceReplayer<I>::handle_shut_down_instance_replayer(int r) {
489 dout(10) << "r=" << r << dendl;
490
491 if (r < 0) {
492 derr << "error shutting instance replayer down: " << cpp_strerror(r)
493 << dendl;
494 }
495
496 std::lock_guard locker{m_lock};
497
498 m_instance_replayer.reset();
499
500 shut_down_remote_status_updater();
501 }
502
503 template <typename I>
504 void NamespaceReplayer<I>::shut_down_remote_status_updater() {
505 dout(10) << dendl;
506
507 ceph_assert(ceph_mutex_is_locked(m_lock));
508 ceph_assert(m_remote_status_updater);
509
510 auto ctx = create_async_context_callback(
511 m_threads->work_queue, create_context_callback<
512 NamespaceReplayer<I>,
513 &NamespaceReplayer<I>::handle_shut_down_remote_status_updater>(this));
514 m_remote_status_updater->shut_down(ctx);
515 }
516
517 template <typename I>
518 void NamespaceReplayer<I>::handle_shut_down_remote_status_updater(int r) {
519 dout(10) << "r=" << r << dendl;
520
521 if (r < 0) {
522 derr << "error shutting remote mirror status updater down: "
523 << cpp_strerror(r) << dendl;
524 }
525
526 std::lock_guard locker{m_lock};
527 m_remote_status_updater.reset();
528
529 shut_down_local_status_updater();
530 }
531
532 template <typename I>
533 void NamespaceReplayer<I>::shut_down_local_status_updater() {
534 dout(10) << dendl;
535
536 ceph_assert(ceph_mutex_is_locked(m_lock));
537 ceph_assert(m_local_status_updater);
538
539 auto ctx = create_async_context_callback(
540 m_threads->work_queue, create_context_callback<
541 NamespaceReplayer<I>,
542 &NamespaceReplayer<I>::handle_shut_down_local_status_updater>(this));
543
544 m_local_status_updater->shut_down(ctx);
545 }
546
547 template <typename I>
548 void NamespaceReplayer<I>::handle_shut_down_local_status_updater(int r) {
549 dout(10) << "r=" << r << dendl;
550
551 if (r < 0) {
552 derr << "error shutting local mirror status updater down: "
553 << cpp_strerror(r) << dendl;
554 }
555
556 std::lock_guard locker{m_lock};
557
558 m_local_status_updater.reset();
559
560 ceph_assert(!m_image_map);
561 ceph_assert(!m_image_deleter);
562 ceph_assert(!m_local_pool_watcher);
563 ceph_assert(!m_remote_pool_watcher);
564 ceph_assert(!m_instance_watcher);
565 ceph_assert(!m_instance_replayer);
566
567 ceph_assert(m_on_finish != nullptr);
568 m_threads->work_queue->queue(m_on_finish, m_ret_val);
569 m_on_finish = nullptr;
570 m_ret_val = 0;
571 }
572
573 template <typename I>
574 void NamespaceReplayer<I>::init_image_map(Context *on_finish) {
575 dout(10) << dendl;
576
577 auto image_map = ImageMap<I>::create(m_local_io_ctx, m_threads,
578 m_instance_watcher->get_instance_id(),
579 m_image_map_listener);
580
581 auto ctx = new LambdaContext(
582 [this, image_map, on_finish](int r) {
583 handle_init_image_map(r, image_map, on_finish);
584 });
585 image_map->init(create_async_context_callback(
586 m_threads->work_queue, ctx));
587 }
588
589 template <typename I>
590 void NamespaceReplayer<I>::handle_init_image_map(int r, ImageMap<I> *image_map,
591 Context *on_finish) {
592 dout(10) << "r=" << r << dendl;
593 if (r < 0) {
594 derr << "failed to init image map: " << cpp_strerror(r) << dendl;
595 on_finish = new LambdaContext([image_map, on_finish, r](int) {
596 delete image_map;
597 on_finish->complete(r);
598 });
599 image_map->shut_down(on_finish);
600 return;
601 }
602
603 ceph_assert(!m_image_map);
604 m_image_map.reset(image_map);
605
606 init_local_pool_watcher(on_finish);
607 }
608
609 template <typename I>
610 void NamespaceReplayer<I>::init_local_pool_watcher(Context *on_finish) {
611 dout(10) << dendl;
612
613 std::lock_guard locker{m_lock};
614 ceph_assert(!m_local_pool_watcher);
615 m_local_pool_watcher.reset(PoolWatcher<I>::create(
616 m_threads, m_local_io_ctx, m_local_mirror_uuid,
617 m_local_pool_watcher_listener));
618
619 // ensure the initial set of local images is up-to-date
620 // after acquiring the leader role
621 auto ctx = new LambdaContext([this, on_finish](int r) {
622 handle_init_local_pool_watcher(r, on_finish);
623 });
624 m_local_pool_watcher->init(create_async_context_callback(
625 m_threads->work_queue, ctx));
626 }
627
628 template <typename I>
629 void NamespaceReplayer<I>::handle_init_local_pool_watcher(
630 int r, Context *on_finish) {
631 dout(10) << "r=" << r << dendl;
632 if (r < 0) {
633 derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl;
634 on_finish = new LambdaContext([on_finish, r](int) {
635 on_finish->complete(r);
636 });
637 shut_down_pool_watchers(on_finish);
638 return;
639 }
640
641 init_remote_pool_watcher(on_finish);
642 }
643
644 template <typename I>
645 void NamespaceReplayer<I>::init_remote_pool_watcher(Context *on_finish) {
646 dout(10) << dendl;
647
648 std::lock_guard locker{m_lock};
649 ceph_assert(!m_remote_pool_watcher);
650 m_remote_pool_watcher.reset(PoolWatcher<I>::create(
651 m_threads, m_remote_io_ctx, m_remote_pool_meta.mirror_uuid,
652 m_remote_pool_watcher_listener));
653
654 auto ctx = new LambdaContext([this, on_finish](int r) {
655 handle_init_remote_pool_watcher(r, on_finish);
656 });
657 m_remote_pool_watcher->init(create_async_context_callback(
658 m_threads->work_queue, ctx));
659 }
660
661 template <typename I>
662 void NamespaceReplayer<I>::handle_init_remote_pool_watcher(
663 int r, Context *on_finish) {
664 dout(10) << "r=" << r << dendl;
665 if (r == -ENOENT) {
666 // Technically nothing to do since the other side doesn't
667 // have mirroring enabled. Eventually the remote pool watcher will
668 // detect images (if mirroring is enabled), so no point propagating
669 // an error which would just busy-spin the state machines.
670 dout(0) << "remote peer does not have mirroring configured" << dendl;
671 } else if (r < 0) {
672 derr << "failed to retrieve remote images: " << cpp_strerror(r) << dendl;
673 on_finish = new LambdaContext([on_finish, r](int) {
674 on_finish->complete(r);
675 });
676 shut_down_pool_watchers(on_finish);
677 return;
678 }
679
680 init_image_deleter(on_finish);
681 }
682
683 template <typename I>
684 void NamespaceReplayer<I>::init_image_deleter(Context *on_finish) {
685 dout(10) << dendl;
686
687 std::lock_guard locker{m_lock};
688 ceph_assert(!m_image_deleter);
689
690 on_finish = new LambdaContext([this, on_finish](int r) {
691 handle_init_image_deleter(r, on_finish);
692 });
693 m_image_deleter.reset(ImageDeleter<I>::create(m_local_io_ctx, m_threads,
694 m_image_deletion_throttler,
695 m_service_daemon));
696 m_image_deleter->init(create_async_context_callback(
697 m_threads->work_queue, on_finish));
698 }
699
700 template <typename I>
701 void NamespaceReplayer<I>::handle_init_image_deleter(
702 int r, Context *on_finish) {
703 dout(10) << "r=" << r << dendl;
704 if (r < 0) {
705 derr << "failed to init image deleter: " << cpp_strerror(r) << dendl;
706 on_finish = new LambdaContext([on_finish, r](int) {
707 on_finish->complete(r);
708 });
709 shut_down_image_deleter(on_finish);
710 return;
711 }
712
713 on_finish->complete(0);
714 }
715
716 template <typename I>
717 void NamespaceReplayer<I>::shut_down_image_deleter(Context* on_finish) {
718 dout(10) << dendl;
719 {
720 std::lock_guard locker{m_lock};
721 if (m_image_deleter) {
722 Context *ctx = new LambdaContext([this, on_finish](int r) {
723 handle_shut_down_image_deleter(r, on_finish);
724 });
725 ctx = create_async_context_callback(m_threads->work_queue, ctx);
726
727 m_image_deleter->shut_down(ctx);
728 return;
729 }
730 }
731 shut_down_pool_watchers(on_finish);
732 }
733
734 template <typename I>
735 void NamespaceReplayer<I>::handle_shut_down_image_deleter(
736 int r, Context* on_finish) {
737 dout(10) << "r=" << r << dendl;
738
739 {
740 std::lock_guard locker{m_lock};
741 ceph_assert(m_image_deleter);
742 m_image_deleter.reset();
743 }
744
745 shut_down_pool_watchers(on_finish);
746 }
747
748 template <typename I>
749 void NamespaceReplayer<I>::shut_down_pool_watchers(Context *on_finish) {
750 dout(10) << dendl;
751
752 {
753 std::lock_guard locker{m_lock};
754 if (m_local_pool_watcher) {
755 Context *ctx = new LambdaContext([this, on_finish](int r) {
756 handle_shut_down_pool_watchers(r, on_finish);
757 });
758 ctx = create_async_context_callback(m_threads->work_queue, ctx);
759
760 auto gather_ctx = new C_Gather(g_ceph_context, ctx);
761 m_local_pool_watcher->shut_down(gather_ctx->new_sub());
762 if (m_remote_pool_watcher) {
763 m_remote_pool_watcher->shut_down(gather_ctx->new_sub());
764 }
765 gather_ctx->activate();
766 return;
767 }
768 }
769
770 on_finish->complete(0);
771 }
772
773 template <typename I>
774 void NamespaceReplayer<I>::handle_shut_down_pool_watchers(
775 int r, Context *on_finish) {
776 dout(10) << "r=" << r << dendl;
777
778 {
779 std::lock_guard locker{m_lock};
780 ceph_assert(m_local_pool_watcher);
781 m_local_pool_watcher.reset();
782
783 if (m_remote_pool_watcher) {
784 m_remote_pool_watcher.reset();
785 }
786 }
787 shut_down_image_map(on_finish);
788 }
789
790 template <typename I>
791 void NamespaceReplayer<I>::shut_down_image_map(Context *on_finish) {
792 dout(5) << dendl;
793
794 std::lock_guard locker{m_lock};
795 if (m_image_map) {
796 on_finish = new LambdaContext(
797 [this, on_finish](int r) {
798 handle_shut_down_image_map(r, on_finish);
799 });
800 m_image_map->shut_down(create_async_context_callback(
801 m_threads->work_queue, on_finish));
802 return;
803 }
804
805 m_threads->work_queue->queue(on_finish);
806 }
807
808 template <typename I>
809 void NamespaceReplayer<I>::handle_shut_down_image_map(int r, Context *on_finish) {
810 dout(5) << "r=" << r << dendl;
811 if (r < 0 && r != -EBLOCKLISTED) {
812 derr << "failed to shut down image map: " << cpp_strerror(r) << dendl;
813 }
814
815 std::lock_guard locker{m_lock};
816 ceph_assert(m_image_map);
817 m_image_map.reset();
818
819 m_instance_replayer->release_all(create_async_context_callback(
820 m_threads->work_queue, on_finish));
821 }
822
823 template <typename I>
824 void NamespaceReplayer<I>::handle_acquire_image(const std::string &global_image_id,
825 const std::string &instance_id,
826 Context* on_finish) {
827 dout(5) << "global_image_id=" << global_image_id << ", "
828 << "instance_id=" << instance_id << dendl;
829
830 m_instance_watcher->notify_image_acquire(instance_id, global_image_id,
831 on_finish);
832 }
833
834 template <typename I>
835 void NamespaceReplayer<I>::handle_release_image(const std::string &global_image_id,
836 const std::string &instance_id,
837 Context* on_finish) {
838 dout(5) << "global_image_id=" << global_image_id << ", "
839 << "instance_id=" << instance_id << dendl;
840
841 m_instance_watcher->notify_image_release(instance_id, global_image_id,
842 on_finish);
843 }
844
845 template <typename I>
846 void NamespaceReplayer<I>::handle_remove_image(const std::string &mirror_uuid,
847 const std::string &global_image_id,
848 const std::string &instance_id,
849 Context* on_finish) {
850 ceph_assert(!mirror_uuid.empty());
851 dout(5) << "mirror_uuid=" << mirror_uuid << ", "
852 << "global_image_id=" << global_image_id << ", "
853 << "instance_id=" << instance_id << dendl;
854
855 m_instance_watcher->notify_peer_image_removed(instance_id, global_image_id,
856 mirror_uuid, on_finish);
857 }
858
859 } // namespace mirror
860 } // namespace rbd
861
862 template class rbd::mirror::NamespaceReplayer<librbd::ImageCtx>;