]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/librbd/ImageState.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / librbd / ImageState.cc
index fda3148f3199d298c18b707e50b3ec283322bf6a..a81a8373dbd61cbe47fa37fc8d6837bf62c0168c 100644 (file)
@@ -7,8 +7,11 @@
 #include "common/errno.h"
 #include "common/Cond.h"
 #include "common/WorkQueue.h"
+#include "librbd/AsioEngine.h"
 #include "librbd/ImageCtx.h"
+#include "librbd/TaskFinisher.h"
 #include "librbd/Utils.h"
+#include "librbd/asio/ContextWQ.h"
 #include "librbd/image/CloseRequest.h"
 #include "librbd/image/OpenRequest.h"
 #include "librbd/image/RefreshRequest.h"
@@ -76,7 +79,8 @@ public:
   }
 
   void register_watcher(UpdateWatchCtx *watcher, uint64_t *handle) {
-    ldout(m_cct, 20) << __func__ << ": watcher=" << watcher << dendl;
+    ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__ << ": watcher="
+                     << watcher << dendl;
 
     std::lock_guard locker{m_lock};
     ceph_assert(m_on_shut_down_finish == nullptr);
@@ -215,9 +219,10 @@ private:
     auto& thread_pool = m_cct->lookup_or_create_singleton_object<
       ThreadPoolSingleton>("librbd::ImageUpdateWatchers::thread_pool",
                           false, m_cct);
-    m_work_queue = new ContextWQ("librbd::ImageUpdateWatchers::op_work_queue",
-                                m_cct->_conf.get_val<uint64_t>("rbd_op_thread_timeout"),
-                                &thread_pool);
+    m_work_queue = new ContextWQ("librbd::ImageUpdateWatchers::work_queue",
+                                 ceph::make_timespan(
+                                   m_cct->_conf.get_val<uint64_t>("rbd_op_thread_timeout")),
+                                 &thread_pool);
   }
 
   void destroy_work_queue() {
@@ -229,18 +234,240 @@ private:
   }
 };
 
+class QuiesceWatchers {
+public:
+  explicit QuiesceWatchers(CephContext *cct, asio::ContextWQ* work_queue)
+    : m_cct(cct),
+      m_work_queue(work_queue),
+      m_lock(ceph::make_mutex(util::unique_lock_name(
+        "librbd::QuiesceWatchers::m_lock", this))) {
+  }
+
+  ~QuiesceWatchers() {
+    ceph_assert(m_pending_unregister.empty());
+    ceph_assert(m_on_notify == nullptr);
+  }
+
+  void register_watcher(QuiesceWatchCtx *watcher, uint64_t *handle) {
+    ldout(m_cct, 20) << "QuiesceWatchers::" << __func__ << ": watcher="
+                     << watcher << dendl;
+
+    std::lock_guard locker{m_lock};
+
+    *handle = m_next_handle++;
+    m_watchers[*handle] = watcher;
+  }
+
+  void unregister_watcher(uint64_t handle, Context *on_finish) {
+    int r = 0;
+    {
+      std::lock_guard locker{m_lock};
+      auto it = m_watchers.find(handle);
+      if (it == m_watchers.end()) {
+        r = -ENOENT;
+      } else {
+        if (m_on_notify != nullptr) {
+          ceph_assert(!m_pending_unregister.count(handle));
+          m_pending_unregister[handle] = on_finish;
+          on_finish = nullptr;
+        }
+        m_watchers.erase(it);
+      }
+    }
+
+    if (on_finish) {
+      ldout(m_cct, 20) << "QuiesceWatchers::" << __func__
+                       << ": completing unregister " << handle << dendl;
+      on_finish->complete(r);
+    }
+  }
+
+  void notify_quiesce(Context *on_finish) {
+    std::lock_guard locker{m_lock};
+    if (m_blocked) {
+      ldout(m_cct, 20) << "QuiesceWatchers::" << __func__ << ": queue" << dendl;
+      m_pending_notify.push_back(on_finish);
+      return;
+    }
+
+    notify(QUIESCE, on_finish);
+  }
+
+  void notify_unquiesce(Context *on_finish) {
+    std::lock_guard locker{m_lock};
+
+    notify(UNQUIESCE, on_finish);
+  }
+
+  void quiesce_complete(uint64_t handle, int r) {
+    Context *on_notify = nullptr;
+    {
+      std::lock_guard locker{m_lock};
+      ceph_assert(m_on_notify != nullptr);
+      ceph_assert(m_handle_quiesce_cnt > 0);
+
+      m_handle_quiesce_cnt--;
+
+      if (r < 0) {
+        ldout(m_cct, 10) << "QuiesceWatchers::" << __func__ << ": watcher "
+                         << handle << " failed" << dendl;
+        m_failed_watchers.insert(handle);
+        m_ret_val = r;
+      }
+
+      if (m_handle_quiesce_cnt > 0) {
+        return;
+      }
+
+      std::swap(on_notify, m_on_notify);
+      r = m_ret_val;
+    }
+
+    on_notify->complete(r);
+  }
+
+private:
+  enum EventType {QUIESCE, UNQUIESCE};
+
+  CephContext *m_cct;
+  asio::ContextWQ *m_work_queue;
+
+  ceph::mutex m_lock;
+  std::map<uint64_t, QuiesceWatchCtx*> m_watchers;
+  uint64_t m_next_handle = 0;
+  Context *m_on_notify = nullptr;
+  std::list<Context *> m_pending_notify;
+  std::map<uint64_t, Context*> m_pending_unregister;
+  uint64_t m_handle_quiesce_cnt = 0;
+  std::set<uint64_t> m_failed_watchers;
+  bool m_blocked = false;
+  int m_ret_val = 0;
+
+  void notify(EventType event_type, Context *on_finish) {
+    ceph_assert(ceph_mutex_is_locked(m_lock));
+
+    if (m_watchers.empty()) {
+      m_work_queue->queue(on_finish);
+      return;
+    }
+
+    ldout(m_cct, 20) << "QuiesceWatchers::" << __func__ << " event: "
+                     << event_type << dendl;
+
+    Context *ctx = nullptr;
+    if (event_type == QUIESCE) {
+      ceph_assert(!m_blocked);
+      ceph_assert(m_handle_quiesce_cnt == 0);
+
+      m_blocked = true;
+      m_handle_quiesce_cnt = m_watchers.size();
+      m_failed_watchers.clear();
+      m_ret_val = 0;
+    } else {
+      ceph_assert(event_type == UNQUIESCE);
+      ceph_assert(m_blocked);
+
+      ctx = create_async_context_callback(
+        m_work_queue, create_context_callback<
+          QuiesceWatchers, &QuiesceWatchers::handle_notify_unquiesce>(this));
+    }
+    auto gather_ctx = new C_Gather(m_cct, ctx);
+
+    ceph_assert(m_on_notify == nullptr);
+
+    m_on_notify = on_finish;
+
+    for (auto &[handle, watcher] : m_watchers) {
+      send_notify(handle, watcher, event_type, gather_ctx->new_sub());
+    }
+
+    gather_ctx->activate();
+  }
+
+  void send_notify(uint64_t handle, QuiesceWatchCtx *watcher,
+                   EventType event_type, Context *on_finish) {
+    auto ctx = new LambdaContext(
+      [this, handle, watcher, event_type, on_finish](int) {
+        ldout(m_cct, 20) << "QuiesceWatchers::" << __func__ << ": handle="
+                         << handle << ", event_type=" << event_type << dendl;
+        switch (event_type) {
+        case QUIESCE:
+          watcher->handle_quiesce();
+          break;
+        case UNQUIESCE:
+          {
+            std::lock_guard locker{m_lock};
+
+            if (m_failed_watchers.count(handle)) {
+              ldout(m_cct, 20) << "QuiesceWatchers::" << __func__
+                               << ": skip for failed watcher" << dendl;
+              break;
+            }
+          }
+          watcher->handle_unquiesce();
+          break;
+        default:
+          ceph_abort_msgf("invalid event_type %d", event_type);
+        }
+
+        on_finish->complete(0);
+      });
+
+    m_work_queue->queue(ctx);
+  }
+
+  void handle_notify_unquiesce(int r) {
+    ldout(m_cct, 20) << "QuiesceWatchers::" << __func__ << ": r=" << r
+                     << dendl;
+
+    ceph_assert(r == 0);
+
+    std::unique_lock locker{m_lock};
+
+    if (!m_pending_unregister.empty()) {
+      std::map<uint64_t, Context*> pending_unregister;
+      std::swap(pending_unregister, m_pending_unregister);
+      locker.unlock();
+      for (auto &it : pending_unregister) {
+        ldout(m_cct, 20) << "QuiesceWatchers::" << __func__
+                         << ": completing unregister " << it.first << dendl;
+        it.second->complete(0);
+      }
+      locker.lock();
+    }
+
+    Context *on_notify = nullptr;
+    std::swap(on_notify, m_on_notify);
+
+    ceph_assert(m_blocked);
+    m_blocked = false;
+
+    if (!m_pending_notify.empty()) {
+      auto on_finish = m_pending_notify.front();
+      m_pending_notify.pop_front();
+      notify(QUIESCE, on_finish);
+    }
+
+    locker.unlock();
+    on_notify->complete(0);
+  }
+};
+
 template <typename I>
 ImageState<I>::ImageState(I *image_ctx)
   : m_image_ctx(image_ctx), m_state(STATE_UNINITIALIZED),
     m_lock(ceph::make_mutex(util::unique_lock_name("librbd::ImageState::m_lock", this))),
     m_last_refresh(0), m_refresh_seq(0),
-    m_update_watchers(new ImageUpdateWatchers(image_ctx->cct)) {
+    m_update_watchers(new ImageUpdateWatchers(image_ctx->cct)),
+    m_quiesce_watchers(new QuiesceWatchers(
+      image_ctx->cct, image_ctx->asio_engine->get_work_queue())) {
 }
 
 template <typename I>
 ImageState<I>::~ImageState() {
   ceph_assert(m_state == STATE_UNINITIALIZED || m_state == STATE_CLOSED);
   delete m_update_watchers;
+  delete m_quiesce_watchers;
 }
 
 template <typename I>
@@ -249,9 +476,6 @@ int ImageState<I>::open(uint64_t flags) {
   open(flags, &ctx);
 
   int r = ctx.wait();
-  if (r < 0) {
-    delete m_image_ctx;
-  }
   return r;
 }
 
@@ -276,7 +500,6 @@ int ImageState<I>::close() {
   close(&ctx);
 
   int r = ctx.wait();
-  delete m_image_ctx;
   return r;
 }
 
@@ -571,11 +794,28 @@ void ImageState<I>::complete_action_unlock(State next_state, int r) {
   m_state = next_state;
   m_lock.unlock();
 
-  for (auto ctx : action_contexts.second) {
-    ctx->complete(r);
-  }
+  if (next_state == STATE_CLOSED ||
+      (next_state == STATE_UNINITIALIZED && r < 0)) {
+    // the ImageCtx must be deleted outside the scope of its callback threads
+    auto ctx = new LambdaContext(
+      [image_ctx=m_image_ctx, contexts=std::move(action_contexts.second)]
+      (int r) {
+        delete image_ctx;
+        for (auto ctx : contexts) {
+          ctx->complete(r);
+        }
+      });
+    TaskFinisherSingleton::get_singleton(m_image_ctx->cct).queue(ctx, r);
+  } else {
+    for (auto ctx : action_contexts.second) {
+      if (next_state == STATE_OPEN) {
+        // we couldn't originally wrap the open callback w/ an async wrapper in
+        // case the image failed to open
+        ctx = create_async_context_callback(*m_image_ctx, ctx);
+      }
+      ctx->complete(r);
+    }
 
-  if (next_state != STATE_UNINITIALIZED && next_state != STATE_CLOSED) {
     m_lock.lock();
     if (!is_transition_state() && !m_actions_contexts.empty()) {
       execute_next_action_unlock();
@@ -593,9 +833,8 @@ void ImageState<I>::send_open_unlock() {
 
   m_state = STATE_OPENING;
 
-  Context *ctx = create_async_context_callback(
-    *m_image_ctx, create_context_callback<
-      ImageState<I>, &ImageState<I>::handle_open>(this));
+  Context *ctx = create_context_callback<
+    ImageState<I>, &ImageState<I>::handle_open>(this);
   image::OpenRequest<I> *req = image::OpenRequest<I>::create(
     m_image_ctx, m_open_flags, ctx);
 
@@ -751,6 +990,51 @@ void ImageState<I>::send_prepare_lock_unlock() {
   on_ready->complete(0);
 }
 
+template <typename I>
+int ImageState<I>::register_quiesce_watcher(QuiesceWatchCtx *watcher,
+                                                uint64_t *handle) {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 20) << __func__ << dendl;
+
+  m_quiesce_watchers->register_watcher(watcher, handle);
+
+  ldout(cct, 20) << __func__ << ": handle=" << *handle << dendl;
+  return 0;
+}
+
+template <typename I>
+int ImageState<I>::unregister_quiesce_watcher(uint64_t handle) {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 20) << __func__ << ": handle=" << handle << dendl;
+
+  C_SaferCond ctx;
+  m_quiesce_watchers->unregister_watcher(handle, &ctx);
+  return ctx.wait();
+}
+
+template <typename I>
+void ImageState<I>::notify_quiesce(Context *on_finish) {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 20) << __func__ << dendl;
+
+  m_quiesce_watchers->notify_quiesce(on_finish);
+}
+
+template <typename I>
+void ImageState<I>::notify_unquiesce(Context *on_finish) {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 20) << __func__ << dendl;
+
+  m_quiesce_watchers->notify_unquiesce(on_finish);
+}
+
+template <typename I>
+void ImageState<I>::quiesce_complete(uint64_t handle, int r) {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 20) << __func__ << ": handle=" << handle << " r=" << r << dendl;
+  m_quiesce_watchers->quiesce_complete(handle, r);
+}
+
 } // namespace librbd
 
 template class librbd::ImageState<librbd::ImageCtx>;