// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "PoolReplayer.h"
#include <boost/bind.hpp>
#include "common/Formatter.h"
#include "common/admin_socket.h"
#include "common/ceph_argparse.h"
#include "common/code_environment.h"
#include "common/common_init.h"
#include "common/debug.h"
#include "common/errno.h"
#include "include/stringify.h"
#include "cls/rbd/cls_rbd_client.h"
#include "global/global_context.h"
#include "librbd/internal.h"
#include "librbd/Utils.h"
#include "librbd/Watcher.h"
#include "librbd/api/Mirror.h"
#include "InstanceReplayer.h"
#include "InstanceWatcher.h"
#include "LeaderWatcher.h"
#include "Threads.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::PoolReplayer: " \
                           << this << " " << __func__ << ": "
using std::chrono::seconds;
using std::map;
using std::string;
using std::unique_ptr;
using std::vector;

using librbd::cls_client::dir_get_name;
using librbd::util::create_async_context_callback;

namespace rbd {
namespace mirror {

namespace {
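// Admin socket command helpers: each command wraps a single PoolReplayer
// operation so it can be invoked at runtime through the daemon's admin
// socket.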
class PoolReplayerAdminSocketCommand {
public:
  PoolReplayerAdminSocketCommand(PoolReplayer *pool_replayer)
    : pool_replayer(pool_replayer) {
  }
  virtual ~PoolReplayerAdminSocketCommand() {}
  virtual bool call(Formatter *f, stringstream *ss) = 0;

protected:
  PoolReplayer *pool_replayer;
};
class StatusCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit StatusCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->print_status(f, ss);
    return true;
  }
};
class StartCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit StartCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->start();
    return true;
  }
};
class StopCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit StopCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->stop(true);
    return true;
  }
};
class RestartCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit RestartCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->restart();
    return true;
  }
};
class FlushCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit FlushCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->flush();
    return true;
  }
};
class LeaderReleaseCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit LeaderReleaseCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->release_leader();
    return true;
  }
};
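// Registers the per-pool "rbd mirror ..." commands with the admin socket
// and dispatches each invocation to the matching command object above.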
class PoolReplayerAdminSocketHook : public AdminSocketHook {
public:
  PoolReplayerAdminSocketHook(CephContext *cct, const std::string &name,
                              PoolReplayer *pool_replayer)
    : admin_socket(cct->get_admin_socket()) {
    std::string command;
    int r;

    command = "rbd mirror status " + name;
    r = admin_socket->register_command(command, command, this,
                                       "get status for rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StatusCommand(pool_replayer);
    }

    command = "rbd mirror start " + name;
    r = admin_socket->register_command(command, command, this,
                                       "start rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StartCommand(pool_replayer);
    }

    command = "rbd mirror stop " + name;
    r = admin_socket->register_command(command, command, this,
                                       "stop rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StopCommand(pool_replayer);
    }

    command = "rbd mirror restart " + name;
    r = admin_socket->register_command(command, command, this,
                                       "restart rbd mirror " + name);
    if (r == 0) {
      commands[command] = new RestartCommand(pool_replayer);
    }

    command = "rbd mirror flush " + name;
    r = admin_socket->register_command(command, command, this,
                                       "flush rbd mirror " + name);
    if (r == 0) {
      commands[command] = new FlushCommand(pool_replayer);
    }

    command = "rbd mirror leader release " + name;
    r = admin_socket->register_command(command, command, this,
                                       "release rbd mirror leader " + name);
    if (r == 0) {
      commands[command] = new LeaderReleaseCommand(pool_replayer);
    }
  }
  ~PoolReplayerAdminSocketHook() override {
    for (Commands::const_iterator i = commands.begin(); i != commands.end();
         ++i) {
      (void)admin_socket->unregister_command(i->first);
      delete i->second;
    }
  }
  bool call(std::string command, cmdmap_t& cmdmap, std::string format,
            bufferlist& out) override {
    Commands::const_iterator i = commands.find(command);
    assert(i != commands.end());
    Formatter *f = Formatter::create(format);
    stringstream ss;
    bool r = i->second->call(f, &ss);
    delete f;
    out.append(ss);
    return r;
  }

private:
  typedef std::map<std::string, PoolReplayerAdminSocketCommand*> Commands;

  AdminSocket *admin_socket;
  Commands commands;
};
} // anonymous namespace
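// A PoolReplayer mirrors a single local pool from a single remote peer.
// Construction only wires up state; the worker objects are created in
// init() and the polling loop runs in its own named thread.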
PoolReplayer::PoolReplayer(Threads<librbd::ImageCtx> *threads,
                           std::shared_ptr<ImageDeleter> image_deleter,
                           int64_t local_pool_id, const peer_t &peer,
                           const std::vector<const char*> &args) :
  m_threads(threads),
  m_image_deleter(image_deleter),
  m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)),
  m_peer(peer),
  m_args(args),
  m_local_pool_id(local_pool_id),
  m_local_pool_watcher_listener(this, true),
  m_remote_pool_watcher_listener(this, false),
  m_asok_hook(nullptr),
  m_pool_replayer_thread(this),
  m_leader_listener(this)
{
}
PoolReplayer::~PoolReplayer()
{
  delete m_asok_hook;

  m_stopping = true;
  {
    Mutex::Locker l(m_lock);
    m_cond.Signal();
  }

  if (m_pool_replayer_thread.is_started()) {
    m_pool_replayer_thread.join();
  }

  if (m_leader_watcher) {
    m_leader_watcher->shut_down();
  }

  if (m_instance_watcher) {
    m_instance_watcher->shut_down();
  }

  if (m_instance_replayer) {
    m_instance_replayer->shut_down();
  }

  assert(!m_local_pool_watcher);
  assert(!m_remote_pool_watcher);
}
bool PoolReplayer::is_blacklisted() const {
  Mutex::Locker locker(m_lock);
  return m_blacklisted;
}
bool PoolReplayer::is_leader() const {
  Mutex::Locker locker(m_lock);
  return m_leader_watcher && m_leader_watcher->is_leader();
}
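// Connect to the local and remote clusters, resolve the local pool and its
// mirror uuid, bring up the instance replayer/watcher and leader watcher,
// then start the pool replayer thread.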
int PoolReplayer::init()
{
  dout(20) << "replaying for " << m_peer << dendl;

  int r = init_rados(g_ceph_context->_conf->cluster,
                     g_ceph_context->_conf->name.to_str(),
                     "local cluster", &m_local_rados);
  if (r < 0) {
    return r;
  }

  r = init_rados(m_peer.cluster_name, m_peer.client_name,
                 std::string("remote peer ") + stringify(m_peer),
                 &m_remote_rados);
  if (r < 0) {
    return r;
  }

  r = m_local_rados->ioctx_create2(m_local_pool_id, m_local_io_ctx);
  if (r < 0) {
    derr << "error accessing local pool " << m_local_pool_id << ": "
         << cpp_strerror(r) << dendl;
    return r;
  }

  std::string local_mirror_uuid;
  r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx,
                                          &local_mirror_uuid);
  if (r < 0) {
    derr << "failed to retrieve local mirror uuid from pool "
         << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
    return r;
  }

  r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
                                   m_remote_io_ctx);
  if (r < 0) {
    derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name()
         << ": " << cpp_strerror(r) << dendl;
    return r;
  }

  dout(20) << "connected to " << m_peer << dendl;

  m_instance_replayer.reset(
    InstanceReplayer<>::create(m_threads, m_image_deleter, m_local_rados,
                               local_mirror_uuid, m_local_pool_id));
  m_instance_replayer->init();
  m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx);

  m_instance_watcher.reset(InstanceWatcher<>::create(m_local_io_ctx,
                                                     m_threads->work_queue,
                                                     m_instance_replayer.get()));
  r = m_instance_watcher->init();
  if (r < 0) {
    derr << "error initializing instance watcher: " << cpp_strerror(r)
         << dendl;
    return r;
  }

  m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
                                             &m_leader_listener));

  r = m_leader_watcher->init();
  if (r < 0) {
    derr << "error initializing leader watcher: " << cpp_strerror(r)
         << dendl;
    return r;
  }

  m_pool_replayer_thread.create("pool replayer");

  return 0;
}
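// Build a cluster connection for the given cluster/client name. The
// CephContext is bootstrapped by hand (mirroring librados' conf_read_file,
// conf_parse_env and conf_parse_argv behavior) so the daemon does not share
// global singletons with the librados shared library.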
int PoolReplayer::init_rados(const std::string &cluster_name,
                             const std::string &client_name,
                             const std::string &description,
                             RadosRef *rados_ref) {
  rados_ref->reset(new librados::Rados());

  // NOTE: manually bootstrap a CephContext here instead of via
  // the librados API to avoid mixing global singletons between
  // the librados shared library and the daemon
  // TODO: eliminate intermingling of global singletons within Ceph APIs
  CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT);
  if (client_name.empty() || !iparams.name.from_str(client_name)) {
    derr << "error initializing cluster handle for " << description << dendl;
    return -EINVAL;
  }

  CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY,
                                    CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
  cct->_conf->cluster = cluster_name;

  // librados::Rados::conf_read_file
  int r = cct->_conf->parse_config_files(nullptr, nullptr, 0);
  if (r < 0) {
    derr << "could not read ceph conf for " << description << ": "
         << cpp_strerror(r) << dendl;
    cct->put();
    return r;
  }
  cct->_conf->parse_env();

  // librados::Rados::conf_parse_env
  std::vector<const char*> args;
  env_to_vec(args, nullptr);
  r = cct->_conf->parse_argv(args);
  if (r < 0) {
    derr << "could not parse environment for " << description << ": "
         << cpp_strerror(r) << dendl;
    cct->put();
    return r;
  }

  if (!m_args.empty()) {
    // librados::Rados::conf_parse_argv
    args = m_args;
    r = cct->_conf->parse_argv(args);
    if (r < 0) {
      derr << "could not parse command line args for " << description << ": "
           << cpp_strerror(r) << dendl;
      cct->put();
      return r;
    }
  }

  if (!g_ceph_context->_conf->admin_socket.empty()) {
    cct->_conf->set_val_or_die("admin_socket",
                               "$run_dir/$name.$pid.$cluster.$cctid.asok");
  }

  // disable unnecessary librbd cache
  cct->_conf->set_val_or_die("rbd_cache", "false");
  cct->_conf->apply_changes(nullptr);
  cct->_conf->complain_about_parse_errors(cct);

  r = (*rados_ref)->init_with_context(cct);
  assert(r == 0);
  cct->put();

  r = (*rados_ref)->connect();
  if (r < 0) {
    derr << "error connecting to " << description << ": "
         << cpp_strerror(r) << dendl;
    return r;
  }

  return 0;
}
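// Pool replayer thread: keep the admin socket hook registered under the
// current pool/peer name and stop if either pool watcher is blacklisted.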
void PoolReplayer::run()
{
  dout(20) << "enter" << dendl;

  while (!m_stopping) {
    std::string asok_hook_name = m_local_io_ctx.get_pool_name() + " " +
                                 m_peer.cluster_name;
    if (m_asok_hook_name != asok_hook_name || m_asok_hook == nullptr) {
      m_asok_hook_name = asok_hook_name;
      delete m_asok_hook;

      m_asok_hook = new PoolReplayerAdminSocketHook(g_ceph_context,
                                                    m_asok_hook_name, this);
    }

    Mutex::Locker locker(m_lock);
    if ((m_local_pool_watcher && m_local_pool_watcher->is_blacklisted()) ||
        (m_remote_pool_watcher && m_remote_pool_watcher->is_blacklisted())) {
      m_blacklisted = true;
      m_stopping = true;
      break;
    }

    m_cond.WaitInterval(m_lock, utime_t(1, 0));
  }
}
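// Emit the replayer's state (pool, peer, leader and per-image status) for
// the "rbd mirror status" admin socket command.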
void PoolReplayer::print_status(Formatter *f, stringstream *ss)
{
  dout(20) << "enter" << dendl;

  if (!f) {
    return;
  }

  Mutex::Locker l(m_lock);

  f->open_object_section("pool_replayer_status");
  f->dump_string("pool", m_local_io_ctx.get_pool_name());
  f->dump_stream("peer") << m_peer;
  f->dump_string("instance_id", m_instance_watcher->get_instance_id());

  std::string leader_instance_id;
  m_leader_watcher->get_leader_instance_id(&leader_instance_id);
  f->dump_string("leader_instance_id", leader_instance_id);

  bool leader = m_leader_watcher->is_leader();
  f->dump_bool("leader", leader);
  if (leader) {
    std::vector<std::string> instance_ids;
    m_leader_watcher->list_instances(&instance_ids);
    f->open_array_section("instances");
    for (auto instance_id : instance_ids) {
      f->dump_string("instance_id", instance_id);
    }
    f->close_section();
  }

  f->dump_string("local_cluster_admin_socket",
                 reinterpret_cast<CephContext *>(m_local_io_ctx.cct())->_conf->
                   admin_socket);
  f->dump_string("remote_cluster_admin_socket",
                 reinterpret_cast<CephContext *>(m_remote_io_ctx.cct())->_conf->
                   admin_socket);

  f->open_object_section("sync_throttler");
  m_instance_watcher->print_sync_status(f, ss);
  f->close_section();

  m_instance_replayer->print_status(f, ss);

  f->close_section();
  f->flush(*ss);
}
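// Admin socket entry points: forwarded to the instance replayer unless the
// pool replayer is already shutting down.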
void PoolReplayer::start()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping) {
    return;
  }

  m_instance_replayer->start();
}
void PoolReplayer::stop(bool manual)
{
  dout(20) << "enter: manual=" << manual << dendl;

  Mutex::Locker l(m_lock);
  if (!manual) {
    m_stopping = true;
    m_cond.Signal();
    return;
  } else if (m_stopping) {
    return;
  }

  m_instance_replayer->stop();
}
void PoolReplayer::restart()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping) {
    return;
  }

  m_instance_replayer->restart();
}
void PoolReplayer::flush()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping || m_manual_stop) {
    return;
  }

  m_instance_replayer->flush();
}
void PoolReplayer::release_leader()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping || !m_leader_watcher) {
    return;
  }

  m_leader_watcher->release_leader();
}
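// PoolWatcher listener callback, meaningful only on the leader: receives
// the sets of mirror images added to or removed from a pool, derives remote
// removal notifications from the initial local/remote listings, and fans
// acquire/release notifications out through the instance watcher. Each
// notification is tracked so a leader release can wait for in-flight
// updates to drain.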
void PoolReplayer::handle_update(const std::string &mirror_uuid,
                                 ImageIds &&added_image_ids,
                                 ImageIds &&removed_image_ids) {
  if (m_stopping) {
    return;
  }

  dout(10) << "mirror_uuid=" << mirror_uuid << ", "
           << "added_count=" << added_image_ids.size() << ", "
           << "removed_count=" << removed_image_ids.size() << dendl;
  Mutex::Locker locker(m_lock);
  if (!m_leader_watcher->is_leader()) {
    return;
  }

  if (m_initial_mirror_image_ids.find(mirror_uuid) ==
        m_initial_mirror_image_ids.end() &&
      m_initial_mirror_image_ids.size() < 2) {
    m_initial_mirror_image_ids[mirror_uuid] = added_image_ids;

    if (m_initial_mirror_image_ids.size() == 2) {
      dout(10) << "local and remote pools refreshed" << dendl;

      // both local and remote initial pool listing received. derive
      // removal notifications for the remote pool
      auto &local_image_ids = m_initial_mirror_image_ids.begin()->second;
      auto &remote_image_ids = m_initial_mirror_image_ids.rbegin()->second;
      for (auto &local_image_id : local_image_ids) {
        if (remote_image_ids.find(local_image_id) == remote_image_ids.end()) {
          removed_image_ids.emplace(local_image_id.global_id, "");
        }
      }
      local_image_ids.clear();
      remote_image_ids.clear();
    }
  }

  if (!mirror_uuid.empty() && m_peer.uuid != mirror_uuid) {
    m_instance_replayer->remove_peer(m_peer.uuid);
    m_instance_replayer->add_peer(mirror_uuid, m_remote_io_ctx);
    m_peer.uuid = mirror_uuid;
  }

  m_update_op_tracker.start_op();
  Context *ctx = new FunctionContext([this](int r) {
      dout(20) << "complete handle_update: r=" << r << dendl;
      m_update_op_tracker.finish_op();
    });

  C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);

  for (auto &image_id : added_image_ids) {
    // for now always send to myself (the leader)
    std::string &instance_id = m_instance_watcher->get_instance_id();
    m_instance_watcher->notify_image_acquire(instance_id, image_id.global_id,
                                             mirror_uuid, image_id.id,
                                             gather_ctx->new_sub());
  }

  for (auto &image_id : removed_image_ids) {
    // for now always send to myself (the leader)
    std::string &instance_id = m_instance_watcher->get_instance_id();
    m_instance_watcher->notify_image_release(instance_id, image_id.global_id,
                                             mirror_uuid, image_id.id, true,
                                             gather_ctx->new_sub());
  }

  gather_ctx->activate();
}
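// LeaderWatcher listener callbacks: acquiring the leader lock brings up the
// pool watchers; releasing it shuts them down and hands back all images.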
void PoolReplayer::handle_post_acquire_leader(Context *on_finish) {
  dout(20) << dendl;

  m_instance_watcher->handle_acquire_leader();
  init_local_pool_watcher(on_finish);
}
void PoolReplayer::handle_pre_release_leader(Context *on_finish) {
  dout(20) << dendl;

  m_instance_watcher->handle_release_leader();
  shut_down_pool_watchers(on_finish);
}
void PoolReplayer::init_local_pool_watcher(Context *on_finish) {
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);
  assert(!m_local_pool_watcher);
  m_local_pool_watcher.reset(new PoolWatcher<>(
    m_threads, m_local_io_ctx, m_local_pool_watcher_listener));
  m_initial_mirror_image_ids.clear();

  // ensure the initial set of local images is up-to-date
  // after acquiring the leader role
  auto ctx = new FunctionContext([this, on_finish](int r) {
      handle_init_local_pool_watcher(r, on_finish);
    });
  m_local_pool_watcher->init(create_async_context_callback(
    m_threads->work_queue, ctx));
}
void PoolReplayer::handle_init_local_pool_watcher(int r, Context *on_finish) {
  dout(20) << "r=" << r << dendl;
  if (r < 0) {
    derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl;
    on_finish->complete(r);
    return;
  }

  init_remote_pool_watcher(on_finish);
}
void PoolReplayer::init_remote_pool_watcher(Context *on_finish) {
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);
  assert(!m_remote_pool_watcher);
  m_remote_pool_watcher.reset(new PoolWatcher<>(
    m_threads, m_remote_io_ctx, m_remote_pool_watcher_listener));
  m_remote_pool_watcher->init(create_async_context_callback(
    m_threads->work_queue, on_finish));

  m_cond.Signal();
}
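// Leader tear-down path: shut down both pool watchers, wait for outstanding
// update notifications, then release all images from the instance replayer.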
void PoolReplayer::shut_down_pool_watchers(Context *on_finish) {
  dout(20) << dendl;

  {
    Mutex::Locker locker(m_lock);
    if (m_local_pool_watcher) {
      Context *ctx = new FunctionContext([this, on_finish](int r) {
          handle_shut_down_pool_watchers(r, on_finish);
        });
      ctx = create_async_context_callback(m_threads->work_queue, ctx);

      auto gather_ctx = new C_Gather(g_ceph_context, ctx);
      m_local_pool_watcher->shut_down(gather_ctx->new_sub());
      if (m_remote_pool_watcher) {
        m_remote_pool_watcher->shut_down(gather_ctx->new_sub());
      }
      gather_ctx->activate();
      return;
    }
  }

  on_finish->complete(0);
}
void PoolReplayer::handle_shut_down_pool_watchers(int r, Context *on_finish) {
  dout(20) << "r=" << r << dendl;

  {
    Mutex::Locker locker(m_lock);
    assert(m_local_pool_watcher);
    m_local_pool_watcher.reset();

    if (m_remote_pool_watcher) {
      m_remote_pool_watcher.reset();
    }
  }
  wait_for_update_ops(on_finish);
}
void PoolReplayer::wait_for_update_ops(Context *on_finish) {
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);

  Context *ctx = new FunctionContext([this, on_finish](int r) {
      handle_wait_for_update_ops(r, on_finish);
    });
  ctx = create_async_context_callback(m_threads->work_queue, ctx);

  m_update_op_tracker.wait_for_ops(ctx);
}
void PoolReplayer::handle_wait_for_update_ops(int r, Context *on_finish) {
  dout(20) << "r=" << r << dendl;

  assert(r == 0);

  Mutex::Locker locker(m_lock);
  m_instance_replayer->release_all(on_finish);
}
void PoolReplayer::handle_update_leader(const std::string &leader_instance_id) {
  dout(20) << "leader_instance_id=" << leader_instance_id << dendl;

  m_instance_watcher->handle_update_leader(leader_instance_id);
}

} // namespace mirror
} // namespace rbd