]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/librbd/ImageWatcher.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / librbd / ImageWatcher.cc
index c46549a78d3d57479732cb3bad6279f69801842e..b2f9d610d7d2e1c7d78f577ca00e5622782b5c86 100644 (file)
@@ -6,18 +6,16 @@
 #include "librbd/ImageCtx.h"
 #include "librbd/ImageState.h"
 #include "librbd/internal.h"
-#include "librbd/Operations.h"
 #include "librbd/TaskFinisher.h"
 #include "librbd/Types.h"
 #include "librbd/Utils.h"
+#include "librbd/asio/ContextWQ.h"
 #include "librbd/exclusive_lock/Policy.h"
 #include "librbd/image_watcher/NotifyLockOwner.h"
 #include "librbd/io/AioCompletion.h"
-#include "librbd/watcher/Utils.h"
 #include "include/encoding.h"
 #include "common/errno.h"
-#include "common/WorkQueue.h"
-#include <boost/bind.hpp>
+#include <boost/bind/bind.hpp>
 
 #define dout_subsys ceph_subsys_rbd
 #undef dout_prefix
@@ -30,11 +28,12 @@ using namespace watch_notify;
 using util::create_async_context_callback;
 using util::create_context_callback;
 using util::create_rados_callback;
-using librbd::watcher::util::HandlePayloadVisitor;
 
 using ceph::encode;
 using ceph::decode;
 
+using namespace boost::placeholders;
+
 static const double    RETRY_DELAY_SECONDS = 1.0;
 
 template <typename I>
@@ -42,12 +41,13 @@ struct ImageWatcher<I>::C_ProcessPayload : public Context {
   ImageWatcher *image_watcher;
   uint64_t notify_id;
   uint64_t handle;
-  watch_notify::Payload payload;
+  std::unique_ptr<watch_notify::Payload> payload;
 
-  C_ProcessPayload(ImageWatcher *image_watcher_, uint64_t notify_id_,
-                   uint64_t handle_, const watch_notify::Payload &payload)
-    : image_watcher(image_watcher_), notify_id(notify_id_), handle(handle_),
-      payload(payload) {
+  C_ProcessPayload(ImageWatcher *image_watcher, uint64_t notify_id,
+                   uint64_t handle,
+                   std::unique_ptr<watch_notify::Payload> &&payload)
+    : image_watcher(image_watcher), notify_id(notify_id), handle(handle),
+      payload(std::move(payload)) {
   }
 
   void finish(int r) override {
@@ -57,7 +57,7 @@ struct ImageWatcher<I>::C_ProcessPayload : public Context {
       bufferlist bl;
       image_watcher->acknowledge_notify(notify_id, handle, bl);
     } else {
-      image_watcher->process_payload(notify_id, handle, payload);
+      image_watcher->process_payload(notify_id, handle, payload.get());
     }
     image_watcher->m_async_op_tracker.finish_op();
   }
@@ -88,13 +88,15 @@ void ImageWatcher<I>::unregister_watch(Context *on_finish) {
 
   cancel_async_requests();
 
+  // flush the task finisher queue before completing
+  on_finish = create_async_context_callback(m_task_finisher, on_finish);
+
   on_finish = new LambdaContext([this, on_finish](int r) {
+    cancel_quiesce_requests();
+    m_task_finisher->cancel_all();
     m_async_op_tracker.wait_for_ops(on_finish);
   });
-  auto ctx = new LambdaContext([this, on_finish](int r) {
-    m_task_finisher->cancel_all(on_finish);
-  });
-  Watcher::unregister_watch(ctx);
+  Watcher::unregister_watch(on_finish);
 }
 
 template <typename I>
@@ -112,9 +114,11 @@ void ImageWatcher<I>::block_notifies(Context *on_finish) {
 template <typename I>
 void ImageWatcher<I>::schedule_async_progress(const AsyncRequestId &request,
                                              uint64_t offset, uint64_t total) {
-  auto ctx = new LambdaContext(
-    boost::bind(&ImageWatcher<I>::notify_async_progress, this, request, offset,
-                total));
+  auto ctx = new LambdaContext([this, request, offset, total](int r) {
+    if (r != -ECANCELED) {
+      notify_async_progress(request, offset, total);
+    }
+  });
   m_task_finisher->queue(Task(TASK_CODE_ASYNC_PROGRESS, request), ctx);
 }
 
@@ -125,7 +129,7 @@ int ImageWatcher<I>::notify_async_progress(const AsyncRequestId &request,
                             << request << " @ " << offset
                             << "/" << total << dendl;
 
-  send_notify(AsyncProgressPayload(request, offset, total));
+  send_notify(new AsyncProgressPayload(request, offset, total));
   return 0;
 }
 
@@ -133,8 +137,11 @@ template <typename I>
 void ImageWatcher<I>::schedule_async_complete(const AsyncRequestId &request,
                                               int r) {
   m_async_op_tracker.start_op();
-  auto ctx = new LambdaContext(
-    boost::bind(&ImageWatcher<I>::notify_async_complete, this, request, r));
+  auto ctx = new LambdaContext([this, request, ret_val=r](int r) {
+    if (r != -ECANCELED) {
+      notify_async_complete(request, ret_val);
+    }
+  });
   m_task_finisher->queue(ctx);
 }
 
@@ -144,7 +151,7 @@ void ImageWatcher<I>::notify_async_complete(const AsyncRequestId &request,
   ldout(m_image_ctx.cct, 20) << this << " remote async request finished: "
                             << request << " = " << r << dendl;
 
-  send_notify(AsyncCompletePayload(request, r),
+  send_notify(new AsyncCompletePayload(request, r),
     new LambdaContext(boost::bind(&ImageWatcher<I>::handle_async_complete,
                         this, request, r, _1)));
 }
@@ -166,7 +173,7 @@ void ImageWatcher<I>::handle_async_complete(const AsyncRequestId &request,
   }
 
   std::unique_lock async_request_locker{m_async_request_lock};
-  m_async_pending.erase(request);
+  mark_async_request_complete(request, r);
   m_async_op_tracker.finish_op();
 }
 
@@ -180,7 +187,7 @@ void ImageWatcher<I>::notify_flatten(uint64_t request_id,
 
   AsyncRequestId async_request_id(get_client_id(), request_id);
 
-  notify_async_request(async_request_id, FlattenPayload(async_request_id),
+  notify_async_request(async_request_id, new FlattenPayload(async_request_id),
                        prog_ctx, on_finish);
 }
 
@@ -196,63 +203,92 @@ void ImageWatcher<I>::notify_resize(uint64_t request_id, uint64_t size,
   AsyncRequestId async_request_id(get_client_id(), request_id);
 
   notify_async_request(async_request_id,
-                       ResizePayload(size, allow_shrink, async_request_id),
+                       new ResizePayload(async_request_id, size, allow_shrink),
                        prog_ctx, on_finish);
 }
 
 template <typename I>
-void ImageWatcher<I>::notify_snap_create(const cls::rbd::SnapshotNamespace &snap_namespace,
+void ImageWatcher<I>::notify_snap_create(uint64_t request_id,
+                                         const cls::rbd::SnapshotNamespace &snap_namespace,
                                         const std::string &snap_name,
+                                         uint64_t flags,
+                                         ProgressContext &prog_ctx,
                                          Context *on_finish) {
   ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
   ceph_assert(m_image_ctx.exclusive_lock &&
               !m_image_ctx.exclusive_lock->is_lock_owner());
 
-  notify_lock_owner(SnapCreatePayload(snap_namespace, snap_name), on_finish);
+  AsyncRequestId async_request_id(get_client_id(), request_id);
+
+  notify_async_request(async_request_id,
+                       new SnapCreatePayload(async_request_id, snap_namespace,
+                                             snap_name, flags),
+                       prog_ctx, on_finish);
 }
 
 template <typename I>
-void ImageWatcher<I>::notify_snap_rename(const snapid_t &src_snap_id,
+void ImageWatcher<I>::notify_snap_rename(uint64_t request_id,
+                                         const snapid_t &src_snap_id,
                                         const std::string &dst_snap_name,
                                         Context *on_finish) {
   ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
   ceph_assert(m_image_ctx.exclusive_lock &&
               !m_image_ctx.exclusive_lock->is_lock_owner());
 
-  notify_lock_owner(SnapRenamePayload(src_snap_id, dst_snap_name), on_finish);
+  AsyncRequestId async_request_id(get_client_id(), request_id);
+
+  notify_async_request(
+      async_request_id,
+      new SnapRenamePayload(async_request_id, src_snap_id, dst_snap_name),
+      m_no_op_prog_ctx, on_finish);
 }
 
 template <typename I>
-void ImageWatcher<I>::notify_snap_remove(const cls::rbd::SnapshotNamespace &snap_namespace,
-                                        const std::string &snap_name,
-                                         Context *on_finish) {
+void ImageWatcher<I>::notify_snap_remove(
+    uint64_t request_id, const cls::rbd::SnapshotNamespace &snap_namespace,
+    const std::string &snap_name, Context *on_finish) {
   ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
   ceph_assert(m_image_ctx.exclusive_lock &&
               !m_image_ctx.exclusive_lock->is_lock_owner());
 
-  notify_lock_owner(SnapRemovePayload(snap_namespace, snap_name), on_finish);
+  AsyncRequestId async_request_id(get_client_id(), request_id);
+
+  notify_async_request(
+      async_request_id,
+      new SnapRemovePayload(async_request_id, snap_namespace, snap_name),
+      m_no_op_prog_ctx, on_finish);
 }
 
 template <typename I>
-void ImageWatcher<I>::notify_snap_protect(const cls::rbd::SnapshotNamespace &snap_namespace,
-                                         const std::string &snap_name,
-                                          Context *on_finish) {
+void ImageWatcher<I>::notify_snap_protect(
+    uint64_t request_id, const cls::rbd::SnapshotNamespace &snap_namespace,
+    const std::string &snap_name, Context *on_finish) {
   ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
   ceph_assert(m_image_ctx.exclusive_lock &&
               !m_image_ctx.exclusive_lock->is_lock_owner());
 
-  notify_lock_owner(SnapProtectPayload(snap_namespace, snap_name), on_finish);
+  AsyncRequestId async_request_id(get_client_id(), request_id);
+
+  notify_async_request(
+      async_request_id,
+      new SnapProtectPayload(async_request_id, snap_namespace, snap_name),
+      m_no_op_prog_ctx, on_finish);
 }
 
 template <typename I>
-void ImageWatcher<I>::notify_snap_unprotect(const cls::rbd::SnapshotNamespace &snap_namespace,
-                                           const std::string &snap_name,
-                                            Context *on_finish) {
+void ImageWatcher<I>::notify_snap_unprotect(
+    uint64_t request_id, const cls::rbd::SnapshotNamespace &snap_namespace,
+    const std::string &snap_name, Context *on_finish) {
   ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
   ceph_assert(m_image_ctx.exclusive_lock &&
               !m_image_ctx.exclusive_lock->is_lock_owner());
 
-  notify_lock_owner(SnapUnprotectPayload(snap_namespace, snap_name), on_finish);
+  AsyncRequestId async_request_id(get_client_id(), request_id);
+
+  notify_async_request(
+      async_request_id,
+      new SnapUnprotectPayload(async_request_id, snap_namespace, snap_name),
+      m_no_op_prog_ctx, on_finish);
 }
 
 template <typename I>
@@ -266,28 +302,38 @@ void ImageWatcher<I>::notify_rebuild_object_map(uint64_t request_id,
   AsyncRequestId async_request_id(get_client_id(), request_id);
 
   notify_async_request(async_request_id,
-                       RebuildObjectMapPayload(async_request_id),
+                       new RebuildObjectMapPayload(async_request_id),
                        prog_ctx, on_finish);
 }
 
 template <typename I>
-void ImageWatcher<I>::notify_rename(const std::string &image_name,
+void ImageWatcher<I>::notify_rename(uint64_t request_id,
+                                    const std::string &image_name,
                                     Context *on_finish) {
   ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
   ceph_assert(m_image_ctx.exclusive_lock &&
               !m_image_ctx.exclusive_lock->is_lock_owner());
 
-  notify_lock_owner(RenamePayload(image_name), on_finish);
+  AsyncRequestId async_request_id(get_client_id(), request_id);
+
+  notify_async_request(async_request_id,
+                       new RenamePayload(async_request_id, image_name),
+                       m_no_op_prog_ctx, on_finish);
 }
 
 template <typename I>
-void ImageWatcher<I>::notify_update_features(uint64_t features, bool enabled,
+void ImageWatcher<I>::notify_update_features(uint64_t request_id,
+                                             uint64_t features, bool enabled,
                                              Context *on_finish) {
   ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
   ceph_assert(m_image_ctx.exclusive_lock &&
               !m_image_ctx.exclusive_lock->is_lock_owner());
 
-  notify_lock_owner(UpdateFeaturesPayload(features, enabled), on_finish);
+  AsyncRequestId async_request_id(get_client_id(), request_id);
+
+  notify_async_request(async_request_id,
+      new UpdateFeaturesPayload(async_request_id, features, enabled),
+      m_no_op_prog_ctx, on_finish);
 }
 
 template <typename I>
@@ -300,7 +346,7 @@ void ImageWatcher<I>::notify_migrate(uint64_t request_id,
 
   AsyncRequestId async_request_id(get_client_id(), request_id);
 
-  notify_async_request(async_request_id, MigratePayload(async_request_id),
+  notify_async_request(async_request_id, new MigratePayload(async_request_id),
                        prog_ctx, on_finish);
 }
 
@@ -315,8 +361,8 @@ void ImageWatcher<I>::notify_sparsify(uint64_t request_id, size_t sparse_size,
   AsyncRequestId async_request_id(get_client_id(), request_id);
 
   notify_async_request(async_request_id,
-                       SparsifyPayload(async_request_id, sparse_size), prog_ctx,
-                       on_finish);
+                       new SparsifyPayload(async_request_id, sparse_size),
+                       prog_ctx, on_finish);
 }
 
 template <typename I>
@@ -324,7 +370,7 @@ void ImageWatcher<I>::notify_header_update(Context *on_finish) {
   ldout(m_image_ctx.cct, 10) << this << ": " << __func__ << dendl;
 
   // supports legacy (empty buffer) clients
-  send_notify(HeaderUpdatePayload(), on_finish);
+  send_notify(new HeaderUpdatePayload(), on_finish);
 }
 
 template <typename I>
@@ -332,26 +378,155 @@ void ImageWatcher<I>::notify_header_update(librados::IoCtx &io_ctx,
                                           const std::string &oid) {
   // supports legacy (empty buffer) clients
   bufferlist bl;
-  encode(NotifyMessage(HeaderUpdatePayload()), bl);
+  encode(NotifyMessage(new HeaderUpdatePayload()), bl);
   io_ctx.notify2(oid, bl, watcher::Notifier::NOTIFY_TIMEOUT, nullptr);
 }
 
+template <typename I>
+void ImageWatcher<I>::notify_quiesce(uint64_t *request_id,
+                                     ProgressContext &prog_ctx,
+                                     Context *on_finish) {
+  *request_id = util::reserve_async_request_id();
+
+  ldout(m_image_ctx.cct, 10) << this << " " << __func__ << ": request_id="
+                             << request_id << dendl;
+
+  AsyncRequestId async_request_id(get_client_id(), *request_id);
+
+  auto attempts = m_image_ctx.config.template get_val<uint64_t>(
+    "rbd_quiesce_notification_attempts");
+
+  notify_quiesce(async_request_id, attempts, prog_ctx, on_finish);
+}
+
+template <typename I>
+void ImageWatcher<I>::notify_quiesce(const AsyncRequestId &async_request_id,
+                                     size_t attempts, ProgressContext &prog_ctx,
+                                     Context *on_finish) {
+  ldout(m_image_ctx.cct, 10) << this << " " << __func__ << ": async_request_id="
+                             << async_request_id << " attempts=" << attempts
+                             << dendl;
+
+  ceph_assert(attempts > 0);
+  auto notify_response = new watcher::NotifyResponse();
+  auto on_notify = new LambdaContext(
+    [notify_response=std::unique_ptr<watcher::NotifyResponse>(notify_response),
+     this, async_request_id, &prog_ctx, on_finish, attempts=attempts-1](int r) {
+      auto total_attempts = m_image_ctx.config.template get_val<uint64_t>(
+        "rbd_quiesce_notification_attempts");
+      if (total_attempts < attempts) {
+        total_attempts = attempts;
+      }
+      prog_ctx.update_progress(total_attempts - attempts, total_attempts);
+
+      if (r == -ETIMEDOUT) {
+        ldout(m_image_ctx.cct, 10) << this << " " << __func__ << ": async_request_id="
+                                   << async_request_id << " timed out" << dendl;
+        if (attempts > 0) {
+          notify_quiesce(async_request_id, attempts, prog_ctx, on_finish);
+          return;
+        }
+      } else if (r == 0) {
+        for (auto &[client_id, bl] : notify_response->acks) {
+          if (bl.length() == 0) {
+            continue;
+          }
+          try {
+            auto iter = bl.cbegin();
+
+            ResponseMessage response_message;
+            using ceph::decode;
+            decode(response_message, iter);
+
+            if (response_message.result != -EOPNOTSUPP) {
+              r = response_message.result;
+            }
+          } catch (const buffer::error &err) {
+            r = -EINVAL;
+          }
+          if (r < 0) {
+            break;
+          }
+        }
+      }
+      if (r < 0) {
+        lderr(m_image_ctx.cct) << this << " failed to notify quiesce: "
+                               << cpp_strerror(r) << dendl;
+      }
+      on_finish->complete(r);
+    });
+
+  bufferlist bl;
+  encode(NotifyMessage(new QuiescePayload(async_request_id)), bl);
+  Watcher::send_notify(bl, notify_response, on_notify);
+}
+
+template <typename I>
+void ImageWatcher<I>::notify_unquiesce(uint64_t request_id, Context *on_finish) {
+  ldout(m_image_ctx.cct, 10) << this << " " << __func__ << ": request_id="
+                             << request_id << dendl;
+
+  AsyncRequestId async_request_id(get_client_id(), request_id);
+
+  send_notify(new UnquiescePayload(async_request_id), on_finish);
+}
+
+template <typename I>
+void ImageWatcher<I>::notify_metadata_set(uint64_t request_id,
+                                          const std::string &key,
+                                          const std::string &value,
+                                          Context *on_finish) {
+  ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
+  ceph_assert(m_image_ctx.exclusive_lock &&
+              !m_image_ctx.exclusive_lock->is_lock_owner());
+
+  AsyncRequestId async_request_id(get_client_id(), request_id);
+
+  notify_async_request(
+      async_request_id,
+      new MetadataUpdatePayload(async_request_id, key,
+                                std::optional<std::string>{value}),
+      m_no_op_prog_ctx,  on_finish);
+}
+
+template <typename I>
+void ImageWatcher<I>::notify_metadata_remove(uint64_t request_id,
+                                             const std::string &key,
+                                             Context *on_finish) {
+  ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
+  ceph_assert(m_image_ctx.exclusive_lock &&
+              !m_image_ctx.exclusive_lock->is_lock_owner());
+
+  AsyncRequestId async_request_id(get_client_id(), request_id);
+
+  notify_async_request(
+      async_request_id,
+      new MetadataUpdatePayload(async_request_id, key, std::nullopt),
+      m_no_op_prog_ctx,  on_finish);
+}
+
 template <typename I>
 void ImageWatcher<I>::schedule_cancel_async_requests() {
-  auto ctx = new LambdaContext(
-    boost::bind(&ImageWatcher<I>::cancel_async_requests, this));
+  auto ctx = new LambdaContext([this](int r) {
+    if (r != -ECANCELED) {
+      cancel_async_requests();
+    }
+  });
   m_task_finisher->queue(TASK_CODE_CANCEL_ASYNC_REQUESTS, ctx);
 }
 
 template <typename I>
 void ImageWatcher<I>::cancel_async_requests() {
   std::unique_lock l{m_async_request_lock};
-  for (std::map<AsyncRequestId, AsyncRequest>::iterator iter =
-        m_async_requests.begin();
-       iter != m_async_requests.end(); ++iter) {
-    iter->second.first->complete(-ERESTART);
+  for (auto iter = m_async_requests.begin(); iter != m_async_requests.end(); ) {
+    if (iter->second.second == nullptr) {
+      // Quiesce notify request. Skip.
+      iter++;
+    } else {
+      iter->second.first->complete(-ERESTART);
+      iter = m_async_requests.erase(iter);
+    }
   }
-  m_async_requests.clear();
 }
 
 template <typename I>
@@ -378,7 +553,7 @@ void ImageWatcher<I>::notify_acquired_lock() {
     set_owner_client_id(client_id);
   }
 
-  send_notify(AcquiredLockPayload(client_id));
+  send_notify(new AcquiredLockPayload(client_id));
 }
 
 template <typename I>
@@ -390,7 +565,7 @@ void ImageWatcher<I>::notify_released_lock() {
     set_owner_client_id(ClientId());
   }
 
-  send_notify(ReleasedLockPayload(get_client_id()));
+  send_notify(new ReleasedLockPayload(get_client_id()));
 }
 
 template <typename I>
@@ -408,8 +583,12 @@ void ImageWatcher<I>::schedule_request_lock(bool use_timer, int timer_delay) {
   if (this->is_registered(this->m_watch_lock)) {
     ldout(m_image_ctx.cct, 15) << this << " requesting exclusive lock" << dendl;
 
-    auto ctx = new LambdaContext(
-      boost::bind(&ImageWatcher<I>::notify_request_lock, this));
+    auto ctx = new LambdaContext([this](int r) {
+      if (r != -ECANCELED) {
+        notify_request_lock();
+      }
+    });
+
     if (use_timer) {
       if (timer_delay < 0) {
         timer_delay = RETRY_DELAY_SECONDS;
@@ -436,7 +615,7 @@ void ImageWatcher<I>::notify_request_lock() {
 
   ldout(m_image_ctx.cct, 10) << this << " notify request lock" << dendl;
 
-  notify_lock_owner(RequestLockPayload(get_client_id(), false),
+  notify_lock_owner(new RequestLockPayload(get_client_id(), false),
       create_context_callback<
         ImageWatcher, &ImageWatcher<I>::handle_request_lock>(this));
 }
@@ -476,8 +655,7 @@ void ImageWatcher<I>::handle_request_lock(int r) {
 }
 
 template <typename I>
-void ImageWatcher<I>::notify_lock_owner(const Payload& payload,
-                                        Context *on_finish) {
+void ImageWatcher<I>::notify_lock_owner(Payload *payload, Context *on_finish) {
   ceph_assert(on_finish != nullptr);
   ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
 
@@ -489,9 +667,56 @@ void ImageWatcher<I>::notify_lock_owner(const Payload& payload,
   notify_lock_owner->send();
 }
 
+template <typename I>
+bool ImageWatcher<I>::is_new_request(const AsyncRequestId &id) const {
+  ceph_assert(ceph_mutex_is_locked(m_async_request_lock));
+
+  return m_async_pending.count(id) == 0 && m_async_complete.count(id) == 0;
+}
+
+template <typename I>
+bool ImageWatcher<I>::mark_async_request_complete(const AsyncRequestId &id,
+                                                  int r) {
+  ceph_assert(ceph_mutex_is_locked(m_async_request_lock));
+
+  bool found = m_async_pending.erase(id);
+
+  auto now = ceph_clock_now();
+
+  auto it = m_async_complete_expiration.begin();
+  while (it != m_async_complete_expiration.end() && it->first < now) {
+    m_async_complete.erase(it->second);
+    it = m_async_complete_expiration.erase(it);
+  }
+
+  if (!m_async_complete.insert({id, r}).second) {
+    for (it = m_async_complete_expiration.begin();
+         it != m_async_complete_expiration.end(); it++) {
+      if (it->second == id) {
+        m_async_complete_expiration.erase(it);
+        break;
+      }
+    }
+  }
+  auto expiration_time = now;
+  expiration_time += 600;
+  m_async_complete_expiration.insert({expiration_time, id});
+
+  return found;
+}
+
 template <typename I>
 Context *ImageWatcher<I>::remove_async_request(const AsyncRequestId &id) {
   std::unique_lock async_request_locker{m_async_request_lock};
+
+  return remove_async_request(id, m_async_request_lock);
+}
+
+template <typename I>
+Context *ImageWatcher<I>::remove_async_request(const AsyncRequestId &id,
+                                               ceph::shared_mutex &lock) {
+  ceph_assert(ceph_mutex_is_locked(lock));
+
   auto it = m_async_requests.find(id);
   if (it != m_async_requests.end()) {
     Context *on_complete = it->second.first;
@@ -506,8 +731,11 @@ void ImageWatcher<I>::schedule_async_request_timed_out(const AsyncRequestId &id)
   ldout(m_image_ctx.cct, 20) << "scheduling async request time out: " << id
                              << dendl;
 
-  Context *ctx = new LambdaContext(boost::bind(
-    &ImageWatcher<I>::async_request_timed_out, this, id));
+  auto ctx = new LambdaContext([this, id](int r) {
+    if (r != -ECANCELED) {
+      async_request_timed_out(id);
+    }
+  });
 
   Task task(TASK_CODE_ASYNC_REQUEST, id);
   m_task_finisher->cancel(task);
@@ -527,10 +755,9 @@ void ImageWatcher<I>::async_request_timed_out(const AsyncRequestId &id) {
 }
 
 template <typename I>
-void ImageWatcher<I>::notify_async_request(const AsyncRequestId &async_request_id,
-                                           const Payload& payload,
-                                           ProgressContext& prog_ctx,
-                                           Context *on_finish) {
+void ImageWatcher<I>::notify_async_request(
+    const AsyncRequestId &async_request_id, Payload *payload,
+    ProgressContext& prog_ctx, Context *on_finish) {
   ceph_assert(on_finish != nullptr);
   ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
 
@@ -570,18 +797,175 @@ int ImageWatcher<I>::prepare_async_request(const AsyncRequestId& async_request_i
     return -ERESTART;
   } else {
     std::unique_lock l{m_async_request_lock};
-    if (m_async_pending.count(async_request_id) == 0) {
+    if (is_new_request(async_request_id)) {
       m_async_pending.insert(async_request_id);
       *new_request = true;
       *prog_ctx = new RemoteProgressContext(*this, async_request_id);
       *ctx = new RemoteContext(*this, async_request_id, *prog_ctx);
     } else {
       *new_request = false;
+      auto it = m_async_complete.find(async_request_id);
+      if (it != m_async_complete.end()) {
+        int r = it->second;
+        // reset complete request expiration time
+        mark_async_request_complete(async_request_id, r);
+        return r;
+      }
     }
   }
   return 0;
 }
 
+template <typename I>
+Context *ImageWatcher<I>::prepare_quiesce_request(
+    const AsyncRequestId &request, C_NotifyAck *ack_ctx) {
+  std::unique_lock locker{m_async_request_lock};
+
+  auto timeout = 2 * watcher::Notifier::NOTIFY_TIMEOUT / 1000;
+
+  if (!is_new_request(request)) {
+    auto it = m_async_requests.find(request);
+    if (it != m_async_requests.end()) {
+      delete it->second.first;
+      it->second.first = ack_ctx;
+    } else {
+      auto it = m_async_complete.find(request);
+      ceph_assert(it != m_async_complete.end());
+      m_task_finisher->queue(new C_ResponseMessage(ack_ctx), it->second);
+      // reset complete request expiration time
+      mark_async_request_complete(request, it->second);
+    }
+    locker.unlock();
+
+    m_task_finisher->reschedule_event_after(Task(TASK_CODE_QUIESCE, request),
+                                            timeout);
+    return nullptr;
+  }
+
+  m_async_pending.insert(request);
+  m_async_requests[request] = AsyncRequest(ack_ctx, nullptr);
+  m_async_op_tracker.start_op();
+
+  return new LambdaContext(
+    [this, request, timeout](int r) {
+      auto unquiesce_ctx = new LambdaContext(
+        [this, request](int r) {
+          if (r == 0) {
+            ldout(m_image_ctx.cct, 10) << this << " quiesce request "
+                                       << request << " timed out" << dendl;
+          }
+
+          auto on_finish = new LambdaContext(
+            [this](int r) {
+              m_async_op_tracker.finish_op();
+            });
+
+          m_image_ctx.state->notify_unquiesce(on_finish);
+        });
+
+      m_task_finisher->add_event_after(Task(TASK_CODE_QUIESCE, request),
+                                       timeout, unquiesce_ctx);
+
+      std::unique_lock async_request_locker{m_async_request_lock};
+      mark_async_request_complete(request, r);
+      auto ctx = remove_async_request(request, m_async_request_lock);
+      async_request_locker.unlock();
+      if (ctx != nullptr) {
+        ctx = new C_ResponseMessage(static_cast<C_NotifyAck *>(ctx));
+        ctx->complete(r);
+      } else {
+        m_task_finisher->cancel(Task(TASK_CODE_QUIESCE, request));
+      }
+    });
+}
+
+template <typename I>
+void ImageWatcher<I>::prepare_unquiesce_request(const AsyncRequestId &request) {
+  {
+    std::unique_lock async_request_locker{m_async_request_lock};
+    auto it = m_async_complete.find(request);
+    if (it == m_async_complete.end()) {
+      ldout(m_image_ctx.cct, 20) << this << " " << request
+                                 << ": not found in complete" << dendl;
+      return;
+    }
+    // reset complete request expiration time
+    mark_async_request_complete(request, it->second);
+  }
+
+  bool canceled = m_task_finisher->cancel(Task(TASK_CODE_QUIESCE, request));
+  if (!canceled) {
+    ldout(m_image_ctx.cct, 20) << this << " " << request
+                               << ": timer task not found" << dendl;
+  }
+}
+
+template <typename I>
+void ImageWatcher<I>::cancel_quiesce_requests() {
+  std::unique_lock l{m_async_request_lock};
+  for (auto it = m_async_requests.begin(); it != m_async_requests.end(); ) {
+    if (it->second.second == nullptr) {
+      // Quiesce notify request.
+      mark_async_request_complete(it->first, 0);
+      delete it->second.first;
+      it = m_async_requests.erase(it);
+    } else {
+      it++;
+    }
+  }
+}
+
+template <typename I>
+bool ImageWatcher<I>::handle_operation_request(
+    const AsyncRequestId& async_request_id,
+    exclusive_lock::OperationRequestType request_type, Operation operation,
+    std::function<void(ProgressContext &prog_ctx, Context*)> execute,
+    C_NotifyAck *ack_ctx) {
+  std::shared_lock owner_locker{m_image_ctx.owner_lock};
+
+  if (m_image_ctx.exclusive_lock != nullptr) {
+    int r = 0;
+    if (m_image_ctx.exclusive_lock->accept_request(request_type, &r)) {
+      bool new_request;
+      Context *ctx;
+      ProgressContext *prog_ctx;
+      bool complete;
+      if (async_request_id) {
+        r = prepare_async_request(async_request_id, &new_request, &ctx,
+                                  &prog_ctx);
+        encode(ResponseMessage(r), ack_ctx->out);
+        complete = true;
+      } else {
+        new_request = true;
+        ctx = new C_ResponseMessage(ack_ctx);
+        prog_ctx = &m_no_op_prog_ctx;
+        complete = false;
+      }
+      if (r == 0 && new_request) {
+        ctx = new LambdaContext(
+          [this, operation, ctx](int r) {
+            m_image_ctx.operations->finish_op(operation, r);
+            ctx->complete(r);
+          });
+        ctx = new LambdaContext(
+          [this, execute, prog_ctx, ctx](int r) {
+            if (r < 0) {
+              ctx->complete(r);
+              return;
+            }
+            std::shared_lock l{m_image_ctx.owner_lock};
+            execute(*prog_ctx, ctx);
+          });
+        m_image_ctx.operations->start_op(operation, ctx);
+      }
+      return complete;
+    } else if (r < 0) {
+      encode(ResponseMessage(r), ack_ctx->out);
+    }
+  }
+  return true;
+}
+
 template <typename I>
 bool ImageWatcher<I>::handle_payload(const HeaderUpdatePayload &payload,
                                     C_NotifyAck *ack_ctx) {
@@ -724,309 +1108,257 @@ bool ImageWatcher<I>::handle_payload(const AsyncCompletePayload &payload,
 template <typename I>
 bool ImageWatcher<I>::handle_payload(const FlattenPayload &payload,
                                     C_NotifyAck *ack_ctx) {
+  ldout(m_image_ctx.cct, 10) << this << " remote flatten request: "
+                             << payload.async_request_id << dendl;
 
-  std::shared_lock l{m_image_ctx.owner_lock};
-  if (m_image_ctx.exclusive_lock != nullptr) {
-    int r;
-    if (m_image_ctx.exclusive_lock->accept_request(
-          exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL, &r)) {
-      bool new_request;
-      Context *ctx;
-      ProgressContext *prog_ctx;
-      r = prepare_async_request(payload.async_request_id, &new_request,
-                                &ctx, &prog_ctx);
-      if (r == 0 && new_request) {
-        ldout(m_image_ctx.cct, 10) << this << " remote flatten request: "
-                                  << payload.async_request_id << dendl;
-        m_image_ctx.operations->execute_flatten(*prog_ctx, ctx);
-      }
-
-      encode(ResponseMessage(r), ack_ctx->out);
-    } else if (r < 0) {
-      encode(ResponseMessage(r), ack_ctx->out);
-    }
-  }
-  return true;
+  return handle_operation_request(
+      payload.async_request_id, exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
+      OPERATION_FLATTEN, std::bind(&Operations<I>::execute_flatten,
+                                   m_image_ctx.operations,
+                                   std::placeholders::_1,
+                                   std::placeholders::_2),
+      ack_ctx);
 }
 
 template <typename I>
 bool ImageWatcher<I>::handle_payload(const ResizePayload &payload,
                                     C_NotifyAck *ack_ctx) {
-  std::shared_lock l{m_image_ctx.owner_lock};
-  if (m_image_ctx.exclusive_lock != nullptr) {
-    int r;
-    if (m_image_ctx.exclusive_lock->accept_request(
-          exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL, &r)) {
-      bool new_request;
-      Context *ctx;
-      ProgressContext *prog_ctx;
-      r = prepare_async_request(payload.async_request_id, &new_request,
-                                &ctx, &prog_ctx);
-      if (r == 0 && new_request) {
-        ldout(m_image_ctx.cct, 10) << this << " remote resize request: "
-                                  << payload.async_request_id << " "
-                                  << payload.size << " "
-                                  << payload.allow_shrink << dendl;
-        m_image_ctx.operations->execute_resize(payload.size, payload.allow_shrink, *prog_ctx, ctx, 0);
-      }
+  ldout(m_image_ctx.cct, 10) << this << " remote resize request: "
+                             << payload.async_request_id << " "
+                             << payload.size << " "
+                             << payload.allow_shrink << dendl;
 
-      encode(ResponseMessage(r), ack_ctx->out);
-    } else if (r < 0) {
-      encode(ResponseMessage(r), ack_ctx->out);
-    }
-  }
-  return true;
+  return handle_operation_request(
+      payload.async_request_id, exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
+      OPERATION_RESIZE, std::bind(&Operations<I>::execute_resize,
+                                  m_image_ctx.operations, payload.size,
+                                  payload.allow_shrink, std::placeholders::_1,
+                                  std::placeholders::_2, 0), ack_ctx);
 }
 
 template <typename I>
 bool ImageWatcher<I>::handle_payload(const SnapCreatePayload &payload,
                                     C_NotifyAck *ack_ctx) {
-  std::shared_lock l{m_image_ctx.owner_lock};
-  if (m_image_ctx.exclusive_lock != nullptr) {
-    int r;
-    auto request_type = exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL;
-
-    // rbd-mirror needs to accept forced promotion orphan snap create requests
-    auto mirror_ns = boost::get<cls::rbd::MirrorSnapshotNamespace>(
-      &payload.snap_namespace);
-    if (mirror_ns != nullptr && mirror_ns->is_orphan()) {
-      request_type = exclusive_lock::OPERATION_REQUEST_TYPE_FORCE_PROMOTION;
-    }
-
-    if (m_image_ctx.exclusive_lock->accept_request(request_type, &r)) {
-      ldout(m_image_ctx.cct, 10) << this << " remote snap_create request: "
-                                << payload.snap_name << dendl;
-
-      m_image_ctx.operations->execute_snap_create(payload.snap_namespace,
-                                                 payload.snap_name,
-                                                  new C_ResponseMessage(ack_ctx),
-                                                  0, false);
-      return false;
-    } else if (r < 0) {
-      encode(ResponseMessage(r), ack_ctx->out);
-    }
+  ldout(m_image_ctx.cct, 10) << this << " remote snap_create request: "
+                             << payload.async_request_id << " "
+                             << payload.snap_namespace << " "
+                             << payload.snap_name << " "
+                             << payload.flags << dendl;
+
+  auto request_type = exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL;
+
+  // rbd-mirror needs to accept forced promotion orphan snap create requests
+  auto mirror_ns = boost::get<cls::rbd::MirrorSnapshotNamespace>(
+    &payload.snap_namespace);
+  if (mirror_ns != nullptr && mirror_ns->is_orphan()) {
+    request_type = exclusive_lock::OPERATION_REQUEST_TYPE_FORCE_PROMOTION;
   }
-  return true;
+
+  return handle_operation_request(
+      payload.async_request_id, request_type,
+      OPERATION_SNAP_CREATE, std::bind(&Operations<I>::execute_snap_create,
+                                       m_image_ctx.operations,
+                                       payload.snap_namespace,
+                                       payload.snap_name, std::placeholders::_2,
+                                       0, payload.flags, std::placeholders::_1),
+      ack_ctx);
 }
 
 template <typename I>
 bool ImageWatcher<I>::handle_payload(const SnapRenamePayload &payload,
                                     C_NotifyAck *ack_ctx) {
-  std::shared_lock l{m_image_ctx.owner_lock};
-  if (m_image_ctx.exclusive_lock != nullptr) {
-    int r;
-    if (m_image_ctx.exclusive_lock->accept_request(
-          exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL, &r)) {
-      ldout(m_image_ctx.cct, 10) << this << " remote snap_rename request: "
-                                << payload.snap_id << " to "
-                                << payload.snap_name << dendl;
-
-      m_image_ctx.operations->execute_snap_rename(payload.snap_id,
-                                                  payload.snap_name,
-                                                  new C_ResponseMessage(ack_ctx));
-      return false;
-    } else if (r < 0) {
-      encode(ResponseMessage(r), ack_ctx->out);
-    }
-  }
-  return true;
+  ldout(m_image_ctx.cct, 10) << this << " remote snap_rename request: "
+                             << payload.async_request_id << " "
+                             << payload.snap_id << " to "
+                             << payload.snap_name << dendl;
+
+  return handle_operation_request(
+      payload.async_request_id, exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
+      OPERATION_SNAP_RENAME, std::bind(&Operations<I>::execute_snap_rename,
+                                       m_image_ctx.operations, payload.snap_id,
+                                       payload.snap_name,
+                                       std::placeholders::_2), ack_ctx);
 }
 
 template <typename I>
 bool ImageWatcher<I>::handle_payload(const SnapRemovePayload &payload,
                                     C_NotifyAck *ack_ctx) {
-  std::shared_lock l{m_image_ctx.owner_lock};
-  if (m_image_ctx.exclusive_lock != nullptr) {
-    auto request_type = exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL;
-    if (cls::rbd::get_snap_namespace_type(payload.snap_namespace) ==
-        cls::rbd::SNAPSHOT_NAMESPACE_TYPE_TRASH) {
-      request_type = exclusive_lock::OPERATION_REQUEST_TYPE_TRASH_SNAP_REMOVE;
-    }
-    int r;
-    if (m_image_ctx.exclusive_lock->accept_request(request_type, &r)) {
-      ldout(m_image_ctx.cct, 10) << this << " remote snap_remove request: "
-                                << payload.snap_name << dendl;
+  ldout(m_image_ctx.cct, 10) << this << " remote snap_remove request: "
+                             << payload.snap_name << dendl;
 
-      m_image_ctx.operations->execute_snap_remove(payload.snap_namespace,
-                                                 payload.snap_name,
-                                                  new C_ResponseMessage(ack_ctx));
-      return false;
-    } else if (r < 0) {
-      encode(ResponseMessage(r), ack_ctx->out);
-    }
+  auto request_type = exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL;
+  if (cls::rbd::get_snap_namespace_type(payload.snap_namespace) ==
+      cls::rbd::SNAPSHOT_NAMESPACE_TYPE_TRASH) {
+    request_type = exclusive_lock::OPERATION_REQUEST_TYPE_TRASH_SNAP_REMOVE;
   }
-  return true;
+
+  return handle_operation_request(
+      payload.async_request_id, request_type, OPERATION_SNAP_REMOVE,
+      std::bind(&Operations<I>::execute_snap_remove, m_image_ctx.operations,
+                payload.snap_namespace, payload.snap_name,
+                std::placeholders::_2), ack_ctx);
 }
 
 template <typename I>
 bool ImageWatcher<I>::handle_payload(const SnapProtectPayload& payload,
                                      C_NotifyAck *ack_ctx) {
-  std::shared_lock owner_locker{m_image_ctx.owner_lock};
-  if (m_image_ctx.exclusive_lock != nullptr) {
-    int r;
-    if (m_image_ctx.exclusive_lock->accept_request(
-          exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL, &r)) {
-      ldout(m_image_ctx.cct, 10) << this << " remote snap_protect request: "
-                                 << payload.snap_name << dendl;
-
-      m_image_ctx.operations->execute_snap_protect(payload.snap_namespace,
-                                                  payload.snap_name,
-                                                   new C_ResponseMessage(ack_ctx));
-      return false;
-    } else if (r < 0) {
-      encode(ResponseMessage(r), ack_ctx->out);
-    }
-  }
-  return true;
+    ldout(m_image_ctx.cct, 10) << this << " remote snap_protect request: "
+                               << payload.async_request_id << " "
+                               << payload.snap_name << dendl;
+
+    return handle_operation_request(
+      payload.async_request_id, exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
+      OPERATION_SNAP_PROTECT, std::bind(&Operations<I>::execute_snap_protect,
+                                        m_image_ctx.operations,
+                                        payload.snap_namespace,
+                                        payload.snap_name,
+                                        std::placeholders::_2), ack_ctx);
 }
 
 template <typename I>
 bool ImageWatcher<I>::handle_payload(const SnapUnprotectPayload& payload,
                                      C_NotifyAck *ack_ctx) {
-  std::shared_lock owner_locker{m_image_ctx.owner_lock};
-  if (m_image_ctx.exclusive_lock != nullptr) {
-    int r;
-    if (m_image_ctx.exclusive_lock->accept_request(
-          exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL, &r)) {
-      ldout(m_image_ctx.cct, 10) << this << " remote snap_unprotect request: "
-                                 << payload.snap_name << dendl;
-
-      m_image_ctx.operations->execute_snap_unprotect(payload.snap_namespace,
-                                                    payload.snap_name,
-                                                     new C_ResponseMessage(ack_ctx));
-      return false;
-    } else if (r < 0) {
-      encode(ResponseMessage(r), ack_ctx->out);
-    }
-  }
-  return true;
+    ldout(m_image_ctx.cct, 10) << this << " remote snap_unprotect request: "
+                               << payload.async_request_id << " "
+                               << payload.snap_name << dendl;
+
+    return handle_operation_request(
+      payload.async_request_id, exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
+      OPERATION_SNAP_UNPROTECT, std::bind(&Operations<I>::execute_snap_unprotect,
+                                          m_image_ctx.operations,
+                                          payload.snap_namespace,
+                                          payload.snap_name,
+                                          std::placeholders::_2), ack_ctx);
 }
 
 template <typename I>
 bool ImageWatcher<I>::handle_payload(const RebuildObjectMapPayload& payload,
                                      C_NotifyAck *ack_ctx) {
-  std::shared_lock l{m_image_ctx.owner_lock};
-  if (m_image_ctx.exclusive_lock != nullptr) {
-    int r;
-    if (m_image_ctx.exclusive_lock->accept_request(
-          exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL, &r)) {
-      bool new_request;
-      Context *ctx;
-      ProgressContext *prog_ctx;
-      r = prepare_async_request(payload.async_request_id, &new_request,
-                                &ctx, &prog_ctx);
-      if (r == 0 && new_request) {
-        ldout(m_image_ctx.cct, 10) << this
-                                   << " remote rebuild object map request: "
-                                   << payload.async_request_id << dendl;
-        m_image_ctx.operations->execute_rebuild_object_map(*prog_ctx, ctx);
-      }
+  ldout(m_image_ctx.cct, 10) << this << " remote rebuild object map request: "
+                             << payload.async_request_id << dendl;
 
-      encode(ResponseMessage(r), ack_ctx->out);
-    } else if (r < 0) {
-      encode(ResponseMessage(r), ack_ctx->out);
-    }
-  }
-  return true;
+  return handle_operation_request(
+      payload.async_request_id, exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
+      OPERATION_REBUILD_OBJECT_MAP,
+      std::bind(&Operations<I>::execute_rebuild_object_map,
+                m_image_ctx.operations, std::placeholders::_1,
+                std::placeholders::_2), ack_ctx);
 }
 
 template <typename I>
 bool ImageWatcher<I>::handle_payload(const RenamePayload& payload,
                                      C_NotifyAck *ack_ctx) {
-  std::shared_lock owner_locker{m_image_ctx.owner_lock};
-  if (m_image_ctx.exclusive_lock != nullptr) {
-    int r;
-    if (m_image_ctx.exclusive_lock->accept_request(
-          exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL, &r)) {
-      ldout(m_image_ctx.cct, 10) << this << " remote rename request: "
-                                 << payload.image_name << dendl;
+  ldout(m_image_ctx.cct, 10) << this << " remote rename request: "
+                             << payload.async_request_id << " "
+                             << payload.image_name << dendl;
 
-      m_image_ctx.operations->execute_rename(payload.image_name,
-                                             new C_ResponseMessage(ack_ctx));
-      return false;
-    } else if (r < 0) {
-      encode(ResponseMessage(r), ack_ctx->out);
-    }
-  }
-  return true;
+  return handle_operation_request(
+      payload.async_request_id, exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
+      OPERATION_RENAME, std::bind(&Operations<I>::execute_rename,
+                                  m_image_ctx.operations, payload.image_name,
+                                  std::placeholders::_2), ack_ctx);
 }
 
 template <typename I>
 bool ImageWatcher<I>::handle_payload(const UpdateFeaturesPayload& payload,
                                      C_NotifyAck *ack_ctx) {
-  std::shared_lock owner_locker{m_image_ctx.owner_lock};
-  if (m_image_ctx.exclusive_lock != nullptr) {
-    int r;
-    if (m_image_ctx.exclusive_lock->accept_request(
-          exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL, &r)) {
-      ldout(m_image_ctx.cct, 10) << this << " remote update_features request: "
-                                 << payload.features << " "
-                                 << (payload.enabled ? "enabled" : "disabled")
-                                 << dendl;
+  ldout(m_image_ctx.cct, 10) << this << " remote update_features request: "
+                             << payload.async_request_id << " "
+                             << payload.features << " "
+                             << (payload.enabled ? "enabled" : "disabled")
+                             << dendl;
 
-      m_image_ctx.operations->execute_update_features(
-        payload.features, payload.enabled, new C_ResponseMessage(ack_ctx), 0);
-      return false;
-    } else if (r < 0) {
-      encode(ResponseMessage(r), ack_ctx->out);
-    }
-  }
-  return true;
+  return handle_operation_request(
+      payload.async_request_id, exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
+      OPERATION_UPDATE_FEATURES,
+      std::bind(&Operations<I>::execute_update_features, m_image_ctx.operations,
+                payload.features, payload.enabled, std::placeholders::_2, 0),
+      ack_ctx);
 }
 
 template <typename I>
 bool ImageWatcher<I>::handle_payload(const MigratePayload &payload,
                                     C_NotifyAck *ack_ctx) {
+  ldout(m_image_ctx.cct, 10) << this << " remote migrate request: "
+                             << payload.async_request_id << dendl;
 
-  std::shared_lock l{m_image_ctx.owner_lock};
-  if (m_image_ctx.exclusive_lock != nullptr) {
-    int r;
-    if (m_image_ctx.exclusive_lock->accept_request(
-          exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL, &r)) {
-      bool new_request;
-      Context *ctx;
-      ProgressContext *prog_ctx;
-      r = prepare_async_request(payload.async_request_id, &new_request,
-                                &ctx, &prog_ctx);
-      if (r == 0 && new_request) {
-        ldout(m_image_ctx.cct, 10) << this << " remote migrate request: "
-                                  << payload.async_request_id << dendl;
-        m_image_ctx.operations->execute_migrate(*prog_ctx, ctx);
-      }
-
-      encode(ResponseMessage(r), ack_ctx->out);
-    } else if (r < 0) {
-      encode(ResponseMessage(r), ack_ctx->out);
-    }
-  }
-  return true;
+  return handle_operation_request(
+      payload.async_request_id, exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
+      OPERATION_MIGRATE, std::bind(&Operations<I>::execute_migrate,
+                                   m_image_ctx.operations,
+                                   std::placeholders::_1,
+                                   std::placeholders::_2), ack_ctx);
 }
 
 template <typename I>
 bool ImageWatcher<I>::handle_payload(const SparsifyPayload &payload,
                                     C_NotifyAck *ack_ctx) {
-  std::shared_lock l{m_image_ctx.owner_lock};
-  if (m_image_ctx.exclusive_lock != nullptr) {
-    int r;
-    if (m_image_ctx.exclusive_lock->accept_request(
-          exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL, &r)) {
-      bool new_request;
-      Context *ctx;
-      ProgressContext *prog_ctx;
-      r = prepare_async_request(payload.async_request_id, &new_request,
-                                &ctx, &prog_ctx);
-      if (r == 0 && new_request) {
-        ldout(m_image_ctx.cct, 10) << this << " remote sparsify request: "
-                                  << payload.async_request_id << dendl;
-        m_image_ctx.operations->execute_sparsify(payload.sparse_size, *prog_ctx,
-                                                 ctx);
-      }
+  ldout(m_image_ctx.cct, 10) << this << " remote sparsify request: "
+                             << payload.async_request_id << dendl;
 
-      encode(ResponseMessage(r), ack_ctx->out);
-    } else if (r < 0) {
-      encode(ResponseMessage(r), ack_ctx->out);
-    }
+  return handle_operation_request(
+      payload.async_request_id, exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
+      OPERATION_SPARSIFY, std::bind(&Operations<I>::execute_sparsify,
+                                    m_image_ctx.operations,
+                                    payload.sparse_size, std::placeholders::_1,
+                                    std::placeholders::_2), ack_ctx);
+}
+
+template <typename I>
+bool ImageWatcher<I>::handle_payload(const MetadataUpdatePayload &payload,
+                                     C_NotifyAck *ack_ctx) {
+  if (payload.value) {
+    ldout(m_image_ctx.cct, 10) << this << " remote metadata_set request: "
+                               << payload.async_request_id << " "
+                               << "key=" << payload.key << ", value="
+                               << *payload.value << dendl;
+
+    return handle_operation_request(
+        payload.async_request_id,
+        exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
+        OPERATION_METADATA_UPDATE,
+        std::bind(&Operations<I>::execute_metadata_set,
+                  m_image_ctx.operations, payload.key, *payload.value,
+                  std::placeholders::_2),
+        ack_ctx);
+  } else {
+    ldout(m_image_ctx.cct, 10) << this << " remote metadata_remove request: "
+                               << payload.async_request_id << " "
+                               << "key=" << payload.key << dendl;
+
+    return handle_operation_request(
+        payload.async_request_id,
+        exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
+        OPERATION_METADATA_UPDATE,
+        std::bind(&Operations<I>::execute_metadata_remove,
+                  m_image_ctx.operations, payload.key, std::placeholders::_2),
+        ack_ctx);
+  }
+}
+
+template <typename I>
+bool ImageWatcher<I>::handle_payload(const QuiescePayload &payload,
+                                    C_NotifyAck *ack_ctx) {
+  auto on_finish = prepare_quiesce_request(payload.async_request_id, ack_ctx);
+  if (on_finish == nullptr) {
+    ldout(m_image_ctx.cct, 10) << this << " duplicate quiesce request: "
+                               << payload.async_request_id << dendl;
+    return false;
   }
+
+  ldout(m_image_ctx.cct, 10) << this << " quiesce request: "
+                             << payload.async_request_id << dendl;
+  m_image_ctx.state->notify_quiesce(on_finish);
+  return false;
+}
+
+template <typename I>
+bool ImageWatcher<I>::handle_payload(const UnquiescePayload &payload,
+                                    C_NotifyAck *ack_ctx) {
+  ldout(m_image_ctx.cct, 10) << this << " unquiesce request: "
+                             << payload.async_request_id << dendl;
+
+  prepare_unquiesce_request(payload.async_request_id);
   return true;
 }
 
@@ -1046,9 +1378,95 @@ bool ImageWatcher<I>::handle_payload(const UnknownPayload &payload,
 
 template <typename I>
 void ImageWatcher<I>::process_payload(uint64_t notify_id, uint64_t handle,
-                                      const Payload &payload) {
-  apply_visitor(HandlePayloadVisitor<ImageWatcher<I>>(this, notify_id, handle),
-                payload);
+                                      Payload *payload) {
+  auto ctx = new Watcher::C_NotifyAck(this, notify_id, handle);
+  bool complete;
+
+  switch (payload->get_notify_op()) {
+  case NOTIFY_OP_ACQUIRED_LOCK:
+    complete = handle_payload(*(static_cast<AcquiredLockPayload *>(payload)),
+                              ctx);
+    break;
+  case NOTIFY_OP_RELEASED_LOCK:
+    complete = handle_payload(*(static_cast<ReleasedLockPayload *>(payload)),
+                              ctx);
+    break;
+  case NOTIFY_OP_REQUEST_LOCK:
+    complete = handle_payload(*(static_cast<RequestLockPayload *>(payload)),
+                              ctx);
+    break;
+  case NOTIFY_OP_HEADER_UPDATE:
+    complete = handle_payload(*(static_cast<HeaderUpdatePayload *>(payload)),
+                              ctx);
+    break;
+  case NOTIFY_OP_ASYNC_PROGRESS:
+    complete = handle_payload(*(static_cast<AsyncProgressPayload *>(payload)),
+                              ctx);
+    break;
+  case NOTIFY_OP_ASYNC_COMPLETE:
+    complete = handle_payload(*(static_cast<AsyncCompletePayload *>(payload)),
+                              ctx);
+    break;
+  case NOTIFY_OP_FLATTEN:
+    complete = handle_payload(*(static_cast<FlattenPayload *>(payload)), ctx);
+    break;
+  case NOTIFY_OP_RESIZE:
+    complete = handle_payload(*(static_cast<ResizePayload *>(payload)), ctx);
+    break;
+  case NOTIFY_OP_SNAP_CREATE:
+    complete = handle_payload(*(static_cast<SnapCreatePayload *>(payload)),
+                              ctx);
+    break;
+  case NOTIFY_OP_SNAP_REMOVE:
+    complete = handle_payload(*(static_cast<SnapRemovePayload *>(payload)),
+                              ctx);
+    break;
+  case NOTIFY_OP_SNAP_RENAME:
+    complete = handle_payload(*(static_cast<SnapRenamePayload *>(payload)),
+                              ctx);
+    break;
+  case NOTIFY_OP_SNAP_PROTECT:
+    complete = handle_payload(*(static_cast<SnapProtectPayload *>(payload)),
+                              ctx);
+    break;
+  case NOTIFY_OP_SNAP_UNPROTECT:
+    complete = handle_payload(*(static_cast<SnapUnprotectPayload *>(payload)),
+                              ctx);
+    break;
+  case NOTIFY_OP_REBUILD_OBJECT_MAP:
+    complete = handle_payload(*(static_cast<RebuildObjectMapPayload *>(payload)),
+                              ctx);
+    break;
+  case NOTIFY_OP_RENAME:
+    complete = handle_payload(*(static_cast<RenamePayload *>(payload)), ctx);
+    break;
+  case NOTIFY_OP_UPDATE_FEATURES:
+    complete = handle_payload(*(static_cast<UpdateFeaturesPayload *>(payload)),
+                              ctx);
+    break;
+  case NOTIFY_OP_MIGRATE:
+    complete = handle_payload(*(static_cast<MigratePayload *>(payload)), ctx);
+    break;
+  case NOTIFY_OP_SPARSIFY:
+    complete = handle_payload(*(static_cast<SparsifyPayload *>(payload)), ctx);
+    break;
+  case NOTIFY_OP_QUIESCE:
+    complete = handle_payload(*(static_cast<QuiescePayload *>(payload)), ctx);
+    break;
+  case NOTIFY_OP_UNQUIESCE:
+    complete = handle_payload(*(static_cast<UnquiescePayload *>(payload)), ctx);
+    break;
+  case NOTIFY_OP_METADATA_UPDATE:
+    complete = handle_payload(*(static_cast<MetadataUpdatePayload *>(payload)), ctx);
+    break;
+  default:
+    ceph_assert(payload->get_notify_op() == static_cast<NotifyOp>(-1));
+    complete = handle_payload(*(static_cast<UnknownPayload *>(payload)), ctx);
+  }
+
+  if (complete) {
+    ctx->complete(0);
+  }
 }
 
 template <typename I>
@@ -1057,7 +1475,7 @@ void ImageWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
   NotifyMessage notify_message;
   if (bl.length() == 0) {
     // legacy notification for header updates
-    notify_message = NotifyMessage(HeaderUpdatePayload());
+    notify_message = NotifyMessage(new HeaderUpdatePayload());
   } else {
     try {
       auto iter = bl.cbegin();
@@ -1072,10 +1490,12 @@ void ImageWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
   // if an image refresh is required, refresh before processing the request
   if (notify_message.check_for_refresh() &&
       m_image_ctx.state->is_refresh_required()) {
-    m_image_ctx.state->refresh(new C_ProcessPayload(this, notify_id, handle,
-                                                    notify_message.payload));
+
+    m_image_ctx.state->refresh(
+      new C_ProcessPayload(this, notify_id, handle,
+                           std::move(notify_message.payload)));
   } else {
-    process_payload(notify_id, handle, notify_message.payload);
+    process_payload(notify_id, handle, notify_message.payload.get());
   }
 }
 
@@ -1110,7 +1530,7 @@ void ImageWatcher<I>::handle_rewatch_complete(int r) {
 }
 
 template <typename I>
-void ImageWatcher<I>::send_notify(const Payload &payload, Context *ctx) {
+void ImageWatcher<I>::send_notify(Payload *payload, Context *ctx) {
   bufferlist bl;
 
   encode(NotifyMessage(payload), bl);