]> git.proxmox.com Git - ceph.git/blame - ceph/src/librbd/Watcher.cc
import ceph 15.2.10
[ceph.git] / ceph / src / librbd / Watcher.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "librbd/Watcher.h"
5#include "librbd/watcher/RewatchRequest.h"
6#include "librbd/Utils.h"
7#include "librbd/TaskFinisher.h"
8#include "include/encoding.h"
9#include "common/errno.h"
10#include "common/WorkQueue.h"
11#include <boost/bind.hpp>
12
13// re-include our assert to clobber the system one; fix dout:
11fdf7f2 14#include "include/ceph_assert.h"
7c673cae
FG
15
16#define dout_subsys ceph_subsys_rbd
17
18namespace librbd {
19
20using namespace watcher;
21
22using util::create_context_callback;
23using util::create_rados_callback;
24using std::string;
25
26namespace {
27
28struct C_UnwatchAndFlush : public Context {
29 librados::Rados rados;
30 Context *on_finish;
31 bool flushing = false;
32 int ret_val = 0;
33
34 C_UnwatchAndFlush(librados::IoCtx &io_ctx, Context *on_finish)
35 : rados(io_ctx), on_finish(on_finish) {
36 }
37
38 void complete(int r) override {
39 if (ret_val == 0 && r < 0) {
40 ret_val = r;
41 }
42
43 if (!flushing) {
44 flushing = true;
45
46 librados::AioCompletion *aio_comp = create_rados_callback(this);
47 r = rados.aio_watch_flush(aio_comp);
11fdf7f2 48 ceph_assert(r == 0);
7c673cae
FG
49 aio_comp->release();
50 return;
51 }
52
53 // ensure our reference to the RadosClient is released prior
54 // to completing the callback to avoid racing an explicit
55 // librados shutdown
56 Context *ctx = on_finish;
57 r = ret_val;
58 delete this;
59
60 ctx->complete(r);
61 }
62
63 void finish(int r) override {
64 }
65};
66
67} // anonymous namespace
68
69#undef dout_prefix
70#define dout_prefix *_dout << "librbd::Watcher::C_NotifyAck " << this << " " \
71 << __func__ << ": "
72
73Watcher::C_NotifyAck::C_NotifyAck(Watcher *watcher, uint64_t notify_id,
74 uint64_t handle)
75 : watcher(watcher), cct(watcher->m_cct), notify_id(notify_id),
76 handle(handle) {
77 ldout(cct, 10) << "id=" << notify_id << ", " << "handle=" << handle << dendl;
78}
79
80void Watcher::C_NotifyAck::finish(int r) {
81 ldout(cct, 10) << "r=" << r << dendl;
11fdf7f2 82 ceph_assert(r == 0);
7c673cae
FG
83 watcher->acknowledge_notify(notify_id, handle, out);
84}
85
86#undef dout_prefix
87#define dout_prefix *_dout << "librbd::Watcher: " << this << " " << __func__ \
88 << ": "
89
90Watcher::Watcher(librados::IoCtx& ioctx, ContextWQ *work_queue,
91 const string& oid)
92 : m_ioctx(ioctx), m_work_queue(work_queue), m_oid(oid),
93 m_cct(reinterpret_cast<CephContext *>(ioctx.cct())),
9f95a23c 94 m_watch_lock(ceph::make_shared_mutex(util::unique_lock_name("librbd::Watcher::m_watch_lock", this))),
7c673cae 95 m_watch_handle(0), m_notifier(work_queue, ioctx, oid),
28e407b8 96 m_watch_state(WATCH_STATE_IDLE), m_watch_ctx(*this) {
7c673cae
FG
97}
98
99Watcher::~Watcher() {
9f95a23c 100 std::shared_lock l{m_watch_lock};
11fdf7f2 101 ceph_assert(is_unregistered(m_watch_lock));
7c673cae
FG
102}
103
104void Watcher::register_watch(Context *on_finish) {
105 ldout(m_cct, 10) << dendl;
106
9f95a23c 107 std::unique_lock watch_locker{m_watch_lock};
11fdf7f2 108 ceph_assert(is_unregistered(m_watch_lock));
7c673cae 109 m_watch_state = WATCH_STATE_REGISTERING;
91327a77 110 m_watch_blacklisted = false;
7c673cae
FG
111
112 librados::AioCompletion *aio_comp = create_rados_callback(
28e407b8 113 new C_RegisterWatch(this, on_finish));
7c673cae 114 int r = m_ioctx.aio_watch(m_oid, aio_comp, &m_watch_handle, &m_watch_ctx);
11fdf7f2 115 ceph_assert(r == 0);
7c673cae
FG
116 aio_comp->release();
117}
118
119void Watcher::handle_register_watch(int r, Context *on_finish) {
120 ldout(m_cct, 10) << "r=" << r << dendl;
28e407b8
AA
121
122 bool watch_error = false;
7c673cae
FG
123 Context *unregister_watch_ctx = nullptr;
124 {
9f95a23c 125 std::unique_lock watch_locker{m_watch_lock};
11fdf7f2 126 ceph_assert(m_watch_state == WATCH_STATE_REGISTERING);
7c673cae 127
28e407b8 128 m_watch_state = WATCH_STATE_IDLE;
7c673cae
FG
129 if (r < 0) {
130 lderr(m_cct) << "failed to register watch: " << cpp_strerror(r)
131 << dendl;
132 m_watch_handle = 0;
28e407b8
AA
133 }
134
135 if (m_unregister_watch_ctx != nullptr) {
136 std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
137 } else if (r == 0 && m_watch_error) {
138 lderr(m_cct) << "re-registering watch after error" << dendl;
139 m_watch_state = WATCH_STATE_REWATCHING;
140 watch_error = true;
91327a77
AA
141 } else {
142 m_watch_blacklisted = (r == -EBLACKLISTED);
7c673cae
FG
143 }
144 }
145
146 on_finish->complete(r);
147
7c673cae
FG
148 if (unregister_watch_ctx != nullptr) {
149 unregister_watch_ctx->complete(0);
28e407b8
AA
150 } else if (watch_error) {
151 rewatch();
7c673cae
FG
152 }
153}
154
155void Watcher::unregister_watch(Context *on_finish) {
156 ldout(m_cct, 10) << dendl;
157
158 {
9f95a23c 159 std::unique_lock watch_locker{m_watch_lock};
28e407b8 160 if (m_watch_state != WATCH_STATE_IDLE) {
7c673cae
FG
161 ldout(m_cct, 10) << "delaying unregister until register completed"
162 << dendl;
163
11fdf7f2 164 ceph_assert(m_unregister_watch_ctx == nullptr);
9f95a23c 165 m_unregister_watch_ctx = new LambdaContext([this, on_finish](int r) {
7c673cae
FG
166 unregister_watch(on_finish);
167 });
168 return;
28e407b8 169 } else if (is_registered(m_watch_lock)) {
7c673cae 170 librados::AioCompletion *aio_comp = create_rados_callback(
91327a77 171 new C_UnwatchAndFlush(m_ioctx, on_finish));
7c673cae 172 int r = m_ioctx.aio_unwatch(m_watch_handle, aio_comp);
11fdf7f2 173 ceph_assert(r == 0);
7c673cae 174 aio_comp->release();
91327a77 175
28e407b8 176 m_watch_handle = 0;
91327a77 177 m_watch_blacklisted = false;
7c673cae
FG
178 return;
179 }
180 }
181
182 on_finish->complete(0);
183}
184
31f18b77 185bool Watcher::notifications_blocked() const {
9f95a23c 186 std::shared_lock locker{m_watch_lock};
31f18b77
FG
187
188 bool blocked = (m_blocked_count > 0);
189 ldout(m_cct, 5) << "blocked=" << blocked << dendl;
190 return blocked;
191}
192
193void Watcher::block_notifies(Context *on_finish) {
194 {
9f95a23c 195 std::unique_lock locker{m_watch_lock};
31f18b77
FG
196 ++m_blocked_count;
197 ldout(m_cct, 5) << "blocked_count=" << m_blocked_count << dendl;
198 }
199 m_async_op_tracker.wait_for_ops(on_finish);
200}
201
202void Watcher::unblock_notifies() {
9f95a23c 203 std::unique_lock locker{m_watch_lock};
11fdf7f2 204 ceph_assert(m_blocked_count > 0);
31f18b77
FG
205 --m_blocked_count;
206 ldout(m_cct, 5) << "blocked_count=" << m_blocked_count << dendl;
207}
208
7c673cae
FG
209void Watcher::flush(Context *on_finish) {
210 m_notifier.flush(on_finish);
211}
212
213std::string Watcher::get_oid() const {
9f95a23c 214 std::shared_lock locker{m_watch_lock};
7c673cae
FG
215 return m_oid;
216}
217
218void Watcher::set_oid(const string& oid) {
9f95a23c 219 std::unique_lock watch_locker{m_watch_lock};
11fdf7f2 220 ceph_assert(is_unregistered(m_watch_lock));
7c673cae
FG
221
222 m_oid = oid;
223}
224
225void Watcher::handle_error(uint64_t handle, int err) {
226 lderr(m_cct) << "handle=" << handle << ": " << cpp_strerror(err) << dendl;
227
9f95a23c 228 std::unique_lock watch_locker{m_watch_lock};
28e407b8
AA
229 m_watch_error = true;
230
231 if (is_registered(m_watch_lock)) {
232 m_watch_state = WATCH_STATE_REWATCHING;
91327a77
AA
233 if (err == -EBLACKLISTED) {
234 m_watch_blacklisted = true;
235 }
7c673cae 236
9f95a23c 237 auto ctx = new LambdaContext(
7c673cae
FG
238 boost::bind(&Watcher::rewatch, this));
239 m_work_queue->queue(ctx);
240 }
241}
242
243void Watcher::acknowledge_notify(uint64_t notify_id, uint64_t handle,
244 bufferlist &out) {
245 m_ioctx.notify_ack(m_oid, notify_id, handle, out);
246}
247
248void Watcher::rewatch() {
249 ldout(m_cct, 10) << dendl;
250
28e407b8
AA
251 Context *unregister_watch_ctx = nullptr;
252 {
9f95a23c 253 std::unique_lock watch_locker{m_watch_lock};
11fdf7f2 254 ceph_assert(m_watch_state == WATCH_STATE_REWATCHING);
28e407b8
AA
255
256 if (m_unregister_watch_ctx != nullptr) {
257 m_watch_state = WATCH_STATE_IDLE;
258 std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
259 } else {
260 m_watch_error = false;
261 auto ctx = create_context_callback<
262 Watcher, &Watcher::handle_rewatch>(this);
263 auto req = RewatchRequest::create(m_ioctx, m_oid, m_watch_lock,
264 &m_watch_ctx, &m_watch_handle, ctx);
265 req->send();
266 return;
267 }
7c673cae 268 }
28e407b8
AA
269
270 unregister_watch_ctx->complete(0);
7c673cae
FG
271}
272
273void Watcher::handle_rewatch(int r) {
91327a77 274 ldout(m_cct, 10) << "r=" << r << dendl;
7c673cae 275
28e407b8
AA
276 bool watch_error = false;
277 Context *unregister_watch_ctx = nullptr;
278 {
9f95a23c 279 std::unique_lock watch_locker{m_watch_lock};
11fdf7f2 280 ceph_assert(m_watch_state == WATCH_STATE_REWATCHING);
28e407b8 281
91327a77 282 m_watch_blacklisted = false;
28e407b8
AA
283 if (m_unregister_watch_ctx != nullptr) {
284 ldout(m_cct, 10) << "image is closing, skip rewatch" << dendl;
285 m_watch_state = WATCH_STATE_IDLE;
286 std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
287 } else if (r == -EBLACKLISTED) {
288 lderr(m_cct) << "client blacklisted" << dendl;
91327a77 289 m_watch_blacklisted = true;
28e407b8
AA
290 } else if (r == -ENOENT) {
291 ldout(m_cct, 5) << "object does not exist" << dendl;
292 } else if (r < 0) {
293 lderr(m_cct) << "failed to rewatch: " << cpp_strerror(r) << dendl;
294 watch_error = true;
295 } else if (m_watch_error) {
296 lderr(m_cct) << "re-registering watch after error" << dendl;
297 watch_error = true;
298 }
7c673cae
FG
299 }
300
28e407b8
AA
301 if (unregister_watch_ctx != nullptr) {
302 unregister_watch_ctx->complete(0);
303 return;
304 } else if (watch_error) {
305 rewatch();
306 return;
307 }
308
309 auto ctx = create_context_callback<
310 Watcher, &Watcher::handle_rewatch_callback>(this);
311 m_work_queue->queue(ctx, r);
312}
313
314void Watcher::handle_rewatch_callback(int r) {
315 ldout(m_cct, 10) << "r=" << r << dendl;
316 handle_rewatch_complete(r);
317
318 bool watch_error = false;
7c673cae
FG
319 Context *unregister_watch_ctx = nullptr;
320 {
9f95a23c 321 std::unique_lock watch_locker{m_watch_lock};
11fdf7f2 322 ceph_assert(m_watch_state == WATCH_STATE_REWATCHING);
7c673cae 323
28e407b8
AA
324 if (m_unregister_watch_ctx != nullptr) {
325 m_watch_state = WATCH_STATE_IDLE;
326 std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
e306af50 327 } else if (r == -EBLACKLISTED || r == -ENOENT) {
28e407b8 328 m_watch_state = WATCH_STATE_IDLE;
91327a77 329 } else if (r < 0 || m_watch_error) {
28e407b8
AA
330 watch_error = true;
331 } else {
332 m_watch_state = WATCH_STATE_IDLE;
333 }
7c673cae
FG
334 }
335
7c673cae
FG
336 if (unregister_watch_ctx != nullptr) {
337 unregister_watch_ctx->complete(0);
28e407b8
AA
338 } else if (watch_error) {
339 rewatch();
7c673cae
FG
340 }
341}
342
343void Watcher::send_notify(bufferlist& payload,
344 watcher::NotifyResponse *response,
345 Context *on_finish) {
346 m_notifier.notify(payload, response, on_finish);
347}
348
31f18b77
FG
349void Watcher::WatchCtx::handle_notify(uint64_t notify_id, uint64_t handle,
350 uint64_t notifier_id, bufferlist& bl) {
351 // if notifications are blocked, finish the notification w/o
352 // bubbling the notification up to the derived class
353 watcher.m_async_op_tracker.start_op();
354 if (watcher.notifications_blocked()) {
355 bufferlist bl;
356 watcher.acknowledge_notify(notify_id, handle, bl);
357 } else {
358 watcher.handle_notify(notify_id, handle, notifier_id, bl);
359 }
360 watcher.m_async_op_tracker.finish_op();
7c673cae
FG
361}
362
363void Watcher::WatchCtx::handle_error(uint64_t handle, int err) {
364 watcher.handle_error(handle, err);
365}
366
367} // namespace librbd