update sources to v12.2.1
diff --git a/ceph/src/tools/rbd_mirror/PoolReplayer.cc b/ceph/src/tools/rbd_mirror/PoolReplayer.cc
index a5a727ecd20b18a315b6ae3c0f1e447f0393e6a0..8d03e878f16c80a3421c90742fdf31b363767a69 100644
--- a/ceph/src/tools/rbd_mirror/PoolReplayer.cc
+++ b/ceph/src/tools/rbd_mirror/PoolReplayer.cc
@@ -20,6 +20,7 @@
 #include "InstanceReplayer.h"
 #include "InstanceWatcher.h"
 #include "LeaderWatcher.h"
+#include "ServiceDaemon.h"
 #include "Threads.h"
 
 #define dout_context g_ceph_context
@@ -42,6 +43,10 @@ namespace mirror {
 
 namespace {
 
+const std::string SERVICE_DAEMON_LEADER_KEY("leader");
+const std::string SERVICE_DAEMON_LOCAL_COUNT_KEY("image_local_count");
+const std::string SERVICE_DAEMON_REMOTE_COUNT_KEY("image_remote_count");
+
 class PoolReplayerAdminSocketCommand {
 public:
   PoolReplayerAdminSocketCommand(PoolReplayer *pool_replayer)
@@ -206,18 +211,19 @@ private:
 } // anonymous namespace
 
 PoolReplayer::PoolReplayer(Threads<librbd::ImageCtx> *threads,
-                          std::shared_ptr<ImageDeleter> image_deleter,
+                           ServiceDaemon<librbd::ImageCtx>* service_daemon,
+                          ImageDeleter<>* image_deleter,
                           int64_t local_pool_id, const peer_t &peer,
                           const std::vector<const char*> &args) :
   m_threads(threads),
+  m_service_daemon(service_daemon),
   m_image_deleter(image_deleter),
-  m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)),
+  m_local_pool_id(local_pool_id),
   m_peer(peer),
   m_args(args),
-  m_local_pool_id(local_pool_id),
+  m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)),
   m_local_pool_watcher_listener(this, true),
   m_remote_pool_watcher_listener(this, false),
-  m_asok_hook(nullptr),
   m_pool_replayer_thread(this),
   m_leader_listener(this)
 {
@@ -226,27 +232,7 @@ PoolReplayer::PoolReplayer(Threads<librbd::ImageCtx> *threads,
 PoolReplayer::~PoolReplayer()
 {
   delete m_asok_hook;
-
-  m_stopping = true;
-  {
-    Mutex::Locker l(m_lock);
-    m_cond.Signal();
-  }
-  if (m_pool_replayer_thread.is_started()) {
-    m_pool_replayer_thread.join();
-  }
-  if (m_leader_watcher) {
-    m_leader_watcher->shut_down();
-  }
-  if (m_instance_watcher) {
-    m_instance_watcher->shut_down();
-  }
-  if (m_instance_replayer) {
-    m_instance_replayer->shut_down();
-  }
-
-  assert(!m_local_pool_watcher);
-  assert(!m_remote_pool_watcher);
+  shut_down();
 }
 
 bool PoolReplayer::is_blacklisted() const {
@@ -259,29 +245,44 @@ bool PoolReplayer::is_leader() const {
   return m_leader_watcher && m_leader_watcher->is_leader();
 }
 
-int PoolReplayer::init()
+bool PoolReplayer::is_running() const {
+  return m_pool_replayer_thread.is_started();
+}
+
+void PoolReplayer::init()
 {
-  dout(20) << "replaying for " << m_peer << dendl;
+  assert(!m_pool_replayer_thread.is_started());
 
+  // reset state
+  m_stopping = false;
+  m_blacklisted = false;
+
+  dout(20) << "replaying for " << m_peer << dendl;
   int r = init_rados(g_ceph_context->_conf->cluster,
                      g_ceph_context->_conf->name.to_str(),
                      "local cluster", &m_local_rados);
   if (r < 0) {
-    return r;
+    m_callout_id = m_service_daemon->add_or_update_callout(
+      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
+      "unable to connect to local cluster");
+    return;
   }
 
   r = init_rados(m_peer.cluster_name, m_peer.client_name,
                  std::string("remote peer ") + stringify(m_peer),
                  &m_remote_rados);
   if (r < 0) {
-    return r;
+    m_callout_id = m_service_daemon->add_or_update_callout(
+      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
+      "unable to connect to remote cluster");
+    return;
   }
 
   r = m_local_rados->ioctx_create2(m_local_pool_id, m_local_io_ctx);
   if (r < 0) {
     derr << "error accessing local pool " << m_local_pool_id << ": "
          << cpp_strerror(r) << dendl;
-    return r;
+    return;
   }
 
   std::string local_mirror_uuid;
@@ -290,7 +291,10 @@ int PoolReplayer::init()
   if (r < 0) {
     derr << "failed to retrieve local mirror uuid from pool "
          << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
-    return r;
+    m_callout_id = m_service_daemon->add_or_update_callout(
+      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
+      "unable to query local mirror uuid");
+    return;
   }
 
   r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
@@ -298,38 +302,76 @@ int PoolReplayer::init()
   if (r < 0) {
     derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name()
          << ": " << cpp_strerror(r) << dendl;
-    return r;
+    m_callout_id = m_service_daemon->add_or_update_callout(
+      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_WARNING,
+      "unable to access remote pool");
+    return;
   }
 
   dout(20) << "connected to " << m_peer << dendl;
 
-  m_instance_replayer.reset(
-    InstanceReplayer<>::create(m_threads, m_image_deleter, m_local_rados,
-                               local_mirror_uuid, m_local_pool_id));
+  m_instance_replayer.reset(InstanceReplayer<>::create(
+    m_threads, m_service_daemon, m_image_deleter, m_local_rados,
+    local_mirror_uuid, m_local_pool_id));
   m_instance_replayer->init();
   m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx);
 
-  m_instance_watcher.reset(InstanceWatcher<>::create(m_local_io_ctx,
-                                                     m_threads->work_queue,
-                                                     m_instance_replayer.get()));
+  m_instance_watcher.reset(InstanceWatcher<>::create(
+    m_local_io_ctx, m_threads->work_queue, m_instance_replayer.get()));
   r = m_instance_watcher->init();
   if (r < 0) {
     derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl;
-    return r;
+    m_callout_id = m_service_daemon->add_or_update_callout(
+      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
+      "unable to initialize instance messenger object");
+    return;
   }
 
   m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
                                              &m_leader_listener));
-
   r = m_leader_watcher->init();
   if (r < 0) {
     derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
-    return r;
+    m_callout_id = m_service_daemon->add_or_update_callout(
+      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
+      "unable to initialize leader messenger object");
+    return;
+  }
+
+  if (m_callout_id != service_daemon::CALLOUT_ID_NONE) {
+    m_service_daemon->remove_callout(m_local_pool_id, m_callout_id);
+    m_callout_id = service_daemon::CALLOUT_ID_NONE;
   }
 
   m_pool_replayer_thread.create("pool replayer");
+}
 
-  return 0;
+void PoolReplayer::shut_down() {
+  m_stopping = true;
+  {
+    Mutex::Locker l(m_lock);
+    m_cond.Signal();
+  }
+  if (m_pool_replayer_thread.is_started()) {
+    m_pool_replayer_thread.join();
+  }
+  if (m_leader_watcher) {
+    m_leader_watcher->shut_down();
+    m_leader_watcher.reset();
+  }
+  if (m_instance_watcher) {
+    m_instance_watcher->shut_down();
+    m_instance_watcher.reset();
+  }
+  if (m_instance_replayer) {
+    m_instance_replayer->shut_down();
+    m_instance_replayer.reset();
+  }
+
+  assert(!m_local_pool_watcher);
+  assert(!m_remote_pool_watcher);
+  m_local_rados.reset();
+  m_remote_rados.reset();
 }
 
 int PoolReplayer::init_rados(const std::string &cluster_name,
@@ -432,7 +474,9 @@ void PoolReplayer::run()
       break;
     }
 
-    m_cond.WaitInterval(m_lock, utime_t(1, 0));
+    if (!m_stopping) {
+      m_cond.WaitInterval(m_lock, utime_t(1, 0));
+    }
   }
 }
 
@@ -469,10 +513,10 @@ void PoolReplayer::print_status(Formatter *f, stringstream *ss)
 
   f->dump_string("local_cluster_admin_socket",
                  reinterpret_cast<CephContext *>(m_local_io_ctx.cct())->_conf->
-                     admin_socket);
+                     get_val<std::string>("admin_socket"));
   f->dump_string("remote_cluster_admin_socket",
                  reinterpret_cast<CephContext *>(m_remote_io_ctx.cct())->_conf->
-                     admin_socket);
+                     get_val<std::string>("admin_socket"));
 
   f->open_object_section("sync_throttler");
   m_instance_watcher->print_sync_status(f, ss);
@@ -567,32 +611,13 @@ void PoolReplayer::handle_update(const std::string &mirror_uuid,
     return;
   }
 
-  if (m_initial_mirror_image_ids.find(mirror_uuid) ==
-        m_initial_mirror_image_ids.end() &&
-      m_initial_mirror_image_ids.size() < 2) {
-    m_initial_mirror_image_ids[mirror_uuid] = added_image_ids;
-
-    if (m_initial_mirror_image_ids.size() == 2) {
-      dout(10) << "local and remote pools refreshed" << dendl;
-
-      // both local and remote initial pool listing received. derive
-      // removal notifications for the remote pool
-      auto &local_image_ids = m_initial_mirror_image_ids.begin()->second;
-      auto &remote_image_ids = m_initial_mirror_image_ids.rbegin()->second;
-      for (auto &local_image_id : local_image_ids) {
-        if (remote_image_ids.find(local_image_id) == remote_image_ids.end()) {
-          removed_image_ids.emplace(local_image_id.global_id, "");
-        }
-      }
-      local_image_ids.clear();
-      remote_image_ids.clear();
-    }
-  }
-
-  if (!mirror_uuid.empty() && m_peer.uuid != mirror_uuid) {
-    m_instance_replayer->remove_peer(m_peer.uuid);
-    m_instance_replayer->add_peer(mirror_uuid, m_remote_io_ctx);
-    m_peer.uuid = mirror_uuid;
+  m_service_daemon->add_or_update_attribute(
+    m_local_pool_id, SERVICE_DAEMON_LOCAL_COUNT_KEY,
+    m_local_pool_watcher->get_image_count());
+  if (m_remote_pool_watcher) {
+    m_service_daemon->add_or_update_attribute(
+      m_local_pool_id, SERVICE_DAEMON_REMOTE_COUNT_KEY,
+      m_remote_pool_watcher->get_image_count());
   }
 
   m_update_op_tracker.start_op();
@@ -607,16 +632,18 @@ void PoolReplayer::handle_update(const std::string &mirror_uuid,
     // for now always send to myself (the leader)
     std::string &instance_id = m_instance_watcher->get_instance_id();
     m_instance_watcher->notify_image_acquire(instance_id, image_id.global_id,
-                                             mirror_uuid, image_id.id,
                                              gather_ctx->new_sub());
   }
 
-  for (auto &image_id : removed_image_ids) {
-    // for now always send to myself (the leader)
-    std::string &instance_id = m_instance_watcher->get_instance_id();
-    m_instance_watcher->notify_image_release(instance_id, image_id.global_id,
-                                             mirror_uuid, image_id.id, true,
-                                             gather_ctx->new_sub());
+  if (!mirror_uuid.empty()) {
+    for (auto &image_id : removed_image_ids) {
+      // for now always send to myself (the leader)
+      std::string &instance_id = m_instance_watcher->get_instance_id();
+      m_instance_watcher->notify_peer_image_removed(instance_id,
+                                                    image_id.global_id,
+                                                    mirror_uuid,
+                                                    gather_ctx->new_sub());
+    }
   }
 
   gather_ctx->activate();
@@ -625,6 +652,8 @@ void PoolReplayer::handle_update(const std::string &mirror_uuid,
 void PoolReplayer::handle_post_acquire_leader(Context *on_finish) {
   dout(20) << dendl;
 
+  m_service_daemon->add_or_update_attribute(m_local_pool_id,
+                                            SERVICE_DAEMON_LEADER_KEY, true);
   m_instance_watcher->handle_acquire_leader();
   init_local_pool_watcher(on_finish);
 }
@@ -632,6 +661,7 @@ void PoolReplayer::handle_post_acquire_leader(Context *on_finish) {
 void PoolReplayer::handle_pre_release_leader(Context *on_finish) {
   dout(20) << dendl;
 
+  m_service_daemon->remove_attribute(m_local_pool_id, SERVICE_DAEMON_LEADER_KEY);
   m_instance_watcher->handle_release_leader();
   shut_down_pool_watchers(on_finish);
 }
@@ -643,7 +673,6 @@ void PoolReplayer::init_local_pool_watcher(Context *on_finish) {
   assert(!m_local_pool_watcher);
   m_local_pool_watcher.reset(new PoolWatcher<>(
     m_threads, m_local_io_ctx, m_local_pool_watcher_listener));
-  m_initial_mirror_image_ids.clear();
 
   // ensure the initial set of local images is up-to-date
   // after acquiring the leader role