// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "PoolReplayer.h"
#include <boost/bind.hpp>
#include "common/Formatter.h"
#include "common/admin_socket.h"
#include "common/ceph_argparse.h"
#include "common/code_environment.h"
#include "common/common_init.h"
#include "common/debug.h"
#include "common/errno.h"
#include "include/stringify.h"
#include "cls/rbd/cls_rbd_client.h"
#include "global/global_context.h"
#include "librbd/internal.h"
#include "librbd/Utils.h"
#include "librbd/Watcher.h"
#include "librbd/api/Config.h"
#include "librbd/api/Mirror.h"
#include "ImageMap.h"
#include "InstanceReplayer.h"
#include "InstanceWatcher.h"
#include "LeaderWatcher.h"
#include "ServiceDaemon.h"
#include "Threads.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::PoolReplayer: " \
                           << this << " " << __func__ << ": "

using std::chrono::seconds;
using std::map;
using std::string;
using std::unique_ptr;
using std::vector;

using librbd::cls_client::dir_get_name;
using librbd::util::create_async_context_callback;

namespace rbd {
namespace mirror {

using ::operator<<;

namespace {

const std::string SERVICE_DAEMON_INSTANCE_ID_KEY("instance_id");
const std::string SERVICE_DAEMON_LEADER_KEY("leader");
const std::string SERVICE_DAEMON_LOCAL_COUNT_KEY("image_local_count");
const std::string SERVICE_DAEMON_REMOTE_COUNT_KEY("image_remote_count");

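// Config keys that identify a specific cluster connection (monitors and
// authentication credentials). Values inherited from the local cluster's
// config for these keys are reverted when connecting to a remote peer so
// they do not leak into the peer connection (see init_rados()).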
const std::vector<std::string> UNIQUE_PEER_CONFIG_KEYS {
  {"monmap", "mon_host", "mon_dns_srv_name", "key", "keyfile", "keyring"}};

template <typename I>
class PoolReplayerAdminSocketCommand {
public:
  PoolReplayerAdminSocketCommand(PoolReplayer<I> *pool_replayer)
    : pool_replayer(pool_replayer) {
  }
  virtual ~PoolReplayerAdminSocketCommand() {}
  virtual bool call(Formatter *f, stringstream *ss) = 0;
protected:
  PoolReplayer<I> *pool_replayer;
};

template <typename I>
class StatusCommand : public PoolReplayerAdminSocketCommand<I> {
public:
  explicit StatusCommand(PoolReplayer<I> *pool_replayer)
    : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    this->pool_replayer->print_status(f, ss);
    return true;
  }
};

template <typename I>
class StartCommand : public PoolReplayerAdminSocketCommand<I> {
public:
  explicit StartCommand(PoolReplayer<I> *pool_replayer)
    : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    this->pool_replayer->start();
    return true;
  }
};

template <typename I>
class StopCommand : public PoolReplayerAdminSocketCommand<I> {
public:
  explicit StopCommand(PoolReplayer<I> *pool_replayer)
    : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    this->pool_replayer->stop(true);
    return true;
  }
};

template <typename I>
class RestartCommand : public PoolReplayerAdminSocketCommand<I> {
public:
  explicit RestartCommand(PoolReplayer<I> *pool_replayer)
    : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    this->pool_replayer->restart();
    return true;
  }
};

template <typename I>
class FlushCommand : public PoolReplayerAdminSocketCommand<I> {
public:
  explicit FlushCommand(PoolReplayer<I> *pool_replayer)
    : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    this->pool_replayer->flush();
    return true;
  }
};

template <typename I>
class LeaderReleaseCommand : public PoolReplayerAdminSocketCommand<I> {
public:
  explicit LeaderReleaseCommand(PoolReplayer<I> *pool_replayer)
    : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    this->pool_replayer->release_leader();
    return true;
  }
};

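// Registers the per-pool admin socket commands (status, start, stop, restart,
// flush, leader release) for this pool/peer and dispatches invocations to the
// matching command object above.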
template <typename I>
class PoolReplayerAdminSocketHook : public AdminSocketHook {
public:
  PoolReplayerAdminSocketHook(CephContext *cct, const std::string &name,
                              PoolReplayer<I> *pool_replayer)
    : admin_socket(cct->get_admin_socket()) {
    std::string command;
    int r;

    command = "rbd mirror status " + name;
    r = admin_socket->register_command(command, command, this,
                                       "get status for rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StatusCommand<I>(pool_replayer);
    }

    command = "rbd mirror start " + name;
    r = admin_socket->register_command(command, command, this,
                                       "start rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StartCommand<I>(pool_replayer);
    }

    command = "rbd mirror stop " + name;
    r = admin_socket->register_command(command, command, this,
                                       "stop rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StopCommand<I>(pool_replayer);
    }

    command = "rbd mirror restart " + name;
    r = admin_socket->register_command(command, command, this,
                                       "restart rbd mirror " + name);
    if (r == 0) {
      commands[command] = new RestartCommand<I>(pool_replayer);
    }

    command = "rbd mirror flush " + name;
    r = admin_socket->register_command(command, command, this,
                                       "flush rbd mirror " + name);
    if (r == 0) {
      commands[command] = new FlushCommand<I>(pool_replayer);
    }

    command = "rbd mirror leader release " + name;
    r = admin_socket->register_command(command, command, this,
                                       "release rbd mirror leader " + name);
    if (r == 0) {
      commands[command] = new LeaderReleaseCommand<I>(pool_replayer);
    }
  }

  ~PoolReplayerAdminSocketHook() override {
    for (auto i = commands.begin(); i != commands.end(); ++i) {
      (void)admin_socket->unregister_command(i->first);
      delete i->second;
    }
  }

  bool call(std::string_view command, const cmdmap_t& cmdmap,
            std::string_view format, bufferlist& out) override {
    auto i = commands.find(command);
    ceph_assert(i != commands.end());
    Formatter *f = Formatter::create(format);
    stringstream ss;
    bool r = i->second->call(f, &ss);
    delete f;
    out.append(ss);
    return r;
  }

private:
  typedef std::map<std::string, PoolReplayerAdminSocketCommand<I>*,
                   std::less<>> Commands;

  AdminSocket *admin_socket;
  Commands commands;
};

} // anonymous namespace

template <typename I>
PoolReplayer<I>::PoolReplayer(Threads<I> *threads,
                              ServiceDaemon<I>* service_daemon,
                              int64_t local_pool_id, const PeerSpec &peer,
                              const std::vector<const char*> &args) :
  m_threads(threads),
  m_service_daemon(service_daemon),
  m_local_pool_id(local_pool_id),
  m_peer(peer),
  m_args(args),
  m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)),
  m_local_pool_watcher_listener(this, true),
  m_remote_pool_watcher_listener(this, false),
  m_image_map_listener(this),
  m_pool_replayer_thread(this),
  m_leader_listener(this)
{
}

template <typename I>
PoolReplayer<I>::~PoolReplayer()
{
  delete m_asok_hook;
  shut_down();
}

template <typename I>
bool PoolReplayer<I>::is_blacklisted() const {
  Mutex::Locker locker(m_lock);
  return m_blacklisted;
}

template <typename I>
bool PoolReplayer<I>::is_leader() const {
  Mutex::Locker locker(m_lock);
  return m_leader_watcher && m_leader_watcher->is_leader();
}

template <typename I>
bool PoolReplayer<I>::is_running() const {
  return m_pool_replayer_thread.is_started();
}

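// Connect to the local and remote clusters, open the local and remote pool
// contexts, fetch the local mirror uuid, and bring up the InstanceReplayer,
// InstanceWatcher and LeaderWatcher before starting the pool replayer thread.
// Any failure is reported through a service daemon callout and leaves the
// replayer uninitialized.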
template <typename I>
void PoolReplayer<I>::init()
{
  ceph_assert(!m_pool_replayer_thread.is_started());

  // reset state
  m_stopping = false;
  m_blacklisted = false;

  dout(10) << "replaying for " << m_peer << dendl;
  int r = init_rados(g_ceph_context->_conf->cluster,
                     g_ceph_context->_conf->name.to_str(),
                     "", "", "local cluster", &m_local_rados, false);
  if (r < 0) {
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to connect to local cluster");
    return;
  }

  r = init_rados(m_peer.cluster_name, m_peer.client_name,
                 m_peer.mon_host, m_peer.key,
                 std::string("remote peer ") + stringify(m_peer),
                 &m_remote_rados, true);
  if (r < 0) {
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to connect to remote cluster");
    return;
  }

  r = m_local_rados->ioctx_create2(m_local_pool_id, m_local_io_ctx);
  if (r < 0) {
    derr << "error accessing local pool " << m_local_pool_id << ": "
         << cpp_strerror(r) << dendl;
    return;
  }

  auto cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct());
  librbd::api::Config<I>::apply_pool_overrides(m_local_io_ctx, &cct->_conf);

  std::string local_mirror_uuid;
  r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx,
                                          &local_mirror_uuid);
  if (r < 0) {
    derr << "failed to retrieve local mirror uuid from pool "
         << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to query local mirror uuid");
    return;
  }

  r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
                                   m_remote_io_ctx);
  if (r < 0) {
    derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name()
         << ": " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_WARNING,
      "unable to access remote pool");
    return;
  }

  dout(10) << "connected to " << m_peer << dendl;

  m_instance_replayer.reset(InstanceReplayer<I>::create(
    m_threads, m_service_daemon, m_local_rados, local_mirror_uuid,
    m_local_pool_id));
  m_instance_replayer->init();
  m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx);

  m_instance_watcher.reset(InstanceWatcher<I>::create(
    m_local_io_ctx, m_threads->work_queue, m_instance_replayer.get()));
  r = m_instance_watcher->init();
  if (r < 0) {
    derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to initialize instance messenger object");
    return;
  }
  m_service_daemon->add_or_update_attribute(
    m_local_pool_id, SERVICE_DAEMON_INSTANCE_ID_KEY,
    m_instance_watcher->get_instance_id());

  m_leader_watcher.reset(LeaderWatcher<I>::create(m_threads, m_local_io_ctx,
                                                  &m_leader_listener));
  r = m_leader_watcher->init();
  if (r < 0) {
    derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to initialize leader messenger object");
    return;
  }

  if (m_callout_id != service_daemon::CALLOUT_ID_NONE) {
    m_service_daemon->remove_callout(m_local_pool_id, m_callout_id);
    m_callout_id = service_daemon::CALLOUT_ID_NONE;
  }

  m_pool_replayer_thread.create("pool replayer");
}

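// Tear down in the reverse order of init(): stop the replayer thread first,
// then shut down the leader watcher, instance watcher and instance replayer
// before dropping the cluster connections. The image map, image deleter and
// pool watchers are expected to have already been released via the leader
// release callbacks (enforced by the asserts below).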
template <typename I>
void PoolReplayer<I>::shut_down() {
  m_stopping = true;
  {
    Mutex::Locker l(m_lock);
    m_cond.Signal();
  }
  if (m_pool_replayer_thread.is_started()) {
    m_pool_replayer_thread.join();
  }
  if (m_leader_watcher) {
    m_leader_watcher->shut_down();
  }
  if (m_instance_watcher) {
    m_instance_watcher->shut_down();
  }
  if (m_instance_replayer) {
    m_instance_replayer->shut_down();
  }

  m_leader_watcher.reset();
  m_instance_watcher.reset();
  m_instance_replayer.reset();

  ceph_assert(!m_image_map);
  ceph_assert(!m_image_deleter);
  ceph_assert(!m_local_pool_watcher);
  ceph_assert(!m_remote_pool_watcher);
  m_local_rados.reset();
  m_remote_rados.reset();
}

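// Build a dedicated CephContext and librados connection for the given
// cluster: read the cluster's config file, apply environment and command-line
// overrides, optionally override mon_host/key, and (for remote peers) revert
// any cluster-specific settings that should not leak into the connection.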
template <typename I>
int PoolReplayer<I>::init_rados(const std::string &cluster_name,
                                const std::string &client_name,
                                const std::string &mon_host,
                                const std::string &key,
                                const std::string &description,
                                RadosRef *rados_ref,
                                bool strip_cluster_overrides) {
  rados_ref->reset(new librados::Rados());

  // NOTE: manually bootstrap a CephContext here instead of via
  // the librados API to avoid mixing global singletons between
  // the librados shared library and the daemon
  // TODO: eliminate intermingling of global singletons within Ceph APIs
  CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT);
  if (client_name.empty() || !iparams.name.from_str(client_name)) {
    derr << "error initializing cluster handle for " << description << dendl;
    return -EINVAL;
  }

  CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY,
                                    CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
  cct->_conf->cluster = cluster_name;

  // librados::Rados::conf_read_file
  int r = cct->_conf.parse_config_files(nullptr, nullptr, 0);
  if (r < 0 && r != -ENOENT) {
    derr << "could not read ceph conf for " << description << ": "
         << cpp_strerror(r) << dendl;
    cct->put();
    return r;
  }

  // preserve cluster-specific config settings before applying environment/cli
  // overrides
  std::map<std::string, std::string> config_values;
  if (strip_cluster_overrides) {
    // remote peer connections shouldn't apply cluster-specific
    // configuration settings
    for (auto& key : UNIQUE_PEER_CONFIG_KEYS) {
      config_values[key] = cct->_conf.get_val<std::string>(key);
    }
  }

  cct->_conf.parse_env(cct->get_module_type());

  // librados::Rados::conf_parse_env
  std::vector<const char*> args;
  r = cct->_conf.parse_argv(args);
  if (r < 0) {
    derr << "could not parse environment for " << description << ": "
         << cpp_strerror(r) << dendl;
    cct->put();
    return r;
  }
  cct->_conf.parse_env(cct->get_module_type());

  if (!m_args.empty()) {
    // librados::Rados::conf_parse_argv
    args = m_args;
    r = cct->_conf.parse_argv(args);
    if (r < 0) {
      derr << "could not parse command line args for " << description << ": "
           << cpp_strerror(r) << dendl;
      cct->put();
      return r;
    }
  }

  if (strip_cluster_overrides) {
    // remote peer connections shouldn't apply cluster-specific
    // configuration settings
    for (auto& pair : config_values) {
      auto value = cct->_conf.get_val<std::string>(pair.first);
      if (pair.second != value) {
        dout(0) << "reverting global config option override: "
                << pair.first << ": " << value << " -> " << pair.second
                << dendl;
        cct->_conf.set_val_or_die(pair.first, pair.second);
      }
    }
  }

  if (!g_ceph_context->_conf->admin_socket.empty()) {
    cct->_conf.set_val_or_die("admin_socket",
                              "$run_dir/$name.$pid.$cluster.$cctid.asok");
  }

  if (!mon_host.empty()) {
    r = cct->_conf.set_val("mon_host", mon_host);
    if (r < 0) {
      derr << "failed to set mon_host config for " << description << ": "
           << cpp_strerror(r) << dendl;
      cct->put();
      return r;
    }
  }

  if (!key.empty()) {
    r = cct->_conf.set_val("key", key);
    if (r < 0) {
      derr << "failed to set key config for " << description << ": "
           << cpp_strerror(r) << dendl;
      cct->put();
      return r;
    }
  }

  // disable unnecessary librbd cache
  cct->_conf.set_val_or_die("rbd_cache", "false");
  cct->_conf.apply_changes(nullptr);
  cct->_conf.complain_about_parse_errors(cct);

  r = (*rados_ref)->init_with_context(cct);
  ceph_assert(r == 0);
  cct->put();

  r = (*rados_ref)->connect();
  if (r < 0) {
    derr << "error connecting to " << description << ": "
         << cpp_strerror(r) << dendl;
    return r;
  }

  return 0;
}

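// Pool replayer thread body: refresh the admin socket hook whenever the
// pool/peer name changes, watch for either pool watcher becoming blacklisted
// (which forces a shutdown), and otherwise poll once per second until asked
// to stop.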
template <typename I>
void PoolReplayer<I>::run()
{
  dout(20) << "enter" << dendl;

  while (!m_stopping) {
    std::string asok_hook_name = m_local_io_ctx.get_pool_name() + " " +
                                 m_peer.cluster_name;
    if (m_asok_hook_name != asok_hook_name || m_asok_hook == nullptr) {
      m_asok_hook_name = asok_hook_name;
      delete m_asok_hook;

      m_asok_hook = new PoolReplayerAdminSocketHook<I>(g_ceph_context,
                                                       m_asok_hook_name, this);
    }

    Mutex::Locker locker(m_lock);
    if ((m_local_pool_watcher && m_local_pool_watcher->is_blacklisted()) ||
        (m_remote_pool_watcher && m_remote_pool_watcher->is_blacklisted())) {
      m_blacklisted = true;
      m_stopping = true;
      break;
    }

    if (!m_stopping) {
      m_cond.WaitInterval(m_lock, utime_t(1, 0));
    }
  }

  m_instance_replayer->stop();
}

template <typename I>
void PoolReplayer<I>::print_status(Formatter *f, stringstream *ss)
{
  dout(20) << "enter" << dendl;

  if (!f) {
    return;
  }

  Mutex::Locker l(m_lock);

  f->open_object_section("pool_replayer_status");
  f->dump_string("pool", m_local_io_ctx.get_pool_name());
  f->dump_stream("peer") << m_peer;
  f->dump_string("instance_id", m_instance_watcher->get_instance_id());

  std::string state("running");
  if (m_manual_stop) {
    state = "stopped (manual)";
  } else if (m_stopping) {
    state = "stopped";
  }
  f->dump_string("state", state);

  std::string leader_instance_id;
  m_leader_watcher->get_leader_instance_id(&leader_instance_id);
  f->dump_string("leader_instance_id", leader_instance_id);

  bool leader = m_leader_watcher->is_leader();
  f->dump_bool("leader", leader);
  if (leader) {
    std::vector<std::string> instance_ids;
    m_leader_watcher->list_instances(&instance_ids);
    f->open_array_section("instances");
    for (auto instance_id : instance_ids) {
      f->dump_string("instance_id", instance_id);
    }
    f->close_section();
  }

  f->dump_string("local_cluster_admin_socket",
                 reinterpret_cast<CephContext *>(m_local_io_ctx.cct())->_conf.
                   get_val<std::string>("admin_socket"));
  f->dump_string("remote_cluster_admin_socket",
                 reinterpret_cast<CephContext *>(m_remote_io_ctx.cct())->_conf.
                   get_val<std::string>("admin_socket"));

  f->open_object_section("sync_throttler");
  m_instance_watcher->print_sync_status(f, ss);
  f->close_section();

  m_instance_replayer->print_status(f, ss);

  if (m_image_deleter) {
    f->open_object_section("image_deleter");
    m_image_deleter->print_status(f, ss);
    f->close_section();
  }

  f->close_section();
  f->flush(*ss);
}

template <typename I>
void PoolReplayer<I>::start()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping) {
    return;
  }

  m_manual_stop = false;
  m_instance_replayer->start();
}

template <typename I>
void PoolReplayer<I>::stop(bool manual)
{
  dout(20) << "enter: manual=" << manual << dendl;

  Mutex::Locker l(m_lock);
  if (!manual) {
    m_stopping = true;
    m_cond.Signal();
    return;
  } else if (m_stopping) {
    return;
  }

  m_manual_stop = true;
  m_instance_replayer->stop();
}

template <typename I>
void PoolReplayer<I>::restart()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping) {
    return;
  }

  m_instance_replayer->restart();
}

template <typename I>
void PoolReplayer<I>::flush()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping || m_manual_stop) {
    return;
  }

  m_instance_replayer->flush();
}

template <typename I>
void PoolReplayer<I>::release_leader()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping || !m_leader_watcher) {
    return;
  }

  m_leader_watcher->release_leader();
}

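// PoolWatcher listener callback: translate added/removed image ids into sets
// of global image ids and forward them to the ImageMap, updating the service
// daemon image counts along the way. Non-leader instances ignore the update.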
template <typename I>
void PoolReplayer<I>::handle_update(const std::string &mirror_uuid,
                                    ImageIds &&added_image_ids,
                                    ImageIds &&removed_image_ids) {
  if (m_stopping) {
    return;
  }

  dout(10) << "mirror_uuid=" << mirror_uuid << ", "
           << "added_count=" << added_image_ids.size() << ", "
           << "removed_count=" << removed_image_ids.size() << dendl;
  Mutex::Locker locker(m_lock);
  if (!m_leader_watcher->is_leader()) {
    return;
  }

  m_service_daemon->add_or_update_attribute(
    m_local_pool_id, SERVICE_DAEMON_LOCAL_COUNT_KEY,
    m_local_pool_watcher->get_image_count());
  if (m_remote_pool_watcher) {
    m_service_daemon->add_or_update_attribute(
      m_local_pool_id, SERVICE_DAEMON_REMOTE_COUNT_KEY,
      m_remote_pool_watcher->get_image_count());
  }

  std::set<std::string> added_global_image_ids;
  for (auto& image_id : added_image_ids) {
    added_global_image_ids.insert(image_id.global_id);
  }

  std::set<std::string> removed_global_image_ids;
  for (auto& image_id : removed_image_ids) {
    removed_global_image_ids.insert(image_id.global_id);
  }

  m_image_map->update_images(mirror_uuid,
                             std::move(added_global_image_ids),
                             std::move(removed_global_image_ids));
}

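// Leader transition handling. On acquiring the leader role the chain
// ImageMap -> local PoolWatcher -> remote PoolWatcher -> ImageDeleter is
// initialized, each step's completion handler kicking off the next. On
// releasing the role the same components are shut down in reverse order,
// finishing with the ImageMap and the release of all locally owned images.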
template <typename I>
void PoolReplayer<I>::handle_post_acquire_leader(Context *on_finish) {
  dout(10) << dendl;

  m_service_daemon->add_or_update_attribute(m_local_pool_id,
                                            SERVICE_DAEMON_LEADER_KEY, true);
  m_instance_watcher->handle_acquire_leader();
  init_image_map(on_finish);
}

template <typename I>
void PoolReplayer<I>::handle_pre_release_leader(Context *on_finish) {
  dout(10) << dendl;

  m_service_daemon->remove_attribute(m_local_pool_id,
                                     SERVICE_DAEMON_LEADER_KEY);
  m_instance_watcher->handle_release_leader();
  shut_down_image_deleter(on_finish);
}

template <typename I>
void PoolReplayer<I>::init_image_map(Context *on_finish) {
  dout(5) << dendl;

  Mutex::Locker locker(m_lock);
  ceph_assert(!m_image_map);
  m_image_map.reset(ImageMap<I>::create(m_local_io_ctx, m_threads,
                                        m_instance_watcher->get_instance_id(),
                                        m_image_map_listener));

  auto ctx = new FunctionContext([this, on_finish](int r) {
      handle_init_image_map(r, on_finish);
    });
  m_image_map->init(create_async_context_callback(
    m_threads->work_queue, ctx));
}

template <typename I>
void PoolReplayer<I>::handle_init_image_map(int r, Context *on_finish) {
  dout(5) << "r=" << r << dendl;
  if (r < 0) {
    derr << "failed to init image map: " << cpp_strerror(r) << dendl;
    on_finish = new FunctionContext([on_finish, r](int) {
        on_finish->complete(r);
      });
    shut_down_image_map(on_finish);
    return;
  }

  init_local_pool_watcher(on_finish);
}

template <typename I>
void PoolReplayer<I>::init_local_pool_watcher(Context *on_finish) {
  dout(10) << dendl;

  Mutex::Locker locker(m_lock);
  ceph_assert(!m_local_pool_watcher);
  m_local_pool_watcher.reset(PoolWatcher<I>::create(
    m_threads, m_local_io_ctx, m_local_pool_watcher_listener));

  // ensure the initial set of local images is up-to-date
  // after acquiring the leader role
  auto ctx = new FunctionContext([this, on_finish](int r) {
      handle_init_local_pool_watcher(r, on_finish);
    });
  m_local_pool_watcher->init(create_async_context_callback(
    m_threads->work_queue, ctx));
}

template <typename I>
void PoolReplayer<I>::handle_init_local_pool_watcher(
    int r, Context *on_finish) {
  dout(10) << "r=" << r << dendl;
  if (r < 0) {
    derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl;
    on_finish = new FunctionContext([on_finish, r](int) {
        on_finish->complete(r);
      });
    shut_down_pool_watchers(on_finish);
    return;
  }

  init_remote_pool_watcher(on_finish);
}

template <typename I>
void PoolReplayer<I>::init_remote_pool_watcher(Context *on_finish) {
  dout(10) << dendl;

  Mutex::Locker locker(m_lock);
  ceph_assert(!m_remote_pool_watcher);
  m_remote_pool_watcher.reset(PoolWatcher<I>::create(
    m_threads, m_remote_io_ctx, m_remote_pool_watcher_listener));

  auto ctx = new FunctionContext([this, on_finish](int r) {
      handle_init_remote_pool_watcher(r, on_finish);
    });
  m_remote_pool_watcher->init(create_async_context_callback(
    m_threads->work_queue, ctx));
}

template <typename I>
void PoolReplayer<I>::handle_init_remote_pool_watcher(
    int r, Context *on_finish) {
  dout(10) << "r=" << r << dendl;
  if (r == -ENOENT) {
    // Technically nothing to do since the other side doesn't
    // have mirroring enabled. Eventually the remote pool watcher will
    // detect images (if mirroring is enabled), so no point propagating
    // an error which would just busy-spin the state machines.
    dout(0) << "remote peer does not have mirroring configured" << dendl;
  } else if (r < 0) {
    derr << "failed to retrieve remote images: " << cpp_strerror(r) << dendl;
    on_finish = new FunctionContext([on_finish, r](int) {
        on_finish->complete(r);
      });
    shut_down_pool_watchers(on_finish);
    return;
  }

  init_image_deleter(on_finish);
}

template <typename I>
void PoolReplayer<I>::init_image_deleter(Context *on_finish) {
  dout(10) << dendl;

  Mutex::Locker locker(m_lock);
  ceph_assert(!m_image_deleter);

  on_finish = new FunctionContext([this, on_finish](int r) {
      handle_init_image_deleter(r, on_finish);
    });
  m_image_deleter.reset(ImageDeleter<I>::create(m_local_io_ctx, m_threads,
                                                m_service_daemon));
  m_image_deleter->init(create_async_context_callback(
    m_threads->work_queue, on_finish));
}

template <typename I>
void PoolReplayer<I>::handle_init_image_deleter(int r, Context *on_finish) {
  dout(10) << "r=" << r << dendl;
  if (r < 0) {
    derr << "failed to init image deleter: " << cpp_strerror(r) << dendl;
    on_finish = new FunctionContext([on_finish, r](int) {
        on_finish->complete(r);
      });
    shut_down_image_deleter(on_finish);
    return;
  }

  on_finish->complete(0);

  Mutex::Locker locker(m_lock);
  m_cond.Signal();
}

template <typename I>
void PoolReplayer<I>::shut_down_image_deleter(Context* on_finish) {
  dout(10) << dendl;
  {
    Mutex::Locker locker(m_lock);
    if (m_image_deleter) {
      Context *ctx = new FunctionContext([this, on_finish](int r) {
          handle_shut_down_image_deleter(r, on_finish);
        });
      ctx = create_async_context_callback(m_threads->work_queue, ctx);

      m_image_deleter->shut_down(ctx);
      return;
    }
  }
  shut_down_pool_watchers(on_finish);
}

template <typename I>
void PoolReplayer<I>::handle_shut_down_image_deleter(
    int r, Context* on_finish) {
  dout(10) << "r=" << r << dendl;

  {
    Mutex::Locker locker(m_lock);
    ceph_assert(m_image_deleter);
    m_image_deleter.reset();
  }

  shut_down_pool_watchers(on_finish);
}

template <typename I>
void PoolReplayer<I>::shut_down_pool_watchers(Context *on_finish) {
  dout(10) << dendl;

  {
    Mutex::Locker locker(m_lock);
    if (m_local_pool_watcher) {
      Context *ctx = new FunctionContext([this, on_finish](int r) {
          handle_shut_down_pool_watchers(r, on_finish);
        });
      ctx = create_async_context_callback(m_threads->work_queue, ctx);

      auto gather_ctx = new C_Gather(g_ceph_context, ctx);
      m_local_pool_watcher->shut_down(gather_ctx->new_sub());
      if (m_remote_pool_watcher) {
        m_remote_pool_watcher->shut_down(gather_ctx->new_sub());
      }
      gather_ctx->activate();
      return;
    }
  }

  on_finish->complete(0);
}

template <typename I>
void PoolReplayer<I>::handle_shut_down_pool_watchers(
    int r, Context *on_finish) {
  dout(10) << "r=" << r << dendl;

  {
    Mutex::Locker locker(m_lock);
    ceph_assert(m_local_pool_watcher);
    m_local_pool_watcher.reset();

    if (m_remote_pool_watcher) {
      m_remote_pool_watcher.reset();
    }
  }
  wait_for_update_ops(on_finish);
}

template <typename I>
void PoolReplayer<I>::wait_for_update_ops(Context *on_finish) {
  dout(10) << dendl;

  Mutex::Locker locker(m_lock);

  Context *ctx = new FunctionContext([this, on_finish](int r) {
      handle_wait_for_update_ops(r, on_finish);
    });
  ctx = create_async_context_callback(m_threads->work_queue, ctx);

  m_update_op_tracker.wait_for_ops(ctx);
}

template <typename I>
void PoolReplayer<I>::handle_wait_for_update_ops(int r, Context *on_finish) {
  dout(10) << "r=" << r << dendl;
  ceph_assert(r == 0);

  shut_down_image_map(on_finish);
}

template <typename I>
void PoolReplayer<I>::shut_down_image_map(Context *on_finish) {
  dout(5) << dendl;

  {
    Mutex::Locker locker(m_lock);
    if (m_image_map) {
      on_finish = new FunctionContext([this, on_finish](int r) {
          handle_shut_down_image_map(r, on_finish);
        });
      m_image_map->shut_down(create_async_context_callback(
        m_threads->work_queue, on_finish));
      return;
    }
  }

  on_finish->complete(0);
}

template <typename I>
void PoolReplayer<I>::handle_shut_down_image_map(int r, Context *on_finish) {
  dout(5) << "r=" << r << dendl;
  if (r < 0 && r != -EBLACKLISTED) {
    derr << "failed to shut down image map: " << cpp_strerror(r) << dendl;
  }

  Mutex::Locker locker(m_lock);
  ceph_assert(m_image_map);
  m_image_map.reset();

  m_instance_replayer->release_all(on_finish);
}

template <typename I>
void PoolReplayer<I>::handle_update_leader(
    const std::string &leader_instance_id) {
  dout(10) << "leader_instance_id=" << leader_instance_id << dendl;

  m_instance_watcher->handle_update_leader(leader_instance_id);
}

template <typename I>
void PoolReplayer<I>::handle_acquire_image(const std::string &global_image_id,
                                           const std::string &instance_id,
                                           Context* on_finish) {
  dout(5) << "global_image_id=" << global_image_id << ", "
          << "instance_id=" << instance_id << dendl;

  m_instance_watcher->notify_image_acquire(instance_id, global_image_id,
                                           on_finish);
}

template <typename I>
void PoolReplayer<I>::handle_release_image(const std::string &global_image_id,
                                           const std::string &instance_id,
                                           Context* on_finish) {
  dout(5) << "global_image_id=" << global_image_id << ", "
          << "instance_id=" << instance_id << dendl;

  m_instance_watcher->notify_image_release(instance_id, global_image_id,
                                           on_finish);
}

template <typename I>
void PoolReplayer<I>::handle_remove_image(const std::string &mirror_uuid,
                                          const std::string &global_image_id,
                                          const std::string &instance_id,
                                          Context* on_finish) {
  ceph_assert(!mirror_uuid.empty());
  dout(5) << "mirror_uuid=" << mirror_uuid << ", "
          << "global_image_id=" << global_image_id << ", "
          << "instance_id=" << instance_id << dendl;

  m_instance_watcher->notify_peer_image_removed(instance_id, global_image_id,
                                                mirror_uuid, on_finish);
}

template <typename I>
void PoolReplayer<I>::handle_instances_added(const InstanceIds &instance_ids) {
  dout(5) << "instance_ids=" << instance_ids << dendl;
  Mutex::Locker locker(m_lock);
  if (!m_leader_watcher->is_leader()) {
    return;
  }

  ceph_assert(m_image_map);
  m_image_map->update_instances_added(instance_ids);
}

template <typename I>
void PoolReplayer<I>::handle_instances_removed(
    const InstanceIds &instance_ids) {
  dout(5) << "instance_ids=" << instance_ids << dendl;
  Mutex::Locker locker(m_lock);
  if (!m_leader_watcher->is_leader()) {
    return;
  }

  ceph_assert(m_image_map);
  m_image_map->update_instances_removed(instance_ids);
}

} // namespace mirror
} // namespace rbd

template class rbd::mirror::PoolReplayer<librbd::ImageCtx>;