]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "librbd/Watcher.h" | |
5 | #include "librbd/watcher/RewatchRequest.h" | |
6 | #include "librbd/Utils.h" | |
7 | #include "librbd/TaskFinisher.h" | |
8 | #include "include/encoding.h" | |
9 | #include "common/errno.h" | |
10 | #include "common/WorkQueue.h" | |
11 | #include <boost/bind.hpp> | |
12 | ||
13 | // re-include our assert to clobber the system one; fix dout: | |
11fdf7f2 | 14 | #include "include/ceph_assert.h" |
7c673cae FG |
15 | |
16 | #define dout_subsys ceph_subsys_rbd | |
17 | ||
18 | namespace librbd { | |
19 | ||
20 | using namespace watcher; | |
21 | ||
22 | using util::create_context_callback; | |
23 | using util::create_rados_callback; | |
24 | using std::string; | |
25 | ||
26 | namespace { | |
27 | ||
28 | struct C_UnwatchAndFlush : public Context { | |
29 | librados::Rados rados; | |
30 | Context *on_finish; | |
31 | bool flushing = false; | |
32 | int ret_val = 0; | |
33 | ||
34 | C_UnwatchAndFlush(librados::IoCtx &io_ctx, Context *on_finish) | |
35 | : rados(io_ctx), on_finish(on_finish) { | |
36 | } | |
37 | ||
38 | void complete(int r) override { | |
39 | if (ret_val == 0 && r < 0) { | |
40 | ret_val = r; | |
41 | } | |
42 | ||
43 | if (!flushing) { | |
44 | flushing = true; | |
45 | ||
46 | librados::AioCompletion *aio_comp = create_rados_callback(this); | |
47 | r = rados.aio_watch_flush(aio_comp); | |
11fdf7f2 | 48 | ceph_assert(r == 0); |
7c673cae FG |
49 | aio_comp->release(); |
50 | return; | |
51 | } | |
52 | ||
53 | // ensure our reference to the RadosClient is released prior | |
54 | // to completing the callback to avoid racing an explicit | |
55 | // librados shutdown | |
56 | Context *ctx = on_finish; | |
57 | r = ret_val; | |
58 | delete this; | |
59 | ||
60 | ctx->complete(r); | |
61 | } | |
62 | ||
63 | void finish(int r) override { | |
64 | } | |
65 | }; | |
66 | ||
67 | } // anonymous namespace | |
68 | ||
69 | #undef dout_prefix | |
70 | #define dout_prefix *_dout << "librbd::Watcher::C_NotifyAck " << this << " " \ | |
71 | << __func__ << ": " | |
72 | ||
73 | Watcher::C_NotifyAck::C_NotifyAck(Watcher *watcher, uint64_t notify_id, | |
74 | uint64_t handle) | |
75 | : watcher(watcher), cct(watcher->m_cct), notify_id(notify_id), | |
76 | handle(handle) { | |
77 | ldout(cct, 10) << "id=" << notify_id << ", " << "handle=" << handle << dendl; | |
78 | } | |
79 | ||
// Acknowledge the captured notification using the `out` payload.
// A non-zero completion code trips the assertion.
void Watcher::C_NotifyAck::finish(int r) {
  ldout(cct, 10) << "r=" << r << dendl;
  ceph_assert(r == 0);
  watcher->acknowledge_notify(notify_id, handle, out);
}
85 | ||
86 | #undef dout_prefix | |
87 | #define dout_prefix *_dout << "librbd::Watcher: " << this << " " << __func__ \ | |
88 | << ": " | |
89 | ||
90 | Watcher::Watcher(librados::IoCtx& ioctx, ContextWQ *work_queue, | |
91 | const string& oid) | |
92 | : m_ioctx(ioctx), m_work_queue(work_queue), m_oid(oid), | |
93 | m_cct(reinterpret_cast<CephContext *>(ioctx.cct())), | |
9f95a23c | 94 | m_watch_lock(ceph::make_shared_mutex(util::unique_lock_name("librbd::Watcher::m_watch_lock", this))), |
7c673cae | 95 | m_watch_handle(0), m_notifier(work_queue, ioctx, oid), |
28e407b8 | 96 | m_watch_state(WATCH_STATE_IDLE), m_watch_ctx(*this) { |
7c673cae FG |
97 | } |
98 | ||
99 | Watcher::~Watcher() { | |
9f95a23c | 100 | std::shared_lock l{m_watch_lock}; |
11fdf7f2 | 101 | ceph_assert(is_unregistered(m_watch_lock)); |
7c673cae FG |
102 | } |
103 | ||
104 | void Watcher::register_watch(Context *on_finish) { | |
105 | ldout(m_cct, 10) << dendl; | |
106 | ||
9f95a23c | 107 | std::unique_lock watch_locker{m_watch_lock}; |
11fdf7f2 | 108 | ceph_assert(is_unregistered(m_watch_lock)); |
7c673cae | 109 | m_watch_state = WATCH_STATE_REGISTERING; |
91327a77 | 110 | m_watch_blacklisted = false; |
7c673cae FG |
111 | |
112 | librados::AioCompletion *aio_comp = create_rados_callback( | |
28e407b8 | 113 | new C_RegisterWatch(this, on_finish)); |
7c673cae | 114 | int r = m_ioctx.aio_watch(m_oid, aio_comp, &m_watch_handle, &m_watch_ctx); |
11fdf7f2 | 115 | ceph_assert(r == 0); |
7c673cae FG |
116 | aio_comp->release(); |
117 | } | |
118 | ||
119 | void Watcher::handle_register_watch(int r, Context *on_finish) { | |
120 | ldout(m_cct, 10) << "r=" << r << dendl; | |
28e407b8 AA |
121 | |
122 | bool watch_error = false; | |
7c673cae FG |
123 | Context *unregister_watch_ctx = nullptr; |
124 | { | |
9f95a23c | 125 | std::unique_lock watch_locker{m_watch_lock}; |
11fdf7f2 | 126 | ceph_assert(m_watch_state == WATCH_STATE_REGISTERING); |
7c673cae | 127 | |
28e407b8 | 128 | m_watch_state = WATCH_STATE_IDLE; |
7c673cae FG |
129 | if (r < 0) { |
130 | lderr(m_cct) << "failed to register watch: " << cpp_strerror(r) | |
131 | << dendl; | |
132 | m_watch_handle = 0; | |
28e407b8 AA |
133 | } |
134 | ||
135 | if (m_unregister_watch_ctx != nullptr) { | |
136 | std::swap(unregister_watch_ctx, m_unregister_watch_ctx); | |
137 | } else if (r == 0 && m_watch_error) { | |
138 | lderr(m_cct) << "re-registering watch after error" << dendl; | |
139 | m_watch_state = WATCH_STATE_REWATCHING; | |
140 | watch_error = true; | |
91327a77 AA |
141 | } else { |
142 | m_watch_blacklisted = (r == -EBLACKLISTED); | |
7c673cae FG |
143 | } |
144 | } | |
145 | ||
146 | on_finish->complete(r); | |
147 | ||
7c673cae FG |
148 | if (unregister_watch_ctx != nullptr) { |
149 | unregister_watch_ctx->complete(0); | |
28e407b8 AA |
150 | } else if (watch_error) { |
151 | rewatch(); | |
7c673cae FG |
152 | } |
153 | } | |
154 | ||
155 | void Watcher::unregister_watch(Context *on_finish) { | |
156 | ldout(m_cct, 10) << dendl; | |
157 | ||
158 | { | |
9f95a23c | 159 | std::unique_lock watch_locker{m_watch_lock}; |
28e407b8 | 160 | if (m_watch_state != WATCH_STATE_IDLE) { |
7c673cae FG |
161 | ldout(m_cct, 10) << "delaying unregister until register completed" |
162 | << dendl; | |
163 | ||
11fdf7f2 | 164 | ceph_assert(m_unregister_watch_ctx == nullptr); |
9f95a23c | 165 | m_unregister_watch_ctx = new LambdaContext([this, on_finish](int r) { |
7c673cae FG |
166 | unregister_watch(on_finish); |
167 | }); | |
168 | return; | |
28e407b8 | 169 | } else if (is_registered(m_watch_lock)) { |
7c673cae | 170 | librados::AioCompletion *aio_comp = create_rados_callback( |
91327a77 | 171 | new C_UnwatchAndFlush(m_ioctx, on_finish)); |
7c673cae | 172 | int r = m_ioctx.aio_unwatch(m_watch_handle, aio_comp); |
11fdf7f2 | 173 | ceph_assert(r == 0); |
7c673cae | 174 | aio_comp->release(); |
91327a77 | 175 | |
28e407b8 | 176 | m_watch_handle = 0; |
91327a77 | 177 | m_watch_blacklisted = false; |
7c673cae FG |
178 | return; |
179 | } | |
180 | } | |
181 | ||
182 | on_finish->complete(0); | |
183 | } | |
184 | ||
31f18b77 | 185 | bool Watcher::notifications_blocked() const { |
9f95a23c | 186 | std::shared_lock locker{m_watch_lock}; |
31f18b77 FG |
187 | |
188 | bool blocked = (m_blocked_count > 0); | |
189 | ldout(m_cct, 5) << "blocked=" << blocked << dendl; | |
190 | return blocked; | |
191 | } | |
192 | ||
193 | void Watcher::block_notifies(Context *on_finish) { | |
194 | { | |
9f95a23c | 195 | std::unique_lock locker{m_watch_lock}; |
31f18b77 FG |
196 | ++m_blocked_count; |
197 | ldout(m_cct, 5) << "blocked_count=" << m_blocked_count << dendl; | |
198 | } | |
199 | m_async_op_tracker.wait_for_ops(on_finish); | |
200 | } | |
201 | ||
202 | void Watcher::unblock_notifies() { | |
9f95a23c | 203 | std::unique_lock locker{m_watch_lock}; |
11fdf7f2 | 204 | ceph_assert(m_blocked_count > 0); |
31f18b77 FG |
205 | --m_blocked_count; |
206 | ldout(m_cct, 5) << "blocked_count=" << m_blocked_count << dendl; | |
207 | } | |
208 | ||
7c673cae FG |
// Forward the flush request, with its completion context, to the notifier.
void Watcher::flush(Context *on_finish) {
  m_notifier.flush(on_finish);
}
212 | ||
213 | std::string Watcher::get_oid() const { | |
9f95a23c | 214 | std::shared_lock locker{m_watch_lock}; |
7c673cae FG |
215 | return m_oid; |
216 | } | |
217 | ||
218 | void Watcher::set_oid(const string& oid) { | |
9f95a23c | 219 | std::unique_lock watch_locker{m_watch_lock}; |
11fdf7f2 | 220 | ceph_assert(is_unregistered(m_watch_lock)); |
7c673cae FG |
221 | |
222 | m_oid = oid; | |
223 | } | |
224 | ||
225 | void Watcher::handle_error(uint64_t handle, int err) { | |
226 | lderr(m_cct) << "handle=" << handle << ": " << cpp_strerror(err) << dendl; | |
227 | ||
9f95a23c | 228 | std::unique_lock watch_locker{m_watch_lock}; |
28e407b8 AA |
229 | m_watch_error = true; |
230 | ||
231 | if (is_registered(m_watch_lock)) { | |
232 | m_watch_state = WATCH_STATE_REWATCHING; | |
91327a77 AA |
233 | if (err == -EBLACKLISTED) { |
234 | m_watch_blacklisted = true; | |
235 | } | |
7c673cae | 236 | |
9f95a23c | 237 | auto ctx = new LambdaContext( |
7c673cae FG |
238 | boost::bind(&Watcher::rewatch, this)); |
239 | m_work_queue->queue(ctx); | |
240 | } | |
241 | } | |
242 | ||
// Acknowledge notification `notify_id` on m_oid for the given watch
// handle, sending `out` as the ack payload.
void Watcher::acknowledge_notify(uint64_t notify_id, uint64_t handle,
                                 bufferlist &out) {
  m_ioctx.notify_ack(m_oid, notify_id, handle, out);
}
247 | ||
248 | void Watcher::rewatch() { | |
249 | ldout(m_cct, 10) << dendl; | |
250 | ||
28e407b8 AA |
251 | Context *unregister_watch_ctx = nullptr; |
252 | { | |
9f95a23c | 253 | std::unique_lock watch_locker{m_watch_lock}; |
11fdf7f2 | 254 | ceph_assert(m_watch_state == WATCH_STATE_REWATCHING); |
28e407b8 AA |
255 | |
256 | if (m_unregister_watch_ctx != nullptr) { | |
257 | m_watch_state = WATCH_STATE_IDLE; | |
258 | std::swap(unregister_watch_ctx, m_unregister_watch_ctx); | |
259 | } else { | |
260 | m_watch_error = false; | |
261 | auto ctx = create_context_callback< | |
262 | Watcher, &Watcher::handle_rewatch>(this); | |
263 | auto req = RewatchRequest::create(m_ioctx, m_oid, m_watch_lock, | |
264 | &m_watch_ctx, &m_watch_handle, ctx); | |
265 | req->send(); | |
266 | return; | |
267 | } | |
7c673cae | 268 | } |
28e407b8 AA |
269 | |
270 | unregister_watch_ctx->complete(0); | |
7c673cae FG |
271 | } |
272 | ||
273 | void Watcher::handle_rewatch(int r) { | |
91327a77 | 274 | ldout(m_cct, 10) << "r=" << r << dendl; |
7c673cae | 275 | |
28e407b8 AA |
276 | bool watch_error = false; |
277 | Context *unregister_watch_ctx = nullptr; | |
278 | { | |
9f95a23c | 279 | std::unique_lock watch_locker{m_watch_lock}; |
11fdf7f2 | 280 | ceph_assert(m_watch_state == WATCH_STATE_REWATCHING); |
28e407b8 | 281 | |
91327a77 | 282 | m_watch_blacklisted = false; |
28e407b8 AA |
283 | if (m_unregister_watch_ctx != nullptr) { |
284 | ldout(m_cct, 10) << "image is closing, skip rewatch" << dendl; | |
285 | m_watch_state = WATCH_STATE_IDLE; | |
286 | std::swap(unregister_watch_ctx, m_unregister_watch_ctx); | |
287 | } else if (r == -EBLACKLISTED) { | |
288 | lderr(m_cct) << "client blacklisted" << dendl; | |
91327a77 | 289 | m_watch_blacklisted = true; |
28e407b8 AA |
290 | } else if (r == -ENOENT) { |
291 | ldout(m_cct, 5) << "object does not exist" << dendl; | |
292 | } else if (r < 0) { | |
293 | lderr(m_cct) << "failed to rewatch: " << cpp_strerror(r) << dendl; | |
294 | watch_error = true; | |
295 | } else if (m_watch_error) { | |
296 | lderr(m_cct) << "re-registering watch after error" << dendl; | |
297 | watch_error = true; | |
298 | } | |
7c673cae FG |
299 | } |
300 | ||
28e407b8 AA |
301 | if (unregister_watch_ctx != nullptr) { |
302 | unregister_watch_ctx->complete(0); | |
303 | return; | |
304 | } else if (watch_error) { | |
305 | rewatch(); | |
306 | return; | |
307 | } | |
308 | ||
309 | auto ctx = create_context_callback< | |
310 | Watcher, &Watcher::handle_rewatch_callback>(this); | |
311 | m_work_queue->queue(ctx, r); | |
312 | } | |
313 | ||
314 | void Watcher::handle_rewatch_callback(int r) { | |
315 | ldout(m_cct, 10) << "r=" << r << dendl; | |
316 | handle_rewatch_complete(r); | |
317 | ||
318 | bool watch_error = false; | |
7c673cae FG |
319 | Context *unregister_watch_ctx = nullptr; |
320 | { | |
9f95a23c | 321 | std::unique_lock watch_locker{m_watch_lock}; |
11fdf7f2 | 322 | ceph_assert(m_watch_state == WATCH_STATE_REWATCHING); |
7c673cae | 323 | |
28e407b8 AA |
324 | if (m_unregister_watch_ctx != nullptr) { |
325 | m_watch_state = WATCH_STATE_IDLE; | |
326 | std::swap(unregister_watch_ctx, m_unregister_watch_ctx); | |
e306af50 | 327 | } else if (r == -EBLACKLISTED || r == -ENOENT) { |
28e407b8 | 328 | m_watch_state = WATCH_STATE_IDLE; |
91327a77 | 329 | } else if (r < 0 || m_watch_error) { |
28e407b8 AA |
330 | watch_error = true; |
331 | } else { | |
332 | m_watch_state = WATCH_STATE_IDLE; | |
333 | } | |
7c673cae FG |
334 | } |
335 | ||
7c673cae FG |
336 | if (unregister_watch_ctx != nullptr) { |
337 | unregister_watch_ctx->complete(0); | |
28e407b8 AA |
338 | } else if (watch_error) { |
339 | rewatch(); | |
7c673cae FG |
340 | } |
341 | } | |
342 | ||
// Forward a notification (payload, response holder and completion
// context) to the notifier.
void Watcher::send_notify(bufferlist& payload,
                          watcher::NotifyResponse *response,
                          Context *on_finish) {
  m_notifier.notify(payload, response, on_finish);
}
348 | ||
31f18b77 FG |
349 | void Watcher::WatchCtx::handle_notify(uint64_t notify_id, uint64_t handle, |
350 | uint64_t notifier_id, bufferlist& bl) { | |
351 | // if notifications are blocked, finish the notification w/o | |
352 | // bubbling the notification up to the derived class | |
353 | watcher.m_async_op_tracker.start_op(); | |
354 | if (watcher.notifications_blocked()) { | |
355 | bufferlist bl; | |
356 | watcher.acknowledge_notify(notify_id, handle, bl); | |
357 | } else { | |
358 | watcher.handle_notify(notify_id, handle, notifier_id, bl); | |
359 | } | |
360 | watcher.m_async_op_tracker.finish_op(); | |
7c673cae FG |
361 | } |
362 | ||
// Forward a watch error for `handle` to the owning watcher.
void Watcher::WatchCtx::handle_error(uint64_t handle, int err) {
  watcher.handle_error(handle, err);
}
366 | ||
367 | } // namespace librbd |