]>
git.proxmox.com Git - ceph.git/blob - ceph/src/librbd/Watcher.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "librbd/Watcher.h"
5 #include "librbd/watcher/RewatchRequest.h"
6 #include "librbd/Utils.h"
7 #include "librbd/TaskFinisher.h"
8 #include "include/encoding.h"
9 #include "common/errno.h"
10 #include "common/WorkQueue.h"
11 #include <boost/bind.hpp>
13 // re-include our assert to clobber the system one; fix dout:
14 #include "include/ceph_assert.h"
16 #define dout_subsys ceph_subsys_rbd
20 using namespace watcher
;
22 using util::create_context_callback
;
23 using util::create_rados_callback
;
// Context used as the completion of an unwatch (see unregister_watch(), which
// installs it as the aio_unwatch callback). It holds a librados::Rados built
// from the caller's IoCtx and, from complete(), issues an asynchronous
// aio_watch_flush() with itself as the rados callback; on_finish is the
// caller's context that is picked up for the final completion.
//
// NOTE(review): this extract omits several original lines (the embedded line
// numbers skip). The declarations of `on_finish` and `ret_val`, the use of the
// `flushing` flag, and the statements that actually complete `ctx` are not
// visible here -- confirm against the full file before relying on details.
28 struct C_UnwatchAndFlush
: public Context
{
29 librados::Rados rados
;
31 bool flushing
= false;
// Builds the Rados handle from the supplied IoCtx and remembers on_finish.
34 C_UnwatchAndFlush(librados::IoCtx
&io_ctx
, Context
*on_finish
)
35 : rados(io_ctx
), on_finish(on_finish
) {
// complete() is overridden directly (rather than only finish()); the visible
// code tests for the first negative result while ret_val is still 0.
38 void complete(int r
) override
{
39 if (ret_val
== 0 && r
< 0) {
// Re-arm this same object as the callback for the asynchronous watch flush.
46 librados::AioCompletion
*aio_comp
= create_rados_callback(this);
47 r
= rados
.aio_watch_flush(aio_comp
);
53 // ensure our reference to the RadosClient is released prior
54 // to completing the callback to avoid racing an explicit
56 Context
*ctx
= on_finish
;
// finish() override; no body statements are visible in this extract.
63 void finish(int r
) override
{
67 } // anonymous namespace
// Log prefix used by the C_NotifyAck methods that follow, then the
// C_NotifyAck constructor.
//
// The constructor records the owning watcher, the CephContext taken from
// watcher->m_cct, and the notify id, then logs "id=<notify_id>, handle=<handle>"
// at debug level 10.
//
// NOTE(review): the continuation of the macro and the lines declaring /
// initialising `handle` are not visible in this extract; presumably `handle`
// is a constructor parameter stored as a member -- confirm.
70 #define dout_prefix *_dout << "librbd::Watcher::C_NotifyAck " << this << " " \
73 Watcher::C_NotifyAck::C_NotifyAck(Watcher
*watcher
, uint64_t notify_id
,
75 : watcher(watcher
), cct(watcher
->m_cct
), notify_id(notify_id
),
77 ldout(cct
, 10) << "id=" << notify_id
<< ", " << "handle=" << handle
<< dendl
;
// Completion hook of C_NotifyAck: logs the result code at debug level 10 and
// acknowledges the notification through the owning watcher, passing the stored
// notify_id and handle together with the `out` member as the reply payload.
// NOTE(review): an original line between the log and the acknowledge call is
// not visible in this extract.
80 void Watcher::C_NotifyAck::finish(int r
) {
81 ldout(cct
, 10) << "r=" << r
<< dendl
;
83 watcher
->acknowledge_notify(notify_id
, handle
, out
);
// Log prefix used by the Watcher methods that follow (includes __func__), then
// the Watcher constructor.
//
// The constructor:
//  - keeps the IoCtx, the work queue and the object id (m_oid);
//  - derives m_cct by casting ioctx.cct() to CephContext*;
//  - creates m_watch_lock as a shared mutex with a per-instance unique name;
//  - starts with m_watch_handle == 0 and m_watch_state == WATCH_STATE_IDLE;
//  - builds m_notifier from (work_queue, ioctx, oid);
//  - binds m_watch_ctx to *this.
// NOTE(review): the line declaring the `oid` parameter is not visible in this
// extract.
87 #define dout_prefix *_dout << "librbd::Watcher: " << this << " " << __func__ \
90 Watcher::Watcher(librados::IoCtx
& ioctx
, ContextWQ
*work_queue
,
92 : m_ioctx(ioctx
), m_work_queue(work_queue
), m_oid(oid
),
93 m_cct(reinterpret_cast<CephContext
*>(ioctx
.cct())),
94 m_watch_lock(ceph::make_shared_mutex(util::unique_lock_name("librbd::Watcher::m_watch_lock", this))),
95 m_watch_handle(0), m_notifier(work_queue
, ioctx
, oid
),
96 m_watch_state(WATCH_STATE_IDLE
), m_watch_ctx(*this) {
// NOTE(review): the shared-lock + "must be unregistered" assertion below
// appears to belong to a separate definition whose header line is missing from
// this extract (presumably the destructor) -- confirm against the full file.
100 std::shared_lock l
{m_watch_lock
};
101 ceph_assert(is_unregistered(m_watch_lock
));
// Starts asynchronous registration of the watch on m_oid.
//
// @param on_finish  context wrapped (together with this watcher) in a
//                   C_RegisterWatch that serves as the aio_watch completion.
//
// Precondition (asserted under the exclusive m_watch_lock): the watcher is
// currently unregistered. The state moves to WATCH_STATE_REGISTERING and the
// blacklisted flag is cleared before the request is issued.
// NOTE(review): the handling of aio_watch's return value `r` is not visible in
// this extract.
104 void Watcher::register_watch(Context
*on_finish
) {
105 ldout(m_cct
, 10) << dendl
;
107 std::unique_lock watch_locker
{m_watch_lock
};
108 ceph_assert(is_unregistered(m_watch_lock
));
109 m_watch_state
= WATCH_STATE_REGISTERING
;
110 m_watch_blacklisted
= false;
// Issue the watch; &m_watch_handle and &m_watch_ctx are handed to librados.
112 librados::AioCompletion
*aio_comp
= create_rados_callback(
113 new C_RegisterWatch(this, on_finish
));
114 int r
= m_ioctx
.aio_watch(m_oid
, aio_comp
, &m_watch_handle
, &m_watch_ctx
);
// Completion handler for the watch registration.
//
// @param r          result of the registration request.
// @param on_finish  caller's context; completed with r.
//
// Under the exclusive m_watch_lock it asserts the state is
// WATCH_STATE_REGISTERING and returns it to WATCH_STATE_IDLE. Then:
//  - if an unregister request was deferred while registering
//    (m_unregister_watch_ctx != nullptr), that context is taken over via swap;
//  - otherwise, if registration succeeded (r == 0) but m_watch_error is set,
//    the state moves to WATCH_STATE_REWATCHING.
// m_watch_blacklisted is set to whether r == -EBLACKLISTED.
// Afterwards on_finish is completed with r, and a taken-over unregister
// context is completed with 0.
//
// NOTE(review): the condition guarding the "failed to register watch" log, the
// statements that set `watch_error`, and the body of the final
// `else if (watch_error)` branch are not visible in this extract.
119 void Watcher::handle_register_watch(int r
, Context
*on_finish
) {
120 ldout(m_cct
, 10) << "r=" << r
<< dendl
;
122 bool watch_error
= false;
123 Context
*unregister_watch_ctx
= nullptr;
125 std::unique_lock watch_locker
{m_watch_lock
};
126 ceph_assert(m_watch_state
== WATCH_STATE_REGISTERING
);
128 m_watch_state
= WATCH_STATE_IDLE
;
130 lderr(m_cct
) << "failed to register watch: " << cpp_strerror(r
)
135 if (m_unregister_watch_ctx
!= nullptr) {
136 std::swap(unregister_watch_ctx
, m_unregister_watch_ctx
);
137 } else if (r
== 0 && m_watch_error
) {
138 lderr(m_cct
) << "re-registering watch after error" << dendl
;
139 m_watch_state
= WATCH_STATE_REWATCHING
;
142 m_watch_blacklisted
= (r
== -EBLACKLISTED
);
146 on_finish
->complete(r
);
148 if (unregister_watch_ctx
!= nullptr) {
149 unregister_watch_ctx
->complete(0);
150 } else if (watch_error
) {
// Unregisters the watch.
//
// @param on_finish  context completed once the unregister has been handled.
//
// Three cases are visible, decided under the exclusive m_watch_lock:
//  1. State is not WATCH_STATE_IDLE (a register/rewatch is in flight): the
//     request is deferred by storing a LambdaContext in m_unregister_watch_ctx
//     that re-invokes unregister_watch(on_finish). Only one deferred request
//     may be pending (asserted).
//  2. The watch is registered: aio_unwatch is issued on m_watch_handle with a
//     C_UnwatchAndFlush(m_ioctx, on_finish) as completion.
//  3. Otherwise on_finish is completed with 0.
// m_watch_blacklisted is reset to false on a path whose enclosing lines are
// not visible here.
//
// NOTE(review): return statements, the handling of aio_unwatch's result and
// several closing braces are missing from this extract, so the exact
// fall-through between the cases cannot be confirmed from here.
155 void Watcher::unregister_watch(Context
*on_finish
) {
156 ldout(m_cct
, 10) << dendl
;
159 std::unique_lock watch_locker
{m_watch_lock
};
160 if (m_watch_state
!= WATCH_STATE_IDLE
) {
161 ldout(m_cct
, 10) << "delaying unregister until register completed"
164 ceph_assert(m_unregister_watch_ctx
== nullptr);
165 m_unregister_watch_ctx
= new LambdaContext([this, on_finish
](int r
) {
166 unregister_watch(on_finish
);
169 } else if (is_registered(m_watch_lock
)) {
170 librados::AioCompletion
*aio_comp
= create_rados_callback(
171 new C_UnwatchAndFlush(m_ioctx
, on_finish
));
172 int r
= m_ioctx
.aio_unwatch(m_watch_handle
, aio_comp
);
177 m_watch_blacklisted
= false;
182 on_finish
->complete(0);
// Reports whether notifications are currently blocked.
// Under a shared m_watch_lock it computes blocked = (m_blocked_count > 0) and
// logs the value at debug level 5.
// NOTE(review): the return statement is not visible in this extract;
// presumably `blocked` is returned -- confirm.
185 bool Watcher::notifications_blocked() const {
186 std::shared_lock locker
{m_watch_lock
};
188 bool blocked
= (m_blocked_count
> 0);
189 ldout(m_cct
, 5) << "blocked=" << blocked
<< dendl
;
// Blocks notification delivery.
//
// @param on_finish  context handed to m_async_op_tracker.wait_for_ops().
//
// Under the exclusive m_watch_lock it logs m_blocked_count at debug level 5
// and registers on_finish with the async-op tracker.
// NOTE(review): the statement that changes m_blocked_count is not visible in
// this extract; presumably the counter is incremented before the log -- confirm.
193 void Watcher::block_notifies(Context
*on_finish
) {
195 std::unique_lock locker
{m_watch_lock
};
197 ldout(m_cct
, 5) << "blocked_count=" << m_blocked_count
<< dendl
;
199 m_async_op_tracker
.wait_for_ops(on_finish
);
// Counterpart of block_notifies().
// Under the exclusive m_watch_lock it asserts that at least one block is
// outstanding (m_blocked_count > 0) and logs m_blocked_count at debug level 5.
// NOTE(review): the statement that changes m_blocked_count is not visible in
// this extract; presumably the counter is decremented between the assert and
// the log -- confirm.
202 void Watcher::unblock_notifies() {
203 std::unique_lock locker
{m_watch_lock
};
204 ceph_assert(m_blocked_count
> 0);
206 ldout(m_cct
, 5) << "blocked_count=" << m_blocked_count
<< dendl
;
// Forwards a flush request to the notifier.
// @param on_finish  context passed straight through to m_notifier.flush().
209 void Watcher::flush(Context
*on_finish
) {
210 m_notifier
.flush(on_finish
);
// Accessor for the watched object id; takes a shared m_watch_lock for the
// duration of the call.
// NOTE(review): the return statement is not visible in this extract;
// presumably m_oid is returned by value -- confirm.
213 std::string
Watcher::get_oid() const {
214 std::shared_lock locker
{m_watch_lock
};
// Changes the watched object id.
// @param oid  new object id.
// Takes the exclusive m_watch_lock and asserts that no watch is registered, so
// the id can only be changed while unregistered.
// NOTE(review): the assignment itself is not visible in this extract;
// presumably m_oid = oid follows the assert -- confirm.
218 void Watcher::set_oid(const string
& oid
) {
219 std::unique_lock watch_locker
{m_watch_lock
};
220 ceph_assert(is_unregistered(m_watch_lock
));
// Reacts to a watch error reported for `handle`.
//
// @param handle  watch handle the error refers to (only logged here).
// @param err     negative errno; rendered with cpp_strerror for the log.
//
// Under the exclusive m_watch_lock, m_watch_error is always set. If the watch
// is currently registered, the state moves to WATCH_STATE_REWATCHING,
// m_watch_blacklisted is set when err == -EBLACKLISTED, and a LambdaContext
// bound to Watcher::rewatch is queued on m_work_queue so the rewatch runs from
// the work queue rather than from this callback.
225 void Watcher::handle_error(uint64_t handle
, int err
) {
226 lderr(m_cct
) << "handle=" << handle
<< ": " << cpp_strerror(err
) << dendl
;
228 std::unique_lock watch_locker
{m_watch_lock
};
// Remember the error even when not registered; the register/rewatch
// completion handlers consult m_watch_error.
229 m_watch_error
= true;
231 if (is_registered(m_watch_lock
)) {
232 m_watch_state
= WATCH_STATE_REWATCHING
;
233 if (err
== -EBLACKLISTED
) {
234 m_watch_blacklisted
= true;
237 auto ctx
= new LambdaContext(
238 boost::bind(&Watcher::rewatch
, this));
239 m_work_queue
->queue(ctx
);
// Acknowledges a received notification by calling
// m_ioctx.notify_ack(m_oid, notify_id, handle, out).
// @param notify_id  id of the notification being acknowledged.
// @param handle     watch handle the notification arrived on.
// NOTE(review): the line declaring the `out` parameter is not visible in this
// extract.
243 void Watcher::acknowledge_notify(uint64_t notify_id
, uint64_t handle
,
245 m_ioctx
.notify_ack(m_oid
, notify_id
, handle
, out
);
// Re-establishes the watch after an error (queued from handle_error(), and
// referenced by the watch_error branches of the completion handlers).
//
// Under the exclusive m_watch_lock it asserts the state is
// WATCH_STATE_REWATCHING. If an unregister request was deferred
// (m_unregister_watch_ctx != nullptr), the state returns to WATCH_STATE_IDLE
// and that context is taken over via swap; it is completed with 0 at the end.
// The other visible statements clear m_watch_error and create a
// RewatchRequest over (m_ioctx, m_oid, m_watch_lock, &m_watch_ctx,
// &m_watch_handle) whose completion is routed to handle_rewatch().
//
// NOTE(review): the `else`, the call that starts `req`, the return and the
// closing braces are missing from this extract, so which statements belong to
// which branch cannot be confirmed from here.
248 void Watcher::rewatch() {
249 ldout(m_cct
, 10) << dendl
;
251 Context
*unregister_watch_ctx
= nullptr;
253 std::unique_lock watch_locker
{m_watch_lock
};
254 ceph_assert(m_watch_state
== WATCH_STATE_REWATCHING
);
256 if (m_unregister_watch_ctx
!= nullptr) {
257 m_watch_state
= WATCH_STATE_IDLE
;
258 std::swap(unregister_watch_ctx
, m_unregister_watch_ctx
);
260 m_watch_error
= false;
261 auto ctx
= create_context_callback
<
262 Watcher
, &Watcher::handle_rewatch
>(this);
263 auto req
= RewatchRequest::create(m_ioctx
, m_oid
, m_watch_lock
,
264 &m_watch_ctx
, &m_watch_handle
, ctx
);
270 unregister_watch_ctx
->complete(0);
// Completion handler for the RewatchRequest created in rewatch().
//
// @param r  result of the rewatch attempt.
//
// Under the exclusive m_watch_lock it asserts the state is
// WATCH_STATE_REWATCHING and clears m_watch_blacklisted, then classifies the
// outcome:
//  - deferred unregister pending: log "image is closing, skip rewatch", return
//    to WATCH_STATE_IDLE and take over the deferred context;
//  - r == -EBLACKLISTED: log and set m_watch_blacklisted;
//  - r == -ENOENT: log that the object does not exist (debug level 5);
//  - "failed to rewatch" / "re-registering watch after error" are logged on
//    the remaining error paths.
// Afterwards a taken-over unregister context is completed with 0; otherwise
// (when no earlier branch applies) handle_rewatch_callback(r) is queued on
// m_work_queue.
//
// NOTE(review): the condition preceding the "failed to rewatch" log, the
// statements that set `watch_error`, the body of `else if (watch_error)` and
// the return statements are not visible in this extract.
273 void Watcher::handle_rewatch(int r
) {
274 ldout(m_cct
, 10) << "r=" << r
<< dendl
;
276 bool watch_error
= false;
277 Context
*unregister_watch_ctx
= nullptr;
279 std::unique_lock watch_locker
{m_watch_lock
};
280 ceph_assert(m_watch_state
== WATCH_STATE_REWATCHING
);
282 m_watch_blacklisted
= false;
283 if (m_unregister_watch_ctx
!= nullptr) {
284 ldout(m_cct
, 10) << "image is closing, skip rewatch" << dendl
;
285 m_watch_state
= WATCH_STATE_IDLE
;
286 std::swap(unregister_watch_ctx
, m_unregister_watch_ctx
);
287 } else if (r
== -EBLACKLISTED
) {
288 lderr(m_cct
) << "client blacklisted" << dendl
;
289 m_watch_blacklisted
= true;
290 } else if (r
== -ENOENT
) {
291 ldout(m_cct
, 5) << "object does not exist" << dendl
;
293 lderr(m_cct
) << "failed to rewatch: " << cpp_strerror(r
) << dendl
;
295 } else if (m_watch_error
) {
296 lderr(m_cct
) << "re-registering watch after error" << dendl
;
301 if (unregister_watch_ctx
!= nullptr) {
302 unregister_watch_ctx
->complete(0);
304 } else if (watch_error
) {
// Hand the result to handle_rewatch_callback via the work queue.
309 auto ctx
= create_context_callback
<
310 Watcher
, &Watcher::handle_rewatch_callback
>(this);
311 m_work_queue
->queue(ctx
, r
);
// Work-queue continuation of handle_rewatch().
//
// @param r  result of the rewatch attempt, as queued by handle_rewatch().
//
// It first calls handle_rewatch_complete(r), before m_watch_lock is taken.
// Then, under the exclusive m_watch_lock, it asserts the state is still
// WATCH_STATE_REWATCHING and settles the final state:
//  - deferred unregister pending: WATCH_STATE_IDLE and take over the context;
//  - r == -EBLACKLISTED or r == -ENOENT: WATCH_STATE_IDLE;
//  - r < 0 or m_watch_error set: error branch (body not visible);
//  - a further assignment to WATCH_STATE_IDLE follows, whose guarding line is
//    not visible.
// Finally a taken-over unregister context is completed with 0, or the
// watch_error branch runs.
//
// NOTE(review): the statements that set `watch_error`, the `else` before the
// last WATCH_STATE_IDLE assignment and the body of `else if (watch_error)` are
// missing from this extract -- confirm against the full file.
314 void Watcher::handle_rewatch_callback(int r
) {
315 ldout(m_cct
, 10) << "r=" << r
<< dendl
;
316 handle_rewatch_complete(r
);
318 bool watch_error
= false;
319 Context
*unregister_watch_ctx
= nullptr;
321 std::unique_lock watch_locker
{m_watch_lock
};
322 ceph_assert(m_watch_state
== WATCH_STATE_REWATCHING
);
324 if (m_unregister_watch_ctx
!= nullptr) {
325 m_watch_state
= WATCH_STATE_IDLE
;
326 std::swap(unregister_watch_ctx
, m_unregister_watch_ctx
);
327 } else if (r
== -EBLACKLISTED
|| r
== -ENOENT
) {
328 m_watch_state
= WATCH_STATE_IDLE
;
329 } else if (r
< 0 || m_watch_error
) {
332 m_watch_state
= WATCH_STATE_IDLE
;
336 if (unregister_watch_ctx
!= nullptr) {
337 unregister_watch_ctx
->complete(0);
338 } else if (watch_error
) {
// Sends a notification by forwarding all arguments to m_notifier.notify().
// @param payload    notification payload.
// @param response   passed through to the notifier as its response argument.
// @param on_finish  passed through to the notifier as its completion context.
343 void Watcher::send_notify(bufferlist
& payload
,
344 watcher::NotifyResponse
*response
,
345 Context
*on_finish
) {
346 m_notifier
.notify(payload
, response
, on_finish
);
// Watch callback for an incoming notification.
//
// @param notify_id    id of the notification.
// @param handle       watch handle it arrived on.
// @param notifier_id  id of the sender; forwarded to Watcher::handle_notify.
// @param bl           notification payload.
//
// The whole dispatch is bracketed by m_async_op_tracker start_op()/finish_op()
// on the owning watcher. When notifications_blocked() is true the
// notification is acknowledged directly via acknowledge_notify(); the other
// visible call hands it to watcher.handle_notify().
//
// NOTE(review): the lines between the two calls (including the `else`) are
// not visible in this extract, so the exact payload used for the direct
// acknowledge cannot be confirmed from here.
349 void Watcher::WatchCtx::handle_notify(uint64_t notify_id
, uint64_t handle
,
350 uint64_t notifier_id
, bufferlist
& bl
) {
351 // if notifications are blocked, finish the notification w/o
352 // bubbling the notification up to the derived class
353 watcher
.m_async_op_tracker
.start_op();
354 if (watcher
.notifications_blocked()) {
356 watcher
.acknowledge_notify(notify_id
, handle
, bl
);
358 watcher
.handle_notify(notify_id
, handle
, notifier_id
, bl
);
360 watcher
.m_async_op_tracker
.finish_op();
// Watch callback for an error: forwards handle and err unchanged to
// Watcher::handle_error() on the owning watcher.
363 void Watcher::WatchCtx::handle_error(uint64_t handle
, int err
) {
364 watcher
.handle_error(handle
, err
);
367 } // namespace librbd