]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/tools/rbd_mirror/InstanceReplayer.cc
import 15.2.4
[ceph.git] / ceph / src / tools / rbd_mirror / InstanceReplayer.cc
index 40a8c1b43205a6f7d6e79043ff4437050ee9c619..4ef838fa43ebbfcd1671d07231b01b153c7d035a 100644 (file)
@@ -2,9 +2,11 @@
 // vim: ts=8 sw=2 smarttab
 
 #include "include/stringify.h"
+#include "common/Cond.h"
 #include "common/Timer.h"
 #include "common/debug.h"
 #include "common/errno.h"
+#include "common/WorkQueue.h"
 #include "librbd/Utils.h"
 #include "ImageReplayer.h"
 #include "InstanceReplayer.h"
@@ -33,20 +35,31 @@ using librbd::util::create_context_callback;
 
 template <typename I>
 InstanceReplayer<I>::InstanceReplayer(
+    librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid,
     Threads<I> *threads, ServiceDaemon<I>* service_daemon,
-    ImageDeleter<I>* image_deleter, RadosRef local_rados,
-    const std::string &local_mirror_uuid, int64_t local_pool_id)
-  : m_threads(threads), m_service_daemon(service_daemon),
-    m_image_deleter(image_deleter), m_local_rados(local_rados),
-    m_local_mirror_uuid(local_mirror_uuid), m_local_pool_id(local_pool_id),
-    m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id)) {
+    MirrorStatusUpdater<I>* local_status_updater,
+    journal::CacheManagerHandler *cache_manager_handler,
+    PoolMetaCache* pool_meta_cache)
+  : m_local_io_ctx(local_io_ctx), m_local_mirror_uuid(local_mirror_uuid),
+    m_threads(threads), m_service_daemon(service_daemon),
+    m_local_status_updater(local_status_updater),
+    m_cache_manager_handler(cache_manager_handler),
+    m_pool_meta_cache(pool_meta_cache),
+    m_lock(ceph::make_mutex("rbd::mirror::InstanceReplayer " +
+        stringify(local_io_ctx.get_id()))) {
 }
 
 template <typename I>
 InstanceReplayer<I>::~InstanceReplayer() {
-  assert(m_image_state_check_task == nullptr);
-  assert(m_async_op_tracker.empty());
-  assert(m_image_replayers.empty());
+  ceph_assert(m_image_state_check_task == nullptr);
+  ceph_assert(m_async_op_tracker.empty());
+  ceph_assert(m_image_replayers.empty());
+}
+
+template <typename I>
+bool InstanceReplayer<I>::is_blacklisted() const {
+  std::lock_guard locker{m_lock};
+  return m_blacklisted;
 }
 
 template <typename I>
@@ -58,12 +71,12 @@ int InstanceReplayer<I>::init() {
 
 template <typename I>
 void InstanceReplayer<I>::init(Context *on_finish) {
-  dout(20) << dendl;
+  dout(10) << dendl;
 
-  Context *ctx = new FunctionContext(
+  Context *ctx = new LambdaContext(
     [this, on_finish] (int r) {
       {
-        Mutex::Locker timer_locker(m_threads->timer_lock);
+        std::lock_guard timer_locker{m_threads->timer_lock};
         schedule_image_state_check_task();
       }
       on_finish->complete(0);
@@ -77,19 +90,19 @@ void InstanceReplayer<I>::shut_down() {
   C_SaferCond shut_down_ctx;
   shut_down(&shut_down_ctx);
   int r = shut_down_ctx.wait();
-  assert(r == 0);
+  ceph_assert(r == 0);
 }
 
 template <typename I>
 void InstanceReplayer<I>::shut_down(Context *on_finish) {
-  dout(20) << dendl;
+  dout(10) << dendl;
 
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
 
-  assert(m_on_shut_down == nullptr);
+  ceph_assert(m_on_shut_down == nullptr);
   m_on_shut_down = on_finish;
 
-  Context *ctx = new FunctionContext(
+  Context *ctx = new LambdaContext(
     [this] (int r) {
       cancel_image_state_check_task();
       wait_for_ops();
@@ -99,36 +112,26 @@ void InstanceReplayer<I>::shut_down(Context *on_finish) {
 }
 
 template <typename I>
-void InstanceReplayer<I>::add_peer(std::string mirror_uuid,
-                                   librados::IoCtx io_ctx) {
-  dout(20) << mirror_uuid << dendl;
-
-  Mutex::Locker locker(m_lock);
-  auto result = m_peers.insert(Peer(mirror_uuid, io_ctx)).second;
-  assert(result);
-}
-
-template <typename I>
-void InstanceReplayer<I>::remove_peer(std::string mirror_uuid) {
-  dout(20) << mirror_uuid << dendl;
+void InstanceReplayer<I>::add_peer(const Peer<I>& peer) {
+  dout(10) << "peer=" << peer << dendl;
 
-  Mutex::Locker locker(m_lock);
-  auto result = m_peers.erase(Peer(mirror_uuid));
-  assert(result > 0);
+  std::lock_guard locker{m_lock};
+  auto result = m_peers.insert(peer).second;
+  ceph_assert(result);
 }
 
 template <typename I>
 void InstanceReplayer<I>::release_all(Context *on_finish) {
-  dout(20) << dendl;
+  dout(10) << dendl;
 
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
 
   C_Gather *gather_ctx = new C_Gather(g_ceph_context, on_finish);
   for (auto it = m_image_replayers.begin(); it != m_image_replayers.end();
        it = m_image_replayers.erase(it)) {
     auto image_replayer = it->second;
     auto ctx = gather_ctx->new_sub();
-    ctx = new FunctionContext(
+    ctx = new LambdaContext(
       [image_replayer, ctx] (int r) {
         image_replayer->destroy();
         ctx->complete(0);
@@ -141,111 +144,101 @@ void InstanceReplayer<I>::release_all(Context *on_finish) {
 template <typename I>
 void InstanceReplayer<I>::acquire_image(InstanceWatcher<I> *instance_watcher,
                                         const std::string &global_image_id,
-                                        const std::string &peer_mirror_uuid,
-                                        const std::string &peer_image_id,
                                         Context *on_finish) {
-  dout(20) << "global_image_id=" << global_image_id << ", peer_mirror_uuid="
-           << peer_mirror_uuid << ", peer_image_id=" << peer_image_id << dendl;
+  dout(10) << "global_image_id=" << global_image_id << dendl;
 
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
 
-  assert(m_on_shut_down == nullptr);
+  ceph_assert(m_on_shut_down == nullptr);
 
   auto it = m_image_replayers.find(global_image_id);
-
   if (it == m_image_replayers.end()) {
     auto image_replayer = ImageReplayer<I>::create(
-        m_threads, m_image_deleter, instance_watcher, m_local_rados,
-        m_local_mirror_uuid, m_local_pool_id, global_image_id);
+        m_local_io_ctx, m_local_mirror_uuid, global_image_id,
+        m_threads, instance_watcher, m_local_status_updater,
+        m_cache_manager_handler, m_pool_meta_cache);
 
-    dout(20) << global_image_id << ": creating replayer " << image_replayer
+    dout(10) << global_image_id << ": creating replayer " << image_replayer
              << dendl;
 
     it = m_image_replayers.insert(std::make_pair(global_image_id,
                                                  image_replayer)).first;
-  }
-
-  auto image_replayer = it->second;
-  if (!peer_mirror_uuid.empty()) {
-    auto iter = m_peers.find(Peer(peer_mirror_uuid));
-    assert(iter != m_peers.end());
-    auto io_ctx = iter->io_ctx;
 
-    image_replayer->add_remote_image(peer_mirror_uuid, peer_image_id, io_ctx);
+    // TODO only a single peer is currently supported
+    ceph_assert(m_peers.size() == 1);
+    auto peer = *m_peers.begin();
+    image_replayer->add_peer(peer);
+    start_image_replayer(image_replayer);
+  } else {
+    // A duplicate acquire notification implies (1) connection hiccup or
+    // (2) new leader election. For the second case, restart the replayer to
+    // detect if the image has been deleted while the leader was offline
+    auto& image_replayer = it->second;
+    image_replayer->set_finished(false);
+    image_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
   }
-  start_image_replayer(image_replayer);
 
   m_threads->work_queue->queue(on_finish, 0);
 }
 
 template <typename I>
 void InstanceReplayer<I>::release_image(const std::string &global_image_id,
-                                        const std::string &peer_mirror_uuid,
-                                        const std::string &peer_image_id,
-                                        bool schedule_delete,
                                         Context *on_finish) {
-  dout(20) << "global_image_id=" << global_image_id << ", peer_mirror_uuid="
-           << peer_mirror_uuid << ", peer_image_id=" << peer_image_id << dendl;
+  dout(10) << "global_image_id=" << global_image_id << dendl;
 
-  Mutex::Locker locker(m_lock);
-
-  assert(m_on_shut_down == nullptr);
+  std::lock_guard locker{m_lock};
+  ceph_assert(m_on_shut_down == nullptr);
 
   auto it = m_image_replayers.find(global_image_id);
-
   if (it == m_image_replayers.end()) {
-    dout(20) << global_image_id << ": not found" << dendl;
+    dout(5) << global_image_id << ": not found" << dendl;
     m_threads->work_queue->queue(on_finish, 0);
     return;
   }
 
   auto image_replayer = it->second;
-  if (!peer_mirror_uuid.empty()) {
-    image_replayer->remove_remote_image(peer_mirror_uuid, peer_image_id,
-                                       schedule_delete);
-  }
-
-  if (!image_replayer->remote_images_empty()) {
-    dout(20) << global_image_id << ": still has peer images" << dendl;
-    m_threads->work_queue->queue(on_finish, 0);
-    return;
-  }
-
   m_image_replayers.erase(it);
 
-  on_finish = new FunctionContext(
+  on_finish = new LambdaContext(
     [image_replayer, on_finish] (int r) {
       image_replayer->destroy();
       on_finish->complete(0);
     });
-
-  if (schedule_delete) {
-    on_finish = new FunctionContext(
-      [this, image_replayer, on_finish] (int r) {
-        auto global_image_id = image_replayer->get_global_image_id();
-        m_image_deleter->schedule_image_delete(
-          m_local_rados, m_local_pool_id, global_image_id, false);
-        on_finish->complete(0);
-      });
-  }
-
   stop_image_replayer(image_replayer, on_finish);
 }
 
 template <typename I>
-void InstanceReplayer<I>::print_status(Formatter *f, stringstream *ss) {
-  dout(20) << dendl;
+void InstanceReplayer<I>::remove_peer_image(const std::string &global_image_id,
+                                            const std::string &peer_mirror_uuid,
+                                            Context *on_finish) {
+  dout(10) << "global_image_id=" << global_image_id << ", "
+           << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
 
-  if (!f) {
-    return;
+  std::lock_guard locker{m_lock};
+  ceph_assert(m_on_shut_down == nullptr);
+
+  auto it = m_image_replayers.find(global_image_id);
+  if (it != m_image_replayers.end()) {
+    // TODO only a single peer is currently supported, therefore
+    // we can just interrupt the current image replayer and
+    // it will eventually detect that the peer image is missing and
+    // determine if a delete propagation is required.
+    auto image_replayer = it->second;
+    image_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
   }
+  m_threads->work_queue->queue(on_finish, 0);
+}
+
+template <typename I>
+void InstanceReplayer<I>::print_status(Formatter *f) {
+  dout(10) << dendl;
 
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
 
   f->open_array_section("image_replayers");
   for (auto &kv : m_image_replayers) {
     auto &image_replayer = kv.second;
-    image_replayer->print_status(f, ss);
+    image_replayer->print_status(f);
   }
   f->close_section();
 }
@@ -253,54 +246,80 @@ void InstanceReplayer<I>::print_status(Formatter *f, stringstream *ss) {
 template <typename I>
 void InstanceReplayer<I>::start()
 {
-  dout(20) << dendl;
+  dout(10) << dendl;
 
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
 
   m_manual_stop = false;
 
+  auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
+  auto gather_ctx = new C_Gather(
+    cct, new C_TrackedOp(m_async_op_tracker, nullptr));
   for (auto &kv : m_image_replayers) {
     auto &image_replayer = kv.second;
-    image_replayer->start(nullptr, true);
+    image_replayer->start(gather_ctx->new_sub(), true);
   }
+
+  gather_ctx->activate();
 }
 
 template <typename I>
 void InstanceReplayer<I>::stop()
 {
-  dout(20) << dendl;
+  stop(nullptr);
+}
+
+template <typename I>
+void InstanceReplayer<I>::stop(Context *on_finish)
+{
+  dout(10) << dendl;
+
+  if (on_finish == nullptr) {
+    on_finish = new C_TrackedOp(m_async_op_tracker, on_finish);
+  } else {
+    on_finish = new LambdaContext(
+      [this, on_finish] (int r) {
+        m_async_op_tracker.wait_for_ops(on_finish);
+      });
+  }
 
-  Mutex::Locker locker(m_lock);
+  auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
+  auto gather_ctx = new C_Gather(cct, on_finish);
+  {
+    std::lock_guard locker{m_lock};
 
-  m_manual_stop = true;
+    m_manual_stop = true;
 
-  for (auto &kv : m_image_replayers) {
-    auto &image_replayer = kv.second;
-    image_replayer->stop(nullptr, true);
+    for (auto &kv : m_image_replayers) {
+      auto &image_replayer = kv.second;
+      image_replayer->stop(gather_ctx->new_sub(), true);
+    }
   }
+
+  gather_ctx->activate();
 }
 
 template <typename I>
 void InstanceReplayer<I>::restart()
 {
-  dout(20) << dendl;
+  dout(10) << dendl;
 
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
 
   m_manual_stop = false;
 
   for (auto &kv : m_image_replayers) {
     auto &image_replayer = kv.second;
-    image_replayer->restart();
+    image_replayer->restart(new C_TrackedOp(m_async_op_tracker, nullptr));
   }
 }
 
 template <typename I>
 void InstanceReplayer<I>::flush()
 {
-  dout(20) << "enter" << dendl;
+  dout(10) << dendl;
 
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
 
   for (auto &kv : m_image_replayers) {
     auto &image_replayer = kv.second;
@@ -311,51 +330,34 @@ void InstanceReplayer<I>::flush()
 template <typename I>
 void InstanceReplayer<I>::start_image_replayer(
     ImageReplayer<I> *image_replayer) {
-  assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
 
   std::string global_image_id = image_replayer->get_global_image_id();
-  dout(20) << "global_image_id=" << global_image_id << dendl;
-
   if (!image_replayer->is_stopped()) {
     return;
   } else if (image_replayer->is_blacklisted()) {
-    derr << "blacklisted detected during image replay" << dendl;
+    derr << "global_image_id=" << global_image_id << ": blacklisted detected "
+         << "during image replay" << dendl;
+    m_blacklisted = true;
+    return;
+  } else if (image_replayer->is_finished()) {
+    // TODO temporary until policy integrated
+    dout(5) << "removing image replayer for global_image_id="
+            << global_image_id << dendl;
+    m_image_replayers.erase(image_replayer->get_global_image_id());
+    image_replayer->destroy();
+    return;
+  } else if (m_manual_stop) {
     return;
   }
 
-  FunctionContext *ctx = new FunctionContext(
-    [this, global_image_id] (int r) {
-      dout(20) << "image deleter result: r=" << r << ", "
-               << "global_image_id=" << global_image_id << dendl;
-
-      Mutex::Locker locker(m_lock);
-      m_async_op_tracker.finish_op();
-
-      if (r == -ESTALE || r == -ECANCELED) {
-        return;
-      }
-
-      auto it = m_image_replayers.find(global_image_id);
-      if (it == m_image_replayers.end()) {
-        return;
-      }
-
-      auto image_replayer = it->second;
-      if (r >= 0) {
-        image_replayer->start(nullptr, false);
-      } else {
-        start_image_replayer(image_replayer);
-      }
-    });
-
-  m_async_op_tracker.start_op();
-  m_image_deleter->wait_for_scheduled_deletion(
-    m_local_pool_id, image_replayer->get_global_image_id(), ctx, false);
+  dout(10) << "global_image_id=" << global_image_id << dendl;
+  image_replayer->start(new C_TrackedOp(m_async_op_tracker, nullptr), false);
 }
 
 template <typename I>
 void InstanceReplayer<I>::queue_start_image_replayers() {
-  dout(20) << dendl;
+  dout(10) << dendl;
 
   Context *ctx = create_context_callback<
     InstanceReplayer, &InstanceReplayer<I>::start_image_replayers>(this);
@@ -365,34 +367,41 @@ void InstanceReplayer<I>::queue_start_image_replayers() {
 
 template <typename I>
 void InstanceReplayer<I>::start_image_replayers(int r) {
-  dout(20) << dendl;
+  dout(10) << dendl;
 
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   if (m_on_shut_down != nullptr) {
     return;
   }
 
-  size_t image_count = 0;
-  size_t warning_count = 0;
-  size_t error_count = 0;
-  for (auto &it : m_image_replayers) {
+  uint64_t image_count = 0;
+  uint64_t warning_count = 0;
+  uint64_t error_count = 0;
+  for (auto it = m_image_replayers.begin();
+       it != m_image_replayers.end();) {
+    auto current_it(it);
+    ++it;
+
     ++image_count;
-    auto health_state = it.second->get_health_state();
+    auto health_state = current_it->second->get_health_state();
     if (health_state == image_replayer::HEALTH_STATE_WARNING) {
       ++warning_count;
     } else if (health_state == image_replayer::HEALTH_STATE_ERROR) {
       ++error_count;
     }
 
-    start_image_replayer(it.second);
+    start_image_replayer(current_it->second);
   }
 
-  m_service_daemon->add_or_update_attribute(
-    m_local_pool_id, SERVICE_DAEMON_ASSIGNED_COUNT_KEY, image_count);
-  m_service_daemon->add_or_update_attribute(
-    m_local_pool_id, SERVICE_DAEMON_WARNING_COUNT_KEY, warning_count);
-  m_service_daemon->add_or_update_attribute(
-    m_local_pool_id, SERVICE_DAEMON_ERROR_COUNT_KEY, error_count);
+  m_service_daemon->add_or_update_namespace_attribute(
+    m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
+    SERVICE_DAEMON_ASSIGNED_COUNT_KEY, image_count);
+  m_service_daemon->add_or_update_namespace_attribute(
+    m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
+    SERVICE_DAEMON_WARNING_COUNT_KEY, warning_count);
+  m_service_daemon->add_or_update_namespace_attribute(
+    m_local_io_ctx.get_id(), m_local_io_ctx.get_namespace(),
+    SERVICE_DAEMON_ERROR_COUNT_KEY, error_count);
 
   m_async_op_tracker.finish_op();
 }
@@ -400,7 +409,7 @@ void InstanceReplayer<I>::start_image_replayers(int r) {
 template <typename I>
 void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,
                                               Context *on_finish) {
-  dout(20) << image_replayer << " global_image_id="
+  dout(10) << image_replayer << " global_image_id="
            << image_replayer->get_global_image_id() << ", on_finish="
            << on_finish << dendl;
 
@@ -411,7 +420,7 @@ void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,
 
   m_async_op_tracker.start_op();
   Context *ctx = create_async_context_callback(
-    m_threads->work_queue, new FunctionContext(
+    m_threads->work_queue, new LambdaContext(
       [this, image_replayer, on_finish] (int r) {
         stop_image_replayer(image_replayer, on_finish);
         m_async_op_tracker.finish_op();
@@ -421,11 +430,11 @@ void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,
     image_replayer->stop(ctx, false);
   } else {
     int after = 1;
-    dout(20) << "scheduling image replayer " << image_replayer << " stop after "
+    dout(10) << "scheduling image replayer " << image_replayer << " stop after "
              << after << " sec (task " << ctx << ")" << dendl;
-    ctx = new FunctionContext(
+    ctx = new LambdaContext(
       [this, after, ctx] (int r) {
-        Mutex::Locker timer_locker(m_threads->timer_lock);
+        std::lock_guard timer_locker{m_threads->timer_lock};
         m_threads->timer->add_event_after(after, ctx);
       });
     m_threads->work_queue->queue(ctx, 0);
@@ -434,7 +443,7 @@ void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,
 
 template <typename I>
 void InstanceReplayer<I>::wait_for_ops() {
-  dout(20) << dendl;
+  dout(10) << dendl;
 
   Context *ctx = create_context_callback<
     InstanceReplayer, &InstanceReplayer<I>::handle_wait_for_ops>(this);
@@ -444,19 +453,19 @@ void InstanceReplayer<I>::wait_for_ops() {
 
 template <typename I>
 void InstanceReplayer<I>::handle_wait_for_ops(int r) {
-  dout(20) << "r=" << r << dendl;
+  dout(10) << "r=" << r << dendl;
 
-  assert(r == 0);
+  ceph_assert(r == 0);
 
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   stop_image_replayers();
 }
 
 template <typename I>
 void InstanceReplayer<I>::stop_image_replayers() {
-  dout(20) << dendl;
+  dout(10) << dendl;
 
-  assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
 
   Context *ctx = create_async_context_callback(
     m_threads->work_queue, create_context_callback<InstanceReplayer<I>,
@@ -471,21 +480,21 @@ void InstanceReplayer<I>::stop_image_replayers() {
 
 template <typename I>
 void InstanceReplayer<I>::handle_stop_image_replayers(int r) {
-  dout(20) << "r=" << r << dendl;
+  dout(10) << "r=" << r << dendl;
 
-  assert(r == 0);
+  ceph_assert(r == 0);
 
   Context *on_finish = nullptr;
   {
-    Mutex::Locker locker(m_lock);
+    std::lock_guard locker{m_lock};
 
     for (auto &it : m_image_replayers) {
-      assert(it.second->is_stopped());
+      ceph_assert(it.second->is_stopped());
       it.second->destroy();
     }
     m_image_replayers.clear();
 
-    assert(m_on_shut_down != nullptr);
+    ceph_assert(m_on_shut_down != nullptr);
     std::swap(on_finish, m_on_shut_down);
   }
   on_finish->complete(r);
@@ -493,34 +502,36 @@ void InstanceReplayer<I>::handle_stop_image_replayers(int r) {
 
 template <typename I>
 void InstanceReplayer<I>::cancel_image_state_check_task() {
-  Mutex::Locker timer_locker(m_threads->timer_lock);
+  std::lock_guard timer_locker{m_threads->timer_lock};
 
   if (m_image_state_check_task == nullptr) {
     return;
   }
 
-  dout(20) << m_image_state_check_task << dendl;
+  dout(10) << m_image_state_check_task << dendl;
   bool canceled = m_threads->timer->cancel_event(m_image_state_check_task);
-  assert(canceled);
+  ceph_assert(canceled);
   m_image_state_check_task = nullptr;
 }
 
 template <typename I>
 void InstanceReplayer<I>::schedule_image_state_check_task() {
-  assert(m_threads->timer_lock.is_locked());
-  assert(m_image_state_check_task == nullptr);
+  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
+  ceph_assert(m_image_state_check_task == nullptr);
 
-  m_image_state_check_task = new FunctionContext(
+  m_image_state_check_task = new LambdaContext(
     [this](int r) {
-      assert(m_threads->timer_lock.is_locked());
+      ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
       m_image_state_check_task = nullptr;
       schedule_image_state_check_task();
       queue_start_image_replayers();
     });
 
-  int after = g_ceph_context->_conf->rbd_mirror_image_state_check_interval;
+  auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
+  int after = cct->_conf.get_val<uint64_t>(
+    "rbd_mirror_image_state_check_interval");
 
-  dout(20) << "scheduling image state check after " << after << " sec (task "
+  dout(10) << "scheduling image state check after " << after << " sec (task "
            << m_image_state_check_task << ")" << dendl;
   m_threads->timer->add_event_after(after, m_image_state_check_task);
 }