]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/PoolReplayer.cc
update sources to 12.2.7
[ceph.git] / ceph / src / tools / rbd_mirror / PoolReplayer.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "PoolReplayer.h"
5#include <boost/bind.hpp>
6#include "common/Formatter.h"
7#include "common/admin_socket.h"
8#include "common/ceph_argparse.h"
9#include "common/code_environment.h"
10#include "common/common_init.h"
11#include "common/debug.h"
12#include "common/errno.h"
13#include "include/stringify.h"
14#include "cls/rbd/cls_rbd_client.h"
15#include "global/global_context.h"
16#include "librbd/internal.h"
17#include "librbd/Utils.h"
18#include "librbd/Watcher.h"
19#include "librbd/api/Mirror.h"
20#include "InstanceReplayer.h"
21#include "InstanceWatcher.h"
22#include "LeaderWatcher.h"
c07f9fc5 23#include "ServiceDaemon.h"
7c673cae
FG
24#include "Threads.h"
25
26#define dout_context g_ceph_context
27#define dout_subsys ceph_subsys_rbd_mirror
28#undef dout_prefix
29#define dout_prefix *_dout << "rbd::mirror::PoolReplayer: " \
30 << this << " " << __func__ << ": "
31
32using std::chrono::seconds;
33using std::map;
34using std::string;
35using std::unique_ptr;
36using std::vector;
37
38using librbd::cls_client::dir_get_name;
39using librbd::util::create_async_context_callback;
40
41namespace rbd {
42namespace mirror {
43
44namespace {
45
c07f9fc5
FG
// Attribute keys passed to the service daemon's attribute calls.
const std::string SERVICE_DAEMON_LEADER_KEY("leader");
const std::string SERVICE_DAEMON_LOCAL_COUNT_KEY("image_local_count");
const std::string SERVICE_DAEMON_REMOTE_COUNT_KEY("image_remote_count");

// Config options whose values init_rados() snapshots and restores when it is
// asked to strip cluster overrides.
const std::vector<std::string> UNIQUE_PEER_CONFIG_KEYS {
  "monmap", "mon_host", "mon_dns_srv_name", "key", "keyfile", "keyring"};
52
7c673cae
FG
53class PoolReplayerAdminSocketCommand {
54public:
55 PoolReplayerAdminSocketCommand(PoolReplayer *pool_replayer)
56 : pool_replayer(pool_replayer) {
57 }
58 virtual ~PoolReplayerAdminSocketCommand() {}
59 virtual bool call(Formatter *f, stringstream *ss) = 0;
60protected:
61 PoolReplayer *pool_replayer;
62};
63
64class StatusCommand : public PoolReplayerAdminSocketCommand {
65public:
66 explicit StatusCommand(PoolReplayer *pool_replayer)
67 : PoolReplayerAdminSocketCommand(pool_replayer) {
68 }
69
70 bool call(Formatter *f, stringstream *ss) override {
71 pool_replayer->print_status(f, ss);
72 return true;
73 }
74};
75
76class StartCommand : public PoolReplayerAdminSocketCommand {
77public:
78 explicit StartCommand(PoolReplayer *pool_replayer)
79 : PoolReplayerAdminSocketCommand(pool_replayer) {
80 }
81
82 bool call(Formatter *f, stringstream *ss) override {
83 pool_replayer->start();
84 return true;
85 }
86};
87
88class StopCommand : public PoolReplayerAdminSocketCommand {
89public:
90 explicit StopCommand(PoolReplayer *pool_replayer)
91 : PoolReplayerAdminSocketCommand(pool_replayer) {
92 }
93
94 bool call(Formatter *f, stringstream *ss) override {
95 pool_replayer->stop(true);
96 return true;
97 }
98};
99
100class RestartCommand : public PoolReplayerAdminSocketCommand {
101public:
102 explicit RestartCommand(PoolReplayer *pool_replayer)
103 : PoolReplayerAdminSocketCommand(pool_replayer) {
104 }
105
106 bool call(Formatter *f, stringstream *ss) override {
107 pool_replayer->restart();
108 return true;
109 }
110};
111
112class FlushCommand : public PoolReplayerAdminSocketCommand {
113public:
114 explicit FlushCommand(PoolReplayer *pool_replayer)
115 : PoolReplayerAdminSocketCommand(pool_replayer) {
116 }
117
118 bool call(Formatter *f, stringstream *ss) override {
119 pool_replayer->flush();
120 return true;
121 }
122};
123
124class LeaderReleaseCommand : public PoolReplayerAdminSocketCommand {
125public:
126 explicit LeaderReleaseCommand(PoolReplayer *pool_replayer)
127 : PoolReplayerAdminSocketCommand(pool_replayer) {
128 }
129
130 bool call(Formatter *f, stringstream *ss) override {
131 pool_replayer->release_leader();
132 return true;
133 }
134};
135
136class PoolReplayerAdminSocketHook : public AdminSocketHook {
137public:
138 PoolReplayerAdminSocketHook(CephContext *cct, const std::string &name,
139 PoolReplayer *pool_replayer)
140 : admin_socket(cct->get_admin_socket()) {
141 std::string command;
142 int r;
143
144 command = "rbd mirror status " + name;
145 r = admin_socket->register_command(command, command, this,
146 "get status for rbd mirror " + name);
147 if (r == 0) {
148 commands[command] = new StatusCommand(pool_replayer);
149 }
150
151 command = "rbd mirror start " + name;
152 r = admin_socket->register_command(command, command, this,
153 "start rbd mirror " + name);
154 if (r == 0) {
155 commands[command] = new StartCommand(pool_replayer);
156 }
157
158 command = "rbd mirror stop " + name;
159 r = admin_socket->register_command(command, command, this,
160 "stop rbd mirror " + name);
161 if (r == 0) {
162 commands[command] = new StopCommand(pool_replayer);
163 }
164
165 command = "rbd mirror restart " + name;
166 r = admin_socket->register_command(command, command, this,
167 "restart rbd mirror " + name);
168 if (r == 0) {
169 commands[command] = new RestartCommand(pool_replayer);
170 }
171
172 command = "rbd mirror flush " + name;
173 r = admin_socket->register_command(command, command, this,
174 "flush rbd mirror " + name);
175 if (r == 0) {
176 commands[command] = new FlushCommand(pool_replayer);
177 }
178
179 command = "rbd mirror leader release " + name;
180 r = admin_socket->register_command(command, command, this,
181 "release rbd mirror leader " + name);
182 if (r == 0) {
183 commands[command] = new LeaderReleaseCommand(pool_replayer);
184 }
185 }
186
187 ~PoolReplayerAdminSocketHook() override {
188 for (Commands::const_iterator i = commands.begin(); i != commands.end();
189 ++i) {
190 (void)admin_socket->unregister_command(i->first);
191 delete i->second;
192 }
193 }
194
195 bool call(std::string command, cmdmap_t& cmdmap, std::string format,
196 bufferlist& out) override {
197 Commands::const_iterator i = commands.find(command);
198 assert(i != commands.end());
199 Formatter *f = Formatter::create(format);
200 stringstream ss;
201 bool r = i->second->call(f, &ss);
202 delete f;
203 out.append(ss);
204 return r;
205 }
206
207private:
208 typedef std::map<std::string, PoolReplayerAdminSocketCommand*> Commands;
209
210 AdminSocket *admin_socket;
211 Commands commands;
212};
213
214} // anonymous namespace
215
216PoolReplayer::PoolReplayer(Threads<librbd::ImageCtx> *threads,
c07f9fc5
FG
217 ServiceDaemon<librbd::ImageCtx>* service_daemon,
218 ImageDeleter<>* image_deleter,
7c673cae
FG
219 int64_t local_pool_id, const peer_t &peer,
220 const std::vector<const char*> &args) :
221 m_threads(threads),
c07f9fc5 222 m_service_daemon(service_daemon),
7c673cae 223 m_image_deleter(image_deleter),
c07f9fc5 224 m_local_pool_id(local_pool_id),
7c673cae
FG
225 m_peer(peer),
226 m_args(args),
c07f9fc5 227 m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)),
7c673cae
FG
228 m_local_pool_watcher_listener(this, true),
229 m_remote_pool_watcher_listener(this, false),
7c673cae
FG
230 m_pool_replayer_thread(this),
231 m_leader_listener(this)
232{
233}
234
235PoolReplayer::~PoolReplayer()
236{
237 delete m_asok_hook;
c07f9fc5 238 shut_down();
7c673cae
FG
239}
240
241bool PoolReplayer::is_blacklisted() const {
242 Mutex::Locker locker(m_lock);
243 return m_blacklisted;
244}
245
246bool PoolReplayer::is_leader() const {
247 Mutex::Locker locker(m_lock);
248 return m_leader_watcher && m_leader_watcher->is_leader();
249}
250
c07f9fc5
FG
251bool PoolReplayer::is_running() const {
252 return m_pool_replayer_thread.is_started();
253}
254
255void PoolReplayer::init()
7c673cae 256{
c07f9fc5
FG
257 assert(!m_pool_replayer_thread.is_started());
258
259 // reset state
260 m_stopping = false;
261 m_blacklisted = false;
7c673cae 262
c07f9fc5 263 dout(20) << "replaying for " << m_peer << dendl;
7c673cae
FG
264 int r = init_rados(g_ceph_context->_conf->cluster,
265 g_ceph_context->_conf->name.to_str(),
3efd9988 266 "local cluster", &m_local_rados, false);
7c673cae 267 if (r < 0) {
c07f9fc5
FG
268 m_callout_id = m_service_daemon->add_or_update_callout(
269 m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
270 "unable to connect to local cluster");
271 return;
7c673cae
FG
272 }
273
274 r = init_rados(m_peer.cluster_name, m_peer.client_name,
275 std::string("remote peer ") + stringify(m_peer),
3efd9988 276 &m_remote_rados, true);
7c673cae 277 if (r < 0) {
c07f9fc5
FG
278 m_callout_id = m_service_daemon->add_or_update_callout(
279 m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
280 "unable to connect to remote cluster");
281 return;
7c673cae
FG
282 }
283
284 r = m_local_rados->ioctx_create2(m_local_pool_id, m_local_io_ctx);
285 if (r < 0) {
286 derr << "error accessing local pool " << m_local_pool_id << ": "
287 << cpp_strerror(r) << dendl;
c07f9fc5 288 return;
7c673cae
FG
289 }
290
291 std::string local_mirror_uuid;
292 r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx,
293 &local_mirror_uuid);
294 if (r < 0) {
295 derr << "failed to retrieve local mirror uuid from pool "
296 << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
c07f9fc5
FG
297 m_callout_id = m_service_daemon->add_or_update_callout(
298 m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
299 "unable to query local mirror uuid");
300 return;
7c673cae
FG
301 }
302
303 r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
304 m_remote_io_ctx);
305 if (r < 0) {
306 derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name()
307 << ": " << cpp_strerror(r) << dendl;
c07f9fc5
FG
308 m_callout_id = m_service_daemon->add_or_update_callout(
309 m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_WARNING,
310 "unable to access remote pool");
311 return;
7c673cae
FG
312 }
313
314 dout(20) << "connected to " << m_peer << dendl;
315
c07f9fc5
FG
316 m_instance_replayer.reset(InstanceReplayer<>::create(
317 m_threads, m_service_daemon, m_image_deleter, m_local_rados,
318 local_mirror_uuid, m_local_pool_id));
7c673cae
FG
319 m_instance_replayer->init();
320 m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx);
321
c07f9fc5
FG
322 m_instance_watcher.reset(InstanceWatcher<>::create(
323 m_local_io_ctx, m_threads->work_queue, m_instance_replayer.get()));
7c673cae
FG
324 r = m_instance_watcher->init();
325 if (r < 0) {
326 derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl;
c07f9fc5
FG
327 m_callout_id = m_service_daemon->add_or_update_callout(
328 m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
329 "unable to initialize instance messenger object");
330 return;
7c673cae
FG
331 }
332
333 m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
334 &m_leader_listener));
335 r = m_leader_watcher->init();
336 if (r < 0) {
337 derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
c07f9fc5
FG
338 m_callout_id = m_service_daemon->add_or_update_callout(
339 m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
340 "unable to initialize leader messenger object");
341 return;
342 }
343
344 if (m_callout_id != service_daemon::CALLOUT_ID_NONE) {
345 m_service_daemon->remove_callout(m_local_pool_id, m_callout_id);
346 m_callout_id = service_daemon::CALLOUT_ID_NONE;
7c673cae
FG
347 }
348
349 m_pool_replayer_thread.create("pool replayer");
c07f9fc5 350}
7c673cae 351
c07f9fc5
FG
352void PoolReplayer::shut_down() {
353 m_stopping = true;
354 {
355 Mutex::Locker l(m_lock);
356 m_cond.Signal();
357 }
358 if (m_pool_replayer_thread.is_started()) {
359 m_pool_replayer_thread.join();
360 }
361 if (m_leader_watcher) {
362 m_leader_watcher->shut_down();
363 }
364 if (m_instance_watcher) {
365 m_instance_watcher->shut_down();
366 }
367 if (m_instance_replayer) {
368 m_instance_replayer->shut_down();
369 }
370
28e407b8
AA
371 m_leader_watcher.reset();
372 m_instance_watcher.reset();
373 m_instance_replayer.reset();
374
c07f9fc5
FG
375 assert(!m_local_pool_watcher);
376 assert(!m_remote_pool_watcher);
377 m_local_rados.reset();
378 m_remote_rados.reset();
7c673cae
FG
379}
380
381int PoolReplayer::init_rados(const std::string &cluster_name,
382 const std::string &client_name,
383 const std::string &description,
3efd9988
FG
384 RadosRef *rados_ref,
385 bool strip_cluster_overrides) {
7c673cae
FG
386 rados_ref->reset(new librados::Rados());
387
388 // NOTE: manually bootstrap a CephContext here instead of via
389 // the librados API to avoid mixing global singletons between
390 // the librados shared library and the daemon
391 // TODO: eliminate intermingling of global singletons within Ceph APIs
392 CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT);
393 if (client_name.empty() || !iparams.name.from_str(client_name)) {
394 derr << "error initializing cluster handle for " << description << dendl;
395 return -EINVAL;
396 }
397
398 CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY,
399 CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
400 cct->_conf->cluster = cluster_name;
401
402 // librados::Rados::conf_read_file
403 int r = cct->_conf->parse_config_files(nullptr, nullptr, 0);
404 if (r < 0) {
405 derr << "could not read ceph conf for " << description << ": "
406 << cpp_strerror(r) << dendl;
407 cct->put();
408 return r;
409 }
3efd9988
FG
410
411 // preserve cluster-specific config settings before applying environment/cli
412 // overrides
413 std::map<std::string, std::string> config_values;
414 if (strip_cluster_overrides) {
415 // remote peer connections shouldn't apply cluster-specific
416 // configuration settings
417 for (auto& key : UNIQUE_PEER_CONFIG_KEYS) {
418 config_values[key] = cct->_conf->get_val<std::string>(key);
419 }
420 }
421
7c673cae
FG
422 cct->_conf->parse_env();
423
424 // librados::Rados::conf_parse_env
425 std::vector<const char*> args;
426 env_to_vec(args, nullptr);
427 r = cct->_conf->parse_argv(args);
428 if (r < 0) {
429 derr << "could not parse environment for " << description << ":"
430 << cpp_strerror(r) << dendl;
431 cct->put();
432 return r;
433 }
434
435 if (!m_args.empty()) {
436 // librados::Rados::conf_parse_argv
437 args = m_args;
438 r = cct->_conf->parse_argv(args);
439 if (r < 0) {
440 derr << "could not parse command line args for " << description << ": "
441 << cpp_strerror(r) << dendl;
442 cct->put();
443 return r;
444 }
445 }
446
3efd9988
FG
447 if (strip_cluster_overrides) {
448 // remote peer connections shouldn't apply cluster-specific
449 // configuration settings
450 for (auto& pair : config_values) {
451 auto value = cct->_conf->get_val<std::string>(pair.first);
452 if (pair.second != value) {
453 dout(0) << "reverting global config option override: "
454 << pair.first << ": " << value << " -> " << pair.second
455 << dendl;
456 cct->_conf->set_val_or_die(pair.first, pair.second);
457 }
458 }
459 }
460
7c673cae
FG
461 if (!g_ceph_context->_conf->admin_socket.empty()) {
462 cct->_conf->set_val_or_die("admin_socket",
463 "$run_dir/$name.$pid.$cluster.$cctid.asok");
464 }
465
466 // disable unnecessary librbd cache
467 cct->_conf->set_val_or_die("rbd_cache", "false");
468 cct->_conf->apply_changes(nullptr);
469 cct->_conf->complain_about_parse_errors(cct);
470
471 r = (*rados_ref)->init_with_context(cct);
472 assert(r == 0);
473 cct->put();
474
475 r = (*rados_ref)->connect();
476 if (r < 0) {
477 derr << "error connecting to " << description << ": "
478 << cpp_strerror(r) << dendl;
479 return r;
480 }
481
482 return 0;
483}
484
485void PoolReplayer::run()
486{
487 dout(20) << "enter" << dendl;
488
489 while (!m_stopping) {
490 std::string asok_hook_name = m_local_io_ctx.get_pool_name() + " " +
491 m_peer.cluster_name;
492 if (m_asok_hook_name != asok_hook_name || m_asok_hook == nullptr) {
493 m_asok_hook_name = asok_hook_name;
494 delete m_asok_hook;
495
496 m_asok_hook = new PoolReplayerAdminSocketHook(g_ceph_context,
497 m_asok_hook_name, this);
498 }
499
500 Mutex::Locker locker(m_lock);
501 if ((m_local_pool_watcher && m_local_pool_watcher->is_blacklisted()) ||
502 (m_remote_pool_watcher && m_remote_pool_watcher->is_blacklisted())) {
503 m_blacklisted = true;
504 m_stopping = true;
505 break;
506 }
507
d2e6a577
FG
508 if (!m_stopping) {
509 m_cond.WaitInterval(m_lock, utime_t(1, 0));
510 }
7c673cae 511 }
28e407b8
AA
512
513 m_instance_replayer->stop();
7c673cae
FG
514}
515
516void PoolReplayer::print_status(Formatter *f, stringstream *ss)
517{
518 dout(20) << "enter" << dendl;
519
520 if (!f) {
521 return;
522 }
523
524 Mutex::Locker l(m_lock);
525
526 f->open_object_section("pool_replayer_status");
527 f->dump_string("pool", m_local_io_ctx.get_pool_name());
528 f->dump_stream("peer") << m_peer;
529 f->dump_string("instance_id", m_instance_watcher->get_instance_id());
530
531 std::string leader_instance_id;
532 m_leader_watcher->get_leader_instance_id(&leader_instance_id);
533 f->dump_string("leader_instance_id", leader_instance_id);
534
535 bool leader = m_leader_watcher->is_leader();
536 f->dump_bool("leader", leader);
537 if (leader) {
538 std::vector<std::string> instance_ids;
539 m_leader_watcher->list_instances(&instance_ids);
540 f->open_array_section("instances");
541 for (auto instance_id : instance_ids) {
542 f->dump_string("instance_id", instance_id);
543 }
544 f->close_section();
545 }
546
547 f->dump_string("local_cluster_admin_socket",
548 reinterpret_cast<CephContext *>(m_local_io_ctx.cct())->_conf->
181888fb 549 get_val<std::string>("admin_socket"));
7c673cae
FG
550 f->dump_string("remote_cluster_admin_socket",
551 reinterpret_cast<CephContext *>(m_remote_io_ctx.cct())->_conf->
181888fb 552 get_val<std::string>("admin_socket"));
7c673cae 553
31f18b77
FG
554 f->open_object_section("sync_throttler");
555 m_instance_watcher->print_sync_status(f, ss);
556 f->close_section();
557
7c673cae
FG
558 m_instance_replayer->print_status(f, ss);
559
560 f->close_section();
561 f->flush(*ss);
562}
563
564void PoolReplayer::start()
565{
566 dout(20) << "enter" << dendl;
567
568 Mutex::Locker l(m_lock);
569
570 if (m_stopping) {
571 return;
572 }
573
574 m_instance_replayer->start();
575}
576
577void PoolReplayer::stop(bool manual)
578{
579 dout(20) << "enter: manual=" << manual << dendl;
580
581 Mutex::Locker l(m_lock);
582 if (!manual) {
583 m_stopping = true;
584 m_cond.Signal();
585 return;
586 } else if (m_stopping) {
587 return;
588 }
589
590 m_instance_replayer->stop();
591}
592
593void PoolReplayer::restart()
594{
595 dout(20) << "enter" << dendl;
596
597 Mutex::Locker l(m_lock);
598
599 if (m_stopping) {
600 return;
601 }
602
603 m_instance_replayer->restart();
604}
605
606void PoolReplayer::flush()
607{
608 dout(20) << "enter" << dendl;
609
610 Mutex::Locker l(m_lock);
611
612 if (m_stopping || m_manual_stop) {
613 return;
614 }
615
616 m_instance_replayer->flush();
617}
618
619void PoolReplayer::release_leader()
620{
621 dout(20) << "enter" << dendl;
622
623 Mutex::Locker l(m_lock);
624
625 if (m_stopping || !m_leader_watcher) {
626 return;
627 }
628
629 m_leader_watcher->release_leader();
630}
631
632void PoolReplayer::handle_update(const std::string &mirror_uuid,
633 ImageIds &&added_image_ids,
634 ImageIds &&removed_image_ids) {
635 if (m_stopping) {
636 return;
637 }
638
639 dout(10) << "mirror_uuid=" << mirror_uuid << ", "
640 << "added_count=" << added_image_ids.size() << ", "
641 << "removed_count=" << removed_image_ids.size() << dendl;
642 Mutex::Locker locker(m_lock);
643 if (!m_leader_watcher->is_leader()) {
644 return;
645 }
646
c07f9fc5
FG
647 m_service_daemon->add_or_update_attribute(
648 m_local_pool_id, SERVICE_DAEMON_LOCAL_COUNT_KEY,
649 m_local_pool_watcher->get_image_count());
650 if (m_remote_pool_watcher) {
651 m_service_daemon->add_or_update_attribute(
652 m_local_pool_id, SERVICE_DAEMON_REMOTE_COUNT_KEY,
653 m_remote_pool_watcher->get_image_count());
654 }
655
7c673cae
FG
656 m_update_op_tracker.start_op();
657 Context *ctx = new FunctionContext([this](int r) {
658 dout(20) << "complete handle_update: r=" << r << dendl;
659 m_update_op_tracker.finish_op();
660 });
661
662 C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
663
664 for (auto &image_id : added_image_ids) {
665 // for now always send to myself (the leader)
666 std::string &instance_id = m_instance_watcher->get_instance_id();
667 m_instance_watcher->notify_image_acquire(instance_id, image_id.global_id,
7c673cae
FG
668 gather_ctx->new_sub());
669 }
670
d2e6a577
FG
671 if (!mirror_uuid.empty()) {
672 for (auto &image_id : removed_image_ids) {
673 // for now always send to myself (the leader)
674 std::string &instance_id = m_instance_watcher->get_instance_id();
675 m_instance_watcher->notify_peer_image_removed(instance_id,
676 image_id.global_id,
677 mirror_uuid,
678 gather_ctx->new_sub());
679 }
c07f9fc5
FG
680 }
681
7c673cae
FG
682 gather_ctx->activate();
683}
684
685void PoolReplayer::handle_post_acquire_leader(Context *on_finish) {
686 dout(20) << dendl;
31f18b77 687
c07f9fc5
FG
688 m_service_daemon->add_or_update_attribute(m_local_pool_id,
689 SERVICE_DAEMON_LEADER_KEY, true);
31f18b77 690 m_instance_watcher->handle_acquire_leader();
7c673cae
FG
691 init_local_pool_watcher(on_finish);
692}
693
694void PoolReplayer::handle_pre_release_leader(Context *on_finish) {
695 dout(20) << dendl;
31f18b77 696
c07f9fc5 697 m_service_daemon->remove_attribute(m_local_pool_id, SERVICE_DAEMON_LEADER_KEY);
31f18b77 698 m_instance_watcher->handle_release_leader();
7c673cae
FG
699 shut_down_pool_watchers(on_finish);
700}
701
702void PoolReplayer::init_local_pool_watcher(Context *on_finish) {
703 dout(20) << dendl;
704
705 Mutex::Locker locker(m_lock);
706 assert(!m_local_pool_watcher);
707 m_local_pool_watcher.reset(new PoolWatcher<>(
708 m_threads, m_local_io_ctx, m_local_pool_watcher_listener));
7c673cae
FG
709
710 // ensure the initial set of local images is up-to-date
711 // after acquiring the leader role
712 auto ctx = new FunctionContext([this, on_finish](int r) {
713 handle_init_local_pool_watcher(r, on_finish);
714 });
715 m_local_pool_watcher->init(create_async_context_callback(
716 m_threads->work_queue, ctx));
717}
718
719void PoolReplayer::handle_init_local_pool_watcher(int r, Context *on_finish) {
720 dout(20) << "r=" << r << dendl;
721 if (r < 0) {
722 derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl;
723 on_finish->complete(r);
724 return;
725 }
726
727 init_remote_pool_watcher(on_finish);
728}
729
730void PoolReplayer::init_remote_pool_watcher(Context *on_finish) {
731 dout(20) << dendl;
732
733 Mutex::Locker locker(m_lock);
734 assert(!m_remote_pool_watcher);
735 m_remote_pool_watcher.reset(new PoolWatcher<>(
736 m_threads, m_remote_io_ctx, m_remote_pool_watcher_listener));
737 m_remote_pool_watcher->init(create_async_context_callback(
738 m_threads->work_queue, on_finish));
739
740 m_cond.Signal();
741}
742
743void PoolReplayer::shut_down_pool_watchers(Context *on_finish) {
744 dout(20) << dendl;
745
746 {
747 Mutex::Locker locker(m_lock);
748 if (m_local_pool_watcher) {
749 Context *ctx = new FunctionContext([this, on_finish](int r) {
750 handle_shut_down_pool_watchers(r, on_finish);
751 });
752 ctx = create_async_context_callback(m_threads->work_queue, ctx);
753
754 auto gather_ctx = new C_Gather(g_ceph_context, ctx);
755 m_local_pool_watcher->shut_down(gather_ctx->new_sub());
756 if (m_remote_pool_watcher) {
757 m_remote_pool_watcher->shut_down(gather_ctx->new_sub());
758 }
759 gather_ctx->activate();
760 return;
761 }
762 }
763
764 on_finish->complete(0);
765}
766
767void PoolReplayer::handle_shut_down_pool_watchers(int r, Context *on_finish) {
768 dout(20) << "r=" << r << dendl;
769
770 {
771 Mutex::Locker locker(m_lock);
772 assert(m_local_pool_watcher);
773 m_local_pool_watcher.reset();
774
775 if (m_remote_pool_watcher) {
776 m_remote_pool_watcher.reset();
777 }
778 }
779 wait_for_update_ops(on_finish);
780}
781
782void PoolReplayer::wait_for_update_ops(Context *on_finish) {
783 dout(20) << dendl;
784
785 Mutex::Locker locker(m_lock);
786
787 Context *ctx = new FunctionContext([this, on_finish](int r) {
788 handle_wait_for_update_ops(r, on_finish);
789 });
790 ctx = create_async_context_callback(m_threads->work_queue, ctx);
791
792 m_update_op_tracker.wait_for_ops(ctx);
793}
794
795void PoolReplayer::handle_wait_for_update_ops(int r, Context *on_finish) {
796 dout(20) << "r=" << r << dendl;
797
798 assert(r == 0);
799
800 Mutex::Locker locker(m_lock);
801 m_instance_replayer->release_all(on_finish);
802}
803
31f18b77
FG
804void PoolReplayer::handle_update_leader(const std::string &leader_instance_id) {
805 dout(20) << "leader_instance_id=" << leader_instance_id << dendl;
806
807 m_instance_watcher->handle_update_leader(leader_instance_id);
808}
809
7c673cae
FG
810} // namespace mirror
811} // namespace rbd