1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "PoolReplayer.h"
5 #include "common/Cond.h"
6 #include "common/Formatter.h"
7 #include "common/admin_socket.h"
8 #include "common/ceph_argparse.h"
9 #include "common/code_environment.h"
10 #include "common/common_init.h"
11 #include "common/debug.h"
12 #include "common/errno.h"
13 #include "cls/rbd/cls_rbd_client.h"
14 #include "global/global_context.h"
15 #include "librbd/api/Config.h"
16 #include "librbd/api/Namespace.h"
17 #include "PoolMetaCache.h"
18 #include "RemotePoolPoller.h"
19 #include "ServiceDaemon.h"
22 #define dout_context g_ceph_context
23 #define dout_subsys ceph_subsys_rbd_mirror
25 #define dout_prefix *_dout << "rbd::mirror::PoolReplayer: " \
26 << this << " " << __func__ << ": "
// Service-daemon attribute keys published for this pool replayer.
const std::string SERVICE_DAEMON_INSTANCE_ID_KEY("instance_id");
const std::string SERVICE_DAEMON_LEADER_KEY("leader");

// Config keys that identify a specific cluster connection; overrides for
// these must not leak from the local cluster config into a remote peer
// connection (see init_rados strip_cluster_overrides handling).
const std::vector<std::string> UNIQUE_PEER_CONFIG_KEYS {
  {"monmap", "mon_host", "mon_dns_srv_name", "key", "keyfile", "keyring"}};
42 class PoolReplayerAdminSocketCommand
{
44 PoolReplayerAdminSocketCommand(PoolReplayer
<I
> *pool_replayer
)
45 : pool_replayer(pool_replayer
) {
47 virtual ~PoolReplayerAdminSocketCommand() {}
48 virtual int call(Formatter
*f
) = 0;
50 PoolReplayer
<I
> *pool_replayer
;
54 class StatusCommand
: public PoolReplayerAdminSocketCommand
<I
> {
56 explicit StatusCommand(PoolReplayer
<I
> *pool_replayer
)
57 : PoolReplayerAdminSocketCommand
<I
>(pool_replayer
) {
60 int call(Formatter
*f
) override
{
61 this->pool_replayer
->print_status(f
);
67 class StartCommand
: public PoolReplayerAdminSocketCommand
<I
> {
69 explicit StartCommand(PoolReplayer
<I
> *pool_replayer
)
70 : PoolReplayerAdminSocketCommand
<I
>(pool_replayer
) {
73 int call(Formatter
*f
) override
{
74 this->pool_replayer
->start();
80 class StopCommand
: public PoolReplayerAdminSocketCommand
<I
> {
82 explicit StopCommand(PoolReplayer
<I
> *pool_replayer
)
83 : PoolReplayerAdminSocketCommand
<I
>(pool_replayer
) {
86 int call(Formatter
*f
) override
{
87 this->pool_replayer
->stop(true);
93 class RestartCommand
: public PoolReplayerAdminSocketCommand
<I
> {
95 explicit RestartCommand(PoolReplayer
<I
> *pool_replayer
)
96 : PoolReplayerAdminSocketCommand
<I
>(pool_replayer
) {
99 int call(Formatter
*f
) override
{
100 this->pool_replayer
->restart();
105 template <typename I
>
106 class FlushCommand
: public PoolReplayerAdminSocketCommand
<I
> {
108 explicit FlushCommand(PoolReplayer
<I
> *pool_replayer
)
109 : PoolReplayerAdminSocketCommand
<I
>(pool_replayer
) {
112 int call(Formatter
*f
) override
{
113 this->pool_replayer
->flush();
118 template <typename I
>
119 class LeaderReleaseCommand
: public PoolReplayerAdminSocketCommand
<I
> {
121 explicit LeaderReleaseCommand(PoolReplayer
<I
> *pool_replayer
)
122 : PoolReplayerAdminSocketCommand
<I
>(pool_replayer
) {
125 int call(Formatter
*f
) override
{
126 this->pool_replayer
->release_leader();
131 template <typename I
>
132 class PoolReplayerAdminSocketHook
: public AdminSocketHook
{
134 PoolReplayerAdminSocketHook(CephContext
*cct
, const std::string
&name
,
135 PoolReplayer
<I
> *pool_replayer
)
136 : admin_socket(cct
->get_admin_socket()) {
140 command
= "rbd mirror status " + name
;
141 r
= admin_socket
->register_command(command
, this,
142 "get status for rbd mirror " + name
);
144 commands
[command
] = new StatusCommand
<I
>(pool_replayer
);
147 command
= "rbd mirror start " + name
;
148 r
= admin_socket
->register_command(command
, this,
149 "start rbd mirror " + name
);
151 commands
[command
] = new StartCommand
<I
>(pool_replayer
);
154 command
= "rbd mirror stop " + name
;
155 r
= admin_socket
->register_command(command
, this,
156 "stop rbd mirror " + name
);
158 commands
[command
] = new StopCommand
<I
>(pool_replayer
);
161 command
= "rbd mirror restart " + name
;
162 r
= admin_socket
->register_command(command
, this,
163 "restart rbd mirror " + name
);
165 commands
[command
] = new RestartCommand
<I
>(pool_replayer
);
168 command
= "rbd mirror flush " + name
;
169 r
= admin_socket
->register_command(command
, this,
170 "flush rbd mirror " + name
);
172 commands
[command
] = new FlushCommand
<I
>(pool_replayer
);
175 command
= "rbd mirror leader release " + name
;
176 r
= admin_socket
->register_command(command
, this,
177 "release rbd mirror leader " + name
);
179 commands
[command
] = new LeaderReleaseCommand
<I
>(pool_replayer
);
183 ~PoolReplayerAdminSocketHook() override
{
184 (void)admin_socket
->unregister_commands(this);
185 for (auto i
= commands
.begin(); i
!= commands
.end(); ++i
) {
190 int call(std::string_view command
, const cmdmap_t
& cmdmap
,
193 bufferlist
& out
) override
{
194 auto i
= commands
.find(command
);
195 ceph_assert(i
!= commands
.end());
196 return i
->second
->call(f
);
200 typedef std::map
<std::string
, PoolReplayerAdminSocketCommand
<I
>*,
201 std::less
<>> Commands
;
203 AdminSocket
*admin_socket
;
207 } // anonymous namespace
209 template <typename I
>
210 struct PoolReplayer
<I
>::RemotePoolPollerListener
211 : public remote_pool_poller::Listener
{
213 PoolReplayer
<I
>* m_pool_replayer
;
215 RemotePoolPollerListener(PoolReplayer
<I
>* pool_replayer
)
216 : m_pool_replayer(pool_replayer
) {
219 void handle_updated(const RemotePoolMeta
& remote_pool_meta
) override
{
220 m_pool_replayer
->handle_remote_pool_meta_updated(remote_pool_meta
);
224 template <typename I
>
225 PoolReplayer
<I
>::PoolReplayer(
226 Threads
<I
> *threads
, ServiceDaemon
<I
> *service_daemon
,
227 journal::CacheManagerHandler
*cache_manager_handler
,
228 PoolMetaCache
* pool_meta_cache
, int64_t local_pool_id
,
229 const PeerSpec
&peer
, const std::vector
<const char*> &args
) :
231 m_service_daemon(service_daemon
),
232 m_cache_manager_handler(cache_manager_handler
),
233 m_pool_meta_cache(pool_meta_cache
),
234 m_local_pool_id(local_pool_id
),
237 m_lock(ceph::make_mutex("rbd::mirror::PoolReplayer " + stringify(peer
))),
238 m_pool_replayer_thread(this),
239 m_leader_listener(this) {
242 template <typename I
>
243 PoolReplayer
<I
>::~PoolReplayer()
247 ceph_assert(m_asok_hook
== nullptr);
250 template <typename I
>
251 bool PoolReplayer
<I
>::is_blocklisted() const {
252 std::lock_guard locker
{m_lock
};
253 return m_blocklisted
;
256 template <typename I
>
257 bool PoolReplayer
<I
>::is_leader() const {
258 std::lock_guard locker
{m_lock
};
259 return m_leader_watcher
&& m_leader_watcher
->is_leader();
262 template <typename I
>
263 bool PoolReplayer
<I
>::is_running() const {
264 return m_pool_replayer_thread
.is_started() && !m_stopping
;
267 template <typename I
>
268 void PoolReplayer
<I
>::init(const std::string
& site_name
) {
269 std::lock_guard locker
{m_lock
};
271 ceph_assert(!m_pool_replayer_thread
.is_started());
275 m_blocklisted
= false;
276 m_site_name
= site_name
;
278 dout(10) << "replaying for " << m_peer
<< dendl
;
279 int r
= init_rados(g_ceph_context
->_conf
->cluster
,
280 g_ceph_context
->_conf
->name
.to_str(),
281 "", "", "local cluster", &m_local_rados
, false);
283 m_callout_id
= m_service_daemon
->add_or_update_callout(
284 m_local_pool_id
, m_callout_id
, service_daemon::CALLOUT_LEVEL_ERROR
,
285 "unable to connect to local cluster");
289 r
= init_rados(m_peer
.cluster_name
, m_peer
.client_name
,
290 m_peer
.mon_host
, m_peer
.key
,
291 std::string("remote peer ") + stringify(m_peer
),
292 &m_remote_rados
, true);
294 m_callout_id
= m_service_daemon
->add_or_update_callout(
295 m_local_pool_id
, m_callout_id
, service_daemon::CALLOUT_LEVEL_ERROR
,
296 "unable to connect to remote cluster");
300 r
= m_local_rados
->ioctx_create2(m_local_pool_id
, m_local_io_ctx
);
302 derr
<< "error accessing local pool " << m_local_pool_id
<< ": "
303 << cpp_strerror(r
) << dendl
;
307 auto cct
= reinterpret_cast<CephContext
*>(m_local_io_ctx
.cct());
308 librbd::api::Config
<I
>::apply_pool_overrides(m_local_io_ctx
, &cct
->_conf
);
310 r
= librbd::cls_client::mirror_uuid_get(&m_local_io_ctx
,
311 &m_local_mirror_uuid
);
313 derr
<< "failed to retrieve local mirror uuid from pool "
314 << m_local_io_ctx
.get_pool_name() << ": " << cpp_strerror(r
) << dendl
;
315 m_callout_id
= m_service_daemon
->add_or_update_callout(
316 m_local_pool_id
, m_callout_id
, service_daemon::CALLOUT_LEVEL_ERROR
,
317 "unable to query local mirror uuid");
321 r
= m_remote_rados
->ioctx_create(m_local_io_ctx
.get_pool_name().c_str(),
324 derr
<< "error accessing remote pool " << m_local_io_ctx
.get_pool_name()
325 << ": " << cpp_strerror(r
) << dendl
;
326 m_callout_id
= m_service_daemon
->add_or_update_callout(
327 m_local_pool_id
, m_callout_id
, service_daemon::CALLOUT_LEVEL_WARNING
,
328 "unable to access remote pool");
332 dout(10) << "connected to " << m_peer
<< dendl
;
334 m_image_sync_throttler
.reset(
335 Throttler
<I
>::create(cct
, "rbd_mirror_concurrent_image_syncs"));
337 m_image_deletion_throttler
.reset(
338 Throttler
<I
>::create(cct
, "rbd_mirror_concurrent_image_deletions"));
340 m_remote_pool_poller_listener
.reset(new RemotePoolPollerListener(this));
341 m_remote_pool_poller
.reset(RemotePoolPoller
<I
>::create(
342 m_threads
, m_remote_io_ctx
, m_site_name
, m_local_mirror_uuid
,
343 *m_remote_pool_poller_listener
));
345 C_SaferCond on_pool_poller_init
;
346 m_remote_pool_poller
->init(&on_pool_poller_init
);
347 r
= on_pool_poller_init
.wait();
349 derr
<< "failed to initialize remote pool poller: " << cpp_strerror(r
)
351 m_callout_id
= m_service_daemon
->add_or_update_callout(
352 m_local_pool_id
, m_callout_id
, service_daemon::CALLOUT_LEVEL_ERROR
,
353 "unable to initialize remote pool poller");
354 m_remote_pool_poller
.reset();
357 ceph_assert(!m_remote_pool_meta
.mirror_uuid
.empty());
358 m_pool_meta_cache
->set_remote_pool_meta(
359 m_remote_io_ctx
.get_id(), m_remote_pool_meta
);
360 m_pool_meta_cache
->set_local_pool_meta(
361 m_local_io_ctx
.get_id(), {m_local_mirror_uuid
});
363 m_default_namespace_replayer
.reset(NamespaceReplayer
<I
>::create(
364 "", m_local_io_ctx
, m_remote_io_ctx
, m_local_mirror_uuid
, m_peer
.uuid
,
365 m_remote_pool_meta
, m_threads
, m_image_sync_throttler
.get(),
366 m_image_deletion_throttler
.get(), m_service_daemon
,
367 m_cache_manager_handler
, m_pool_meta_cache
));
370 m_default_namespace_replayer
->init(&on_init
);
373 derr
<< "error initializing default namespace replayer: " << cpp_strerror(r
)
375 m_callout_id
= m_service_daemon
->add_or_update_callout(
376 m_local_pool_id
, m_callout_id
, service_daemon::CALLOUT_LEVEL_ERROR
,
377 "unable to initialize default namespace replayer");
378 m_default_namespace_replayer
.reset();
382 m_leader_watcher
.reset(LeaderWatcher
<I
>::create(m_threads
, m_local_io_ctx
,
383 &m_leader_listener
));
384 r
= m_leader_watcher
->init();
386 derr
<< "error initializing leader watcher: " << cpp_strerror(r
) << dendl
;
387 m_callout_id
= m_service_daemon
->add_or_update_callout(
388 m_local_pool_id
, m_callout_id
, service_daemon::CALLOUT_LEVEL_ERROR
,
389 "unable to initialize leader messenger object");
390 m_leader_watcher
.reset();
394 if (m_callout_id
!= service_daemon::CALLOUT_ID_NONE
) {
395 m_service_daemon
->remove_callout(m_local_pool_id
, m_callout_id
);
396 m_callout_id
= service_daemon::CALLOUT_ID_NONE
;
399 m_service_daemon
->add_or_update_attribute(
400 m_local_io_ctx
.get_id(), SERVICE_DAEMON_INSTANCE_ID_KEY
,
401 stringify(m_local_io_ctx
.get_instance_id()));
403 m_pool_replayer_thread
.create("pool replayer");
406 template <typename I
>
407 void PoolReplayer
<I
>::shut_down() {
409 std::lock_guard l
{m_lock
};
413 if (m_pool_replayer_thread
.is_started()) {
414 m_pool_replayer_thread
.join();
417 if (m_leader_watcher
) {
418 m_leader_watcher
->shut_down();
420 m_leader_watcher
.reset();
422 if (m_default_namespace_replayer
) {
423 C_SaferCond on_shut_down
;
424 m_default_namespace_replayer
->shut_down(&on_shut_down
);
427 m_default_namespace_replayer
.reset();
429 if (m_remote_pool_poller
) {
431 m_remote_pool_poller
->shut_down(&ctx
);
434 m_pool_meta_cache
->remove_remote_pool_meta(m_remote_io_ctx
.get_id());
435 m_pool_meta_cache
->remove_local_pool_meta(m_local_io_ctx
.get_id());
437 m_remote_pool_poller
.reset();
438 m_remote_pool_poller_listener
.reset();
440 m_image_sync_throttler
.reset();
441 m_image_deletion_throttler
.reset();
443 m_local_rados
.reset();
444 m_remote_rados
.reset();
447 template <typename I
>
448 int PoolReplayer
<I
>::init_rados(const std::string
&cluster_name
,
449 const std::string
&client_name
,
450 const std::string
&mon_host
,
451 const std::string
&key
,
452 const std::string
&description
,
454 bool strip_cluster_overrides
) {
455 // NOTE: manually bootstrap a CephContext here instead of via
456 // the librados API to avoid mixing global singletons between
457 // the librados shared library and the daemon
458 // TODO: eliminate intermingling of global singletons within Ceph APIs
459 CephInitParameters
iparams(CEPH_ENTITY_TYPE_CLIENT
);
460 if (client_name
.empty() || !iparams
.name
.from_str(client_name
)) {
461 derr
<< "error initializing cluster handle for " << description
<< dendl
;
465 CephContext
*cct
= common_preinit(iparams
, CODE_ENVIRONMENT_LIBRARY
,
466 CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS
);
467 cct
->_conf
->cluster
= cluster_name
;
469 // librados::Rados::conf_read_file
470 int r
= cct
->_conf
.parse_config_files(nullptr, nullptr, 0);
471 if (r
< 0 && r
!= -ENOENT
) {
472 // do not treat this as fatal, it might still be able to connect
473 derr
<< "could not read ceph conf for " << description
<< ": "
474 << cpp_strerror(r
) << dendl
;
477 // preserve cluster-specific config settings before applying environment/cli
479 std::map
<std::string
, std::string
> config_values
;
480 if (strip_cluster_overrides
) {
481 // remote peer connections shouldn't apply cluster-specific
482 // configuration settings
483 for (auto& key
: UNIQUE_PEER_CONFIG_KEYS
) {
484 config_values
[key
] = cct
->_conf
.get_val
<std::string
>(key
);
488 cct
->_conf
.parse_env(cct
->get_module_type());
490 // librados::Rados::conf_parse_env
491 std::vector
<const char*> args
;
492 r
= cct
->_conf
.parse_argv(args
);
494 derr
<< "could not parse environment for " << description
<< ":"
495 << cpp_strerror(r
) << dendl
;
499 cct
->_conf
.parse_env(cct
->get_module_type());
501 if (!m_args
.empty()) {
502 // librados::Rados::conf_parse_argv
504 r
= cct
->_conf
.parse_argv(args
);
506 derr
<< "could not parse command line args for " << description
<< ": "
507 << cpp_strerror(r
) << dendl
;
513 if (strip_cluster_overrides
) {
514 // remote peer connections shouldn't apply cluster-specific
515 // configuration settings
516 for (auto& pair
: config_values
) {
517 auto value
= cct
->_conf
.get_val
<std::string
>(pair
.first
);
518 if (pair
.second
!= value
) {
519 dout(0) << "reverting global config option override: "
520 << pair
.first
<< ": " << value
<< " -> " << pair
.second
522 cct
->_conf
.set_val_or_die(pair
.first
, pair
.second
);
527 if (!g_ceph_context
->_conf
->admin_socket
.empty()) {
528 cct
->_conf
.set_val_or_die("admin_socket",
529 "$run_dir/$name.$pid.$cluster.$cctid.asok");
532 if (!mon_host
.empty()) {
533 r
= cct
->_conf
.set_val("mon_host", mon_host
);
535 derr
<< "failed to set mon_host config for " << description
<< ": "
536 << cpp_strerror(r
) << dendl
;
543 r
= cct
->_conf
.set_val("key", key
);
545 derr
<< "failed to set key config for " << description
<< ": "
546 << cpp_strerror(r
) << dendl
;
552 // disable unnecessary librbd cache
553 cct
->_conf
.set_val_or_die("rbd_cache", "false");
554 cct
->_conf
.apply_changes(nullptr);
555 cct
->_conf
.complain_about_parse_error(cct
);
557 rados_ref
->reset(new librados::Rados());
559 r
= (*rados_ref
)->init_with_context(cct
);
563 r
= (*rados_ref
)->connect();
565 derr
<< "error connecting to " << description
<< ": "
566 << cpp_strerror(r
) << dendl
;
573 template <typename I
>
574 void PoolReplayer
<I
>::run() {
578 std::string asok_hook_name
= m_local_io_ctx
.get_pool_name() + " " +
580 if (m_asok_hook_name
!= asok_hook_name
|| m_asok_hook
== nullptr) {
581 m_asok_hook_name
= asok_hook_name
;
584 m_asok_hook
= new PoolReplayerAdminSocketHook
<I
>(g_ceph_context
,
585 m_asok_hook_name
, this);
588 with_namespace_replayers([this]() { update_namespace_replayers(); });
590 std::unique_lock locker
{m_lock
};
592 if (m_leader_watcher
->is_blocklisted() ||
593 m_default_namespace_replayer
->is_blocklisted()) {
594 m_blocklisted
= true;
598 for (auto &it
: m_namespace_replayers
) {
599 if (it
.second
->is_blocklisted()) {
600 m_blocklisted
= true;
610 auto seconds
= g_ceph_context
->_conf
.get_val
<uint64_t>(
611 "rbd_mirror_pool_replayers_refresh_interval");
612 m_cond
.wait_for(locker
, ceph::make_timespan(seconds
));
615 // shut down namespace replayers
616 with_namespace_replayers([this]() { update_namespace_replayers(); });
619 m_asok_hook
= nullptr;
622 template <typename I
>
623 void PoolReplayer
<I
>::update_namespace_replayers() {
626 ceph_assert(ceph_mutex_is_locked(m_lock
));
628 std::set
<std::string
> mirroring_namespaces
;
630 int r
= list_mirroring_namespaces(&mirroring_namespaces
);
636 auto cct
= reinterpret_cast<CephContext
*>(m_local_io_ctx
.cct());
638 auto gather_ctx
= new C_Gather(cct
, &cond
);
639 for (auto it
= m_namespace_replayers
.begin();
640 it
!= m_namespace_replayers
.end(); ) {
641 auto iter
= mirroring_namespaces
.find(it
->first
);
642 if (iter
== mirroring_namespaces
.end()) {
643 auto namespace_replayer
= it
->second
;
644 auto on_shut_down
= new LambdaContext(
645 [namespace_replayer
, ctx
=gather_ctx
->new_sub()](int r
) {
646 delete namespace_replayer
;
649 m_service_daemon
->remove_namespace(m_local_pool_id
, it
->first
);
650 namespace_replayer
->shut_down(on_shut_down
);
651 it
= m_namespace_replayers
.erase(it
);
653 mirroring_namespaces
.erase(iter
);
658 for (auto &name
: mirroring_namespaces
) {
659 auto namespace_replayer
= NamespaceReplayer
<I
>::create(
660 name
, m_local_io_ctx
, m_remote_io_ctx
, m_local_mirror_uuid
, m_peer
.uuid
,
661 m_remote_pool_meta
, m_threads
, m_image_sync_throttler
.get(),
662 m_image_deletion_throttler
.get(), m_service_daemon
,
663 m_cache_manager_handler
, m_pool_meta_cache
);
664 auto on_init
= new LambdaContext(
665 [this, namespace_replayer
, name
, &mirroring_namespaces
,
666 ctx
=gather_ctx
->new_sub()](int r
) {
667 std::lock_guard locker
{m_lock
};
669 derr
<< "failed to initialize namespace replayer for namespace "
670 << name
<< ": " << cpp_strerror(r
) << dendl
;
671 delete namespace_replayer
;
672 mirroring_namespaces
.erase(name
);
674 m_namespace_replayers
[name
] = namespace_replayer
;
675 m_service_daemon
->add_namespace(m_local_pool_id
, name
);
679 namespace_replayer
->init(on_init
);
682 gather_ctx
->activate();
689 C_SaferCond acquire_cond
;
690 auto acquire_gather_ctx
= new C_Gather(cct
, &acquire_cond
);
692 for (auto &name
: mirroring_namespaces
) {
693 namespace_replayer_acquire_leader(name
, acquire_gather_ctx
->new_sub());
695 acquire_gather_ctx
->activate();
701 std::vector
<std::string
> instance_ids
;
702 m_leader_watcher
->list_instances(&instance_ids
);
704 for (auto &name
: mirroring_namespaces
) {
705 auto it
= m_namespace_replayers
.find(name
);
706 if (it
== m_namespace_replayers
.end()) {
707 // acuire leader for this namespace replayer failed
710 it
->second
->handle_instances_added(instance_ids
);
713 std::string leader_instance_id
;
714 if (m_leader_watcher
->get_leader_instance_id(&leader_instance_id
)) {
715 for (auto &name
: mirroring_namespaces
) {
716 m_namespace_replayers
[name
]->handle_update_leader(leader_instance_id
);
722 template <typename I
>
723 int PoolReplayer
<I
>::list_mirroring_namespaces(
724 std::set
<std::string
> *namespaces
) {
725 ceph_assert(ceph_mutex_is_locked(m_lock
));
727 std::vector
<std::string
> names
;
729 int r
= librbd::api::Namespace
<I
>::list(m_local_io_ctx
, &names
);
731 derr
<< "failed to list namespaces: " << cpp_strerror(r
) << dendl
;
735 for (auto &name
: names
) {
736 cls::rbd::MirrorMode mirror_mode
= cls::rbd::MIRROR_MODE_DISABLED
;
737 int r
= librbd::cls_client::mirror_mode_get(&m_local_io_ctx
, &mirror_mode
);
738 if (r
< 0 && r
!= -ENOENT
) {
739 derr
<< "failed to get namespace mirror mode: " << cpp_strerror(r
)
741 if (m_namespace_replayers
.count(name
) == 0) {
744 } else if (mirror_mode
== cls::rbd::MIRROR_MODE_DISABLED
) {
745 dout(10) << "mirroring is disabled for namespace " << name
<< dendl
;
749 namespaces
->insert(name
);
755 template <typename I
>
756 void PoolReplayer
<I
>::reopen_logs()
758 std::lock_guard locker
{m_lock
};
761 reinterpret_cast<CephContext
*>(m_local_rados
->cct())->reopen_logs();
763 if (m_remote_rados
) {
764 reinterpret_cast<CephContext
*>(m_remote_rados
->cct())->reopen_logs();
768 template <typename I
>
769 void PoolReplayer
<I
>::namespace_replayer_acquire_leader(const std::string
&name
,
770 Context
*on_finish
) {
771 ceph_assert(ceph_mutex_is_locked(m_lock
));
773 auto it
= m_namespace_replayers
.find(name
);
774 ceph_assert(it
!= m_namespace_replayers
.end());
776 on_finish
= new LambdaContext(
777 [this, name
, on_finish
](int r
) {
779 derr
<< "failed to handle acquire leader for namespace: "
780 << name
<< ": " << cpp_strerror(r
) << dendl
;
782 // remove the namespace replayer -- update_namespace_replayers will
783 // retry to create it and acquire leader.
785 std::lock_guard locker
{m_lock
};
787 auto namespace_replayer
= m_namespace_replayers
[name
];
788 m_namespace_replayers
.erase(name
);
789 auto on_shut_down
= new LambdaContext(
790 [namespace_replayer
, on_finish
](int r
) {
791 delete namespace_replayer
;
792 on_finish
->complete(r
);
794 m_service_daemon
->remove_namespace(m_local_pool_id
, name
);
795 namespace_replayer
->shut_down(on_shut_down
);
798 on_finish
->complete(0);
801 it
->second
->handle_acquire_leader(on_finish
);
804 template <typename I
>
805 void PoolReplayer
<I
>::print_status(Formatter
*f
) {
810 std::lock_guard l
{m_lock
};
812 f
->open_object_section("pool_replayer_status");
813 f
->dump_stream("peer") << m_peer
;
814 if (m_local_io_ctx
.is_valid()) {
815 f
->dump_string("pool", m_local_io_ctx
.get_pool_name());
816 f
->dump_stream("instance_id") << m_local_io_ctx
.get_instance_id();
819 std::string
state("running");
821 state
= "stopped (manual)";
822 } else if (m_stopping
) {
824 } else if (!is_running()) {
827 f
->dump_string("state", state
);
829 if (m_leader_watcher
) {
830 std::string leader_instance_id
;
831 m_leader_watcher
->get_leader_instance_id(&leader_instance_id
);
832 f
->dump_string("leader_instance_id", leader_instance_id
);
834 bool leader
= m_leader_watcher
->is_leader();
835 f
->dump_bool("leader", leader
);
837 std::vector
<std::string
> instance_ids
;
838 m_leader_watcher
->list_instances(&instance_ids
);
839 f
->open_array_section("instances");
840 for (auto instance_id
: instance_ids
) {
841 f
->dump_string("instance_id", instance_id
);
843 f
->close_section(); // instances
848 auto cct
= reinterpret_cast<CephContext
*>(m_local_rados
->cct());
849 f
->dump_string("local_cluster_admin_socket",
850 cct
->_conf
.get_val
<std::string
>("admin_socket"));
852 if (m_remote_rados
) {
853 auto cct
= reinterpret_cast<CephContext
*>(m_remote_rados
->cct());
854 f
->dump_string("remote_cluster_admin_socket",
855 cct
->_conf
.get_val
<std::string
>("admin_socket"));
858 if (m_image_sync_throttler
) {
859 f
->open_object_section("sync_throttler");
860 m_image_sync_throttler
->print_status(f
);
861 f
->close_section(); // sync_throttler
864 if (m_image_deletion_throttler
) {
865 f
->open_object_section("deletion_throttler");
866 m_image_deletion_throttler
->print_status(f
);
867 f
->close_section(); // deletion_throttler
870 if (m_default_namespace_replayer
) {
871 m_default_namespace_replayer
->print_status(f
);
874 f
->open_array_section("namespaces");
875 for (auto &it
: m_namespace_replayers
) {
876 f
->open_object_section("namespace");
877 f
->dump_string("name", it
.first
);
878 it
.second
->print_status(f
);
879 f
->close_section(); // namespace
881 f
->close_section(); // namespaces
883 f
->close_section(); // pool_replayer_status
886 template <typename I
>
887 void PoolReplayer
<I
>::start() {
890 std::lock_guard l
{m_lock
};
896 m_manual_stop
= false;
898 if (m_default_namespace_replayer
) {
899 m_default_namespace_replayer
->start();
901 for (auto &it
: m_namespace_replayers
) {
906 template <typename I
>
907 void PoolReplayer
<I
>::stop(bool manual
) {
908 dout(20) << "enter: manual=" << manual
<< dendl
;
910 std::lock_guard l
{m_lock
};
915 } else if (m_stopping
) {
919 m_manual_stop
= true;
921 if (m_default_namespace_replayer
) {
922 m_default_namespace_replayer
->stop();
924 for (auto &it
: m_namespace_replayers
) {
929 template <typename I
>
930 void PoolReplayer
<I
>::restart() {
933 std::lock_guard l
{m_lock
};
939 if (m_default_namespace_replayer
) {
940 m_default_namespace_replayer
->restart();
942 for (auto &it
: m_namespace_replayers
) {
943 it
.second
->restart();
947 template <typename I
>
948 void PoolReplayer
<I
>::flush() {
951 std::lock_guard l
{m_lock
};
953 if (m_stopping
|| m_manual_stop
) {
957 if (m_default_namespace_replayer
) {
958 m_default_namespace_replayer
->flush();
960 for (auto &it
: m_namespace_replayers
) {
965 template <typename I
>
966 void PoolReplayer
<I
>::release_leader() {
969 std::lock_guard l
{m_lock
};
971 if (m_stopping
|| !m_leader_watcher
) {
975 m_leader_watcher
->release_leader();
978 template <typename I
>
979 void PoolReplayer
<I
>::handle_post_acquire_leader(Context
*on_finish
) {
982 with_namespace_replayers(
983 [this](Context
*on_finish
) {
984 dout(10) << "handle_post_acquire_leader" << dendl
;
986 ceph_assert(ceph_mutex_is_locked(m_lock
));
988 m_service_daemon
->add_or_update_attribute(m_local_pool_id
,
989 SERVICE_DAEMON_LEADER_KEY
,
991 auto ctx
= new LambdaContext(
992 [this, on_finish
](int r
) {
994 std::lock_guard locker
{m_lock
};
997 on_finish
->complete(r
);
1000 auto cct
= reinterpret_cast<CephContext
*>(m_local_io_ctx
.cct());
1001 auto gather_ctx
= new C_Gather(cct
, ctx
);
1003 m_default_namespace_replayer
->handle_acquire_leader(
1004 gather_ctx
->new_sub());
1006 for (auto &it
: m_namespace_replayers
) {
1007 namespace_replayer_acquire_leader(it
.first
, gather_ctx
->new_sub());
1010 gather_ctx
->activate();
1014 template <typename I
>
1015 void PoolReplayer
<I
>::handle_pre_release_leader(Context
*on_finish
) {
1018 with_namespace_replayers(
1019 [this](Context
*on_finish
) {
1020 dout(10) << "handle_pre_release_leader" << dendl
;
1022 ceph_assert(ceph_mutex_is_locked(m_lock
));
1025 m_service_daemon
->remove_attribute(m_local_pool_id
,
1026 SERVICE_DAEMON_LEADER_KEY
);
1028 auto cct
= reinterpret_cast<CephContext
*>(m_local_io_ctx
.cct());
1029 auto gather_ctx
= new C_Gather(cct
, on_finish
);
1031 m_default_namespace_replayer
->handle_release_leader(
1032 gather_ctx
->new_sub());
1034 for (auto &it
: m_namespace_replayers
) {
1035 it
.second
->handle_release_leader(gather_ctx
->new_sub());
1038 gather_ctx
->activate();
1042 template <typename I
>
1043 void PoolReplayer
<I
>::handle_update_leader(
1044 const std::string
&leader_instance_id
) {
1045 dout(10) << "leader_instance_id=" << leader_instance_id
<< dendl
;
1047 std::lock_guard locker
{m_lock
};
1049 m_default_namespace_replayer
->handle_update_leader(leader_instance_id
);
1051 for (auto &it
: m_namespace_replayers
) {
1052 it
.second
->handle_update_leader(leader_instance_id
);
1056 template <typename I
>
1057 void PoolReplayer
<I
>::handle_instances_added(
1058 const std::vector
<std::string
> &instance_ids
) {
1059 dout(5) << "instance_ids=" << instance_ids
<< dendl
;
1061 std::lock_guard locker
{m_lock
};
1062 if (!m_leader_watcher
->is_leader()) {
1066 m_default_namespace_replayer
->handle_instances_added(instance_ids
);
1068 for (auto &it
: m_namespace_replayers
) {
1069 it
.second
->handle_instances_added(instance_ids
);
1073 template <typename I
>
1074 void PoolReplayer
<I
>::handle_instances_removed(
1075 const std::vector
<std::string
> &instance_ids
) {
1076 dout(5) << "instance_ids=" << instance_ids
<< dendl
;
1078 std::lock_guard locker
{m_lock
};
1079 if (!m_leader_watcher
->is_leader()) {
1083 m_default_namespace_replayer
->handle_instances_removed(instance_ids
);
1085 for (auto &it
: m_namespace_replayers
) {
1086 it
.second
->handle_instances_removed(instance_ids
);
1090 template <typename I
>
1091 void PoolReplayer
<I
>::handle_remote_pool_meta_updated(
1092 const RemotePoolMeta
& remote_pool_meta
) {
1093 dout(5) << "remote_pool_meta=" << remote_pool_meta
<< dendl
;
1095 if (!m_default_namespace_replayer
) {
1096 m_remote_pool_meta
= remote_pool_meta
;
1100 derr
<< "remote pool metadata updated unexpectedly" << dendl
;
1101 std::unique_lock locker
{m_lock
};
1103 m_cond
.notify_all();
1106 } // namespace mirror
1109 template class rbd::mirror::PoolReplayer
<librbd::ImageCtx
>;