// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "PoolReplayer.h"
#include <boost/bind.hpp>
#include "common/Formatter.h"
#include "common/admin_socket.h"
#include "common/ceph_argparse.h"
#include "common/code_environment.h"
#include "common/common_init.h"
#include "common/debug.h"
#include "common/errno.h"
#include "include/stringify.h"
#include "cls/rbd/cls_rbd_client.h"
#include "global/global_context.h"
#include "librbd/internal.h"
#include "librbd/Utils.h"
#include "librbd/Watcher.h"
#include "librbd/api/Mirror.h"
#include "InstanceReplayer.h"
#include "InstanceWatcher.h"
#include "LeaderWatcher.h"
#include "ServiceDaemon.h"
#include "Threads.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::PoolReplayer: " \
                           << this << " " << __func__ << ": "

using std::chrono::seconds;
using std::unique_ptr;

using librbd::cls_client::dir_get_name;
using librbd::util::create_async_context_callback;

namespace rbd {
namespace mirror {

namespace {

const std::string SERVICE_DAEMON_LEADER_KEY("leader");
const std::string SERVICE_DAEMON_LOCAL_COUNT_KEY("image_local_count");
const std::string SERVICE_DAEMON_REMOTE_COUNT_KEY("image_remote_count");

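// Cluster-connection settings that are inherently unique per cluster.
// init_rados() snapshots these before applying environment/CLI
// overrides to a remote peer connection and reverts any changes, so
// local-cluster overrides cannot leak into the peer connection.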
const std::vector<std::string> UNIQUE_PEER_CONFIG_KEYS {
  {"monmap", "mon_host", "mon_dns_srv_name", "key", "keyfile", "keyring"}};

class PoolReplayerAdminSocketCommand {
public:
  PoolReplayerAdminSocketCommand(PoolReplayer *pool_replayer)
    : pool_replayer(pool_replayer) {
  }
  virtual ~PoolReplayerAdminSocketCommand() {}
  virtual bool call(Formatter *f, stringstream *ss) = 0;

protected:
  PoolReplayer *pool_replayer;
};

class StatusCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit StatusCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->print_status(f, ss);
    return true;
  }
};

class StartCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit StartCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->start();
    return true;
  }
};

class StopCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit StopCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->stop(true);
    return true;
  }
};

class RestartCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit RestartCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->restart();
    return true;
  }
};

class FlushCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit FlushCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->flush();
    return true;
  }
};

class LeaderReleaseCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit LeaderReleaseCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->release_leader();
    return true;
  }
};

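// The hook owns one command object per registered asok command and
// dispatches on the exact command string. The "name" suffix is built
// in PoolReplayer::run() as "<pool name> <peer cluster name>", so a
// pool "rbd" mirrored from cluster "remote" could be queried roughly
// like (socket path illustrative):
//
//   ceph --admin-daemon /var/run/ceph/ceph-client.rbd-mirror.asok \
//       rbd mirror status rbd remote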
class PoolReplayerAdminSocketHook : public AdminSocketHook {
public:
  PoolReplayerAdminSocketHook(CephContext *cct, const std::string &name,
                              PoolReplayer *pool_replayer)
    : admin_socket(cct->get_admin_socket()) {
    std::string command;
    int r;

    command = "rbd mirror status " + name;
    r = admin_socket->register_command(command, command, this,
                                       "get status for rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StatusCommand(pool_replayer);
    }

    command = "rbd mirror start " + name;
    r = admin_socket->register_command(command, command, this,
                                       "start rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StartCommand(pool_replayer);
    }

    command = "rbd mirror stop " + name;
    r = admin_socket->register_command(command, command, this,
                                       "stop rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StopCommand(pool_replayer);
    }

    command = "rbd mirror restart " + name;
    r = admin_socket->register_command(command, command, this,
                                       "restart rbd mirror " + name);
    if (r == 0) {
      commands[command] = new RestartCommand(pool_replayer);
    }

    command = "rbd mirror flush " + name;
    r = admin_socket->register_command(command, command, this,
                                       "flush rbd mirror " + name);
    if (r == 0) {
      commands[command] = new FlushCommand(pool_replayer);
    }

    command = "rbd mirror leader release " + name;
    r = admin_socket->register_command(command, command, this,
                                       "release rbd mirror leader " + name);
    if (r == 0) {
      commands[command] = new LeaderReleaseCommand(pool_replayer);
    }
  }

  ~PoolReplayerAdminSocketHook() override {
    for (Commands::const_iterator i = commands.begin(); i != commands.end();
         ++i) {
      (void)admin_socket->unregister_command(i->first);
      delete i->second;
    }
  }

  bool call(std::string command, cmdmap_t& cmdmap, std::string format,
            bufferlist& out) override {
    Commands::const_iterator i = commands.find(command);
    assert(i != commands.end());
    Formatter *f = Formatter::create(format);
    stringstream ss;
    bool r = i->second->call(f, &ss);
    delete f;
    out.append(ss);
    return r;
  }

private:
  typedef std::map<std::string, PoolReplayerAdminSocketCommand*> Commands;

  AdminSocket *admin_socket;
  Commands commands;
};

} // anonymous namespace

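// The constructor only wires up members; cluster connections and the
// replayer thread are established later in init().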
PoolReplayer::PoolReplayer(Threads<librbd::ImageCtx> *threads,
                           ServiceDaemon<librbd::ImageCtx>* service_daemon,
                           ImageDeleter<>* image_deleter,
                           int64_t local_pool_id, const peer_t &peer,
                           const std::vector<const char*> &args) :
  m_threads(threads),
  m_service_daemon(service_daemon),
  m_image_deleter(image_deleter),
  m_local_pool_id(local_pool_id),
  m_peer(peer),
  m_args(args),
  m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)),
  m_local_pool_watcher_listener(this, true),
  m_remote_pool_watcher_listener(this, false),
  m_pool_replayer_thread(this),
  m_leader_listener(this)
{
}

PoolReplayer::~PoolReplayer()
{
  delete m_asok_hook;
  shut_down();
}

bool PoolReplayer::is_blacklisted() const {
  Mutex::Locker locker(m_lock);
  return m_blacklisted;
}

bool PoolReplayer::is_leader() const {
  Mutex::Locker locker(m_lock);
  return m_leader_watcher && m_leader_watcher->is_leader();
}

bool PoolReplayer::is_running() const {
  return m_pool_replayer_thread.is_started();
}

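// Bootstrap sequence: connect to the local and remote clusters, open
// the pool I/O contexts, then bring up the InstanceReplayer (image
// replay), the InstanceWatcher (inter-instance messaging) and the
// LeaderWatcher (leader election). Any failure is reported as a
// service-daemon callout and init() returns without starting the
// pool replayer thread.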
void PoolReplayer::init()
{
  assert(!m_pool_replayer_thread.is_started());

  // reset state
  m_stopping = false;
  m_blacklisted = false;

  dout(20) << "replaying for " << m_peer << dendl;
  int r = init_rados(g_ceph_context->_conf->cluster,
                     g_ceph_context->_conf->name.to_str(),
                     "local cluster", &m_local_rados, false);
  if (r < 0) {
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to connect to local cluster");
    return;
  }

  r = init_rados(m_peer.cluster_name, m_peer.client_name,
                 std::string("remote peer ") + stringify(m_peer),
                 &m_remote_rados, true);
  if (r < 0) {
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to connect to remote cluster");
    return;
  }

  r = m_local_rados->ioctx_create2(m_local_pool_id, m_local_io_ctx);
  if (r < 0) {
    derr << "error accessing local pool " << m_local_pool_id << ": "
         << cpp_strerror(r) << dendl;
    return;
  }

  std::string local_mirror_uuid;
  r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx,
                                          &local_mirror_uuid);
  if (r < 0) {
    derr << "failed to retrieve local mirror uuid from pool "
         << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to query local mirror uuid");
    return;
  }

  r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
                                   m_remote_io_ctx);
  if (r < 0) {
    derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name()
         << ": " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_WARNING,
      "unable to access remote pool");
    return;
  }

  dout(20) << "connected to " << m_peer << dendl;

  m_instance_replayer.reset(InstanceReplayer<>::create(
    m_threads, m_service_daemon, m_image_deleter, m_local_rados,
    local_mirror_uuid, m_local_pool_id));
  m_instance_replayer->init();
  m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx);

  m_instance_watcher.reset(InstanceWatcher<>::create(
    m_local_io_ctx, m_threads->work_queue, m_instance_replayer.get()));
  r = m_instance_watcher->init();
  if (r < 0) {
    derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to initialize instance messenger object");
    return;
  }

  m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
                                             &m_leader_listener));
  r = m_leader_watcher->init();
  if (r < 0) {
    derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to initialize leader messenger object");
    return;
  }

  if (m_callout_id != service_daemon::CALLOUT_ID_NONE) {
    m_service_daemon->remove_callout(m_local_pool_id, m_callout_id);
    m_callout_id = service_daemon::CALLOUT_ID_NONE;
  }

  m_pool_replayer_thread.create("pool replayer");
}

void PoolReplayer::shut_down() {
  m_stopping = true;
  {
    Mutex::Locker l(m_lock);
    m_cond.Signal();
  }
  if (m_pool_replayer_thread.is_started()) {
    m_pool_replayer_thread.join();
  }
  if (m_leader_watcher) {
    m_leader_watcher->shut_down();
  }
  if (m_instance_watcher) {
    m_instance_watcher->shut_down();
  }
  if (m_instance_replayer) {
    m_instance_replayer->shut_down();
  }

  m_leader_watcher.reset();
  m_instance_watcher.reset();
  m_instance_replayer.reset();

  assert(!m_local_pool_watcher);
  assert(!m_remote_pool_watcher);
  m_local_rados.reset();
  m_remote_rados.reset();
}

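// Build a dedicated cluster connection. The CephContext is
// bootstrapped by hand instead of via librados so the daemon and the
// librados shared library do not mix global singletons. Config
// precedence mirrors librados: config file, then environment, then
// command-line arguments; for remote peers the keys listed in
// UNIQUE_PEER_CONFIG_KEYS are restored afterwards.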
int PoolReplayer::init_rados(const std::string &cluster_name,
                             const std::string &client_name,
                             const std::string &description,
                             RadosRef *rados_ref,
                             bool strip_cluster_overrides) {
  rados_ref->reset(new librados::Rados());

  // NOTE: manually bootstrap a CephContext here instead of via
  // the librados API to avoid mixing global singletons between
  // the librados shared library and the daemon
  // TODO: eliminate intermingling of global singletons within Ceph APIs
  CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT);
  if (client_name.empty() || !iparams.name.from_str(client_name)) {
    derr << "error initializing cluster handle for " << description << dendl;
    return -EINVAL;
  }

  CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY,
                                    CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
  cct->_conf->cluster = cluster_name;

  // librados::Rados::conf_read_file
  int r = cct->_conf->parse_config_files(nullptr, nullptr, 0);
  if (r < 0) {
    derr << "could not read ceph conf for " << description << ": "
         << cpp_strerror(r) << dendl;
    cct->put();
    return r;
  }

  // preserve cluster-specific config settings before applying environment/cli
  // overrides
  std::map<std::string, std::string> config_values;
  if (strip_cluster_overrides) {
    // remote peer connections shouldn't apply cluster-specific
    // configuration settings
    for (auto& key : UNIQUE_PEER_CONFIG_KEYS) {
      config_values[key] = cct->_conf->get_val<std::string>(key);
    }
  }

  cct->_conf->parse_env();

  // librados::Rados::conf_parse_env
  std::vector<const char*> args;
  env_to_vec(args, nullptr);
  r = cct->_conf->parse_argv(args);
  if (r < 0) {
    derr << "could not parse environment for " << description << ": "
         << cpp_strerror(r) << dendl;
    cct->put();
    return r;
  }

  if (!m_args.empty()) {
    // librados::Rados::conf_parse_argv
    args = m_args;
    r = cct->_conf->parse_argv(args);
    if (r < 0) {
      derr << "could not parse command line args for " << description << ": "
           << cpp_strerror(r) << dendl;
      cct->put();
      return r;
    }
  }

  if (strip_cluster_overrides) {
    // remote peer connections shouldn't apply cluster-specific
    // configuration settings
    for (auto& pair : config_values) {
      auto value = cct->_conf->get_val<std::string>(pair.first);
      if (pair.second != value) {
        dout(0) << "reverting global config option override: "
                << pair.first << ": " << value << " -> " << pair.second
                << dendl;
        cct->_conf->set_val_or_die(pair.first, pair.second);
      }
    }
  }

  if (!g_ceph_context->_conf->admin_socket.empty()) {
    cct->_conf->set_val_or_die("admin_socket",
                               "$run_dir/$name.$pid.$cluster.$cctid.asok");
  }

  // disable unnecessary librbd cache
  cct->_conf->set_val_or_die("rbd_cache", "false");
  cct->_conf->apply_changes(nullptr);
  cct->_conf->complain_about_parse_errors(cct);

  r = (*rados_ref)->init_with_context(cct);
  assert(r == 0);
  cct->put();

  r = (*rados_ref)->connect();
  if (r < 0) {
    derr << "error connecting to " << description << ": "
         << cpp_strerror(r) << dendl;
    return r;
  }

  return 0;
}

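// Pool replayer thread: (re)creates the admin socket hook whenever the
// pool/peer name changes and polls roughly once per second for
// blacklisted pool watchers, stopping the replayer if either cluster
// connection has been blacklisted.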
void PoolReplayer::run()
{
  dout(20) << "enter" << dendl;

  while (!m_stopping) {
    std::string asok_hook_name = m_local_io_ctx.get_pool_name() + " " +
                                 m_peer.cluster_name;
    if (m_asok_hook_name != asok_hook_name || m_asok_hook == nullptr) {
      m_asok_hook_name = asok_hook_name;
      delete m_asok_hook;

      m_asok_hook = new PoolReplayerAdminSocketHook(g_ceph_context,
                                                    m_asok_hook_name, this);
    }

    Mutex::Locker locker(m_lock);
    if ((m_local_pool_watcher && m_local_pool_watcher->is_blacklisted()) ||
        (m_remote_pool_watcher && m_remote_pool_watcher->is_blacklisted())) {
      m_blacklisted = true;
      m_stopping = true;
      break;
    }

    m_cond.WaitInterval(m_lock, utime_t(1, 0));
  }

  m_instance_replayer->stop();
}

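// Handler for the "rbd mirror status" admin socket command: dumps
// pool, peer, leader state, known instances, per-cluster admin socket
// paths, the sync throttler and per-image replayer status.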
void PoolReplayer::print_status(Formatter *f, stringstream *ss)
{
  dout(20) << "enter" << dendl;

  if (!f) {
    return;
  }

  Mutex::Locker l(m_lock);

  f->open_object_section("pool_replayer_status");
  f->dump_string("pool", m_local_io_ctx.get_pool_name());
  f->dump_stream("peer") << m_peer;
  f->dump_string("instance_id", m_instance_watcher->get_instance_id());

  std::string leader_instance_id;
  m_leader_watcher->get_leader_instance_id(&leader_instance_id);
  f->dump_string("leader_instance_id", leader_instance_id);

  bool leader = m_leader_watcher->is_leader();
  f->dump_bool("leader", leader);
  if (leader) {
    std::vector<std::string> instance_ids;
    m_leader_watcher->list_instances(&instance_ids);
    f->open_array_section("instances");
    for (auto instance_id : instance_ids) {
      f->dump_string("instance_id", instance_id);
    }
    f->close_section();
  }

  f->dump_string("local_cluster_admin_socket",
                 reinterpret_cast<CephContext *>(m_local_io_ctx.cct())->_conf->
                   get_val<std::string>("admin_socket"));
  f->dump_string("remote_cluster_admin_socket",
                 reinterpret_cast<CephContext *>(m_remote_io_ctx.cct())->_conf->
                   get_val<std::string>("admin_socket"));

  f->open_object_section("sync_throttler");
  m_instance_watcher->print_sync_status(f, ss);
  f->close_section();

  m_instance_replayer->print_status(f, ss);

  f->close_section();
  f->flush(*ss);
}

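// The admin socket entry points below delegate to the InstanceReplayer
// (or LeaderWatcher) under m_lock and become no-ops once the replayer
// is stopping.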
void PoolReplayer::start()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping) {
    return;
  }

  m_instance_replayer->start();
}

void PoolReplayer::stop(bool manual)
{
  dout(20) << "enter: manual=" << manual << dendl;

  Mutex::Locker l(m_lock);
  if (!manual) {
    m_stopping = true;
    m_cond.Signal();
    return;
  } else if (m_stopping) {
    return;
  }

  m_instance_replayer->stop();
}

void PoolReplayer::restart()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping) {
    return;
  }

  m_instance_replayer->restart();
}

void PoolReplayer::flush()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping || m_manual_stop) {
    return;
  }

  m_instance_replayer->flush();
}

void PoolReplayer::release_leader()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping || !m_leader_watcher) {
    return;
  }

  m_leader_watcher->release_leader();
}

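// PoolWatcher callback: fired with the images added to or removed from
// a pool's mirroring directory. Only the leader acts on updates: image
// counts are published to the service daemon and a per-image notify is
// sent for each change. The C_Gather completes once every notification
// has been acknowledged, finishing the tracked update op.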
void PoolReplayer::handle_update(const std::string &mirror_uuid,
                                 ImageIds &&added_image_ids,
                                 ImageIds &&removed_image_ids) {
  if (m_stopping) {
    return;
  }

  dout(10) << "mirror_uuid=" << mirror_uuid << ", "
           << "added_count=" << added_image_ids.size() << ", "
           << "removed_count=" << removed_image_ids.size() << dendl;
  Mutex::Locker locker(m_lock);
  if (!m_leader_watcher->is_leader()) {
    return;
  }

  m_service_daemon->add_or_update_attribute(
    m_local_pool_id, SERVICE_DAEMON_LOCAL_COUNT_KEY,
    m_local_pool_watcher->get_image_count());
  if (m_remote_pool_watcher) {
    m_service_daemon->add_or_update_attribute(
      m_local_pool_id, SERVICE_DAEMON_REMOTE_COUNT_KEY,
      m_remote_pool_watcher->get_image_count());
  }

  m_update_op_tracker.start_op();
  Context *ctx = new FunctionContext([this](int r) {
      dout(20) << "complete handle_update: r=" << r << dendl;
      m_update_op_tracker.finish_op();
    });

  C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);

  for (auto &image_id : added_image_ids) {
    // for now always send to myself (the leader)
    std::string &instance_id = m_instance_watcher->get_instance_id();
    m_instance_watcher->notify_image_acquire(instance_id, image_id.global_id,
                                             gather_ctx->new_sub());
  }

  if (!mirror_uuid.empty()) {
    for (auto &image_id : removed_image_ids) {
      // for now always send to myself (the leader)
      std::string &instance_id = m_instance_watcher->get_instance_id();
      m_instance_watcher->notify_peer_image_removed(instance_id,
                                                    image_id.global_id,
                                                    mirror_uuid,
                                                    gather_ctx->new_sub());
    }
  }

  gather_ctx->activate();
}

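// LeaderWatcher callbacks: after acquiring the leader lock this
// instance advertises its leader status and starts the pool watchers;
// before releasing the lock the watchers are shut down and all locally
// owned images are released.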
void PoolReplayer::handle_post_acquire_leader(Context *on_finish) {
  dout(20) << "enter" << dendl;

  m_service_daemon->add_or_update_attribute(m_local_pool_id,
                                            SERVICE_DAEMON_LEADER_KEY, true);
  m_instance_watcher->handle_acquire_leader();
  init_local_pool_watcher(on_finish);
}

void PoolReplayer::handle_pre_release_leader(Context *on_finish) {
  dout(20) << "enter" << dendl;

  m_service_daemon->remove_attribute(m_local_pool_id, SERVICE_DAEMON_LEADER_KEY);
  m_instance_watcher->handle_release_leader();
  shut_down_pool_watchers(on_finish);
}

void PoolReplayer::init_local_pool_watcher(Context *on_finish) {
  dout(20) << "enter" << dendl;

  Mutex::Locker locker(m_lock);
  assert(!m_local_pool_watcher);
  m_local_pool_watcher.reset(new PoolWatcher<>(
    m_threads, m_local_io_ctx, m_local_pool_watcher_listener));

  // ensure the initial set of local images is up-to-date
  // after acquiring the leader role
  auto ctx = new FunctionContext([this, on_finish](int r) {
      handle_init_local_pool_watcher(r, on_finish);
    });
  m_local_pool_watcher->init(create_async_context_callback(
    m_threads->work_queue, ctx));
}

void PoolReplayer::handle_init_local_pool_watcher(int r, Context *on_finish) {
  dout(20) << "r=" << r << dendl;
  if (r < 0) {
    derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl;
    on_finish->complete(r);
    return;
  }

  init_remote_pool_watcher(on_finish);
}

void PoolReplayer::init_remote_pool_watcher(Context *on_finish) {
  dout(20) << "enter" << dendl;

  Mutex::Locker locker(m_lock);
  assert(!m_remote_pool_watcher);
  m_remote_pool_watcher.reset(new PoolWatcher<>(
    m_threads, m_remote_io_ctx, m_remote_pool_watcher_listener));
  m_remote_pool_watcher->init(create_async_context_callback(
    m_threads->work_queue, on_finish));

  m_cond.Signal();
}

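// Shut down both pool watchers (when running) under a gather context
// so the completion fires only after each watcher has stopped, then
// wait for in-flight update notifications before releasing images.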
void PoolReplayer::shut_down_pool_watchers(Context *on_finish) {
  dout(20) << "enter" << dendl;

  {
    Mutex::Locker locker(m_lock);
    if (m_local_pool_watcher) {
      Context *ctx = new FunctionContext([this, on_finish](int r) {
          handle_shut_down_pool_watchers(r, on_finish);
        });
      ctx = create_async_context_callback(m_threads->work_queue, ctx);

      auto gather_ctx = new C_Gather(g_ceph_context, ctx);
      m_local_pool_watcher->shut_down(gather_ctx->new_sub());
      if (m_remote_pool_watcher) {
        m_remote_pool_watcher->shut_down(gather_ctx->new_sub());
      }
      gather_ctx->activate();
      return;
    }
  }

  on_finish->complete(0);
}

void PoolReplayer::handle_shut_down_pool_watchers(int r, Context *on_finish) {
  dout(20) << "r=" << r << dendl;

  {
    Mutex::Locker locker(m_lock);
    assert(m_local_pool_watcher);
    m_local_pool_watcher.reset();

    if (m_remote_pool_watcher) {
      m_remote_pool_watcher.reset();
    }
  }
  wait_for_update_ops(on_finish);
}

void PoolReplayer::wait_for_update_ops(Context *on_finish) {
  dout(20) << "enter" << dendl;

  Mutex::Locker locker(m_lock);

  Context *ctx = new FunctionContext([this, on_finish](int r) {
      handle_wait_for_update_ops(r, on_finish);
    });
  ctx = create_async_context_callback(m_threads->work_queue, ctx);

  m_update_op_tracker.wait_for_ops(ctx);
}

void PoolReplayer::handle_wait_for_update_ops(int r, Context *on_finish) {
  dout(20) << "r=" << r << dendl;

  assert(r == 0);

  Mutex::Locker locker(m_lock);
  m_instance_replayer->release_all(on_finish);
}

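// Leadership changed: forward the new leader's instance id to the
// instance watcher so it can track the current leader.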
void PoolReplayer::handle_update_leader(const std::string &leader_instance_id) {
  dout(20) << "leader_instance_id=" << leader_instance_id << dendl;

  m_instance_watcher->handle_update_leader(leader_instance_id);
}

} // namespace mirror
} // namespace rbd