// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "PoolReplayer.h"
#include <boost/bind.hpp>
#include "common/Formatter.h"
#include "common/admin_socket.h"
#include "common/ceph_argparse.h"
#include "common/code_environment.h"
#include "common/common_init.h"
#include "common/debug.h"
#include "common/errno.h"
#include "include/stringify.h"
#include "cls/rbd/cls_rbd_client.h"
#include "global/global_context.h"
#include "librbd/internal.h"
#include "librbd/Utils.h"
#include "librbd/Watcher.h"
#include "librbd/api/Mirror.h"
#include "InstanceReplayer.h"
#include "InstanceWatcher.h"
#include "LeaderWatcher.h"
#include "ServiceDaemon.h"
#include "Threads.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::PoolReplayer: " \
                           << this << " " << __func__ << ": "

using std::chrono::seconds;
using std::map;
using std::string;
using std::unique_ptr;
using std::vector;

using librbd::cls_client::dir_get_name;
using librbd::util::create_async_context_callback;

namespace rbd {
namespace mirror {

namespace {

const std::string SERVICE_DAEMON_LEADER_KEY("leader");
const std::string SERVICE_DAEMON_LOCAL_COUNT_KEY("image_local_count");
const std::string SERVICE_DAEMON_REMOTE_COUNT_KEY("image_remote_count");

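// Commands exposed through the daemon's admin socket. Each one wraps a
// single PoolReplayer operation and is dispatched by the
// PoolReplayerAdminSocketHook defined below.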
class PoolReplayerAdminSocketCommand {
public:
  PoolReplayerAdminSocketCommand(PoolReplayer *pool_replayer)
    : pool_replayer(pool_replayer) {
  }
  virtual ~PoolReplayerAdminSocketCommand() {}
  virtual bool call(Formatter *f, stringstream *ss) = 0;
protected:
  PoolReplayer *pool_replayer;
};

class StatusCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit StatusCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->print_status(f, ss);
    return true;
  }
};

class StartCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit StartCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->start();
    return true;
  }
};

class StopCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit StopCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->stop(true);
    return true;
  }
};

class RestartCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit RestartCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->restart();
    return true;
  }
};

class FlushCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit FlushCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->flush();
    return true;
  }
};

class LeaderReleaseCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit LeaderReleaseCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->release_leader();
    return true;
  }
};

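// Registers the per-pool admin socket commands ("rbd mirror status <name>",
// "rbd mirror start <name>", ...) on construction and unregisters them on
// destruction. A command that fails to register is simply left out.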
class PoolReplayerAdminSocketHook : public AdminSocketHook {
public:
  PoolReplayerAdminSocketHook(CephContext *cct, const std::string &name,
                              PoolReplayer *pool_replayer)
    : admin_socket(cct->get_admin_socket()) {
    std::string command;
    int r;

    command = "rbd mirror status " + name;
    r = admin_socket->register_command(command, command, this,
                                       "get status for rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StatusCommand(pool_replayer);
    }

    command = "rbd mirror start " + name;
    r = admin_socket->register_command(command, command, this,
                                       "start rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StartCommand(pool_replayer);
    }

    command = "rbd mirror stop " + name;
    r = admin_socket->register_command(command, command, this,
                                       "stop rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StopCommand(pool_replayer);
    }

    command = "rbd mirror restart " + name;
    r = admin_socket->register_command(command, command, this,
                                       "restart rbd mirror " + name);
    if (r == 0) {
      commands[command] = new RestartCommand(pool_replayer);
    }

    command = "rbd mirror flush " + name;
    r = admin_socket->register_command(command, command, this,
                                       "flush rbd mirror " + name);
    if (r == 0) {
      commands[command] = new FlushCommand(pool_replayer);
    }

    command = "rbd mirror leader release " + name;
    r = admin_socket->register_command(command, command, this,
                                       "release rbd mirror leader " + name);
    if (r == 0) {
      commands[command] = new LeaderReleaseCommand(pool_replayer);
    }
  }

  ~PoolReplayerAdminSocketHook() override {
    for (Commands::const_iterator i = commands.begin(); i != commands.end();
         ++i) {
      (void)admin_socket->unregister_command(i->first);
      delete i->second;
    }
  }

  bool call(std::string command, cmdmap_t& cmdmap, std::string format,
            bufferlist& out) override {
    Commands::const_iterator i = commands.find(command);
    assert(i != commands.end());
    Formatter *f = Formatter::create(format);
    stringstream ss;
    bool r = i->second->call(f, &ss);
    delete f;
    out.append(ss);
    return r;
  }

private:
  typedef std::map<std::string, PoolReplayerAdminSocketCommand*> Commands;

  AdminSocket *admin_socket;
  Commands commands;
};

} // anonymous namespace

PoolReplayer::PoolReplayer(Threads<librbd::ImageCtx> *threads,
                           ServiceDaemon<librbd::ImageCtx>* service_daemon,
                           ImageDeleter<>* image_deleter,
                           int64_t local_pool_id, const peer_t &peer,
                           const std::vector<const char*> &args) :
  m_threads(threads),
  m_service_daemon(service_daemon),
  m_image_deleter(image_deleter),
  m_local_pool_id(local_pool_id),
  m_peer(peer),
  m_args(args),
  m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)),
  m_local_pool_watcher_listener(this, true),
  m_remote_pool_watcher_listener(this, false),
  m_pool_replayer_thread(this),
  m_leader_listener(this)
{
}

PoolReplayer::~PoolReplayer()
{
  delete m_asok_hook;
  shut_down();
}

bool PoolReplayer::is_blacklisted() const {
  Mutex::Locker locker(m_lock);
  return m_blacklisted;
}

bool PoolReplayer::is_leader() const {
  Mutex::Locker locker(m_lock);
  return m_leader_watcher && m_leader_watcher->is_leader();
}

bool PoolReplayer::is_running() const {
  return m_pool_replayer_thread.is_started();
}

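// Connects to the local and remote clusters, brings up the instance
// replayer, instance watcher and leader watcher, and finally starts the
// pool replayer thread. Failures are surfaced as service daemon callouts.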
void PoolReplayer::init()
{
  assert(!m_pool_replayer_thread.is_started());

  // reset state
  m_stopping = false;
  m_blacklisted = false;

  dout(20) << "replaying for " << m_peer << dendl;
  int r = init_rados(g_ceph_context->_conf->cluster,
                     g_ceph_context->_conf->name.to_str(),
                     "local cluster", &m_local_rados);
  if (r < 0) {
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to connect to local cluster");
    return;
  }

  r = init_rados(m_peer.cluster_name, m_peer.client_name,
                 std::string("remote peer ") + stringify(m_peer),
                 &m_remote_rados);
  if (r < 0) {
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to connect to remote cluster");
    return;
  }

  r = m_local_rados->ioctx_create2(m_local_pool_id, m_local_io_ctx);
  if (r < 0) {
    derr << "error accessing local pool " << m_local_pool_id << ": "
         << cpp_strerror(r) << dendl;
    return;
  }

  std::string local_mirror_uuid;
  r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx,
                                          &local_mirror_uuid);
  if (r < 0) {
    derr << "failed to retrieve local mirror uuid from pool "
         << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to query local mirror uuid");
    return;
  }

  r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
                                   m_remote_io_ctx);
  if (r < 0) {
    derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name()
         << ": " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_WARNING,
      "unable to access remote pool");
    return;
  }

  dout(20) << "connected to " << m_peer << dendl;

  m_instance_replayer.reset(InstanceReplayer<>::create(
    m_threads, m_service_daemon, m_image_deleter, m_local_rados,
    local_mirror_uuid, m_local_pool_id));
  m_instance_replayer->init();
  m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx);

  m_instance_watcher.reset(InstanceWatcher<>::create(
    m_local_io_ctx, m_threads->work_queue, m_instance_replayer.get()));
  r = m_instance_watcher->init();
  if (r < 0) {
    derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to initialize instance messenger object");
    return;
  }

  m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
                                             &m_leader_listener));
  r = m_leader_watcher->init();
  if (r < 0) {
    derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to initialize leader messenger object");
    return;
  }

  if (m_callout_id != service_daemon::CALLOUT_ID_NONE) {
    m_service_daemon->remove_callout(m_local_pool_id, m_callout_id);
    m_callout_id = service_daemon::CALLOUT_ID_NONE;
  }

  m_pool_replayer_thread.create("pool replayer");
}

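// Stops the pool replayer thread and shuts components down in the reverse
// order of their initialization. The pool watchers must already be gone:
// they are torn down when leadership is released.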
void PoolReplayer::shut_down() {
  m_stopping = true;
  {
    Mutex::Locker l(m_lock);
    m_cond.Signal();
  }
  if (m_pool_replayer_thread.is_started()) {
    m_pool_replayer_thread.join();
  }
  if (m_leader_watcher) {
    m_leader_watcher->shut_down();
    m_leader_watcher.reset();
  }
  if (m_instance_watcher) {
    m_instance_watcher->shut_down();
    m_instance_watcher.reset();
  }
  if (m_instance_replayer) {
    m_instance_replayer->shut_down();
    m_instance_replayer.reset();
  }

  assert(!m_local_pool_watcher);
  assert(!m_remote_pool_watcher);
  m_local_rados.reset();
  m_remote_rados.reset();
}

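// Builds a cluster handle for the given cluster/client name, replicating
// the librados conf_read_file/conf_parse_env/conf_parse_argv bootstrap
// sequence against a private CephContext.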
int PoolReplayer::init_rados(const std::string &cluster_name,
                             const std::string &client_name,
                             const std::string &description,
                             RadosRef *rados_ref) {
  rados_ref->reset(new librados::Rados());

  // NOTE: manually bootstrap a CephContext here instead of via
  // the librados API to avoid mixing global singletons between
  // the librados shared library and the daemon
  // TODO: eliminate intermingling of global singletons within Ceph APIs
  CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT);
  if (client_name.empty() || !iparams.name.from_str(client_name)) {
    derr << "error initializing cluster handle for " << description << dendl;
    return -EINVAL;
  }

  CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY,
                                    CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
  cct->_conf->cluster = cluster_name;

  // librados::Rados::conf_read_file
  int r = cct->_conf->parse_config_files(nullptr, nullptr, 0);
  if (r < 0) {
    derr << "could not read ceph conf for " << description << ": "
         << cpp_strerror(r) << dendl;
    cct->put();
    return r;
  }
  cct->_conf->parse_env();

  // librados::Rados::conf_parse_env
  std::vector<const char*> args;
  env_to_vec(args, nullptr);
  r = cct->_conf->parse_argv(args);
  if (r < 0) {
    derr << "could not parse environment for " << description << ": "
         << cpp_strerror(r) << dendl;
    cct->put();
    return r;
  }

  if (!m_args.empty()) {
    // librados::Rados::conf_parse_argv
    args = m_args;
    r = cct->_conf->parse_argv(args);
    if (r < 0) {
      derr << "could not parse command line args for " << description << ": "
           << cpp_strerror(r) << dendl;
      cct->put();
      return r;
    }
  }

  if (!g_ceph_context->_conf->admin_socket.empty()) {
    cct->_conf->set_val_or_die("admin_socket",
                               "$run_dir/$name.$pid.$cluster.$cctid.asok");
  }

  // disable unnecessary librbd cache
  cct->_conf->set_val_or_die("rbd_cache", "false");
  cct->_conf->apply_changes(nullptr);
  cct->_conf->complain_about_parse_errors(cct);

  r = (*rados_ref)->init_with_context(cct);
  assert(r == 0);
  cct->put();

  r = (*rados_ref)->connect();
  if (r < 0) {
    derr << "error connecting to " << description << ": "
         << cpp_strerror(r) << dendl;
    return r;
  }

  return 0;
}

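// Main loop of the pool replayer thread: (re)creates the admin socket hook
// whenever the pool/peer name changes, and exits once the replayer is
// stopped or either pool watcher reports this client as blacklisted.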
void PoolReplayer::run()
{
  dout(20) << "enter" << dendl;

  while (!m_stopping) {
    std::string asok_hook_name = m_local_io_ctx.get_pool_name() + " " +
                                 m_peer.cluster_name;
    if (m_asok_hook_name != asok_hook_name || m_asok_hook == nullptr) {
      m_asok_hook_name = asok_hook_name;
      delete m_asok_hook;

      m_asok_hook = new PoolReplayerAdminSocketHook(g_ceph_context,
                                                    m_asok_hook_name, this);
    }

    Mutex::Locker locker(m_lock);
    if ((m_local_pool_watcher && m_local_pool_watcher->is_blacklisted()) ||
        (m_remote_pool_watcher && m_remote_pool_watcher->is_blacklisted())) {
      m_blacklisted = true;
      m_stopping = true;
      break;
    }

    if (!m_stopping) {
      m_cond.WaitInterval(m_lock, utime_t(1, 0));
    }
  }
}

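// Handles the "rbd mirror status" admin socket command.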
void PoolReplayer::print_status(Formatter *f, stringstream *ss)
{
  dout(20) << "enter" << dendl;

  if (!f) {
    return;
  }

  Mutex::Locker l(m_lock);

  f->open_object_section("pool_replayer_status");
  f->dump_string("pool", m_local_io_ctx.get_pool_name());
  f->dump_stream("peer") << m_peer;
  f->dump_string("instance_id", m_instance_watcher->get_instance_id());

  std::string leader_instance_id;
  m_leader_watcher->get_leader_instance_id(&leader_instance_id);
  f->dump_string("leader_instance_id", leader_instance_id);

  bool leader = m_leader_watcher->is_leader();
  f->dump_bool("leader", leader);
  if (leader) {
    std::vector<std::string> instance_ids;
    m_leader_watcher->list_instances(&instance_ids);
    f->open_array_section("instances");
    for (auto instance_id : instance_ids) {
      f->dump_string("instance_id", instance_id);
    }
    f->close_section();
  }

  f->dump_string("local_cluster_admin_socket",
                 reinterpret_cast<CephContext *>(m_local_io_ctx.cct())->_conf->
                     admin_socket);
  f->dump_string("remote_cluster_admin_socket",
                 reinterpret_cast<CephContext *>(m_remote_io_ctx.cct())->_conf->
                     admin_socket);

  f->open_object_section("sync_throttler");
  m_instance_watcher->print_sync_status(f, ss);
  f->close_section();

  m_instance_replayer->print_status(f, ss);

  f->close_section();
  f->flush(*ss);
}

void PoolReplayer::start()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping) {
    return;
  }

  m_instance_replayer->start();
}

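// manual=true (admin socket command): stop image replay but leave the pool
// replayer running; manual=false: wake the run loop so the whole pool
// replayer shuts down.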
void PoolReplayer::stop(bool manual)
{
  dout(20) << "enter: manual=" << manual << dendl;

  Mutex::Locker l(m_lock);
  if (!manual) {
    m_stopping = true;
    m_cond.Signal();
    return;
  } else if (m_stopping) {
    return;
  }

  m_instance_replayer->stop();
}

void PoolReplayer::restart()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping) {
    return;
  }

  m_instance_replayer->restart();
}

void PoolReplayer::flush()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping || m_manual_stop) {
    return;
  }

  m_instance_replayer->flush();
}

void PoolReplayer::release_leader()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping || !m_leader_watcher) {
    return;
  }

  m_leader_watcher->release_leader();
}

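// PoolWatcher listener callback. Only the leader acts on updates: it
// refreshes the per-pool image-count attributes and notifies an instance
// (currently always itself) to acquire newly added images and to process
// peer image removals.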
void PoolReplayer::handle_update(const std::string &mirror_uuid,
                                 ImageIds &&added_image_ids,
                                 ImageIds &&removed_image_ids) {
  if (m_stopping) {
    return;
  }

  dout(10) << "mirror_uuid=" << mirror_uuid << ", "
           << "added_count=" << added_image_ids.size() << ", "
           << "removed_count=" << removed_image_ids.size() << dendl;
  Mutex::Locker locker(m_lock);
  if (!m_leader_watcher->is_leader()) {
    return;
  }

  m_service_daemon->add_or_update_attribute(
    m_local_pool_id, SERVICE_DAEMON_LOCAL_COUNT_KEY,
    m_local_pool_watcher->get_image_count());
  if (m_remote_pool_watcher) {
    m_service_daemon->add_or_update_attribute(
      m_local_pool_id, SERVICE_DAEMON_REMOTE_COUNT_KEY,
      m_remote_pool_watcher->get_image_count());
  }

  m_update_op_tracker.start_op();
  Context *ctx = new FunctionContext([this](int r) {
      dout(20) << "complete handle_update: r=" << r << dendl;
      m_update_op_tracker.finish_op();
    });

  C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);

  for (auto &image_id : added_image_ids) {
    // for now always send to myself (the leader)
    std::string &instance_id = m_instance_watcher->get_instance_id();
    m_instance_watcher->notify_image_acquire(instance_id, image_id.global_id,
                                             gather_ctx->new_sub());
  }

  if (!mirror_uuid.empty()) {
    for (auto &image_id : removed_image_ids) {
      // for now always send to myself (the leader)
      std::string &instance_id = m_instance_watcher->get_instance_id();
      m_instance_watcher->notify_peer_image_removed(instance_id,
                                                    image_id.global_id,
                                                    mirror_uuid,
                                                    gather_ctx->new_sub());
    }
  }

  gather_ctx->activate();
}

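// Leader transition callbacks. The leader instance owns the pool watchers,
// so acquiring leadership brings them up and releasing it tears them down
// again.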
void PoolReplayer::handle_post_acquire_leader(Context *on_finish) {
  dout(20) << dendl;

  m_service_daemon->add_or_update_attribute(m_local_pool_id,
                                            SERVICE_DAEMON_LEADER_KEY, true);
  m_instance_watcher->handle_acquire_leader();
  init_local_pool_watcher(on_finish);
}

void PoolReplayer::handle_pre_release_leader(Context *on_finish) {
  dout(20) << dendl;

  m_service_daemon->remove_attribute(m_local_pool_id, SERVICE_DAEMON_LEADER_KEY);
  m_instance_watcher->handle_release_leader();
  shut_down_pool_watchers(on_finish);
}

void PoolReplayer::init_local_pool_watcher(Context *on_finish) {
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);
  assert(!m_local_pool_watcher);
  m_local_pool_watcher.reset(new PoolWatcher<>(
    m_threads, m_local_io_ctx, m_local_pool_watcher_listener));

  // ensure the initial set of local images is up-to-date
  // after acquiring the leader role
  auto ctx = new FunctionContext([this, on_finish](int r) {
      handle_init_local_pool_watcher(r, on_finish);
    });
  m_local_pool_watcher->init(create_async_context_callback(
    m_threads->work_queue, ctx));
}

void PoolReplayer::handle_init_local_pool_watcher(int r, Context *on_finish) {
  dout(20) << "r=" << r << dendl;
  if (r < 0) {
    derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl;
    on_finish->complete(r);
    return;
  }

  init_remote_pool_watcher(on_finish);
}

void PoolReplayer::init_remote_pool_watcher(Context *on_finish) {
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);
  assert(!m_remote_pool_watcher);
  m_remote_pool_watcher.reset(new PoolWatcher<>(
    m_threads, m_remote_io_ctx, m_remote_pool_watcher_listener));
  m_remote_pool_watcher->init(create_async_context_callback(
    m_threads->work_queue, on_finish));

  m_cond.Signal();
}

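// Shuts down both pool watchers (if they were initialized), then waits for
// any in-flight update notifications before releasing all locally acquired
// images back through the instance replayer.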
void PoolReplayer::shut_down_pool_watchers(Context *on_finish) {
  dout(20) << dendl;

  {
    Mutex::Locker locker(m_lock);
    if (m_local_pool_watcher) {
      Context *ctx = new FunctionContext([this, on_finish](int r) {
          handle_shut_down_pool_watchers(r, on_finish);
        });
      ctx = create_async_context_callback(m_threads->work_queue, ctx);

      auto gather_ctx = new C_Gather(g_ceph_context, ctx);
      m_local_pool_watcher->shut_down(gather_ctx->new_sub());
      if (m_remote_pool_watcher) {
        m_remote_pool_watcher->shut_down(gather_ctx->new_sub());
      }
      gather_ctx->activate();
      return;
    }
  }

  on_finish->complete(0);
}

void PoolReplayer::handle_shut_down_pool_watchers(int r, Context *on_finish) {
  dout(20) << "r=" << r << dendl;

  {
    Mutex::Locker locker(m_lock);
    assert(m_local_pool_watcher);
    m_local_pool_watcher.reset();

    if (m_remote_pool_watcher) {
      m_remote_pool_watcher.reset();
    }
  }
  wait_for_update_ops(on_finish);
}

void PoolReplayer::wait_for_update_ops(Context *on_finish) {
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);

  Context *ctx = new FunctionContext([this, on_finish](int r) {
      handle_wait_for_update_ops(r, on_finish);
    });
  ctx = create_async_context_callback(m_threads->work_queue, ctx);

  m_update_op_tracker.wait_for_ops(ctx);
}

void PoolReplayer::handle_wait_for_update_ops(int r, Context *on_finish) {
  dout(20) << "r=" << r << dendl;

  assert(r == 0);

  Mutex::Locker locker(m_lock);
  m_instance_replayer->release_all(on_finish);
}

void PoolReplayer::handle_update_leader(const std::string &leader_instance_id) {
  dout(20) << "leader_instance_id=" << leader_instance_id << dendl;

  m_instance_watcher->handle_update_leader(leader_instance_id);
}

} // namespace mirror
} // namespace rbd