]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/librados_test_stub/TestWatchNotify.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / test / librados_test_stub / TestWatchNotify.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "test/librados_test_stub/TestWatchNotify.h"
5#include "include/Context.h"
11fdf7f2 6#include "common/Cond.h"
7c673cae
FG
7#include "include/stringify.h"
8#include "common/Finisher.h"
11fdf7f2 9#include "test/librados_test_stub/TestCluster.h"
7c673cae
FG
10#include "test/librados_test_stub/TestRadosClient.h"
11#include <boost/bind.hpp>
12#include <boost/function.hpp>
11fdf7f2 13#include "include/ceph_assert.h"
7c673cae
FG
14
15#define dout_subsys ceph_subsys_rados
16#undef dout_prefix
17#define dout_prefix *_dout << "TestWatchNotify::" << __func__ << ": "
18
19namespace librados {
20
21std::ostream& operator<<(std::ostream& out,
22 const TestWatchNotify::WatcherID &watcher_id) {
23 out << "(" << watcher_id.first << "," << watcher_id.second << ")";
24 return out;
25}
26
11fdf7f2
TL
27struct TestWatchNotify::ObjectHandler : public TestCluster::ObjectHandler {
28 TestWatchNotify* test_watch_notify;
29 int64_t pool_id;
30 std::string nspace;
31 std::string oid;
32
33 ObjectHandler(TestWatchNotify* test_watch_notify, int64_t pool_id,
34 const std::string& nspace, const std::string& oid)
35 : test_watch_notify(test_watch_notify), pool_id(pool_id),
36 nspace(nspace), oid(oid) {
37 }
38
39 void handle_removed(TestRadosClient* test_rados_client) override {
40 // copy member variables since this object might be deleted
41 auto _test_watch_notify = test_watch_notify;
42 auto _pool_id = pool_id;
43 auto _nspace = nspace;
44 auto _oid = oid;
9f95a23c 45 auto ctx = new LambdaContext([_test_watch_notify, _pool_id, _nspace, _oid](int r) {
11fdf7f2
TL
46 _test_watch_notify->handle_object_removed(_pool_id, _nspace, _oid);
47 });
48 test_rados_client->get_aio_finisher()->queue(ctx);
49 }
50};
51
52TestWatchNotify::TestWatchNotify(TestCluster* test_cluster)
9f95a23c 53 : m_test_cluster(test_cluster) {
7c673cae
FG
54}
55
56void TestWatchNotify::flush(TestRadosClient *rados_client) {
57 CephContext *cct = rados_client->cct();
58
59 ldout(cct, 20) << "enter" << dendl;
60 // block until we know no additional async notify callbacks will occur
11fdf7f2
TL
61 C_SaferCond ctx;
62 m_async_op_tracker.wait_for_ops(&ctx);
63 ctx.wait();
7c673cae
FG
64}
65
11fdf7f2
TL
66int TestWatchNotify::list_watchers(int64_t pool_id, const std::string& nspace,
67 const std::string& o,
7c673cae 68 std::list<obj_watch_t> *out_watchers) {
9f95a23c 69 std::lock_guard lock{m_lock};
11fdf7f2
TL
70 SharedWatcher watcher = get_watcher(pool_id, nspace, o);
71 if (!watcher) {
72 return -ENOENT;
73 }
7c673cae
FG
74
75 out_watchers->clear();
76 for (TestWatchNotify::WatchHandles::iterator it =
77 watcher->watch_handles.begin();
78 it != watcher->watch_handles.end(); ++it) {
79 obj_watch_t obj;
9f95a23c
TL
80 strncpy(obj.addr, it->second.addr.c_str(), sizeof(obj.addr) - 1);
81 obj.addr[sizeof(obj.addr) - 1] = '\0';
7c673cae
FG
82 obj.watcher_id = static_cast<int64_t>(it->second.gid);
83 obj.cookie = it->second.handle;
84 obj.timeout_seconds = 30;
85 out_watchers->push_back(obj);
86 }
87 return 0;
88}
89
90void TestWatchNotify::aio_flush(TestRadosClient *rados_client,
91 Context *on_finish) {
92 rados_client->get_aio_finisher()->queue(on_finish);
93}
94
11fdf7f2
TL
95int TestWatchNotify::watch(TestRadosClient *rados_client, int64_t pool_id,
96 const std::string& nspace, const std::string& o,
97 uint64_t gid, uint64_t *handle,
98 librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2) {
99 C_SaferCond cond;
100 aio_watch(rados_client, pool_id, nspace, o, gid, handle, ctx, ctx2, &cond);
101 return cond.wait();
102}
103
104void TestWatchNotify::aio_watch(TestRadosClient *rados_client, int64_t pool_id,
105 const std::string& nspace, const std::string& o,
106 uint64_t gid, uint64_t *handle,
107 librados::WatchCtx *watch_ctx,
108 librados::WatchCtx2 *watch_ctx2,
7c673cae 109 Context *on_finish) {
9f95a23c 110 auto ctx = new LambdaContext([=](int) {
11fdf7f2
TL
111 execute_watch(rados_client, pool_id, nspace, o, gid, handle, watch_ctx,
112 watch_ctx2, on_finish);
113 });
114 rados_client->get_aio_finisher()->queue(ctx);
115}
116
117int TestWatchNotify::unwatch(TestRadosClient *rados_client,
118 uint64_t handle) {
119 C_SaferCond ctx;
120 aio_unwatch(rados_client, handle, &ctx);
121 return ctx.wait();
7c673cae
FG
122}
123
124void TestWatchNotify::aio_unwatch(TestRadosClient *rados_client,
125 uint64_t handle, Context *on_finish) {
9f95a23c 126 auto ctx = new LambdaContext([this, rados_client, handle, on_finish](int) {
11fdf7f2
TL
127 execute_unwatch(rados_client, handle, on_finish);
128 });
129 rados_client->get_aio_finisher()->queue(ctx);
7c673cae
FG
130}
131
11fdf7f2
TL
132void TestWatchNotify::aio_notify(TestRadosClient *rados_client, int64_t pool_id,
133 const std::string& nspace,
134 const std::string& oid, const bufferlist& bl,
7c673cae
FG
135 uint64_t timeout_ms, bufferlist *pbl,
136 Context *on_notify) {
9f95a23c 137 auto ctx = new LambdaContext([=](int) {
11fdf7f2
TL
138 execute_notify(rados_client, pool_id, nspace, oid, bl, pbl, on_notify);
139 });
7c673cae
FG
140 rados_client->get_aio_finisher()->queue(ctx);
141}
142
11fdf7f2
TL
143int TestWatchNotify::notify(TestRadosClient *rados_client, int64_t pool_id,
144 const std::string& nspace, const std::string& oid,
145 bufferlist& bl, uint64_t timeout_ms,
146 bufferlist *pbl) {
7c673cae 147 C_SaferCond cond;
11fdf7f2 148 aio_notify(rados_client, pool_id, nspace, oid, bl, timeout_ms, pbl, &cond);
7c673cae
FG
149 return cond.wait();
150}
151
11fdf7f2
TL
152void TestWatchNotify::notify_ack(TestRadosClient *rados_client, int64_t pool_id,
153 const std::string& nspace,
7c673cae
FG
154 const std::string& o, uint64_t notify_id,
155 uint64_t handle, uint64_t gid,
156 bufferlist& bl) {
157 CephContext *cct = rados_client->cct();
158 ldout(cct, 20) << "notify_id=" << notify_id << ", handle=" << handle
159 << ", gid=" << gid << dendl;
9f95a23c 160 std::lock_guard lock{m_lock};
7c673cae 161 WatcherID watcher_id = std::make_pair(gid, handle);
11fdf7f2
TL
162 ack_notify(rados_client, pool_id, nspace, o, notify_id, watcher_id, bl);
163 finish_notify(rados_client, pool_id, nspace, o, notify_id);
7c673cae
FG
164}
165
11fdf7f2
TL
166void TestWatchNotify::execute_watch(TestRadosClient *rados_client,
167 int64_t pool_id, const std::string& nspace,
168 const std::string& o, uint64_t gid,
169 uint64_t *handle, librados::WatchCtx *ctx,
170 librados::WatchCtx2 *ctx2,
171 Context* on_finish) {
7c673cae
FG
172 CephContext *cct = rados_client->cct();
173
9f95a23c 174 m_lock.lock();
11fdf7f2
TL
175 SharedWatcher watcher = get_watcher(pool_id, nspace, o);
176 if (!watcher) {
9f95a23c 177 m_lock.unlock();
11fdf7f2
TL
178 on_finish->complete(-ENOENT);
179 return;
180 }
7c673cae
FG
181
182 WatchHandle watch_handle;
183 watch_handle.rados_client = rados_client;
184 watch_handle.addr = "127.0.0.1:0/" + stringify(rados_client->get_nonce());
185 watch_handle.nonce = rados_client->get_nonce();
186 watch_handle.gid = gid;
187 watch_handle.handle = ++m_handle;
188 watch_handle.watch_ctx = ctx;
189 watch_handle.watch_ctx2 = ctx2;
190 watcher->watch_handles[watch_handle.handle] = watch_handle;
191
192 *handle = watch_handle.handle;
193
194 ldout(cct, 20) << "oid=" << o << ", gid=" << gid << ": handle=" << *handle
195 << dendl;
9f95a23c 196 m_lock.unlock();
11fdf7f2
TL
197
198 on_finish->complete(0);
7c673cae
FG
199}
200
11fdf7f2
TL
201void TestWatchNotify::execute_unwatch(TestRadosClient *rados_client,
202 uint64_t handle, Context* on_finish) {
7c673cae
FG
203 CephContext *cct = rados_client->cct();
204
205 ldout(cct, 20) << "handle=" << handle << dendl;
11fdf7f2 206 {
9f95a23c 207 std::lock_guard locker{m_lock};
11fdf7f2
TL
208 for (FileWatchers::iterator it = m_file_watchers.begin();
209 it != m_file_watchers.end(); ++it) {
210 SharedWatcher watcher = it->second;
211
212 WatchHandles::iterator w_it = watcher->watch_handles.find(handle);
213 if (w_it != watcher->watch_handles.end()) {
214 watcher->watch_handles.erase(w_it);
215 maybe_remove_watcher(watcher);
216 break;
7c673cae 217 }
7c673cae
FG
218 }
219 }
11fdf7f2 220 on_finish->complete(0);
7c673cae
FG
221}
222
223TestWatchNotify::SharedWatcher TestWatchNotify::get_watcher(
11fdf7f2 224 int64_t pool_id, const std::string& nspace, const std::string& oid) {
9f95a23c 225 ceph_assert(ceph_mutex_is_locked(m_lock));
11fdf7f2
TL
226
227 auto it = m_file_watchers.find({pool_id, nspace, oid});
228 if (it == m_file_watchers.end()) {
229 SharedWatcher watcher(new Watcher(pool_id, nspace, oid));
230 watcher->object_handler.reset(new ObjectHandler(
231 this, pool_id, nspace, oid));
232 int r = m_test_cluster->register_object_handler(
233 pool_id, {nspace, oid}, watcher->object_handler.get());
234 if (r < 0) {
235 // object doesn't exist
236 return SharedWatcher();
237 }
238 m_file_watchers[{pool_id, nspace, oid}] = watcher;
239 return watcher;
240 }
241
242 return it->second;
243}
244
245void TestWatchNotify::maybe_remove_watcher(SharedWatcher watcher) {
9f95a23c 246 ceph_assert(ceph_mutex_is_locked(m_lock));
11fdf7f2
TL
247
248 // TODO
249 if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) {
250 auto pool_id = watcher->pool_id;
251 auto& nspace = watcher->nspace;
252 auto& oid = watcher->oid;
253 if (watcher->object_handler) {
254 m_test_cluster->unregister_object_handler(pool_id, {nspace, oid},
255 watcher->object_handler.get());
256 watcher->object_handler.reset();
257 }
258
259 m_file_watchers.erase({pool_id, nspace, oid});
7c673cae 260 }
7c673cae
FG
261}
262
263void TestWatchNotify::execute_notify(TestRadosClient *rados_client,
11fdf7f2 264 int64_t pool_id, const std::string& nspace,
7c673cae 265 const std::string &oid,
11fdf7f2
TL
266 const bufferlist &bl, bufferlist *pbl,
267 Context *on_notify) {
7c673cae
FG
268 CephContext *cct = rados_client->cct();
269
9f95a23c 270 m_lock.lock();
11fdf7f2 271 uint64_t notify_id = ++m_notify_id;
7c673cae 272
11fdf7f2
TL
273 SharedWatcher watcher = get_watcher(pool_id, nspace, oid);
274 if (!watcher) {
275 ldout(cct, 1) << "oid=" << oid << ": not found" << dendl;
9f95a23c 276 m_lock.unlock();
11fdf7f2 277 on_notify->complete(-ENOENT);
7c673cae
FG
278 return;
279 }
280
11fdf7f2 281 ldout(cct, 20) << "oid=" << oid << ": notify_id=" << notify_id << dendl;
7c673cae 282
11fdf7f2
TL
283 SharedNotifyHandle notify_handle(new NotifyHandle());
284 notify_handle->rados_client = rados_client;
285 notify_handle->pbl = pbl;
286 notify_handle->on_notify = on_notify;
7c673cae 287
11fdf7f2
TL
288 WatchHandles &watch_handles = watcher->watch_handles;
289 for (auto &watch_handle_pair : watch_handles) {
290 WatchHandle &watch_handle = watch_handle_pair.second;
291 notify_handle->pending_watcher_ids.insert(std::make_pair(
292 watch_handle.gid, watch_handle.handle));
293
294 m_async_op_tracker.start_op();
295 uint64_t notifier_id = rados_client->get_instance_id();
9f95a23c 296 watch_handle.rados_client->get_aio_finisher()->queue(new LambdaContext(
11fdf7f2
TL
297 [this, pool_id, nspace, oid, bl, notify_id, watch_handle, notifier_id](int r) {
298 bufferlist notify_bl;
299 notify_bl.append(bl);
300
301 if (watch_handle.watch_ctx2 != NULL) {
302 watch_handle.watch_ctx2->handle_notify(notify_id,
303 watch_handle.handle,
304 notifier_id, notify_bl);
305 } else if (watch_handle.watch_ctx != NULL) {
306 watch_handle.watch_ctx->notify(0, 0, notify_bl);
307
308 // auto ack old-style watch/notify clients
309 ack_notify(watch_handle.rados_client, pool_id, nspace, oid, notify_id,
310 {watch_handle.gid, watch_handle.handle}, bufferlist());
311 }
312
313 m_async_op_tracker.finish_op();
314 }));
7c673cae 315 }
11fdf7f2
TL
316 watcher->notify_handles[notify_id] = notify_handle;
317
318 finish_notify(rados_client, pool_id, nspace, oid, notify_id);
9f95a23c 319 m_lock.unlock();
7c673cae
FG
320}
321
11fdf7f2
TL
322void TestWatchNotify::ack_notify(TestRadosClient *rados_client, int64_t pool_id,
323 const std::string& nspace,
324 const std::string &oid, uint64_t notify_id,
7c673cae
FG
325 const WatcherID &watcher_id,
326 const bufferlist &bl) {
327 CephContext *cct = rados_client->cct();
328
9f95a23c 329 ceph_assert(ceph_mutex_is_locked(m_lock));
11fdf7f2
TL
330 SharedWatcher watcher = get_watcher(pool_id, nspace, oid);
331 if (!watcher) {
332 ldout(cct, 1) << "oid=" << oid << ": not found" << dendl;
333 return;
334 }
7c673cae
FG
335
336 NotifyHandles::iterator it = watcher->notify_handles.find(notify_id);
337 if (it == watcher->notify_handles.end()) {
338 ldout(cct, 1) << "oid=" << oid << ", notify_id=" << notify_id
339 << ", WatcherID=" << watcher_id << ": not found" << dendl;
340 return;
341 }
342
11fdf7f2
TL
343 ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id
344 << ", WatcherID=" << watcher_id << dendl;
345
7c673cae
FG
346 bufferlist response;
347 response.append(bl);
348
349 SharedNotifyHandle notify_handle = it->second;
350 notify_handle->notify_responses[watcher_id] = response;
351 notify_handle->pending_watcher_ids.erase(watcher_id);
352}
353
354void TestWatchNotify::finish_notify(TestRadosClient *rados_client,
11fdf7f2 355 int64_t pool_id, const std::string& nspace,
7c673cae
FG
356 const std::string &oid,
357 uint64_t notify_id) {
358 CephContext *cct = rados_client->cct();
359
360 ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id << dendl;
361
9f95a23c 362 ceph_assert(ceph_mutex_is_locked(m_lock));
11fdf7f2
TL
363 SharedWatcher watcher = get_watcher(pool_id, nspace, oid);
364 if (!watcher) {
365 ldout(cct, 1) << "oid=" << oid << ": not found" << dendl;
366 return;
367 }
7c673cae
FG
368
369 NotifyHandles::iterator it = watcher->notify_handles.find(notify_id);
370 if (it == watcher->notify_handles.end()) {
371 ldout(cct, 1) << "oid=" << oid << ", notify_id=" << notify_id
372 << ": not found" << dendl;
373 return;
374 }
375
376 SharedNotifyHandle notify_handle = it->second;
377 if (!notify_handle->pending_watcher_ids.empty()) {
378 ldout(cct, 10) << "oid=" << oid << ", notify_id=" << notify_id
379 << ": pending watchers, returning" << dendl;
380 return;
381 }
382
383 ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id
384 << ": completing" << dendl;
385
386 if (notify_handle->pbl != NULL) {
11fdf7f2
TL
387 encode(notify_handle->notify_responses, *notify_handle->pbl);
388 encode(notify_handle->pending_watcher_ids, *notify_handle->pbl);
7c673cae
FG
389 }
390
391 notify_handle->rados_client->get_aio_finisher()->queue(
392 notify_handle->on_notify, 0);
393 watcher->notify_handles.erase(notify_id);
11fdf7f2 394 maybe_remove_watcher(watcher);
7c673cae
FG
395}
396
397void TestWatchNotify::blacklist(uint32_t nonce) {
9f95a23c 398 std::lock_guard locker{m_lock};
7c673cae
FG
399
400 for (auto file_it = m_file_watchers.begin();
401 file_it != m_file_watchers.end(); ) {
402 auto &watcher = file_it->second;
403 for (auto w_it = watcher->watch_handles.begin();
404 w_it != watcher->watch_handles.end();) {
405 if (w_it->second.nonce == nonce) {
406 w_it = watcher->watch_handles.erase(w_it);
407 } else {
408 ++w_it;
409 }
410 }
11fdf7f2
TL
411
412 ++file_it;
413 maybe_remove_watcher(watcher);
414 }
415}
416
417void TestWatchNotify::handle_object_removed(int64_t pool_id,
418 const std::string& nspace,
419 const std::string& oid) {
9f95a23c 420 std::lock_guard locker{m_lock};
11fdf7f2
TL
421 auto it = m_file_watchers.find({pool_id, nspace, oid});
422 if (it == m_file_watchers.end()) {
423 return;
424 }
425
426 auto watcher = it->second;
427
428 // cancel all in-flight notifications
429 for (auto& notify_handle_pair : watcher->notify_handles) {
430 auto notify_handle = notify_handle_pair.second;
431 notify_handle->rados_client->get_aio_finisher()->queue(
432 notify_handle->on_notify, -ENOENT);
433 }
434
435 // alert all watchers of the loss of connection
436 for (auto& watch_handle_pair : watcher->watch_handles) {
437 auto& watch_handle = watch_handle_pair.second;
438 auto handle = watch_handle.handle;
439 auto watch_ctx2 = watch_handle.watch_ctx2;
440 if (watch_ctx2 != nullptr) {
9f95a23c 441 auto ctx = new LambdaContext([handle, watch_ctx2](int) {
11fdf7f2
TL
442 watch_ctx2->handle_error(handle, -ENOTCONN);
443 });
444 watch_handle.rados_client->get_aio_finisher()->queue(ctx);
7c673cae
FG
445 }
446 }
11fdf7f2 447 m_file_watchers.erase(it);
7c673cae
FG
448}
449
450} // namespace librados