]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/NamespaceReplayer.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / tools / rbd_mirror / NamespaceReplayer.cc
CommitLineData
9f95a23c
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "NamespaceReplayer.h"
9f95a23c
TL
5#include "common/Formatter.h"
6#include "common/debug.h"
7#include "common/errno.h"
8#include "cls/rbd/cls_rbd_client.h"
9#include "librbd/Utils.h"
10#include "librbd/api/Config.h"
11#include "librbd/api/Mirror.h"
f67539c2 12#include "librbd/asio/ContextWQ.h"
9f95a23c
TL
13#include "ServiceDaemon.h"
14#include "Threads.h"
15
16#define dout_context g_ceph_context
17#define dout_subsys ceph_subsys_rbd_mirror
18#undef dout_prefix
19#define dout_prefix *_dout << "rbd::mirror::NamespaceReplayer: " \
20 << this << " " << __func__ << ": "
21
22using librbd::util::create_async_context_callback;
23using librbd::util::create_context_callback;
24
25namespace rbd {
26namespace mirror {
27
28using ::operator<<;
29
namespace {

// Attribute keys used by handle_update() when publishing the local and
// remote image counts of this namespace to the service daemon.
const std::string SERVICE_DAEMON_LOCAL_COUNT_KEY{"image_local_count"};
const std::string SERVICE_DAEMON_REMOTE_COUNT_KEY{"image_remote_count"};

} // anonymous namespace
36
37template <typename I>
38NamespaceReplayer<I>::NamespaceReplayer(
39 const std::string &name,
40 librados::IoCtx &local_io_ctx, librados::IoCtx &remote_io_ctx,
41 const std::string &local_mirror_uuid,
42 const std::string& local_mirror_peer_uuid,
43 const RemotePoolMeta& remote_pool_meta,
44 Threads<I> *threads,
45 Throttler<I> *image_sync_throttler,
46 Throttler<I> *image_deletion_throttler,
47 ServiceDaemon<I> *service_daemon,
48 journal::CacheManagerHandler *cache_manager_handler,
49 PoolMetaCache* pool_meta_cache) :
50 m_namespace_name(name),
51 m_local_mirror_uuid(local_mirror_uuid),
52 m_local_mirror_peer_uuid(local_mirror_peer_uuid),
53 m_remote_pool_meta(remote_pool_meta),
54 m_threads(threads), m_image_sync_throttler(image_sync_throttler),
55 m_image_deletion_throttler(image_deletion_throttler),
56 m_service_daemon(service_daemon),
57 m_cache_manager_handler(cache_manager_handler),
58 m_pool_meta_cache(pool_meta_cache),
59 m_lock(ceph::make_mutex(librbd::util::unique_lock_name(
60 "rbd::mirror::NamespaceReplayer " + name, this))),
61 m_local_pool_watcher_listener(this, true),
62 m_remote_pool_watcher_listener(this, false),
63 m_image_map_listener(this) {
64 dout(10) << name << dendl;
65
66 m_local_io_ctx.dup(local_io_ctx);
67 m_local_io_ctx.set_namespace(name);
68 m_remote_io_ctx.dup(remote_io_ctx);
69 m_remote_io_ctx.set_namespace(name);
70}
71
72template <typename I>
f67539c2 73bool NamespaceReplayer<I>::is_blocklisted() const {
9f95a23c 74 std::lock_guard locker{m_lock};
f67539c2 75 return m_instance_replayer->is_blocklisted() ||
9f95a23c 76 (m_local_pool_watcher &&
f67539c2 77 m_local_pool_watcher->is_blocklisted()) ||
9f95a23c 78 (m_remote_pool_watcher &&
f67539c2 79 m_remote_pool_watcher->is_blocklisted());
9f95a23c
TL
80}
81
82template <typename I>
83void NamespaceReplayer<I>::init(Context *on_finish) {
84 dout(20) << dendl;
85
86 std::lock_guard locker{m_lock};
87
88 ceph_assert(m_on_finish == nullptr);
89 m_on_finish = on_finish;
90
91 init_local_status_updater();
92}
93
94
95template <typename I>
96void NamespaceReplayer<I>::shut_down(Context *on_finish) {
97 dout(20) << dendl;
98
99 {
100 std::lock_guard locker{m_lock};
101
102 ceph_assert(m_on_finish == nullptr);
103 m_on_finish = on_finish;
104
105 if (!m_image_map) {
106 stop_instance_replayer();
107 return;
108 }
109 }
110
111 auto ctx = new LambdaContext(
112 [this] (int r) {
113 std::lock_guard locker{m_lock};
114 stop_instance_replayer();
115 });
116 handle_release_leader(ctx);
117}
118
119template <typename I>
120void NamespaceReplayer<I>::print_status(Formatter *f)
121{
122 dout(20) << dendl;
123
124 ceph_assert(f);
125
126 std::lock_guard locker{m_lock};
127
128 m_instance_replayer->print_status(f);
129
130 if (m_image_deleter) {
131 f->open_object_section("image_deleter");
132 m_image_deleter->print_status(f);
133 f->close_section();
134 }
135}
136
137template <typename I>
138void NamespaceReplayer<I>::start()
139{
140 dout(20) << dendl;
141
142 std::lock_guard locker{m_lock};
143
144 m_instance_replayer->start();
145}
146
147template <typename I>
148void NamespaceReplayer<I>::stop()
149{
150 dout(20) << dendl;
151
152 std::lock_guard locker{m_lock};
153
154 m_instance_replayer->stop();
155}
156
157template <typename I>
158void NamespaceReplayer<I>::restart()
159{
160 dout(20) << dendl;
161
162 std::lock_guard locker{m_lock};
163
164 m_instance_replayer->restart();
165}
166
167template <typename I>
168void NamespaceReplayer<I>::flush()
169{
170 dout(20) << dendl;
171
172 std::lock_guard locker{m_lock};
173
174 m_instance_replayer->flush();
175}
176
177template <typename I>
178void NamespaceReplayer<I>::handle_update(const std::string &mirror_uuid,
179 ImageIds &&added_image_ids,
180 ImageIds &&removed_image_ids) {
181 std::lock_guard locker{m_lock};
182
183 if (!m_image_map) {
184 dout(20) << "not leader" << dendl;
185 return;
186 }
187
188 dout(10) << "mirror_uuid=" << mirror_uuid << ", "
189 << "added_count=" << added_image_ids.size() << ", "
190 << "removed_count=" << removed_image_ids.size() << dendl;
191
192 m_service_daemon->add_or_update_namespace_attribute(
193 m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
194 SERVICE_DAEMON_LOCAL_COUNT_KEY, m_local_pool_watcher->get_image_count());
195 if (m_remote_pool_watcher) {
196 m_service_daemon->add_or_update_namespace_attribute(
197 m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
198 SERVICE_DAEMON_REMOTE_COUNT_KEY,
199 m_remote_pool_watcher->get_image_count());
200 }
201
202 std::set<std::string> added_global_image_ids;
203 for (auto& image_id : added_image_ids) {
204 added_global_image_ids.insert(image_id.global_id);
205 }
206
207 std::set<std::string> removed_global_image_ids;
208 for (auto& image_id : removed_image_ids) {
209 removed_global_image_ids.insert(image_id.global_id);
210 }
211
212 m_image_map->update_images(mirror_uuid,
213 std::move(added_global_image_ids),
214 std::move(removed_global_image_ids));
215}
216
217template <typename I>
218void NamespaceReplayer<I>::handle_acquire_leader(Context *on_finish) {
219 dout(10) << dendl;
220
221 m_instance_watcher->handle_acquire_leader();
222
223 init_image_map(on_finish);
224}
225
226template <typename I>
227void NamespaceReplayer<I>::handle_release_leader(Context *on_finish) {
228 dout(10) << dendl;
229
230 m_instance_watcher->handle_release_leader();
231 shut_down_image_deleter(on_finish);
232}
233
234template <typename I>
235void NamespaceReplayer<I>::handle_update_leader(
236 const std::string &leader_instance_id) {
237 dout(10) << "leader_instance_id=" << leader_instance_id << dendl;
238
239 m_instance_watcher->handle_update_leader(leader_instance_id);
240}
241
242template <typename I>
243void NamespaceReplayer<I>::handle_instances_added(
244 const std::vector<std::string> &instance_ids) {
245 dout(10) << "instance_ids=" << instance_ids << dendl;
246
247 std::lock_guard locker{m_lock};
248
249 if (!m_image_map) {
250 return;
251 }
252
253 m_image_map->update_instances_added(instance_ids);
254}
255
256template <typename I>
257void NamespaceReplayer<I>::handle_instances_removed(
258 const std::vector<std::string> &instance_ids) {
259 dout(10) << "instance_ids=" << instance_ids << dendl;
260
261 std::lock_guard locker{m_lock};
262
263 if (!m_image_map) {
264 return;
265 }
266
267 m_image_map->update_instances_removed(instance_ids);
268}
269
270template <typename I>
271void NamespaceReplayer<I>::init_local_status_updater() {
272 dout(10) << dendl;
273
274 ceph_assert(ceph_mutex_is_locked(m_lock));
275 ceph_assert(!m_local_status_updater);
276
277 m_local_status_updater.reset(MirrorStatusUpdater<I>::create(
278 m_local_io_ctx, m_threads, ""));
279 auto ctx = create_context_callback<
280 NamespaceReplayer<I>,
281 &NamespaceReplayer<I>::handle_init_local_status_updater>(this);
282
283 m_local_status_updater->init(ctx);
284}
285
286template <typename I>
287void NamespaceReplayer<I>::handle_init_local_status_updater(int r) {
288 dout(10) << "r=" << r << dendl;
289
290 std::lock_guard locker{m_lock};
291
292 if (r < 0) {
293 derr << "error initializing local mirror status updater: "
294 << cpp_strerror(r) << dendl;
295
296 m_local_status_updater.reset();
297 ceph_assert(m_on_finish != nullptr);
298 m_threads->work_queue->queue(m_on_finish, r);
299 m_on_finish = nullptr;
300 return;
301 }
302
303 init_remote_status_updater();
304}
305
306template <typename I>
307void NamespaceReplayer<I>::init_remote_status_updater() {
308 dout(10) << dendl;
309
310 ceph_assert(ceph_mutex_is_locked(m_lock));
311 ceph_assert(!m_remote_status_updater);
312
313 m_remote_status_updater.reset(MirrorStatusUpdater<I>::create(
314 m_remote_io_ctx, m_threads, m_local_mirror_uuid));
315 auto ctx = create_context_callback<
316 NamespaceReplayer<I>,
317 &NamespaceReplayer<I>::handle_init_remote_status_updater>(this);
318 m_remote_status_updater->init(ctx);
319}
320
321template <typename I>
322void NamespaceReplayer<I>::handle_init_remote_status_updater(int r) {
323 dout(10) << "r=" << r << dendl;
324
325 std::lock_guard locker{m_lock};
326
327 if (r < 0) {
328 derr << "error initializing remote mirror status updater: "
329 << cpp_strerror(r) << dendl;
330
331 m_remote_status_updater.reset();
332 m_ret_val = r;
333 shut_down_local_status_updater();
334 return;
335 }
336
337 init_instance_replayer();
338}
339
340template <typename I>
341void NamespaceReplayer<I>::init_instance_replayer() {
342 dout(10) << dendl;
343
344 ceph_assert(ceph_mutex_is_locked(m_lock));
345 ceph_assert(!m_instance_replayer);
346
347 m_instance_replayer.reset(InstanceReplayer<I>::create(
348 m_local_io_ctx, m_local_mirror_uuid, m_threads, m_service_daemon,
349 m_local_status_updater.get(), m_cache_manager_handler,
350 m_pool_meta_cache));
351 auto ctx = create_context_callback<NamespaceReplayer<I>,
352 &NamespaceReplayer<I>::handle_init_instance_replayer>(this);
353
354 m_instance_replayer->init(ctx);
355}
356
357template <typename I>
358void NamespaceReplayer<I>::handle_init_instance_replayer(int r) {
359 dout(10) << "r=" << r << dendl;
360
361 std::lock_guard locker{m_lock};
362
363 if (r < 0) {
364 derr << "error initializing instance replayer: " << cpp_strerror(r)
365 << dendl;
366
367 m_instance_replayer.reset();
368 m_ret_val = r;
369 shut_down_remote_status_updater();
370 return;
371 }
372
373 m_instance_replayer->add_peer({m_local_mirror_peer_uuid, m_remote_io_ctx,
374 m_remote_pool_meta,
375 m_remote_status_updater.get()});
376
377 init_instance_watcher();
378}
379
380template <typename I>
381void NamespaceReplayer<I>::init_instance_watcher() {
382 dout(10) << dendl;
383
384 ceph_assert(ceph_mutex_is_locked(m_lock));
385 ceph_assert(!m_instance_watcher);
386
387 m_instance_watcher.reset(InstanceWatcher<I>::create(
f67539c2 388 m_local_io_ctx, *m_threads->asio_engine, m_instance_replayer.get(),
9f95a23c
TL
389 m_image_sync_throttler));
390 auto ctx = create_context_callback<NamespaceReplayer<I>,
391 &NamespaceReplayer<I>::handle_init_instance_watcher>(this);
392
393 m_instance_watcher->init(ctx);
394}
395
396template <typename I>
397void NamespaceReplayer<I>::handle_init_instance_watcher(int r) {
398 dout(10) << "r=" << r << dendl;
399
400 std::lock_guard locker{m_lock};
401
402 if (r < 0) {
403 derr << "error initializing instance watcher: " << cpp_strerror(r)
404 << dendl;
405
406 m_instance_watcher.reset();
407 m_ret_val = r;
408 shut_down_instance_replayer();
409 return;
410 }
411
412 ceph_assert(m_on_finish != nullptr);
413 m_threads->work_queue->queue(m_on_finish);
414 m_on_finish = nullptr;
415}
416
417template <typename I>
418void NamespaceReplayer<I>::stop_instance_replayer() {
419 dout(10) << dendl;
420
421 ceph_assert(ceph_mutex_is_locked(m_lock));
422
423 Context *ctx = create_async_context_callback(
424 m_threads->work_queue, create_context_callback<NamespaceReplayer<I>,
425 &NamespaceReplayer<I>::handle_stop_instance_replayer>(this));
426
427 m_instance_replayer->stop(ctx);
428}
429
430template <typename I>
431void NamespaceReplayer<I>::handle_stop_instance_replayer(int r) {
432 dout(10) << "r=" << r << dendl;
433
434 if (r < 0) {
435 derr << "error stopping instance replayer: " << cpp_strerror(r) << dendl;
436 }
437
438 std::lock_guard locker{m_lock};
439
440 shut_down_instance_watcher();
441}
442
443template <typename I>
444void NamespaceReplayer<I>::shut_down_instance_watcher() {
445 dout(10) << dendl;
446
447 ceph_assert(ceph_mutex_is_locked(m_lock));
448 ceph_assert(m_instance_watcher);
449
450 Context *ctx = create_async_context_callback(
451 m_threads->work_queue, create_context_callback<NamespaceReplayer<I>,
452 &NamespaceReplayer<I>::handle_shut_down_instance_watcher>(this));
453
454 m_instance_watcher->shut_down(ctx);
455}
456
457template <typename I>
458void NamespaceReplayer<I>::handle_shut_down_instance_watcher(int r) {
459 dout(10) << "r=" << r << dendl;
460
461 if (r < 0) {
462 derr << "error shutting instance watcher down: " << cpp_strerror(r)
463 << dendl;
464 }
465
466 std::lock_guard locker{m_lock};
467
468 m_instance_watcher.reset();
469
470 shut_down_instance_replayer();
471}
472
473template <typename I>
474void NamespaceReplayer<I>::shut_down_instance_replayer() {
475 dout(10) << dendl;
476
477 ceph_assert(ceph_mutex_is_locked(m_lock));
478 ceph_assert(m_instance_replayer);
479
480 Context *ctx = create_async_context_callback(
481 m_threads->work_queue, create_context_callback<NamespaceReplayer<I>,
482 &NamespaceReplayer<I>::handle_shut_down_instance_replayer>(this));
483
484 m_instance_replayer->shut_down(ctx);
485}
486
487template <typename I>
488void NamespaceReplayer<I>::handle_shut_down_instance_replayer(int r) {
489 dout(10) << "r=" << r << dendl;
490
491 if (r < 0) {
492 derr << "error shutting instance replayer down: " << cpp_strerror(r)
493 << dendl;
494 }
495
496 std::lock_guard locker{m_lock};
497
498 m_instance_replayer.reset();
499
500 shut_down_remote_status_updater();
501}
502
503template <typename I>
504void NamespaceReplayer<I>::shut_down_remote_status_updater() {
505 dout(10) << dendl;
506
507 ceph_assert(ceph_mutex_is_locked(m_lock));
508 ceph_assert(m_remote_status_updater);
509
510 auto ctx = create_async_context_callback(
511 m_threads->work_queue, create_context_callback<
512 NamespaceReplayer<I>,
513 &NamespaceReplayer<I>::handle_shut_down_remote_status_updater>(this));
514 m_remote_status_updater->shut_down(ctx);
515}
516
517template <typename I>
518void NamespaceReplayer<I>::handle_shut_down_remote_status_updater(int r) {
519 dout(10) << "r=" << r << dendl;
520
521 if (r < 0) {
522 derr << "error shutting remote mirror status updater down: "
523 << cpp_strerror(r) << dendl;
524 }
525
526 std::lock_guard locker{m_lock};
527 m_remote_status_updater.reset();
528
529 shut_down_local_status_updater();
530}
531
532template <typename I>
533void NamespaceReplayer<I>::shut_down_local_status_updater() {
534 dout(10) << dendl;
535
536 ceph_assert(ceph_mutex_is_locked(m_lock));
537 ceph_assert(m_local_status_updater);
538
539 auto ctx = create_async_context_callback(
540 m_threads->work_queue, create_context_callback<
541 NamespaceReplayer<I>,
542 &NamespaceReplayer<I>::handle_shut_down_local_status_updater>(this));
543
544 m_local_status_updater->shut_down(ctx);
545}
546
547template <typename I>
548void NamespaceReplayer<I>::handle_shut_down_local_status_updater(int r) {
549 dout(10) << "r=" << r << dendl;
550
551 if (r < 0) {
552 derr << "error shutting local mirror status updater down: "
553 << cpp_strerror(r) << dendl;
554 }
555
556 std::lock_guard locker{m_lock};
557
558 m_local_status_updater.reset();
559
560 ceph_assert(!m_image_map);
561 ceph_assert(!m_image_deleter);
562 ceph_assert(!m_local_pool_watcher);
563 ceph_assert(!m_remote_pool_watcher);
564 ceph_assert(!m_instance_watcher);
565 ceph_assert(!m_instance_replayer);
566
567 ceph_assert(m_on_finish != nullptr);
568 m_threads->work_queue->queue(m_on_finish, m_ret_val);
569 m_on_finish = nullptr;
570 m_ret_val = 0;
571}
572
573template <typename I>
574void NamespaceReplayer<I>::init_image_map(Context *on_finish) {
575 dout(10) << dendl;
576
577 auto image_map = ImageMap<I>::create(m_local_io_ctx, m_threads,
578 m_instance_watcher->get_instance_id(),
579 m_image_map_listener);
580
581 auto ctx = new LambdaContext(
582 [this, image_map, on_finish](int r) {
583 handle_init_image_map(r, image_map, on_finish);
584 });
585 image_map->init(create_async_context_callback(
586 m_threads->work_queue, ctx));
587}
588
589template <typename I>
590void NamespaceReplayer<I>::handle_init_image_map(int r, ImageMap<I> *image_map,
591 Context *on_finish) {
592 dout(10) << "r=" << r << dendl;
593 if (r < 0) {
594 derr << "failed to init image map: " << cpp_strerror(r) << dendl;
595 on_finish = new LambdaContext([image_map, on_finish, r](int) {
596 delete image_map;
597 on_finish->complete(r);
598 });
599 image_map->shut_down(on_finish);
600 return;
601 }
602
603 ceph_assert(!m_image_map);
604 m_image_map.reset(image_map);
605
606 init_local_pool_watcher(on_finish);
607}
608
609template <typename I>
610void NamespaceReplayer<I>::init_local_pool_watcher(Context *on_finish) {
611 dout(10) << dendl;
612
613 std::lock_guard locker{m_lock};
614 ceph_assert(!m_local_pool_watcher);
615 m_local_pool_watcher.reset(PoolWatcher<I>::create(
616 m_threads, m_local_io_ctx, m_local_mirror_uuid,
617 m_local_pool_watcher_listener));
618
619 // ensure the initial set of local images is up-to-date
620 // after acquiring the leader role
621 auto ctx = new LambdaContext([this, on_finish](int r) {
622 handle_init_local_pool_watcher(r, on_finish);
623 });
624 m_local_pool_watcher->init(create_async_context_callback(
625 m_threads->work_queue, ctx));
626}
627
628template <typename I>
629void NamespaceReplayer<I>::handle_init_local_pool_watcher(
630 int r, Context *on_finish) {
631 dout(10) << "r=" << r << dendl;
632 if (r < 0) {
633 derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl;
634 on_finish = new LambdaContext([on_finish, r](int) {
635 on_finish->complete(r);
636 });
637 shut_down_pool_watchers(on_finish);
638 return;
639 }
640
641 init_remote_pool_watcher(on_finish);
642}
643
644template <typename I>
645void NamespaceReplayer<I>::init_remote_pool_watcher(Context *on_finish) {
646 dout(10) << dendl;
647
648 std::lock_guard locker{m_lock};
649 ceph_assert(!m_remote_pool_watcher);
650 m_remote_pool_watcher.reset(PoolWatcher<I>::create(
651 m_threads, m_remote_io_ctx, m_remote_pool_meta.mirror_uuid,
652 m_remote_pool_watcher_listener));
653
654 auto ctx = new LambdaContext([this, on_finish](int r) {
655 handle_init_remote_pool_watcher(r, on_finish);
656 });
657 m_remote_pool_watcher->init(create_async_context_callback(
658 m_threads->work_queue, ctx));
659}
660
661template <typename I>
662void NamespaceReplayer<I>::handle_init_remote_pool_watcher(
663 int r, Context *on_finish) {
664 dout(10) << "r=" << r << dendl;
665 if (r == -ENOENT) {
666 // Technically nothing to do since the other side doesn't
667 // have mirroring enabled. Eventually the remote pool watcher will
668 // detect images (if mirroring is enabled), so no point propagating
669 // an error which would just busy-spin the state machines.
670 dout(0) << "remote peer does not have mirroring configured" << dendl;
671 } else if (r < 0) {
672 derr << "failed to retrieve remote images: " << cpp_strerror(r) << dendl;
673 on_finish = new LambdaContext([on_finish, r](int) {
674 on_finish->complete(r);
675 });
676 shut_down_pool_watchers(on_finish);
677 return;
678 }
679
680 init_image_deleter(on_finish);
681}
682
683template <typename I>
684void NamespaceReplayer<I>::init_image_deleter(Context *on_finish) {
685 dout(10) << dendl;
686
687 std::lock_guard locker{m_lock};
688 ceph_assert(!m_image_deleter);
689
690 on_finish = new LambdaContext([this, on_finish](int r) {
691 handle_init_image_deleter(r, on_finish);
692 });
693 m_image_deleter.reset(ImageDeleter<I>::create(m_local_io_ctx, m_threads,
694 m_image_deletion_throttler,
695 m_service_daemon));
696 m_image_deleter->init(create_async_context_callback(
697 m_threads->work_queue, on_finish));
698}
699
700template <typename I>
701void NamespaceReplayer<I>::handle_init_image_deleter(
702 int r, Context *on_finish) {
703 dout(10) << "r=" << r << dendl;
704 if (r < 0) {
705 derr << "failed to init image deleter: " << cpp_strerror(r) << dendl;
706 on_finish = new LambdaContext([on_finish, r](int) {
707 on_finish->complete(r);
708 });
709 shut_down_image_deleter(on_finish);
710 return;
711 }
712
713 on_finish->complete(0);
714}
715
716template <typename I>
717void NamespaceReplayer<I>::shut_down_image_deleter(Context* on_finish) {
718 dout(10) << dendl;
719 {
720 std::lock_guard locker{m_lock};
721 if (m_image_deleter) {
722 Context *ctx = new LambdaContext([this, on_finish](int r) {
723 handle_shut_down_image_deleter(r, on_finish);
724 });
725 ctx = create_async_context_callback(m_threads->work_queue, ctx);
726
727 m_image_deleter->shut_down(ctx);
728 return;
729 }
730 }
731 shut_down_pool_watchers(on_finish);
732}
733
734template <typename I>
735void NamespaceReplayer<I>::handle_shut_down_image_deleter(
736 int r, Context* on_finish) {
737 dout(10) << "r=" << r << dendl;
738
739 {
740 std::lock_guard locker{m_lock};
741 ceph_assert(m_image_deleter);
742 m_image_deleter.reset();
743 }
744
745 shut_down_pool_watchers(on_finish);
746}
747
748template <typename I>
749void NamespaceReplayer<I>::shut_down_pool_watchers(Context *on_finish) {
750 dout(10) << dendl;
751
752 {
753 std::lock_guard locker{m_lock};
754 if (m_local_pool_watcher) {
755 Context *ctx = new LambdaContext([this, on_finish](int r) {
756 handle_shut_down_pool_watchers(r, on_finish);
757 });
758 ctx = create_async_context_callback(m_threads->work_queue, ctx);
759
760 auto gather_ctx = new C_Gather(g_ceph_context, ctx);
761 m_local_pool_watcher->shut_down(gather_ctx->new_sub());
762 if (m_remote_pool_watcher) {
763 m_remote_pool_watcher->shut_down(gather_ctx->new_sub());
764 }
765 gather_ctx->activate();
766 return;
767 }
768 }
769
770 on_finish->complete(0);
771}
772
773template <typename I>
774void NamespaceReplayer<I>::handle_shut_down_pool_watchers(
775 int r, Context *on_finish) {
776 dout(10) << "r=" << r << dendl;
777
778 {
779 std::lock_guard locker{m_lock};
780 ceph_assert(m_local_pool_watcher);
781 m_local_pool_watcher.reset();
782
783 if (m_remote_pool_watcher) {
784 m_remote_pool_watcher.reset();
785 }
786 }
787 shut_down_image_map(on_finish);
788}
789
790template <typename I>
791void NamespaceReplayer<I>::shut_down_image_map(Context *on_finish) {
792 dout(5) << dendl;
793
794 std::lock_guard locker{m_lock};
795 if (m_image_map) {
796 on_finish = new LambdaContext(
797 [this, on_finish](int r) {
798 handle_shut_down_image_map(r, on_finish);
799 });
800 m_image_map->shut_down(create_async_context_callback(
801 m_threads->work_queue, on_finish));
802 return;
803 }
804
805 m_threads->work_queue->queue(on_finish);
806}
807
808template <typename I>
809void NamespaceReplayer<I>::handle_shut_down_image_map(int r, Context *on_finish) {
810 dout(5) << "r=" << r << dendl;
f67539c2 811 if (r < 0 && r != -EBLOCKLISTED) {
9f95a23c
TL
812 derr << "failed to shut down image map: " << cpp_strerror(r) << dendl;
813 }
814
815 std::lock_guard locker{m_lock};
816 ceph_assert(m_image_map);
817 m_image_map.reset();
818
819 m_instance_replayer->release_all(create_async_context_callback(
820 m_threads->work_queue, on_finish));
821}
822
823template <typename I>
824void NamespaceReplayer<I>::handle_acquire_image(const std::string &global_image_id,
825 const std::string &instance_id,
826 Context* on_finish) {
827 dout(5) << "global_image_id=" << global_image_id << ", "
828 << "instance_id=" << instance_id << dendl;
829
830 m_instance_watcher->notify_image_acquire(instance_id, global_image_id,
831 on_finish);
832}
833
834template <typename I>
835void NamespaceReplayer<I>::handle_release_image(const std::string &global_image_id,
836 const std::string &instance_id,
837 Context* on_finish) {
838 dout(5) << "global_image_id=" << global_image_id << ", "
839 << "instance_id=" << instance_id << dendl;
840
841 m_instance_watcher->notify_image_release(instance_id, global_image_id,
842 on_finish);
843}
844
845template <typename I>
846void NamespaceReplayer<I>::handle_remove_image(const std::string &mirror_uuid,
847 const std::string &global_image_id,
848 const std::string &instance_id,
849 Context* on_finish) {
850 ceph_assert(!mirror_uuid.empty());
851 dout(5) << "mirror_uuid=" << mirror_uuid << ", "
852 << "global_image_id=" << global_image_id << ", "
853 << "instance_id=" << instance_id << dendl;
854
855 m_instance_watcher->notify_peer_image_removed(instance_id, global_image_id,
856 mirror_uuid, on_finish);
857}
858
859} // namespace mirror
860} // namespace rbd
861
862template class rbd::mirror::NamespaceReplayer<librbd::ImageCtx>;