]>
git.proxmox.com Git - ceph.git/blob - ceph/src/librbd/Watcher.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "librbd/Watcher.h"
5 #include "librbd/watcher/RewatchRequest.h"
6 #include "librbd/Utils.h"
7 #include "librbd/TaskFinisher.h"
8 #include "librbd/asio/ContextWQ.h"
9 #include "include/encoding.h"
10 #include "common/errno.h"
11 #include <boost/bind/bind.hpp>
13 // re-include our assert to clobber the system one; fix dout:
14 #include "include/ceph_assert.h"
16 #define dout_subsys ceph_subsys_rbd
20 using namespace boost::placeholders
;
22 using namespace watcher
;
24 using util::create_context_callback
;
25 using util::create_rados_callback
;
30 struct C_UnwatchAndFlush
: public Context
{
31 librados::Rados rados
;
33 bool flushing
= false;
36 C_UnwatchAndFlush(librados::IoCtx
&io_ctx
, Context
*on_finish
)
37 : rados(io_ctx
), on_finish(on_finish
) {
40 void complete(int r
) override
{
41 if (ret_val
== 0 && r
< 0) {
48 librados::AioCompletion
*aio_comp
= create_rados_callback(this);
49 r
= rados
.aio_watch_flush(aio_comp
);
55 // ensure our reference to the RadosClient is released prior
56 // to completing the callback to avoid racing an explicit
58 Context
*ctx
= on_finish
;
65 void finish(int r
) override
{
69 } // anonymous namespace
72 #define dout_prefix *_dout << "librbd::Watcher::C_NotifyAck " << this << " " \
75 Watcher::C_NotifyAck::C_NotifyAck(Watcher
*watcher
, uint64_t notify_id
,
77 : watcher(watcher
), cct(watcher
->m_cct
), notify_id(notify_id
),
79 ldout(cct
, 10) << "id=" << notify_id
<< ", " << "handle=" << handle
<< dendl
;
82 void Watcher::C_NotifyAck::finish(int r
) {
83 ldout(cct
, 10) << "r=" << r
<< dendl
;
85 watcher
->acknowledge_notify(notify_id
, handle
, out
);
89 #define dout_prefix *_dout << "librbd::Watcher: " << this << " " << __func__ \
92 Watcher::Watcher(librados::IoCtx
& ioctx
, asio::ContextWQ
*work_queue
,
94 : m_ioctx(ioctx
), m_work_queue(work_queue
), m_oid(oid
),
95 m_cct(reinterpret_cast<CephContext
*>(ioctx
.cct())),
96 m_watch_lock(ceph::make_shared_mutex(
97 util::unique_lock_name("librbd::Watcher::m_watch_lock", this))),
98 m_watch_handle(0), m_notifier(work_queue
, ioctx
, oid
),
99 m_watch_state(WATCH_STATE_IDLE
), m_watch_ctx(*this) {
102 Watcher::~Watcher() {
103 std::shared_lock l
{m_watch_lock
};
104 ceph_assert(is_unregistered(m_watch_lock
));
107 void Watcher::register_watch(Context
*on_finish
) {
108 ldout(m_cct
, 10) << dendl
;
110 std::unique_lock watch_locker
{m_watch_lock
};
111 ceph_assert(is_unregistered(m_watch_lock
));
112 m_watch_state
= WATCH_STATE_REGISTERING
;
113 m_watch_blocklisted
= false;
115 librados::AioCompletion
*aio_comp
= create_rados_callback(
116 new C_RegisterWatch(this, on_finish
));
117 int r
= m_ioctx
.aio_watch(m_oid
, aio_comp
, &m_watch_handle
, &m_watch_ctx
);
122 void Watcher::handle_register_watch(int r
, Context
*on_finish
) {
123 ldout(m_cct
, 10) << "r=" << r
<< dendl
;
125 bool watch_error
= false;
126 Context
*unregister_watch_ctx
= nullptr;
128 std::unique_lock watch_locker
{m_watch_lock
};
129 ceph_assert(m_watch_state
== WATCH_STATE_REGISTERING
);
131 m_watch_state
= WATCH_STATE_IDLE
;
133 lderr(m_cct
) << "failed to register watch: " << cpp_strerror(r
)
138 if (m_unregister_watch_ctx
!= nullptr) {
139 std::swap(unregister_watch_ctx
, m_unregister_watch_ctx
);
140 } else if (r
== 0 && m_watch_error
) {
141 lderr(m_cct
) << "re-registering watch after error" << dendl
;
142 m_watch_state
= WATCH_STATE_REWATCHING
;
145 m_watch_blocklisted
= (r
== -EBLOCKLISTED
);
149 on_finish
->complete(r
);
151 if (unregister_watch_ctx
!= nullptr) {
152 unregister_watch_ctx
->complete(0);
153 } else if (watch_error
) {
158 void Watcher::unregister_watch(Context
*on_finish
) {
159 ldout(m_cct
, 10) << dendl
;
162 std::unique_lock watch_locker
{m_watch_lock
};
163 if (m_watch_state
!= WATCH_STATE_IDLE
) {
164 ldout(m_cct
, 10) << "delaying unregister until register completed"
167 ceph_assert(m_unregister_watch_ctx
== nullptr);
168 m_unregister_watch_ctx
= new LambdaContext([this, on_finish
](int r
) {
169 unregister_watch(on_finish
);
172 } else if (is_registered(m_watch_lock
)) {
173 librados::AioCompletion
*aio_comp
= create_rados_callback(
174 new C_UnwatchAndFlush(m_ioctx
, on_finish
));
175 int r
= m_ioctx
.aio_unwatch(m_watch_handle
, aio_comp
);
180 m_watch_blocklisted
= false;
185 on_finish
->complete(0);
188 bool Watcher::notifications_blocked() const {
189 std::shared_lock locker
{m_watch_lock
};
191 bool blocked
= (m_blocked_count
> 0);
192 ldout(m_cct
, 5) << "blocked=" << blocked
<< dendl
;
196 void Watcher::block_notifies(Context
*on_finish
) {
198 std::unique_lock locker
{m_watch_lock
};
200 ldout(m_cct
, 5) << "blocked_count=" << m_blocked_count
<< dendl
;
202 m_async_op_tracker
.wait_for_ops(on_finish
);
205 void Watcher::unblock_notifies() {
206 std::unique_lock locker
{m_watch_lock
};
207 ceph_assert(m_blocked_count
> 0);
209 ldout(m_cct
, 5) << "blocked_count=" << m_blocked_count
<< dendl
;
212 void Watcher::flush(Context
*on_finish
) {
213 m_notifier
.flush(on_finish
);
216 std::string
Watcher::get_oid() const {
217 std::shared_lock locker
{m_watch_lock
};
221 void Watcher::set_oid(const string
& oid
) {
222 std::unique_lock watch_locker
{m_watch_lock
};
223 ceph_assert(is_unregistered(m_watch_lock
));
228 void Watcher::handle_error(uint64_t handle
, int err
) {
229 lderr(m_cct
) << "handle=" << handle
<< ": " << cpp_strerror(err
) << dendl
;
231 std::unique_lock watch_locker
{m_watch_lock
};
232 m_watch_error
= true;
234 if (is_registered(m_watch_lock
)) {
235 m_watch_state
= WATCH_STATE_REWATCHING
;
236 if (err
== -EBLOCKLISTED
) {
237 m_watch_blocklisted
= true;
240 auto ctx
= new LambdaContext(
241 boost::bind(&Watcher::rewatch
, this));
242 m_work_queue
->queue(ctx
);
246 void Watcher::acknowledge_notify(uint64_t notify_id
, uint64_t handle
,
248 m_ioctx
.notify_ack(m_oid
, notify_id
, handle
, out
);
251 void Watcher::rewatch() {
252 ldout(m_cct
, 10) << dendl
;
254 Context
*unregister_watch_ctx
= nullptr;
256 std::unique_lock watch_locker
{m_watch_lock
};
257 ceph_assert(m_watch_state
== WATCH_STATE_REWATCHING
);
259 if (m_unregister_watch_ctx
!= nullptr) {
260 m_watch_state
= WATCH_STATE_IDLE
;
261 std::swap(unregister_watch_ctx
, m_unregister_watch_ctx
);
263 m_watch_error
= false;
264 auto ctx
= create_context_callback
<
265 Watcher
, &Watcher::handle_rewatch
>(this);
266 auto req
= RewatchRequest::create(m_ioctx
, m_oid
, m_watch_lock
,
267 &m_watch_ctx
, &m_watch_handle
, ctx
);
273 unregister_watch_ctx
->complete(0);
276 void Watcher::handle_rewatch(int r
) {
277 ldout(m_cct
, 10) << "r=" << r
<< dendl
;
279 bool watch_error
= false;
280 Context
*unregister_watch_ctx
= nullptr;
282 std::unique_lock watch_locker
{m_watch_lock
};
283 ceph_assert(m_watch_state
== WATCH_STATE_REWATCHING
);
285 m_watch_blocklisted
= false;
286 if (m_unregister_watch_ctx
!= nullptr) {
287 ldout(m_cct
, 10) << "image is closing, skip rewatch" << dendl
;
288 m_watch_state
= WATCH_STATE_IDLE
;
289 std::swap(unregister_watch_ctx
, m_unregister_watch_ctx
);
290 } else if (r
== -EBLOCKLISTED
) {
291 lderr(m_cct
) << "client blocklisted" << dendl
;
292 m_watch_blocklisted
= true;
293 } else if (r
== -ENOENT
) {
294 ldout(m_cct
, 5) << "object does not exist" << dendl
;
296 lderr(m_cct
) << "failed to rewatch: " << cpp_strerror(r
) << dendl
;
298 } else if (m_watch_error
) {
299 lderr(m_cct
) << "re-registering watch after error" << dendl
;
304 if (unregister_watch_ctx
!= nullptr) {
305 unregister_watch_ctx
->complete(0);
307 } else if (watch_error
) {
312 auto ctx
= create_context_callback
<
313 Watcher
, &Watcher::handle_rewatch_callback
>(this);
314 m_work_queue
->queue(ctx
, r
);
317 void Watcher::handle_rewatch_callback(int r
) {
318 ldout(m_cct
, 10) << "r=" << r
<< dendl
;
319 handle_rewatch_complete(r
);
321 bool watch_error
= false;
322 Context
*unregister_watch_ctx
= nullptr;
324 std::unique_lock watch_locker
{m_watch_lock
};
325 ceph_assert(m_watch_state
== WATCH_STATE_REWATCHING
);
327 if (m_unregister_watch_ctx
!= nullptr) {
328 m_watch_state
= WATCH_STATE_IDLE
;
329 std::swap(unregister_watch_ctx
, m_unregister_watch_ctx
);
330 } else if (r
== -EBLOCKLISTED
|| r
== -ENOENT
) {
331 m_watch_state
= WATCH_STATE_IDLE
;
332 } else if (r
< 0 || m_watch_error
) {
335 m_watch_state
= WATCH_STATE_IDLE
;
339 if (unregister_watch_ctx
!= nullptr) {
340 unregister_watch_ctx
->complete(0);
341 } else if (watch_error
) {
346 void Watcher::send_notify(bufferlist
& payload
,
347 watcher::NotifyResponse
*response
,
348 Context
*on_finish
) {
349 m_notifier
.notify(payload
, response
, on_finish
);
352 void Watcher::WatchCtx::handle_notify(uint64_t notify_id
, uint64_t handle
,
353 uint64_t notifier_id
, bufferlist
& bl
) {
354 // if notifications are blocked, finish the notification w/o
355 // bubbling the notification up to the derived class
356 watcher
.m_async_op_tracker
.start_op();
357 if (watcher
.notifications_blocked()) {
359 watcher
.acknowledge_notify(notify_id
, handle
, bl
);
361 watcher
.handle_notify(notify_id
, handle
, notifier_id
, bl
);
363 watcher
.m_async_op_tracker
.finish_op();
366 void Watcher::WatchCtx::handle_error(uint64_t handle
, int err
) {
367 watcher
.handle_error(handle
, err
);
370 } // namespace librbd