]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/tools/rbd_mirror/InstanceReplayer.cc
import 15.2.4
[ceph.git] / ceph / src / tools / rbd_mirror / InstanceReplayer.cc
index 52e60605c758eac5dffa87f4422052f7e902a4e8..4ef838fa43ebbfcd1671d07231b01b153c7d035a 100644 (file)
@@ -2,9 +2,11 @@
 // vim: ts=8 sw=2 smarttab
 
 #include "include/stringify.h"
+#include "common/Cond.h"
 #include "common/Timer.h"
 #include "common/debug.h"
 #include "common/errno.h"
+#include "common/WorkQueue.h"
 #include "librbd/Utils.h"
 #include "ImageReplayer.h"
 #include "InstanceReplayer.h"
@@ -33,20 +35,31 @@ using librbd::util::create_context_callback;
 
 template <typename I>
 InstanceReplayer<I>::InstanceReplayer(
+    librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid,
     Threads<I> *threads, ServiceDaemon<I>* service_daemon,
-    ImageDeleter<I>* image_deleter, RadosRef local_rados,
-    const std::string &local_mirror_uuid, int64_t local_pool_id)
-  : m_threads(threads), m_service_daemon(service_daemon),
-    m_image_deleter(image_deleter), m_local_rados(local_rados),
-    m_local_mirror_uuid(local_mirror_uuid), m_local_pool_id(local_pool_id),
-    m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id)) {
+    MirrorStatusUpdater<I>* local_status_updater,
+    journal::CacheManagerHandler *cache_manager_handler,
+    PoolMetaCache* pool_meta_cache)
+  : m_local_io_ctx(local_io_ctx), m_local_mirror_uuid(local_mirror_uuid),
+    m_threads(threads), m_service_daemon(service_daemon),
+    m_local_status_updater(local_status_updater),
+    m_cache_manager_handler(cache_manager_handler),
+    m_pool_meta_cache(pool_meta_cache),
+    m_lock(ceph::make_mutex("rbd::mirror::InstanceReplayer " +
+        stringify(local_io_ctx.get_id()))) {
 }
 
 template <typename I>
 InstanceReplayer<I>::~InstanceReplayer() {
-  assert(m_image_state_check_task == nullptr);
-  assert(m_async_op_tracker.empty());
-  assert(m_image_replayers.empty());
+  ceph_assert(m_image_state_check_task == nullptr);
+  ceph_assert(m_async_op_tracker.empty());
+  ceph_assert(m_image_replayers.empty());
+}
+
+template <typename I>
+bool InstanceReplayer<I>::is_blacklisted() const {
+  std::lock_guard locker{m_lock};
+  return m_blacklisted;
 }
 
 template <typename I>
@@ -58,12 +71,12 @@ int InstanceReplayer<I>::init() {
 
 template <typename I>
 void InstanceReplayer<I>::init(Context *on_finish) {
-  dout(20) << dendl;
+  dout(10) << dendl;
 
-  Context *ctx = new FunctionContext(
+  Context *ctx = new LambdaContext(
     [this, on_finish] (int r) {
       {
-        Mutex::Locker timer_locker(m_threads->timer_lock);
+        std::lock_guard timer_locker{m_threads->timer_lock};
         schedule_image_state_check_task();
       }
       on_finish->complete(0);
@@ -77,19 +90,19 @@ void InstanceReplayer<I>::shut_down() {
   C_SaferCond shut_down_ctx;
   shut_down(&shut_down_ctx);
   int r = shut_down_ctx.wait();
-  assert(r == 0);
+  ceph_assert(r == 0);
 }
 
 template <typename I>
 void InstanceReplayer<I>::shut_down(Context *on_finish) {
-  dout(20) << dendl;
+  dout(10) << dendl;
 
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
 
-  assert(m_on_shut_down == nullptr);
+  ceph_assert(m_on_shut_down == nullptr);
   m_on_shut_down = on_finish;
 
-  Context *ctx = new FunctionContext(
+  Context *ctx = new LambdaContext(
     [this] (int r) {
       cancel_image_state_check_task();
       wait_for_ops();
@@ -99,27 +112,26 @@ void InstanceReplayer<I>::shut_down(Context *on_finish) {
 }
 
 template <typename I>
-void InstanceReplayer<I>::add_peer(std::string peer_uuid,
-                                   librados::IoCtx io_ctx) {
-  dout(20) << peer_uuid << dendl;
+void InstanceReplayer<I>::add_peer(const Peer<I>& peer) {
+  dout(10) << "peer=" << peer << dendl;
 
-  Mutex::Locker locker(m_lock);
-  auto result = m_peers.insert(Peer(peer_uuid, io_ctx)).second;
-  assert(result);
+  std::lock_guard locker{m_lock};
+  auto result = m_peers.insert(peer).second;
+  ceph_assert(result);
 }
 
 template <typename I>
 void InstanceReplayer<I>::release_all(Context *on_finish) {
-  dout(20) << dendl;
+  dout(10) << dendl;
 
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
 
   C_Gather *gather_ctx = new C_Gather(g_ceph_context, on_finish);
   for (auto it = m_image_replayers.begin(); it != m_image_replayers.end();
        it = m_image_replayers.erase(it)) {
     auto image_replayer = it->second;
     auto ctx = gather_ctx->new_sub();
-    ctx = new FunctionContext(
+    ctx = new LambdaContext(
       [image_replayer, ctx] (int r) {
         image_replayer->destroy();
         ctx->complete(0);
@@ -133,49 +145,53 @@ template <typename I>
 void InstanceReplayer<I>::acquire_image(InstanceWatcher<I> *instance_watcher,
                                         const std::string &global_image_id,
                                         Context *on_finish) {
-  dout(20) << "global_image_id=" << global_image_id << dendl;
+  dout(10) << "global_image_id=" << global_image_id << dendl;
 
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
 
-  assert(m_on_shut_down == nullptr);
+  ceph_assert(m_on_shut_down == nullptr);
 
   auto it = m_image_replayers.find(global_image_id);
   if (it == m_image_replayers.end()) {
     auto image_replayer = ImageReplayer<I>::create(
-        m_threads, m_image_deleter, instance_watcher, m_local_rados,
-        m_local_mirror_uuid, m_local_pool_id, global_image_id);
+        m_local_io_ctx, m_local_mirror_uuid, global_image_id,
+        m_threads, instance_watcher, m_local_status_updater,
+        m_cache_manager_handler, m_pool_meta_cache);
 
-    dout(20) << global_image_id << ": creating replayer " << image_replayer
+    dout(10) << global_image_id << ": creating replayer " << image_replayer
              << dendl;
 
     it = m_image_replayers.insert(std::make_pair(global_image_id,
                                                  image_replayer)).first;
 
     // TODO only a single peer is currently supported
-    assert(m_peers.size() == 1);
+    ceph_assert(m_peers.size() == 1);
     auto peer = *m_peers.begin();
-    image_replayer->add_peer(peer.peer_uuid, peer.io_ctx);
+    image_replayer->add_peer(peer);
+    start_image_replayer(image_replayer);
+  } else {
+    // A duplicate acquire notification implies (1) connection hiccup or
+    // (2) new leader election. For the second case, restart the replayer to
+    // detect if the image has been deleted while the leader was offline
+    auto& image_replayer = it->second;
+    image_replayer->set_finished(false);
+    image_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
   }
 
-  auto& image_replayer = it->second;
-  // TODO temporary until policy integrated
-  image_replayer->set_finished(false);
-
-  start_image_replayer(image_replayer);
   m_threads->work_queue->queue(on_finish, 0);
 }
 
 template <typename I>
 void InstanceReplayer<I>::release_image(const std::string &global_image_id,
                                         Context *on_finish) {
-  dout(20) << "global_image_id=" << global_image_id << dendl;
+  dout(10) << "global_image_id=" << global_image_id << dendl;
 
-  Mutex::Locker locker(m_lock);
-  assert(m_on_shut_down == nullptr);
+  std::lock_guard locker{m_lock};
+  ceph_assert(m_on_shut_down == nullptr);
 
   auto it = m_image_replayers.find(global_image_id);
   if (it == m_image_replayers.end()) {
-    dout(20) << global_image_id << ": not found" << dendl;
+    dout(5) << global_image_id << ": not found" << dendl;
     m_threads->work_queue->queue(on_finish, 0);
     return;
   }
@@ -183,7 +199,7 @@ void InstanceReplayer<I>::release_image(const std::string &global_image_id,
   auto image_replayer = it->second;
   m_image_replayers.erase(it);
 
-  on_finish = new FunctionContext(
+  on_finish = new LambdaContext(
     [image_replayer, on_finish] (int r) {
       image_replayer->destroy();
       on_finish->complete(0);
@@ -195,11 +211,11 @@ template <typename I>
 void InstanceReplayer<I>::remove_peer_image(const std::string &global_image_id,
                                             const std::string &peer_mirror_uuid,
                                             Context *on_finish) {
-  dout(20) << "global_image_id=" << global_image_id << ", "
+  dout(10) << "global_image_id=" << global_image_id << ", "
            << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
 
-  Mutex::Locker locker(m_lock);
-  assert(m_on_shut_down == nullptr);
+  std::lock_guard locker{m_lock};
+  ceph_assert(m_on_shut_down == nullptr);
 
   auto it = m_image_replayers.find(global_image_id);
   if (it != m_image_replayers.end()) {
@@ -208,25 +224,21 @@ void InstanceReplayer<I>::remove_peer_image(const std::string &global_image_id,
     // it will eventually detect that the peer image is missing and
     // determine if a delete propagation is required.
     auto image_replayer = it->second;
-    image_replayer->restart();
+    image_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
   }
   m_threads->work_queue->queue(on_finish, 0);
 }
 
 template <typename I>
-void InstanceReplayer<I>::print_status(Formatter *f, stringstream *ss) {
-  dout(20) << dendl;
+void InstanceReplayer<I>::print_status(Formatter *f) {
+  dout(10) << dendl;
 
-  if (!f) {
-    return;
-  }
-
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
 
   f->open_array_section("image_replayers");
   for (auto &kv : m_image_replayers) {
     auto &image_replayer = kv.second;
-    image_replayer->print_status(f, ss);
+    image_replayer->print_status(f);
   }
   f->close_section();
 }
@@ -234,54 +246,80 @@ void InstanceReplayer<I>::print_status(Formatter *f, stringstream *ss) {
 template <typename I>
 void InstanceReplayer<I>::start()
 {
-  dout(20) << dendl;
+  dout(10) << dendl;
 
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
 
   m_manual_stop = false;
 
+  auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
+  auto gather_ctx = new C_Gather(
+    cct, new C_TrackedOp(m_async_op_tracker, nullptr));
   for (auto &kv : m_image_replayers) {
     auto &image_replayer = kv.second;
-    image_replayer->start(nullptr, true);
+    image_replayer->start(gather_ctx->new_sub(), true);
   }
+
+  gather_ctx->activate();
 }
 
 template <typename I>
 void InstanceReplayer<I>::stop()
 {
-  dout(20) << dendl;
+  stop(nullptr);
+}
+
+template <typename I>
+void InstanceReplayer<I>::stop(Context *on_finish)
+{
+  dout(10) << dendl;
 
-  Mutex::Locker locker(m_lock);
+  if (on_finish == nullptr) {
+    on_finish = new C_TrackedOp(m_async_op_tracker, on_finish);
+  } else {
+    on_finish = new LambdaContext(
+      [this, on_finish] (int r) {
+        m_async_op_tracker.wait_for_ops(on_finish);
+      });
+  }
 
-  m_manual_stop = true;
+  auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
+  auto gather_ctx = new C_Gather(cct, on_finish);
+  {
+    std::lock_guard locker{m_lock};
 
-  for (auto &kv : m_image_replayers) {
-    auto &image_replayer = kv.second;
-    image_replayer->stop(nullptr, true);
+    m_manual_stop = true;
+
+    for (auto &kv : m_image_replayers) {
+      auto &image_replayer = kv.second;
+      image_replayer->stop(gather_ctx->new_sub(), true);
+    }
   }
+
+  gather_ctx->activate();
 }
 
 template <typename I>
 void InstanceReplayer<I>::restart()
 {
-  dout(20) << dendl;
+  dout(10) << dendl;
 
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
 
   m_manual_stop = false;
 
   for (auto &kv : m_image_replayers) {
     auto &image_replayer = kv.second;
-    image_replayer->restart();
+    image_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
   }
 }
 
 template <typename I>
 void InstanceReplayer<I>::flush()
 {
-  dout(20) << "enter" << dendl;
+  dout(10) << dendl;
 
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
 
   for (auto &kv : m_image_replayers) {
     auto &image_replayer = kv.second;
@@ -292,15 +330,15 @@ void InstanceReplayer<I>::flush()
 template <typename I>
 void InstanceReplayer<I>::start_image_replayer(
     ImageReplayer<I> *image_replayer) {
-  assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
 
   std::string global_image_id = image_replayer->get_global_image_id();
-  dout(20) << "global_image_id=" << global_image_id << dendl;
-
   if (!image_replayer->is_stopped()) {
     return;
   } else if (image_replayer->is_blacklisted()) {
-    derr << "blacklisted detected during image replay" << dendl;
+    derr << "global_image_id=" << global_image_id << ": blacklisted detected "
+         << "during image replay" << dendl;
+    m_blacklisted = true;
     return;
   } else if (image_replayer->is_finished()) {
     // TODO temporary until policy integrated
@@ -309,14 +347,17 @@ void InstanceReplayer<I>::start_image_replayer(
     m_image_replayers.erase(image_replayer->get_global_image_id());
     image_replayer->destroy();
     return;
+  } else if (m_manual_stop) {
+    return;
   }
 
-  image_replayer->start(nullptr, false);
+  dout(10) << "global_image_id=" << global_image_id << dendl;
+  image_replayer->start(new C_TrackedOp(m_async_op_tracker, nullptr), false);
 }
 
 template <typename I>
 void InstanceReplayer<I>::queue_start_image_replayers() {
-  dout(20) << dendl;
+  dout(10) << dendl;
 
   Context *ctx = create_context_callback<
     InstanceReplayer, &InstanceReplayer<I>::start_image_replayers>(this);
@@ -326,9 +367,9 @@ void InstanceReplayer<I>::queue_start_image_replayers() {
 
 template <typename I>
 void InstanceReplayer<I>::start_image_replayers(int r) {
-  dout(20) << dendl;
+  dout(10) << dendl;
 
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   if (m_on_shut_down != nullptr) {
     return;
   }
@@ -352,12 +393,15 @@ void InstanceReplayer<I>::start_image_replayers(int r) {
     start_image_replayer(current_it->second);
   }
 
-  m_service_daemon->add_or_update_attribute(
-    m_local_pool_id, SERVICE_DAEMON_ASSIGNED_COUNT_KEY, image_count);
-  m_service_daemon->add_or_update_attribute(
-    m_local_pool_id, SERVICE_DAEMON_WARNING_COUNT_KEY, warning_count);
-  m_service_daemon->add_or_update_attribute(
-    m_local_pool_id, SERVICE_DAEMON_ERROR_COUNT_KEY, error_count);
+  m_service_daemon->add_or_update_namespace_attribute(
+    m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
+    SERVICE_DAEMON_ASSIGNED_COUNT_KEY, image_count);
+  m_service_daemon->add_or_update_namespace_attribute(
+    m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
+    SERVICE_DAEMON_WARNING_COUNT_KEY, warning_count);
+  m_service_daemon->add_or_update_namespace_attribute(
+    m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
+    SERVICE_DAEMON_ERROR_COUNT_KEY, error_count);
 
   m_async_op_tracker.finish_op();
 }
@@ -365,7 +409,7 @@ void InstanceReplayer<I>::start_image_replayers(int r) {
 template <typename I>
 void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,
                                               Context *on_finish) {
-  dout(20) << image_replayer << " global_image_id="
+  dout(10) << image_replayer << " global_image_id="
            << image_replayer->get_global_image_id() << ", on_finish="
            << on_finish << dendl;
 
@@ -376,7 +420,7 @@ void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,
 
   m_async_op_tracker.start_op();
   Context *ctx = create_async_context_callback(
-    m_threads->work_queue, new FunctionContext(
+    m_threads->work_queue, new LambdaContext(
       [this, image_replayer, on_finish] (int r) {
         stop_image_replayer(image_replayer, on_finish);
         m_async_op_tracker.finish_op();
@@ -386,11 +430,11 @@ void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,
     image_replayer->stop(ctx, false);
   } else {
     int after = 1;
-    dout(20) << "scheduling image replayer " << image_replayer << " stop after "
+    dout(10) << "scheduling image replayer " << image_replayer << " stop after "
              << after << " sec (task " << ctx << ")" << dendl;
-    ctx = new FunctionContext(
+    ctx = new LambdaContext(
       [this, after, ctx] (int r) {
-        Mutex::Locker timer_locker(m_threads->timer_lock);
+        std::lock_guard timer_locker{m_threads->timer_lock};
         m_threads->timer->add_event_after(after, ctx);
       });
     m_threads->work_queue->queue(ctx, 0);
@@ -399,7 +443,7 @@ void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,
 
 template <typename I>
 void InstanceReplayer<I>::wait_for_ops() {
-  dout(20) << dendl;
+  dout(10) << dendl;
 
   Context *ctx = create_context_callback<
     InstanceReplayer, &InstanceReplayer<I>::handle_wait_for_ops>(this);
@@ -409,19 +453,19 @@ void InstanceReplayer<I>::wait_for_ops() {
 
 template <typename I>
 void InstanceReplayer<I>::handle_wait_for_ops(int r) {
-  dout(20) << "r=" << r << dendl;
+  dout(10) << "r=" << r << dendl;
 
-  assert(r == 0);
+  ceph_assert(r == 0);
 
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   stop_image_replayers();
 }
 
 template <typename I>
 void InstanceReplayer<I>::stop_image_replayers() {
-  dout(20) << dendl;
+  dout(10) << dendl;
 
-  assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
 
   Context *ctx = create_async_context_callback(
     m_threads->work_queue, create_context_callback<InstanceReplayer<I>,
@@ -436,21 +480,21 @@ void InstanceReplayer<I>::stop_image_replayers() {
 
 template <typename I>
 void InstanceReplayer<I>::handle_stop_image_replayers(int r) {
-  dout(20) << "r=" << r << dendl;
+  dout(10) << "r=" << r << dendl;
 
-  assert(r == 0);
+  ceph_assert(r == 0);
 
   Context *on_finish = nullptr;
   {
-    Mutex::Locker locker(m_lock);
+    std::lock_guard locker{m_lock};
 
     for (auto &it : m_image_replayers) {
-      assert(it.second->is_stopped());
+      ceph_assert(it.second->is_stopped());
       it.second->destroy();
     }
     m_image_replayers.clear();
 
-    assert(m_on_shut_down != nullptr);
+    ceph_assert(m_on_shut_down != nullptr);
     std::swap(on_finish, m_on_shut_down);
   }
   on_finish->complete(r);
@@ -458,34 +502,36 @@ void InstanceReplayer<I>::handle_stop_image_replayers(int r) {
 
 template <typename I>
 void InstanceReplayer<I>::cancel_image_state_check_task() {
-  Mutex::Locker timer_locker(m_threads->timer_lock);
+  std::lock_guard timer_locker{m_threads->timer_lock};
 
   if (m_image_state_check_task == nullptr) {
     return;
   }
 
-  dout(20) << m_image_state_check_task << dendl;
+  dout(10) << m_image_state_check_task << dendl;
   bool canceled = m_threads->timer->cancel_event(m_image_state_check_task);
-  assert(canceled);
+  ceph_assert(canceled);
   m_image_state_check_task = nullptr;
 }
 
 template <typename I>
 void InstanceReplayer<I>::schedule_image_state_check_task() {
-  assert(m_threads->timer_lock.is_locked());
-  assert(m_image_state_check_task == nullptr);
+  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
+  ceph_assert(m_image_state_check_task == nullptr);
 
-  m_image_state_check_task = new FunctionContext(
+  m_image_state_check_task = new LambdaContext(
     [this](int r) {
-      assert(m_threads->timer_lock.is_locked());
+      ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
       m_image_state_check_task = nullptr;
       schedule_image_state_check_task();
       queue_start_image_replayers();
     });
 
-  int after = g_ceph_context->_conf->rbd_mirror_image_state_check_interval;
+  auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
+  int after = cct->_conf.get_val<uint64_t>(
+    "rbd_mirror_image_state_check_interval");
 
-  dout(20) << "scheduling image state check after " << after << " sec (task "
+  dout(10) << "scheduling image state check after " << after << " sec (task "
            << m_image_state_check_task << ")" << dendl;
   m_threads->timer->add_event_after(after, m_image_state_check_task);
 }