]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/tools/rbd_mirror/Mirror.cc
update sources to v12.1.2
[ceph.git] / ceph / src / tools / rbd_mirror / Mirror.cc
index d383a8b0af5bbbd14843470c2ea7d4848a6e1115..b4509d5c465d5cb725aa9d2e3ab4a4a1c51bd9fc 100644 (file)
@@ -9,8 +9,8 @@
 #include "common/errno.h"
 #include "librbd/ImageCtx.h"
 #include "Mirror.h"
+#include "ServiceDaemon.h"
 #include "Threads.h"
-#include "ImageSync.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rbd_mirror
@@ -206,6 +206,7 @@ Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args) :
 {
   cct->lookup_or_create_singleton_object<Threads<librbd::ImageCtx> >(
     m_threads, "rbd_mirror::threads");
+  m_service_daemon.reset(new ServiceDaemon<>(m_cct, m_local, m_threads));
 }
 
 Mirror::~Mirror()
@@ -236,12 +237,19 @@ int Mirror::init()
     return r;
   }
 
-  m_local_cluster_watcher.reset(new ClusterWatcher(m_local, m_lock));
+  r = m_service_daemon->init();
+  if (r < 0) {
+    derr << "error registering service daemon: " << cpp_strerror(r) << dendl;
+    return r;
+  }
 
-  m_image_deleter.reset(new ImageDeleter(m_threads->work_queue,
-                                         m_threads->timer,
-                                         &m_threads->timer_lock));
+  m_local_cluster_watcher.reset(new ClusterWatcher(m_local, m_lock,
+                                                   m_service_daemon.get()));
 
+  m_image_deleter.reset(new ImageDeleter<>(m_threads->work_queue,
+                                           m_threads->timer,
+                                           &m_threads->timer_lock,
+                                           m_service_daemon.get()));
   return r;
 }
 
@@ -379,14 +387,11 @@ void Mirror::update_pool_replayers(const PoolPeers &pool_peers)
   for (auto it = m_pool_replayers.begin(); it != m_pool_replayers.end();) {
     auto &peer = it->first.second;
     auto pool_peer_it = pool_peers.find(it->first.first);
-    if (it->second->is_blacklisted()) {
-      derr << "removing blacklisted pool replayer for " << peer << dendl;
-      // TODO: make async
-      it = m_pool_replayers.erase(it);
-    } else if (pool_peer_it == pool_peers.end() ||
-               pool_peer_it->second.find(peer) == pool_peer_it->second.end()) {
+    if (pool_peer_it == pool_peers.end() ||
+        pool_peer_it->second.find(peer) == pool_peer_it->second.end()) {
       dout(20) << "removing pool replayer for " << peer << dendl;
       // TODO: make async
+      it->second->shut_down();
       it = m_pool_replayers.erase(it);
     } else {
       ++it;
@@ -396,16 +401,29 @@ void Mirror::update_pool_replayers(const PoolPeers &pool_peers)
   for (auto &kv : pool_peers) {
     for (auto &peer : kv.second) {
       PoolPeer pool_peer(kv.first, peer);
-      if (m_pool_replayers.find(pool_peer) == m_pool_replayers.end()) {
+
+      auto pool_replayers_it = m_pool_replayers.find(pool_peer);
+      if (pool_replayers_it != m_pool_replayers.end()) {
+        auto& pool_replayer = pool_replayers_it->second;
+        if (pool_replayer->is_blacklisted()) {
+          derr << "restarting blacklisted pool replayer for " << peer << dendl;
+          // TODO: make async
+          pool_replayer->shut_down();
+          pool_replayer->init();
+        } else if (!pool_replayer->is_running()) {
+          derr << "restarting failed pool replayer for " << peer << dendl;
+          // TODO: make async
+          pool_replayer->shut_down();
+          pool_replayer->init();
+        }
+      } else {
         dout(20) << "starting pool replayer for " << peer << dendl;
         unique_ptr<PoolReplayer> pool_replayer(new PoolReplayer(
-         m_threads, m_image_deleter, kv.first, peer, m_args));
+         m_threads, m_service_daemon.get(), m_image_deleter.get(), kv.first,
+          peer, m_args));
 
-        // TODO: make async, and retry connecting within pool replayer
-        int r = pool_replayer->init();
-        if (r < 0) {
-         continue;
-        }
+        // TODO: make async
+        pool_replayer->init();
         m_pool_replayers.emplace(pool_peer, std::move(pool_replayer));
       }
     }