]> git.proxmox.com Git - ceph.git/blame - ceph/src/librbd/Watcher.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / librbd / Watcher.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "librbd/Watcher.h"
5#include "librbd/watcher/RewatchRequest.h"
6#include "librbd/Utils.h"
7#include "librbd/TaskFinisher.h"
f67539c2 8#include "librbd/asio/ContextWQ.h"
7c673cae
FG
9#include "include/encoding.h"
10#include "common/errno.h"
f67539c2 11#include <boost/bind/bind.hpp>
7c673cae
FG
12
13// re-include our assert to clobber the system one; fix dout:
11fdf7f2 14#include "include/ceph_assert.h"
7c673cae
FG
15
16#define dout_subsys ceph_subsys_rbd
17
18namespace librbd {
19
f67539c2
TL
20using namespace boost::placeholders;
21
7c673cae
FG
22using namespace watcher;
23
24using util::create_context_callback;
25using util::create_rados_callback;
26using std::string;
27
28namespace {
29
// Context that finishes an unwatch in two steps. The first complete() call
// issues an asynchronous watch flush with this same object as the callback;
// the second complete() call deletes this object and then completes
// on_finish with the first negative result that was recorded.
30struct C_UnwatchAndFlush : public Context {
31 librados::Rados rados;
32 Context *on_finish;
33 bool flushing = false;
34 int ret_val = 0;
35
    // rados is constructed from the supplied IoCtx; on_finish is the context
    // completed at the very end of the second step.
36 C_UnwatchAndFlush(librados::IoCtx &io_ctx, Context *on_finish)
37 : rados(io_ctx), on_finish(on_finish) {
38 }
39
    // Overrides complete() (rather than finish()) so that the object is not
    // destroyed after the first invocation.
40 void complete(int r) override {
    // keep only the first failure seen across both invocations
41 if (ret_val == 0 && r < 0) {
42 ret_val = r;
43 }
44
    // first invocation: start the watch flush and wait to be called again
45 if (!flushing) {
46 flushing = true;
47
48 librados::AioCompletion *aio_comp = create_rados_callback(this);
49 r = rados.aio_watch_flush(aio_comp);
11fdf7f2 50 ceph_assert(r == 0);
7c673cae
FG
51 aio_comp->release();
52 return;
53 }
54
55 // ensure our reference to the RadosClient is released prior
56 // to completing the callback to avoid racing an explicit
57 // librados shutdown
    // copy what is needed to locals first: members are gone after delete
58 Context *ctx = on_finish;
59 r = ret_val;
60 delete this;
61
62 ctx->complete(r);
63 }
64
    // intentionally empty: all work happens in the complete() override
65 void finish(int r) override {
66 }
67};
68
69} // anonymous namespace
70
71#undef dout_prefix
72#define dout_prefix *_dout << "librbd::Watcher::C_NotifyAck " << this << " " \
73 << __func__ << ": "
74
// Captures the owning watcher, its CephContext (taken from watcher->m_cct)
// and the notify id / watch handle of the notification to acknowledge, then
// logs both identifiers at debug level 10.
75Watcher::C_NotifyAck::C_NotifyAck(Watcher *watcher, uint64_t notify_id,
76 uint64_t handle)
77 : watcher(watcher), cct(watcher->m_cct), notify_id(notify_id),
78 handle(handle) {
79 ldout(cct, 10) << "id=" << notify_id << ", " << "handle=" << handle << dendl;
80}
81
// Completion handler: logs r, asserts that it is zero, and acknowledges the
// stored notification through the owning watcher, passing `out` as the reply.
// NOTE(review): `out` is not declared in this file; presumably it is a
// bufferlist member of C_NotifyAck declared in the header -- confirm.
82void Watcher::C_NotifyAck::finish(int r) {
83 ldout(cct, 10) << "r=" << r << dendl;
11fdf7f2 84 ceph_assert(r == 0);
7c673cae
FG
85 watcher->acknowledge_notify(notify_id, handle, out);
86}
87
88#undef dout_prefix
89#define dout_prefix *_dout << "librbd::Watcher: " << this << " " << __func__ \
90 << ": "
91
// Constructs an idle (unregistered) watcher for object `oid`.
//  - m_cct is obtained by casting the pointer returned by ioctx.cct()
//  - m_watch_lock is a shared mutex whose name is made unique using `this`
//  - m_notifier is built from the same work queue, io context and oid
//  - the watch handle starts at 0 and the state at WATCH_STATE_IDLE
//  - m_watch_ctx is bound to this watcher instance
f67539c2 92Watcher::Watcher(librados::IoCtx& ioctx, asio::ContextWQ *work_queue,
7c673cae
FG
93 const string& oid)
94 : m_ioctx(ioctx), m_work_queue(work_queue), m_oid(oid),
95 m_cct(reinterpret_cast<CephContext *>(ioctx.cct())),
f67539c2
TL
96 m_watch_lock(ceph::make_shared_mutex(
97 util::unique_lock_name("librbd::Watcher::m_watch_lock", this))),
7c673cae 98 m_watch_handle(0), m_notifier(work_queue, ioctx, oid),
28e407b8 99 m_watch_state(WATCH_STATE_IDLE), m_watch_ctx(*this) {
7c673cae
FG
100}
101
// Destructor: under a shared hold of m_watch_lock, asserts that the watch
// has already been unregistered.
102Watcher::~Watcher() {
9f95a23c 103 std::shared_lock l{m_watch_lock};
11fdf7f2 104 ceph_assert(is_unregistered(m_watch_lock));
7c673cae
FG
105}
106
// Starts an asynchronous watch registration on m_oid.
// Requires the watcher to be unregistered (asserted). Moves the state to
// WATCH_STATE_REGISTERING, clears the blocklisted flag and issues aio_watch;
// the resulting handle is written to m_watch_handle.
// on_finish is wrapped in a C_RegisterWatch context.
// NOTE(review): C_RegisterWatch is declared outside this file; presumably it
// forwards the result to handle_register_watch() -- confirm in the header.
107void Watcher::register_watch(Context *on_finish) {
108 ldout(m_cct, 10) << dendl;
109
9f95a23c 110 std::unique_lock watch_locker{m_watch_lock};
11fdf7f2 111 ceph_assert(is_unregistered(m_watch_lock));
7c673cae 112 m_watch_state = WATCH_STATE_REGISTERING;
f67539c2 113 m_watch_blocklisted = false;
7c673cae
FG
114
115 librados::AioCompletion *aio_comp = create_rados_callback(
28e407b8 116 new C_RegisterWatch(this, on_finish));
7c673cae 117 int r = m_ioctx.aio_watch(m_oid, aio_comp, &m_watch_handle, &m_watch_ctx);
11fdf7f2 118 ceph_assert(r == 0);
7c673cae
FG
119 aio_comp->release();
120}
121
// Handles the result `r` of the watch registration started by
// register_watch(). State is updated under m_watch_lock; callbacks are
// invoked only after the lock has been dropped.
//  - on_finish is always completed with r
//  - afterwards, a deferred unregister (if one was queued while registering)
//    is run, or a rewatch is started if an error was flagged meanwhile
122void Watcher::handle_register_watch(int r, Context *on_finish) {
123 ldout(m_cct, 10) << "r=" << r << dendl;
28e407b8
AA
124
125 bool watch_error = false;
7c673cae
FG
126 Context *unregister_watch_ctx = nullptr;
127 {
9f95a23c 128 std::unique_lock watch_locker{m_watch_lock};
11fdf7f2 129 ceph_assert(m_watch_state == WATCH_STATE_REGISTERING);
7c673cae 130
28e407b8 131 m_watch_state = WATCH_STATE_IDLE;
7c673cae
FG
132 if (r < 0) {
133 lderr(m_cct) << "failed to register watch: " << cpp_strerror(r)
134 << dendl;
    // a zero handle marks the watch as not registered
135 m_watch_handle = 0;
28e407b8
AA
136 }
137
    // a pending unregister takes priority over any rewatch
138 if (m_unregister_watch_ctx != nullptr) {
139 std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
140 } else if (r == 0 && m_watch_error) {
    // registration succeeded but handle_error() flagged an error meanwhile
141 lderr(m_cct) << "re-registering watch after error" << dendl;
142 m_watch_state = WATCH_STATE_REWATCHING;
143 watch_error = true;
91327a77 144 } else {
f67539c2 145 m_watch_blocklisted = (r == -EBLOCKLISTED);
7c673cae
FG
146 }
147 }
148
    // lock released: notify the caller before any follow-up action
149 on_finish->complete(r);
150
7c673cae
FG
151 if (unregister_watch_ctx != nullptr) {
152 unregister_watch_ctx->complete(0);
28e407b8
AA
153 } else if (watch_error) {
154 rewatch();
7c673cae
FG
155 }
156}
157
// Unregisters the watch and completes on_finish when done.
// Three cases, decided under m_watch_lock:
//  1. a register/rewatch is in flight (state != IDLE): the request is
//     deferred by storing a context in m_unregister_watch_ctx that re-invokes
//     unregister_watch(on_finish) once it is completed
//  2. the watch is registered: aio_unwatch is issued with a C_UnwatchAndFlush
//     completion, and the handle / blocklisted flag are reset immediately
//  3. otherwise: nothing to do, on_finish is completed with 0 after the lock
//     is released
158void Watcher::unregister_watch(Context *on_finish) {
159 ldout(m_cct, 10) << dendl;
160
161 {
9f95a23c 162 std::unique_lock watch_locker{m_watch_lock};
28e407b8 163 if (m_watch_state != WATCH_STATE_IDLE) {
7c673cae
FG
164 ldout(m_cct, 10) << "delaying unregister until register completed"
165 << dendl;
166
    // only one deferred unregister may be outstanding at a time
11fdf7f2 167 ceph_assert(m_unregister_watch_ctx == nullptr);
    // the lambda ignores its result argument and simply retries
9f95a23c 168 m_unregister_watch_ctx = new LambdaContext([this, on_finish](int r) {
7c673cae
FG
169 unregister_watch(on_finish);
170 });
171 return;
28e407b8 172 } else if (is_registered(m_watch_lock)) {
7c673cae 173 librados::AioCompletion *aio_comp = create_rados_callback(
91327a77 174 new C_UnwatchAndFlush(m_ioctx, on_finish));
7c673cae 175 int r = m_ioctx.aio_unwatch(m_watch_handle, aio_comp);
11fdf7f2 176 ceph_assert(r == 0);
7c673cae 177 aio_comp->release();
91327a77 178
28e407b8 179 m_watch_handle = 0;
f67539c2 180 m_watch_blocklisted = false;
7c673cae
FG
181 return;
182 }
183 }
184
185 on_finish->complete(0);
186}
187
// Returns true when m_blocked_count is greater than zero. Reads the counter
// under a shared hold of m_watch_lock and logs the result at debug level 5.
31f18b77 188bool Watcher::notifications_blocked() const {
9f95a23c 189 std::shared_lock locker{m_watch_lock};
31f18b77
FG
190
191 bool blocked = (m_blocked_count > 0);
192 ldout(m_cct, 5) << "blocked=" << blocked << dendl;
193 return blocked;
194}
195
// Increments m_blocked_count under an exclusive hold of m_watch_lock, then
// (with the lock released) hands on_finish to m_async_op_tracker.wait_for_ops.
// NOTE(review): presumably on_finish fires once the tracked in-flight
// notification handlers have finished -- confirm against AsyncOpTracker.
196void Watcher::block_notifies(Context *on_finish) {
197 {
9f95a23c 198 std::unique_lock locker{m_watch_lock};
31f18b77
FG
199 ++m_blocked_count;
200 ldout(m_cct, 5) << "blocked_count=" << m_blocked_count << dendl;
201 }
202 m_async_op_tracker.wait_for_ops(on_finish);
203}
204
// Decrements m_blocked_count under an exclusive hold of m_watch_lock.
// Asserts that the counter is positive, i.e. that a matching
// block_notifies() call increased it beforehand.
205void Watcher::unblock_notifies() {
9f95a23c 206 std::unique_lock locker{m_watch_lock};
11fdf7f2 207 ceph_assert(m_blocked_count > 0);
31f18b77
FG
208 --m_blocked_count;
209 ldout(m_cct, 5) << "blocked_count=" << m_blocked_count << dendl;
210}
211
7c673cae
FG
// Forwards the flush request, together with on_finish, to m_notifier.
212void Watcher::flush(Context *on_finish) {
213 m_notifier.flush(on_finish);
214}
215
// Returns a copy of the watched object's id, read under a shared hold of
// m_watch_lock.
216std::string Watcher::get_oid() const {
9f95a23c 217 std::shared_lock locker{m_watch_lock};
7c673cae
FG
218 return m_oid;
219}
220
// Replaces the watched object's id. Takes m_watch_lock exclusively and
// asserts that no watch is currently registered before assigning.
221void Watcher::set_oid(const string& oid) {
9f95a23c 222 std::unique_lock watch_locker{m_watch_lock};
11fdf7f2 223 ceph_assert(is_unregistered(m_watch_lock));
7c673cae
FG
224
225 m_oid = oid;
226}
227
// Reacts to a watch error reported for `handle`.
// Always sets m_watch_error. If the watch is currently registered, the state
// moves to WATCH_STATE_REWATCHING, the blocklisted flag is set when err is
// -EBLOCKLISTED, and a call to rewatch() is queued on the work queue.
// If the watch is not registered, only the m_watch_error flag is set.
228void Watcher::handle_error(uint64_t handle, int err) {
229 lderr(m_cct) << "handle=" << handle << ": " << cpp_strerror(err) << dendl;
230
9f95a23c 231 std::unique_lock watch_locker{m_watch_lock};
28e407b8
AA
232 m_watch_error = true;
233
234 if (is_registered(m_watch_lock)) {
235 m_watch_state = WATCH_STATE_REWATCHING;
f67539c2
TL
236 if (err == -EBLOCKLISTED) {
237 m_watch_blocklisted = true;
91327a77 238 }
7c673cae 239
    // rewatch() is queued rather than called inline; m_watch_lock is held
    // here and rewatch() acquires it as well
9f95a23c 240 auto ctx = new LambdaContext(
7c673cae
FG
241 boost::bind(&Watcher::rewatch, this));
242 m_work_queue->queue(ctx);
243 }
244}
245
// Acknowledges notification `notify_id` received on watch `handle` for
// m_oid by calling m_ioctx.notify_ack, passing `out` as the reply payload.
246void Watcher::acknowledge_notify(uint64_t notify_id, uint64_t handle,
247 bufferlist &out) {
248 m_ioctx.notify_ack(m_oid, notify_id, handle, out);
249}
250
// Starts re-establishing the watch. Must be entered in state
// WATCH_STATE_REWATCHING (asserted).
//  - if an unregister was deferred, the state returns to IDLE and the
//    deferred context is completed (after releasing the lock) instead
//  - otherwise m_watch_error is cleared and a RewatchRequest is created and
//    sent; its completion is routed to handle_rewatch()
251void Watcher::rewatch() {
252 ldout(m_cct, 10) << dendl;
253
28e407b8
AA
254 Context *unregister_watch_ctx = nullptr;
255 {
9f95a23c 256 std::unique_lock watch_locker{m_watch_lock};
11fdf7f2 257 ceph_assert(m_watch_state == WATCH_STATE_REWATCHING);
28e407b8
AA
258
259 if (m_unregister_watch_ctx != nullptr) {
260 m_watch_state = WATCH_STATE_IDLE;
261 std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
262 } else {
    // clear the flag so errors arriving during this attempt are noticed
263 m_watch_error = false;
264 auto ctx = create_context_callback<
265 Watcher, &Watcher::handle_rewatch>(this);
    // the request receives m_watch_lock and is sent while watch_locker
    // is still in scope
266 auto req = RewatchRequest::create(m_ioctx, m_oid, m_watch_lock,
267 &m_watch_ctx, &m_watch_handle, ctx);
268 req->send();
269 return;
270 }
7c673cae 271 }
28e407b8
AA
272
    // only reached when a deferred unregister was taken above
273 unregister_watch_ctx->complete(0);
7c673cae
FG
274}
275
// Handles the result `r` of the rewatch request sent by rewatch().
// Under m_watch_lock the blocklisted flag is reset and one of the following
// is chosen, in priority order:
//  - deferred unregister pending: go IDLE and run it
//  - r == -EBLOCKLISTED: log and set the blocklisted flag
//  - r == -ENOENT: log that the object does not exist
//  - any other r < 0, or an error flagged during the attempt: retry rewatch
// When neither an unregister nor a retry is chosen, handle_rewatch_callback()
// is queued on the work queue with r; the state stays REWATCHING until then.
276void Watcher::handle_rewatch(int r) {
91327a77 277 ldout(m_cct, 10) << "r=" << r << dendl;
7c673cae 278
28e407b8
AA
279 bool watch_error = false;
280 Context *unregister_watch_ctx = nullptr;
281 {
9f95a23c 282 std::unique_lock watch_locker{m_watch_lock};
11fdf7f2 283 ceph_assert(m_watch_state == WATCH_STATE_REWATCHING);
28e407b8 284
f67539c2 285 m_watch_blocklisted = false;
28e407b8
AA
286 if (m_unregister_watch_ctx != nullptr) {
287 ldout(m_cct, 10) << "image is closing, skip rewatch" << dendl;
288 m_watch_state = WATCH_STATE_IDLE;
289 std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
f67539c2
TL
290 } else if (r == -EBLOCKLISTED) {
291 lderr(m_cct) << "client blocklisted" << dendl;
292 m_watch_blocklisted = true;
28e407b8
AA
293 } else if (r == -ENOENT) {
294 ldout(m_cct, 5) << "object does not exist" << dendl;
295 } else if (r < 0) {
296 lderr(m_cct) << "failed to rewatch: " << cpp_strerror(r) << dendl;
297 watch_error = true;
298 } else if (m_watch_error) {
299 lderr(m_cct) << "re-registering watch after error" << dendl;
300 watch_error = true;
301 }
7c673cae
FG
302 }
303
    // lock released: act on the decision made above
28e407b8
AA
304 if (unregister_watch_ctx != nullptr) {
305 unregister_watch_ctx->complete(0);
306 return;
307 } else if (watch_error) {
308 rewatch();
309 return;
310 }
311
312 auto ctx = create_context_callback<
313 Watcher, &Watcher::handle_rewatch_callback>(this);
314 m_work_queue->queue(ctx, r);
315}
316
// Final step of a rewatch, run from the work queue with the rewatch result.
// First calls handle_rewatch_complete(r), then decides under m_watch_lock:
//  - deferred unregister pending: go IDLE and run it
//  - r is -EBLOCKLISTED or -ENOENT: go IDLE without retrying
//  - any other r < 0, or an error flagged meanwhile: stay REWATCHING and retry
//  - otherwise: go IDLE
// NOTE(review): handle_rewatch_complete() is not defined in this file;
// presumably it is a hook for derived classes -- confirm in the header.
317void Watcher::handle_rewatch_callback(int r) {
318 ldout(m_cct, 10) << "r=" << r << dendl;
319 handle_rewatch_complete(r);
320
321 bool watch_error = false;
7c673cae
FG
322 Context *unregister_watch_ctx = nullptr;
323 {
9f95a23c 324 std::unique_lock watch_locker{m_watch_lock};
11fdf7f2 325 ceph_assert(m_watch_state == WATCH_STATE_REWATCHING);
7c673cae 326
28e407b8
AA
327 if (m_unregister_watch_ctx != nullptr) {
328 m_watch_state = WATCH_STATE_IDLE;
329 std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
f67539c2 330 } else if (r == -EBLOCKLISTED || r == -ENOENT) {
28e407b8 331 m_watch_state = WATCH_STATE_IDLE;
91327a77 332 } else if (r < 0 || m_watch_error) {
28e407b8
AA
333 watch_error = true;
334 } else {
335 m_watch_state = WATCH_STATE_IDLE;
336 }
7c673cae
FG
337 }
338
    // lock released: run the deferred unregister or retry the rewatch
7c673cae
FG
339 if (unregister_watch_ctx != nullptr) {
340 unregister_watch_ctx->complete(0);
28e407b8
AA
341 } else if (watch_error) {
342 rewatch();
7c673cae
FG
343 }
344}
345
// Forwards a notification to m_notifier.notify with the given payload,
// response holder and completion context.
346void Watcher::send_notify(bufferlist& payload,
347 watcher::NotifyResponse *response,
348 Context *on_finish) {
349 m_notifier.notify(payload, response, on_finish);
350}
351
31f18b77
FG
// Watch callback for an incoming notification. The whole handler is
// bracketed by start_op()/finish_op() on the watcher's m_async_op_tracker.
// Depending on notifications_blocked(), the notification is either
// acknowledged right here or passed on to watcher.handle_notify().
352void Watcher::WatchCtx::handle_notify(uint64_t notify_id, uint64_t handle,
353 uint64_t notifier_id, bufferlist& bl) {
354 // if notifications are blocked, finish the notification w/o
355 // bubbling the notification up to the derived class
356 watcher.m_async_op_tracker.start_op();
357 if (watcher.notifications_blocked()) {
    // this local deliberately shadows the `bl` parameter: the ack is sent
    // with a fresh default-constructed bufferlist, not the incoming payload
358 bufferlist bl;
359 watcher.acknowledge_notify(notify_id, handle, bl);
360 } else {
361 watcher.handle_notify(notify_id, handle, notifier_id, bl);
362 }
363 watcher.m_async_op_tracker.finish_op();
7c673cae
FG
364}
365
// Watch callback for an error: forwards the handle and error code unchanged
// to the owning watcher's handle_error().
366void Watcher::WatchCtx::handle_error(uint64_t handle, int err) {
367 watcher.handle_error(handle, err);
368}
369
370} // namespace librbd