]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "test/librados_test_stub/TestWatchNotify.h" | |
5 | #include "include/Context.h" | |
11fdf7f2 | 6 | #include "common/Cond.h" |
7c673cae FG |
7 | #include "include/stringify.h" |
8 | #include "common/Finisher.h" | |
11fdf7f2 | 9 | #include "test/librados_test_stub/TestCluster.h" |
7c673cae FG |
10 | #include "test/librados_test_stub/TestRadosClient.h" |
11 | #include <boost/bind.hpp> | |
12 | #include <boost/function.hpp> | |
11fdf7f2 | 13 | #include "include/ceph_assert.h" |
7c673cae FG |
14 | |
15 | #define dout_subsys ceph_subsys_rados | |
16 | #undef dout_prefix | |
17 | #define dout_prefix *_dout << "TestWatchNotify::" << __func__ << ": " | |
18 | ||
19 | namespace librados { | |
20 | ||
21 | std::ostream& operator<<(std::ostream& out, | |
22 | const TestWatchNotify::WatcherID &watcher_id) { | |
23 | out << "(" << watcher_id.first << "," << watcher_id.second << ")"; | |
24 | return out; | |
25 | } | |
26 | ||
11fdf7f2 TL |
27 | struct TestWatchNotify::ObjectHandler : public TestCluster::ObjectHandler { |
28 | TestWatchNotify* test_watch_notify; | |
29 | int64_t pool_id; | |
30 | std::string nspace; | |
31 | std::string oid; | |
32 | ||
33 | ObjectHandler(TestWatchNotify* test_watch_notify, int64_t pool_id, | |
34 | const std::string& nspace, const std::string& oid) | |
35 | : test_watch_notify(test_watch_notify), pool_id(pool_id), | |
36 | nspace(nspace), oid(oid) { | |
37 | } | |
38 | ||
39 | void handle_removed(TestRadosClient* test_rados_client) override { | |
40 | // copy member variables since this object might be deleted | |
41 | auto _test_watch_notify = test_watch_notify; | |
42 | auto _pool_id = pool_id; | |
43 | auto _nspace = nspace; | |
44 | auto _oid = oid; | |
9f95a23c | 45 | auto ctx = new LambdaContext([_test_watch_notify, _pool_id, _nspace, _oid](int r) { |
11fdf7f2 TL |
46 | _test_watch_notify->handle_object_removed(_pool_id, _nspace, _oid); |
47 | }); | |
48 | test_rados_client->get_aio_finisher()->queue(ctx); | |
49 | } | |
50 | }; | |
51 | ||
52 | TestWatchNotify::TestWatchNotify(TestCluster* test_cluster) | |
9f95a23c | 53 | : m_test_cluster(test_cluster) { |
7c673cae FG |
54 | } |
55 | ||
56 | void TestWatchNotify::flush(TestRadosClient *rados_client) { | |
57 | CephContext *cct = rados_client->cct(); | |
58 | ||
59 | ldout(cct, 20) << "enter" << dendl; | |
60 | // block until we know no additional async notify callbacks will occur | |
11fdf7f2 TL |
61 | C_SaferCond ctx; |
62 | m_async_op_tracker.wait_for_ops(&ctx); | |
63 | ctx.wait(); | |
7c673cae FG |
64 | } |
65 | ||
11fdf7f2 TL |
66 | int TestWatchNotify::list_watchers(int64_t pool_id, const std::string& nspace, |
67 | const std::string& o, | |
7c673cae | 68 | std::list<obj_watch_t> *out_watchers) { |
9f95a23c | 69 | std::lock_guard lock{m_lock}; |
11fdf7f2 TL |
70 | SharedWatcher watcher = get_watcher(pool_id, nspace, o); |
71 | if (!watcher) { | |
72 | return -ENOENT; | |
73 | } | |
7c673cae FG |
74 | |
75 | out_watchers->clear(); | |
76 | for (TestWatchNotify::WatchHandles::iterator it = | |
77 | watcher->watch_handles.begin(); | |
78 | it != watcher->watch_handles.end(); ++it) { | |
79 | obj_watch_t obj; | |
9f95a23c TL |
80 | strncpy(obj.addr, it->second.addr.c_str(), sizeof(obj.addr) - 1); |
81 | obj.addr[sizeof(obj.addr) - 1] = '\0'; | |
7c673cae FG |
82 | obj.watcher_id = static_cast<int64_t>(it->second.gid); |
83 | obj.cookie = it->second.handle; | |
84 | obj.timeout_seconds = 30; | |
85 | out_watchers->push_back(obj); | |
86 | } | |
87 | return 0; | |
88 | } | |
89 | ||
90 | void TestWatchNotify::aio_flush(TestRadosClient *rados_client, | |
91 | Context *on_finish) { | |
92 | rados_client->get_aio_finisher()->queue(on_finish); | |
93 | } | |
94 | ||
11fdf7f2 TL |
95 | int TestWatchNotify::watch(TestRadosClient *rados_client, int64_t pool_id, |
96 | const std::string& nspace, const std::string& o, | |
97 | uint64_t gid, uint64_t *handle, | |
98 | librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2) { | |
99 | C_SaferCond cond; | |
100 | aio_watch(rados_client, pool_id, nspace, o, gid, handle, ctx, ctx2, &cond); | |
101 | return cond.wait(); | |
102 | } | |
103 | ||
104 | void TestWatchNotify::aio_watch(TestRadosClient *rados_client, int64_t pool_id, | |
105 | const std::string& nspace, const std::string& o, | |
106 | uint64_t gid, uint64_t *handle, | |
107 | librados::WatchCtx *watch_ctx, | |
108 | librados::WatchCtx2 *watch_ctx2, | |
7c673cae | 109 | Context *on_finish) { |
9f95a23c | 110 | auto ctx = new LambdaContext([=](int) { |
11fdf7f2 TL |
111 | execute_watch(rados_client, pool_id, nspace, o, gid, handle, watch_ctx, |
112 | watch_ctx2, on_finish); | |
113 | }); | |
114 | rados_client->get_aio_finisher()->queue(ctx); | |
115 | } | |
116 | ||
117 | int TestWatchNotify::unwatch(TestRadosClient *rados_client, | |
118 | uint64_t handle) { | |
119 | C_SaferCond ctx; | |
120 | aio_unwatch(rados_client, handle, &ctx); | |
121 | return ctx.wait(); | |
7c673cae FG |
122 | } |
123 | ||
124 | void TestWatchNotify::aio_unwatch(TestRadosClient *rados_client, | |
125 | uint64_t handle, Context *on_finish) { | |
9f95a23c | 126 | auto ctx = new LambdaContext([this, rados_client, handle, on_finish](int) { |
11fdf7f2 TL |
127 | execute_unwatch(rados_client, handle, on_finish); |
128 | }); | |
129 | rados_client->get_aio_finisher()->queue(ctx); | |
7c673cae FG |
130 | } |
131 | ||
11fdf7f2 TL |
132 | void TestWatchNotify::aio_notify(TestRadosClient *rados_client, int64_t pool_id, |
133 | const std::string& nspace, | |
134 | const std::string& oid, const bufferlist& bl, | |
7c673cae FG |
135 | uint64_t timeout_ms, bufferlist *pbl, |
136 | Context *on_notify) { | |
9f95a23c | 137 | auto ctx = new LambdaContext([=](int) { |
11fdf7f2 TL |
138 | execute_notify(rados_client, pool_id, nspace, oid, bl, pbl, on_notify); |
139 | }); | |
7c673cae FG |
140 | rados_client->get_aio_finisher()->queue(ctx); |
141 | } | |
142 | ||
11fdf7f2 TL |
143 | int TestWatchNotify::notify(TestRadosClient *rados_client, int64_t pool_id, |
144 | const std::string& nspace, const std::string& oid, | |
145 | bufferlist& bl, uint64_t timeout_ms, | |
146 | bufferlist *pbl) { | |
7c673cae | 147 | C_SaferCond cond; |
11fdf7f2 | 148 | aio_notify(rados_client, pool_id, nspace, oid, bl, timeout_ms, pbl, &cond); |
7c673cae FG |
149 | return cond.wait(); |
150 | } | |
151 | ||
11fdf7f2 TL |
152 | void TestWatchNotify::notify_ack(TestRadosClient *rados_client, int64_t pool_id, |
153 | const std::string& nspace, | |
7c673cae FG |
154 | const std::string& o, uint64_t notify_id, |
155 | uint64_t handle, uint64_t gid, | |
156 | bufferlist& bl) { | |
157 | CephContext *cct = rados_client->cct(); | |
158 | ldout(cct, 20) << "notify_id=" << notify_id << ", handle=" << handle | |
159 | << ", gid=" << gid << dendl; | |
9f95a23c | 160 | std::lock_guard lock{m_lock}; |
7c673cae | 161 | WatcherID watcher_id = std::make_pair(gid, handle); |
11fdf7f2 TL |
162 | ack_notify(rados_client, pool_id, nspace, o, notify_id, watcher_id, bl); |
163 | finish_notify(rados_client, pool_id, nspace, o, notify_id); | |
7c673cae FG |
164 | } |
165 | ||
11fdf7f2 TL |
166 | void TestWatchNotify::execute_watch(TestRadosClient *rados_client, |
167 | int64_t pool_id, const std::string& nspace, | |
168 | const std::string& o, uint64_t gid, | |
169 | uint64_t *handle, librados::WatchCtx *ctx, | |
170 | librados::WatchCtx2 *ctx2, | |
171 | Context* on_finish) { | |
7c673cae FG |
172 | CephContext *cct = rados_client->cct(); |
173 | ||
9f95a23c | 174 | m_lock.lock(); |
11fdf7f2 TL |
175 | SharedWatcher watcher = get_watcher(pool_id, nspace, o); |
176 | if (!watcher) { | |
9f95a23c | 177 | m_lock.unlock(); |
11fdf7f2 TL |
178 | on_finish->complete(-ENOENT); |
179 | return; | |
180 | } | |
7c673cae FG |
181 | |
182 | WatchHandle watch_handle; | |
183 | watch_handle.rados_client = rados_client; | |
184 | watch_handle.addr = "127.0.0.1:0/" + stringify(rados_client->get_nonce()); | |
185 | watch_handle.nonce = rados_client->get_nonce(); | |
186 | watch_handle.gid = gid; | |
187 | watch_handle.handle = ++m_handle; | |
188 | watch_handle.watch_ctx = ctx; | |
189 | watch_handle.watch_ctx2 = ctx2; | |
190 | watcher->watch_handles[watch_handle.handle] = watch_handle; | |
191 | ||
192 | *handle = watch_handle.handle; | |
193 | ||
194 | ldout(cct, 20) << "oid=" << o << ", gid=" << gid << ": handle=" << *handle | |
195 | << dendl; | |
9f95a23c | 196 | m_lock.unlock(); |
11fdf7f2 TL |
197 | |
198 | on_finish->complete(0); | |
7c673cae FG |
199 | } |
200 | ||
11fdf7f2 TL |
201 | void TestWatchNotify::execute_unwatch(TestRadosClient *rados_client, |
202 | uint64_t handle, Context* on_finish) { | |
7c673cae FG |
203 | CephContext *cct = rados_client->cct(); |
204 | ||
205 | ldout(cct, 20) << "handle=" << handle << dendl; | |
11fdf7f2 | 206 | { |
9f95a23c | 207 | std::lock_guard locker{m_lock}; |
11fdf7f2 TL |
208 | for (FileWatchers::iterator it = m_file_watchers.begin(); |
209 | it != m_file_watchers.end(); ++it) { | |
210 | SharedWatcher watcher = it->second; | |
211 | ||
212 | WatchHandles::iterator w_it = watcher->watch_handles.find(handle); | |
213 | if (w_it != watcher->watch_handles.end()) { | |
214 | watcher->watch_handles.erase(w_it); | |
215 | maybe_remove_watcher(watcher); | |
216 | break; | |
7c673cae | 217 | } |
7c673cae FG |
218 | } |
219 | } | |
11fdf7f2 | 220 | on_finish->complete(0); |
7c673cae FG |
221 | } |
222 | ||
223 | TestWatchNotify::SharedWatcher TestWatchNotify::get_watcher( | |
11fdf7f2 | 224 | int64_t pool_id, const std::string& nspace, const std::string& oid) { |
9f95a23c | 225 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
11fdf7f2 TL |
226 | |
227 | auto it = m_file_watchers.find({pool_id, nspace, oid}); | |
228 | if (it == m_file_watchers.end()) { | |
229 | SharedWatcher watcher(new Watcher(pool_id, nspace, oid)); | |
230 | watcher->object_handler.reset(new ObjectHandler( | |
231 | this, pool_id, nspace, oid)); | |
232 | int r = m_test_cluster->register_object_handler( | |
233 | pool_id, {nspace, oid}, watcher->object_handler.get()); | |
234 | if (r < 0) { | |
235 | // object doesn't exist | |
236 | return SharedWatcher(); | |
237 | } | |
238 | m_file_watchers[{pool_id, nspace, oid}] = watcher; | |
239 | return watcher; | |
240 | } | |
241 | ||
242 | return it->second; | |
243 | } | |
244 | ||
245 | void TestWatchNotify::maybe_remove_watcher(SharedWatcher watcher) { | |
9f95a23c | 246 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
11fdf7f2 TL |
247 | |
248 | // TODO | |
249 | if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) { | |
250 | auto pool_id = watcher->pool_id; | |
251 | auto& nspace = watcher->nspace; | |
252 | auto& oid = watcher->oid; | |
253 | if (watcher->object_handler) { | |
254 | m_test_cluster->unregister_object_handler(pool_id, {nspace, oid}, | |
255 | watcher->object_handler.get()); | |
256 | watcher->object_handler.reset(); | |
257 | } | |
258 | ||
259 | m_file_watchers.erase({pool_id, nspace, oid}); | |
7c673cae | 260 | } |
7c673cae FG |
261 | } |
262 | ||
263 | void TestWatchNotify::execute_notify(TestRadosClient *rados_client, | |
11fdf7f2 | 264 | int64_t pool_id, const std::string& nspace, |
7c673cae | 265 | const std::string &oid, |
11fdf7f2 TL |
266 | const bufferlist &bl, bufferlist *pbl, |
267 | Context *on_notify) { | |
7c673cae FG |
268 | CephContext *cct = rados_client->cct(); |
269 | ||
9f95a23c | 270 | m_lock.lock(); |
11fdf7f2 | 271 | uint64_t notify_id = ++m_notify_id; |
7c673cae | 272 | |
11fdf7f2 TL |
273 | SharedWatcher watcher = get_watcher(pool_id, nspace, oid); |
274 | if (!watcher) { | |
275 | ldout(cct, 1) << "oid=" << oid << ": not found" << dendl; | |
9f95a23c | 276 | m_lock.unlock(); |
11fdf7f2 | 277 | on_notify->complete(-ENOENT); |
7c673cae FG |
278 | return; |
279 | } | |
280 | ||
11fdf7f2 | 281 | ldout(cct, 20) << "oid=" << oid << ": notify_id=" << notify_id << dendl; |
7c673cae | 282 | |
11fdf7f2 TL |
283 | SharedNotifyHandle notify_handle(new NotifyHandle()); |
284 | notify_handle->rados_client = rados_client; | |
285 | notify_handle->pbl = pbl; | |
286 | notify_handle->on_notify = on_notify; | |
7c673cae | 287 | |
11fdf7f2 TL |
288 | WatchHandles &watch_handles = watcher->watch_handles; |
289 | for (auto &watch_handle_pair : watch_handles) { | |
290 | WatchHandle &watch_handle = watch_handle_pair.second; | |
291 | notify_handle->pending_watcher_ids.insert(std::make_pair( | |
292 | watch_handle.gid, watch_handle.handle)); | |
293 | ||
294 | m_async_op_tracker.start_op(); | |
295 | uint64_t notifier_id = rados_client->get_instance_id(); | |
9f95a23c | 296 | watch_handle.rados_client->get_aio_finisher()->queue(new LambdaContext( |
11fdf7f2 TL |
297 | [this, pool_id, nspace, oid, bl, notify_id, watch_handle, notifier_id](int r) { |
298 | bufferlist notify_bl; | |
299 | notify_bl.append(bl); | |
300 | ||
301 | if (watch_handle.watch_ctx2 != NULL) { | |
302 | watch_handle.watch_ctx2->handle_notify(notify_id, | |
303 | watch_handle.handle, | |
304 | notifier_id, notify_bl); | |
305 | } else if (watch_handle.watch_ctx != NULL) { | |
306 | watch_handle.watch_ctx->notify(0, 0, notify_bl); | |
307 | ||
308 | // auto ack old-style watch/notify clients | |
309 | ack_notify(watch_handle.rados_client, pool_id, nspace, oid, notify_id, | |
310 | {watch_handle.gid, watch_handle.handle}, bufferlist()); | |
311 | } | |
312 | ||
313 | m_async_op_tracker.finish_op(); | |
314 | })); | |
7c673cae | 315 | } |
11fdf7f2 TL |
316 | watcher->notify_handles[notify_id] = notify_handle; |
317 | ||
318 | finish_notify(rados_client, pool_id, nspace, oid, notify_id); | |
9f95a23c | 319 | m_lock.unlock(); |
7c673cae FG |
320 | } |
321 | ||
11fdf7f2 TL |
322 | void TestWatchNotify::ack_notify(TestRadosClient *rados_client, int64_t pool_id, |
323 | const std::string& nspace, | |
324 | const std::string &oid, uint64_t notify_id, | |
7c673cae FG |
325 | const WatcherID &watcher_id, |
326 | const bufferlist &bl) { | |
327 | CephContext *cct = rados_client->cct(); | |
328 | ||
9f95a23c | 329 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
11fdf7f2 TL |
330 | SharedWatcher watcher = get_watcher(pool_id, nspace, oid); |
331 | if (!watcher) { | |
332 | ldout(cct, 1) << "oid=" << oid << ": not found" << dendl; | |
333 | return; | |
334 | } | |
7c673cae FG |
335 | |
336 | NotifyHandles::iterator it = watcher->notify_handles.find(notify_id); | |
337 | if (it == watcher->notify_handles.end()) { | |
338 | ldout(cct, 1) << "oid=" << oid << ", notify_id=" << notify_id | |
339 | << ", WatcherID=" << watcher_id << ": not found" << dendl; | |
340 | return; | |
341 | } | |
342 | ||
11fdf7f2 TL |
343 | ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id |
344 | << ", WatcherID=" << watcher_id << dendl; | |
345 | ||
7c673cae FG |
346 | bufferlist response; |
347 | response.append(bl); | |
348 | ||
349 | SharedNotifyHandle notify_handle = it->second; | |
350 | notify_handle->notify_responses[watcher_id] = response; | |
351 | notify_handle->pending_watcher_ids.erase(watcher_id); | |
352 | } | |
353 | ||
354 | void TestWatchNotify::finish_notify(TestRadosClient *rados_client, | |
11fdf7f2 | 355 | int64_t pool_id, const std::string& nspace, |
7c673cae FG |
356 | const std::string &oid, |
357 | uint64_t notify_id) { | |
358 | CephContext *cct = rados_client->cct(); | |
359 | ||
360 | ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id << dendl; | |
361 | ||
9f95a23c | 362 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
11fdf7f2 TL |
363 | SharedWatcher watcher = get_watcher(pool_id, nspace, oid); |
364 | if (!watcher) { | |
365 | ldout(cct, 1) << "oid=" << oid << ": not found" << dendl; | |
366 | return; | |
367 | } | |
7c673cae FG |
368 | |
369 | NotifyHandles::iterator it = watcher->notify_handles.find(notify_id); | |
370 | if (it == watcher->notify_handles.end()) { | |
371 | ldout(cct, 1) << "oid=" << oid << ", notify_id=" << notify_id | |
372 | << ": not found" << dendl; | |
373 | return; | |
374 | } | |
375 | ||
376 | SharedNotifyHandle notify_handle = it->second; | |
377 | if (!notify_handle->pending_watcher_ids.empty()) { | |
378 | ldout(cct, 10) << "oid=" << oid << ", notify_id=" << notify_id | |
379 | << ": pending watchers, returning" << dendl; | |
380 | return; | |
381 | } | |
382 | ||
383 | ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id | |
384 | << ": completing" << dendl; | |
385 | ||
386 | if (notify_handle->pbl != NULL) { | |
11fdf7f2 TL |
387 | encode(notify_handle->notify_responses, *notify_handle->pbl); |
388 | encode(notify_handle->pending_watcher_ids, *notify_handle->pbl); | |
7c673cae FG |
389 | } |
390 | ||
391 | notify_handle->rados_client->get_aio_finisher()->queue( | |
392 | notify_handle->on_notify, 0); | |
393 | watcher->notify_handles.erase(notify_id); | |
11fdf7f2 | 394 | maybe_remove_watcher(watcher); |
7c673cae FG |
395 | } |
396 | ||
397 | void TestWatchNotify::blacklist(uint32_t nonce) { | |
9f95a23c | 398 | std::lock_guard locker{m_lock}; |
7c673cae FG |
399 | |
400 | for (auto file_it = m_file_watchers.begin(); | |
401 | file_it != m_file_watchers.end(); ) { | |
402 | auto &watcher = file_it->second; | |
403 | for (auto w_it = watcher->watch_handles.begin(); | |
404 | w_it != watcher->watch_handles.end();) { | |
405 | if (w_it->second.nonce == nonce) { | |
406 | w_it = watcher->watch_handles.erase(w_it); | |
407 | } else { | |
408 | ++w_it; | |
409 | } | |
410 | } | |
11fdf7f2 TL |
411 | |
412 | ++file_it; | |
413 | maybe_remove_watcher(watcher); | |
414 | } | |
415 | } | |
416 | ||
417 | void TestWatchNotify::handle_object_removed(int64_t pool_id, | |
418 | const std::string& nspace, | |
419 | const std::string& oid) { | |
9f95a23c | 420 | std::lock_guard locker{m_lock}; |
11fdf7f2 TL |
421 | auto it = m_file_watchers.find({pool_id, nspace, oid}); |
422 | if (it == m_file_watchers.end()) { | |
423 | return; | |
424 | } | |
425 | ||
426 | auto watcher = it->second; | |
427 | ||
428 | // cancel all in-flight notifications | |
429 | for (auto& notify_handle_pair : watcher->notify_handles) { | |
430 | auto notify_handle = notify_handle_pair.second; | |
431 | notify_handle->rados_client->get_aio_finisher()->queue( | |
432 | notify_handle->on_notify, -ENOENT); | |
433 | } | |
434 | ||
435 | // alert all watchers of the loss of connection | |
436 | for (auto& watch_handle_pair : watcher->watch_handles) { | |
437 | auto& watch_handle = watch_handle_pair.second; | |
438 | auto handle = watch_handle.handle; | |
439 | auto watch_ctx2 = watch_handle.watch_ctx2; | |
440 | if (watch_ctx2 != nullptr) { | |
9f95a23c | 441 | auto ctx = new LambdaContext([handle, watch_ctx2](int) { |
11fdf7f2 TL |
442 | watch_ctx2->handle_error(handle, -ENOTCONN); |
443 | }); | |
444 | watch_handle.rados_client->get_aio_finisher()->queue(ctx); | |
7c673cae FG |
445 | } |
446 | } | |
11fdf7f2 | 447 | m_file_watchers.erase(it); |
7c673cae FG |
448 | } |
449 | ||
450 | } // namespace librados |