// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "PoolReplayer.h"
#include <boost/bind.hpp>
#include "common/Formatter.h"
#include "common/admin_socket.h"
#include "common/ceph_argparse.h"
#include "common/code_environment.h"
#include "common/common_init.h"
#include "common/debug.h"
#include "common/errno.h"
#include "include/stringify.h"
#include "cls/rbd/cls_rbd_client.h"
#include "global/global_context.h"
#include "librbd/internal.h"
#include "librbd/Utils.h"
#include "librbd/Watcher.h"
#include "librbd/api/Mirror.h"
#include "InstanceReplayer.h"
#include "InstanceWatcher.h"
#include "LeaderWatcher.h"
#include "Threads.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::PoolReplayer: " \
                           << this << " " << __func__ << ": "

using std::chrono::seconds;
using std::map;
using std::string;
using std::unique_ptr;
using std::vector;

using librbd::cls_client::dir_get_name;
using librbd::util::create_async_context_callback;

namespace rbd {
namespace mirror {

namespace {

class PoolReplayerAdminSocketCommand {
public:
  PoolReplayerAdminSocketCommand(PoolReplayer *pool_replayer)
    : pool_replayer(pool_replayer) {
  }
  virtual ~PoolReplayerAdminSocketCommand() {}
  virtual bool call(Formatter *f, stringstream *ss) = 0;
protected:
  PoolReplayer *pool_replayer;
};

class StatusCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit StatusCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->print_status(f, ss);
    return true;
  }
};

class StartCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit StartCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->start();
    return true;
  }
};

class StopCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit StopCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->stop(true);
    return true;
  }
};

class RestartCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit RestartCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->restart();
    return true;
  }
};

class FlushCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit FlushCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->flush();
    return true;
  }
};

class LeaderReleaseCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit LeaderReleaseCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->release_leader();
    return true;
  }
};

class PoolReplayerAdminSocketHook : public AdminSocketHook {
public:
  PoolReplayerAdminSocketHook(CephContext *cct, const std::string &name,
                              PoolReplayer *pool_replayer)
    : admin_socket(cct->get_admin_socket()) {
    std::string command;
    int r;

    command = "rbd mirror status " + name;
    r = admin_socket->register_command(command, command, this,
                                       "get status for rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StatusCommand(pool_replayer);
    }

    command = "rbd mirror start " + name;
    r = admin_socket->register_command(command, command, this,
                                       "start rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StartCommand(pool_replayer);
    }

    command = "rbd mirror stop " + name;
    r = admin_socket->register_command(command, command, this,
                                       "stop rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StopCommand(pool_replayer);
    }

    command = "rbd mirror restart " + name;
    r = admin_socket->register_command(command, command, this,
                                       "restart rbd mirror " + name);
    if (r == 0) {
      commands[command] = new RestartCommand(pool_replayer);
    }

    command = "rbd mirror flush " + name;
    r = admin_socket->register_command(command, command, this,
                                       "flush rbd mirror " + name);
    if (r == 0) {
      commands[command] = new FlushCommand(pool_replayer);
    }

    command = "rbd mirror leader release " + name;
    r = admin_socket->register_command(command, command, this,
                                       "release rbd mirror leader " + name);
    if (r == 0) {
      commands[command] = new LeaderReleaseCommand(pool_replayer);
    }
  }

  ~PoolReplayerAdminSocketHook() override {
    for (Commands::const_iterator i = commands.begin(); i != commands.end();
         ++i) {
      (void)admin_socket->unregister_command(i->first);
      delete i->second;
    }
  }

  bool call(std::string command, cmdmap_t& cmdmap, std::string format,
            bufferlist& out) override {
    Commands::const_iterator i = commands.find(command);
    assert(i != commands.end());
    Formatter *f = Formatter::create(format);
    stringstream ss;
    bool r = i->second->call(f, &ss);
    delete f;
    out.append(ss);
    return r;
  }

private:
  typedef std::map<std::string, PoolReplayerAdminSocketCommand*> Commands;

  AdminSocket *admin_socket;
  Commands commands;
};

} // anonymous namespace
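
// Note (illustrative, not part of the original source): the commands
// registered above are exposed through the rbd-mirror daemon's admin
// socket. Assuming a default asok path, an invocation would look roughly
// like:
//
//   ceph --admin-daemon /var/run/ceph/ceph-client.rbd-mirror.<id>.asok \
//       rbd mirror status <pool name> <peer cluster name>
//
// where "<pool name> <peer cluster name>" matches the hook name built in
// PoolReplayer::run() below; the actual socket path depends on the
// admin_socket configuration.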

PoolReplayer::PoolReplayer(Threads<librbd::ImageCtx> *threads,
                           std::shared_ptr<ImageDeleter> image_deleter,
                           int64_t local_pool_id, const peer_t &peer,
                           const std::vector<const char*> &args) :
  m_threads(threads),
  m_image_deleter(image_deleter),
  m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)),
  m_peer(peer),
  m_args(args),
  m_local_pool_id(local_pool_id),
  m_local_pool_watcher_listener(this, true),
  m_remote_pool_watcher_listener(this, false),
  m_asok_hook(nullptr),
  m_pool_replayer_thread(this),
  m_leader_listener(this)
{
}

PoolReplayer::~PoolReplayer()
{
  delete m_asok_hook;

  m_stopping = true;
  {
    Mutex::Locker l(m_lock);
    m_cond.Signal();
  }
  if (m_pool_replayer_thread.is_started()) {
    m_pool_replayer_thread.join();
  }
  if (m_leader_watcher) {
    m_leader_watcher->shut_down();
  }
  if (m_instance_watcher) {
    m_instance_watcher->shut_down();
  }
  if (m_instance_replayer) {
    m_instance_replayer->shut_down();
  }

  assert(!m_local_pool_watcher);
  assert(!m_remote_pool_watcher);
}
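
// Note (summary, not in the original source): teardown runs in roughly the
// reverse order of init(): stop the run() thread first, then shut down the
// leader watcher, instance watcher and instance replayer. The pool watchers
// are expected to have been released already through the leader pre-release
// path, hence the asserts above.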

bool PoolReplayer::is_blacklisted() const {
  Mutex::Locker locker(m_lock);
  return m_blacklisted;
}

bool PoolReplayer::is_leader() const {
  Mutex::Locker locker(m_lock);
  return m_leader_watcher && m_leader_watcher->is_leader();
}

int PoolReplayer::init()
{
  dout(20) << "replaying for " << m_peer << dendl;

  int r = init_rados(g_ceph_context->_conf->cluster,
                     g_ceph_context->_conf->name.to_str(),
                     "local cluster", &m_local_rados);
  if (r < 0) {
    return r;
  }

  r = init_rados(m_peer.cluster_name, m_peer.client_name,
                 std::string("remote peer ") + stringify(m_peer),
                 &m_remote_rados);
  if (r < 0) {
    return r;
  }

  r = m_local_rados->ioctx_create2(m_local_pool_id, m_local_io_ctx);
  if (r < 0) {
    derr << "error accessing local pool " << m_local_pool_id << ": "
         << cpp_strerror(r) << dendl;
    return r;
  }

  std::string local_mirror_uuid;
  r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx,
                                          &local_mirror_uuid);
  if (r < 0) {
    derr << "failed to retrieve local mirror uuid from pool "
         << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
    return r;
  }

  r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
                                   m_remote_io_ctx);
  if (r < 0) {
    derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name()
         << ": " << cpp_strerror(r) << dendl;
    return r;
  }

  dout(20) << "connected to " << m_peer << dendl;

  m_instance_replayer.reset(
    InstanceReplayer<>::create(m_threads, m_image_deleter, m_local_rados,
                               local_mirror_uuid, m_local_pool_id));
  m_instance_replayer->init();
  m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx);

  m_instance_watcher.reset(InstanceWatcher<>::create(m_local_io_ctx,
                                                     m_threads->work_queue,
                                                     m_instance_replayer.get()));
  r = m_instance_watcher->init();
  if (r < 0) {
    derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl;
    return r;
  }

  m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
                                             &m_leader_listener));

  r = m_leader_watcher->init();
  if (r < 0) {
    derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
    return r;
  }

  m_pool_replayer_thread.create("pool replayer");

  return 0;
}
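
// Note (summary, not in the original source): init() connects one librados
// client to the local cluster and one to the remote peer, opens the same
// named pool on both sides, reads the local pool's mirror uuid, and then
// brings up the InstanceReplayer, InstanceWatcher and LeaderWatcher before
// starting the run() thread.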

int PoolReplayer::init_rados(const std::string &cluster_name,
                             const std::string &client_name,
                             const std::string &description,
                             RadosRef *rados_ref) {
  rados_ref->reset(new librados::Rados());

  // NOTE: manually bootstrap a CephContext here instead of via
  // the librados API to avoid mixing global singletons between
  // the librados shared library and the daemon
  // TODO: eliminate intermingling of global singletons within Ceph APIs
  CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT);
  if (client_name.empty() || !iparams.name.from_str(client_name)) {
    derr << "error initializing cluster handle for " << description << dendl;
    return -EINVAL;
  }

  CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY,
                                    CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
  cct->_conf->cluster = cluster_name;

  // librados::Rados::conf_read_file
  int r = cct->_conf->parse_config_files(nullptr, nullptr, 0);
  if (r < 0) {
    derr << "could not read ceph conf for " << description << ": "
         << cpp_strerror(r) << dendl;
    cct->put();
    return r;
  }
  cct->_conf->parse_env();

  // librados::Rados::conf_parse_env
  std::vector<const char*> args;
  env_to_vec(args, nullptr);
  r = cct->_conf->parse_argv(args);
  if (r < 0) {
    derr << "could not parse environment for " << description << ": "
         << cpp_strerror(r) << dendl;
    cct->put();
    return r;
  }

  if (!m_args.empty()) {
    // librados::Rados::conf_parse_argv
    args = m_args;
    r = cct->_conf->parse_argv(args);
    if (r < 0) {
      derr << "could not parse command line args for " << description << ": "
           << cpp_strerror(r) << dendl;
      cct->put();
      return r;
    }
  }

  if (!g_ceph_context->_conf->admin_socket.empty()) {
    cct->_conf->set_val_or_die("admin_socket",
                               "$run_dir/$name.$pid.$cluster.$cctid.asok");
  }

  // disable unnecessary librbd cache
  cct->_conf->set_val_or_die("rbd_cache", "false");
  cct->_conf->apply_changes(nullptr);
  cct->_conf->complain_about_parse_errors(cct);

  r = (*rados_ref)->init_with_context(cct);
  assert(r == 0);
  cct->put();

  r = (*rados_ref)->connect();
  if (r < 0) {
    derr << "error connecting to " << description << ": "
         << cpp_strerror(r) << dendl;
    return r;
  }

  return 0;
}
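
// Note (illustrative sketch, not part of the original source): the manual
// CephContext bootstrap above approximates what a plain librados client
// would do through the public API, roughly:
//
//   librados::Rados cluster;
//   cluster.init2(client_name.c_str(), cluster_name.c_str(), 0);
//   cluster.conf_read_file(nullptr);   // parse_config_files() above
//   cluster.conf_parse_env(nullptr);   // env_to_vec()/parse_argv() above
//   cluster.connect();
//
// It is done by hand here so that the daemon's and the shared library's
// global singletons stay separate, as the NOTE in the function explains.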

void PoolReplayer::run()
{
  dout(20) << "enter" << dendl;

  while (!m_stopping) {
    std::string asok_hook_name = m_local_io_ctx.get_pool_name() + " " +
                                 m_peer.cluster_name;
    if (m_asok_hook_name != asok_hook_name || m_asok_hook == nullptr) {
      m_asok_hook_name = asok_hook_name;
      delete m_asok_hook;

      m_asok_hook = new PoolReplayerAdminSocketHook(g_ceph_context,
                                                    m_asok_hook_name, this);
    }

    Mutex::Locker locker(m_lock);
    if ((m_local_pool_watcher && m_local_pool_watcher->is_blacklisted()) ||
        (m_remote_pool_watcher && m_remote_pool_watcher->is_blacklisted())) {
      m_blacklisted = true;
      m_stopping = true;
      break;
    }

    m_cond.WaitInterval(m_lock, utime_t(1, 0));
  }
}
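
// Note (summary, not in the original source): the run() loop mostly idles,
// waking at least once per second to (re)register the "<pool> <peer cluster>"
// admin socket hook when its name changes and to check whether either pool
// watcher has been blacklisted, in which case the replayer marks itself
// blacklisted and shuts down.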

void PoolReplayer::print_status(Formatter *f, stringstream *ss)
{
  dout(20) << "enter" << dendl;

  if (!f) {
    return;
  }

  Mutex::Locker l(m_lock);

  f->open_object_section("pool_replayer_status");
  f->dump_string("pool", m_local_io_ctx.get_pool_name());
  f->dump_stream("peer") << m_peer;
  f->dump_string("instance_id", m_instance_watcher->get_instance_id());

  std::string leader_instance_id;
  m_leader_watcher->get_leader_instance_id(&leader_instance_id);
  f->dump_string("leader_instance_id", leader_instance_id);

  bool leader = m_leader_watcher->is_leader();
  f->dump_bool("leader", leader);
  if (leader) {
    std::vector<std::string> instance_ids;
    m_leader_watcher->list_instances(&instance_ids);
    f->open_array_section("instances");
    for (auto instance_id : instance_ids) {
      f->dump_string("instance_id", instance_id);
    }
    f->close_section();
  }

  f->dump_string("local_cluster_admin_socket",
                 reinterpret_cast<CephContext *>(m_local_io_ctx.cct())->_conf->
                   admin_socket);
  f->dump_string("remote_cluster_admin_socket",
                 reinterpret_cast<CephContext *>(m_remote_io_ctx.cct())->_conf->
                   admin_socket);

  f->open_object_section("sync_throttler");
  m_instance_watcher->print_sync_status(f, ss);
  f->close_section();

  m_instance_replayer->print_status(f, ss);

  f->close_section();
  f->flush(*ss);
}
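
// Note (summary, not in the original source): this is what the admin socket
// "rbd mirror status" command returns. The dumped fields are pool, peer,
// instance_id, leader_instance_id, leader, instances (leaders only), the
// local/remote cluster admin socket paths, a sync_throttler section from the
// InstanceWatcher, and the per-image status appended by
// InstanceReplayer::print_status().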

void PoolReplayer::start()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping) {
    return;
  }

  m_instance_replayer->start();
}

void PoolReplayer::stop(bool manual)
{
  dout(20) << "enter: manual=" << manual << dendl;

  Mutex::Locker l(m_lock);
  if (!manual) {
    m_stopping = true;
    m_cond.Signal();
    return;
  } else if (m_stopping) {
    return;
  }

  m_instance_replayer->stop();
}
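
// Note (summary, not in the original source): stop(false) is the
// daemon-driven shutdown path: it only flags m_stopping and wakes run().
// stop(true), used by the admin socket StopCommand above, pauses image
// replay via the InstanceReplayer without tearing the pool replayer down.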

void PoolReplayer::restart()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping) {
    return;
  }

  m_instance_replayer->restart();
}

void PoolReplayer::flush()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping || m_manual_stop) {
    return;
  }

  m_instance_replayer->flush();
}

void PoolReplayer::release_leader()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping || !m_leader_watcher) {
    return;
  }

  m_leader_watcher->release_leader();
}

void PoolReplayer::handle_update(const std::string &mirror_uuid,
                                 ImageIds &&added_image_ids,
                                 ImageIds &&removed_image_ids) {
  if (m_stopping) {
    return;
  }

  dout(10) << "mirror_uuid=" << mirror_uuid << ", "
           << "added_count=" << added_image_ids.size() << ", "
           << "removed_count=" << removed_image_ids.size() << dendl;
  Mutex::Locker locker(m_lock);
  if (!m_leader_watcher->is_leader()) {
    return;
  }

  if (m_initial_mirror_image_ids.find(mirror_uuid) ==
        m_initial_mirror_image_ids.end() &&
      m_initial_mirror_image_ids.size() < 2) {
    m_initial_mirror_image_ids[mirror_uuid] = added_image_ids;

    if (m_initial_mirror_image_ids.size() == 2) {
      dout(10) << "local and remote pools refreshed" << dendl;

      // both local and remote initial pool listing received. derive
      // removal notifications for the remote pool
      auto &local_image_ids = m_initial_mirror_image_ids.begin()->second;
      auto &remote_image_ids = m_initial_mirror_image_ids.rbegin()->second;
      for (auto &local_image_id : local_image_ids) {
        if (remote_image_ids.find(local_image_id) == remote_image_ids.end()) {
          removed_image_ids.emplace(local_image_id.global_id, "");
        }
      }
      local_image_ids.clear();
      remote_image_ids.clear();
    }
  }

  if (!mirror_uuid.empty() && m_peer.uuid != mirror_uuid) {
    m_instance_replayer->remove_peer(m_peer.uuid);
    m_instance_replayer->add_peer(mirror_uuid, m_remote_io_ctx);
    m_peer.uuid = mirror_uuid;
  }

  m_update_op_tracker.start_op();
  Context *ctx = new FunctionContext([this](int r) {
      dout(20) << "complete handle_update: r=" << r << dendl;
      m_update_op_tracker.finish_op();
    });

  C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);

  for (auto &image_id : added_image_ids) {
    // for now always send to myself (the leader)
    std::string &instance_id = m_instance_watcher->get_instance_id();
    m_instance_watcher->notify_image_acquire(instance_id, image_id.global_id,
                                             mirror_uuid, image_id.id,
                                             gather_ctx->new_sub());
  }

  for (auto &image_id : removed_image_ids) {
    // for now always send to myself (the leader)
    std::string &instance_id = m_instance_watcher->get_instance_id();
    m_instance_watcher->notify_image_release(instance_id, image_id.global_id,
                                             mirror_uuid, image_id.id, true,
                                             gather_ctx->new_sub());
  }

  gather_ctx->activate();
}
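
// Note (summary, not in the original source): handle_update() is the
// PoolWatcher listener callback. While this instance is leader, the first
// notifications from the local and remote pools are cached in
// m_initial_mirror_image_ids; once both have arrived, any image present
// locally but missing remotely is turned into a synthetic removal. Adds and
// removals are then fanned out as image acquire/release notifications
// through the InstanceWatcher (currently always to this instance itself),
// tracked by m_update_op_tracker so a leader release can wait for them to
// drain.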

void PoolReplayer::handle_post_acquire_leader(Context *on_finish) {
  dout(20) << dendl;

  m_instance_watcher->handle_acquire_leader();
  init_local_pool_watcher(on_finish);
}

void PoolReplayer::handle_pre_release_leader(Context *on_finish) {
  dout(20) << dendl;

  m_instance_watcher->handle_release_leader();
  shut_down_pool_watchers(on_finish);
}
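
// Note (summary, not in the original source): leadership transitions drive
// the pool watchers. On acquiring the leader lock the local and then the
// remote PoolWatcher are initialized (init_local_pool_watcher /
// init_remote_pool_watcher below); on releasing it both watchers are shut
// down, outstanding handle_update() operations are drained, and all images
// are released back via the InstanceReplayer.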

void PoolReplayer::init_local_pool_watcher(Context *on_finish) {
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);
  assert(!m_local_pool_watcher);
  m_local_pool_watcher.reset(new PoolWatcher<>(
    m_threads, m_local_io_ctx, m_local_pool_watcher_listener));
  m_initial_mirror_image_ids.clear();

  // ensure the initial set of local images is up-to-date
  // after acquiring the leader role
  auto ctx = new FunctionContext([this, on_finish](int r) {
      handle_init_local_pool_watcher(r, on_finish);
    });
  m_local_pool_watcher->init(create_async_context_callback(
    m_threads->work_queue, ctx));
}

void PoolReplayer::handle_init_local_pool_watcher(int r, Context *on_finish) {
  dout(20) << "r=" << r << dendl;
  if (r < 0) {
    derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl;
    on_finish->complete(r);
    return;
  }

  init_remote_pool_watcher(on_finish);
}

void PoolReplayer::init_remote_pool_watcher(Context *on_finish) {
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);
  assert(!m_remote_pool_watcher);
  m_remote_pool_watcher.reset(new PoolWatcher<>(
    m_threads, m_remote_io_ctx, m_remote_pool_watcher_listener));
  m_remote_pool_watcher->init(create_async_context_callback(
    m_threads->work_queue, on_finish));

  m_cond.Signal();
}

void PoolReplayer::shut_down_pool_watchers(Context *on_finish) {
  dout(20) << dendl;

  {
    Mutex::Locker locker(m_lock);
    if (m_local_pool_watcher) {
      Context *ctx = new FunctionContext([this, on_finish](int r) {
          handle_shut_down_pool_watchers(r, on_finish);
        });
      ctx = create_async_context_callback(m_threads->work_queue, ctx);

      auto gather_ctx = new C_Gather(g_ceph_context, ctx);
      m_local_pool_watcher->shut_down(gather_ctx->new_sub());
      if (m_remote_pool_watcher) {
        m_remote_pool_watcher->shut_down(gather_ctx->new_sub());
      }
      gather_ctx->activate();
      return;
    }
  }

  on_finish->complete(0);
}

void PoolReplayer::handle_shut_down_pool_watchers(int r, Context *on_finish) {
  dout(20) << "r=" << r << dendl;

  {
    Mutex::Locker locker(m_lock);
    assert(m_local_pool_watcher);
    m_local_pool_watcher.reset();

    if (m_remote_pool_watcher) {
      m_remote_pool_watcher.reset();
    }
  }
  wait_for_update_ops(on_finish);
}

void PoolReplayer::wait_for_update_ops(Context *on_finish) {
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);

  Context *ctx = new FunctionContext([this, on_finish](int r) {
      handle_wait_for_update_ops(r, on_finish);
    });
  ctx = create_async_context_callback(m_threads->work_queue, ctx);

  m_update_op_tracker.wait_for_ops(ctx);
}

void PoolReplayer::handle_wait_for_update_ops(int r, Context *on_finish) {
  dout(20) << "r=" << r << dendl;

  assert(r == 0);

  Mutex::Locker locker(m_lock);
  m_instance_replayer->release_all(on_finish);
}

void PoolReplayer::handle_update_leader(const std::string &leader_instance_id) {
  dout(20) << "leader_instance_id=" << leader_instance_id << dendl;

  m_instance_watcher->handle_update_leader(leader_instance_id);
}

} // namespace mirror
} // namespace rbd