// ceph/src/librbd/Watcher.cc (ceph.git, version 18.2.2-pve1)
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "librbd/Watcher.h"
5 #include "librbd/watcher/RewatchRequest.h"
6 #include "librbd/Utils.h"
7 #include "librbd/TaskFinisher.h"
8 #include "librbd/asio/ContextWQ.h"
9 #include "include/encoding.h"
10 #include "common/errno.h"
11 #include <boost/bind/bind.hpp>
12
13 // re-include our assert to clobber the system one; fix dout:
14 #include "include/ceph_assert.h"
15
16 #define dout_subsys ceph_subsys_rbd
17
18 namespace librbd {
19
20 using namespace boost::placeholders;
21
22 using namespace watcher;
23
24 using util::create_context_callback;
25 using util::create_rados_callback;
26 using std::string;
27
28 namespace {
29
// Self-deleting completion that runs in two phases: first it receives the
// aio_unwatch result, then it issues an aio_watch_flush and receives that
// result before invoking the user callback.  The librados::Rados copy keeps
// the underlying RadosClient alive until the flush completes.
struct C_UnwatchAndFlush : public Context {
  librados::Rados rados;
  Context *on_finish;
  bool flushing = false;  // false: unwatch phase, true: flush phase
  int ret_val = 0;        // first error observed across both phases

  C_UnwatchAndFlush(librados::IoCtx &io_ctx, Context *on_finish)
    : rados(io_ctx), on_finish(on_finish) {
  }

  // Overrides complete() (not just finish()) because after the first
  // invocation the object must re-arm itself as the flush callback
  // instead of self-destructing.
  void complete(int r) override {
    // retain the first failure; a later success must not mask it
    if (ret_val == 0 && r < 0) {
      ret_val = r;
    }

    if (!flushing) {
      flushing = true;

      // phase 2: flush in-flight watch callbacks, re-using this context
      librados::AioCompletion *aio_comp = create_rados_callback(this);
      r = rados.aio_watch_flush(aio_comp);
      ceph_assert(r == 0);
      aio_comp->release();
      return;
    }

    // ensure our reference to the RadosClient is released prior
    // to completing the callback to avoid racing an explicit
    // librados shutdown
    Context *ctx = on_finish;
    r = ret_val;
    delete this;

    ctx->complete(r);
  }

  // no-op: all work happens in complete()
  void finish(int r) override {
  }
};
68
69 } // anonymous namespace
70
71 #undef dout_prefix
72 #define dout_prefix *_dout << "librbd::Watcher::C_NotifyAck " << this << " " \
73 << __func__ << ": "
74
// Tracks a single received notification; the inherited "out" bufferlist
// accumulates the response payload returned via acknowledge_notify().
Watcher::C_NotifyAck::C_NotifyAck(Watcher *watcher, uint64_t notify_id,
                                  uint64_t handle)
  : watcher(watcher), cct(watcher->m_cct), notify_id(notify_id),
    handle(handle) {
  ldout(cct, 10) << "id=" << notify_id << ", " << "handle=" << handle << dendl;
}
81
// Acks the notification (with any collected response payload) once the
// handler chain completes; r is expected to always be 0 here.
void Watcher::C_NotifyAck::finish(int r) {
  ldout(cct, 10) << "r=" << r << dendl;
  ceph_assert(r == 0);
  watcher->acknowledge_notify(notify_id, handle, out);
}
87
88 #undef dout_prefix
89 #define dout_prefix *_dout << "librbd::Watcher: " << this << " " << __func__ \
90 << ": "
91
// Constructs an idle watcher for object "oid"; no watch is established
// until register_watch() is invoked.
Watcher::Watcher(librados::IoCtx& ioctx, asio::ContextWQ *work_queue,
                 const string& oid)
  : m_ioctx(ioctx), m_work_queue(work_queue), m_oid(oid),
    m_cct(reinterpret_cast<CephContext *>(ioctx.cct())),
    m_watch_lock(ceph::make_shared_mutex(
      util::unique_lock_name("librbd::Watcher::m_watch_lock", this))),
    m_watch_handle(0), m_notifier(work_queue, ioctx, oid),
    m_watch_state(WATCH_STATE_IDLE), m_watch_ctx(*this) {
}
101
Watcher::~Watcher() {
  std::shared_lock l{m_watch_lock};
  // the owner must unregister (or never register) the watch before
  // destroying the Watcher
  ceph_assert(is_unregistered(m_watch_lock));
}
106
// Establishes the librados watch on m_oid.  Must only be called while
// unregistered; the result is delivered through handle_register_watch().
void Watcher::register_watch(Context *on_finish) {
  ldout(m_cct, 10) << dendl;

  std::unique_lock watch_locker{m_watch_lock};
  ceph_assert(is_unregistered(m_watch_lock));
  m_watch_state = WATCH_STATE_REGISTERING;
  m_watch_blocklisted = false;

  librados::AioCompletion *aio_comp = create_rados_callback(
    new C_RegisterWatch(this, on_finish));
  int r = m_ioctx.aio_watch(m_oid, aio_comp, &m_watch_handle, &m_watch_ctx);
  ceph_assert(r == 0);
  aio_comp->release();
}
121
// Completion handler for the aio_watch started by register_watch().
// Returns the state machine to IDLE, then services whichever follow-up
// accumulated while registration was in flight: a deferred unregister,
// or a rewatch if a watch error raced with a successful registration.
void Watcher::handle_register_watch(int r, Context *on_finish) {
  ldout(m_cct, 10) << "r=" << r << dendl;

  bool watch_error = false;
  Context *unregister_watch_ctx = nullptr;
  {
    std::unique_lock watch_locker{m_watch_lock};
    ceph_assert(m_watch_state == WATCH_STATE_REGISTERING);

    m_watch_state = WATCH_STATE_IDLE;
    if (r < 0) {
      lderr(m_cct) << "failed to register watch: " << cpp_strerror(r)
                   << dendl;
      m_watch_handle = 0;
    }

    if (m_unregister_watch_ctx != nullptr) {
      // an unregister was requested while registering -- it takes priority
      std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
    } else if (r == 0 && m_watch_error) {
      lderr(m_cct) << "re-registering watch after error" << dendl;
      m_watch_state = WATCH_STATE_REWATCHING;
      watch_error = true;
    } else {
      m_watch_blocklisted = (r == -EBLOCKLISTED);
    }
  }

  on_finish->complete(r);

  // follow-up actions run outside the lock to avoid re-entrant locking
  if (unregister_watch_ctx != nullptr) {
    unregister_watch_ctx->complete(0);
  } else if (watch_error) {
    rewatch();
  }
}
157
// Tears down the watch.  If a register/rewatch is in flight, the request
// is parked in m_unregister_watch_ctx and retried when that operation
// settles; otherwise the unwatch + watch-flush is issued immediately.
void Watcher::unregister_watch(Context *on_finish) {
  ldout(m_cct, 10) << dendl;

  {
    std::unique_lock watch_locker{m_watch_lock};
    if (m_watch_state != WATCH_STATE_IDLE) {
      ldout(m_cct, 10) << "delaying unregister until register completed"
                       << dendl;

      ceph_assert(m_unregister_watch_ctx == nullptr);
      // re-invoke ourselves once the in-flight transition completes
      m_unregister_watch_ctx = new LambdaContext([this, on_finish](int r) {
          unregister_watch(on_finish);
        });
      return;
    } else if (is_registered(m_watch_lock)) {
      // unwatch, then flush outstanding callbacks before firing on_finish
      librados::AioCompletion *aio_comp = create_rados_callback(
        new C_UnwatchAndFlush(m_ioctx, on_finish));
      int r = m_ioctx.aio_unwatch(m_watch_handle, aio_comp);
      ceph_assert(r == 0);
      aio_comp->release();

      m_watch_handle = 0;
      m_watch_blocklisted = false;
      return;
    }
  }

  // nothing registered -- complete immediately
  on_finish->complete(0);
}
187
188 bool Watcher::notifications_blocked() const {
189 std::shared_lock locker{m_watch_lock};
190
191 bool blocked = (m_blocked_count > 0);
192 ldout(m_cct, 5) << "blocked=" << blocked << dendl;
193 return blocked;
194 }
195
196 void Watcher::block_notifies(Context *on_finish) {
197 {
198 std::unique_lock locker{m_watch_lock};
199 ++m_blocked_count;
200 ldout(m_cct, 5) << "blocked_count=" << m_blocked_count << dendl;
201 }
202 m_async_op_tracker.wait_for_ops(on_finish);
203 }
204
205 void Watcher::unblock_notifies() {
206 std::unique_lock locker{m_watch_lock};
207 ceph_assert(m_blocked_count > 0);
208 --m_blocked_count;
209 ldout(m_cct, 5) << "blocked_count=" << m_blocked_count << dendl;
210 }
211
// Flushes all in-flight notifications through the notifier.
void Watcher::flush(Context *on_finish) {
  m_notifier.flush(on_finish);
}
215
// Returns the watched object name; guarded because set_oid() can
// change it.
std::string Watcher::get_oid() const {
  std::shared_lock locker{m_watch_lock};
  return m_oid;
}
220
// Re-points the watcher at a different object; only legal while no
// watch is registered.
void Watcher::set_oid(const string& oid) {
  std::unique_lock watch_locker{m_watch_lock};
  ceph_assert(is_unregistered(m_watch_lock));

  m_oid = oid;
}
227
228 void Watcher::handle_error(uint64_t handle, int err) {
229 lderr(m_cct) << "handle=" << handle << ": " << cpp_strerror(err) << dendl;
230
231 std::unique_lock watch_locker{m_watch_lock};
232 m_watch_error = true;
233
234 if (is_registered(m_watch_lock)) {
235 m_watch_state = WATCH_STATE_REWATCHING;
236 if (err == -EBLOCKLISTED) {
237 m_watch_blocklisted = true;
238 }
239
240 auto ctx = new LambdaContext(
241 boost::bind(&Watcher::rewatch, this));
242 m_work_queue->queue(ctx);
243 }
244 }
245
// Sends the (possibly empty) response payload for a notification back
// to the cluster.
void Watcher::acknowledge_notify(uint64_t notify_id, uint64_t handle,
                                 bufferlist &out) {
  m_ioctx.notify_ack(m_oid, notify_id, handle, out);
}
250
// Begins re-establishing the watch after an error.  If an unregister was
// requested in the meantime, abandons the rewatch and hands control to
// the deferred unregister context instead.
void Watcher::rewatch() {
  ldout(m_cct, 10) << dendl;

  Context *unregister_watch_ctx = nullptr;
  {
    std::unique_lock watch_locker{m_watch_lock};
    ceph_assert(m_watch_state == WATCH_STATE_REWATCHING);

    if (m_unregister_watch_ctx != nullptr) {
      // unregister raced in -- do not re-establish the watch
      m_watch_state = WATCH_STATE_IDLE;
      std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
    } else {
      m_watch_error = false;
      auto ctx = create_context_callback<
        Watcher, &Watcher::handle_rewatch>(this);
      auto req = RewatchRequest::create(m_ioctx, m_oid, m_watch_lock,
                                        &m_watch_ctx, &m_watch_handle, ctx);
      req->send();
      return;
    }
  }

  // complete the deferred unregister outside the lock
  unregister_watch_ctx->complete(0);
}
275
// Completion handler for RewatchRequest.  Retries transient failures,
// gives up on blocklisting or a missing object, and otherwise queues
// handle_rewatch_callback() to inform the derived class.
void Watcher::handle_rewatch(int r) {
  ldout(m_cct, 10) << "r=" << r << dendl;

  bool watch_error = false;
  Context *unregister_watch_ctx = nullptr;
  {
    std::unique_lock watch_locker{m_watch_lock};
    ceph_assert(m_watch_state == WATCH_STATE_REWATCHING);

    m_watch_blocklisted = false;
    if (m_unregister_watch_ctx != nullptr) {
      ldout(m_cct, 10) << "image is closing, skip rewatch" << dendl;
      m_watch_state = WATCH_STATE_IDLE;
      std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
    } else if (r == -EBLOCKLISTED) {
      lderr(m_cct) << "client blocklisted" << dendl;
      m_watch_blocklisted = true;
    } else if (r == -ENOENT) {
      ldout(m_cct, 5) << "object does not exist" << dendl;
    } else if (r < 0) {
      lderr(m_cct) << "failed to rewatch: " << cpp_strerror(r) << dendl;
      watch_error = true;
    } else if (m_watch_error) {
      // another watch error arrived while the rewatch was in flight
      lderr(m_cct) << "re-registering watch after error" << dendl;
      watch_error = true;
    }
  }

  if (unregister_watch_ctx != nullptr) {
    unregister_watch_ctx->complete(0);
    return;
  } else if (watch_error) {
    rewatch();
    return;
  }

  // notify the derived class via the work queue (state stays REWATCHING
  // until handle_rewatch_callback runs)
  auto ctx = create_context_callback<
    Watcher, &Watcher::handle_rewatch_callback>(this);
  m_work_queue->queue(ctx, r);
}
316
// Runs after handle_rewatch_complete() has informed the derived class.
// Finalizes the rewatch state machine: retry on error, hand off to a
// deferred unregister, or settle back into IDLE.
void Watcher::handle_rewatch_callback(int r) {
  ldout(m_cct, 10) << "r=" << r << dendl;
  handle_rewatch_complete(r);

  bool watch_error = false;
  Context *unregister_watch_ctx = nullptr;
  {
    std::unique_lock watch_locker{m_watch_lock};
    ceph_assert(m_watch_state == WATCH_STATE_REWATCHING);

    if (m_unregister_watch_ctx != nullptr) {
      m_watch_state = WATCH_STATE_IDLE;
      std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
    } else if (r == -EBLOCKLISTED || r == -ENOENT) {
      // non-retryable -- give up and return to idle
      m_watch_state = WATCH_STATE_IDLE;
    } else if (r < 0 || m_watch_error) {
      watch_error = true;
    } else {
      m_watch_state = WATCH_STATE_IDLE;
    }
  }

  if (unregister_watch_ctx != nullptr) {
    unregister_watch_ctx->complete(0);
  } else if (watch_error) {
    rewatch();
  }
}
345
// Broadcasts a notification payload to all watchers of m_oid; the
// optional response collects per-watcher replies.
void Watcher::send_notify(bufferlist& payload,
                          watcher::NotifyResponse *response,
                          Context *on_finish) {
  m_notifier.notify(payload, response, on_finish);
}
351
352 void Watcher::WatchCtx::handle_notify(uint64_t notify_id, uint64_t handle,
353 uint64_t notifier_id, bufferlist& bl) {
354 // if notifications are blocked, finish the notification w/o
355 // bubbling the notification up to the derived class
356 watcher.m_async_op_tracker.start_op();
357 if (watcher.notifications_blocked()) {
358 bufferlist bl;
359 watcher.acknowledge_notify(notify_id, handle, bl);
360 } else {
361 watcher.handle_notify(notify_id, handle, notifier_id, bl);
362 }
363 watcher.m_async_op_tracker.finish_op();
364 }
365
// librados watch-error callback -- forwarded to the owning Watcher.
void Watcher::WatchCtx::handle_error(uint64_t handle, int err) {
  watcher.handle_error(handle, err);
}
369
370 } // namespace librbd