1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "PoolReplayer.h"
5 #include <boost/bind.hpp>
6 #include "common/Formatter.h"
7 #include "common/admin_socket.h"
8 #include "common/ceph_argparse.h"
9 #include "common/code_environment.h"
10 #include "common/common_init.h"
11 #include "common/debug.h"
12 #include "common/errno.h"
13 #include "include/stringify.h"
14 #include "cls/rbd/cls_rbd_client.h"
15 #include "global/global_context.h"
16 #include "librbd/internal.h"
17 #include "librbd/Utils.h"
18 #include "librbd/Watcher.h"
19 #include "librbd/api/Mirror.h"
20 #include "InstanceReplayer.h"
21 #include "InstanceWatcher.h"
22 #include "LeaderWatcher.h"
23 #include "ServiceDaemon.h"
26 #define dout_context g_ceph_context
27 #define dout_subsys ceph_subsys_rbd_mirror
29 #define dout_prefix *_dout << "rbd::mirror::PoolReplayer: " \
30 << this << " " << __func__ << ": "
32 using std::chrono::seconds
;
35 using std::unique_ptr
;
38 using librbd::cls_client::dir_get_name
;
39 using librbd::util::create_async_context_callback
;
// Attribute keys reported to the service daemon for this pool replayer.
const std::string SERVICE_DAEMON_LEADER_KEY("leader");
const std::string SERVICE_DAEMON_LOCAL_COUNT_KEY("image_local_count");
const std::string SERVICE_DAEMON_REMOTE_COUNT_KEY("image_remote_count");
50 class PoolReplayerAdminSocketCommand
{
52 PoolReplayerAdminSocketCommand(PoolReplayer
*pool_replayer
)
53 : pool_replayer(pool_replayer
) {
55 virtual ~PoolReplayerAdminSocketCommand() {}
56 virtual bool call(Formatter
*f
, stringstream
*ss
) = 0;
58 PoolReplayer
*pool_replayer
;
61 class StatusCommand
: public PoolReplayerAdminSocketCommand
{
63 explicit StatusCommand(PoolReplayer
*pool_replayer
)
64 : PoolReplayerAdminSocketCommand(pool_replayer
) {
67 bool call(Formatter
*f
, stringstream
*ss
) override
{
68 pool_replayer
->print_status(f
, ss
);
73 class StartCommand
: public PoolReplayerAdminSocketCommand
{
75 explicit StartCommand(PoolReplayer
*pool_replayer
)
76 : PoolReplayerAdminSocketCommand(pool_replayer
) {
79 bool call(Formatter
*f
, stringstream
*ss
) override
{
80 pool_replayer
->start();
85 class StopCommand
: public PoolReplayerAdminSocketCommand
{
87 explicit StopCommand(PoolReplayer
*pool_replayer
)
88 : PoolReplayerAdminSocketCommand(pool_replayer
) {
91 bool call(Formatter
*f
, stringstream
*ss
) override
{
92 pool_replayer
->stop(true);
97 class RestartCommand
: public PoolReplayerAdminSocketCommand
{
99 explicit RestartCommand(PoolReplayer
*pool_replayer
)
100 : PoolReplayerAdminSocketCommand(pool_replayer
) {
103 bool call(Formatter
*f
, stringstream
*ss
) override
{
104 pool_replayer
->restart();
109 class FlushCommand
: public PoolReplayerAdminSocketCommand
{
111 explicit FlushCommand(PoolReplayer
*pool_replayer
)
112 : PoolReplayerAdminSocketCommand(pool_replayer
) {
115 bool call(Formatter
*f
, stringstream
*ss
) override
{
116 pool_replayer
->flush();
121 class LeaderReleaseCommand
: public PoolReplayerAdminSocketCommand
{
123 explicit LeaderReleaseCommand(PoolReplayer
*pool_replayer
)
124 : PoolReplayerAdminSocketCommand(pool_replayer
) {
127 bool call(Formatter
*f
, stringstream
*ss
) override
{
128 pool_replayer
->release_leader();
133 class PoolReplayerAdminSocketHook
: public AdminSocketHook
{
135 PoolReplayerAdminSocketHook(CephContext
*cct
, const std::string
&name
,
136 PoolReplayer
*pool_replayer
)
137 : admin_socket(cct
->get_admin_socket()) {
141 command
= "rbd mirror status " + name
;
142 r
= admin_socket
->register_command(command
, command
, this,
143 "get status for rbd mirror " + name
);
145 commands
[command
] = new StatusCommand(pool_replayer
);
148 command
= "rbd mirror start " + name
;
149 r
= admin_socket
->register_command(command
, command
, this,
150 "start rbd mirror " + name
);
152 commands
[command
] = new StartCommand(pool_replayer
);
155 command
= "rbd mirror stop " + name
;
156 r
= admin_socket
->register_command(command
, command
, this,
157 "stop rbd mirror " + name
);
159 commands
[command
] = new StopCommand(pool_replayer
);
162 command
= "rbd mirror restart " + name
;
163 r
= admin_socket
->register_command(command
, command
, this,
164 "restart rbd mirror " + name
);
166 commands
[command
] = new RestartCommand(pool_replayer
);
169 command
= "rbd mirror flush " + name
;
170 r
= admin_socket
->register_command(command
, command
, this,
171 "flush rbd mirror " + name
);
173 commands
[command
] = new FlushCommand(pool_replayer
);
176 command
= "rbd mirror leader release " + name
;
177 r
= admin_socket
->register_command(command
, command
, this,
178 "release rbd mirror leader " + name
);
180 commands
[command
] = new LeaderReleaseCommand(pool_replayer
);
184 ~PoolReplayerAdminSocketHook() override
{
185 for (Commands::const_iterator i
= commands
.begin(); i
!= commands
.end();
187 (void)admin_socket
->unregister_command(i
->first
);
192 bool call(std::string command
, cmdmap_t
& cmdmap
, std::string format
,
193 bufferlist
& out
) override
{
194 Commands::const_iterator i
= commands
.find(command
);
195 assert(i
!= commands
.end());
196 Formatter
*f
= Formatter::create(format
);
198 bool r
= i
->second
->call(f
, &ss
);
205 typedef std::map
<std::string
, PoolReplayerAdminSocketCommand
*> Commands
;
207 AdminSocket
*admin_socket
;
211 } // anonymous namespace
213 PoolReplayer::PoolReplayer(Threads
<librbd::ImageCtx
> *threads
,
214 ServiceDaemon
<librbd::ImageCtx
>* service_daemon
,
215 ImageDeleter
<>* image_deleter
,
216 int64_t local_pool_id
, const peer_t
&peer
,
217 const std::vector
<const char*> &args
) :
219 m_service_daemon(service_daemon
),
220 m_image_deleter(image_deleter
),
221 m_local_pool_id(local_pool_id
),
224 m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer
)),
225 m_local_pool_watcher_listener(this, true),
226 m_remote_pool_watcher_listener(this, false),
227 m_pool_replayer_thread(this),
228 m_leader_listener(this)
232 PoolReplayer::~PoolReplayer()
238 bool PoolReplayer::is_blacklisted() const {
239 Mutex::Locker
locker(m_lock
);
240 return m_blacklisted
;
243 bool PoolReplayer::is_leader() const {
244 Mutex::Locker
locker(m_lock
);
245 return m_leader_watcher
&& m_leader_watcher
->is_leader();
248 bool PoolReplayer::is_running() const {
249 return m_pool_replayer_thread
.is_started();
252 void PoolReplayer::init()
254 assert(!m_pool_replayer_thread
.is_started());
258 m_blacklisted
= false;
260 dout(20) << "replaying for " << m_peer
<< dendl
;
261 int r
= init_rados(g_ceph_context
->_conf
->cluster
,
262 g_ceph_context
->_conf
->name
.to_str(),
263 "local cluster", &m_local_rados
);
265 m_callout_id
= m_service_daemon
->add_or_update_callout(
266 m_local_pool_id
, m_callout_id
, service_daemon::CALLOUT_LEVEL_ERROR
,
267 "unable to connect to local cluster");
271 r
= init_rados(m_peer
.cluster_name
, m_peer
.client_name
,
272 std::string("remote peer ") + stringify(m_peer
),
275 m_callout_id
= m_service_daemon
->add_or_update_callout(
276 m_local_pool_id
, m_callout_id
, service_daemon::CALLOUT_LEVEL_ERROR
,
277 "unable to connect to remote cluster");
281 r
= m_local_rados
->ioctx_create2(m_local_pool_id
, m_local_io_ctx
);
283 derr
<< "error accessing local pool " << m_local_pool_id
<< ": "
284 << cpp_strerror(r
) << dendl
;
288 std::string local_mirror_uuid
;
289 r
= librbd::cls_client::mirror_uuid_get(&m_local_io_ctx
,
292 derr
<< "failed to retrieve local mirror uuid from pool "
293 << m_local_io_ctx
.get_pool_name() << ": " << cpp_strerror(r
) << dendl
;
294 m_callout_id
= m_service_daemon
->add_or_update_callout(
295 m_local_pool_id
, m_callout_id
, service_daemon::CALLOUT_LEVEL_ERROR
,
296 "unable to query local mirror uuid");
300 r
= m_remote_rados
->ioctx_create(m_local_io_ctx
.get_pool_name().c_str(),
303 derr
<< "error accessing remote pool " << m_local_io_ctx
.get_pool_name()
304 << ": " << cpp_strerror(r
) << dendl
;
305 m_callout_id
= m_service_daemon
->add_or_update_callout(
306 m_local_pool_id
, m_callout_id
, service_daemon::CALLOUT_LEVEL_WARNING
,
307 "unable to access remote pool");
311 dout(20) << "connected to " << m_peer
<< dendl
;
313 m_instance_replayer
.reset(InstanceReplayer
<>::create(
314 m_threads
, m_service_daemon
, m_image_deleter
, m_local_rados
,
315 local_mirror_uuid
, m_local_pool_id
));
316 m_instance_replayer
->init();
317 m_instance_replayer
->add_peer(m_peer
.uuid
, m_remote_io_ctx
);
319 m_instance_watcher
.reset(InstanceWatcher
<>::create(
320 m_local_io_ctx
, m_threads
->work_queue
, m_instance_replayer
.get()));
321 r
= m_instance_watcher
->init();
323 derr
<< "error initializing instance watcher: " << cpp_strerror(r
) << dendl
;
324 m_callout_id
= m_service_daemon
->add_or_update_callout(
325 m_local_pool_id
, m_callout_id
, service_daemon::CALLOUT_LEVEL_ERROR
,
326 "unable to initialize instance messenger object");
330 m_leader_watcher
.reset(new LeaderWatcher
<>(m_threads
, m_local_io_ctx
,
331 &m_leader_listener
));
332 r
= m_leader_watcher
->init();
334 derr
<< "error initializing leader watcher: " << cpp_strerror(r
) << dendl
;
335 m_callout_id
= m_service_daemon
->add_or_update_callout(
336 m_local_pool_id
, m_callout_id
, service_daemon::CALLOUT_LEVEL_ERROR
,
337 "unable to initialize leader messenger object");
341 if (m_callout_id
!= service_daemon::CALLOUT_ID_NONE
) {
342 m_service_daemon
->remove_callout(m_local_pool_id
, m_callout_id
);
343 m_callout_id
= service_daemon::CALLOUT_ID_NONE
;
346 m_pool_replayer_thread
.create("pool replayer");
349 void PoolReplayer::shut_down() {
352 Mutex::Locker
l(m_lock
);
355 if (m_pool_replayer_thread
.is_started()) {
356 m_pool_replayer_thread
.join();
358 if (m_leader_watcher
) {
359 m_leader_watcher
->shut_down();
360 m_leader_watcher
.reset();
362 if (m_instance_watcher
) {
363 m_instance_watcher
->shut_down();
364 m_instance_watcher
.reset();
366 if (m_instance_replayer
) {
367 m_instance_replayer
->shut_down();
368 m_instance_replayer
.reset();
371 assert(!m_local_pool_watcher
);
372 assert(!m_remote_pool_watcher
);
373 m_local_rados
.reset();
374 m_remote_rados
.reset();
377 int PoolReplayer::init_rados(const std::string
&cluster_name
,
378 const std::string
&client_name
,
379 const std::string
&description
,
380 RadosRef
*rados_ref
) {
381 rados_ref
->reset(new librados::Rados());
383 // NOTE: manually bootstrap a CephContext here instead of via
384 // the librados API to avoid mixing global singletons between
385 // the librados shared library and the daemon
386 // TODO: eliminate intermingling of global singletons within Ceph APIs
387 CephInitParameters
iparams(CEPH_ENTITY_TYPE_CLIENT
);
388 if (client_name
.empty() || !iparams
.name
.from_str(client_name
)) {
389 derr
<< "error initializing cluster handle for " << description
<< dendl
;
393 CephContext
*cct
= common_preinit(iparams
, CODE_ENVIRONMENT_LIBRARY
,
394 CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS
);
395 cct
->_conf
->cluster
= cluster_name
;
397 // librados::Rados::conf_read_file
398 int r
= cct
->_conf
->parse_config_files(nullptr, nullptr, 0);
400 derr
<< "could not read ceph conf for " << description
<< ": "
401 << cpp_strerror(r
) << dendl
;
405 cct
->_conf
->parse_env();
407 // librados::Rados::conf_parse_env
408 std::vector
<const char*> args
;
409 env_to_vec(args
, nullptr);
410 r
= cct
->_conf
->parse_argv(args
);
412 derr
<< "could not parse environment for " << description
<< ":"
413 << cpp_strerror(r
) << dendl
;
418 if (!m_args
.empty()) {
419 // librados::Rados::conf_parse_argv
421 r
= cct
->_conf
->parse_argv(args
);
423 derr
<< "could not parse command line args for " << description
<< ": "
424 << cpp_strerror(r
) << dendl
;
430 if (!g_ceph_context
->_conf
->admin_socket
.empty()) {
431 cct
->_conf
->set_val_or_die("admin_socket",
432 "$run_dir/$name.$pid.$cluster.$cctid.asok");
435 // disable unnecessary librbd cache
436 cct
->_conf
->set_val_or_die("rbd_cache", "false");
437 cct
->_conf
->apply_changes(nullptr);
438 cct
->_conf
->complain_about_parse_errors(cct
);
440 r
= (*rados_ref
)->init_with_context(cct
);
444 r
= (*rados_ref
)->connect();
446 derr
<< "error connecting to " << description
<< ": "
447 << cpp_strerror(r
) << dendl
;
454 void PoolReplayer::run()
456 dout(20) << "enter" << dendl
;
458 while (!m_stopping
) {
459 std::string asok_hook_name
= m_local_io_ctx
.get_pool_name() + " " +
461 if (m_asok_hook_name
!= asok_hook_name
|| m_asok_hook
== nullptr) {
462 m_asok_hook_name
= asok_hook_name
;
465 m_asok_hook
= new PoolReplayerAdminSocketHook(g_ceph_context
,
466 m_asok_hook_name
, this);
469 Mutex::Locker
locker(m_lock
);
470 if ((m_local_pool_watcher
&& m_local_pool_watcher
->is_blacklisted()) ||
471 (m_remote_pool_watcher
&& m_remote_pool_watcher
->is_blacklisted())) {
472 m_blacklisted
= true;
478 m_cond
.WaitInterval(m_lock
, utime_t(1, 0));
483 void PoolReplayer::print_status(Formatter
*f
, stringstream
*ss
)
485 dout(20) << "enter" << dendl
;
491 Mutex::Locker
l(m_lock
);
493 f
->open_object_section("pool_replayer_status");
494 f
->dump_string("pool", m_local_io_ctx
.get_pool_name());
495 f
->dump_stream("peer") << m_peer
;
496 f
->dump_string("instance_id", m_instance_watcher
->get_instance_id());
498 std::string leader_instance_id
;
499 m_leader_watcher
->get_leader_instance_id(&leader_instance_id
);
500 f
->dump_string("leader_instance_id", leader_instance_id
);
502 bool leader
= m_leader_watcher
->is_leader();
503 f
->dump_bool("leader", leader
);
505 std::vector
<std::string
> instance_ids
;
506 m_leader_watcher
->list_instances(&instance_ids
);
507 f
->open_array_section("instances");
508 for (auto instance_id
: instance_ids
) {
509 f
->dump_string("instance_id", instance_id
);
514 f
->dump_string("local_cluster_admin_socket",
515 reinterpret_cast<CephContext
*>(m_local_io_ctx
.cct())->_conf
->
516 get_val
<std::string
>("admin_socket"));
517 f
->dump_string("remote_cluster_admin_socket",
518 reinterpret_cast<CephContext
*>(m_remote_io_ctx
.cct())->_conf
->
519 get_val
<std::string
>("admin_socket"));
521 f
->open_object_section("sync_throttler");
522 m_instance_watcher
->print_sync_status(f
, ss
);
525 m_instance_replayer
->print_status(f
, ss
);
531 void PoolReplayer::start()
533 dout(20) << "enter" << dendl
;
535 Mutex::Locker
l(m_lock
);
541 m_instance_replayer
->start();
544 void PoolReplayer::stop(bool manual
)
546 dout(20) << "enter: manual=" << manual
<< dendl
;
548 Mutex::Locker
l(m_lock
);
553 } else if (m_stopping
) {
557 m_instance_replayer
->stop();
560 void PoolReplayer::restart()
562 dout(20) << "enter" << dendl
;
564 Mutex::Locker
l(m_lock
);
570 m_instance_replayer
->restart();
573 void PoolReplayer::flush()
575 dout(20) << "enter" << dendl
;
577 Mutex::Locker
l(m_lock
);
579 if (m_stopping
|| m_manual_stop
) {
583 m_instance_replayer
->flush();
586 void PoolReplayer::release_leader()
588 dout(20) << "enter" << dendl
;
590 Mutex::Locker
l(m_lock
);
592 if (m_stopping
|| !m_leader_watcher
) {
596 m_leader_watcher
->release_leader();
599 void PoolReplayer::handle_update(const std::string
&mirror_uuid
,
600 ImageIds
&&added_image_ids
,
601 ImageIds
&&removed_image_ids
) {
606 dout(10) << "mirror_uuid=" << mirror_uuid
<< ", "
607 << "added_count=" << added_image_ids
.size() << ", "
608 << "removed_count=" << removed_image_ids
.size() << dendl
;
609 Mutex::Locker
locker(m_lock
);
610 if (!m_leader_watcher
->is_leader()) {
614 m_service_daemon
->add_or_update_attribute(
615 m_local_pool_id
, SERVICE_DAEMON_LOCAL_COUNT_KEY
,
616 m_local_pool_watcher
->get_image_count());
617 if (m_remote_pool_watcher
) {
618 m_service_daemon
->add_or_update_attribute(
619 m_local_pool_id
, SERVICE_DAEMON_REMOTE_COUNT_KEY
,
620 m_remote_pool_watcher
->get_image_count());
623 m_update_op_tracker
.start_op();
624 Context
*ctx
= new FunctionContext([this](int r
) {
625 dout(20) << "complete handle_update: r=" << r
<< dendl
;
626 m_update_op_tracker
.finish_op();
629 C_Gather
*gather_ctx
= new C_Gather(g_ceph_context
, ctx
);
631 for (auto &image_id
: added_image_ids
) {
632 // for now always send to myself (the leader)
633 std::string
&instance_id
= m_instance_watcher
->get_instance_id();
634 m_instance_watcher
->notify_image_acquire(instance_id
, image_id
.global_id
,
635 gather_ctx
->new_sub());
638 if (!mirror_uuid
.empty()) {
639 for (auto &image_id
: removed_image_ids
) {
640 // for now always send to myself (the leader)
641 std::string
&instance_id
= m_instance_watcher
->get_instance_id();
642 m_instance_watcher
->notify_peer_image_removed(instance_id
,
645 gather_ctx
->new_sub());
649 gather_ctx
->activate();
652 void PoolReplayer::handle_post_acquire_leader(Context
*on_finish
) {
655 m_service_daemon
->add_or_update_attribute(m_local_pool_id
,
656 SERVICE_DAEMON_LEADER_KEY
, true);
657 m_instance_watcher
->handle_acquire_leader();
658 init_local_pool_watcher(on_finish
);
661 void PoolReplayer::handle_pre_release_leader(Context
*on_finish
) {
664 m_service_daemon
->remove_attribute(m_local_pool_id
, SERVICE_DAEMON_LEADER_KEY
);
665 m_instance_watcher
->handle_release_leader();
666 shut_down_pool_watchers(on_finish
);
669 void PoolReplayer::init_local_pool_watcher(Context
*on_finish
) {
672 Mutex::Locker
locker(m_lock
);
673 assert(!m_local_pool_watcher
);
674 m_local_pool_watcher
.reset(new PoolWatcher
<>(
675 m_threads
, m_local_io_ctx
, m_local_pool_watcher_listener
));
677 // ensure the initial set of local images is up-to-date
678 // after acquiring the leader role
679 auto ctx
= new FunctionContext([this, on_finish
](int r
) {
680 handle_init_local_pool_watcher(r
, on_finish
);
682 m_local_pool_watcher
->init(create_async_context_callback(
683 m_threads
->work_queue
, ctx
));
686 void PoolReplayer::handle_init_local_pool_watcher(int r
, Context
*on_finish
) {
687 dout(20) << "r=" << r
<< dendl
;
689 derr
<< "failed to retrieve local images: " << cpp_strerror(r
) << dendl
;
690 on_finish
->complete(r
);
694 init_remote_pool_watcher(on_finish
);
697 void PoolReplayer::init_remote_pool_watcher(Context
*on_finish
) {
700 Mutex::Locker
locker(m_lock
);
701 assert(!m_remote_pool_watcher
);
702 m_remote_pool_watcher
.reset(new PoolWatcher
<>(
703 m_threads
, m_remote_io_ctx
, m_remote_pool_watcher_listener
));
704 m_remote_pool_watcher
->init(create_async_context_callback(
705 m_threads
->work_queue
, on_finish
));
710 void PoolReplayer::shut_down_pool_watchers(Context
*on_finish
) {
714 Mutex::Locker
locker(m_lock
);
715 if (m_local_pool_watcher
) {
716 Context
*ctx
= new FunctionContext([this, on_finish
](int r
) {
717 handle_shut_down_pool_watchers(r
, on_finish
);
719 ctx
= create_async_context_callback(m_threads
->work_queue
, ctx
);
721 auto gather_ctx
= new C_Gather(g_ceph_context
, ctx
);
722 m_local_pool_watcher
->shut_down(gather_ctx
->new_sub());
723 if (m_remote_pool_watcher
) {
724 m_remote_pool_watcher
->shut_down(gather_ctx
->new_sub());
726 gather_ctx
->activate();
731 on_finish
->complete(0);
734 void PoolReplayer::handle_shut_down_pool_watchers(int r
, Context
*on_finish
) {
735 dout(20) << "r=" << r
<< dendl
;
738 Mutex::Locker
locker(m_lock
);
739 assert(m_local_pool_watcher
);
740 m_local_pool_watcher
.reset();
742 if (m_remote_pool_watcher
) {
743 m_remote_pool_watcher
.reset();
746 wait_for_update_ops(on_finish
);
749 void PoolReplayer::wait_for_update_ops(Context
*on_finish
) {
752 Mutex::Locker
locker(m_lock
);
754 Context
*ctx
= new FunctionContext([this, on_finish
](int r
) {
755 handle_wait_for_update_ops(r
, on_finish
);
757 ctx
= create_async_context_callback(m_threads
->work_queue
, ctx
);
759 m_update_op_tracker
.wait_for_ops(ctx
);
762 void PoolReplayer::handle_wait_for_update_ops(int r
, Context
*on_finish
) {
763 dout(20) << "r=" << r
<< dendl
;
767 Mutex::Locker
locker(m_lock
);
768 m_instance_replayer
->release_all(on_finish
);
771 void PoolReplayer::handle_update_leader(const std::string
&leader_instance_id
) {
772 dout(20) << "leader_instance_id=" << leader_instance_id
<< dendl
;
774 m_instance_watcher
->handle_update_leader(leader_instance_id
);
777 } // namespace mirror