]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/librbd/Journal.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / librbd / Journal.cc
index 9fe57cd3f7b87874f619fdf259023d6c37ba22d7..0cd38b22adaa9ee29b0f3bc17f82ebbc0c3e3e06 100644 (file)
@@ -3,6 +3,7 @@
 
 #include "librbd/Journal.h"
 #include "include/rados/librados.hpp"
+#include "common/AsyncOpTracker.h"
 #include "common/errno.h"
 #include "common/Timer.h"
 #include "common/WorkQueue.h"
 #include "journal/ReplayEntry.h"
 #include "journal/Settings.h"
 #include "journal/Utils.h"
-#include "librbd/ExclusiveLock.h"
 #include "librbd/ImageCtx.h"
-#include "librbd/io/ImageRequestWQ.h"
-#include "librbd/io/ObjectRequest.h"
+#include "librbd/asio/ContextWQ.h"
+#include "librbd/io/ObjectDispatchSpec.h"
+#include "librbd/io/ObjectDispatcherInterface.h"
 #include "librbd/journal/CreateRequest.h"
 #include "librbd/journal/DemoteRequest.h"
+#include "librbd/journal/ObjectDispatch.h"
 #include "librbd/journal/OpenRequest.h"
 #include "librbd/journal/RemoveRequest.h"
+#include "librbd/journal/ResetRequest.h"
 #include "librbd/journal/Replay.h"
 #include "librbd/journal/PromoteRequest.h"
 
@@ -39,15 +42,24 @@ using journal::util::C_DecodeTags;
 
 namespace {
 
-// TODO: once journaler is 100% async, remove separate threads and
-// reuse ImageCtx's thread pool
+// TODO: once journaler is 100% async and converted to ASIO, remove separate
+// threads and reuse librbd's AsioEngine
 class ThreadPoolSingleton : public ThreadPool {
 public:
+  ContextWQ *work_queue;
+
   explicit ThreadPoolSingleton(CephContext *cct)
-    : ThreadPool(cct, "librbd::Journal", "tp_librbd_journ", 1) {
+    : ThreadPool(cct, "librbd::Journal", "tp_librbd_journ", 1),
+      work_queue(new ContextWQ("librbd::journal::work_queue",
+                               ceph::make_timespan(
+                                 cct->_conf.get_val<uint64_t>("rbd_op_thread_timeout")),
+                               this)) {
     start();
   }
   ~ThreadPoolSingleton() override {
+    work_queue->drain();
+    delete work_queue;
+
     stop();
   }
 };
@@ -57,23 +69,24 @@ struct C_IsTagOwner : public Context {
   librados::IoCtx &io_ctx;
   std::string image_id;
   bool *is_tag_owner;
-  ContextWQ *op_work_queue;
+  asio::ContextWQ *op_work_queue;
   Context *on_finish;
 
   CephContext *cct = nullptr;
   Journaler *journaler;
   cls::journal::Client client;
   journal::ImageClientMeta client_meta;
-  uint64_t tag_tid;
+  uint64_t tag_tid = 0;
   journal::TagData tag_data;
 
   C_IsTagOwner(librados::IoCtx &io_ctx, const std::string &image_id,
-               bool *is_tag_owner, ContextWQ *op_work_queue, Context *on_finish)
+               bool *is_tag_owner, asio::ContextWQ *op_work_queue,
+               Context *on_finish)
     : io_ctx(io_ctx), image_id(image_id), is_tag_owner(is_tag_owner),
       op_work_queue(op_work_queue), on_finish(on_finish),
       cct(reinterpret_cast<CephContext*>(io_ctx.cct())),
       journaler(new Journaler(io_ctx, image_id, Journal<>::IMAGE_CLIENT_ID,
-                              {})) {
+                              {}, nullptr)) {
   }
 
   void finish(int r) override {
@@ -88,7 +101,7 @@ struct C_IsTagOwner : public Context {
 
     Journaler *journaler = this->journaler;
     Context *on_finish = this->on_finish;
-    FunctionContext *ctx = new FunctionContext(
+    auto ctx = new LambdaContext(
       [journaler, on_finish](int r) {
        on_finish->complete(r);
        delete journaler;
@@ -104,13 +117,13 @@ struct C_GetTagOwner : public Context {
   Journaler journaler;
   cls::journal::Client client;
   journal::ImageClientMeta client_meta;
-  uint64_t tag_tid;
+  uint64_t tag_tid = 0;
   journal::TagData tag_data;
 
   C_GetTagOwner(librados::IoCtx &io_ctx, const std::string &image_id,
                 std::string *mirror_uuid, Context *on_finish)
     : mirror_uuid(mirror_uuid), on_finish(on_finish),
-      journaler(io_ctx, image_id, Journal<>::IMAGE_CLIENT_ID, {}) {
+      journaler(io_ctx, image_id, Journal<>::IMAGE_CLIENT_ID, {}, nullptr) {
   }
 
   virtual void finish(int r) {
@@ -131,13 +144,13 @@ struct GetTagsRequest {
   journal::TagData *tag_data;
   Context *on_finish;
 
-  Mutex lock;
+  ceph::mutex lock = ceph::make_mutex("lock");
 
   GetTagsRequest(CephContext *cct, J *journaler, cls::journal::Client *client,
                  journal::ImageClientMeta *client_meta, uint64_t *tag_tid,
                  journal::TagData *tag_data, Context *on_finish)
     : cct(cct), journaler(journaler), client(client), client_meta(client_meta),
-      tag_tid(tag_tid), tag_data(tag_data), on_finish(on_finish), lock("lock") {
+      tag_tid(tag_tid), tag_data(tag_data), on_finish(on_finish) {
   }
 
   /**
@@ -164,7 +177,7 @@ struct GetTagsRequest {
   void send_get_client() {
     ldout(cct, 20) << __func__ << dendl;
 
-    FunctionContext *ctx = new FunctionContext(
+    auto ctx = new LambdaContext(
       [this](int r) {
         handle_get_client(r);
       });
@@ -180,9 +193,9 @@ struct GetTagsRequest {
     }
 
     librbd::journal::ClientData client_data;
-    bufferlist::iterator bl_it = client->data.begin();
+    auto bl_it = client->data.cbegin();
     try {
-      ::decode(client_data, bl_it);
+      decode(client_data, bl_it);
     } catch (const buffer::error &err) {
       lderr(cct) << this << " OpenJournalerRequest::" << __func__ << ": "
                  << "failed to decode client data" << dendl;
@@ -206,7 +219,7 @@ struct GetTagsRequest {
   void send_get_tags() {
     ldout(cct, 20) << __func__ << dendl;
 
-    FunctionContext *ctx = new FunctionContext(
+    auto ctx = new LambdaContext(
       [this](int r) {
         handle_get_tags(r);
       });
@@ -252,7 +265,7 @@ int allocate_journaler_tag(CephContext *cct, J *journaler,
   tag_data.predecessor = predecessor;
 
   bufferlist tag_bl;
-  ::encode(tag_data, tag_bl);
+  encode(tag_data, tag_bl);
 
   C_SaferCond allocate_tag_ctx;
   journaler->allocate_tag(tag_class, tag_bl, new_tag, &allocate_tag_ctx);
@@ -321,24 +334,38 @@ std::ostream &operator<<(std::ostream &os,
   return os;
 }
 
+
+template <typename I>
+void Journal<I>::MetadataListener::handle_update(::journal::JournalMetadata *) {
+  auto ctx = new LambdaContext([this](int r) {
+    journal->handle_metadata_updated();
+  });
+  journal->m_work_queue->queue(ctx, 0);
+}
+
+
+template <typename I>
+void Journal<I>::get_work_queue(CephContext *cct, ContextWQ **work_queue) {
+  auto thread_pool_singleton =
+    &cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
+      "librbd::journal::thread_pool", false, cct);
+  *work_queue = thread_pool_singleton->work_queue;
+}
+
 template <typename I>
 Journal<I>::Journal(I &image_ctx)
-  : m_image_ctx(image_ctx), m_journaler(NULL),
-    m_lock("Journal<I>::m_lock"), m_state(STATE_UNINITIALIZED),
+  : RefCountedObject(image_ctx.cct),
+    m_image_ctx(image_ctx), m_journaler(NULL),
+    m_state(STATE_UNINITIALIZED),
     m_error_result(0), m_replay_handler(this), m_close_pending(false),
-    m_event_lock("Journal<I>::m_event_lock"), m_event_tid(0),
+    m_event_tid(0),
     m_blocking_writes(false), m_journal_replay(NULL),
     m_metadata_listener(this) {
 
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 5) << this << ": ictx=" << &m_image_ctx << dendl;
 
-  ThreadPoolSingleton *thread_pool_singleton;
-  cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
-    thread_pool_singleton, "librbd::journal::thread_pool");
-  m_work_queue = new ContextWQ("librbd::journal::work_queue",
-                               cct->_conf->get_val<int64_t>("rbd_op_thread_timeout"),
-                               thread_pool_singleton);
+  get_work_queue(cct, &m_work_queue);
   ImageCtx::get_timer_instance(cct, &m_timer, &m_timer_lock);
 }
 
@@ -346,18 +373,18 @@ template <typename I>
 Journal<I>::~Journal() {
   if (m_work_queue != nullptr) {
     m_work_queue->drain();
-    delete m_work_queue;
   }
 
-  assert(m_state == STATE_UNINITIALIZED || m_state == STATE_CLOSED);
-  assert(m_journaler == NULL);
-  assert(m_journal_replay == NULL);
-  assert(m_wait_for_state_contexts.empty());
+  std::lock_guard locker{m_lock};
+  ceph_assert(m_state == STATE_UNINITIALIZED || m_state == STATE_CLOSED);
+  ceph_assert(m_journaler == NULL);
+  ceph_assert(m_journal_replay == NULL);
+  ceph_assert(m_wait_for_state_contexts.empty());
 }
 
 template <typename I>
 bool Journal<I>::is_journal_supported(I &image_ctx) {
-  assert(image_ctx.snap_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(image_ctx.image_lock));
   return ((image_ctx.features & RBD_FEATURE_JOURNALING) &&
           !image_ctx.read_only && image_ctx.snap_id == CEPH_NOSNAP);
 }
@@ -369,15 +396,14 @@ int Journal<I>::create(librados::IoCtx &io_ctx, const std::string &image_id,
   CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
   ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
 
-  ThreadPool *thread_pool;
-  ContextWQ *op_work_queue;
-  ImageCtx::get_thread_pool_instance(cct, &thread_pool, &op_work_queue);
+  ContextWQ *work_queue;
+  get_work_queue(cct, &work_queue);
 
   C_SaferCond cond;
   journal::TagData tag_data(LOCAL_MIRROR_UUID);
   journal::CreateRequest<I> *req = journal::CreateRequest<I>::create(
     io_ctx, image_id, order, splay_width, object_pool, cls::journal::Tag::TAG_CLASS_NEW,
-    tag_data, IMAGE_CLIENT_ID, op_work_queue, &cond);
+    tag_data, IMAGE_CLIENT_ID, work_queue, &cond);
   req->send();
 
   return cond.wait();
@@ -388,13 +414,12 @@ int Journal<I>::remove(librados::IoCtx &io_ctx, const std::string &image_id) {
   CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
   ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
 
-  ThreadPool *thread_pool;
-  ContextWQ *op_work_queue;
-  ImageCtx::get_thread_pool_instance(cct, &thread_pool, &op_work_queue);
+  ContextWQ *work_queue;
+  get_work_queue(cct, &work_queue);
 
   C_SaferCond cond;
   journal::RemoveRequest<I> *req = journal::RemoveRequest<I>::create(
-    io_ctx, image_id, IMAGE_CLIENT_ID, op_work_queue, &cond);
+    io_ctx, image_id, IMAGE_CLIENT_ID, work_queue, &cond);
   req->send();
 
   return cond.wait();
@@ -405,54 +430,16 @@ int Journal<I>::reset(librados::IoCtx &io_ctx, const std::string &image_id) {
   CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
   ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
 
-  Journaler journaler(io_ctx, image_id, IMAGE_CLIENT_ID, {});
+  ContextWQ *work_queue;
+  get_work_queue(cct, &work_queue);
 
   C_SaferCond cond;
-  journaler.init(&cond);
-  BOOST_SCOPE_EXIT_ALL(&journaler) {
-    journaler.shut_down();
-  };
-
-  int r = cond.wait();
-  if (r == -ENOENT) {
-    return 0;
-  } else if (r < 0) {
-    lderr(cct) << __func__ << ": "
-               << "failed to initialize journal: " << cpp_strerror(r) << dendl;
-    return r;
-  }
-
-  uint8_t order, splay_width;
-  int64_t pool_id;
-  journaler.get_metadata(&order, &splay_width, &pool_id);
-
-  std::string pool_name;
-  if (pool_id != -1) {
-    librados::Rados rados(io_ctx);
-    r = rados.pool_reverse_lookup(pool_id, &pool_name);
-    if (r < 0) {
-      lderr(cct) << __func__ << ": "
-                 << "failed to lookup data pool: " << cpp_strerror(r) << dendl;
-      return r;
-    }
-  }
-
-  C_SaferCond ctx1;
-  journaler.remove(true, &ctx1);
-  r = ctx1.wait();
-  if (r < 0) {
-    lderr(cct) << __func__ << ": "
-               << "failed to reset journal: " << cpp_strerror(r) << dendl;
-    return r;
-  }
+  auto req = journal::ResetRequest<I>::create(io_ctx, image_id, IMAGE_CLIENT_ID,
+                                              Journal<>::LOCAL_MIRROR_UUID,
+                                              work_queue, &cond);
+  req->send();
 
-  r = create(io_ctx, image_id, order, splay_width, pool_name);
-  if (r < 0) {
-    lderr(cct) << __func__ << ": "
-               << "failed to create journal: " << cpp_strerror(r) << dendl;
-    return r;
-  }
-  return 0;
+  return cond.wait();
 }
 
 template <typename I>
@@ -464,7 +451,8 @@ void Journal<I>::is_tag_owner(I *image_ctx, bool *owner,
 
 template <typename I>
 void Journal<I>::is_tag_owner(librados::IoCtx& io_ctx, std::string& image_id,
-                              bool *is_tag_owner, ContextWQ *op_work_queue,
+                              bool *is_tag_owner,
+                              asio::ContextWQ *op_work_queue,
                               Context *on_finish) {
   CephContext *cct = reinterpret_cast<CephContext*>(io_ctx.cct());
   ldout(cct, 20) << __func__ << dendl;
@@ -479,8 +467,9 @@ void Journal<I>::is_tag_owner(librados::IoCtx& io_ctx, std::string& image_id,
 template <typename I>
 void Journal<I>::get_tag_owner(IoCtx& io_ctx, std::string& image_id,
                                std::string *mirror_uuid,
-                               ContextWQ *op_work_queue, Context *on_finish) {
-  CephContext *cct = (CephContext *)io_ctx.cct();
+                               asio::ContextWQ *op_work_queue,
+                               Context *on_finish) {
+  CephContext *cct = static_cast<CephContext *>(io_ctx.cct());
   ldout(cct, 20) << __func__ << dendl;
 
   auto ctx = new C_GetTagOwner(io_ctx, image_id, mirror_uuid, on_finish);
@@ -493,9 +482,10 @@ int Journal<I>::request_resync(I *image_ctx) {
   CephContext *cct = image_ctx->cct;
   ldout(cct, 20) << __func__ << dendl;
 
-  Journaler journaler(image_ctx->md_ctx, image_ctx->id, IMAGE_CLIENT_ID, {});
+  Journaler journaler(image_ctx->md_ctx, image_ctx->id, IMAGE_CLIENT_ID, {},
+                      nullptr);
 
-  Mutex lock("lock");
+  ceph::mutex lock = ceph::make_mutex("lock");
   journal::ImageClientMeta client_meta;
   uint64_t tag_tid;
   journal::TagData tag_data;
@@ -519,7 +509,7 @@ int Journal<I>::request_resync(I *image_ctx) {
 
   journal::ClientData client_data(client_meta);
   bufferlist client_data_bl;
-  ::encode(client_data, client_data_bl);
+  encode(client_data, client_data_bl);
 
   C_SaferCond update_client_ctx;
   journaler.update_client(client_data_bl, &update_client_ctx);
@@ -554,19 +544,19 @@ void Journal<I>::demote(I *image_ctx, Context *on_finish) {
 
 template <typename I>
 bool Journal<I>::is_journal_ready() const {
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   return (m_state == STATE_READY);
 }
 
 template <typename I>
 bool Journal<I>::is_journal_replaying() const {
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   return is_journal_replaying(m_lock);
 }
 
 template <typename I>
-bool Journal<I>::is_journal_replaying(const Mutex &) const {
-  assert(m_lock.is_locked());
+bool Journal<I>::is_journal_replaying(const ceph::mutex &) const {
+  ceph_assert(ceph_mutex_is_locked(m_lock));
   return (m_state == STATE_REPLAYING ||
           m_state == STATE_FLUSHING_REPLAY ||
           m_state == STATE_FLUSHING_RESTART ||
@@ -575,8 +565,8 @@ bool Journal<I>::is_journal_replaying(const Mutex &) const {
 
 template <typename I>
 bool Journal<I>::is_journal_appending() const {
-  assert(m_image_ctx.snap_lock.is_locked());
-  Mutex::Locker locker(m_lock);
+  ceph_assert(ceph_mutex_is_locked(m_image_ctx.image_lock));
+  std::lock_guard locker{m_lock};
   return (m_state == STATE_READY &&
           !m_image_ctx.get_journal_policy()->append_disabled());
 }
@@ -585,7 +575,7 @@ template <typename I>
 void Journal<I>::wait_for_journal_ready(Context *on_ready) {
   on_ready = create_async_context_callback(m_image_ctx, on_ready);
 
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   if (m_state == STATE_READY) {
     on_ready->complete(m_error_result);
   } else {
@@ -598,10 +588,16 @@ void Journal<I>::open(Context *on_finish) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << dendl;
 
+  on_finish = create_context_callback<Context>(on_finish, this);
+
   on_finish = create_async_context_callback(m_image_ctx, on_finish);
 
-  Mutex::Locker locker(m_lock);
-  assert(m_state == STATE_UNINITIALIZED);
+  // inject our handler into the object dispatcher chain
+  m_image_ctx.io_object_dispatcher->register_dispatch(
+    journal::ObjectDispatch<I>::create(&m_image_ctx, this));
+
+  std::lock_guard locker{m_lock};
+  ceph_assert(m_state == STATE_UNINITIALIZED);
   wait_for_steady_state(on_finish);
   create_journaler();
 }
@@ -611,25 +607,33 @@ void Journal<I>::close(Context *on_finish) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << dendl;
 
+  on_finish = create_context_callback<Context>(on_finish, this);
+
+  on_finish = new LambdaContext([this, on_finish](int r) {
+      // remove our handler from object dispatcher chain - preserve error
+      auto ctx = new LambdaContext([on_finish, r](int _) {
+          on_finish->complete(r);
+        });
+      m_image_ctx.io_object_dispatcher->shut_down_dispatch(
+        io::OBJECT_DISPATCH_LAYER_JOURNAL, ctx);
+    });
   on_finish = create_async_context_callback(m_image_ctx, on_finish);
 
-  Mutex::Locker locker(m_lock);
-  while (m_listener_notify) {
-    m_listener_cond.Wait(m_lock);
-  }
+  std::unique_lock locker{m_lock};
+  m_listener_cond.wait(locker, [this] { return !m_listener_notify; });
 
   Listeners listeners(m_listeners);
   m_listener_notify = true;
-  m_lock.Unlock();
+  locker.unlock();
   for (auto listener : listeners) {
     listener->handle_close();
   }
 
-  m_lock.Lock();
+  locker.lock();
   m_listener_notify = false;
-  m_listener_cond.Signal();
+  m_listener_cond.notify_all();
 
-  assert(m_state != STATE_UNINITIALIZED);
+  ceph_assert(m_state != STATE_UNINITIALIZED);
   if (m_state == STATE_CLOSED) {
     on_finish->complete(m_error_result);
     return;
@@ -645,25 +649,25 @@ void Journal<I>::close(Context *on_finish) {
 
 template <typename I>
 bool Journal<I>::is_tag_owner() const {
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   return is_tag_owner(m_lock);
 }
 
 template <typename I>
-bool Journal<I>::is_tag_owner(const Mutex &) const {
-  assert(m_lock.is_locked());
+bool Journal<I>::is_tag_owner(const ceph::mutex &) const {
+  ceph_assert(ceph_mutex_is_locked(m_lock));
   return (m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
 }
 
 template <typename I>
 uint64_t Journal<I>::get_tag_tid() const {
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   return m_tag_tid;
 }
 
 template <typename I>
 journal::TagData Journal<I>::get_tag_data() const {
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   return m_tag_data;
 }
 
@@ -675,8 +679,8 @@ void Journal<I>::allocate_local_tag(Context *on_finish) {
   journal::TagPredecessor predecessor;
   predecessor.mirror_uuid = LOCAL_MIRROR_UUID;
   {
-    Mutex::Locker locker(m_lock);
-    assert(m_journaler != nullptr && is_tag_owner(m_lock));
+    std::lock_guard locker{m_lock};
+    ceph_assert(m_journaler != nullptr && is_tag_owner(m_lock));
 
     cls::journal::Client client;
     int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
@@ -689,7 +693,7 @@ void Journal<I>::allocate_local_tag(Context *on_finish) {
 
     // since we are primary, populate the predecessor with our known commit
     // position
-    assert(m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
+    ceph_assert(m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
     if (!client.commit_position.object_positions.empty()) {
       auto position = client.commit_position.object_positions.front();
       predecessor.commit_valid = true;
@@ -709,15 +713,15 @@ void Journal<I>::allocate_tag(const std::string &mirror_uuid,
   ldout(cct, 20) << this << " " << __func__ << ":  mirror_uuid=" << mirror_uuid
                  << dendl;
 
-  Mutex::Locker locker(m_lock);
-  assert(m_journaler != nullptr);
+  std::lock_guard locker{m_lock};
+  ceph_assert(m_journaler != nullptr);
 
   journal::TagData tag_data;
   tag_data.mirror_uuid = mirror_uuid;
   tag_data.predecessor = predecessor;
 
   bufferlist tag_bl;
-  ::encode(tag_data, tag_bl);
+  encode(tag_data, tag_bl);
 
   C_DecodeTag *decode_tag_ctx = new C_DecodeTag(cct, &m_lock, &m_tag_tid,
                                                 &m_tag_data, on_finish);
@@ -730,17 +734,36 @@ void Journal<I>::flush_commit_position(Context *on_finish) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << dendl;
 
-  Mutex::Locker locker(m_lock);
-  assert(m_journaler != nullptr);
+  std::lock_guard locker{m_lock};
+  ceph_assert(m_journaler != nullptr);
   m_journaler->flush_commit_position(on_finish);
 }
 
+template <typename I>
+void Journal<I>::user_flushed() {
+  if (m_state == STATE_READY && !m_user_flushed.exchange(true) &&
+      m_image_ctx.config.template get_val<bool>("rbd_journal_object_writethrough_until_flush")) {
+    std::lock_guard locker{m_lock};
+    if (m_state == STATE_READY) {
+      CephContext *cct = m_image_ctx.cct;
+      ldout(cct, 5) << this << " " << __func__ << dendl;
+
+      ceph_assert(m_journaler != nullptr);
+      m_journaler->set_append_batch_options(
+        m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_flush_interval"),
+        m_image_ctx.config.template get_val<Option::size_t>("rbd_journal_object_flush_bytes"),
+        m_image_ctx.config.template get_val<double>("rbd_journal_object_flush_age"));
+    } else {
+      m_user_flushed = false;
+    }
+  }
+}
+
 template <typename I>
 uint64_t Journal<I>::append_write_event(uint64_t offset, size_t length,
                                         const bufferlist &bl,
-                                        const IOObjectRequests &requests,
                                         bool flush_entry) {
-  assert(m_max_append_size > journal::AioWriteEvent::get_fixed_size());
+  ceph_assert(m_max_append_size > journal::AioWriteEvent::get_fixed_size());
   uint64_t max_write_data_size =
     m_max_append_size - journal::AioWriteEvent::get_fixed_size();
 
@@ -749,7 +772,7 @@ uint64_t Journal<I>::append_write_event(uint64_t offset, size_t length,
   uint64_t bytes_remaining = length;
   uint64_t event_offset = 0;
   do {
-    uint64_t event_length = MIN(bytes_remaining, max_write_data_size);
+    uint64_t event_length = std::min(bytes_remaining, max_write_data_size);
 
     bufferlist event_bl;
     event_bl.substr_of(bl, event_offset, event_length);
@@ -759,60 +782,57 @@ uint64_t Journal<I>::append_write_event(uint64_t offset, size_t length,
                                     ceph_clock_now());
 
     bufferlists.emplace_back();
-    ::encode(event_entry, bufferlists.back());
+    encode(event_entry, bufferlists.back());
 
     event_offset += event_length;
     bytes_remaining -= event_length;
   } while (bytes_remaining > 0);
 
-  return append_io_events(journal::EVENT_TYPE_AIO_WRITE, bufferlists, requests,
-                          offset, length, flush_entry);
+  return append_io_events(journal::EVENT_TYPE_AIO_WRITE, bufferlists, offset,
+                          length, flush_entry, 0);
 }
 
 template <typename I>
 uint64_t Journal<I>::append_io_event(journal::EventEntry &&event_entry,
-                                     const IOObjectRequests &requests,
                                      uint64_t offset, size_t length,
-                                     bool flush_entry) {
+                                     bool flush_entry, int filter_ret_val) {
   bufferlist bl;
   event_entry.timestamp = ceph_clock_now();
-  ::encode(event_entry, bl);
-  return append_io_events(event_entry.get_event_type(), {bl}, requests, offset,
-                          length, flush_entry);
+  encode(event_entry, bl);
+  return append_io_events(event_entry.get_event_type(), {bl}, offset, length,
+                          flush_entry, filter_ret_val);
 }
 
 template <typename I>
 uint64_t Journal<I>::append_io_events(journal::EventType event_type,
                                       const Bufferlists &bufferlists,
-                                      const IOObjectRequests &requests,
                                       uint64_t offset, size_t length,
-                                      bool flush_entry) {
-  assert(!bufferlists.empty());
+                                      bool flush_entry, int filter_ret_val) {
+  ceph_assert(!bufferlists.empty());
 
   uint64_t tid;
   {
-    Mutex::Locker locker(m_lock);
-    assert(m_state == STATE_READY);
+    std::lock_guard locker{m_lock};
+    ceph_assert(m_state == STATE_READY);
 
     tid = ++m_event_tid;
-    assert(tid != 0);
+    ceph_assert(tid != 0);
   }
 
   Futures futures;
   for (auto &bl : bufferlists) {
-    assert(bl.length() <= m_max_append_size);
+    ceph_assert(bl.length() <= m_max_append_size);
     futures.push_back(m_journaler->append(m_tag_tid, bl));
   }
 
   {
-    Mutex::Locker event_locker(m_event_lock);
-    m_events[tid] = Event(futures, requests, offset, length);
+    std::lock_guard event_locker{m_event_lock};
+    m_events[tid] = Event(futures, offset, length, filter_ret_val);
   }
 
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": "
                  << "event=" << event_type << ", "
-                 << "new_reqs=" << requests.size() << ", "
                  << "offset=" << offset << ", "
                  << "length=" << length << ", "
                  << "flush=" << flush_entry << ", tid=" << tid << dendl;
@@ -834,7 +854,7 @@ void Journal<I>::commit_io_event(uint64_t tid, int r) {
   ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
                  "r=" << r << dendl;
 
-  Mutex::Locker event_locker(m_event_lock);
+  std::lock_guard event_locker{m_event_lock};
   typename Events::iterator it = m_events.find(tid);
   if (it == m_events.end()) {
     return;
@@ -845,7 +865,7 @@ void Journal<I>::commit_io_event(uint64_t tid, int r) {
 template <typename I>
 void Journal<I>::commit_io_event_extent(uint64_t tid, uint64_t offset,
                                         uint64_t length, int r) {
-  assert(length > 0);
+  ceph_assert(length > 0);
 
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
@@ -853,7 +873,7 @@ void Journal<I>::commit_io_event_extent(uint64_t tid, uint64_t offset,
                  << "length=" << length << ", "
                  << "r=" << r << dendl;
 
-  Mutex::Locker event_locker(m_event_lock);
+  std::lock_guard event_locker{m_event_lock};
   typename Events::iterator it = m_events.find(tid);
   if (it == m_events.end()) {
     return;
@@ -883,26 +903,26 @@ template <typename I>
 void Journal<I>::append_op_event(uint64_t op_tid,
                                  journal::EventEntry &&event_entry,
                                  Context *on_safe) {
-  assert(m_image_ctx.owner_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
 
   bufferlist bl;
   event_entry.timestamp = ceph_clock_now();
-  ::encode(event_entry, bl);
+  encode(event_entry, bl);
 
   Future future;
   {
-    Mutex::Locker locker(m_lock);
-    assert(m_state == STATE_READY);
+    std::lock_guard locker{m_lock};
+    ceph_assert(m_state == STATE_READY);
 
     future = m_journaler->append(m_tag_tid, bl);
 
     // delay committing op event to ensure consistent replay
-    assert(m_op_futures.count(op_tid) == 0);
+    ceph_assert(m_op_futures.count(op_tid) == 0);
     m_op_futures[op_tid] = future;
   }
 
   on_safe = create_async_context_callback(m_image_ctx, on_safe);
-  on_safe = new FunctionContext([this, on_safe](int r) {
+  on_safe = new LambdaContext([this, on_safe](int r) {
       // ensure all committed IO before this op is committed
       m_journaler->flush_commit_position(on_safe);
     });
@@ -924,17 +944,17 @@ void Journal<I>::commit_op_event(uint64_t op_tid, int r, Context *on_safe) {
                                   ceph_clock_now());
 
   bufferlist bl;
-  ::encode(event_entry, bl);
+  encode(event_entry, bl);
 
   Future op_start_future;
   Future op_finish_future;
   {
-    Mutex::Locker locker(m_lock);
-    assert(m_state == STATE_READY);
+    std::lock_guard locker{m_lock};
+    ceph_assert(m_state == STATE_READY);
 
     // ready to commit op event
     auto it = m_op_futures.find(op_tid);
-    assert(it != m_op_futures.end());
+    ceph_assert(it != m_op_futures.end());
     op_start_future = it->second;
     m_op_futures.erase(it);
 
@@ -952,8 +972,8 @@ void Journal<I>::replay_op_ready(uint64_t op_tid, Context *on_resume) {
   ldout(cct, 10) << this << " " << __func__ << ": op_tid=" << op_tid << dendl;
 
   {
-    Mutex::Locker locker(m_lock);
-    assert(m_journal_replay != nullptr);
+    std::lock_guard locker{m_lock};
+    ceph_assert(m_journal_replay != nullptr);
     m_journal_replay->replay_op_ready(op_tid, on_resume);
   }
 }
@@ -964,9 +984,11 @@ void Journal<I>::flush_event(uint64_t tid, Context *on_safe) {
   ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
                  << "on_safe=" << on_safe << dendl;
 
+  on_safe = create_context_callback<Context>(on_safe, this);
+
   Future future;
   {
-    Mutex::Locker event_locker(m_event_lock);
+    std::lock_guard event_locker{m_event_lock};
     future = wait_event(m_lock, tid, on_safe);
   }
 
@@ -981,18 +1003,20 @@ void Journal<I>::wait_event(uint64_t tid, Context *on_safe) {
   ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
                  << "on_safe=" << on_safe << dendl;
 
-  Mutex::Locker event_locker(m_event_lock);
+  on_safe = create_context_callback<Context>(on_safe, this);
+
+  std::lock_guard event_locker{m_event_lock};
   wait_event(m_lock, tid, on_safe);
 }
 
 template <typename I>
-typename Journal<I>::Future Journal<I>::wait_event(Mutex &lock, uint64_t tid,
+typename Journal<I>::Future Journal<I>::wait_event(ceph::mutex &lock, uint64_t tid,
                                                    Context *on_safe) {
-  assert(m_event_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_event_lock));
   CephContext *cct = m_image_ctx.cct;
 
   typename Events::iterator it = m_events.find(tid);
-  assert(it != m_events.end());
+  ceph_assert(it != m_events.end());
 
   Event &event = it->second;
   if (event.safe) {
@@ -1014,12 +1038,12 @@ void Journal<I>::start_external_replay(journal::Replay<I> **journal_replay,
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << dendl;
 
-  Mutex::Locker locker(m_lock);
-  assert(m_state == STATE_READY);
-  assert(m_journal_replay == nullptr);
+  std::lock_guard locker{m_lock};
+  ceph_assert(m_state == STATE_READY);
+  ceph_assert(m_journal_replay == nullptr);
 
   on_start = util::create_async_context_callback(m_image_ctx, on_start);
-  on_start = new FunctionContext(
+  on_start = new LambdaContext(
     [this, journal_replay, on_start](int r) {
       handle_start_external_replay(r, journal_replay, on_start);
     });
@@ -1036,9 +1060,9 @@ void Journal<I>::handle_start_external_replay(int r,
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << dendl;
 
-  Mutex::Locker locker(m_lock);
-  assert(m_state == STATE_READY);
-  assert(m_journal_replay == nullptr);
+  std::lock_guard locker{m_lock};
+  ceph_assert(m_state == STATE_READY);
+  ceph_assert(m_journal_replay == nullptr);
 
   if (r < 0) {
     lderr(cct) << this << " " << __func__ << ": "
@@ -1062,9 +1086,9 @@ void Journal<I>::stop_external_replay() {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << dendl;
 
-  Mutex::Locker locker(m_lock);
-  assert(m_journal_replay != nullptr);
-  assert(m_state == STATE_REPLAYING);
+  std::lock_guard locker{m_lock};
+  ceph_assert(m_journal_replay != nullptr);
+  ceph_assert(m_state == STATE_REPLAYING);
 
   delete m_journal_replay;
   m_journal_replay = nullptr;
@@ -1082,23 +1106,25 @@ void Journal<I>::create_journaler() {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << dendl;
 
-  assert(m_lock.is_locked());
-  assert(m_state == STATE_UNINITIALIZED || m_state == STATE_RESTARTING_REPLAY);
-  assert(m_journaler == NULL);
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+  ceph_assert(m_state == STATE_UNINITIALIZED || m_state == STATE_RESTARTING_REPLAY);
+  ceph_assert(m_journaler == NULL);
 
   transition_state(STATE_INITIALIZING, 0);
   ::journal::Settings settings;
-  settings.commit_interval = m_image_ctx.journal_commit_age;
-  settings.max_payload_bytes = m_image_ctx.journal_max_payload_bytes;
+  settings.commit_interval =
+    m_image_ctx.config.template get_val<double>("rbd_journal_commit_age");
+  settings.max_payload_bytes =
+    m_image_ctx.config.template get_val<Option::size_t>("rbd_journal_max_payload_bytes");
   settings.max_concurrent_object_sets =
-    m_image_ctx.journal_max_concurrent_object_sets;
+    m_image_ctx.config.template get_val<uint64_t>("rbd_journal_max_concurrent_object_sets");
   // TODO: a configurable filter to exclude certain peers from being
   // disconnected.
-  settings.whitelisted_laggy_clients = {IMAGE_CLIENT_ID};
+  settings.ignored_laggy_clients = {IMAGE_CLIENT_ID};
 
   m_journaler = new Journaler(m_work_queue, m_timer, m_timer_lock,
                              m_image_ctx.md_ctx, m_image_ctx.id,
-                             IMAGE_CLIENT_ID, settings);
+                             IMAGE_CLIENT_ID, settings, nullptr);
   m_journaler->add_listener(&m_metadata_listener);
 
   Context *ctx = create_async_context_callback(
@@ -1115,7 +1141,7 @@ void Journal<I>::destroy_journaler(int r) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
 
-  assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
 
   delete m_journal_replay;
   m_journal_replay = NULL;
@@ -1127,12 +1153,13 @@ void Journal<I>::destroy_journaler(int r) {
   Context *ctx = create_async_context_callback(
     m_image_ctx, create_context_callback<
       Journal<I>, &Journal<I>::handle_journal_destroyed>(this));
-  ctx = new FunctionContext(
+  ctx = new LambdaContext(
     [this, ctx](int r) {
-      Mutex::Locker locker(m_lock);
+      std::lock_guard locker{m_lock};
       m_journaler->shut_down(ctx);
     });
-  m_async_journal_op_tracker.wait(m_image_ctx, ctx);
+  ctx = create_async_context_callback(m_image_ctx, ctx);
+  m_async_journal_op_tracker.wait_for_ops(ctx);
 }
 
 template <typename I>
@@ -1140,9 +1167,9 @@ void Journal<I>::recreate_journaler(int r) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
 
-  assert(m_lock.is_locked());
-  assert(m_state == STATE_FLUSHING_RESTART ||
-         m_state == STATE_FLUSHING_REPLAY);
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+  ceph_assert(m_state == STATE_FLUSHING_RESTART ||
+              m_state == STATE_FLUSHING_REPLAY);
 
   delete m_journal_replay;
   m_journal_replay = NULL;
@@ -1157,18 +1184,22 @@ void Journal<I>::recreate_journaler(int r) {
 
 template <typename I>
 void Journal<I>::complete_event(typename Events::iterator it, int r) {
-  assert(m_event_lock.is_locked());
-  assert(m_state == STATE_READY);
+  ceph_assert(ceph_mutex_is_locked(m_event_lock));
+  ceph_assert(m_state == STATE_READY);
 
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": tid=" << it->first << " "
                  << "r=" << r << dendl;
 
   Event &event = it->second;
+  if (r < 0 && r == event.filter_ret_val) {
+    // ignore allowed error codes
+    r = 0;
+  }
   if (r < 0) {
     // event recorded to journal but failed to update disk, we cannot
     // commit this IO event. this event must be replayed.
-    assert(event.safe);
+    ceph_assert(event.safe);
     lderr(cct) << this << " " << __func__ << ": "
                << "failed to commit IO to disk, replay required: "
                << cpp_strerror(r) << dendl;
@@ -1187,10 +1218,17 @@ void Journal<I>::complete_event(typename Events::iterator it, int r) {
 
 template <typename I>
 void Journal<I>::start_append() {
-  assert(m_lock.is_locked());
-  m_journaler->start_append(m_image_ctx.journal_object_flush_interval,
-                           m_image_ctx.journal_object_flush_bytes,
-                           m_image_ctx.journal_object_flush_age);
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+
+  m_journaler->start_append(
+    m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_max_in_flight_appends"));
+  if (!m_image_ctx.config.template get_val<bool>("rbd_journal_object_writethrough_until_flush")) {
+    m_journaler->set_append_batch_options(
+      m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_flush_interval"),
+      m_image_ctx.config.template get_val<Option::size_t>("rbd_journal_object_flush_bytes"),
+      m_image_ctx.config.template get_val<double>("rbd_journal_object_flush_age"));
+  }
+
   transition_state(STATE_READY, 0);
 }
 
@@ -1199,8 +1237,8 @@ void Journal<I>::handle_open(int r) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
 
-  Mutex::Locker locker(m_lock);
-  assert(m_state == STATE_INITIALIZING);
+  std::lock_guard locker{m_lock};
+  ceph_assert(m_state == STATE_INITIALIZING);
 
   if (r < 0) {
     lderr(cct) << this << " " << __func__ << ": "
@@ -1226,7 +1264,7 @@ void Journal<I>::handle_replay_ready() {
   CephContext *cct = m_image_ctx.cct;
   ReplayEntry replay_entry;
   {
-    Mutex::Locker locker(m_lock);
+    std::lock_guard locker{m_lock};
     if (m_state != STATE_REPLAYING) {
       return;
     }
@@ -1237,12 +1275,14 @@ void Journal<I>::handle_replay_ready() {
     }
 
     // only one entry should be in-flight at a time
-    assert(!m_processing_entry);
+    ceph_assert(!m_processing_entry);
     m_processing_entry = true;
   }
 
+  m_async_journal_op_tracker.start_op();
+
   bufferlist data = replay_entry.get_data();
-  bufferlist::iterator it = data.begin();
+  auto it = data.cbegin();
 
   journal::EventEntry event_entry;
   int r = m_journal_replay->decode(&it, &event_entry);
@@ -1265,7 +1305,7 @@ void Journal<I>::handle_replay_complete(int r) {
 
   bool cancel_ops = false;
   {
-    Mutex::Locker locker(m_lock);
+    std::lock_guard locker{m_lock};
     if (m_state != STATE_REPLAYING) {
       return;
     }
@@ -1280,15 +1320,15 @@ void Journal<I>::handle_replay_complete(int r) {
     }
   }
 
-  Context *ctx = new FunctionContext([this, cct](int r) {
+  Context *ctx = new LambdaContext([this, cct](int r) {
       ldout(cct, 20) << this << " handle_replay_complete: "
                      << "handle shut down replay" << dendl;
 
       State state;
       {
-        Mutex::Locker locker(m_lock);
-        assert(m_state == STATE_FLUSHING_RESTART ||
-               m_state == STATE_FLUSHING_REPLAY);
+       std::lock_guard locker{m_lock};
+        ceph_assert(m_state == STATE_FLUSHING_RESTART ||
+                    m_state == STATE_FLUSHING_REPLAY);
         state = m_state;
       }
 
@@ -1298,11 +1338,20 @@ void Journal<I>::handle_replay_complete(int r) {
         handle_flushing_replay();
       }
     });
-  ctx = new FunctionContext([this, cct, cancel_ops, ctx](int r) {
+  ctx = new LambdaContext([this, ctx](int r) {
+      // ensure the commit position is flushed to disk
+      m_journaler->flush_commit_position(ctx);
+    });
+  ctx = create_async_context_callback(m_image_ctx, ctx);
+  ctx = new LambdaContext([this, ctx](int r) {
+      m_async_journal_op_tracker.wait_for_ops(ctx);
+    });
+  ctx = new LambdaContext([this, cct, cancel_ops, ctx](int r) {
       ldout(cct, 20) << this << " handle_replay_complete: "
                      << "shut down replay" << dendl;
       m_journal_replay->shut_down(cancel_ops, ctx);
     });
+
   m_journaler->stop_replay(ctx);
 }
 
@@ -1312,10 +1361,10 @@ void Journal<I>::handle_replay_process_ready(int r) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << dendl;
 
-  assert(r == 0);
+  ceph_assert(r == 0);
   {
-    Mutex::Locker locker(m_lock);
-    assert(m_processing_entry);
+    std::lock_guard locker{m_lock};
+    ceph_assert(m_processing_entry);
     m_processing_entry = false;
   }
   handle_replay_ready();
@@ -1325,10 +1374,10 @@ template <typename I>
 void Journal<I>::handle_replay_process_safe(ReplayEntry replay_entry, int r) {
   CephContext *cct = m_image_ctx.cct;
 
-  m_lock.Lock();
-  assert(m_state == STATE_REPLAYING ||
-         m_state == STATE_FLUSHING_RESTART ||
-         m_state == STATE_FLUSHING_REPLAY);
+  std::unique_lock locker{m_lock};
+  ceph_assert(m_state == STATE_REPLAYING ||
+              m_state == STATE_FLUSHING_RESTART ||
+              m_state == STATE_FLUSHING_REPLAY);
 
   ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
   if (r < 0) {
@@ -1341,44 +1390,52 @@ void Journal<I>::handle_replay_process_safe(ReplayEntry replay_entry, int r) {
     if (m_state == STATE_REPLAYING) {
       // abort the replay if we have an error
       transition_state(STATE_FLUSHING_RESTART, r);
-      m_lock.Unlock();
+      locker.unlock();
 
       // stop replay, shut down, and restart
-      Context *ctx = new FunctionContext([this, cct](int r) {
+      Context* ctx = create_context_callback<
+        Journal<I>, &Journal<I>::handle_flushing_restart>(this);
+      ctx = new LambdaContext([this, ctx](int r) {
+          // ensure the commit position is flushed to disk
+          m_journaler->flush_commit_position(ctx);
+        });
+      ctx = new LambdaContext([this, cct, ctx](int r) {
           ldout(cct, 20) << this << " handle_replay_process_safe: "
                          << "shut down replay" << dendl;
           {
-            Mutex::Locker locker(m_lock);
-            assert(m_state == STATE_FLUSHING_RESTART);
+           std::lock_guard locker{m_lock};
+            ceph_assert(m_state == STATE_FLUSHING_RESTART);
           }
 
-          m_journal_replay->shut_down(true, create_context_callback<
-            Journal<I>, &Journal<I>::handle_flushing_restart>(this));
+          m_journal_replay->shut_down(true, ctx);
         });
       m_journaler->stop_replay(ctx);
+      m_async_journal_op_tracker.finish_op();
       return;
     } else if (m_state == STATE_FLUSHING_REPLAY) {
       // end-of-replay flush in-progress -- we need to restart replay
       transition_state(STATE_FLUSHING_RESTART, r);
-      m_lock.Unlock();
+      locker.unlock();
+      m_async_journal_op_tracker.finish_op();
       return;
     }
   } else {
     // only commit the entry if written successfully
     m_journaler->committed(replay_entry);
   }
-  m_lock.Unlock();
+  locker.unlock();
+  m_async_journal_op_tracker.finish_op();
 }
 
 template <typename I>
 void Journal<I>::handle_flushing_restart(int r) {
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
 
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << dendl;
 
-  assert(r == 0);
-  assert(m_state == STATE_FLUSHING_RESTART);
+  ceph_assert(r == 0);
+  ceph_assert(m_state == STATE_FLUSHING_RESTART);
   if (m_close_pending) {
     destroy_journaler(r);
     return;
@@ -1389,12 +1446,13 @@ void Journal<I>::handle_flushing_restart(int r) {
 
 template <typename I>
 void Journal<I>::handle_flushing_replay() {
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
 
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << dendl;
 
-  assert(m_state == STATE_FLUSHING_REPLAY || m_state == STATE_FLUSHING_RESTART);
+  ceph_assert(m_state == STATE_FLUSHING_REPLAY ||
+              m_state == STATE_FLUSHING_RESTART);
   if (m_close_pending) {
     destroy_journaler(0);
     return;
@@ -1416,8 +1474,8 @@ void Journal<I>::handle_recording_stopped(int r) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
 
-  Mutex::Locker locker(m_lock);
-  assert(m_state == STATE_STOPPING);
+  std::lock_guard locker{m_lock};
+  ceph_assert(m_state == STATE_STOPPING);
 
   destroy_journaler(r);
 }
@@ -1433,11 +1491,11 @@ void Journal<I>::handle_journal_destroyed(int r) {
                << dendl;
   }
 
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   delete m_journaler;
   m_journaler = nullptr;
 
-  assert(m_state == STATE_CLOSING || m_state == STATE_RESTARTING_REPLAY);
+  ceph_assert(m_state == STATE_CLOSING || m_state == STATE_RESTARTING_REPLAY);
   if (m_state == STATE_RESTARTING_REPLAY) {
     create_journaler();
     return;
@@ -1453,21 +1511,19 @@ void Journal<I>::handle_io_event_safe(int r, uint64_t tid) {
                  << "tid=" << tid << dendl;
 
   // journal will be flushed before closing
-  assert(m_state == STATE_READY || m_state == STATE_STOPPING);
+  ceph_assert(m_state == STATE_READY || m_state == STATE_STOPPING);
   if (r < 0) {
     lderr(cct) << this << " " << __func__ << ": "
                << "failed to commit IO event: "  << cpp_strerror(r) << dendl;
   }
 
-  IOObjectRequests aio_object_requests;
   Contexts on_safe_contexts;
   {
-    Mutex::Locker event_locker(m_event_lock);
+    std::lock_guard event_locker{m_event_lock};
     typename Events::iterator it = m_events.find(tid);
-    assert(it != m_events.end());
+    ceph_assert(it != m_events.end());
 
     Event &event = it->second;
-    aio_object_requests.swap(event.aio_object_requests);
     on_safe_contexts.swap(event.on_safe_contexts);
 
     if (r < 0 || event.committed_io) {
@@ -1488,16 +1544,6 @@ void Journal<I>::handle_io_event_safe(int r, uint64_t tid) {
 
   ldout(cct, 20) << this << " " << __func__ << ": "
                  << "completing tid=" << tid << dendl;
-  for (IOObjectRequests::iterator it = aio_object_requests.begin();
-       it != aio_object_requests.end(); ++it) {
-    if (r < 0) {
-      // don't send aio requests if the journal fails -- bubble error up
-      (*it)->complete(r);
-    } else {
-      // send any waiting aio requests now that journal entry is safe
-      (*it)->send();
-    }
-  }
 
   // alert the cache about the journal event status
   for (Contexts::iterator it = on_safe_contexts.begin();
@@ -1516,7 +1562,7 @@ void Journal<I>::handle_op_event_safe(int r, uint64_t tid,
                  << "tid=" << tid << dendl;
 
   // journal will be flushed before closing
-  assert(m_state == STATE_READY || m_state == STATE_STOPPING);
+  ceph_assert(m_state == STATE_READY || m_state == STATE_STOPPING);
   if (r < 0) {
     lderr(cct) << this << " " << __func__ << ": "
                << "failed to commit op event: "  << cpp_strerror(r) << dendl;
@@ -1531,10 +1577,10 @@ void Journal<I>::handle_op_event_safe(int r, uint64_t tid,
 
 template <typename I>
 void Journal<I>::stop_recording() {
-  assert(m_lock.is_locked());
-  assert(m_journaler != NULL);
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+  ceph_assert(m_journaler != NULL);
 
-  assert(m_state == STATE_READY);
+  ceph_assert(m_state == STATE_READY);
   transition_state(STATE_STOPPING, 0);
 
   m_journaler->stop_append(util::create_async_context_callback(
@@ -1546,7 +1592,7 @@ template <typename I>
 void Journal<I>::transition_state(State state, int r) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": new state=" << state << dendl;
-  assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
   m_state = state;
 
   if (m_error_result == 0 && r < 0) {
@@ -1554,7 +1600,9 @@ void Journal<I>::transition_state(State state, int r) {
   }
 
   if (is_steady_state()) {
-    Contexts wait_for_state_contexts(std::move(m_wait_for_state_contexts));
+    auto wait_for_state_contexts(std::move(m_wait_for_state_contexts));
+    m_wait_for_state_contexts.clear();
+
     for (auto ctx : wait_for_state_contexts) {
       ctx->complete(m_error_result);
     }
@@ -1563,7 +1611,7 @@ void Journal<I>::transition_state(State state, int r) {
 
 template <typename I>
 bool Journal<I>::is_steady_state() const {
-  assert(m_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
   switch (m_state) {
   case STATE_READY:
   case STATE_CLOSED:
@@ -1583,8 +1631,8 @@ bool Journal<I>::is_steady_state() const {
 
 template <typename I>
 void Journal<I>::wait_for_steady_state(Context *on_state) {
-  assert(m_lock.is_locked());
-  assert(!is_steady_state());
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+  ceph_assert(!is_steady_state());
 
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": on_state=" << on_state
@@ -1594,7 +1642,7 @@ void Journal<I>::wait_for_steady_state(Context *on_state) {
 
 template <typename I>
 int Journal<I>::is_resync_requested(bool *do_resync) {
-  Mutex::Locker l(m_lock);
+  std::lock_guard l{m_lock};
   return check_resync_requested(do_resync);
 }
 
@@ -1603,8 +1651,8 @@ int Journal<I>::check_resync_requested(bool *do_resync) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << dendl;
 
-  assert(m_lock.is_locked());
-  assert(do_resync != nullptr);
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+  ceph_assert(do_resync != nullptr);
 
   cls::journal::Client client;
   int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
@@ -1615,12 +1663,12 @@ int Journal<I>::check_resync_requested(bool *do_resync) {
   }
 
   librbd::journal::ClientData client_data;
-  bufferlist::iterator bl_it = client.data.begin();
+  auto bl_it = client.data.cbegin();
   try {
-    ::decode(client_data, bl_it);
+    decode(client_data, bl_it);
   } catch (const buffer::error &err) {
     lderr(cct) << this << " " << __func__ << ": "
-               << "failed to decode client data: " << err << dendl;
+               << "failed to decode client data: " << err.what() << dendl;
     return -EINVAL;
   }
 
@@ -1638,16 +1686,16 @@ int Journal<I>::check_resync_requested(bool *do_resync) {
 }
 
 struct C_RefreshTags : public Context {
-  util::AsyncOpTracker &async_op_tracker;
+  AsyncOpTracker &async_op_tracker;
   Context *on_finish = nullptr;
 
-  Mutex lock;
-  uint64_t tag_tid;
+  ceph::mutex lock =
+    ceph::make_mutex("librbd::Journal::C_RefreshTags::lock");
+  uint64_t tag_tid = 0;
   journal::TagData tag_data;
 
-  C_RefreshTags(util::AsyncOpTracker &async_op_tracker)
-    : async_op_tracker(async_op_tracker),
-      lock("librbd::Journal::C_RefreshTags::lock") {
+  explicit C_RefreshTags(AsyncOpTracker &async_op_tracker)
+    : async_op_tracker(async_op_tracker) {
     async_op_tracker.start_op();
   }
   ~C_RefreshTags() override {
@@ -1662,7 +1710,7 @@ struct C_RefreshTags : public Context {
 template <typename I>
 void Journal<I>::handle_metadata_updated() {
   CephContext *cct = m_image_ctx.cct;
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
 
   if (m_state != STATE_READY && !is_journal_replaying(m_lock)) {
     return;
@@ -1681,7 +1729,7 @@ void Journal<I>::handle_metadata_updated() {
   // pull the most recent tags from the journal, decode, and
   // update the internal tag state
   C_RefreshTags *refresh_ctx = new C_RefreshTags(m_async_journal_op_tracker);
-  refresh_ctx->on_finish = new FunctionContext(
+  refresh_ctx->on_finish = new LambdaContext(
     [this, refresh_sequence, refresh_ctx](int r) {
       handle_refresh_metadata(refresh_sequence, refresh_ctx->tag_tid,
                               refresh_ctx->tag_data, r);
@@ -1698,7 +1746,7 @@ void Journal<I>::handle_refresh_metadata(uint64_t refresh_sequence,
                                          uint64_t tag_tid,
                                          journal::TagData tag_data, int r) {
   CephContext *cct = m_image_ctx.cct;
-  Mutex::Locker locker(m_lock);
+  std::unique_lock locker{m_lock};
 
   if (r < 0) {
     lderr(cct) << this << " " << __func__ << ": failed to refresh metadata: "
@@ -1715,9 +1763,7 @@ void Journal<I>::handle_refresh_metadata(uint64_t refresh_sequence,
                  << "refresh_sequence=" << refresh_sequence << ", "
                  << "tag_tid=" << tag_tid << ", "
                  << "tag_data=" << tag_data << dendl;
-  while (m_listener_notify) {
-    m_listener_cond.Wait(m_lock);
-  }
+  m_listener_cond.wait(locker, [this] { return !m_listener_notify; });
 
   bool was_tag_owner = is_tag_owner(m_lock);
   if (m_tag_tid < tag_tid) {
@@ -1736,7 +1782,7 @@ void Journal<I>::handle_refresh_metadata(uint64_t refresh_sequence,
 
   Listeners listeners(m_listeners);
   m_listener_notify = true;
-  m_lock.Unlock();
+  locker.unlock();
 
   if (promoted_to_primary) {
     for (auto listener : listeners) {
@@ -1748,23 +1794,21 @@ void Journal<I>::handle_refresh_metadata(uint64_t refresh_sequence,
     }
   }
 
-  m_lock.Lock();
+  locker.lock();
   m_listener_notify = false;
-  m_listener_cond.Signal();
+  m_listener_cond.notify_all();
 }
 
 template <typename I>
 void Journal<I>::add_listener(journal::Listener *listener) {
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   m_listeners.insert(listener);
 }
 
 template <typename I>
 void Journal<I>::remove_listener(journal::Listener *listener) {
-  Mutex::Locker locker(m_lock);
-  while (m_listener_notify) {
-    m_listener_cond.Wait(m_lock);
-  }
+  std::unique_lock locker{m_lock};
+  m_listener_cond.wait(locker, [this] { return !m_listener_notify; });
   m_listeners.erase(listener);
 }