m_cct(reinterpret_cast<CephContext *>(ioctx.cct())),
m_watch_lock(util::unique_lock_name("librbd::Watcher::m_watch_lock", this)),
m_watch_handle(0), m_notifier(work_queue, ioctx, oid),
- m_watch_state(WATCH_STATE_UNREGISTERED), m_watch_ctx(*this) {
+ m_watch_state(WATCH_STATE_IDLE), m_watch_ctx(*this) {
}
Watcher::~Watcher() {
RWLock::RLocker l(m_watch_lock);
- assert(m_watch_state != WATCH_STATE_REGISTERED);
+ assert(is_unregistered(m_watch_lock));
}
void Watcher::register_watch(Context *on_finish) {
ldout(m_cct, 10) << dendl;
RWLock::RLocker watch_locker(m_watch_lock);
- assert(m_watch_state == WATCH_STATE_UNREGISTERED);
+ assert(is_unregistered(m_watch_lock));
m_watch_state = WATCH_STATE_REGISTERING;
librados::AioCompletion *aio_comp = create_rados_callback(
- new C_RegisterWatch(this, on_finish));
+ new C_RegisterWatch(this, on_finish));
int r = m_ioctx.aio_watch(m_oid, aio_comp, &m_watch_handle, &m_watch_ctx);
assert(r == 0);
aio_comp->release();
void Watcher::handle_register_watch(int r, Context *on_finish) {
ldout(m_cct, 10) << "r=" << r << dendl;
+
+ bool watch_error = false;
Context *unregister_watch_ctx = nullptr;
{
RWLock::WLocker watch_locker(m_watch_lock);
assert(m_watch_state == WATCH_STATE_REGISTERING);
- std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
+ m_watch_state = WATCH_STATE_IDLE;
if (r < 0) {
lderr(m_cct) << "failed to register watch: " << cpp_strerror(r)
<< dendl;
m_watch_handle = 0;
- m_watch_state = WATCH_STATE_UNREGISTERED;
- } else if (r >= 0) {
- m_watch_state = WATCH_STATE_REGISTERED;
+ }
+
+ if (m_unregister_watch_ctx != nullptr) {
+ std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
+ } else if (r == 0 && m_watch_error) {
+ lderr(m_cct) << "re-registering watch after error" << dendl;
+ m_watch_state = WATCH_STATE_REWATCHING;
+ watch_error = true;
}
}
on_finish->complete(r);
- // wake up pending unregister request
if (unregister_watch_ctx != nullptr) {
unregister_watch_ctx->complete(0);
+ } else if (watch_error) {
+ rewatch();
}
}
{
RWLock::WLocker watch_locker(m_watch_lock);
- if (m_watch_state == WATCH_STATE_REGISTERING ||
- m_watch_state == WATCH_STATE_REWATCHING) {
+ if (m_watch_state != WATCH_STATE_IDLE) {
ldout(m_cct, 10) << "delaying unregister until register completed"
<< dendl;
unregister_watch(on_finish);
});
return;
- }
-
- if (m_watch_state == WATCH_STATE_REGISTERED ||
- m_watch_state == WATCH_STATE_ERROR) {
- m_watch_state = WATCH_STATE_UNREGISTERED;
-
+ } else if (is_registered(m_watch_lock)) {
librados::AioCompletion *aio_comp = create_rados_callback(
new C_UnwatchAndFlush(m_ioctx, on_finish));
int r = m_ioctx.aio_unwatch(m_watch_handle, aio_comp);
assert(r == 0);
aio_comp->release();
+ m_watch_handle = 0;
return;
}
}
}
void Watcher::set_oid(const string& oid) {
- RWLock::WLocker l(m_watch_lock);
- assert(m_watch_state == WATCH_STATE_UNREGISTERED);
+ RWLock::WLocker watch_locker(m_watch_lock);
+ assert(is_unregistered(m_watch_lock));
m_oid = oid;
}
void Watcher::handle_error(uint64_t handle, int err) {
lderr(m_cct) << "handle=" << handle << ": " << cpp_strerror(err) << dendl;
- RWLock::WLocker l(m_watch_lock);
- if (m_watch_state == WATCH_STATE_REGISTERED) {
- m_watch_state = WATCH_STATE_ERROR;
+ RWLock::WLocker watch_locker(m_watch_lock);
+ m_watch_error = true;
+
+ if (is_registered(m_watch_lock)) {
+ m_watch_state = WATCH_STATE_REWATCHING;
FunctionContext *ctx = new FunctionContext(
boost::bind(&Watcher::rewatch, this));
void Watcher::rewatch() {
ldout(m_cct, 10) << dendl;
- RWLock::WLocker l(m_watch_lock);
- if (m_watch_state != WATCH_STATE_ERROR) {
- return;
+ Context *unregister_watch_ctx = nullptr;
+ {
+ RWLock::WLocker watch_locker(m_watch_lock);
+ assert(m_watch_state == WATCH_STATE_REWATCHING);
+
+ if (m_unregister_watch_ctx != nullptr) {
+ m_watch_state = WATCH_STATE_IDLE;
+ std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
+ } else {
+ m_watch_error = false;
+ auto ctx = create_context_callback<
+ Watcher, &Watcher::handle_rewatch>(this);
+ auto req = RewatchRequest::create(m_ioctx, m_oid, m_watch_lock,
+ &m_watch_ctx, &m_watch_handle, ctx);
+ req->send();
+ return;
+ }
}
- m_watch_state = WATCH_STATE_REWATCHING;
-
- Context *ctx = create_context_callback<Watcher,
- &Watcher::handle_rewatch>(this);
- RewatchRequest *req = RewatchRequest::create(m_ioctx, m_oid, m_watch_lock,
- &m_watch_ctx,
- &m_watch_handle, ctx);
- req->send();
+
+ unregister_watch_ctx->complete(0);
}
void Watcher::handle_rewatch(int r) {
ldout(m_cct, 10) "r=" << r << dendl;
- WatchState next_watch_state = WATCH_STATE_REGISTERED;
- if (r < 0) {
- // only EBLACKLISTED or ENOENT can be returned
- assert(r == -EBLACKLISTED || r == -ENOENT);
- next_watch_state = WATCH_STATE_UNREGISTERED;
+ bool watch_error = false;
+ Context *unregister_watch_ctx = nullptr;
+ {
+ RWLock::WLocker watch_locker(m_watch_lock);
+ assert(m_watch_state == WATCH_STATE_REWATCHING);
+
+ if (m_unregister_watch_ctx != nullptr) {
+ ldout(m_cct, 10) << "image is closing, skip rewatch" << dendl;
+ m_watch_state = WATCH_STATE_IDLE;
+ std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
+ } else if (r == -EBLACKLISTED) {
+ lderr(m_cct) << "client blacklisted" << dendl;
+ } else if (r == -ENOENT) {
+ ldout(m_cct, 5) << "object does not exist" << dendl;
+ } else if (r < 0) {
+ lderr(m_cct) << "failed to rewatch: " << cpp_strerror(r) << dendl;
+ watch_error = true;
+ } else if (m_watch_error) {
+ lderr(m_cct) << "re-registering watch after error" << dendl;
+ watch_error = true;
+ }
}
+ if (unregister_watch_ctx != nullptr) {
+ unregister_watch_ctx->complete(0);
+ return;
+ } else if (watch_error) {
+ rewatch();
+ return;
+ }
+
+ auto ctx = create_context_callback<
+ Watcher, &Watcher::handle_rewatch_callback>(this);
+ m_work_queue->queue(ctx, r);
+}
+
+void Watcher::handle_rewatch_callback(int r) {
+ ldout(m_cct, 10) << "r=" << r << dendl;
+ handle_rewatch_complete(r);
+
+ bool watch_error = false;
Context *unregister_watch_ctx = nullptr;
{
RWLock::WLocker watch_locker(m_watch_lock);
assert(m_watch_state == WATCH_STATE_REWATCHING);
- m_watch_state = next_watch_state;
- std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
-
- m_work_queue->queue(
- create_context_callback<Watcher,
- &Watcher::handle_rewatch_complete>(this), r);
+ if (m_unregister_watch_ctx != nullptr) {
+ m_watch_state = WATCH_STATE_IDLE;
+ std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
+ } else if (r == -EBLACKLISTED || r == -ENOENT) {
+ m_watch_state = WATCH_STATE_IDLE;
+ } else if (m_watch_error) {
+ watch_error = true;
+ } else {
+ m_watch_state = WATCH_STATE_IDLE;
+ }
}
- // wake up pending unregister request
if (unregister_watch_ctx != nullptr) {
unregister_watch_ctx->complete(0);
+ } else if (watch_error) {
+ rewatch();
}
}