on_finish->complete(0);
}
+bool Watcher::notifications_blocked() const {
+ RWLock::RLocker locker(m_watch_lock);
+
+ bool blocked = (m_blocked_count > 0);
+ ldout(m_cct, 5) << "blocked=" << blocked << dendl;
+ return blocked;
+}
+
+void Watcher::block_notifies(Context *on_finish) {
+ {
+ RWLock::WLocker locker(m_watch_lock);
+ ++m_blocked_count;
+ ldout(m_cct, 5) << "blocked_count=" << m_blocked_count << dendl;
+ }
+ m_async_op_tracker.wait_for_ops(on_finish);
+}
+
+void Watcher::unblock_notifies() {
+ RWLock::WLocker locker(m_watch_lock);
+ assert(m_blocked_count > 0);
+ --m_blocked_count;
+ ldout(m_cct, 5) << "blocked_count=" << m_blocked_count << dendl;
+}
+
void Watcher::flush(Context *on_finish) {
m_notifier.flush(on_finish);
}
m_notifier.notify(payload, response, on_finish);
}
-void Watcher::WatchCtx::handle_notify(uint64_t notify_id,
- uint64_t handle,
- uint64_t notifier_id,
- bufferlist& bl) {
- watcher.handle_notify(notify_id, handle, notifier_id, bl);
+void Watcher::WatchCtx::handle_notify(uint64_t notify_id, uint64_t handle,
+ uint64_t notifier_id, bufferlist& bl) {
+ // if notifications are blocked, finish the notification w/o
+ // bubbling the notification up to the derived class
+ watcher.m_async_op_tracker.start_op();
+ if (watcher.notifications_blocked()) {
+ bufferlist bl;
+ watcher.acknowledge_notify(notify_id, handle, bl);
+ } else {
+ watcher.handle_notify(notify_id, handle, notifier_id, bl);
+ }
+ watcher.m_async_op_tracker.finish_op();
}
void Watcher::WatchCtx::handle_error(uint64_t handle, int err) {