// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "PoolReplayer.h"
#include <boost/bind.hpp>
#include "common/Formatter.h"
#include "common/admin_socket.h"
#include "common/ceph_argparse.h"
#include "common/code_environment.h"
#include "common/common_init.h"
#include "common/debug.h"
#include "common/errno.h"
#include "include/stringify.h"
#include "cls/rbd/cls_rbd_client.h"
#include "global/global_context.h"
#include "librbd/internal.h"
#include "librbd/Utils.h"
#include "librbd/Watcher.h"
#include "librbd/api/Mirror.h"
#include "InstanceReplayer.h"
#include "InstanceWatcher.h"
#include "LeaderWatcher.h"
#include "ServiceDaemon.h"
#include "Threads.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::PoolReplayer: " \
                           << this << " " << __func__ << ": "

using std::chrono::seconds;
using std::map;
using std::string;
using std::unique_ptr;
using std::vector;

using librbd::cls_client::dir_get_name;
using librbd::util::create_async_context_callback;

namespace rbd {
namespace mirror {

namespace {

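// Attribute keys reported to the service daemon, plus the config keys
// that identify a specific cluster and therefore must not leak from the
// local configuration into a remote peer connection.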
const std::string SERVICE_DAEMON_LEADER_KEY("leader");
const std::string SERVICE_DAEMON_LOCAL_COUNT_KEY("image_local_count");
const std::string SERVICE_DAEMON_REMOTE_COUNT_KEY("image_remote_count");

const std::vector<std::string> UNIQUE_PEER_CONFIG_KEYS {
  {"monmap", "mon_host", "mon_dns_srv_name", "key", "keyfile", "keyring"}};

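// Admin socket plumbing: one command object per "rbd mirror ..." asok
// command, dispatched by PoolReplayerAdminSocketHook below.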
class PoolReplayerAdminSocketCommand {
public:
  PoolReplayerAdminSocketCommand(PoolReplayer *pool_replayer)
    : pool_replayer(pool_replayer) {
  }
  virtual ~PoolReplayerAdminSocketCommand() {}
  virtual bool call(Formatter *f, stringstream *ss) = 0;
protected:
  PoolReplayer *pool_replayer;
};

class StatusCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit StatusCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->print_status(f, ss);
    return true;
  }
};

class StartCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit StartCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->start();
    return true;
  }
};

class StopCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit StopCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->stop(true);
    return true;
  }
};

class RestartCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit RestartCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->restart();
    return true;
  }
};

class FlushCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit FlushCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->flush();
    return true;
  }
};

class LeaderReleaseCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit LeaderReleaseCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->release_leader();
    return true;
  }
};

class PoolReplayerAdminSocketHook : public AdminSocketHook {
public:
  PoolReplayerAdminSocketHook(CephContext *cct, const std::string &name,
                              PoolReplayer *pool_replayer)
    : admin_socket(cct->get_admin_socket()) {
    std::string command;
    int r;

    command = "rbd mirror status " + name;
    r = admin_socket->register_command(command, command, this,
                                       "get status for rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StatusCommand(pool_replayer);
    }

    command = "rbd mirror start " + name;
    r = admin_socket->register_command(command, command, this,
                                       "start rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StartCommand(pool_replayer);
    }

    command = "rbd mirror stop " + name;
    r = admin_socket->register_command(command, command, this,
                                       "stop rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StopCommand(pool_replayer);
    }

    command = "rbd mirror restart " + name;
    r = admin_socket->register_command(command, command, this,
                                       "restart rbd mirror " + name);
    if (r == 0) {
      commands[command] = new RestartCommand(pool_replayer);
    }

    command = "rbd mirror flush " + name;
    r = admin_socket->register_command(command, command, this,
                                       "flush rbd mirror " + name);
    if (r == 0) {
      commands[command] = new FlushCommand(pool_replayer);
    }

    command = "rbd mirror leader release " + name;
    r = admin_socket->register_command(command, command, this,
                                       "release rbd mirror leader " + name);
    if (r == 0) {
      commands[command] = new LeaderReleaseCommand(pool_replayer);
    }
  }

  ~PoolReplayerAdminSocketHook() override {
    for (Commands::const_iterator i = commands.begin(); i != commands.end();
         ++i) {
      (void)admin_socket->unregister_command(i->first);
      delete i->second;
    }
  }

  bool call(std::string command, cmdmap_t& cmdmap, std::string format,
            bufferlist& out) override {
    Commands::const_iterator i = commands.find(command);
    assert(i != commands.end());
    Formatter *f = Formatter::create(format);
    stringstream ss;
    bool r = i->second->call(f, &ss);
    delete f;
    out.append(ss);
    return r;
  }

private:
  typedef std::map<std::string, PoolReplayerAdminSocketCommand*> Commands;

  AdminSocket *admin_socket;
  Commands commands;
};

} // anonymous namespace

PoolReplayer::PoolReplayer(Threads<librbd::ImageCtx> *threads,
                           ServiceDaemon<librbd::ImageCtx>* service_daemon,
                           ImageDeleter<>* image_deleter,
                           int64_t local_pool_id, const peer_t &peer,
                           const std::vector<const char*> &args) :
  m_threads(threads),
  m_service_daemon(service_daemon),
  m_image_deleter(image_deleter),
  m_local_pool_id(local_pool_id),
  m_peer(peer),
  m_args(args),
  m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)),
  m_local_pool_watcher_listener(this, true),
  m_remote_pool_watcher_listener(this, false),
  m_pool_replayer_thread(this),
  m_leader_listener(this)
{
}

PoolReplayer::~PoolReplayer()
{
  delete m_asok_hook;
  shut_down();
}

bool PoolReplayer::is_blacklisted() const {
  Mutex::Locker locker(m_lock);
  return m_blacklisted;
}

bool PoolReplayer::is_leader() const {
  Mutex::Locker locker(m_lock);
  return m_leader_watcher && m_leader_watcher->is_leader();
}

bool PoolReplayer::is_running() const {
  return m_pool_replayer_thread.is_started();
}

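// Connect to the local and remote clusters, bring up the instance
// replayer/watcher and the leader watcher, and start the pool replayer
// thread.  Failures are reported via service daemon callouts.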
void PoolReplayer::init()
{
  assert(!m_pool_replayer_thread.is_started());

  // reset state
  m_stopping = false;
  m_blacklisted = false;

  dout(20) << "replaying for " << m_peer << dendl;
  int r = init_rados(g_ceph_context->_conf->cluster,
                     g_ceph_context->_conf->name.to_str(),
                     "local cluster", &m_local_rados, false);
  if (r < 0) {
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to connect to local cluster");
    return;
  }

  r = init_rados(m_peer.cluster_name, m_peer.client_name,
                 std::string("remote peer ") + stringify(m_peer),
                 &m_remote_rados, true);
  if (r < 0) {
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to connect to remote cluster");
    return;
  }

  r = m_local_rados->ioctx_create2(m_local_pool_id, m_local_io_ctx);
  if (r < 0) {
    derr << "error accessing local pool " << m_local_pool_id << ": "
         << cpp_strerror(r) << dendl;
    return;
  }

  std::string local_mirror_uuid;
  r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx,
                                          &local_mirror_uuid);
  if (r < 0) {
    derr << "failed to retrieve local mirror uuid from pool "
         << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to query local mirror uuid");
    return;
  }

  r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
                                   m_remote_io_ctx);
  if (r < 0) {
    derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name()
         << ": " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_WARNING,
      "unable to access remote pool");
    return;
  }

  dout(20) << "connected to " << m_peer << dendl;

  m_instance_replayer.reset(InstanceReplayer<>::create(
    m_threads, m_service_daemon, m_image_deleter, m_local_rados,
    local_mirror_uuid, m_local_pool_id));
  m_instance_replayer->init();
  m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx);

  m_instance_watcher.reset(InstanceWatcher<>::create(
    m_local_io_ctx, m_threads->work_queue, m_instance_replayer.get()));
  r = m_instance_watcher->init();
  if (r < 0) {
    derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to initialize instance messenger object");
    return;
  }

  m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
                                             &m_leader_listener));
  r = m_leader_watcher->init();
  if (r < 0) {
    derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to initialize leader messenger object");
    return;
  }

  if (m_callout_id != service_daemon::CALLOUT_ID_NONE) {
    m_service_daemon->remove_callout(m_local_pool_id, m_callout_id);
    m_callout_id = service_daemon::CALLOUT_ID_NONE;
  }

  m_pool_replayer_thread.create("pool replayer");
}

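// Tear everything down in reverse order of construction.  The pool
// watchers are expected to be gone already (they are shut down when the
// leader role is released).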
void PoolReplayer::shut_down() {
  m_stopping = true;
  {
    Mutex::Locker l(m_lock);
    m_cond.Signal();
  }
  if (m_pool_replayer_thread.is_started()) {
    m_pool_replayer_thread.join();
  }
  if (m_leader_watcher) {
    m_leader_watcher->shut_down();
  }
  if (m_instance_watcher) {
    m_instance_watcher->shut_down();
  }
  if (m_instance_replayer) {
    m_instance_replayer->shut_down();
  }

  m_leader_watcher.reset();
  m_instance_watcher.reset();
  m_instance_replayer.reset();

  assert(!m_local_pool_watcher);
  assert(!m_remote_pool_watcher);
  m_local_rados.reset();
  m_remote_rados.reset();
}

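// Create and connect a librados handle for the given cluster.  When
// strip_cluster_overrides is set (remote peer connections), any
// cluster-unique settings (monitors, keyring, etc.) picked up from the
// local environment or command line are reverted to the values read
// from the peer's own configuration file.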
int PoolReplayer::init_rados(const std::string &cluster_name,
                             const std::string &client_name,
                             const std::string &description,
                             RadosRef *rados_ref,
                             bool strip_cluster_overrides) {
  rados_ref->reset(new librados::Rados());

  // NOTE: manually bootstrap a CephContext here instead of via
  // the librados API to avoid mixing global singletons between
  // the librados shared library and the daemon
  // TODO: eliminate intermingling of global singletons within Ceph APIs
  CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT);
  if (client_name.empty() || !iparams.name.from_str(client_name)) {
    derr << "error initializing cluster handle for " << description << dendl;
    return -EINVAL;
  }

  CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY,
                                    CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
  cct->_conf->cluster = cluster_name;

  // librados::Rados::conf_read_file
  int r = cct->_conf->parse_config_files(nullptr, nullptr, 0);
  if (r < 0) {
    derr << "could not read ceph conf for " << description << ": "
         << cpp_strerror(r) << dendl;
    cct->put();
    return r;
  }

  // preserve cluster-specific config settings before applying environment/cli
  // overrides
  std::map<std::string, std::string> config_values;
  if (strip_cluster_overrides) {
    // remote peer connections shouldn't apply cluster-specific
    // configuration settings
    for (auto& key : UNIQUE_PEER_CONFIG_KEYS) {
      config_values[key] = cct->_conf->get_val<std::string>(key);
    }
  }

  cct->_conf->parse_env();

  // librados::Rados::conf_parse_env
  std::vector<const char*> args;
  env_to_vec(args, nullptr);
  r = cct->_conf->parse_argv(args);
  if (r < 0) {
    derr << "could not parse environment for " << description << ": "
         << cpp_strerror(r) << dendl;
    cct->put();
    return r;
  }

  if (!m_args.empty()) {
    // librados::Rados::conf_parse_argv
    args = m_args;
    r = cct->_conf->parse_argv(args);
    if (r < 0) {
      derr << "could not parse command line args for " << description << ": "
           << cpp_strerror(r) << dendl;
      cct->put();
      return r;
    }
  }

  if (strip_cluster_overrides) {
    // remote peer connections shouldn't apply cluster-specific
    // configuration settings
    for (auto& pair : config_values) {
      auto value = cct->_conf->get_val<std::string>(pair.first);
      if (pair.second != value) {
        dout(0) << "reverting global config option override: "
                << pair.first << ": " << value << " -> " << pair.second
                << dendl;
        cct->_conf->set_val_or_die(pair.first, pair.second);
      }
    }
  }

  if (!g_ceph_context->_conf->admin_socket.empty()) {
    cct->_conf->set_val_or_die("admin_socket",
                               "$run_dir/$name.$pid.$cluster.$cctid.asok");
  }

  // disable unnecessary librbd cache
  cct->_conf->set_val_or_die("rbd_cache", "false");
  cct->_conf->apply_changes(nullptr);
  cct->_conf->complain_about_parse_errors(cct);

  r = (*rados_ref)->init_with_context(cct);
  assert(r == 0);
  cct->put();

  r = (*rados_ref)->connect();
  if (r < 0) {
    derr << "error connecting to " << description << ": "
         << cpp_strerror(r) << dendl;
    return r;
  }

  return 0;
}

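// Pool replayer thread: keep the admin socket hook registered under the
// current "<pool> <peer cluster>" name and exit once the replayer is
// stopped or either pool watcher reports this client as blacklisted.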
void PoolReplayer::run()
{
  dout(20) << "enter" << dendl;

  while (!m_stopping) {
    std::string asok_hook_name = m_local_io_ctx.get_pool_name() + " " +
                                 m_peer.cluster_name;
    if (m_asok_hook_name != asok_hook_name || m_asok_hook == nullptr) {
      m_asok_hook_name = asok_hook_name;
      delete m_asok_hook;

      m_asok_hook = new PoolReplayerAdminSocketHook(g_ceph_context,
                                                    m_asok_hook_name, this);
    }

    Mutex::Locker locker(m_lock);
    if ((m_local_pool_watcher && m_local_pool_watcher->is_blacklisted()) ||
        (m_remote_pool_watcher && m_remote_pool_watcher->is_blacklisted())) {
      m_blacklisted = true;
      m_stopping = true;
      break;
    }

    if (!m_stopping) {
      m_cond.WaitInterval(m_lock, utime_t(1, 0));
    }
  }

  m_instance_replayer->stop();
}

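// Dump pool, peer, leader and per-image replay status; backs the
// "rbd mirror status" admin socket command.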
void PoolReplayer::print_status(Formatter *f, stringstream *ss)
{
  dout(20) << "enter" << dendl;

  if (!f) {
    return;
  }

  Mutex::Locker l(m_lock);

  f->open_object_section("pool_replayer_status");
  f->dump_string("pool", m_local_io_ctx.get_pool_name());
  f->dump_stream("peer") << m_peer;
  f->dump_string("instance_id", m_instance_watcher->get_instance_id());

  std::string leader_instance_id;
  m_leader_watcher->get_leader_instance_id(&leader_instance_id);
  f->dump_string("leader_instance_id", leader_instance_id);

  bool leader = m_leader_watcher->is_leader();
  f->dump_bool("leader", leader);
  if (leader) {
    std::vector<std::string> instance_ids;
    m_leader_watcher->list_instances(&instance_ids);
    f->open_array_section("instances");
    for (auto instance_id : instance_ids) {
      f->dump_string("instance_id", instance_id);
    }
    f->close_section();
  }

  f->dump_string("local_cluster_admin_socket",
                 reinterpret_cast<CephContext *>(m_local_io_ctx.cct())->_conf->
                   get_val<std::string>("admin_socket"));
  f->dump_string("remote_cluster_admin_socket",
                 reinterpret_cast<CephContext *>(m_remote_io_ctx.cct())->_conf->
                   get_val<std::string>("admin_socket"));

  f->open_object_section("sync_throttler");
  m_instance_watcher->print_sync_status(f, ss);
  f->close_section();

  m_instance_replayer->print_status(f, ss);

  f->close_section();
  f->flush(*ss);
}

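// Admin socket entry points.  stop() with manual == false is an
// internal shutdown request that also wakes up the run() loop.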
void PoolReplayer::start()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping) {
    return;
  }

  m_instance_replayer->start();
}

void PoolReplayer::stop(bool manual)
{
  dout(20) << "enter: manual=" << manual << dendl;

  Mutex::Locker l(m_lock);
  if (!manual) {
    m_stopping = true;
    m_cond.Signal();
    return;
  } else if (m_stopping) {
    return;
  }

  m_instance_replayer->stop();
}

void PoolReplayer::restart()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping) {
    return;
  }

  m_instance_replayer->restart();
}

void PoolReplayer::flush()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping || m_manual_stop) {
    return;
  }

  m_instance_replayer->flush();
}

void PoolReplayer::release_leader()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping || !m_leader_watcher) {
    return;
  }

  m_leader_watcher->release_leader();
}

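// PoolWatcher listener: only the leader reacts to image set updates,
// refreshing the image counts reported to the service daemon and
// notifying the local instance about acquired/removed images.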
void PoolReplayer::handle_update(const std::string &mirror_uuid,
                                 ImageIds &&added_image_ids,
                                 ImageIds &&removed_image_ids) {
  if (m_stopping) {
    return;
  }

  dout(10) << "mirror_uuid=" << mirror_uuid << ", "
           << "added_count=" << added_image_ids.size() << ", "
           << "removed_count=" << removed_image_ids.size() << dendl;
  Mutex::Locker locker(m_lock);
  if (!m_leader_watcher->is_leader()) {
    return;
  }

  m_service_daemon->add_or_update_attribute(
    m_local_pool_id, SERVICE_DAEMON_LOCAL_COUNT_KEY,
    m_local_pool_watcher->get_image_count());
  if (m_remote_pool_watcher) {
    m_service_daemon->add_or_update_attribute(
      m_local_pool_id, SERVICE_DAEMON_REMOTE_COUNT_KEY,
      m_remote_pool_watcher->get_image_count());
  }

  m_update_op_tracker.start_op();
  Context *ctx = new FunctionContext([this](int r) {
      dout(20) << "complete handle_update: r=" << r << dendl;
      m_update_op_tracker.finish_op();
    });

  C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);

  for (auto &image_id : added_image_ids) {
    // for now always send to myself (the leader)
    std::string &instance_id = m_instance_watcher->get_instance_id();
    m_instance_watcher->notify_image_acquire(instance_id, image_id.global_id,
                                             gather_ctx->new_sub());
  }

  if (!mirror_uuid.empty()) {
    for (auto &image_id : removed_image_ids) {
      // for now always send to myself (the leader)
      std::string &instance_id = m_instance_watcher->get_instance_id();
      m_instance_watcher->notify_peer_image_removed(instance_id,
                                                    image_id.global_id,
                                                    mirror_uuid,
                                                    gather_ctx->new_sub());
    }
  }

  gather_ctx->activate();
}

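// Leader election callbacks: acquiring the leader role starts the pool
// watchers, releasing it shuts them down and releases all images from
// the local instance replayer.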
void PoolReplayer::handle_post_acquire_leader(Context *on_finish) {
  dout(20) << dendl;

  m_service_daemon->add_or_update_attribute(m_local_pool_id,
                                            SERVICE_DAEMON_LEADER_KEY, true);
  m_instance_watcher->handle_acquire_leader();
  init_local_pool_watcher(on_finish);
}

void PoolReplayer::handle_pre_release_leader(Context *on_finish) {
  dout(20) << dendl;

  m_service_daemon->remove_attribute(m_local_pool_id, SERVICE_DAEMON_LEADER_KEY);
  m_instance_watcher->handle_release_leader();
  shut_down_pool_watchers(on_finish);
}

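// Pool watcher bring-up: the local pool watcher is initialized first so
// the initial set of local images is known before remote updates start
// arriving.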
void PoolReplayer::init_local_pool_watcher(Context *on_finish) {
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);
  assert(!m_local_pool_watcher);
  m_local_pool_watcher.reset(new PoolWatcher<>(
    m_threads, m_local_io_ctx, m_local_pool_watcher_listener));

  // ensure the initial set of local images is up-to-date
  // after acquiring the leader role
  auto ctx = new FunctionContext([this, on_finish](int r) {
      handle_init_local_pool_watcher(r, on_finish);
    });
  m_local_pool_watcher->init(create_async_context_callback(
    m_threads->work_queue, ctx));
}

void PoolReplayer::handle_init_local_pool_watcher(int r, Context *on_finish) {
  dout(20) << "r=" << r << dendl;
  if (r < 0) {
    derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl;
    on_finish->complete(r);
    return;
  }

  init_remote_pool_watcher(on_finish);
}

void PoolReplayer::init_remote_pool_watcher(Context *on_finish) {
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);
  assert(!m_remote_pool_watcher);
  m_remote_pool_watcher.reset(new PoolWatcher<>(
    m_threads, m_remote_io_ctx, m_remote_pool_watcher_listener));
  m_remote_pool_watcher->init(create_async_context_callback(
    m_threads->work_queue, on_finish));

  m_cond.Signal();
}

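// Pool watcher tear-down: shut both watchers down, wait for any
// in-flight handle_update() notifications to drain, then release all
// images from the local instance replayer.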
void PoolReplayer::shut_down_pool_watchers(Context *on_finish) {
  dout(20) << dendl;

  {
    Mutex::Locker locker(m_lock);
    if (m_local_pool_watcher) {
      Context *ctx = new FunctionContext([this, on_finish](int r) {
          handle_shut_down_pool_watchers(r, on_finish);
        });
      ctx = create_async_context_callback(m_threads->work_queue, ctx);

      auto gather_ctx = new C_Gather(g_ceph_context, ctx);
      m_local_pool_watcher->shut_down(gather_ctx->new_sub());
      if (m_remote_pool_watcher) {
        m_remote_pool_watcher->shut_down(gather_ctx->new_sub());
      }
      gather_ctx->activate();
      return;
    }
  }

  on_finish->complete(0);
}

void PoolReplayer::handle_shut_down_pool_watchers(int r, Context *on_finish) {
  dout(20) << "r=" << r << dendl;

  {
    Mutex::Locker locker(m_lock);
    assert(m_local_pool_watcher);
    m_local_pool_watcher.reset();

    if (m_remote_pool_watcher) {
      m_remote_pool_watcher.reset();
    }
  }
  wait_for_update_ops(on_finish);
}

void PoolReplayer::wait_for_update_ops(Context *on_finish) {
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);

  Context *ctx = new FunctionContext([this, on_finish](int r) {
      handle_wait_for_update_ops(r, on_finish);
    });
  ctx = create_async_context_callback(m_threads->work_queue, ctx);

  m_update_op_tracker.wait_for_ops(ctx);
}

void PoolReplayer::handle_wait_for_update_ops(int r, Context *on_finish) {
  dout(20) << "r=" << r << dendl;

  assert(r == 0);

  Mutex::Locker locker(m_lock);
  m_instance_replayer->release_all(on_finish);
}

void PoolReplayer::handle_update_leader(const std::string &leader_instance_id) {
  dout(20) << "leader_instance_id=" << leader_instance_id << dendl;

  m_instance_watcher->handle_update_leader(leader_instance_id);
}

} // namespace mirror
} // namespace rbd