// NOTE: the lines below were recovered from a git-blame table export;
// the original "Commit | Line | Data" header carried no source content.
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- | |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "librbd/Watcher.h" | |
5 | #include "librbd/watcher/RewatchRequest.h" | |
6 | #include "librbd/Utils.h" | |
7 | #include "librbd/TaskFinisher.h" | |
8 | #include "librbd/asio/ContextWQ.h" | |
9 | #include "include/encoding.h" | |
10 | #include "common/errno.h" | |
11 | #include <boost/bind/bind.hpp> | |
12 | ||
13 | // re-include our assert to clobber the system one; fix dout: | |
14 | #include "include/ceph_assert.h" | |
15 | ||
16 | #define dout_subsys ceph_subsys_rbd | |
17 | ||
18 | namespace librbd { | |
19 | ||
20 | using namespace boost::placeholders; | |
21 | ||
22 | using namespace watcher; | |
23 | ||
24 | using util::create_context_callback; | |
25 | using util::create_rados_callback; | |
26 | using std::string; | |
27 | ||
28 | namespace { | |
29 | ||
30 | struct C_UnwatchAndFlush : public Context { | |
31 | librados::Rados rados; | |
32 | Context *on_finish; | |
33 | bool flushing = false; | |
34 | int ret_val = 0; | |
35 | ||
36 | C_UnwatchAndFlush(librados::IoCtx &io_ctx, Context *on_finish) | |
37 | : rados(io_ctx), on_finish(on_finish) { | |
38 | } | |
39 | ||
40 | void complete(int r) override { | |
41 | if (ret_val == 0 && r < 0) { | |
42 | ret_val = r; | |
43 | } | |
44 | ||
45 | if (!flushing) { | |
46 | flushing = true; | |
47 | ||
48 | librados::AioCompletion *aio_comp = create_rados_callback(this); | |
49 | r = rados.aio_watch_flush(aio_comp); | |
50 | ceph_assert(r == 0); | |
51 | aio_comp->release(); | |
52 | return; | |
53 | } | |
54 | ||
55 | // ensure our reference to the RadosClient is released prior | |
56 | // to completing the callback to avoid racing an explicit | |
57 | // librados shutdown | |
58 | Context *ctx = on_finish; | |
59 | r = ret_val; | |
60 | delete this; | |
61 | ||
62 | ctx->complete(r); | |
63 | } | |
64 | ||
65 | void finish(int r) override { | |
66 | } | |
67 | }; | |
68 | ||
69 | } // anonymous namespace | |
70 | ||
71 | #undef dout_prefix | |
72 | #define dout_prefix *_dout << "librbd::Watcher::C_NotifyAck " << this << " " \ | |
73 | << __func__ << ": " | |
74 | ||
75 | Watcher::C_NotifyAck::C_NotifyAck(Watcher *watcher, uint64_t notify_id, | |
76 | uint64_t handle) | |
77 | : watcher(watcher), cct(watcher->m_cct), notify_id(notify_id), | |
78 | handle(handle) { | |
79 | ldout(cct, 10) << "id=" << notify_id << ", " << "handle=" << handle << dendl; | |
80 | } | |
81 | ||
82 | void Watcher::C_NotifyAck::finish(int r) { | |
83 | ldout(cct, 10) << "r=" << r << dendl; | |
84 | ceph_assert(r == 0); | |
85 | watcher->acknowledge_notify(notify_id, handle, out); | |
86 | } | |
87 | ||
88 | #undef dout_prefix | |
89 | #define dout_prefix *_dout << "librbd::Watcher: " << this << " " << __func__ \ | |
90 | << ": " | |
91 | ||
92 | Watcher::Watcher(librados::IoCtx& ioctx, asio::ContextWQ *work_queue, | |
93 | const string& oid) | |
94 | : m_ioctx(ioctx), m_work_queue(work_queue), m_oid(oid), | |
95 | m_cct(reinterpret_cast<CephContext *>(ioctx.cct())), | |
96 | m_watch_lock(ceph::make_shared_mutex( | |
97 | util::unique_lock_name("librbd::Watcher::m_watch_lock", this))), | |
98 | m_watch_handle(0), m_notifier(work_queue, ioctx, oid), | |
99 | m_watch_state(WATCH_STATE_IDLE), m_watch_ctx(*this) { | |
100 | } | |
101 | ||
102 | Watcher::~Watcher() { | |
103 | std::shared_lock l{m_watch_lock}; | |
104 | ceph_assert(is_unregistered(m_watch_lock)); | |
105 | } | |
106 | ||
107 | void Watcher::register_watch(Context *on_finish) { | |
108 | ldout(m_cct, 10) << dendl; | |
109 | ||
110 | std::unique_lock watch_locker{m_watch_lock}; | |
111 | ceph_assert(is_unregistered(m_watch_lock)); | |
112 | m_watch_state = WATCH_STATE_REGISTERING; | |
113 | m_watch_blocklisted = false; | |
114 | ||
115 | librados::AioCompletion *aio_comp = create_rados_callback( | |
116 | new C_RegisterWatch(this, on_finish)); | |
117 | int r = m_ioctx.aio_watch(m_oid, aio_comp, &m_watch_handle, &m_watch_ctx); | |
118 | ceph_assert(r == 0); | |
119 | aio_comp->release(); | |
120 | } | |
121 | ||
122 | void Watcher::handle_register_watch(int r, Context *on_finish) { | |
123 | ldout(m_cct, 10) << "r=" << r << dendl; | |
124 | ||
125 | bool watch_error = false; | |
126 | Context *unregister_watch_ctx = nullptr; | |
127 | { | |
128 | std::unique_lock watch_locker{m_watch_lock}; | |
129 | ceph_assert(m_watch_state == WATCH_STATE_REGISTERING); | |
130 | ||
131 | m_watch_state = WATCH_STATE_IDLE; | |
132 | if (r < 0) { | |
133 | lderr(m_cct) << "failed to register watch: " << cpp_strerror(r) | |
134 | << dendl; | |
135 | m_watch_handle = 0; | |
136 | } | |
137 | ||
138 | if (m_unregister_watch_ctx != nullptr) { | |
139 | std::swap(unregister_watch_ctx, m_unregister_watch_ctx); | |
140 | } else if (r == 0 && m_watch_error) { | |
141 | lderr(m_cct) << "re-registering watch after error" << dendl; | |
142 | m_watch_state = WATCH_STATE_REWATCHING; | |
143 | watch_error = true; | |
144 | } else { | |
145 | m_watch_blocklisted = (r == -EBLOCKLISTED); | |
146 | } | |
147 | } | |
148 | ||
149 | on_finish->complete(r); | |
150 | ||
151 | if (unregister_watch_ctx != nullptr) { | |
152 | unregister_watch_ctx->complete(0); | |
153 | } else if (watch_error) { | |
154 | rewatch(); | |
155 | } | |
156 | } | |
157 | ||
158 | void Watcher::unregister_watch(Context *on_finish) { | |
159 | ldout(m_cct, 10) << dendl; | |
160 | ||
161 | { | |
162 | std::unique_lock watch_locker{m_watch_lock}; | |
163 | if (m_watch_state != WATCH_STATE_IDLE) { | |
164 | ldout(m_cct, 10) << "delaying unregister until register completed" | |
165 | << dendl; | |
166 | ||
167 | ceph_assert(m_unregister_watch_ctx == nullptr); | |
168 | m_unregister_watch_ctx = new LambdaContext([this, on_finish](int r) { | |
169 | unregister_watch(on_finish); | |
170 | }); | |
171 | return; | |
172 | } else if (is_registered(m_watch_lock)) { | |
173 | librados::AioCompletion *aio_comp = create_rados_callback( | |
174 | new C_UnwatchAndFlush(m_ioctx, on_finish)); | |
175 | int r = m_ioctx.aio_unwatch(m_watch_handle, aio_comp); | |
176 | ceph_assert(r == 0); | |
177 | aio_comp->release(); | |
178 | ||
179 | m_watch_handle = 0; | |
180 | m_watch_blocklisted = false; | |
181 | return; | |
182 | } | |
183 | } | |
184 | ||
185 | on_finish->complete(0); | |
186 | } | |
187 | ||
188 | bool Watcher::notifications_blocked() const { | |
189 | std::shared_lock locker{m_watch_lock}; | |
190 | ||
191 | bool blocked = (m_blocked_count > 0); | |
192 | ldout(m_cct, 5) << "blocked=" << blocked << dendl; | |
193 | return blocked; | |
194 | } | |
195 | ||
196 | void Watcher::block_notifies(Context *on_finish) { | |
197 | { | |
198 | std::unique_lock locker{m_watch_lock}; | |
199 | ++m_blocked_count; | |
200 | ldout(m_cct, 5) << "blocked_count=" << m_blocked_count << dendl; | |
201 | } | |
202 | m_async_op_tracker.wait_for_ops(on_finish); | |
203 | } | |
204 | ||
205 | void Watcher::unblock_notifies() { | |
206 | std::unique_lock locker{m_watch_lock}; | |
207 | ceph_assert(m_blocked_count > 0); | |
208 | --m_blocked_count; | |
209 | ldout(m_cct, 5) << "blocked_count=" << m_blocked_count << dendl; | |
210 | } | |
211 | ||
212 | void Watcher::flush(Context *on_finish) { | |
213 | m_notifier.flush(on_finish); | |
214 | } | |
215 | ||
216 | std::string Watcher::get_oid() const { | |
217 | std::shared_lock locker{m_watch_lock}; | |
218 | return m_oid; | |
219 | } | |
220 | ||
221 | void Watcher::set_oid(const string& oid) { | |
222 | std::unique_lock watch_locker{m_watch_lock}; | |
223 | ceph_assert(is_unregistered(m_watch_lock)); | |
224 | ||
225 | m_oid = oid; | |
226 | } | |
227 | ||
228 | void Watcher::handle_error(uint64_t handle, int err) { | |
229 | lderr(m_cct) << "handle=" << handle << ": " << cpp_strerror(err) << dendl; | |
230 | ||
231 | std::unique_lock watch_locker{m_watch_lock}; | |
232 | m_watch_error = true; | |
233 | ||
234 | if (is_registered(m_watch_lock)) { | |
235 | m_watch_state = WATCH_STATE_REWATCHING; | |
236 | if (err == -EBLOCKLISTED) { | |
237 | m_watch_blocklisted = true; | |
238 | } | |
239 | ||
240 | auto ctx = new LambdaContext( | |
241 | boost::bind(&Watcher::rewatch, this)); | |
242 | m_work_queue->queue(ctx); | |
243 | } | |
244 | } | |
245 | ||
246 | void Watcher::acknowledge_notify(uint64_t notify_id, uint64_t handle, | |
247 | bufferlist &out) { | |
248 | m_ioctx.notify_ack(m_oid, notify_id, handle, out); | |
249 | } | |
250 | ||
251 | void Watcher::rewatch() { | |
252 | ldout(m_cct, 10) << dendl; | |
253 | ||
254 | Context *unregister_watch_ctx = nullptr; | |
255 | { | |
256 | std::unique_lock watch_locker{m_watch_lock}; | |
257 | ceph_assert(m_watch_state == WATCH_STATE_REWATCHING); | |
258 | ||
259 | if (m_unregister_watch_ctx != nullptr) { | |
260 | m_watch_state = WATCH_STATE_IDLE; | |
261 | std::swap(unregister_watch_ctx, m_unregister_watch_ctx); | |
262 | } else { | |
263 | m_watch_error = false; | |
264 | auto ctx = create_context_callback< | |
265 | Watcher, &Watcher::handle_rewatch>(this); | |
266 | auto req = RewatchRequest::create(m_ioctx, m_oid, m_watch_lock, | |
267 | &m_watch_ctx, &m_watch_handle, ctx); | |
268 | req->send(); | |
269 | return; | |
270 | } | |
271 | } | |
272 | ||
273 | unregister_watch_ctx->complete(0); | |
274 | } | |
275 | ||
276 | void Watcher::handle_rewatch(int r) { | |
277 | ldout(m_cct, 10) << "r=" << r << dendl; | |
278 | ||
279 | bool watch_error = false; | |
280 | Context *unregister_watch_ctx = nullptr; | |
281 | { | |
282 | std::unique_lock watch_locker{m_watch_lock}; | |
283 | ceph_assert(m_watch_state == WATCH_STATE_REWATCHING); | |
284 | ||
285 | m_watch_blocklisted = false; | |
286 | if (m_unregister_watch_ctx != nullptr) { | |
287 | ldout(m_cct, 10) << "image is closing, skip rewatch" << dendl; | |
288 | m_watch_state = WATCH_STATE_IDLE; | |
289 | std::swap(unregister_watch_ctx, m_unregister_watch_ctx); | |
290 | } else if (r == -EBLOCKLISTED) { | |
291 | lderr(m_cct) << "client blocklisted" << dendl; | |
292 | m_watch_blocklisted = true; | |
293 | } else if (r == -ENOENT) { | |
294 | ldout(m_cct, 5) << "object does not exist" << dendl; | |
295 | } else if (r < 0) { | |
296 | lderr(m_cct) << "failed to rewatch: " << cpp_strerror(r) << dendl; | |
297 | watch_error = true; | |
298 | } else if (m_watch_error) { | |
299 | lderr(m_cct) << "re-registering watch after error" << dendl; | |
300 | watch_error = true; | |
301 | } | |
302 | } | |
303 | ||
304 | if (unregister_watch_ctx != nullptr) { | |
305 | unregister_watch_ctx->complete(0); | |
306 | return; | |
307 | } else if (watch_error) { | |
308 | rewatch(); | |
309 | return; | |
310 | } | |
311 | ||
312 | auto ctx = create_context_callback< | |
313 | Watcher, &Watcher::handle_rewatch_callback>(this); | |
314 | m_work_queue->queue(ctx, r); | |
315 | } | |
316 | ||
317 | void Watcher::handle_rewatch_callback(int r) { | |
318 | ldout(m_cct, 10) << "r=" << r << dendl; | |
319 | handle_rewatch_complete(r); | |
320 | ||
321 | bool watch_error = false; | |
322 | Context *unregister_watch_ctx = nullptr; | |
323 | { | |
324 | std::unique_lock watch_locker{m_watch_lock}; | |
325 | ceph_assert(m_watch_state == WATCH_STATE_REWATCHING); | |
326 | ||
327 | if (m_unregister_watch_ctx != nullptr) { | |
328 | m_watch_state = WATCH_STATE_IDLE; | |
329 | std::swap(unregister_watch_ctx, m_unregister_watch_ctx); | |
330 | } else if (r == -EBLOCKLISTED || r == -ENOENT) { | |
331 | m_watch_state = WATCH_STATE_IDLE; | |
332 | } else if (r < 0 || m_watch_error) { | |
333 | watch_error = true; | |
334 | } else { | |
335 | m_watch_state = WATCH_STATE_IDLE; | |
336 | } | |
337 | } | |
338 | ||
339 | if (unregister_watch_ctx != nullptr) { | |
340 | unregister_watch_ctx->complete(0); | |
341 | } else if (watch_error) { | |
342 | rewatch(); | |
343 | } | |
344 | } | |
345 | ||
346 | void Watcher::send_notify(bufferlist& payload, | |
347 | watcher::NotifyResponse *response, | |
348 | Context *on_finish) { | |
349 | m_notifier.notify(payload, response, on_finish); | |
350 | } | |
351 | ||
352 | void Watcher::WatchCtx::handle_notify(uint64_t notify_id, uint64_t handle, | |
353 | uint64_t notifier_id, bufferlist& bl) { | |
354 | // if notifications are blocked, finish the notification w/o | |
355 | // bubbling the notification up to the derived class | |
356 | watcher.m_async_op_tracker.start_op(); | |
357 | if (watcher.notifications_blocked()) { | |
358 | bufferlist bl; | |
359 | watcher.acknowledge_notify(notify_id, handle, bl); | |
360 | } else { | |
361 | watcher.handle_notify(notify_id, handle, notifier_id, bl); | |
362 | } | |
363 | watcher.m_async_op_tracker.finish_op(); | |
364 | } | |
365 | ||
366 | void Watcher::WatchCtx::handle_error(uint64_t handle, int err) { | |
367 | watcher.handle_error(handle, err); | |
368 | } | |
369 | ||
370 | } // namespace librbd |