1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include <boost/range/adaptor/map.hpp>
6 #include "common/Formatter.h"
7 #include "common/admin_socket.h"
8 #include "common/debug.h"
9 #include "common/errno.h"
10 #include "librbd/ImageCtx.h"
12 #include "ServiceDaemon.h"
15 #define dout_context g_ceph_context
16 #define dout_subsys ceph_subsys_rbd_mirror
18 #define dout_prefix *_dout << "rbd::mirror::Mirror: " << this << " " \
25 using std::unique_ptr
;
28 using librados::Rados
;
29 using librados::IoCtx
;
30 using librbd::mirror_peer_t
;
37 class MirrorAdminSocketCommand
{
39 virtual ~MirrorAdminSocketCommand() {}
40 virtual bool call(Formatter
*f
, stringstream
*ss
) = 0;
43 class StatusCommand
: public MirrorAdminSocketCommand
{
45 explicit StatusCommand(Mirror
*mirror
) : mirror(mirror
) {}
47 bool call(Formatter
*f
, stringstream
*ss
) override
{
48 mirror
->print_status(f
, ss
);
56 class StartCommand
: public MirrorAdminSocketCommand
{
58 explicit StartCommand(Mirror
*mirror
) : mirror(mirror
) {}
60 bool call(Formatter
*f
, stringstream
*ss
) override
{
69 class StopCommand
: public MirrorAdminSocketCommand
{
71 explicit StopCommand(Mirror
*mirror
) : mirror(mirror
) {}
73 bool call(Formatter
*f
, stringstream
*ss
) override
{
82 class RestartCommand
: public MirrorAdminSocketCommand
{
84 explicit RestartCommand(Mirror
*mirror
) : mirror(mirror
) {}
86 bool call(Formatter
*f
, stringstream
*ss
) override
{
95 class FlushCommand
: public MirrorAdminSocketCommand
{
97 explicit FlushCommand(Mirror
*mirror
) : mirror(mirror
) {}
99 bool call(Formatter
*f
, stringstream
*ss
) override
{
108 class LeaderReleaseCommand
: public MirrorAdminSocketCommand
{
110 explicit LeaderReleaseCommand(Mirror
*mirror
) : mirror(mirror
) {}
112 bool call(Formatter
*f
, stringstream
*ss
) override
{
113 mirror
->release_leader();
121 } // anonymous namespace
123 class MirrorAdminSocketHook
: public AdminSocketHook
{
125 MirrorAdminSocketHook(CephContext
*cct
, Mirror
*mirror
) :
126 admin_socket(cct
->get_admin_socket()) {
130 command
= "rbd mirror status";
131 r
= admin_socket
->register_command(command
, command
, this,
132 "get status for rbd mirror");
134 commands
[command
] = new StatusCommand(mirror
);
137 command
= "rbd mirror start";
138 r
= admin_socket
->register_command(command
, command
, this,
141 commands
[command
] = new StartCommand(mirror
);
144 command
= "rbd mirror stop";
145 r
= admin_socket
->register_command(command
, command
, this,
148 commands
[command
] = new StopCommand(mirror
);
151 command
= "rbd mirror restart";
152 r
= admin_socket
->register_command(command
, command
, this,
153 "restart rbd mirror");
155 commands
[command
] = new RestartCommand(mirror
);
158 command
= "rbd mirror flush";
159 r
= admin_socket
->register_command(command
, command
, this,
162 commands
[command
] = new FlushCommand(mirror
);
165 command
= "rbd mirror leader release";
166 r
= admin_socket
->register_command(command
, command
, this,
167 "release rbd mirror leader");
169 commands
[command
] = new LeaderReleaseCommand(mirror
);
173 ~MirrorAdminSocketHook() override
{
174 for (Commands::const_iterator i
= commands
.begin(); i
!= commands
.end();
176 (void)admin_socket
->unregister_command(i
->first
);
181 bool call(std::string command
, cmdmap_t
& cmdmap
, std::string format
,
182 bufferlist
& out
) override
{
183 Commands::const_iterator i
= commands
.find(command
);
184 assert(i
!= commands
.end());
185 Formatter
*f
= Formatter::create(format
);
187 bool r
= i
->second
->call(f
, &ss
);
194 typedef std::map
<std::string
, MirrorAdminSocketCommand
*> Commands
;
196 AdminSocket
*admin_socket
;
200 Mirror::Mirror(CephContext
*cct
, const std::vector
<const char*> &args
) :
203 m_lock("rbd::mirror::Mirror"),
204 m_local(new librados::Rados()),
205 m_asok_hook(new MirrorAdminSocketHook(cct
, this))
207 cct
->lookup_or_create_singleton_object
<Threads
<librbd::ImageCtx
> >(
208 m_threads
, "rbd_mirror::threads");
209 m_service_daemon
.reset(new ServiceDaemon
<>(m_cct
, m_local
, m_threads
));
217 void Mirror::handle_signal(int signum
)
221 Mutex::Locker
l(m_lock
);
228 int r
= m_local
->init_with_context(m_cct
);
230 derr
<< "could not initialize rados handle" << dendl
;
234 r
= m_local
->connect();
236 derr
<< "error connecting to local cluster" << dendl
;
240 r
= m_service_daemon
->init();
242 derr
<< "error registering service daemon: " << cpp_strerror(r
) << dendl
;
246 m_local_cluster_watcher
.reset(new ClusterWatcher(m_local
, m_lock
,
247 m_service_daemon
.get()));
249 m_image_deleter
.reset(new ImageDeleter
<>(m_threads
->work_queue
,
251 &m_threads
->timer_lock
,
252 m_service_daemon
.get()));
258 dout(20) << "enter" << dendl
;
259 while (!m_stopping
) {
260 m_local_cluster_watcher
->refresh_pools();
261 Mutex::Locker
l(m_lock
);
262 if (!m_manual_stop
) {
263 update_pool_replayers(m_local_cluster_watcher
->get_pool_peers());
267 utime_t(m_cct
->_conf
->get_val
<int64_t>("rbd_mirror_pool_replayers_refresh_interval"), 0));
270 // stop all pool replayers in parallel
271 Mutex::Locker
locker(m_lock
);
272 for (auto &pool_replayer
: m_pool_replayers
) {
273 pool_replayer
.second
->stop(false);
275 dout(20) << "return" << dendl
;
278 void Mirror::print_status(Formatter
*f
, stringstream
*ss
)
280 dout(20) << "enter" << dendl
;
282 Mutex::Locker
l(m_lock
);
289 f
->open_object_section("mirror_status");
290 f
->open_array_section("pool_replayers");
293 for (auto &pool_replayer
: m_pool_replayers
) {
294 pool_replayer
.second
->print_status(f
, ss
);
299 f
->open_object_section("image_deleter");
302 m_image_deleter
->print_status(f
, ss
);
307 dout(20) << "enter" << dendl
;
308 Mutex::Locker
l(m_lock
);
314 m_manual_stop
= false;
316 for (auto &pool_replayer
: m_pool_replayers
) {
317 pool_replayer
.second
->start();
323 dout(20) << "enter" << dendl
;
324 Mutex::Locker
l(m_lock
);
330 m_manual_stop
= true;
332 for (auto &pool_replayer
: m_pool_replayers
) {
333 pool_replayer
.second
->stop(true);
337 void Mirror::restart()
339 dout(20) << "enter" << dendl
;
340 Mutex::Locker
l(m_lock
);
346 m_manual_stop
= false;
348 for (auto &pool_replayer
: m_pool_replayers
) {
349 pool_replayer
.second
->restart();
355 dout(20) << "enter" << dendl
;
356 Mutex::Locker
l(m_lock
);
358 if (m_stopping
|| m_manual_stop
) {
362 for (auto &pool_replayer
: m_pool_replayers
) {
363 pool_replayer
.second
->flush();
367 void Mirror::release_leader()
369 dout(20) << "enter" << dendl
;
370 Mutex::Locker
l(m_lock
);
376 for (auto &pool_replayer
: m_pool_replayers
) {
377 pool_replayer
.second
->release_leader();
381 void Mirror::update_pool_replayers(const PoolPeers
&pool_peers
)
383 dout(20) << "enter" << dendl
;
384 assert(m_lock
.is_locked());
386 // remove stale pool replayers before creating new pool replayers
387 for (auto it
= m_pool_replayers
.begin(); it
!= m_pool_replayers
.end();) {
388 auto &peer
= it
->first
.second
;
389 auto pool_peer_it
= pool_peers
.find(it
->first
.first
);
390 if (pool_peer_it
== pool_peers
.end() ||
391 pool_peer_it
->second
.find(peer
) == pool_peer_it
->second
.end()) {
392 dout(20) << "removing pool replayer for " << peer
<< dendl
;
394 it
->second
->shut_down();
395 it
= m_pool_replayers
.erase(it
);
401 for (auto &kv
: pool_peers
) {
402 for (auto &peer
: kv
.second
) {
403 PoolPeer
pool_peer(kv
.first
, peer
);
405 auto pool_replayers_it
= m_pool_replayers
.find(pool_peer
);
406 if (pool_replayers_it
!= m_pool_replayers
.end()) {
407 auto& pool_replayer
= pool_replayers_it
->second
;
408 if (pool_replayer
->is_blacklisted()) {
409 derr
<< "restarting blacklisted pool replayer for " << peer
<< dendl
;
411 pool_replayer
->shut_down();
412 pool_replayer
->init();
413 } else if (!pool_replayer
->is_running()) {
414 derr
<< "restarting failed pool replayer for " << peer
<< dendl
;
416 pool_replayer
->shut_down();
417 pool_replayer
->init();
420 dout(20) << "starting pool replayer for " << peer
<< dendl
;
421 unique_ptr
<PoolReplayer
> pool_replayer(new PoolReplayer(
422 m_threads
, m_service_daemon
.get(), m_image_deleter
.get(), kv
.first
,
426 pool_replayer
->init();
427 m_pool_replayers
.emplace(pool_peer
, std::move(pool_replayer
));
431 // TODO currently only support a single peer
435 } // namespace mirror