]> git.proxmox.com Git - ceph.git/blame - ceph/src/librbd/Watcher.cc
update sources to v12.1.0
[ceph.git] / ceph / src / librbd / Watcher.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "librbd/Watcher.h"
5#include "librbd/watcher/RewatchRequest.h"
6#include "librbd/Utils.h"
7#include "librbd/TaskFinisher.h"
8#include "include/encoding.h"
9#include "common/errno.h"
10#include "common/WorkQueue.h"
11#include <boost/bind.hpp>
12
13// re-include our assert to clobber the system one; fix dout:
14#include "include/assert.h"
15
16#define dout_subsys ceph_subsys_rbd
17
18namespace librbd {
19
20using namespace watcher;
21
22using util::create_context_callback;
23using util::create_rados_callback;
24using std::string;
25
26namespace {
27
28struct C_UnwatchAndFlush : public Context {
29 librados::Rados rados;
30 Context *on_finish;
31 bool flushing = false;
32 int ret_val = 0;
33
34 C_UnwatchAndFlush(librados::IoCtx &io_ctx, Context *on_finish)
35 : rados(io_ctx), on_finish(on_finish) {
36 }
37
38 void complete(int r) override {
39 if (ret_val == 0 && r < 0) {
40 ret_val = r;
41 }
42
43 if (!flushing) {
44 flushing = true;
45
46 librados::AioCompletion *aio_comp = create_rados_callback(this);
47 r = rados.aio_watch_flush(aio_comp);
48 assert(r == 0);
49 aio_comp->release();
50 return;
51 }
52
53 // ensure our reference to the RadosClient is released prior
54 // to completing the callback to avoid racing an explicit
55 // librados shutdown
56 Context *ctx = on_finish;
57 r = ret_val;
58 delete this;
59
60 ctx->complete(r);
61 }
62
63 void finish(int r) override {
64 }
65};
66
67} // anonymous namespace
68
// Log prefix for the C_NotifyAck methods below: class label, object address and calling function name.
69#undef dout_prefix
70#define dout_prefix *_dout << "librbd::Watcher::C_NotifyAck " << this << " " \
71 << __func__ << ": "
72
73Watcher::C_NotifyAck::C_NotifyAck(Watcher *watcher, uint64_t notify_id,
74 uint64_t handle)
75 : watcher(watcher), cct(watcher->m_cct), notify_id(notify_id),
76 handle(handle) {
77 ldout(cct, 10) << "id=" << notify_id << ", " << "handle=" << handle << dendl;
78}
79
80void Watcher::C_NotifyAck::finish(int r) {
81 ldout(cct, 10) << "r=" << r << dendl;
82 assert(r == 0);
83 watcher->acknowledge_notify(notify_id, handle, out);
84}
85
// Redefine the log prefix for the Watcher methods that follow: class label, object address and calling function name.
86#undef dout_prefix
87#define dout_prefix *_dout << "librbd::Watcher: " << this << " " << __func__ \
88 << ": "
89
90Watcher::Watcher(librados::IoCtx& ioctx, ContextWQ *work_queue,
91 const string& oid)
92 : m_ioctx(ioctx), m_work_queue(work_queue), m_oid(oid),
93 m_cct(reinterpret_cast<CephContext *>(ioctx.cct())),
94 m_watch_lock(util::unique_lock_name("librbd::Watcher::m_watch_lock", this)),
95 m_watch_handle(0), m_notifier(work_queue, ioctx, oid),
96 m_watch_state(WATCH_STATE_UNREGISTERED), m_watch_ctx(*this) {
97}
98
99Watcher::~Watcher() {
100 RWLock::RLocker l(m_watch_lock);
101 assert(m_watch_state != WATCH_STATE_REGISTERED);
102}
103
104void Watcher::register_watch(Context *on_finish) {
105 ldout(m_cct, 10) << dendl;
106
107 RWLock::RLocker watch_locker(m_watch_lock);
108 assert(m_watch_state == WATCH_STATE_UNREGISTERED);
109 m_watch_state = WATCH_STATE_REGISTERING;
110
111 librados::AioCompletion *aio_comp = create_rados_callback(
112 new C_RegisterWatch(this, on_finish));
113 int r = m_ioctx.aio_watch(m_oid, aio_comp, &m_watch_handle, &m_watch_ctx);
114 assert(r == 0);
115 aio_comp->release();
116}
117
118void Watcher::handle_register_watch(int r, Context *on_finish) {
119 ldout(m_cct, 10) << "r=" << r << dendl;
120 Context *unregister_watch_ctx = nullptr;
121 {
122 RWLock::WLocker watch_locker(m_watch_lock);
123 assert(m_watch_state == WATCH_STATE_REGISTERING);
124
125 std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
126 if (r < 0) {
127 lderr(m_cct) << "failed to register watch: " << cpp_strerror(r)
128 << dendl;
129 m_watch_handle = 0;
130 m_watch_state = WATCH_STATE_UNREGISTERED;
131 } else if (r >= 0) {
132 m_watch_state = WATCH_STATE_REGISTERED;
133 }
134 }
135
136 on_finish->complete(r);
137
138 // wake up pending unregister request
139 if (unregister_watch_ctx != nullptr) {
140 unregister_watch_ctx->complete(0);
141 }
142}
143
144void Watcher::unregister_watch(Context *on_finish) {
145 ldout(m_cct, 10) << dendl;
146
147 {
148 RWLock::WLocker watch_locker(m_watch_lock);
149 if (m_watch_state == WATCH_STATE_REGISTERING ||
150 m_watch_state == WATCH_STATE_REWATCHING) {
151 ldout(m_cct, 10) << "delaying unregister until register completed"
152 << dendl;
153
154 assert(m_unregister_watch_ctx == nullptr);
155 m_unregister_watch_ctx = new FunctionContext([this, on_finish](int r) {
156 unregister_watch(on_finish);
157 });
158 return;
159 }
160
161 if (m_watch_state == WATCH_STATE_REGISTERED ||
162 m_watch_state == WATCH_STATE_ERROR) {
163 m_watch_state = WATCH_STATE_UNREGISTERED;
164
165 librados::AioCompletion *aio_comp = create_rados_callback(
166 new C_UnwatchAndFlush(m_ioctx, on_finish));
167 int r = m_ioctx.aio_unwatch(m_watch_handle, aio_comp);
168 assert(r == 0);
169 aio_comp->release();
170 return;
171 }
172 }
173
174 on_finish->complete(0);
175}
176
31f18b77
FG
177bool Watcher::notifications_blocked() const {
178 RWLock::RLocker locker(m_watch_lock);
179
180 bool blocked = (m_blocked_count > 0);
181 ldout(m_cct, 5) << "blocked=" << blocked << dendl;
182 return blocked;
183}
184
185void Watcher::block_notifies(Context *on_finish) {
186 {
187 RWLock::WLocker locker(m_watch_lock);
188 ++m_blocked_count;
189 ldout(m_cct, 5) << "blocked_count=" << m_blocked_count << dendl;
190 }
191 m_async_op_tracker.wait_for_ops(on_finish);
192}
193
194void Watcher::unblock_notifies() {
195 RWLock::WLocker locker(m_watch_lock);
196 assert(m_blocked_count > 0);
197 --m_blocked_count;
198 ldout(m_cct, 5) << "blocked_count=" << m_blocked_count << dendl;
199}
200
7c673cae
FG
201void Watcher::flush(Context *on_finish) {
202 m_notifier.flush(on_finish);
203}
204
205std::string Watcher::get_oid() const {
206 RWLock::RLocker locker(m_watch_lock);
207 return m_oid;
208}
209
210void Watcher::set_oid(const string& oid) {
211 RWLock::WLocker l(m_watch_lock);
212 assert(m_watch_state == WATCH_STATE_UNREGISTERED);
213
214 m_oid = oid;
215}
216
217void Watcher::handle_error(uint64_t handle, int err) {
218 lderr(m_cct) << "handle=" << handle << ": " << cpp_strerror(err) << dendl;
219
220 RWLock::WLocker l(m_watch_lock);
221 if (m_watch_state == WATCH_STATE_REGISTERED) {
222 m_watch_state = WATCH_STATE_ERROR;
223
224 FunctionContext *ctx = new FunctionContext(
225 boost::bind(&Watcher::rewatch, this));
226 m_work_queue->queue(ctx);
227 }
228}
229
230void Watcher::acknowledge_notify(uint64_t notify_id, uint64_t handle,
231 bufferlist &out) {
232 m_ioctx.notify_ack(m_oid, notify_id, handle, out);
233}
234
235void Watcher::rewatch() {
236 ldout(m_cct, 10) << dendl;
237
238 RWLock::WLocker l(m_watch_lock);
239 if (m_watch_state != WATCH_STATE_ERROR) {
240 return;
241 }
242 m_watch_state = WATCH_STATE_REWATCHING;
243
244 Context *ctx = create_context_callback<Watcher,
245 &Watcher::handle_rewatch>(this);
246 RewatchRequest *req = RewatchRequest::create(m_ioctx, m_oid, m_watch_lock,
247 &m_watch_ctx,
248 &m_watch_handle, ctx);
249 req->send();
250}
251
252void Watcher::handle_rewatch(int r) {
253 ldout(m_cct, 10) "r=" << r << dendl;
254
255 WatchState next_watch_state = WATCH_STATE_REGISTERED;
256 if (r < 0) {
257 // only EBLACKLISTED or ENOENT can be returned
258 assert(r == -EBLACKLISTED || r == -ENOENT);
259 next_watch_state = WATCH_STATE_UNREGISTERED;
260 }
261
262 Context *unregister_watch_ctx = nullptr;
263 {
264 RWLock::WLocker watch_locker(m_watch_lock);
265 assert(m_watch_state == WATCH_STATE_REWATCHING);
266 m_watch_state = next_watch_state;
267
268 std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
269
270 m_work_queue->queue(
271 create_context_callback<Watcher,
272 &Watcher::handle_rewatch_complete>(this), r);
273 }
274
275 // wake up pending unregister request
276 if (unregister_watch_ctx != nullptr) {
277 unregister_watch_ctx->complete(0);
278 }
279}
280
281void Watcher::send_notify(bufferlist& payload,
282 watcher::NotifyResponse *response,
283 Context *on_finish) {
284 m_notifier.notify(payload, response, on_finish);
285}
286
31f18b77
FG
287void Watcher::WatchCtx::handle_notify(uint64_t notify_id, uint64_t handle,
288 uint64_t notifier_id, bufferlist& bl) {
289 // if notifications are blocked, finish the notification w/o
290 // bubbling the notification up to the derived class
291 watcher.m_async_op_tracker.start_op();
292 if (watcher.notifications_blocked()) {
293 bufferlist bl;
294 watcher.acknowledge_notify(notify_id, handle, bl);
295 } else {
296 watcher.handle_notify(notify_id, handle, notifier_id, bl);
297 }
298 watcher.m_async_op_tracker.finish_op();
7c673cae
FG
299}
300
301void Watcher::WatchCtx::handle_error(uint64_t handle, int err) {
302 watcher.handle_error(handle, err);
303}
304
305} // namespace librbd