X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=ceph%2Fsrc%2Ftools%2Frbd_mirror%2FMirror.cc;h=b4509d5c465d5cb725aa9d2e3ab4a4a1c51bd9fc;hb=c07f9fc5a4f48397831383549fb0482b93480643;hp=d383a8b0af5bbbd14843470c2ea7d4848a6e1115;hpb=9439ae556f035e65c9c107ae13ddd09457dbbecd;p=ceph.git diff --git a/ceph/src/tools/rbd_mirror/Mirror.cc b/ceph/src/tools/rbd_mirror/Mirror.cc index d383a8b0a..b4509d5c4 100644 --- a/ceph/src/tools/rbd_mirror/Mirror.cc +++ b/ceph/src/tools/rbd_mirror/Mirror.cc @@ -9,8 +9,8 @@ #include "common/errno.h" #include "librbd/ImageCtx.h" #include "Mirror.h" +#include "ServiceDaemon.h" #include "Threads.h" -#include "ImageSync.h" #define dout_context g_ceph_context #define dout_subsys ceph_subsys_rbd_mirror @@ -206,6 +206,7 @@ Mirror::Mirror(CephContext *cct, const std::vector &args) : { cct->lookup_or_create_singleton_object >( m_threads, "rbd_mirror::threads"); + m_service_daemon.reset(new ServiceDaemon<>(m_cct, m_local, m_threads)); } Mirror::~Mirror() @@ -236,12 +237,19 @@ int Mirror::init() return r; } - m_local_cluster_watcher.reset(new ClusterWatcher(m_local, m_lock)); + r = m_service_daemon->init(); + if (r < 0) { + derr << "error registering service daemon: " << cpp_strerror(r) << dendl; + return r; + } - m_image_deleter.reset(new ImageDeleter(m_threads->work_queue, - m_threads->timer, - &m_threads->timer_lock)); + m_local_cluster_watcher.reset(new ClusterWatcher(m_local, m_lock, + m_service_daemon.get())); + m_image_deleter.reset(new ImageDeleter<>(m_threads->work_queue, + m_threads->timer, + &m_threads->timer_lock, + m_service_daemon.get())); return r; } @@ -379,14 +387,11 @@ void Mirror::update_pool_replayers(const PoolPeers &pool_peers) for (auto it = m_pool_replayers.begin(); it != m_pool_replayers.end();) { auto &peer = it->first.second; auto pool_peer_it = pool_peers.find(it->first.first); - if (it->second->is_blacklisted()) { - derr << "removing blacklisted pool replayer for " << peer << dendl; - // TODO: make async - it = m_pool_replayers.erase(it); - } else if (pool_peer_it == pool_peers.end() || - pool_peer_it->second.find(peer) == pool_peer_it->second.end()) { + if (pool_peer_it == pool_peers.end() || + pool_peer_it->second.find(peer) == pool_peer_it->second.end()) { dout(20) << "removing pool replayer for " << peer << dendl; // TODO: make async + it->second->shut_down(); it = m_pool_replayers.erase(it); } else { ++it; @@ -396,16 +401,29 @@ void Mirror::update_pool_replayers(const PoolPeers &pool_peers) for (auto &kv : pool_peers) { for (auto &peer : kv.second) { PoolPeer pool_peer(kv.first, peer); - if (m_pool_replayers.find(pool_peer) == m_pool_replayers.end()) { + + auto pool_replayers_it = m_pool_replayers.find(pool_peer); + if (pool_replayers_it != m_pool_replayers.end()) { + auto& pool_replayer = pool_replayers_it->second; + if (pool_replayer->is_blacklisted()) { + derr << "restarting blacklisted pool replayer for " << peer << dendl; + // TODO: make async + pool_replayer->shut_down(); + pool_replayer->init(); + } else if (!pool_replayer->is_running()) { + derr << "restarting failed pool replayer for " << peer << dendl; + // TODO: make async + pool_replayer->shut_down(); + pool_replayer->init(); + } + } else { dout(20) << "starting pool replayer for " << peer << dendl; unique_ptr pool_replayer(new PoolReplayer( - m_threads, m_image_deleter, kv.first, peer, m_args)); + m_threads, m_service_daemon.get(), m_image_deleter.get(), kv.first, + peer, m_args)); - // TODO: make async, and retry connecting within pool replayer - int r = pool_replayer->init(); - if (r < 0) { - continue; - } + // TODO: make async + pool_replayer->init(); m_pool_replayers.emplace(pool_peer, std::move(pool_replayer)); } }