git.proxmox.com Git - ceph.git/blobdiff - ceph/src/tools/rbd_mirror/PoolReplayer.cc
update sources to v12.1.2
[ceph.git] / ceph / src / tools / rbd_mirror / PoolReplayer.cc
index a5a727ecd20b18a315b6ae3c0f1e447f0393e6a0..fd0b21b1847d84fa67b84a5e1c4b3186d09444c7 100644 (file)
@@ -20,6 +20,7 @@
 #include "InstanceReplayer.h"
 #include "InstanceWatcher.h"
 #include "LeaderWatcher.h"
+#include "ServiceDaemon.h"
 #include "Threads.h"
 
 #define dout_context g_ceph_context
@@ -42,6 +43,10 @@ namespace mirror {
 
 namespace {
 
+const std::string SERVICE_DAEMON_LEADER_KEY("leader");
+const std::string SERVICE_DAEMON_LOCAL_COUNT_KEY("image_local_count");
+const std::string SERVICE_DAEMON_REMOTE_COUNT_KEY("image_remote_count");
+
 class PoolReplayerAdminSocketCommand {
 public:
   PoolReplayerAdminSocketCommand(PoolReplayer *pool_replayer)
@@ -206,18 +211,19 @@ private:
 } // anonymous namespace
 
 PoolReplayer::PoolReplayer(Threads<librbd::ImageCtx> *threads,
-                          std::shared_ptr<ImageDeleter> image_deleter,
+                           ServiceDaemon<librbd::ImageCtx>* service_daemon,
+                          ImageDeleter<>* image_deleter,
                           int64_t local_pool_id, const peer_t &peer,
                           const std::vector<const char*> &args) :
   m_threads(threads),
+  m_service_daemon(service_daemon),
   m_image_deleter(image_deleter),
-  m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)),
+  m_local_pool_id(local_pool_id),
   m_peer(peer),
   m_args(args),
-  m_local_pool_id(local_pool_id),
+  m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)),
   m_local_pool_watcher_listener(this, true),
   m_remote_pool_watcher_listener(this, false),
-  m_asok_hook(nullptr),
   m_pool_replayer_thread(this),
   m_leader_listener(this)
 {
@@ -226,27 +232,7 @@ PoolReplayer::PoolReplayer(Threads<librbd::ImageCtx> *threads,
 PoolReplayer::~PoolReplayer()
 {
   delete m_asok_hook;
-
-  m_stopping = true;
-  {
-    Mutex::Locker l(m_lock);
-    m_cond.Signal();
-  }
-  if (m_pool_replayer_thread.is_started()) {
-    m_pool_replayer_thread.join();
-  }
-  if (m_leader_watcher) {
-    m_leader_watcher->shut_down();
-  }
-  if (m_instance_watcher) {
-    m_instance_watcher->shut_down();
-  }
-  if (m_instance_replayer) {
-    m_instance_replayer->shut_down();
-  }
-
-  assert(!m_local_pool_watcher);
-  assert(!m_remote_pool_watcher);
+  shut_down();
 }
 
 bool PoolReplayer::is_blacklisted() const {
@@ -259,29 +245,44 @@ bool PoolReplayer::is_leader() const {
   return m_leader_watcher && m_leader_watcher->is_leader();
 }
 
-int PoolReplayer::init()
+bool PoolReplayer::is_running() const {
+  return m_pool_replayer_thread.is_started();
+}
+
+void PoolReplayer::init()
 {
-  dout(20) << "replaying for " << m_peer << dendl;
+  assert(!m_pool_replayer_thread.is_started());
+
+  // reset state
+  m_stopping = false;
+  m_blacklisted = false;
 
+  dout(20) << "replaying for " << m_peer << dendl;
   int r = init_rados(g_ceph_context->_conf->cluster,
                      g_ceph_context->_conf->name.to_str(),
                      "local cluster", &m_local_rados);
   if (r < 0) {
-    return r;
+    m_callout_id = m_service_daemon->add_or_update_callout(
+      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
+      "unable to connect to local cluster");
+    return;
   }
 
   r = init_rados(m_peer.cluster_name, m_peer.client_name,
                  std::string("remote peer ") + stringify(m_peer),
                  &m_remote_rados);
   if (r < 0) {
-    return r;
+    m_callout_id = m_service_daemon->add_or_update_callout(
+      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
+      "unable to connect to remote cluster");
+    return;
   }
 
   r = m_local_rados->ioctx_create2(m_local_pool_id, m_local_io_ctx);
   if (r < 0) {
     derr << "error accessing local pool " << m_local_pool_id << ": "
          << cpp_strerror(r) << dendl;
-    return r;
+    return;
   }
 
   std::string local_mirror_uuid;
@@ -290,7 +291,10 @@ int PoolReplayer::init()
   if (r < 0) {
     derr << "failed to retrieve local mirror uuid from pool "
          << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
-    return r;
+    m_callout_id = m_service_daemon->add_or_update_callout(
+      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
+      "unable to query local mirror uuid");
+    return;
   }
 
   r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
@@ -298,38 +302,73 @@ int PoolReplayer::init()
   if (r < 0) {
     derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name()
          << ": " << cpp_strerror(r) << dendl;
-    return r;
+    m_callout_id = m_service_daemon->add_or_update_callout(
+      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_WARNING,
+      "unable to access remote pool");
+    return;
   }
 
   dout(20) << "connected to " << m_peer << dendl;
 
-  m_instance_replayer.reset(
-    InstanceReplayer<>::create(m_threads, m_image_deleter, m_local_rados,
-                               local_mirror_uuid, m_local_pool_id));
+  m_instance_replayer.reset(InstanceReplayer<>::create(
+    m_threads, m_service_daemon, m_image_deleter, m_local_rados,
+    local_mirror_uuid, m_local_pool_id));
   m_instance_replayer->init();
   m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx);
 
-  m_instance_watcher.reset(InstanceWatcher<>::create(m_local_io_ctx,
-                                                     m_threads->work_queue,
-                                                     m_instance_replayer.get()));
+  m_instance_watcher.reset(InstanceWatcher<>::create(
+    m_local_io_ctx, m_threads->work_queue, m_instance_replayer.get()));
   r = m_instance_watcher->init();
   if (r < 0) {
     derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl;
-    return r;
+    m_callout_id = m_service_daemon->add_or_update_callout(
+      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
+      "unable to initialize instance messenger object");
+    return;
   }
 
   m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
                                              &m_leader_listener));
-
   r = m_leader_watcher->init();
   if (r < 0) {
     derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
-    return r;
+    m_callout_id = m_service_daemon->add_or_update_callout(
+      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
+      "unable to initialize leader messenger object");
+    return;
+  }
+
+  if (m_callout_id != service_daemon::CALLOUT_ID_NONE) {
+    m_service_daemon->remove_callout(m_local_pool_id, m_callout_id);
+    m_callout_id = service_daemon::CALLOUT_ID_NONE;
   }
 
   m_pool_replayer_thread.create("pool replayer");
+}
 
-  return 0;
+void PoolReplayer::shut_down() {
+  m_stopping = true;
+  {
+    Mutex::Locker l(m_lock);
+    m_cond.Signal();
+  }
+  if (m_pool_replayer_thread.is_started()) {
+    m_pool_replayer_thread.join();
+  }
+  if (m_leader_watcher) {
+    m_leader_watcher->shut_down();
+  }
+  if (m_instance_watcher) {
+    m_instance_watcher->shut_down();
+  }
+  if (m_instance_replayer) {
+    m_instance_replayer->shut_down();
+  }
+
+  assert(!m_local_pool_watcher);
+  assert(!m_remote_pool_watcher);
+  m_local_rados.reset();
+  m_remote_rados.reset();
 }
 
 int PoolReplayer::init_rados(const std::string &cluster_name,
@@ -567,6 +606,17 @@ void PoolReplayer::handle_update(const std::string &mirror_uuid,
     return;
   }
 
+  m_service_daemon->add_or_update_attribute(
+    m_local_pool_id, SERVICE_DAEMON_LOCAL_COUNT_KEY,
+    m_local_pool_watcher->get_image_count());
+  if (m_remote_pool_watcher) {
+    m_service_daemon->add_or_update_attribute(
+      m_local_pool_id, SERVICE_DAEMON_REMOTE_COUNT_KEY,
+      m_remote_pool_watcher->get_image_count());
+  }
+
+  std::string removed_remote_peer_id;
+  ImageIds removed_remote_image_ids;
   if (m_initial_mirror_image_ids.find(mirror_uuid) ==
         m_initial_mirror_image_ids.end() &&
       m_initial_mirror_image_ids.size() < 2) {
@@ -579,9 +629,10 @@ void PoolReplayer::handle_update(const std::string &mirror_uuid,
       // removal notifications for the remote pool
       auto &local_image_ids = m_initial_mirror_image_ids.begin()->second;
       auto &remote_image_ids = m_initial_mirror_image_ids.rbegin()->second;
+      removed_remote_peer_id = m_initial_mirror_image_ids.rbegin()->first;
       for (auto &local_image_id : local_image_ids) {
         if (remote_image_ids.find(local_image_id) == remote_image_ids.end()) {
-          removed_image_ids.emplace(local_image_id.global_id, "");
+          removed_remote_image_ids.emplace(local_image_id.global_id, "");
         }
       }
       local_image_ids.clear();
@@ -619,12 +670,24 @@ void PoolReplayer::handle_update(const std::string &mirror_uuid,
                                              gather_ctx->new_sub());
   }
 
+  // derived removal events for remote after initial image listing
+  for (auto& image_id : removed_remote_image_ids) {
+    // for now always send to myself (the leader)
+    std::string &instance_id = m_instance_watcher->get_instance_id();
+    m_instance_watcher->notify_image_release(instance_id, image_id.global_id,
+                                             removed_remote_peer_id,
+                                             image_id.id, true,
+                                             gather_ctx->new_sub());
+  }
+
   gather_ctx->activate();
 }
 
 void PoolReplayer::handle_post_acquire_leader(Context *on_finish) {
   dout(20) << dendl;
 
+  m_service_daemon->add_or_update_attribute(m_local_pool_id,
+                                            SERVICE_DAEMON_LEADER_KEY, true);
   m_instance_watcher->handle_acquire_leader();
   init_local_pool_watcher(on_finish);
 }
@@ -632,6 +695,7 @@ void PoolReplayer::handle_post_acquire_leader(Context *on_finish) {
 void PoolReplayer::handle_pre_release_leader(Context *on_finish) {
   dout(20) << dendl;
 
+  m_service_daemon->remove_attribute(m_local_pool_id, SERVICE_DAEMON_LEADER_KEY);
   m_instance_watcher->handle_release_leader();
   shut_down_pool_watchers(on_finish);
 }