]> git.proxmox.com Git - ceph.git/blob - ceph/src/librbd/Watcher.cc
update sources to 12.2.7
[ceph.git] / ceph / src / librbd / Watcher.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "librbd/Watcher.h"
5 #include "librbd/watcher/RewatchRequest.h"
6 #include "librbd/Utils.h"
7 #include "librbd/TaskFinisher.h"
8 #include "include/encoding.h"
9 #include "common/errno.h"
10 #include "common/WorkQueue.h"
11 #include <boost/bind.hpp>
12
13 // re-include our assert to clobber the system one; fix dout:
14 #include "include/assert.h"
15
16 #define dout_subsys ceph_subsys_rbd
17
18 namespace librbd {
19
20 using namespace watcher;
21
22 using util::create_context_callback;
23 using util::create_rados_callback;
24 using std::string;
25
26 namespace {
27
28 struct C_UnwatchAndFlush : public Context {
29 librados::Rados rados;
30 Context *on_finish;
31 bool flushing = false;
32 int ret_val = 0;
33
34 C_UnwatchAndFlush(librados::IoCtx &io_ctx, Context *on_finish)
35 : rados(io_ctx), on_finish(on_finish) {
36 }
37
38 void complete(int r) override {
39 if (ret_val == 0 && r < 0) {
40 ret_val = r;
41 }
42
43 if (!flushing) {
44 flushing = true;
45
46 librados::AioCompletion *aio_comp = create_rados_callback(this);
47 r = rados.aio_watch_flush(aio_comp);
48 assert(r == 0);
49 aio_comp->release();
50 return;
51 }
52
53 // ensure our reference to the RadosClient is released prior
54 // to completing the callback to avoid racing an explicit
55 // librados shutdown
56 Context *ctx = on_finish;
57 r = ret_val;
58 delete this;
59
60 ctx->complete(r);
61 }
62
63 void finish(int r) override {
64 }
65 };
66
67 } // anonymous namespace
68
69 #undef dout_prefix
70 #define dout_prefix *_dout << "librbd::Watcher::C_NotifyAck " << this << " " \
71 << __func__ << ": "
72
73 Watcher::C_NotifyAck::C_NotifyAck(Watcher *watcher, uint64_t notify_id,
74 uint64_t handle)
75 : watcher(watcher), cct(watcher->m_cct), notify_id(notify_id),
76 handle(handle) {
77 ldout(cct, 10) << "id=" << notify_id << ", " << "handle=" << handle << dendl;
78 }
79
80 void Watcher::C_NotifyAck::finish(int r) {
81 ldout(cct, 10) << "r=" << r << dendl;
82 assert(r == 0);
83 watcher->acknowledge_notify(notify_id, handle, out);
84 }
85
86 #undef dout_prefix
87 #define dout_prefix *_dout << "librbd::Watcher: " << this << " " << __func__ \
88 << ": "
89
90 Watcher::Watcher(librados::IoCtx& ioctx, ContextWQ *work_queue,
91 const string& oid)
92 : m_ioctx(ioctx), m_work_queue(work_queue), m_oid(oid),
93 m_cct(reinterpret_cast<CephContext *>(ioctx.cct())),
94 m_watch_lock(util::unique_lock_name("librbd::Watcher::m_watch_lock", this)),
95 m_watch_handle(0), m_notifier(work_queue, ioctx, oid),
96 m_watch_state(WATCH_STATE_IDLE), m_watch_ctx(*this) {
97 }
98
99 Watcher::~Watcher() {
100 RWLock::RLocker l(m_watch_lock);
101 assert(is_unregistered(m_watch_lock));
102 }
103
104 void Watcher::register_watch(Context *on_finish) {
105 ldout(m_cct, 10) << dendl;
106
107 RWLock::RLocker watch_locker(m_watch_lock);
108 assert(is_unregistered(m_watch_lock));
109 m_watch_state = WATCH_STATE_REGISTERING;
110
111 librados::AioCompletion *aio_comp = create_rados_callback(
112 new C_RegisterWatch(this, on_finish));
113 int r = m_ioctx.aio_watch(m_oid, aio_comp, &m_watch_handle, &m_watch_ctx);
114 assert(r == 0);
115 aio_comp->release();
116 }
117
118 void Watcher::handle_register_watch(int r, Context *on_finish) {
119 ldout(m_cct, 10) << "r=" << r << dendl;
120
121 bool watch_error = false;
122 Context *unregister_watch_ctx = nullptr;
123 {
124 RWLock::WLocker watch_locker(m_watch_lock);
125 assert(m_watch_state == WATCH_STATE_REGISTERING);
126
127 m_watch_state = WATCH_STATE_IDLE;
128 if (r < 0) {
129 lderr(m_cct) << "failed to register watch: " << cpp_strerror(r)
130 << dendl;
131 m_watch_handle = 0;
132 }
133
134 if (m_unregister_watch_ctx != nullptr) {
135 std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
136 } else if (r == 0 && m_watch_error) {
137 lderr(m_cct) << "re-registering watch after error" << dendl;
138 m_watch_state = WATCH_STATE_REWATCHING;
139 watch_error = true;
140 }
141 }
142
143 on_finish->complete(r);
144
145 if (unregister_watch_ctx != nullptr) {
146 unregister_watch_ctx->complete(0);
147 } else if (watch_error) {
148 rewatch();
149 }
150 }
151
152 void Watcher::unregister_watch(Context *on_finish) {
153 ldout(m_cct, 10) << dendl;
154
155 {
156 RWLock::WLocker watch_locker(m_watch_lock);
157 if (m_watch_state != WATCH_STATE_IDLE) {
158 ldout(m_cct, 10) << "delaying unregister until register completed"
159 << dendl;
160
161 assert(m_unregister_watch_ctx == nullptr);
162 m_unregister_watch_ctx = new FunctionContext([this, on_finish](int r) {
163 unregister_watch(on_finish);
164 });
165 return;
166 } else if (is_registered(m_watch_lock)) {
167 librados::AioCompletion *aio_comp = create_rados_callback(
168 new C_UnwatchAndFlush(m_ioctx, on_finish));
169 int r = m_ioctx.aio_unwatch(m_watch_handle, aio_comp);
170 assert(r == 0);
171 aio_comp->release();
172 m_watch_handle = 0;
173 return;
174 }
175 }
176
177 on_finish->complete(0);
178 }
179
180 bool Watcher::notifications_blocked() const {
181 RWLock::RLocker locker(m_watch_lock);
182
183 bool blocked = (m_blocked_count > 0);
184 ldout(m_cct, 5) << "blocked=" << blocked << dendl;
185 return blocked;
186 }
187
188 void Watcher::block_notifies(Context *on_finish) {
189 {
190 RWLock::WLocker locker(m_watch_lock);
191 ++m_blocked_count;
192 ldout(m_cct, 5) << "blocked_count=" << m_blocked_count << dendl;
193 }
194 m_async_op_tracker.wait_for_ops(on_finish);
195 }
196
197 void Watcher::unblock_notifies() {
198 RWLock::WLocker locker(m_watch_lock);
199 assert(m_blocked_count > 0);
200 --m_blocked_count;
201 ldout(m_cct, 5) << "blocked_count=" << m_blocked_count << dendl;
202 }
203
204 void Watcher::flush(Context *on_finish) {
205 m_notifier.flush(on_finish);
206 }
207
208 std::string Watcher::get_oid() const {
209 RWLock::RLocker locker(m_watch_lock);
210 return m_oid;
211 }
212
213 void Watcher::set_oid(const string& oid) {
214 RWLock::WLocker watch_locker(m_watch_lock);
215 assert(is_unregistered(m_watch_lock));
216
217 m_oid = oid;
218 }
219
220 void Watcher::handle_error(uint64_t handle, int err) {
221 lderr(m_cct) << "handle=" << handle << ": " << cpp_strerror(err) << dendl;
222
223 RWLock::WLocker watch_locker(m_watch_lock);
224 m_watch_error = true;
225
226 if (is_registered(m_watch_lock)) {
227 m_watch_state = WATCH_STATE_REWATCHING;
228
229 FunctionContext *ctx = new FunctionContext(
230 boost::bind(&Watcher::rewatch, this));
231 m_work_queue->queue(ctx);
232 }
233 }
234
235 void Watcher::acknowledge_notify(uint64_t notify_id, uint64_t handle,
236 bufferlist &out) {
237 m_ioctx.notify_ack(m_oid, notify_id, handle, out);
238 }
239
240 void Watcher::rewatch() {
241 ldout(m_cct, 10) << dendl;
242
243 Context *unregister_watch_ctx = nullptr;
244 {
245 RWLock::WLocker watch_locker(m_watch_lock);
246 assert(m_watch_state == WATCH_STATE_REWATCHING);
247
248 if (m_unregister_watch_ctx != nullptr) {
249 m_watch_state = WATCH_STATE_IDLE;
250 std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
251 } else {
252 m_watch_error = false;
253 auto ctx = create_context_callback<
254 Watcher, &Watcher::handle_rewatch>(this);
255 auto req = RewatchRequest::create(m_ioctx, m_oid, m_watch_lock,
256 &m_watch_ctx, &m_watch_handle, ctx);
257 req->send();
258 return;
259 }
260 }
261
262 unregister_watch_ctx->complete(0);
263 }
264
265 void Watcher::handle_rewatch(int r) {
266 ldout(m_cct, 10) "r=" << r << dendl;
267
268 bool watch_error = false;
269 Context *unregister_watch_ctx = nullptr;
270 {
271 RWLock::WLocker watch_locker(m_watch_lock);
272 assert(m_watch_state == WATCH_STATE_REWATCHING);
273
274 if (m_unregister_watch_ctx != nullptr) {
275 ldout(m_cct, 10) << "image is closing, skip rewatch" << dendl;
276 m_watch_state = WATCH_STATE_IDLE;
277 std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
278 } else if (r == -EBLACKLISTED) {
279 lderr(m_cct) << "client blacklisted" << dendl;
280 } else if (r == -ENOENT) {
281 ldout(m_cct, 5) << "object does not exist" << dendl;
282 } else if (r < 0) {
283 lderr(m_cct) << "failed to rewatch: " << cpp_strerror(r) << dendl;
284 watch_error = true;
285 } else if (m_watch_error) {
286 lderr(m_cct) << "re-registering watch after error" << dendl;
287 watch_error = true;
288 }
289 }
290
291 if (unregister_watch_ctx != nullptr) {
292 unregister_watch_ctx->complete(0);
293 return;
294 } else if (watch_error) {
295 rewatch();
296 return;
297 }
298
299 auto ctx = create_context_callback<
300 Watcher, &Watcher::handle_rewatch_callback>(this);
301 m_work_queue->queue(ctx, r);
302 }
303
304 void Watcher::handle_rewatch_callback(int r) {
305 ldout(m_cct, 10) << "r=" << r << dendl;
306 handle_rewatch_complete(r);
307
308 bool watch_error = false;
309 Context *unregister_watch_ctx = nullptr;
310 {
311 RWLock::WLocker watch_locker(m_watch_lock);
312 assert(m_watch_state == WATCH_STATE_REWATCHING);
313
314 if (m_unregister_watch_ctx != nullptr) {
315 m_watch_state = WATCH_STATE_IDLE;
316 std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
317 } else if (r == -EBLACKLISTED || r == -ENOENT) {
318 m_watch_state = WATCH_STATE_IDLE;
319 } else if (m_watch_error) {
320 watch_error = true;
321 } else {
322 m_watch_state = WATCH_STATE_IDLE;
323 }
324 }
325
326 if (unregister_watch_ctx != nullptr) {
327 unregister_watch_ctx->complete(0);
328 } else if (watch_error) {
329 rewatch();
330 }
331 }
332
333 void Watcher::send_notify(bufferlist& payload,
334 watcher::NotifyResponse *response,
335 Context *on_finish) {
336 m_notifier.notify(payload, response, on_finish);
337 }
338
339 void Watcher::WatchCtx::handle_notify(uint64_t notify_id, uint64_t handle,
340 uint64_t notifier_id, bufferlist& bl) {
341 // if notifications are blocked, finish the notification w/o
342 // bubbling the notification up to the derived class
343 watcher.m_async_op_tracker.start_op();
344 if (watcher.notifications_blocked()) {
345 bufferlist bl;
346 watcher.acknowledge_notify(notify_id, handle, bl);
347 } else {
348 watcher.handle_notify(notify_id, handle, notifier_id, bl);
349 }
350 watcher.m_async_op_tracker.finish_op();
351 }
352
353 void Watcher::WatchCtx::handle_error(uint64_t handle, int err) {
354 watcher.handle_error(handle, err);
355 }
356
357 } // namespace librbd