1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "PoolReplayer.h"
5 #include <boost/bind.hpp>
6 #include "common/Formatter.h"
7 #include "common/admin_socket.h"
8 #include "common/ceph_argparse.h"
9 #include "common/code_environment.h"
10 #include "common/common_init.h"
11 #include "common/debug.h"
12 #include "common/errno.h"
13 #include "include/stringify.h"
14 #include "cls/rbd/cls_rbd_client.h"
15 #include "global/global_context.h"
16 #include "librbd/internal.h"
17 #include "librbd/Utils.h"
18 #include "librbd/Watcher.h"
19 #include "librbd/api/Config.h"
20 #include "librbd/api/Mirror.h"
22 #include "InstanceReplayer.h"
23 #include "InstanceWatcher.h"
24 #include "LeaderWatcher.h"
25 #include "ServiceDaemon.h"
28 #define dout_context g_ceph_context
29 #define dout_subsys ceph_subsys_rbd_mirror
31 #define dout_prefix *_dout << "rbd::mirror::PoolReplayer: " \
32 << this << " " << __func__ << ": "
34 using std::chrono::seconds
;
37 using std::unique_ptr
;
40 using librbd::cls_client::dir_get_name
;
41 using librbd::util::create_async_context_callback
;
// Attribute keys this file passes to the service daemon
// (add_or_update_attribute / remove_attribute) for the local pool.
const std::string SERVICE_DAEMON_INSTANCE_ID_KEY("instance_id");
const std::string SERVICE_DAEMON_LEADER_KEY("leader");
const std::string SERVICE_DAEMON_LOCAL_COUNT_KEY("image_local_count");
const std::string SERVICE_DAEMON_REMOTE_COUNT_KEY("image_remote_count");

// Config options whose values init_rados() snapshots before parsing the
// environment / command line and restores afterwards when it is asked to
// strip cluster overrides (i.e. for remote peer connections).
//
// Initialized with a single brace level so the initializer_list constructor
// is selected directly. The former `{{...}}` form only worked through a
// constructor fallback and would have turned into
// std::string(const char*, const char*) had the list shrunk to two entries.
const std::vector<std::string> UNIQUE_PEER_CONFIG_KEYS{
  "monmap", "mon_host", "mon_dns_srv_name", "key", "keyfile", "keyring"};
// Abstract base class for the admin-socket commands in this file. It keeps a
// pointer to the PoolReplayer<I> the command acts on and declares the call()
// entry point that every concrete command overrides.
59 class PoolReplayerAdminSocketCommand
{
61 PoolReplayerAdminSocketCommand(PoolReplayer
<I
> *pool_replayer
)
62 : pool_replayer(pool_replayer
) {
// Virtual destructor: PoolReplayerAdminSocketHook stores commands as pointers
// to this base type.
64 virtual ~PoolReplayerAdminSocketCommand() {}
// Runs the command with a caller-supplied Formatter and stringstream.
65 virtual bool call(Formatter
*f
, stringstream
*ss
) = 0;
// Replayer the command operates on (raw pointer, stored as passed in).
67 PoolReplayer
<I
> *pool_replayer
;
// Admin-socket command that forwards to PoolReplayer<I>::print_status().
71 class StatusCommand
: public PoolReplayerAdminSocketCommand
<I
> {
73 explicit StatusCommand(PoolReplayer
<I
> *pool_replayer
)
74 : PoolReplayerAdminSocketCommand
<I
>(pool_replayer
) {
77 bool call(Formatter
*f
, stringstream
*ss
) override
{
// Hand the caller's formatter and stream straight to the replayer.
78 this->pool_replayer
->print_status(f
, ss
);
// Admin-socket command that forwards to PoolReplayer<I>::start().
84 class StartCommand
: public PoolReplayerAdminSocketCommand
<I
> {
86 explicit StartCommand(PoolReplayer
<I
> *pool_replayer
)
87 : PoolReplayerAdminSocketCommand
<I
>(pool_replayer
) {
// f and ss are not used by the visible part of this command.
90 bool call(Formatter
*f
, stringstream
*ss
) override
{
91 this->pool_replayer
->start();
// Admin-socket command that forwards to PoolReplayer<I>::stop().
97 class StopCommand
: public PoolReplayerAdminSocketCommand
<I
> {
99 explicit StopCommand(PoolReplayer
<I
> *pool_replayer
)
100 : PoolReplayerAdminSocketCommand
<I
>(pool_replayer
) {
103 bool call(Formatter
*f
, stringstream
*ss
) override
{
// `true` is stop()'s `manual` argument: this is an operator-requested stop.
104 this->pool_replayer
->stop(true);
// Admin-socket command that forwards to PoolReplayer<I>::restart().
109 template <typename I
>
110 class RestartCommand
: public PoolReplayerAdminSocketCommand
<I
> {
112 explicit RestartCommand(PoolReplayer
<I
> *pool_replayer
)
113 : PoolReplayerAdminSocketCommand
<I
>(pool_replayer
) {
// f and ss are not used by the visible part of this command.
116 bool call(Formatter
*f
, stringstream
*ss
) override
{
117 this->pool_replayer
->restart();
// Admin-socket command that forwards to PoolReplayer<I>::flush().
122 template <typename I
>
123 class FlushCommand
: public PoolReplayerAdminSocketCommand
<I
> {
125 explicit FlushCommand(PoolReplayer
<I
> *pool_replayer
)
126 : PoolReplayerAdminSocketCommand
<I
>(pool_replayer
) {
// f and ss are not used by the visible part of this command.
129 bool call(Formatter
*f
, stringstream
*ss
) override
{
130 this->pool_replayer
->flush();
// Admin-socket command that forwards to PoolReplayer<I>::release_leader().
135 template <typename I
>
136 class LeaderReleaseCommand
: public PoolReplayerAdminSocketCommand
<I
> {
138 explicit LeaderReleaseCommand(PoolReplayer
<I
> *pool_replayer
)
139 : PoolReplayerAdminSocketCommand
<I
>(pool_replayer
) {
// f and ss are not used by the visible part of this command.
142 bool call(Formatter
*f
, stringstream
*ss
) override
{
143 this->pool_replayer
->release_leader();
// AdminSocketHook that exposes a PoolReplayer<I> over the admin socket.
// The constructor registers six "rbd mirror <verb> <name>" commands
// (status, start, stop, restart, flush, leader release) and maps each command
// string to a command object; the destructor unregisters them; call() looks
// up the command object for an incoming command and runs it.
// NOTE(review): the checks on `r` after each register_command() and the
// cleanup of the command objects are not visible in this listing.
148 template <typename I
>
149 class PoolReplayerAdminSocketHook
: public AdminSocketHook
{
// cct:           context whose admin socket the commands are registered on.
// name:          suffix appended to every command string and help text.
// pool_replayer: target handed to every command object.
151 PoolReplayerAdminSocketHook(CephContext
*cct
, const std::string
&name
,
152 PoolReplayer
<I
> *pool_replayer
)
153 : admin_socket(cct
->get_admin_socket()) {
// Each registration passes the command string twice (first and second
// argument), this hook, and a help string.
157 command
= "rbd mirror status " + name
;
158 r
= admin_socket
->register_command(command
, command
, this,
159 "get status for rbd mirror " + name
);
161 commands
[command
] = new StatusCommand
<I
>(pool_replayer
);
164 command
= "rbd mirror start " + name
;
165 r
= admin_socket
->register_command(command
, command
, this,
166 "start rbd mirror " + name
);
168 commands
[command
] = new StartCommand
<I
>(pool_replayer
);
171 command
= "rbd mirror stop " + name
;
172 r
= admin_socket
->register_command(command
, command
, this,
173 "stop rbd mirror " + name
);
175 commands
[command
] = new StopCommand
<I
>(pool_replayer
);
178 command
= "rbd mirror restart " + name
;
179 r
= admin_socket
->register_command(command
, command
, this,
180 "restart rbd mirror " + name
);
182 commands
[command
] = new RestartCommand
<I
>(pool_replayer
);
185 command
= "rbd mirror flush " + name
;
186 r
= admin_socket
->register_command(command
, command
, this,
187 "flush rbd mirror " + name
);
189 commands
[command
] = new FlushCommand
<I
>(pool_replayer
);
192 command
= "rbd mirror leader release " + name
;
193 r
= admin_socket
->register_command(command
, command
, this,
194 "release rbd mirror leader " + name
);
196 commands
[command
] = new LeaderReleaseCommand
<I
>(pool_replayer
);
// Unregister every command recorded in `commands`; the result of
// unregister_command() is deliberately discarded via the (void) cast.
200 ~PoolReplayerAdminSocketHook() override
{
201 for (auto i
= commands
.begin(); i
!= commands
.end(); ++i
) {
202 (void)admin_socket
->unregister_command(i
->first
);
// Admin socket entry point: finds the command object for `command`
// (asserting that it exists), creates a Formatter for `format` and runs the
// command with it.
207 bool call(std::string_view command
, const cmdmap_t
& cmdmap
,
208 std::string_view format
, bufferlist
& out
) override
{
209 auto i
= commands
.find(command
);
210 ceph_assert(i
!= commands
.end());
211 Formatter
*f
= Formatter::create(format
);
213 bool r
= i
->second
->call(f
, &ss
);
// Command string -> command object. std::less<> is a transparent comparator,
// which lets call() look up a std::string_view without building a std::string.
220 typedef std::map
<std::string
, PoolReplayerAdminSocketCommand
<I
>*,
221 std::less
<>> Commands
;
// Admin socket obtained from the CephContext in the constructor.
223 AdminSocket
*admin_socket
;
227 } // anonymous namespace
// Constructor: records the service daemon and local pool id, names m_lock
// after the peer, and points each listener and the replayer thread object
// back at this instance.
// NOTE(review): some member initializers are not visible in this listing.
229 template <typename I
>
230 PoolReplayer
<I
>::PoolReplayer(Threads
<I
> *threads
,
231 ServiceDaemon
<I
>* service_daemon
,
232 int64_t local_pool_id
, const PeerSpec
&peer
,
233 const std::vector
<const char*> &args
) :
235 m_service_daemon(service_daemon
),
236 m_local_pool_id(local_pool_id
),
// The lock name embeds the stringified peer.
239 m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer
)),
// The bool distinguishes the two pool watcher listeners (true for the local
// one, false for the remote one).
240 m_local_pool_watcher_listener(this, true),
241 m_remote_pool_watcher_listener(this, false),
242 m_image_map_listener(this),
243 m_pool_replayer_thread(this),
244 m_leader_listener(this)
// Destructor. NOTE(review): the body is not visible in this listing.
248 template <typename I
>
249 PoolReplayer
<I
>::~PoolReplayer()
// Returns the m_blacklisted flag, read while holding m_lock.
255 template <typename I
>
256 bool PoolReplayer
<I
>::is_blacklisted() const {
257 Mutex::Locker
locker(m_lock
);
258 return m_blacklisted
;
// Returns true only when a leader watcher exists and it reports that this
// instance is the leader. Evaluated while holding m_lock.
261 template <typename I
>
262 bool PoolReplayer
<I
>::is_leader() const {
263 Mutex::Locker
locker(m_lock
);
// The first operand guards against an unset m_leader_watcher.
264 return m_leader_watcher
&& m_leader_watcher
->is_leader();
// Returns whether the pool replayer thread has been started. No lock is taken
// in the visible code.
267 template <typename I
>
268 bool PoolReplayer
<I
>::is_running() const {
269 return m_pool_replayer_thread
.is_started();
// Brings the replayer up: connects to the local and remote clusters, opens
// the local and remote pool I/O contexts, creates and initializes the
// instance replayer, instance watcher and leader watcher, then creates the
// "pool replayer" thread. Failures are logged and several are also reported
// to the service daemon as callouts.
// NOTE(review): the `if (r < 0)` checks and early returns between the steps
// are not visible in this listing.
272 template <typename I
>
273 void PoolReplayer
<I
>::init()
// Must not be called while the replayer thread is already running.
275 ceph_assert(!m_pool_replayer_thread
.is_started());
279 m_blacklisted
= false;
281 dout(10) << "replaying for " << m_peer
<< dendl
;
// Local cluster: use the daemon's own cluster and entity name, with empty
// mon_host and key; the trailing `false` is strip_cluster_overrides.
282 int r
= init_rados(g_ceph_context
->_conf
->cluster
,
283 g_ceph_context
->_conf
->name
.to_str(),
284 "", "", "local cluster", &m_local_rados
, false);
286 m_callout_id
= m_service_daemon
->add_or_update_callout(
287 m_local_pool_id
, m_callout_id
, service_daemon::CALLOUT_LEVEL_ERROR
,
288 "unable to connect to local cluster");
// Remote cluster: all connection settings come from the peer spec; the
// trailing `true` is strip_cluster_overrides.
292 r
= init_rados(m_peer
.cluster_name
, m_peer
.client_name
,
293 m_peer
.mon_host
, m_peer
.key
,
294 std::string("remote peer ") + stringify(m_peer
),
295 &m_remote_rados
, true);
297 m_callout_id
= m_service_daemon
->add_or_update_callout(
298 m_local_pool_id
, m_callout_id
, service_daemon::CALLOUT_LEVEL_ERROR
,
299 "unable to connect to remote cluster");
// The local I/O context is opened by pool id.
303 r
= m_local_rados
->ioctx_create2(m_local_pool_id
, m_local_io_ctx
);
305 derr
<< "error accessing local pool " << m_local_pool_id
<< ": "
306 << cpp_strerror(r
) << dendl
;
// Apply pool-level overrides to the config of the local I/O context's
// CephContext.
310 auto cct
= reinterpret_cast<CephContext
*>(m_local_io_ctx
.cct());
311 librbd::api::Config
<I
>::apply_pool_overrides(m_local_io_ctx
, &cct
->_conf
);
313 std::string local_mirror_uuid
;
314 r
= librbd::cls_client::mirror_uuid_get(&m_local_io_ctx
,
317 derr
<< "failed to retrieve local mirror uuid from pool "
318 << m_local_io_ctx
.get_pool_name() << ": " << cpp_strerror(r
) << dendl
;
319 m_callout_id
= m_service_daemon
->add_or_update_callout(
320 m_local_pool_id
, m_callout_id
, service_daemon::CALLOUT_LEVEL_ERROR
,
321 "unable to query local mirror uuid");
// The remote I/O context is opened by pool *name*, using the local pool's
// name.
325 r
= m_remote_rados
->ioctx_create(m_local_io_ctx
.get_pool_name().c_str(),
328 derr
<< "error accessing remote pool " << m_local_io_ctx
.get_pool_name()
329 << ": " << cpp_strerror(r
) << dendl
;
// This callout is WARNING level, unlike the ERROR callouts elsewhere in init().
330 m_callout_id
= m_service_daemon
->add_or_update_callout(
331 m_local_pool_id
, m_callout_id
, service_daemon::CALLOUT_LEVEL_WARNING
,
332 "unable to access remote pool");
336 dout(10) << "connected to " << m_peer
<< dendl
;
// Instance replayer: create, initialize, and register the remote peer.
338 m_instance_replayer
.reset(InstanceReplayer
<I
>::create(
339 m_threads
, m_service_daemon
, m_local_rados
, local_mirror_uuid
,
341 m_instance_replayer
->init();
342 m_instance_replayer
->add_peer(m_peer
.uuid
, m_remote_io_ctx
);
// Instance watcher: given the local I/O context, the work queue and the
// instance replayer created above.
344 m_instance_watcher
.reset(InstanceWatcher
<I
>::create(
345 m_local_io_ctx
, m_threads
->work_queue
, m_instance_replayer
.get()));
346 r
= m_instance_watcher
->init();
348 derr
<< "error initializing instance watcher: " << cpp_strerror(r
) << dendl
;
349 m_callout_id
= m_service_daemon
->add_or_update_callout(
350 m_local_pool_id
, m_callout_id
, service_daemon::CALLOUT_LEVEL_ERROR
,
351 "unable to initialize instance messenger object");
// Publish this instance's id as a service daemon attribute for the pool.
354 m_service_daemon
->add_or_update_attribute(
355 m_local_pool_id
, SERVICE_DAEMON_INSTANCE_ID_KEY
,
356 m_instance_watcher
->get_instance_id());
// Leader watcher: reports back through m_leader_listener.
358 m_leader_watcher
.reset(LeaderWatcher
<I
>::create(m_threads
, m_local_io_ctx
,
359 &m_leader_listener
));
360 r
= m_leader_watcher
->init();
362 derr
<< "error initializing leader watcher: " << cpp_strerror(r
) << dendl
;
363 m_callout_id
= m_service_daemon
->add_or_update_callout(
364 m_local_pool_id
, m_callout_id
, service_daemon::CALLOUT_LEVEL_ERROR
,
365 "unable to initialize leader messenger object");
// Remove any callout that is still registered and reset the stored id.
369 if (m_callout_id
!= service_daemon::CALLOUT_ID_NONE
) {
370 m_service_daemon
->remove_callout(m_local_pool_id
, m_callout_id
);
371 m_callout_id
= service_daemon::CALLOUT_ID_NONE
;
374 m_pool_replayer_thread
.create("pool replayer");
// Tears the replayer down: joins the replayer thread if it was started, shuts
// down and releases the leader watcher, instance watcher and instance
// replayer, then releases both rados handles.
// NOTE(review): what is done while `l` holds m_lock, and how the thread is
// told to stop, is not visible in this listing.
377 template <typename I
>
378 void PoolReplayer
<I
>::shut_down() {
381 Mutex::Locker
l(m_lock
);
384 if (m_pool_replayer_thread
.is_started()) {
385 m_pool_replayer_thread
.join();
// Each component is shut down only if it was created.
387 if (m_leader_watcher
) {
388 m_leader_watcher
->shut_down();
390 if (m_instance_watcher
) {
391 m_instance_watcher
->shut_down();
393 if (m_instance_replayer
) {
394 m_instance_replayer
->shut_down();
397 m_leader_watcher
.reset();
398 m_instance_watcher
.reset();
399 m_instance_replayer
.reset();
// These components must already have been released by this point.
401 ceph_assert(!m_image_map
);
402 ceph_assert(!m_image_deleter
);
403 ceph_assert(!m_local_pool_watcher
);
404 ceph_assert(!m_remote_pool_watcher
);
405 m_local_rados
.reset();
406 m_remote_rados
.reset();
// Creates a librados handle backed by a dedicated CephContext and connects it.
// Configuration is layered in this order: config files, environment,
// m_args (if any), optional reverting of the UNIQUE_PEER_CONFIG_KEYS options,
// then the explicit mon_host / key values passed in.
//
// cluster_name:            stored as the context's `cluster` config value.
// client_name:             entity name; must be non-empty and parseable.
// mon_host:                if non-empty, set as the "mon_host" option.
// key:                     set as the "key" option (the guarding condition is
//                          not visible in this listing).
// description:             label used in log messages only.
// strip_cluster_overrides: when true, the UNIQUE_PEER_CONFIG_KEYS options are
//                          restored to their pre-env/argv values.
// Returns an int status. NOTE(review): the `rados_ref` parameter declaration,
// the error checks and the return statements are not visible in this listing.
409 template <typename I
>
410 int PoolReplayer
<I
>::init_rados(const std::string
&cluster_name
,
411 const std::string
&client_name
,
412 const std::string
&mon_host
,
413 const std::string
&key
,
414 const std::string
&description
,
416 bool strip_cluster_overrides
) {
417 rados_ref
->reset(new librados::Rados());
419 // NOTE: manually bootstrap a CephContext here instead of via
420 // the librados API to avoid mixing global singletons between
421 // the librados shared library and the daemon
422 // TODO: eliminate intermingling of global singletons within Ceph APIs
423 CephInitParameters
iparams(CEPH_ENTITY_TYPE_CLIENT
);
// Reject an empty or unparseable client name.
424 if (client_name
.empty() || !iparams
.name
.from_str(client_name
)) {
425 derr
<< "error initializing cluster handle for " << description
<< dendl
;
429 CephContext
*cct
= common_preinit(iparams
, CODE_ENVIRONMENT_LIBRARY
,
430 CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS
);
431 cct
->_conf
->cluster
= cluster_name
;
433 // librados::Rados::conf_read_file
434 int r
= cct
->_conf
.parse_config_files(nullptr, nullptr, 0);
// -ENOENT is not treated as an error here.
435 if (r
< 0 && r
!= -ENOENT
) {
436 derr
<< "could not read ceph conf for " << description
<< ": "
437 << cpp_strerror(r
) << dendl
;
442 // preserve cluster-specific config settings before applying environment/cli
444 std::map
<std::string
, std::string
> config_values
;
445 if (strip_cluster_overrides
) {
446 // remote peer connections shouldn't apply cluster-specific
447 // configuration settings
448 for (auto& key
: UNIQUE_PEER_CONFIG_KEYS
) {
449 config_values
[key
] = cct
->_conf
.get_val
<std::string
>(key
);
453 cct
->_conf
.parse_env(cct
->get_module_type());
455 // librados::Rados::conf_parse_env
// `args` is empty when it is passed to parse_argv() below.
456 std::vector
<const char*> args
;
457 r
= cct
->_conf
.parse_argv(args
);
459 derr
<< "could not parse environment for " << description
<< ":"
460 << cpp_strerror(r
) << dendl
;
// NOTE(review): parse_env() is called a second time here — confirm that
// this is intentional.
464 cct
->_conf
.parse_env(cct
->get_module_type());
466 if (!m_args
.empty()) {
467 // librados::Rados::conf_parse_argv
469 r
= cct
->_conf
.parse_argv(args
);
471 derr
<< "could not parse command line args for " << description
<< ": "
472 << cpp_strerror(r
) << dendl
;
478 if (strip_cluster_overrides
) {
479 // remote peer connections shouldn't apply cluster-specific
480 // configuration settings
// Restore every saved option whose value was changed by env/argv parsing.
481 for (auto& pair
: config_values
) {
482 auto value
= cct
->_conf
.get_val
<std::string
>(pair
.first
);
483 if (pair
.second
!= value
) {
484 dout(0) << "reverting global config option override: "
485 << pair
.first
<< ": " << value
<< " -> " << pair
.second
487 cct
->_conf
.set_val_or_die(pair
.first
, pair
.second
);
// If the daemon has an admin socket configured, give this context a socket
// path that includes $cluster and $cctid.
492 if (!g_ceph_context
->_conf
->admin_socket
.empty()) {
493 cct
->_conf
.set_val_or_die("admin_socket",
494 "$run_dir/$name.$pid.$cluster.$cctid.asok");
497 if (!mon_host
.empty()) {
498 r
= cct
->_conf
.set_val("mon_host", mon_host
);
500 derr
<< "failed to set mon_host config for " << description
<< ": "
501 << cpp_strerror(r
) << dendl
;
508 r
= cct
->_conf
.set_val("key", key
);
510 derr
<< "failed to set key config for " << description
<< ": "
511 << cpp_strerror(r
) << dendl
;
517 // disable unnecessary librbd cache
518 cct
->_conf
.set_val_or_die("rbd_cache", "false");
519 cct
->_conf
.apply_changes(nullptr);
520 cct
->_conf
.complain_about_parse_errors(cct
);
// Bind the librados handle to the context built above, then connect.
522 r
= (*rados_ref
)->init_with_context(cct
);
526 r
= (*rados_ref
)->connect();
528 derr
<< "error connecting to " << description
<< ": "
529 << cpp_strerror(r
) << dendl
;
// Body of the pool replayer thread. Loops until m_stopping is set; on each
// pass it (re)creates the admin socket hook if the hook name changed or no
// hook exists, records whether either pool watcher has been blacklisted, and
// waits on m_cond with a one-second interval. After the loop it stops the
// instance replayer.
// NOTE(review): parts of the loop body (e.g. what happens after m_blacklisted
// is set) are not visible in this listing.
536 template <typename I
>
537 void PoolReplayer
<I
>::run()
539 dout(20) << "enter" << dendl
;
541 while (!m_stopping
) {
// The hook name starts with the local pool name followed by a space; the
// remainder of the expression is not visible here.
542 std::string asok_hook_name
= m_local_io_ctx
.get_pool_name() + " " +
544 if (m_asok_hook_name
!= asok_hook_name
|| m_asok_hook
== nullptr) {
545 m_asok_hook_name
= asok_hook_name
;
548 m_asok_hook
= new PoolReplayerAdminSocketHook
<I
>(g_ceph_context
,
549 m_asok_hook_name
, this);
552 Mutex::Locker
locker(m_lock
);
// Either watcher may be unset, so each is null-checked before use.
553 if ((m_local_pool_watcher
&& m_local_pool_watcher
->is_blacklisted()) ||
554 (m_remote_pool_watcher
&& m_remote_pool_watcher
->is_blacklisted())) {
555 m_blacklisted
= true;
561 m_cond
.WaitInterval(m_lock
, utime_t(1, 0));
565 m_instance_replayer
->stop();
// Dumps the replayer's status into Formatter `f` while holding m_lock: pool
// name, peer, instance id, state, leader information, known instances, the
// local and remote admin socket paths, sync throttler status, instance
// replayer status and, if present, image deleter status.
// `ss` is passed through to the nested print_* calls.
// NOTE(review): early-exit checks, the close_section calls and the flush are
// not visible in this listing.
568 template <typename I
>
569 void PoolReplayer
<I
>::print_status(Formatter
*f
, stringstream
*ss
)
571 dout(20) << "enter" << dendl
;
577 Mutex::Locker
l(m_lock
);
579 f
->open_object_section("pool_replayer_status");
580 f
->dump_string("pool", m_local_io_ctx
.get_pool_name());
581 f
->dump_stream("peer") << m_peer
;
582 f
->dump_string("instance_id", m_instance_watcher
->get_instance_id());
// The state defaults to "running"; it is overridden in branches whose
// conditions are only partly visible here.
584 std::string
state("running");
586 state
= "stopped (manual)";
587 } else if (m_stopping
) {
590 f
->dump_string("state", state
);
592 std::string leader_instance_id
;
593 m_leader_watcher
->get_leader_instance_id(&leader_instance_id
);
594 f
->dump_string("leader_instance_id", leader_instance_id
);
596 bool leader
= m_leader_watcher
->is_leader();
597 f
->dump_bool("leader", leader
);
599 std::vector
<std::string
> instance_ids
;
600 m_leader_watcher
->list_instances(&instance_ids
);
601 f
->open_array_section("instances");
// Each id is copied by value in this loop.
602 for (auto instance_id
: instance_ids
) {
603 f
->dump_string("instance_id", instance_id
);
// Admin socket paths are read from the config of each I/O context's
// CephContext.
608 f
->dump_string("local_cluster_admin_socket",
609 reinterpret_cast<CephContext
*>(m_local_io_ctx
.cct())->_conf
.
610 get_val
<std::string
>("admin_socket"));
611 f
->dump_string("remote_cluster_admin_socket",
612 reinterpret_cast<CephContext
*>(m_remote_io_ctx
.cct())->_conf
.
613 get_val
<std::string
>("admin_socket"));
615 f
->open_object_section("sync_throttler");
616 m_instance_watcher
->print_sync_status(f
, ss
);
619 m_instance_replayer
->print_status(f
, ss
);
// The image deleter may be unset, so this section is conditional.
621 if (m_image_deleter
) {
622 f
->open_object_section("image_deleter");
623 m_image_deleter
->print_status(f
, ss
);
// Clears the manual-stop flag and starts the instance replayer, under m_lock.
// NOTE(review): any guard between taking the lock and these statements is not
// visible in this listing.
631 template <typename I
>
632 void PoolReplayer
<I
>::start()
634 dout(20) << "enter" << dendl
;
636 Mutex::Locker
l(m_lock
);
642 m_manual_stop
= false;
643 m_instance_replayer
->start();
// Stops replay. `manual` is logged on entry; the visible tail of the function
// sets m_manual_stop and stops the instance replayer while holding m_lock.
// NOTE(review): the first branch (before `else if (m_stopping)`) and the body
// of the m_stopping branch are not visible in this listing.
646 template <typename I
>
647 void PoolReplayer
<I
>::stop(bool manual
)
649 dout(20) << "enter: manual=" << manual
<< dendl
;
651 Mutex::Locker
l(m_lock
);
656 } else if (m_stopping
) {
660 m_manual_stop
= true;
661 m_instance_replayer
->stop();
// Restarts the instance replayer while holding m_lock.
// NOTE(review): any guard between taking the lock and the restart call is not
// visible in this listing.
664 template <typename I
>
665 void PoolReplayer
<I
>::restart()
667 dout(20) << "enter" << dendl
;
669 Mutex::Locker
l(m_lock
);
675 m_instance_replayer
->restart();
// Flushes the instance replayer while holding m_lock. The stopping /
// manually-stopped case is handled separately first (that branch's body is
// not visible in this listing; presumably an early return — confirm).
678 template <typename I
>
679 void PoolReplayer
<I
>::flush()
681 dout(20) << "enter" << dendl
;
683 Mutex::Locker
l(m_lock
);
685 if (m_stopping
|| m_manual_stop
) {
689 m_instance_replayer
->flush();
// Asks the leader watcher to release leadership while holding m_lock. The
// case where the replayer is stopping or has no leader watcher is handled
// separately first (that branch's body is not visible in this listing;
// presumably an early return — confirm).
692 template <typename I
>
693 void PoolReplayer
<I
>::release_leader()
695 dout(20) << "enter" << dendl
;
697 Mutex::Locker
l(m_lock
);
699 if (m_stopping
|| !m_leader_watcher
) {
703 m_leader_watcher
->release_leader();
// Handles a batch of added / removed image ids for `mirror_uuid`. Under
// m_lock it refreshes the local (and, if a remote watcher exists, remote)
// image-count attributes on the service daemon, reduces both id lists to sets
// of global image ids, and forwards them to the image map.
// NOTE(review): the body of the "not leader" branch and any checks before the
// first log line are not visible in this listing.
706 template <typename I
>
707 void PoolReplayer
<I
>::handle_update(const std::string
&mirror_uuid
,
708 ImageIds
&&added_image_ids
,
709 ImageIds
&&removed_image_ids
) {
714 dout(10) << "mirror_uuid=" << mirror_uuid
<< ", "
715 << "added_count=" << added_image_ids
.size() << ", "
716 << "removed_count=" << removed_image_ids
.size() << dendl
;
717 Mutex::Locker
locker(m_lock
);
718 if (!m_leader_watcher
->is_leader()) {
722 m_service_daemon
->add_or_update_attribute(
723 m_local_pool_id
, SERVICE_DAEMON_LOCAL_COUNT_KEY
,
724 m_local_pool_watcher
->get_image_count());
// The remote watcher may be unset, so its count is published only if it exists.
725 if (m_remote_pool_watcher
) {
726 m_service_daemon
->add_or_update_attribute(
727 m_local_pool_id
, SERVICE_DAEMON_REMOTE_COUNT_KEY
,
728 m_remote_pool_watcher
->get_image_count());
// Only the global_id of each image id is passed on to the image map.
731 std::set
<std::string
> added_global_image_ids
;
732 for (auto& image_id
: added_image_ids
) {
733 added_global_image_ids
.insert(image_id
.global_id
);
736 std::set
<std::string
> removed_global_image_ids
;
737 for (auto& image_id
: removed_image_ids
) {
738 removed_global_image_ids
.insert(image_id
.global_id
);
// Both sets are moved into the image map update.
741 m_image_map
->update_images(mirror_uuid
,
742 std::move(added_global_image_ids
),
743 std::move(removed_global_image_ids
));
// Runs after leadership is acquired: sets the "leader" attribute to true on
// the service daemon, notifies the instance watcher, and begins the leader
// start-up chain with init_image_map(). `on_finish` is passed along that chain.
746 template <typename I
>
747 void PoolReplayer
<I
>::handle_post_acquire_leader(Context
*on_finish
) {
750 m_service_daemon
->add_or_update_attribute(m_local_pool_id
,
751 SERVICE_DAEMON_LEADER_KEY
, true);
752 m_instance_watcher
->handle_acquire_leader();
753 init_image_map(on_finish
);
// Runs before leadership is released: removes the "leader" attribute from the
// service daemon, notifies the instance watcher, and begins the leader
// tear-down chain with shut_down_image_deleter(). `on_finish` is passed along
// that chain.
756 template <typename I
>
757 void PoolReplayer
<I
>::handle_pre_release_leader(Context
*on_finish
) {
760 m_service_daemon
->remove_attribute(m_local_pool_id
,
761 SERVICE_DAEMON_LEADER_KEY
);
762 m_instance_watcher
->handle_release_leader();
763 shut_down_image_deleter(on_finish
);
// Creates the image map (it must not already exist) and starts its
// asynchronous initialization; handle_init_image_map() receives the result
// together with `on_finish`.
766 template <typename I
>
767 void PoolReplayer
<I
>::init_image_map(Context
*on_finish
) {
770 Mutex::Locker
locker(m_lock
);
771 ceph_assert(!m_image_map
);
772 m_image_map
.reset(ImageMap
<I
>::create(m_local_io_ctx
, m_threads
,
773 m_instance_watcher
->get_instance_id(),
774 m_image_map_listener
));
776 auto ctx
= new FunctionContext([this, on_finish
](int r
) {
777 handle_init_image_map(r
, on_finish
);
// The completion is wrapped with create_async_context_callback() on the
// threads' work queue.
779 m_image_map
->init(create_async_context_callback(
780 m_threads
->work_queue
, ctx
));
// Completion of image map initialization. The failure path logs the error and
// shuts the image map down again; the success path continues with
// init_local_pool_watcher().
// NOTE(review): the condition selecting the failure path is not visible in
// this listing (presumably r < 0 — confirm).
783 template <typename I
>
784 void PoolReplayer
<I
>::handle_init_image_map(int r
, Context
*on_finish
) {
785 dout(5) << "r=" << r
<< dendl
;
787 derr
<< "failed to init image map: " << cpp_strerror(r
) << dendl
;
// Wrap on_finish so it completes with the original error `r`, ignoring the
// result of the shut down.
788 on_finish
= new FunctionContext([on_finish
, r
](int) {
789 on_finish
->complete(r
);
791 shut_down_image_map(on_finish
);
795 init_local_pool_watcher(on_finish
);
// Creates the local pool watcher (it must not already exist) on the local
// I/O context and starts its asynchronous initialization;
// handle_init_local_pool_watcher() receives the result together with
// `on_finish`.
798 template <typename I
>
799 void PoolReplayer
<I
>::init_local_pool_watcher(Context
*on_finish
) {
802 Mutex::Locker
locker(m_lock
);
803 ceph_assert(!m_local_pool_watcher
);
804 m_local_pool_watcher
.reset(PoolWatcher
<I
>::create(
805 m_threads
, m_local_io_ctx
, m_local_pool_watcher_listener
));
807 // ensure the initial set of local images is up-to-date
808 // after acquiring the leader role
809 auto ctx
= new FunctionContext([this, on_finish
](int r
) {
810 handle_init_local_pool_watcher(r
, on_finish
);
// The completion is wrapped with create_async_context_callback() on the
// threads' work queue.
812 m_local_pool_watcher
->init(create_async_context_callback(
813 m_threads
->work_queue
, ctx
));
// Completion of local pool watcher initialization. The failure path logs the
// error and shuts the pool watchers down; the success path continues with
// init_remote_pool_watcher().
// NOTE(review): the condition selecting the failure path is not visible in
// this listing (presumably r < 0 — confirm).
816 template <typename I
>
817 void PoolReplayer
<I
>::handle_init_local_pool_watcher(
818 int r
, Context
*on_finish
) {
819 dout(10) << "r=" << r
<< dendl
;
821 derr
<< "failed to retrieve local images: " << cpp_strerror(r
) << dendl
;
// Wrap on_finish so it completes with the original error `r`, ignoring the
// result of the shut down.
822 on_finish
= new FunctionContext([on_finish
, r
](int) {
823 on_finish
->complete(r
);
825 shut_down_pool_watchers(on_finish
);
829 init_remote_pool_watcher(on_finish
);
// Creates the remote pool watcher (it must not already exist) on the remote
// I/O context and starts its asynchronous initialization;
// handle_init_remote_pool_watcher() receives the result together with
// `on_finish`.
832 template <typename I
>
833 void PoolReplayer
<I
>::init_remote_pool_watcher(Context
*on_finish
) {
836 Mutex::Locker
locker(m_lock
);
837 ceph_assert(!m_remote_pool_watcher
);
838 m_remote_pool_watcher
.reset(PoolWatcher
<I
>::create(
839 m_threads
, m_remote_io_ctx
, m_remote_pool_watcher_listener
));
841 auto ctx
= new FunctionContext([this, on_finish
](int r
) {
842 handle_init_remote_pool_watcher(r
, on_finish
);
// The completion is wrapped with create_async_context_callback() on the
// threads' work queue.
844 m_remote_pool_watcher
->init(create_async_context_callback(
845 m_threads
->work_queue
, ctx
));
// Completion of remote pool watcher initialization. One result is only logged
// as "remote peer does not have mirroring configured" (see the comment
// below); the other failure path logs the error and shuts the pool watchers
// down. The success path continues with init_image_deleter().
// NOTE(review): the conditions selecting each branch are not visible in this
// listing.
848 template <typename I
>
849 void PoolReplayer
<I
>::handle_init_remote_pool_watcher(
850 int r
, Context
*on_finish
) {
851 dout(10) << "r=" << r
<< dendl
;
853 // Technically nothing to do since the other side doesn't
854 // have mirroring enabled. Eventually the remote pool watcher will
855 // detect images (if mirroring is enabled), so no point propagating
856 // an error which would just busy-spin the state machines.
857 dout(0) << "remote peer does not have mirroring configured" << dendl
;
859 derr
<< "failed to retrieve remote images: " << cpp_strerror(r
) << dendl
;
// Wrap on_finish so it completes with the original error `r`, ignoring the
// result of the shut down.
860 on_finish
= new FunctionContext([on_finish
, r
](int) {
861 on_finish
->complete(r
);
863 shut_down_pool_watchers(on_finish
);
867 init_image_deleter(on_finish
);
// Creates the image deleter (it must not already exist) and starts its
// asynchronous initialization; handle_init_image_deleter() receives the
// result together with the caller's `on_finish`.
// NOTE(review): the remaining arguments to ImageDeleter<I>::create() are not
// visible in this listing.
870 template <typename I
>
871 void PoolReplayer
<I
>::init_image_deleter(Context
*on_finish
) {
874 Mutex::Locker
locker(m_lock
);
875 ceph_assert(!m_image_deleter
);
// on_finish is rebound to a context that routes through
// handle_init_image_deleter(); the lambda captures the original pointer.
877 on_finish
= new FunctionContext([this, on_finish
](int r
) {
878 handle_init_image_deleter(r
, on_finish
);
880 m_image_deleter
.reset(ImageDeleter
<I
>::create(m_local_io_ctx
, m_threads
,
882 m_image_deleter
->init(create_async_context_callback(
883 m_threads
->work_queue
, on_finish
));
// Completion of image deleter initialization. The failure path logs the error
// and shuts the image deleter down; the success path completes `on_finish`
// with 0 and then takes m_lock.
// NOTE(review): the condition selecting the failure path and what is done
// under the final lock are not visible in this listing.
886 template <typename I
>
887 void PoolReplayer
<I
>::handle_init_image_deleter(int r
, Context
*on_finish
) {
888 dout(10) << "r=" << r
<< dendl
;
890 derr
<< "failed to init image deleter: " << cpp_strerror(r
) << dendl
;
// Wrap on_finish so it completes with the original error `r`, ignoring the
// result of the shut down.
891 on_finish
= new FunctionContext([on_finish
, r
](int) {
892 on_finish
->complete(r
);
894 shut_down_image_deleter(on_finish
);
898 on_finish
->complete(0);
900 Mutex::Locker
locker(m_lock
);
// First step of the leader tear-down chain. If an image deleter exists it is
// shut down asynchronously and handle_shut_down_image_deleter() continues
// from there; the final call goes straight to shut_down_pool_watchers().
// NOTE(review): how the first branch exits before reaching that final call is
// not visible in this listing.
904 template <typename I
>
905 void PoolReplayer
<I
>::shut_down_image_deleter(Context
* on_finish
) {
908 Mutex::Locker
locker(m_lock
);
909 if (m_image_deleter
) {
910 Context
*ctx
= new FunctionContext([this, on_finish
](int r
) {
911 handle_shut_down_image_deleter(r
, on_finish
);
// The completion is wrapped with create_async_context_callback() on the
// threads' work queue.
913 ctx
= create_async_context_callback(m_threads
->work_queue
, ctx
);
915 m_image_deleter
->shut_down(ctx
);
919 shut_down_pool_watchers(on_finish
);
// Completion of the image deleter shut down: releases the image deleter under
// m_lock (it must still exist) and continues with shut_down_pool_watchers().
// `r` is only logged in the visible code.
922 template <typename I
>
923 void PoolReplayer
<I
>::handle_shut_down_image_deleter(
924 int r
, Context
* on_finish
) {
925 dout(10) << "r=" << r
<< dendl
;
928 Mutex::Locker
locker(m_lock
);
929 ceph_assert(m_image_deleter
);
930 m_image_deleter
.reset();
933 shut_down_pool_watchers(on_finish
);
// Shuts down the pool watchers. If a local pool watcher exists, it and (if
// present) the remote pool watcher are shut down through a shared C_Gather,
// and handle_shut_down_pool_watchers() continues from there. The final
// statement completes `on_finish` with 0.
// NOTE(review): how the first branch exits before reaching that final
// statement is not visible in this listing.
936 template <typename I
>
937 void PoolReplayer
<I
>::shut_down_pool_watchers(Context
*on_finish
) {
941 Mutex::Locker
locker(m_lock
);
942 if (m_local_pool_watcher
) {
943 Context
*ctx
= new FunctionContext([this, on_finish
](int r
) {
944 handle_shut_down_pool_watchers(r
, on_finish
);
946 ctx
= create_async_context_callback(m_threads
->work_queue
, ctx
);
// One sub-context per watcher; presumably ctx runs once all of them have
// completed — confirm against C_Gather.
948 auto gather_ctx
= new C_Gather(g_ceph_context
, ctx
);
949 m_local_pool_watcher
->shut_down(gather_ctx
->new_sub());
950 if (m_remote_pool_watcher
) {
951 m_remote_pool_watcher
->shut_down(gather_ctx
->new_sub());
953 gather_ctx
->activate();
958 on_finish
->complete(0);
// Completion of the pool watcher shut down: releases the local pool watcher
// (it must still exist) and, if present, the remote pool watcher under
// m_lock, then continues with wait_for_update_ops(). `r` is only logged in
// the visible code.
961 template <typename I
>
962 void PoolReplayer
<I
>::handle_shut_down_pool_watchers(
963 int r
, Context
*on_finish
) {
964 dout(10) << "r=" << r
<< dendl
;
967 Mutex::Locker
locker(m_lock
);
968 ceph_assert(m_local_pool_watcher
);
969 m_local_pool_watcher
.reset();
971 if (m_remote_pool_watcher
) {
972 m_remote_pool_watcher
.reset();
975 wait_for_update_ops(on_finish
);
// Registers a completion with m_update_op_tracker.wait_for_ops();
// handle_wait_for_update_ops() receives the result together with `on_finish`.
978 template <typename I
>
979 void PoolReplayer
<I
>::wait_for_update_ops(Context
*on_finish
) {
982 Mutex::Locker
locker(m_lock
);
984 Context
*ctx
= new FunctionContext([this, on_finish
](int r
) {
985 handle_wait_for_update_ops(r
, on_finish
);
// The completion is wrapped with create_async_context_callback() on the
// threads' work queue.
987 ctx
= create_async_context_callback(m_threads
->work_queue
, ctx
);
989 m_update_op_tracker
.wait_for_ops(ctx
);
// Completion of wait_for_update_ops(): logs `r` and continues with
// shut_down_image_map().
992 template <typename I
>
993 void PoolReplayer
<I
>::handle_wait_for_update_ops(int r
, Context
*on_finish
) {
994 dout(10) << "r=" << r
<< dendl
;
997 shut_down_image_map(on_finish
);
// Shuts down the image map asynchronously, with
// handle_shut_down_image_map() as the continuation. The final statement
// completes `on_finish` with 0.
// NOTE(review): the condition guarding the shut down (presumably "an image
// map exists") and how that branch exits are not visible in this listing.
1000 template <typename I
>
1001 void PoolReplayer
<I
>::shut_down_image_map(Context
*on_finish
) {
1005 Mutex::Locker
locker(m_lock
);
// on_finish is rebound to a context that routes through
// handle_shut_down_image_map(); the lambda captures the original pointer.
1007 on_finish
= new FunctionContext([this, on_finish
](int r
) {
1008 handle_shut_down_image_map(r
, on_finish
);
1010 m_image_map
->shut_down(create_async_context_callback(
1011 m_threads
->work_queue
, on_finish
));
1016 on_finish
->complete(0);
// Completion of the image map shut down: logs failures other than
// -EBLACKLISTED, releases the image map under m_lock (it must still exist),
// then asks the instance replayer to release all images, handing it
// `on_finish`.
1019 template <typename I
>
1020 void PoolReplayer
<I
>::handle_shut_down_image_map(int r
, Context
*on_finish
) {
1021 dout(5) << "r=" << r
<< dendl
;
// -EBLACKLISTED is not logged as an error.
1022 if (r
< 0 && r
!= -EBLACKLISTED
) {
1023 derr
<< "failed to shut down image map: " << cpp_strerror(r
) << dendl
;
1026 Mutex::Locker
locker(m_lock
);
1027 ceph_assert(m_image_map
);
1028 m_image_map
.reset();
1030 m_instance_replayer
->release_all(on_finish
);
// Logs the new leader instance id and forwards it to the instance watcher.
1033 template <typename I
>
1034 void PoolReplayer
<I
>::handle_update_leader(
1035 const std::string
&leader_instance_id
) {
1036 dout(10) << "leader_instance_id=" << leader_instance_id
<< dendl
;
1038 m_instance_watcher
->handle_update_leader(leader_instance_id
);
// Forwards an image-acquire request to the instance watcher. The watcher call
// takes the instance id first and the global image id second, the reverse of
// this function's parameter order.
// NOTE(review): the trailing argument(s) of the call (presumably on_finish)
// are not visible in this listing.
1041 template <typename I
>
1042 void PoolReplayer
<I
>::handle_acquire_image(const std::string
&global_image_id
,
1043 const std::string
&instance_id
,
1044 Context
* on_finish
) {
1045 dout(5) << "global_image_id=" << global_image_id
<< ", "
1046 << "instance_id=" << instance_id
<< dendl
;
1048 m_instance_watcher
->notify_image_acquire(instance_id
, global_image_id
,
// Forwards an image-release request to the instance watcher. The watcher call
// takes the instance id first and the global image id second, the reverse of
// this function's parameter order.
// NOTE(review): the trailing argument(s) of the call (presumably on_finish)
// are not visible in this listing.
1052 template <typename I
>
1053 void PoolReplayer
<I
>::handle_release_image(const std::string
&global_image_id
,
1054 const std::string
&instance_id
,
1055 Context
* on_finish
) {
1056 dout(5) << "global_image_id=" << global_image_id
<< ", "
1057 << "instance_id=" << instance_id
<< dendl
;
1059 m_instance_watcher
->notify_image_release(instance_id
, global_image_id
,
// Forwards a peer-image-removed notification to the instance watcher.
// `mirror_uuid` must be non-empty. `on_finish` is handed to the watcher call.
1063 template <typename I
>
1064 void PoolReplayer
<I
>::handle_remove_image(const std::string
&mirror_uuid
,
1065 const std::string
&global_image_id
,
1066 const std::string
&instance_id
,
1067 Context
* on_finish
) {
1068 ceph_assert(!mirror_uuid
.empty());
1069 dout(5) << "mirror_uuid=" << mirror_uuid
<< ", "
1070 << "global_image_id=" << global_image_id
<< ", "
1071 << "instance_id=" << instance_id
<< dendl
;
// Argument order for the watcher: instance id, global image id, mirror uuid.
1073 m_instance_watcher
->notify_peer_image_removed(instance_id
, global_image_id
,
1074 mirror_uuid
, on_finish
);
// Tells the image map (which must exist) about newly added instances, under
// m_lock. The non-leader case is handled separately first (that branch's body
// is not visible in this listing; presumably an early return — confirm).
1077 template <typename I
>
1078 void PoolReplayer
<I
>::handle_instances_added(const InstanceIds
&instance_ids
) {
1079 dout(5) << "instance_ids=" << instance_ids
<< dendl
;
1080 Mutex::Locker
locker(m_lock
);
1081 if (!m_leader_watcher
->is_leader()) {
1085 ceph_assert(m_image_map
);
1086 m_image_map
->update_instances_added(instance_ids
);
// Tells the image map (which must exist) about removed instances, under
// m_lock. The non-leader case is handled separately first (that branch's body
// is not visible in this listing; presumably an early return — confirm).
1089 template <typename I
>
1090 void PoolReplayer
<I
>::handle_instances_removed(
1091 const InstanceIds
&instance_ids
) {
1092 dout(5) << "instance_ids=" << instance_ids
<< dendl
;
1093 Mutex::Locker
locker(m_lock
);
1094 if (!m_leader_watcher
->is_leader()) {
1098 ceph_assert(m_image_map
);
1099 m_image_map
->update_instances_removed(instance_ids
);
1102 } // namespace mirror
// Explicit instantiation of PoolReplayer for librbd::ImageCtx, so the member
// definitions in this file are compiled for that type.
1105 template class rbd::mirror::PoolReplayer
<librbd::ImageCtx
>;