// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "PoolReplayer.h"
#include <boost/bind.hpp>
#include "common/Formatter.h"
#include "common/admin_socket.h"
#include "common/ceph_argparse.h"
#include "common/code_environment.h"
#include "common/common_init.h"
#include "common/debug.h"
#include "common/errno.h"
#include "include/stringify.h"
#include "cls/rbd/cls_rbd_client.h"
#include "global/global_context.h"
#include "librbd/internal.h"
#include "librbd/Utils.h"
#include "librbd/Watcher.h"
#include "librbd/api/Mirror.h"
#include "InstanceReplayer.h"
#include "InstanceWatcher.h"
#include "LeaderWatcher.h"
#include "ServiceDaemon.h"
#include "Threads.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::PoolReplayer: " \
                           << this << " " << __func__ << ": "

using std::chrono::seconds;
using std::map;
using std::string;
using std::unique_ptr;
using std::vector;

using librbd::cls_client::dir_get_name;
using librbd::util::create_async_context_callback;

namespace rbd {
namespace mirror {

namespace {

const std::string SERVICE_DAEMON_LEADER_KEY("leader");
const std::string SERVICE_DAEMON_LOCAL_COUNT_KEY("image_local_count");
const std::string SERVICE_DAEMON_REMOTE_COUNT_KEY("image_remote_count");

class PoolReplayerAdminSocketCommand {
public:
  PoolReplayerAdminSocketCommand(PoolReplayer *pool_replayer)
    : pool_replayer(pool_replayer) {
  }
  virtual ~PoolReplayerAdminSocketCommand() {}
  virtual bool call(Formatter *f, stringstream *ss) = 0;
protected:
  PoolReplayer *pool_replayer;
};

class StatusCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit StatusCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->print_status(f, ss);
    return true;
  }
};

class StartCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit StartCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->start();
    return true;
  }
};

class StopCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit StopCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->stop(true);
    return true;
  }
};

class RestartCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit RestartCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->restart();
    return true;
  }
};

class FlushCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit FlushCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->flush();
    return true;
  }
};

class LeaderReleaseCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit LeaderReleaseCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->release_leader();
    return true;
  }
};

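// The hook below registers one admin socket command per operation above.
// The hook name is "<pool> <peer cluster>" (built in PoolReplayer::run()).
// As an illustrative sketch, assuming a pool named "rbd", a peer cluster
// named "remote" and the daemon's default admin socket path, status could
// be queried with:
//
//   ceph --admin-daemon /var/run/ceph/ceph-client.rbd-mirror.0.asok \
//       rbd mirror status rbd remote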
class PoolReplayerAdminSocketHook : public AdminSocketHook {
public:
  PoolReplayerAdminSocketHook(CephContext *cct, const std::string &name,
                              PoolReplayer *pool_replayer)
    : admin_socket(cct->get_admin_socket()) {
    std::string command;
    int r;

    command = "rbd mirror status " + name;
    r = admin_socket->register_command(command, command, this,
                                       "get status for rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StatusCommand(pool_replayer);
    }

    command = "rbd mirror start " + name;
    r = admin_socket->register_command(command, command, this,
                                       "start rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StartCommand(pool_replayer);
    }

    command = "rbd mirror stop " + name;
    r = admin_socket->register_command(command, command, this,
                                       "stop rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StopCommand(pool_replayer);
    }

    command = "rbd mirror restart " + name;
    r = admin_socket->register_command(command, command, this,
                                       "restart rbd mirror " + name);
    if (r == 0) {
      commands[command] = new RestartCommand(pool_replayer);
    }

    command = "rbd mirror flush " + name;
    r = admin_socket->register_command(command, command, this,
                                       "flush rbd mirror " + name);
    if (r == 0) {
      commands[command] = new FlushCommand(pool_replayer);
    }

    command = "rbd mirror leader release " + name;
    r = admin_socket->register_command(command, command, this,
                                       "release rbd mirror leader " + name);
    if (r == 0) {
      commands[command] = new LeaderReleaseCommand(pool_replayer);
    }
  }

  ~PoolReplayerAdminSocketHook() override {
    for (Commands::const_iterator i = commands.begin(); i != commands.end();
         ++i) {
      (void)admin_socket->unregister_command(i->first);
      delete i->second;
    }
  }

  bool call(std::string command, cmdmap_t& cmdmap, std::string format,
            bufferlist& out) override {
    Commands::const_iterator i = commands.find(command);
    assert(i != commands.end());
    Formatter *f = Formatter::create(format);
    stringstream ss;
    bool r = i->second->call(f, &ss);
    delete f;
    out.append(ss);
    return r;
  }

private:
  typedef std::map<std::string, PoolReplayerAdminSocketCommand*> Commands;

  AdminSocket *admin_socket;
  Commands commands;
};

} // anonymous namespace

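// A PoolReplayer mirrors a single local pool from one remote peer: it
// owns the local and remote cluster connections, the per-image
// InstanceReplayer/InstanceWatcher pair, and the LeaderWatcher used for
// leader election among rbd-mirror daemon instances.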
PoolReplayer::PoolReplayer(Threads<librbd::ImageCtx> *threads,
                           ServiceDaemon<librbd::ImageCtx>* service_daemon,
                           ImageDeleter<>* image_deleter,
                           int64_t local_pool_id, const peer_t &peer,
                           const std::vector<const char*> &args) :
  m_threads(threads),
  m_service_daemon(service_daemon),
  m_image_deleter(image_deleter),
  m_local_pool_id(local_pool_id),
  m_peer(peer),
  m_args(args),
  m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)),
  m_local_pool_watcher_listener(this, true),
  m_remote_pool_watcher_listener(this, false),
  m_pool_replayer_thread(this),
  m_leader_listener(this)
{
}

PoolReplayer::~PoolReplayer()
{
  delete m_asok_hook;
  shut_down();
}

bool PoolReplayer::is_blacklisted() const {
  Mutex::Locker locker(m_lock);
  return m_blacklisted;
}

bool PoolReplayer::is_leader() const {
  Mutex::Locker locker(m_lock);
  return m_leader_watcher && m_leader_watcher->is_leader();
}

bool PoolReplayer::is_running() const {
  return m_pool_replayer_thread.is_started();
}

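// Connect to the local and remote clusters, open the pool on both sides,
// then bring up the instance replayer/watcher and the leader watcher.
// Most failures are surfaced as service-daemon callouts and leave the
// pool replayer thread unstarted.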
void PoolReplayer::init()
{
  assert(!m_pool_replayer_thread.is_started());

  // reset state
  m_stopping = false;
  m_blacklisted = false;

  dout(20) << "replaying for " << m_peer << dendl;
  int r = init_rados(g_ceph_context->_conf->cluster,
                     g_ceph_context->_conf->name.to_str(),
                     "local cluster", &m_local_rados);
  if (r < 0) {
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to connect to local cluster");
    return;
  }

  r = init_rados(m_peer.cluster_name, m_peer.client_name,
                 std::string("remote peer ") + stringify(m_peer),
                 &m_remote_rados);
  if (r < 0) {
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to connect to remote cluster");
    return;
  }

  r = m_local_rados->ioctx_create2(m_local_pool_id, m_local_io_ctx);
  if (r < 0) {
    derr << "error accessing local pool " << m_local_pool_id << ": "
         << cpp_strerror(r) << dendl;
    return;
  }

  std::string local_mirror_uuid;
  r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx,
                                          &local_mirror_uuid);
  if (r < 0) {
    derr << "failed to retrieve local mirror uuid from pool "
         << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to query local mirror uuid");
    return;
  }

  r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
                                   m_remote_io_ctx);
  if (r < 0) {
    derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name()
         << ": " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_WARNING,
      "unable to access remote pool");
    return;
  }

  dout(20) << "connected to " << m_peer << dendl;

  m_instance_replayer.reset(InstanceReplayer<>::create(
    m_threads, m_service_daemon, m_image_deleter, m_local_rados,
    local_mirror_uuid, m_local_pool_id));
  m_instance_replayer->init();
  m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx);

  m_instance_watcher.reset(InstanceWatcher<>::create(
    m_local_io_ctx, m_threads->work_queue, m_instance_replayer.get()));
  r = m_instance_watcher->init();
  if (r < 0) {
    derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to initialize instance messenger object");
    return;
  }

  m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
                                             &m_leader_listener));
  r = m_leader_watcher->init();
  if (r < 0) {
    derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to initialize leader messenger object");
    return;
  }

  if (m_callout_id != service_daemon::CALLOUT_ID_NONE) {
    m_service_daemon->remove_callout(m_local_pool_id, m_callout_id);
    m_callout_id = service_daemon::CALLOUT_ID_NONE;
  }

  m_pool_replayer_thread.create("pool replayer");
}

void PoolReplayer::shut_down() {
  m_stopping = true;
  {
    Mutex::Locker l(m_lock);
    m_cond.Signal();
  }
  if (m_pool_replayer_thread.is_started()) {
    m_pool_replayer_thread.join();
  }
  if (m_leader_watcher) {
    m_leader_watcher->shut_down();
  }
  if (m_instance_watcher) {
    m_instance_watcher->shut_down();
  }
  if (m_instance_replayer) {
    m_instance_replayer->shut_down();
  }

  assert(!m_local_pool_watcher);
  assert(!m_remote_pool_watcher);
  m_local_rados.reset();
  m_remote_rados.reset();
}

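// Build a standalone cluster connection for either side of the mirror.
// Configuration is applied with the same precedence librados itself uses:
// cluster config file first, then environment overrides, then any
// command-line arguments that were passed through to the daemon.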
int PoolReplayer::init_rados(const std::string &cluster_name,
                             const std::string &client_name,
                             const std::string &description,
                             RadosRef *rados_ref) {
  rados_ref->reset(new librados::Rados());

  // NOTE: manually bootstrap a CephContext here instead of via
  // the librados API to avoid mixing global singletons between
  // the librados shared library and the daemon
  // TODO: eliminate intermingling of global singletons within Ceph APIs
  CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT);
  if (client_name.empty() || !iparams.name.from_str(client_name)) {
    derr << "error initializing cluster handle for " << description << dendl;
    return -EINVAL;
  }

  CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY,
                                    CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
  cct->_conf->cluster = cluster_name;

  // librados::Rados::conf_read_file
  int r = cct->_conf->parse_config_files(nullptr, nullptr, 0);
  if (r < 0) {
    derr << "could not read ceph conf for " << description << ": "
         << cpp_strerror(r) << dendl;
    cct->put();
    return r;
  }
  cct->_conf->parse_env();

  // librados::Rados::conf_parse_env
  std::vector<const char*> args;
  env_to_vec(args, nullptr);
  r = cct->_conf->parse_argv(args);
  if (r < 0) {
    derr << "could not parse environment for " << description << ": "
         << cpp_strerror(r) << dendl;
    cct->put();
    return r;
  }

  if (!m_args.empty()) {
    // librados::Rados::conf_parse_argv
    args = m_args;
    r = cct->_conf->parse_argv(args);
    if (r < 0) {
      derr << "could not parse command line args for " << description << ": "
           << cpp_strerror(r) << dendl;
      cct->put();
      return r;
    }
  }

  if (!g_ceph_context->_conf->admin_socket.empty()) {
    cct->_conf->set_val_or_die("admin_socket",
                               "$run_dir/$name.$pid.$cluster.$cctid.asok");
  }

  // disable unnecessary librbd cache
  cct->_conf->set_val_or_die("rbd_cache", "false");
  cct->_conf->apply_changes(nullptr);
  cct->_conf->complain_about_parse_errors(cct);

  r = (*rados_ref)->init_with_context(cct);
  assert(r == 0);
  cct->put();

  r = (*rados_ref)->connect();
  if (r < 0) {
    derr << "error connecting to " << description << ": "
         << cpp_strerror(r) << dendl;
    return r;
  }

  return 0;
}

void PoolReplayer::run()
{
  dout(20) << "enter" << dendl;

  while (!m_stopping) {
    std::string asok_hook_name = m_local_io_ctx.get_pool_name() + " " +
                                 m_peer.cluster_name;
    if (m_asok_hook_name != asok_hook_name || m_asok_hook == nullptr) {
      m_asok_hook_name = asok_hook_name;
      delete m_asok_hook;

      m_asok_hook = new PoolReplayerAdminSocketHook(g_ceph_context,
                                                    m_asok_hook_name, this);
    }

    Mutex::Locker locker(m_lock);
    if ((m_local_pool_watcher && m_local_pool_watcher->is_blacklisted()) ||
        (m_remote_pool_watcher && m_remote_pool_watcher->is_blacklisted())) {
      m_blacklisted = true;
      m_stopping = true;
      break;
    }

    m_cond.WaitInterval(m_lock, utime_t(1, 0));
  }
}

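// Handler for the "rbd mirror status" admin socket command. As an
// illustrative sketch only (field values are examples; the sync throttler
// and instance replayer append their own status), the formatted output is
// shaped roughly like:
//
//   {
//     "pool_replayer_status": {
//       "pool": "rbd",
//       "peer": "...",
//       "instance_id": "4123",
//       "leader_instance_id": "4123",
//       "leader": true,
//       "instances": [],
//       "local_cluster_admin_socket": "/var/run/ceph/....asok",
//       "remote_cluster_admin_socket": "/var/run/ceph/....asok",
//       "sync_throttler": { ... },
//       ...
//     }
//   }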
void PoolReplayer::print_status(Formatter *f, stringstream *ss)
{
  dout(20) << "enter" << dendl;

  if (!f) {
    return;
  }

  Mutex::Locker l(m_lock);

  f->open_object_section("pool_replayer_status");
  f->dump_string("pool", m_local_io_ctx.get_pool_name());
  f->dump_stream("peer") << m_peer;
  f->dump_string("instance_id", m_instance_watcher->get_instance_id());

  std::string leader_instance_id;
  m_leader_watcher->get_leader_instance_id(&leader_instance_id);
  f->dump_string("leader_instance_id", leader_instance_id);

  bool leader = m_leader_watcher->is_leader();
  f->dump_bool("leader", leader);
  if (leader) {
    std::vector<std::string> instance_ids;
    m_leader_watcher->list_instances(&instance_ids);
    f->open_array_section("instances");
    for (auto instance_id : instance_ids) {
      f->dump_string("instance_id", instance_id);
    }
    f->close_section();
  }

  f->dump_string("local_cluster_admin_socket",
                 reinterpret_cast<CephContext *>(m_local_io_ctx.cct())->_conf->
                     admin_socket);
  f->dump_string("remote_cluster_admin_socket",
                 reinterpret_cast<CephContext *>(m_remote_io_ctx.cct())->_conf->
                     admin_socket);

  f->open_object_section("sync_throttler");
  m_instance_watcher->print_sync_status(f, ss);
  f->close_section();

  m_instance_replayer->print_status(f, ss);

  f->close_section();
  f->flush(*ss);
}

void PoolReplayer::start()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping) {
    return;
  }

  m_instance_replayer->start();
}

void PoolReplayer::stop(bool manual)
{
  dout(20) << "enter: manual=" << manual << dendl;

  Mutex::Locker l(m_lock);
  if (!manual) {
    m_stopping = true;
    m_cond.Signal();
    return;
  } else if (m_stopping) {
    return;
  }

  m_instance_replayer->stop();
}

void PoolReplayer::restart()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping) {
    return;
  }

  m_instance_replayer->restart();
}

void PoolReplayer::flush()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping || m_manual_stop) {
    return;
  }

  m_instance_replayer->flush();
}

void PoolReplayer::release_leader()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping || !m_leader_watcher) {
    return;
  }

  m_leader_watcher->release_leader();
}

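// PoolWatcher callback: invoked whenever the set of mirror-enabled images
// changes in the local or remote pool. Only the current leader acts on
// updates; it refreshes the service-daemon image counts and fans
// acquire/release notifications out through the InstanceWatcher.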
void PoolReplayer::handle_update(const std::string &mirror_uuid,
                                 ImageIds &&added_image_ids,
                                 ImageIds &&removed_image_ids) {
  if (m_stopping) {
    return;
  }

  dout(10) << "mirror_uuid=" << mirror_uuid << ", "
           << "added_count=" << added_image_ids.size() << ", "
           << "removed_count=" << removed_image_ids.size() << dendl;
  Mutex::Locker locker(m_lock);
  if (!m_leader_watcher->is_leader()) {
    return;
  }

  m_service_daemon->add_or_update_attribute(
    m_local_pool_id, SERVICE_DAEMON_LOCAL_COUNT_KEY,
    m_local_pool_watcher->get_image_count());
  if (m_remote_pool_watcher) {
    m_service_daemon->add_or_update_attribute(
      m_local_pool_id, SERVICE_DAEMON_REMOTE_COUNT_KEY,
      m_remote_pool_watcher->get_image_count());
  }

  std::string removed_remote_peer_id;
  ImageIds removed_remote_image_ids;
  if (m_initial_mirror_image_ids.find(mirror_uuid) ==
        m_initial_mirror_image_ids.end() &&
      m_initial_mirror_image_ids.size() < 2) {
    m_initial_mirror_image_ids[mirror_uuid] = added_image_ids;

    if (m_initial_mirror_image_ids.size() == 2) {
      dout(10) << "local and remote pools refreshed" << dendl;

      // both local and remote initial pool listing received. derive
      // removal notifications for the remote pool
      auto &local_image_ids = m_initial_mirror_image_ids.begin()->second;
      auto &remote_image_ids = m_initial_mirror_image_ids.rbegin()->second;
      removed_remote_peer_id = m_initial_mirror_image_ids.rbegin()->first;
      for (auto &local_image_id : local_image_ids) {
        if (remote_image_ids.find(local_image_id) == remote_image_ids.end()) {
          removed_remote_image_ids.emplace(local_image_id.global_id, "");
        }
      }
      local_image_ids.clear();
      remote_image_ids.clear();
    }
  }

  if (!mirror_uuid.empty() && m_peer.uuid != mirror_uuid) {
    m_instance_replayer->remove_peer(m_peer.uuid);
    m_instance_replayer->add_peer(mirror_uuid, m_remote_io_ctx);
    m_peer.uuid = mirror_uuid;
  }

  m_update_op_tracker.start_op();
  Context *ctx = new FunctionContext([this](int r) {
    dout(20) << "complete handle_update: r=" << r << dendl;
    m_update_op_tracker.finish_op();
  });

  C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);

  for (auto &image_id : added_image_ids) {
    // for now always send to myself (the leader)
    std::string &instance_id = m_instance_watcher->get_instance_id();
    m_instance_watcher->notify_image_acquire(instance_id, image_id.global_id,
                                             mirror_uuid, image_id.id,
                                             gather_ctx->new_sub());
  }

  for (auto &image_id : removed_image_ids) {
    // for now always send to myself (the leader)
    std::string &instance_id = m_instance_watcher->get_instance_id();
    m_instance_watcher->notify_image_release(instance_id, image_id.global_id,
                                             mirror_uuid, image_id.id, true,
                                             gather_ctx->new_sub());
  }

  // derived removal events for remote after initial image listing
  for (auto& image_id : removed_remote_image_ids) {
    // for now always send to myself (the leader)
    std::string &instance_id = m_instance_watcher->get_instance_id();
    m_instance_watcher->notify_image_release(instance_id, image_id.global_id,
                                             removed_remote_peer_id,
                                             image_id.id, true,
                                             gather_ctx->new_sub());
  }

  gather_ctx->activate();
}

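// Leadership transitions: acquiring the leader lock (re)initializes the
// local and remote pool watchers so the full image set is rescanned;
// releasing it shuts the watchers down, drains in-flight update ops and
// releases all images from this instance's replayer.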
void PoolReplayer::handle_post_acquire_leader(Context *on_finish) {
  dout(20) << dendl;

  m_service_daemon->add_or_update_attribute(m_local_pool_id,
                                            SERVICE_DAEMON_LEADER_KEY, true);
  m_instance_watcher->handle_acquire_leader();
  init_local_pool_watcher(on_finish);
}

void PoolReplayer::handle_pre_release_leader(Context *on_finish) {
  dout(20) << dendl;

  m_service_daemon->remove_attribute(m_local_pool_id, SERVICE_DAEMON_LEADER_KEY);
  m_instance_watcher->handle_release_leader();
  shut_down_pool_watchers(on_finish);
}

void PoolReplayer::init_local_pool_watcher(Context *on_finish) {
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);
  assert(!m_local_pool_watcher);
  m_local_pool_watcher.reset(new PoolWatcher<>(
    m_threads, m_local_io_ctx, m_local_pool_watcher_listener));
  m_initial_mirror_image_ids.clear();

  // ensure the initial set of local images is up-to-date
  // after acquiring the leader role
  auto ctx = new FunctionContext([this, on_finish](int r) {
    handle_init_local_pool_watcher(r, on_finish);
  });
  m_local_pool_watcher->init(create_async_context_callback(
    m_threads->work_queue, ctx));
}

void PoolReplayer::handle_init_local_pool_watcher(int r, Context *on_finish) {
  dout(20) << "r=" << r << dendl;
  if (r < 0) {
    derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl;
    on_finish->complete(r);
    return;
  }

  init_remote_pool_watcher(on_finish);
}

void PoolReplayer::init_remote_pool_watcher(Context *on_finish) {
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);
  assert(!m_remote_pool_watcher);
  m_remote_pool_watcher.reset(new PoolWatcher<>(
    m_threads, m_remote_io_ctx, m_remote_pool_watcher_listener));
  m_remote_pool_watcher->init(create_async_context_callback(
    m_threads->work_queue, on_finish));

  m_cond.Signal();
}

void PoolReplayer::shut_down_pool_watchers(Context *on_finish) {
  dout(20) << dendl;

  {
    Mutex::Locker locker(m_lock);
    if (m_local_pool_watcher) {
      Context *ctx = new FunctionContext([this, on_finish](int r) {
        handle_shut_down_pool_watchers(r, on_finish);
      });
      ctx = create_async_context_callback(m_threads->work_queue, ctx);

      auto gather_ctx = new C_Gather(g_ceph_context, ctx);
      m_local_pool_watcher->shut_down(gather_ctx->new_sub());
      if (m_remote_pool_watcher) {
        m_remote_pool_watcher->shut_down(gather_ctx->new_sub());
      }
      gather_ctx->activate();
      return;
    }
  }

  on_finish->complete(0);
}

void PoolReplayer::handle_shut_down_pool_watchers(int r, Context *on_finish) {
  dout(20) << "r=" << r << dendl;

  {
    Mutex::Locker locker(m_lock);
    assert(m_local_pool_watcher);
    m_local_pool_watcher.reset();

    if (m_remote_pool_watcher) {
      m_remote_pool_watcher.reset();
    }
  }
  wait_for_update_ops(on_finish);
}

void PoolReplayer::wait_for_update_ops(Context *on_finish) {
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);

  Context *ctx = new FunctionContext([this, on_finish](int r) {
    handle_wait_for_update_ops(r, on_finish);
  });
  ctx = create_async_context_callback(m_threads->work_queue, ctx);

  m_update_op_tracker.wait_for_ops(ctx);
}

void PoolReplayer::handle_wait_for_update_ops(int r, Context *on_finish) {
  dout(20) << "r=" << r << dendl;

  assert(r == 0);

  Mutex::Locker locker(m_lock);
  m_instance_replayer->release_all(on_finish);
}

void PoolReplayer::handle_update_leader(const std::string &leader_instance_id) {
  dout(20) << "leader_instance_id=" << leader_instance_id << dendl;

  m_instance_watcher->handle_update_leader(leader_instance_id);
}

} // namespace mirror
} // namespace rbd