1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "librbd/Watcher.h"
5 #include "librbd/watcher/RewatchRequest.h"
6 #include "librbd/Utils.h"
7 #include "librbd/TaskFinisher.h"
8 #include "include/encoding.h"
9 #include "common/errno.h"
10 #include "common/WorkQueue.h"
11 #include <boost/bind.hpp>
13 // re-include our assert to clobber the system one; fix dout:
14 #include "include/assert.h"
16 #define dout_subsys ceph_subsys_rbd
20 using namespace watcher
;
22 using util::create_context_callback
;
23 using util::create_rados_callback
;
28 struct C_UnwatchAndFlush
: public Context
{
29 librados::Rados rados
;
31 bool flushing
= false;
34 C_UnwatchAndFlush(librados::IoCtx
&io_ctx
, Context
*on_finish
)
35 : rados(io_ctx
), on_finish(on_finish
) {
38 void complete(int r
) override
{
39 if (ret_val
== 0 && r
< 0) {
46 librados::AioCompletion
*aio_comp
= create_rados_callback(this);
47 r
= rados
.aio_watch_flush(aio_comp
);
53 // ensure our reference to the RadosClient is released prior
54 // to completing the callback to avoid racing an explicit
56 Context
*ctx
= on_finish
;
63 void finish(int r
) override
{
67 } // anonymous namespace
70 #define dout_prefix *_dout << "librbd::Watcher::C_NotifyAck " << this << " " \
73 Watcher::C_NotifyAck::C_NotifyAck(Watcher
*watcher
, uint64_t notify_id
,
75 : watcher(watcher
), cct(watcher
->m_cct
), notify_id(notify_id
),
77 ldout(cct
, 10) << "id=" << notify_id
<< ", " << "handle=" << handle
<< dendl
;
80 void Watcher::C_NotifyAck::finish(int r
) {
81 ldout(cct
, 10) << "r=" << r
<< dendl
;
83 watcher
->acknowledge_notify(notify_id
, handle
, out
);
87 #define dout_prefix *_dout << "librbd::Watcher: " << this << " " << __func__ \
90 Watcher::Watcher(librados::IoCtx
& ioctx
, ContextWQ
*work_queue
,
92 : m_ioctx(ioctx
), m_work_queue(work_queue
), m_oid(oid
),
93 m_cct(reinterpret_cast<CephContext
*>(ioctx
.cct())),
94 m_watch_lock(util::unique_lock_name("librbd::Watcher::m_watch_lock", this)),
95 m_watch_handle(0), m_notifier(work_queue
, ioctx
, oid
),
96 m_watch_state(WATCH_STATE_UNREGISTERED
), m_watch_ctx(*this) {
100 RWLock::RLocker
l(m_watch_lock
);
101 assert(m_watch_state
!= WATCH_STATE_REGISTERED
);
104 void Watcher::register_watch(Context
*on_finish
) {
105 ldout(m_cct
, 10) << dendl
;
107 RWLock::RLocker
watch_locker(m_watch_lock
);
108 assert(m_watch_state
== WATCH_STATE_UNREGISTERED
);
109 m_watch_state
= WATCH_STATE_REGISTERING
;
111 librados::AioCompletion
*aio_comp
= create_rados_callback(
112 new C_RegisterWatch(this, on_finish
));
113 int r
= m_ioctx
.aio_watch(m_oid
, aio_comp
, &m_watch_handle
, &m_watch_ctx
);
118 void Watcher::handle_register_watch(int r
, Context
*on_finish
) {
119 ldout(m_cct
, 10) << "r=" << r
<< dendl
;
120 Context
*unregister_watch_ctx
= nullptr;
122 RWLock::WLocker
watch_locker(m_watch_lock
);
123 assert(m_watch_state
== WATCH_STATE_REGISTERING
);
125 std::swap(unregister_watch_ctx
, m_unregister_watch_ctx
);
127 lderr(m_cct
) << "failed to register watch: " << cpp_strerror(r
)
130 m_watch_state
= WATCH_STATE_UNREGISTERED
;
132 m_watch_state
= WATCH_STATE_REGISTERED
;
136 on_finish
->complete(r
);
138 // wake up pending unregister request
139 if (unregister_watch_ctx
!= nullptr) {
140 unregister_watch_ctx
->complete(0);
144 void Watcher::unregister_watch(Context
*on_finish
) {
145 ldout(m_cct
, 10) << dendl
;
148 RWLock::WLocker
watch_locker(m_watch_lock
);
149 if (m_watch_state
== WATCH_STATE_REGISTERING
||
150 m_watch_state
== WATCH_STATE_REWATCHING
) {
151 ldout(m_cct
, 10) << "delaying unregister until register completed"
154 assert(m_unregister_watch_ctx
== nullptr);
155 m_unregister_watch_ctx
= new FunctionContext([this, on_finish
](int r
) {
156 unregister_watch(on_finish
);
161 if (m_watch_state
== WATCH_STATE_REGISTERED
||
162 m_watch_state
== WATCH_STATE_ERROR
) {
163 m_watch_state
= WATCH_STATE_UNREGISTERED
;
165 librados::AioCompletion
*aio_comp
= create_rados_callback(
166 new C_UnwatchAndFlush(m_ioctx
, on_finish
));
167 int r
= m_ioctx
.aio_unwatch(m_watch_handle
, aio_comp
);
174 on_finish
->complete(0);
177 void Watcher::flush(Context
*on_finish
) {
178 m_notifier
.flush(on_finish
);
181 std::string
Watcher::get_oid() const {
182 RWLock::RLocker
locker(m_watch_lock
);
186 void Watcher::set_oid(const string
& oid
) {
187 RWLock::WLocker
l(m_watch_lock
);
188 assert(m_watch_state
== WATCH_STATE_UNREGISTERED
);
193 void Watcher::handle_error(uint64_t handle
, int err
) {
194 lderr(m_cct
) << "handle=" << handle
<< ": " << cpp_strerror(err
) << dendl
;
196 RWLock::WLocker
l(m_watch_lock
);
197 if (m_watch_state
== WATCH_STATE_REGISTERED
) {
198 m_watch_state
= WATCH_STATE_ERROR
;
200 FunctionContext
*ctx
= new FunctionContext(
201 boost::bind(&Watcher::rewatch
, this));
202 m_work_queue
->queue(ctx
);
206 void Watcher::acknowledge_notify(uint64_t notify_id
, uint64_t handle
,
208 m_ioctx
.notify_ack(m_oid
, notify_id
, handle
, out
);
211 void Watcher::rewatch() {
212 ldout(m_cct
, 10) << dendl
;
214 RWLock::WLocker
l(m_watch_lock
);
215 if (m_watch_state
!= WATCH_STATE_ERROR
) {
218 m_watch_state
= WATCH_STATE_REWATCHING
;
220 Context
*ctx
= create_context_callback
<Watcher
,
221 &Watcher::handle_rewatch
>(this);
222 RewatchRequest
*req
= RewatchRequest::create(m_ioctx
, m_oid
, m_watch_lock
,
224 &m_watch_handle
, ctx
);
228 void Watcher::handle_rewatch(int r
) {
229 ldout(m_cct
, 10) "r=" << r
<< dendl
;
231 WatchState next_watch_state
= WATCH_STATE_REGISTERED
;
233 // only EBLACKLISTED or ENOENT can be returned
234 assert(r
== -EBLACKLISTED
|| r
== -ENOENT
);
235 next_watch_state
= WATCH_STATE_UNREGISTERED
;
238 Context
*unregister_watch_ctx
= nullptr;
240 RWLock::WLocker
watch_locker(m_watch_lock
);
241 assert(m_watch_state
== WATCH_STATE_REWATCHING
);
242 m_watch_state
= next_watch_state
;
244 std::swap(unregister_watch_ctx
, m_unregister_watch_ctx
);
247 create_context_callback
<Watcher
,
248 &Watcher::handle_rewatch_complete
>(this), r
);
251 // wake up pending unregister request
252 if (unregister_watch_ctx
!= nullptr) {
253 unregister_watch_ctx
->complete(0);
257 void Watcher::send_notify(bufferlist
& payload
,
258 watcher::NotifyResponse
*response
,
259 Context
*on_finish
) {
260 m_notifier
.notify(payload
, response
, on_finish
);
263 void Watcher::WatchCtx::handle_notify(uint64_t notify_id
,
265 uint64_t notifier_id
,
267 watcher
.handle_notify(notify_id
, handle
, notifier_id
, bl
);
270 void Watcher::WatchCtx::handle_error(uint64_t handle
, int err
) {
271 watcher
.handle_error(handle
, err
);
274 } // namespace librbd