]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/librbd/Watcher.cc
update sources to 12.2.7
[ceph.git] / ceph / src / librbd / Watcher.cc
index 54a2246f12cff7ea82c41353141b30a0624de84a..6e31ad7c635929408ace2bd9a80c4a845b74ec12 100644 (file)
@@ -93,23 +93,23 @@ Watcher::Watcher(librados::IoCtx& ioctx, ContextWQ *work_queue,
     m_cct(reinterpret_cast<CephContext *>(ioctx.cct())),
     m_watch_lock(util::unique_lock_name("librbd::Watcher::m_watch_lock", this)),
     m_watch_handle(0), m_notifier(work_queue, ioctx, oid),
-    m_watch_state(WATCH_STATE_UNREGISTERED), m_watch_ctx(*this) {
+    m_watch_state(WATCH_STATE_IDLE), m_watch_ctx(*this) {
 }
 
 Watcher::~Watcher() {
   RWLock::RLocker l(m_watch_lock);
-  assert(m_watch_state != WATCH_STATE_REGISTERED);
+  assert(is_unregistered(m_watch_lock));
 }
 
 void Watcher::register_watch(Context *on_finish) {
   ldout(m_cct, 10) << dendl;
 
   RWLock::RLocker watch_locker(m_watch_lock);
-  assert(m_watch_state == WATCH_STATE_UNREGISTERED);
+  assert(is_unregistered(m_watch_lock));
   m_watch_state = WATCH_STATE_REGISTERING;
 
   librados::AioCompletion *aio_comp = create_rados_callback(
-                                         new C_RegisterWatch(this, on_finish));
+    new C_RegisterWatch(this, on_finish));
   int r = m_ioctx.aio_watch(m_oid, aio_comp, &m_watch_handle, &m_watch_ctx);
   assert(r == 0);
   aio_comp->release();
@@ -117,27 +117,35 @@ void Watcher::register_watch(Context *on_finish) {
 
 void Watcher::handle_register_watch(int r, Context *on_finish) {
   ldout(m_cct, 10) << "r=" << r << dendl;
+
+  bool watch_error = false;
   Context *unregister_watch_ctx = nullptr;
   {
     RWLock::WLocker watch_locker(m_watch_lock);
     assert(m_watch_state == WATCH_STATE_REGISTERING);
 
-    std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
+    m_watch_state = WATCH_STATE_IDLE;
     if (r < 0) {
       lderr(m_cct) << "failed to register watch: " << cpp_strerror(r)
                    << dendl;
       m_watch_handle = 0;
-      m_watch_state = WATCH_STATE_UNREGISTERED;
-    } else if (r >= 0) {
-      m_watch_state = WATCH_STATE_REGISTERED;
+    }
+
+    if (m_unregister_watch_ctx != nullptr) {
+      std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
+    } else if (r == 0 && m_watch_error) {
+      lderr(m_cct) << "re-registering watch after error" << dendl;
+      m_watch_state = WATCH_STATE_REWATCHING;
+      watch_error = true;
     }
   }
 
   on_finish->complete(r);
 
-  // wake up pending unregister request
   if (unregister_watch_ctx != nullptr) {
     unregister_watch_ctx->complete(0);
+  } else if (watch_error) {
+    rewatch();
   }
 }
 
@@ -146,8 +154,7 @@ void Watcher::unregister_watch(Context *on_finish) {
 
   {
     RWLock::WLocker watch_locker(m_watch_lock);
-    if (m_watch_state == WATCH_STATE_REGISTERING ||
-        m_watch_state == WATCH_STATE_REWATCHING) {
+    if (m_watch_state != WATCH_STATE_IDLE) {
       ldout(m_cct, 10) << "delaying unregister until register completed"
                        << dendl;
 
@@ -156,17 +163,13 @@ void Watcher::unregister_watch(Context *on_finish) {
           unregister_watch(on_finish);
         });
       return;
-    }
-
-    if (m_watch_state == WATCH_STATE_REGISTERED ||
-        m_watch_state == WATCH_STATE_ERROR) {
-      m_watch_state = WATCH_STATE_UNREGISTERED;
-
+    } else if (is_registered(m_watch_lock)) {
       librados::AioCompletion *aio_comp = create_rados_callback(
                         new C_UnwatchAndFlush(m_ioctx, on_finish));
       int r = m_ioctx.aio_unwatch(m_watch_handle, aio_comp);
       assert(r == 0);
       aio_comp->release();
+      m_watch_handle = 0;
       return;
     }
   }
@@ -208,8 +211,8 @@ std::string Watcher::get_oid() const {
 }
 
 void Watcher::set_oid(const string& oid) {
-  RWLock::WLocker l(m_watch_lock);
-  assert(m_watch_state == WATCH_STATE_UNREGISTERED);
+  RWLock::WLocker watch_locker(m_watch_lock);
+  assert(is_unregistered(m_watch_lock));
 
   m_oid = oid;
 }
@@ -217,9 +220,11 @@ void Watcher::set_oid(const string& oid) {
 void Watcher::handle_error(uint64_t handle, int err) {
   lderr(m_cct) << "handle=" << handle << ": " << cpp_strerror(err) << dendl;
 
-  RWLock::WLocker l(m_watch_lock);
-  if (m_watch_state == WATCH_STATE_REGISTERED) {
-    m_watch_state = WATCH_STATE_ERROR;
+  RWLock::WLocker watch_locker(m_watch_lock);
+  m_watch_error = true;
+
+  if (is_registered(m_watch_lock)) {
+    m_watch_state = WATCH_STATE_REWATCHING;
 
     FunctionContext *ctx = new FunctionContext(
         boost::bind(&Watcher::rewatch, this));
@@ -235,46 +240,93 @@ void Watcher::acknowledge_notify(uint64_t notify_id, uint64_t handle,
 void Watcher::rewatch() {
   ldout(m_cct, 10) << dendl;
 
-  RWLock::WLocker l(m_watch_lock);
-  if (m_watch_state != WATCH_STATE_ERROR) {
-    return;
+  Context *unregister_watch_ctx = nullptr;
+  {
+    RWLock::WLocker watch_locker(m_watch_lock);
+    assert(m_watch_state == WATCH_STATE_REWATCHING);
+
+    if (m_unregister_watch_ctx != nullptr) {
+      m_watch_state = WATCH_STATE_IDLE;
+      std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
+    } else {
+      m_watch_error = false;
+      auto ctx = create_context_callback<
+        Watcher, &Watcher::handle_rewatch>(this);
+      auto req = RewatchRequest::create(m_ioctx, m_oid, m_watch_lock,
+                                        &m_watch_ctx, &m_watch_handle, ctx);
+      req->send();
+      return;
+    }
   }
-  m_watch_state = WATCH_STATE_REWATCHING;
-
-  Context *ctx = create_context_callback<Watcher,
-                                         &Watcher::handle_rewatch>(this);
-  RewatchRequest *req = RewatchRequest::create(m_ioctx, m_oid, m_watch_lock,
-                                               &m_watch_ctx,
-                                               &m_watch_handle, ctx);
-  req->send();
+
+  unregister_watch_ctx->complete(0);
 }
 
 void Watcher::handle_rewatch(int r) {
   ldout(m_cct, 10) "r=" << r << dendl;
 
-  WatchState next_watch_state = WATCH_STATE_REGISTERED;
-  if (r < 0) {
-    // only EBLACKLISTED or ENOENT can be returned
-    assert(r == -EBLACKLISTED || r == -ENOENT);
-    next_watch_state = WATCH_STATE_UNREGISTERED;
+  bool watch_error = false;
+  Context *unregister_watch_ctx = nullptr;
+  {
+    RWLock::WLocker watch_locker(m_watch_lock);
+    assert(m_watch_state == WATCH_STATE_REWATCHING);
+
+    if (m_unregister_watch_ctx != nullptr) {
+      ldout(m_cct, 10) << "image is closing, skip rewatch" << dendl;
+      m_watch_state = WATCH_STATE_IDLE;
+      std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
+    } else if (r  == -EBLACKLISTED) {
+      lderr(m_cct) << "client blacklisted" << dendl;
+    } else if (r == -ENOENT) {
+      ldout(m_cct, 5) << "object does not exist" << dendl;
+    } else if (r < 0) {
+      lderr(m_cct) << "failed to rewatch: " << cpp_strerror(r) << dendl;
+      watch_error = true;
+    } else if (m_watch_error) {
+      lderr(m_cct) << "re-registering watch after error" << dendl;
+      watch_error = true;
+    }
   }
 
+  if (unregister_watch_ctx != nullptr) {
+    unregister_watch_ctx->complete(0);
+    return;
+  } else if (watch_error) {
+    rewatch();
+    return;
+  }
+
+  auto ctx = create_context_callback<
+    Watcher, &Watcher::handle_rewatch_callback>(this);
+  m_work_queue->queue(ctx, r);
+}
+
+void Watcher::handle_rewatch_callback(int r) {
+  ldout(m_cct, 10) << "r=" << r << dendl;
+  handle_rewatch_complete(r);
+
+  bool watch_error = false;
   Context *unregister_watch_ctx = nullptr;
   {
     RWLock::WLocker watch_locker(m_watch_lock);
     assert(m_watch_state == WATCH_STATE_REWATCHING);
-    m_watch_state = next_watch_state;
 
-    std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
-
-    m_work_queue->queue(
-      create_context_callback<Watcher,
-                              &Watcher::handle_rewatch_complete>(this), r);
+    if (m_unregister_watch_ctx != nullptr) {
+      m_watch_state = WATCH_STATE_IDLE;
+      std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
+    } else if (r  == -EBLACKLISTED || r == -ENOENT) {
+      m_watch_state = WATCH_STATE_IDLE;
+    } else if (m_watch_error) {
+      watch_error = true;
+    } else {
+      m_watch_state = WATCH_STATE_IDLE;
+    }
   }
 
-  // wake up pending unregister request
   if (unregister_watch_ctx != nullptr) {
     unregister_watch_ctx->complete(0);
+  } else if (watch_error) {
+    rewatch();
   }
 }