// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "common/ceph_context.h"
#include "common/debug.h"
#include "common/errno.h"
#include "common/WorkQueue.h"
#include "include/stringify.h"
#include "aio_utils.h"
#include "watcher/RewatchRequest.h"
#include "Watcher.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_cephfs_mirror
#undef dout_prefix
#define dout_prefix *_dout << "cephfs::mirror::Watcher " << __func__

using cephfs::mirror::watcher::RewatchRequest;

namespace cephfs {
namespace mirror {

namespace {

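// completion context used as the aio_unwatch callback: on the first
// complete() it issues aio_watch_flush() with itself as the completion,
// and on the second complete() it finishes the caller's context with the
// first error (if any) seen along the way.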
struct C_UnwatchAndFlush : public Context {
  librados::Rados rados;
  Context *on_finish;
  bool flushing = false;
  int ret_val = 0;

  C_UnwatchAndFlush(librados::IoCtx &ioctx, Context *on_finish)
    : rados(ioctx), on_finish(on_finish) {
  }

  void complete(int r) override {
    if (ret_val == 0 && r < 0) {
      ret_val = r;
    }

    if (!flushing) {
      flushing = true;

      librados::AioCompletion *aio_comp =
        librados::Rados::aio_create_completion(
          this, &rados_callback<Context, &Context::complete>);
      r = rados.aio_watch_flush(aio_comp);

      ceph_assert(r == 0);
      aio_comp->release();
      return;
    }

    // ensure our reference to the RadosClient is released prior
    // to completing the callback to avoid racing an explicit
    // librados shutdown
    Context *ctx = on_finish;
    r = ret_val;
    delete this;

    ctx->complete(r);
  }

  void finish(int r) override {
  }
};

} // anonymous namespace

Watcher::Watcher(librados::IoCtx &ioctx, std::string_view oid, ContextWQ *work_queue)
  : m_oid(oid),
    m_ioctx(ioctx),
    m_work_queue(work_queue),
    m_lock(ceph::make_shared_mutex("cephfs::mirror::snap_watcher")),
    m_state(STATE_IDLE),
    m_watch_ctx(*this) {
}

Watcher::~Watcher() {
}

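// asynchronously register a watch on m_oid; the C_RegisterWatch wrapper
// routes the aio_watch completion through handle_register_watch(), which
// in turn completes the caller's context.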
void Watcher::register_watch(Context *on_finish) {
  dout(20) << dendl;

  std::scoped_lock locker(m_lock);
  m_state = STATE_REGISTERING;

  on_finish = new C_RegisterWatch(this, on_finish);
  librados::AioCompletion *aio_comp =
    librados::Rados::aio_create_completion(on_finish, &rados_callback<Context, &Context::complete>);
  int r = m_ioctx.aio_watch(m_oid, aio_comp, &m_watch_handle, &m_watch_ctx);
  ceph_assert(r == 0);
  aio_comp->release();
}

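// completion handler for register_watch(): records a registration failure,
// honors a pending unregister request, or re-registers if a watch error
// raced with the registration.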
void Watcher::handle_register_watch(int r, Context *on_finish) {
  dout(20) << ": r=" << r << dendl;

  bool watch_error = false;
  Context *unregister_watch_ctx = nullptr;
  {
    std::scoped_lock locker(m_lock);
    ceph_assert(m_state == STATE_REGISTERING);

    m_state = STATE_IDLE;
    if (r < 0) {
      derr << ": failed to register watch: " << cpp_strerror(r) << dendl;
      m_watch_handle = 0;
    }

    if (m_unregister_watch_ctx != nullptr) {
      std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
    } else if (r == 0 && m_watch_error) {
      derr << ": re-registering after watch error" << dendl;
      m_state = STATE_REGISTERING;
      watch_error = true;
    } else {
      m_watch_blocklisted = (r == -EBLOCKLISTED);
    }
  }

  on_finish->complete(r);
  if (unregister_watch_ctx != nullptr) {
    unregister_watch_ctx->complete(0);
  } else if (watch_error) {
    rewatch();
  }
}

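// tear down the watch; if a register/rewatch is still in flight the
// unregister is deferred via m_unregister_watch_ctx until that operation
// completes.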
void Watcher::unregister_watch(Context *on_finish) {
  dout(20) << dendl;

  {
    std::scoped_lock locker(m_lock);
    if (m_state != STATE_IDLE) {
      dout(10) << ": delaying unregister -- watch register in progress" << dendl;
      ceph_assert(m_unregister_watch_ctx == nullptr);
      m_unregister_watch_ctx = new LambdaContext([this, on_finish](int r) {
        unregister_watch(on_finish);
      });
      return;
    } else if (is_registered()) {
      // watch is registered -- unwatch
      librados::AioCompletion *aio_comp =
        librados::Rados::aio_create_completion(new C_UnwatchAndFlush(m_ioctx, on_finish),
                                               &rados_callback<Context, &Context::complete>);
      int r = m_ioctx.aio_unwatch(m_watch_handle, aio_comp);
      ceph_assert(r == 0);
      aio_comp->release();
      m_watch_handle = 0;
      m_watch_blocklisted = false;
      return;
    }
  }

  on_finish->complete(0);
}

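// watch error callback (reached via WatchCtx::handle_error): marks the watch
// as errored and, if currently registered, schedules rewatch() on the work
// queue.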
void Watcher::handle_error(uint64_t handle, int err) {
  derr << ": handle=" << handle << ": " << cpp_strerror(err) << dendl;

  std::scoped_lock locker(m_lock);
  m_watch_error = true;

  if (is_registered()) {
    m_state = STATE_REWATCHING;
    if (err == -EBLOCKLISTED) {
      m_watch_blocklisted = true;
    }
    m_work_queue->queue(new LambdaContext([this] {
      rewatch();
    }), 0);
  }
}

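// re-establish the watch after an error using RewatchRequest, unless an
// unregister was requested in the meantime.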
void Watcher::rewatch() {
  dout(20) << dendl;

  Context *unregister_watch_ctx = nullptr;
  {
    std::unique_lock locker(m_lock);
    ceph_assert(m_state == STATE_REWATCHING);

    if (m_unregister_watch_ctx != nullptr) {
      m_state = STATE_IDLE;
      std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
    } else {
      m_watch_error = false;
      Context *ctx = new C_CallbackAdapter<Watcher, &Watcher::handle_rewatch>(this);
      auto req = RewatchRequest::create(m_ioctx, m_oid, m_lock,
                                        &m_watch_ctx, &m_watch_handle, ctx);
      req->send();
      return;
    }
  }

  unregister_watch_ctx->complete(0);
}

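// completion handler for RewatchRequest: a pending unregister wins, a
// blocklisted client or missing object ends the retry cycle, and other
// errors (or an error raced during the rewatch) trigger another attempt.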
void Watcher::handle_rewatch(int r) {
  dout(20) << ": r=" << r << dendl;

  bool watch_error = false;
  Context *unregister_watch_ctx = nullptr;
  {
    std::scoped_lock locker(m_lock);
    ceph_assert(m_state == STATE_REWATCHING);

    m_watch_blocklisted = false;
    if (m_unregister_watch_ctx != nullptr) {
      dout(10) << ": skipping rewatch -- unregistering" << dendl;
      m_state = STATE_IDLE;
      std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
    } else if (r == -EBLOCKLISTED) {
      m_watch_blocklisted = true;
      derr << ": client blocklisted" << dendl;
    } else if (r == -ENOENT) {
      dout(5) << ": object " << m_oid << " does not exist" << dendl;
    } else if (r < 0) {
      derr << ": failed to rewatch: " << cpp_strerror(r) << dendl;
      watch_error = true;
    } else if (m_watch_error) {
      derr << ": re-registering watch after error" << dendl;
      watch_error = true;
    }
  }

  if (unregister_watch_ctx != nullptr) {
    unregister_watch_ctx->complete(0);
    return;
  } else if (watch_error) {
    rewatch();
    return;
  }

  Context *ctx = new C_CallbackAdapter<Watcher, &Watcher::handle_rewatch_callback>(this);
  m_work_queue->queue(ctx, r);
}

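// runs on the work queue after a rewatch attempt: forwards the result to
// handle_rewatch_complete() and either returns to STATE_IDLE or retries the
// rewatch.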
void Watcher::handle_rewatch_callback(int r) {
  dout(10) << ": r=" << r << dendl;
  handle_rewatch_complete(r);

  bool watch_error = false;
  Context *unregister_watch_ctx = nullptr;
  {
    std::scoped_lock locker(m_lock);
    ceph_assert(m_state == STATE_REWATCHING);

    if (m_unregister_watch_ctx != nullptr) {
      m_state = STATE_IDLE;
      std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
    } else if (r == -EBLOCKLISTED || r == -ENOENT) {
      m_state = STATE_IDLE;
    } else if (r < 0 || m_watch_error) {
      watch_error = true;
    } else {
      m_state = STATE_IDLE;
    }
  }

  if (unregister_watch_ctx != nullptr) {
    unregister_watch_ctx->complete(0);
  } else if (watch_error) {
    rewatch();
  }
}

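// acknowledge a received notification so the notifier's notify call can
// complete.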
void Watcher::acknowledge_notify(uint64_t notify_id, uint64_t handle, bufferlist &bl) {
  m_ioctx.notify_ack(m_oid, notify_id, handle, bl);
}

void Watcher::WatchCtx::handle_notify(uint64_t notify_id, uint64_t handle,
                                      uint64_t notifier_id, bufferlist& bl) {
  dout(20) << ": notify_id=" << notify_id << ", handle=" << handle
           << ", notifier_id=" << notifier_id << dendl;
  watcher.handle_notify(notify_id, handle, notifier_id, bl);
}

void Watcher::WatchCtx::handle_error(uint64_t handle, int err) {
  dout(20) << dendl;
  watcher.handle_error(handle, err);
}

} // namespace mirror
} // namespace cephfs