]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/librados_test_stub/TestWatchNotify.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / test / librados_test_stub / TestWatchNotify.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
#include "test/librados_test_stub/TestWatchNotify.h"
#include "include/Context.h"
#include "common/Cond.h"
#include "include/stringify.h"
#include "common/Finisher.h"
#include "test/librados_test_stub/TestCluster.h"
#include "test/librados_test_stub/TestRadosClient.h"
#include <boost/bind/bind.hpp>
#include <boost/function.hpp>
#include <mutex>
#include "include/ceph_assert.h"
14
15 #define dout_subsys ceph_subsys_rados
16 #undef dout_prefix
17 #define dout_prefix *_dout << "TestWatchNotify::" << __func__ << ": "
18
19 namespace librados {
20
21 std::ostream& operator<<(std::ostream& out,
22 const TestWatchNotify::WatcherID &watcher_id) {
23 out << "(" << watcher_id.first << "," << watcher_id.second << ")";
24 return out;
25 }
26
27 struct TestWatchNotify::ObjectHandler : public TestCluster::ObjectHandler {
28 TestWatchNotify* test_watch_notify;
29 int64_t pool_id;
30 std::string nspace;
31 std::string oid;
32
33 ObjectHandler(TestWatchNotify* test_watch_notify, int64_t pool_id,
34 const std::string& nspace, const std::string& oid)
35 : test_watch_notify(test_watch_notify), pool_id(pool_id),
36 nspace(nspace), oid(oid) {
37 }
38
39 void handle_removed(TestRadosClient* test_rados_client) override {
40 // copy member variables since this object might be deleted
41 auto _test_watch_notify = test_watch_notify;
42 auto _pool_id = pool_id;
43 auto _nspace = nspace;
44 auto _oid = oid;
45 auto ctx = new LambdaContext([_test_watch_notify, _pool_id, _nspace, _oid](int r) {
46 _test_watch_notify->handle_object_removed(_pool_id, _nspace, _oid);
47 });
48 test_rados_client->get_aio_finisher()->queue(ctx);
49 }
50 };
51
// Construct the watch/notify hub bound to the given in-memory test cluster.
TestWatchNotify::TestWatchNotify(TestCluster* test_cluster)
  : m_test_cluster(test_cluster) {
}
55
56 void TestWatchNotify::flush(TestRadosClient *rados_client) {
57 CephContext *cct = rados_client->cct();
58
59 ldout(cct, 20) << "enter" << dendl;
60 // block until we know no additional async notify callbacks will occur
61 C_SaferCond ctx;
62 m_async_op_tracker.wait_for_ops(&ctx);
63 ctx.wait();
64 }
65
66 int TestWatchNotify::list_watchers(int64_t pool_id, const std::string& nspace,
67 const std::string& o,
68 std::list<obj_watch_t> *out_watchers) {
69 std::lock_guard lock{m_lock};
70 SharedWatcher watcher = get_watcher(pool_id, nspace, o);
71 if (!watcher) {
72 return -ENOENT;
73 }
74
75 out_watchers->clear();
76 for (TestWatchNotify::WatchHandles::iterator it =
77 watcher->watch_handles.begin();
78 it != watcher->watch_handles.end(); ++it) {
79 obj_watch_t obj;
80 strncpy(obj.addr, it->second.addr.c_str(), sizeof(obj.addr) - 1);
81 obj.addr[sizeof(obj.addr) - 1] = '\0';
82 obj.watcher_id = static_cast<int64_t>(it->second.gid);
83 obj.cookie = it->second.handle;
84 obj.timeout_seconds = 30;
85 out_watchers->push_back(obj);
86 }
87 return 0;
88 }
89
// Asynchronous flush: queue the completion on the client's AIO finisher,
// which runs it after all previously queued callbacks on that finisher.
void TestWatchNotify::aio_flush(TestRadosClient *rados_client,
                                Context *on_finish) {
  rados_client->get_aio_finisher()->queue(on_finish);
}
94
95 int TestWatchNotify::watch(TestRadosClient *rados_client, int64_t pool_id,
96 const std::string& nspace, const std::string& o,
97 uint64_t gid, uint64_t *handle,
98 librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2) {
99 C_SaferCond cond;
100 aio_watch(rados_client, pool_id, nspace, o, gid, handle, ctx, ctx2, &cond);
101 return cond.wait();
102 }
103
104 void TestWatchNotify::aio_watch(TestRadosClient *rados_client, int64_t pool_id,
105 const std::string& nspace, const std::string& o,
106 uint64_t gid, uint64_t *handle,
107 librados::WatchCtx *watch_ctx,
108 librados::WatchCtx2 *watch_ctx2,
109 Context *on_finish) {
110 auto ctx = new LambdaContext([=, this](int) {
111 execute_watch(rados_client, pool_id, nspace, o, gid, handle, watch_ctx,
112 watch_ctx2, on_finish);
113 });
114 rados_client->get_aio_finisher()->queue(ctx);
115 }
116
117 int TestWatchNotify::unwatch(TestRadosClient *rados_client,
118 uint64_t handle) {
119 C_SaferCond ctx;
120 aio_unwatch(rados_client, handle, &ctx);
121 return ctx.wait();
122 }
123
124 void TestWatchNotify::aio_unwatch(TestRadosClient *rados_client,
125 uint64_t handle, Context *on_finish) {
126 auto ctx = new LambdaContext([this, rados_client, handle, on_finish](int) {
127 execute_unwatch(rados_client, handle, on_finish);
128 });
129 rados_client->get_aio_finisher()->queue(ctx);
130 }
131
132 void TestWatchNotify::aio_notify(TestRadosClient *rados_client, int64_t pool_id,
133 const std::string& nspace,
134 const std::string& oid, const bufferlist& bl,
135 uint64_t timeout_ms, bufferlist *pbl,
136 Context *on_notify) {
137 auto ctx = new LambdaContext([=, this](int) {
138 execute_notify(rados_client, pool_id, nspace, oid, bl, pbl, on_notify);
139 });
140 rados_client->get_aio_finisher()->queue(ctx);
141 }
142
143 int TestWatchNotify::notify(TestRadosClient *rados_client, int64_t pool_id,
144 const std::string& nspace, const std::string& oid,
145 bufferlist& bl, uint64_t timeout_ms,
146 bufferlist *pbl) {
147 C_SaferCond cond;
148 aio_notify(rados_client, pool_id, nspace, oid, bl, timeout_ms, pbl, &cond);
149 return cond.wait();
150 }
151
152 void TestWatchNotify::notify_ack(TestRadosClient *rados_client, int64_t pool_id,
153 const std::string& nspace,
154 const std::string& o, uint64_t notify_id,
155 uint64_t handle, uint64_t gid,
156 bufferlist& bl) {
157 CephContext *cct = rados_client->cct();
158 ldout(cct, 20) << "notify_id=" << notify_id << ", handle=" << handle
159 << ", gid=" << gid << dendl;
160 std::lock_guard lock{m_lock};
161 WatcherID watcher_id = std::make_pair(gid, handle);
162 ack_notify(rados_client, pool_id, nspace, o, notify_id, watcher_id, bl);
163 finish_notify(rados_client, pool_id, nspace, o, notify_id);
164 }
165
166 void TestWatchNotify::execute_watch(TestRadosClient *rados_client,
167 int64_t pool_id, const std::string& nspace,
168 const std::string& o, uint64_t gid,
169 uint64_t *handle, librados::WatchCtx *ctx,
170 librados::WatchCtx2 *ctx2,
171 Context* on_finish) {
172 CephContext *cct = rados_client->cct();
173
174 m_lock.lock();
175 SharedWatcher watcher = get_watcher(pool_id, nspace, o);
176 if (!watcher) {
177 m_lock.unlock();
178 on_finish->complete(-ENOENT);
179 return;
180 }
181
182 WatchHandle watch_handle;
183 watch_handle.rados_client = rados_client;
184 watch_handle.addr = "127.0.0.1:0/" + stringify(rados_client->get_nonce());
185 watch_handle.nonce = rados_client->get_nonce();
186 watch_handle.gid = gid;
187 watch_handle.handle = ++m_handle;
188 watch_handle.watch_ctx = ctx;
189 watch_handle.watch_ctx2 = ctx2;
190 watcher->watch_handles[watch_handle.handle] = watch_handle;
191
192 *handle = watch_handle.handle;
193
194 ldout(cct, 20) << "oid=" << o << ", gid=" << gid << ": handle=" << *handle
195 << dendl;
196 m_lock.unlock();
197
198 on_finish->complete(0);
199 }
200
201 void TestWatchNotify::execute_unwatch(TestRadosClient *rados_client,
202 uint64_t handle, Context* on_finish) {
203 CephContext *cct = rados_client->cct();
204
205 ldout(cct, 20) << "handle=" << handle << dendl;
206 {
207 std::lock_guard locker{m_lock};
208 for (FileWatchers::iterator it = m_file_watchers.begin();
209 it != m_file_watchers.end(); ++it) {
210 SharedWatcher watcher = it->second;
211
212 WatchHandles::iterator w_it = watcher->watch_handles.find(handle);
213 if (w_it != watcher->watch_handles.end()) {
214 watcher->watch_handles.erase(w_it);
215 maybe_remove_watcher(watcher);
216 break;
217 }
218 }
219 }
220 on_finish->complete(0);
221 }
222
223 TestWatchNotify::SharedWatcher TestWatchNotify::get_watcher(
224 int64_t pool_id, const std::string& nspace, const std::string& oid) {
225 ceph_assert(ceph_mutex_is_locked(m_lock));
226
227 auto it = m_file_watchers.find({pool_id, nspace, oid});
228 if (it == m_file_watchers.end()) {
229 SharedWatcher watcher(new Watcher(pool_id, nspace, oid));
230 watcher->object_handler.reset(new ObjectHandler(
231 this, pool_id, nspace, oid));
232 int r = m_test_cluster->register_object_handler(
233 pool_id, {nspace, oid}, watcher->object_handler.get());
234 if (r < 0) {
235 // object doesn't exist
236 return SharedWatcher();
237 }
238 m_file_watchers[{pool_id, nspace, oid}] = watcher;
239 return watcher;
240 }
241
242 return it->second;
243 }
244
245 void TestWatchNotify::maybe_remove_watcher(SharedWatcher watcher) {
246 ceph_assert(ceph_mutex_is_locked(m_lock));
247
248 // TODO
249 if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) {
250 auto pool_id = watcher->pool_id;
251 auto& nspace = watcher->nspace;
252 auto& oid = watcher->oid;
253 if (watcher->object_handler) {
254 m_test_cluster->unregister_object_handler(pool_id, {nspace, oid},
255 watcher->object_handler.get());
256 watcher->object_handler.reset();
257 }
258
259 m_file_watchers.erase({pool_id, nspace, oid});
260 }
261 }
262
263 void TestWatchNotify::execute_notify(TestRadosClient *rados_client,
264 int64_t pool_id, const std::string& nspace,
265 const std::string &oid,
266 const bufferlist &bl, bufferlist *pbl,
267 Context *on_notify) {
268 CephContext *cct = rados_client->cct();
269
270 m_lock.lock();
271 uint64_t notify_id = ++m_notify_id;
272
273 SharedWatcher watcher = get_watcher(pool_id, nspace, oid);
274 if (!watcher) {
275 ldout(cct, 1) << "oid=" << oid << ": not found" << dendl;
276 m_lock.unlock();
277 on_notify->complete(-ENOENT);
278 return;
279 }
280
281 ldout(cct, 20) << "oid=" << oid << ": notify_id=" << notify_id << dendl;
282
283 SharedNotifyHandle notify_handle(new NotifyHandle());
284 notify_handle->rados_client = rados_client;
285 notify_handle->pbl = pbl;
286 notify_handle->on_notify = on_notify;
287
288 WatchHandles &watch_handles = watcher->watch_handles;
289 for (auto &watch_handle_pair : watch_handles) {
290 WatchHandle &watch_handle = watch_handle_pair.second;
291 notify_handle->pending_watcher_ids.insert(std::make_pair(
292 watch_handle.gid, watch_handle.handle));
293
294 m_async_op_tracker.start_op();
295 uint64_t notifier_id = rados_client->get_instance_id();
296 watch_handle.rados_client->get_aio_finisher()->queue(new LambdaContext(
297 [this, pool_id, nspace, oid, bl, notify_id, watch_handle, notifier_id](int r) {
298 bufferlist notify_bl;
299 notify_bl.append(bl);
300
301 if (watch_handle.watch_ctx2 != NULL) {
302 watch_handle.watch_ctx2->handle_notify(notify_id,
303 watch_handle.handle,
304 notifier_id, notify_bl);
305 } else if (watch_handle.watch_ctx != NULL) {
306 watch_handle.watch_ctx->notify(0, 0, notify_bl);
307
308 // auto ack old-style watch/notify clients
309 ack_notify(watch_handle.rados_client, pool_id, nspace, oid, notify_id,
310 {watch_handle.gid, watch_handle.handle}, bufferlist());
311 }
312
313 m_async_op_tracker.finish_op();
314 }));
315 }
316 watcher->notify_handles[notify_id] = notify_handle;
317
318 finish_notify(rados_client, pool_id, nspace, oid, notify_id);
319 m_lock.unlock();
320 }
321
322 void TestWatchNotify::ack_notify(TestRadosClient *rados_client, int64_t pool_id,
323 const std::string& nspace,
324 const std::string &oid, uint64_t notify_id,
325 const WatcherID &watcher_id,
326 const bufferlist &bl) {
327 CephContext *cct = rados_client->cct();
328
329 ceph_assert(ceph_mutex_is_locked(m_lock));
330 SharedWatcher watcher = get_watcher(pool_id, nspace, oid);
331 if (!watcher) {
332 ldout(cct, 1) << "oid=" << oid << ": not found" << dendl;
333 return;
334 }
335
336 NotifyHandles::iterator it = watcher->notify_handles.find(notify_id);
337 if (it == watcher->notify_handles.end()) {
338 ldout(cct, 1) << "oid=" << oid << ", notify_id=" << notify_id
339 << ", WatcherID=" << watcher_id << ": not found" << dendl;
340 return;
341 }
342
343 ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id
344 << ", WatcherID=" << watcher_id << dendl;
345
346 bufferlist response;
347 response.append(bl);
348
349 SharedNotifyHandle notify_handle = it->second;
350 notify_handle->notify_responses[watcher_id] = response;
351 notify_handle->pending_watcher_ids.erase(watcher_id);
352 }
353
354 void TestWatchNotify::finish_notify(TestRadosClient *rados_client,
355 int64_t pool_id, const std::string& nspace,
356 const std::string &oid,
357 uint64_t notify_id) {
358 CephContext *cct = rados_client->cct();
359
360 ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id << dendl;
361
362 ceph_assert(ceph_mutex_is_locked(m_lock));
363 SharedWatcher watcher = get_watcher(pool_id, nspace, oid);
364 if (!watcher) {
365 ldout(cct, 1) << "oid=" << oid << ": not found" << dendl;
366 return;
367 }
368
369 NotifyHandles::iterator it = watcher->notify_handles.find(notify_id);
370 if (it == watcher->notify_handles.end()) {
371 ldout(cct, 1) << "oid=" << oid << ", notify_id=" << notify_id
372 << ": not found" << dendl;
373 return;
374 }
375
376 SharedNotifyHandle notify_handle = it->second;
377 if (!notify_handle->pending_watcher_ids.empty()) {
378 ldout(cct, 10) << "oid=" << oid << ", notify_id=" << notify_id
379 << ": pending watchers, returning" << dendl;
380 return;
381 }
382
383 ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id
384 << ": completing" << dendl;
385
386 if (notify_handle->pbl != NULL) {
387 encode(notify_handle->notify_responses, *notify_handle->pbl);
388 encode(notify_handle->pending_watcher_ids, *notify_handle->pbl);
389 }
390
391 notify_handle->rados_client->get_aio_finisher()->queue(
392 notify_handle->on_notify, 0);
393 watcher->notify_handles.erase(notify_id);
394 maybe_remove_watcher(watcher);
395 }
396
// Simulate blocklisting a client: every watch registered with the given
// client nonce is removed, and each affected WatchCtx2 is sent -ENOTCONN
// on its own client's finisher thread.
void TestWatchNotify::blocklist(uint32_t nonce) {
  std::lock_guard locker{m_lock};

  for (auto file_it = m_file_watchers.begin();
       file_it != m_file_watchers.end(); ) {
    auto &watcher = file_it->second;
    for (auto w_it = watcher->watch_handles.begin();
         w_it != watcher->watch_handles.end();) {
      auto& watch_handle = w_it->second;
      if (watch_handle.nonce == nonce) {
        auto handle = watch_handle.handle;
        auto watch_ctx2 = watch_handle.watch_ctx2;
        if (watch_ctx2 != nullptr) {
          // deliver the error asynchronously on the watcher's own finisher
          auto ctx = new LambdaContext([handle, watch_ctx2](int) {
              watch_ctx2->handle_error(handle, -ENOTCONN);
            });
          watch_handle.rados_client->get_aio_finisher()->queue(ctx);
        }
        // erase() returns the next valid iterator
        w_it = watcher->watch_handles.erase(w_it);
      } else {
        ++w_it;
      }
    }

    // advance file_it BEFORE maybe_remove_watcher(): it may erase the
    // current map entry, which would invalidate file_it if still pointing
    // at it (std::map::erase only invalidates iterators to the erased node)
    ++file_it;
    maybe_remove_watcher(watcher);
  }
}
425
426 void TestWatchNotify::handle_object_removed(int64_t pool_id,
427 const std::string& nspace,
428 const std::string& oid) {
429 std::lock_guard locker{m_lock};
430 auto it = m_file_watchers.find({pool_id, nspace, oid});
431 if (it == m_file_watchers.end()) {
432 return;
433 }
434
435 auto watcher = it->second;
436
437 // cancel all in-flight notifications
438 for (auto& notify_handle_pair : watcher->notify_handles) {
439 auto notify_handle = notify_handle_pair.second;
440 notify_handle->rados_client->get_aio_finisher()->queue(
441 notify_handle->on_notify, -ENOENT);
442 }
443
444 // alert all watchers of the loss of connection
445 for (auto& watch_handle_pair : watcher->watch_handles) {
446 auto& watch_handle = watch_handle_pair.second;
447 auto handle = watch_handle.handle;
448 auto watch_ctx2 = watch_handle.watch_ctx2;
449 if (watch_ctx2 != nullptr) {
450 auto ctx = new LambdaContext([handle, watch_ctx2](int) {
451 watch_ctx2->handle_error(handle, -ENOTCONN);
452 });
453 watch_handle.rados_client->get_aio_finisher()->queue(ctx);
454 }
455 }
456 m_file_watchers.erase(it);
457 }
458
459 } // namespace librados