1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "test/librados_test_stub/TestWatchNotify.h"
5 #include "include/Context.h"
6 #include "common/Cond.h"
7 #include "include/stringify.h"
8 #include "common/Finisher.h"
9 #include "test/librados_test_stub/TestCluster.h"
10 #include "test/librados_test_stub/TestRadosClient.h"
11 #include <boost/bind.hpp>
12 #include <boost/function.hpp>
13 #include "include/ceph_assert.h"
15 #define dout_subsys ceph_subsys_rados
17 #define dout_prefix *_dout << "TestWatchNotify::" << __func__ << ": "
// Stream a WatcherID as "(<first>,<second>)" for use in log messages.
// Elsewhere in this file a WatcherID is built as make_pair(gid, handle).
21 std::ostream
& operator<<(std::ostream
& out
,
22 const TestWatchNotify::WatcherID
&watcher_id
) {
23 out
<< "(" << watcher_id
.first
<< "," << watcher_id
.second
<< ")";
// Per-object TestCluster::ObjectHandler owned by a Watcher. When the
// cluster reports that the object was removed, it forwards the event to
// TestWatchNotify::handle_object_removed() via the client's AIO finisher.
27 struct TestWatchNotify::ObjectHandler
: public TestCluster::ObjectHandler
{
// back-pointer to the owning TestWatchNotify (not owned)
28 TestWatchNotify
* test_watch_notify
;
// Record the owner and the (pool, namespace, oid) identity of the object.
33 ObjectHandler(TestWatchNotify
* test_watch_notify
, int64_t pool_id
,
34 const std::string
& nspace
, const std::string
& oid
)
35 : test_watch_notify(test_watch_notify
), pool_id(pool_id
),
36 nspace(nspace
), oid(oid
) {
// Removal callback: defers the real work to the AIO finisher rather than
// running it inline in the caller's context.
39 void handle_removed(TestRadosClient
* test_rados_client
) override
{
40 // copy member variables since this object might be deleted
41 auto _test_watch_notify
= test_watch_notify
;
42 auto _pool_id
= pool_id
;
43 auto _nspace
= nspace
;
// the lambda captures only the local copies, never `this`
45 auto ctx
= new LambdaContext([_test_watch_notify
, _pool_id
, _nspace
, _oid
](int r
) {
46 _test_watch_notify
->handle_object_removed(_pool_id
, _nspace
, _oid
);
48 test_rados_client
->get_aio_finisher()->queue(ctx
);
// Construct the watch/notify emulator; remembers the test cluster that is
// later used to (un)register per-object handlers.
52 TestWatchNotify::TestWatchNotify(TestCluster
* test_cluster
)
53 : m_test_cluster(test_cluster
) {
// Synchronous flush: logs entry and hands a completion context to the
// async-op tracker so the caller can wait for outstanding notify callbacks.
// rados_client is used here only to obtain the CephContext for logging.
56 void TestWatchNotify::flush(TestRadosClient
*rados_client
) {
57 CephContext
*cct
= rados_client
->cct();
59 ldout(cct
, 20) << "enter" << dendl
;
60 // block until we know no additional async notify callbacks will occur
62 m_async_op_tracker
.wait_for_ops(&ctx
);
// Fill *out_watchers with one obj_watch_t per watch handle registered on
// the object identified by (pool_id, nspace, o). Any previous contents of
// *out_watchers are discarded. Runs with m_lock held.
66 int TestWatchNotify::list_watchers(int64_t pool_id
, const std::string
& nspace
,
68 std::list
<obj_watch_t
> *out_watchers
) {
69 std::lock_guard lock
{m_lock
};
70 SharedWatcher watcher
= get_watcher(pool_id
, nspace
, o
);
75 out_watchers
->clear();
76 for (TestWatchNotify::WatchHandles::iterator it
=
77 watcher
->watch_handles
.begin();
78 it
!= watcher
->watch_handles
.end(); ++it
) {
// bounded copy of the address string; the next statement forces NUL
// termination because strncpy does not guarantee it on truncation
80 strncpy(obj
.addr
, it
->second
.addr
.c_str(), sizeof(obj
.addr
) - 1);
81 obj
.addr
[sizeof(obj
.addr
) - 1] = '\0';
// watcher_id reports the client gid, cookie reports the watch handle
82 obj
.watcher_id
= static_cast<int64_t>(it
->second
.gid
);
83 obj
.cookie
= it
->second
.handle
;
// the stub always reports a fixed 30 second timeout
84 obj
.timeout_seconds
= 30;
85 out_watchers
->push_back(obj
);
// Asynchronous flush: queues on_finish on the client's AIO finisher.
90 void TestWatchNotify::aio_flush(TestRadosClient
*rados_client
,
92 rados_client
->get_aio_finisher()->queue(on_finish
);
// Synchronous front-end for aio_watch(): forwards every argument and passes
// &cond as the completion context.
// NOTE(review): presumably the result is obtained by waiting on `cond` --
// confirm against the full function body.
95 int TestWatchNotify::watch(TestRadosClient
*rados_client
, int64_t pool_id
,
96 const std::string
& nspace
, const std::string
& o
,
97 uint64_t gid
, uint64_t *handle
,
98 librados::WatchCtx
*ctx
, librados::WatchCtx2
*ctx2
) {
100 aio_watch(rados_client
, pool_id
, nspace
, o
, gid
, handle
, ctx
, ctx2
, &cond
);
// Asynchronously register a watch: wraps execute_watch() in a LambdaContext
// and queues it on the client's AIO finisher. on_finish is handed through
// to execute_watch(), which completes it.
104 void TestWatchNotify::aio_watch(TestRadosClient
*rados_client
, int64_t pool_id
,
105 const std::string
& nspace
, const std::string
& o
,
106 uint64_t gid
, uint64_t *handle
,
107 librados::WatchCtx
*watch_ctx
,
108 librados::WatchCtx2
*watch_ctx2
,
109 Context
*on_finish
) {
// [=] copies nspace/o into the closure, so the deferred call does not
// depend on the caller's string references staying alive
110 auto ctx
= new LambdaContext([=](int) {
111 execute_watch(rados_client
, pool_id
, nspace
, o
, gid
, handle
, watch_ctx
,
112 watch_ctx2
, on_finish
);
114 rados_client
->get_aio_finisher()->queue(ctx
);
// Synchronous front-end for aio_unwatch(): passes &ctx as the completion.
117 int TestWatchNotify::unwatch(TestRadosClient
*rados_client
,
120 aio_unwatch(rados_client
, handle
, &ctx
);
// Asynchronously remove the watch identified by `handle`: queues a
// LambdaContext on the client's AIO finisher that runs execute_unwatch().
// on_finish is handed through to execute_unwatch(), which completes it.
124 void TestWatchNotify::aio_unwatch(TestRadosClient
*rados_client
,
125 uint64_t handle
, Context
*on_finish
) {
126 auto ctx
= new LambdaContext([this, rados_client
, handle
, on_finish
](int) {
127 execute_unwatch(rados_client
, handle
, on_finish
);
129 rados_client
->get_aio_finisher()->queue(ctx
);
// Asynchronously send a notification on an object: queues a LambdaContext
// on the client's AIO finisher that runs execute_notify().
// Note: timeout_ms is accepted but is not forwarded to execute_notify().
132 void TestWatchNotify::aio_notify(TestRadosClient
*rados_client
, int64_t pool_id
,
133 const std::string
& nspace
,
134 const std::string
& oid
, const bufferlist
& bl
,
135 uint64_t timeout_ms
, bufferlist
*pbl
,
136 Context
*on_notify
) {
// [=] copies nspace, oid and bl into the closure for the deferred call
137 auto ctx
= new LambdaContext([=](int) {
138 execute_notify(rados_client
, pool_id
, nspace
, oid
, bl
, pbl
, on_notify
);
140 rados_client
->get_aio_finisher()->queue(ctx
);
// Synchronous front-end for aio_notify(): forwards every argument and
// passes &cond as the completion context.
// NOTE(review): presumably the result is obtained by waiting on `cond` --
// confirm against the full function body.
143 int TestWatchNotify::notify(TestRadosClient
*rados_client
, int64_t pool_id
,
144 const std::string
& nspace
, const std::string
& oid
,
145 bufferlist
& bl
, uint64_t timeout_ms
,
148 aio_notify(rados_client
, pool_id
, nspace
, oid
, bl
, timeout_ms
, pbl
, &cond
);
// Acknowledge notification `notify_id` on behalf of watcher (gid, handle),
// then try to complete the notification. Both steps run under m_lock.
152 void TestWatchNotify::notify_ack(TestRadosClient
*rados_client
, int64_t pool_id
,
153 const std::string
& nspace
,
154 const std::string
& o
, uint64_t notify_id
,
155 uint64_t handle
, uint64_t gid
,
157 CephContext
*cct
= rados_client
->cct();
158 ldout(cct
, 20) << "notify_id=" << notify_id
<< ", handle=" << handle
159 << ", gid=" << gid
<< dendl
;
160 std::lock_guard lock
{m_lock
};
// WatcherID ordering is (gid, handle)
161 WatcherID watcher_id
= std::make_pair(gid
, handle
);
162 ack_notify(rados_client
, pool_id
, nspace
, o
, notify_id
, watcher_id
, bl
);
// finish_notify() completes the notification only when no watcher is
// still pending
163 finish_notify(rados_client
, pool_id
, nspace
, o
, notify_id
);
// Register a new watch on object (pool_id, nspace, o) for client `gid`.
// Stores a WatchHandle under a freshly allocated handle id, writes that id
// to *handle and completes on_finish with 0. on_finish is completed with
// -ENOENT on the failure path.
// NOTE(review): the failure path presumably corresponds to get_watcher()
// returning an empty watcher -- confirm against the full function body.
166 void TestWatchNotify::execute_watch(TestRadosClient
*rados_client
,
167 int64_t pool_id
, const std::string
& nspace
,
168 const std::string
& o
, uint64_t gid
,
169 uint64_t *handle
, librados::WatchCtx
*ctx
,
170 librados::WatchCtx2
*ctx2
,
171 Context
* on_finish
) {
172 CephContext
*cct
= rados_client
->cct();
175 SharedWatcher watcher
= get_watcher(pool_id
, nspace
, o
);
178 on_finish
->complete(-ENOENT
);
182 WatchHandle watch_handle
;
183 watch_handle
.rados_client
= rados_client
;
// synthetic client address derived from the client's nonce
184 watch_handle
.addr
= "127.0.0.1:0/" + stringify(rados_client
->get_nonce());
185 watch_handle
.nonce
= rados_client
->get_nonce();
186 watch_handle
.gid
= gid
;
// handle ids come from a pre-incremented counter shared by all objects
187 watch_handle
.handle
= ++m_handle
;
188 watch_handle
.watch_ctx
= ctx
;
189 watch_handle
.watch_ctx2
= ctx2
;
190 watcher
->watch_handles
[watch_handle
.handle
] = watch_handle
;
192 *handle
= watch_handle
.handle
;
194 ldout(cct
, 20) << "oid=" << o
<< ", gid=" << gid
<< ": handle=" << *handle
198 on_finish
->complete(0);
// Remove the watch identified by `handle`. The handle is not tied to an
// object here, so every watcher in m_file_watchers is searched. on_finish
// is completed with 0.
201 void TestWatchNotify::execute_unwatch(TestRadosClient
*rados_client
,
202 uint64_t handle
, Context
* on_finish
) {
203 CephContext
*cct
= rados_client
->cct();
205 ldout(cct
, 20) << "handle=" << handle
<< dendl
;
207 std::lock_guard locker
{m_lock
};
208 for (FileWatchers::iterator it
= m_file_watchers
.begin();
209 it
!= m_file_watchers
.end(); ++it
) {
// copy the map entry's SharedWatcher; it is also what gets passed on to
// maybe_remove_watcher()
210 SharedWatcher watcher
= it
->second
;
212 WatchHandles::iterator w_it
= watcher
->watch_handles
.find(handle
);
213 if (w_it
!= watcher
->watch_handles
.end()) {
214 watcher
->watch_handles
.erase(w_it
);
// drops the watcher from m_file_watchers if nothing references it anymore
215 maybe_remove_watcher(watcher
);
220 on_finish
->complete(0);
// Look up the Watcher for (pool_id, nspace, oid), creating it on first use.
// A new Watcher gets an ObjectHandler that is registered with the test
// cluster, and is then stored in m_file_watchers. Returns an empty
// SharedWatcher when the object does not exist. Caller must hold m_lock.
223 TestWatchNotify::SharedWatcher
TestWatchNotify::get_watcher(
224 int64_t pool_id
, const std::string
& nspace
, const std::string
& oid
) {
225 ceph_assert(ceph_mutex_is_locked(m_lock
));
227 auto it
= m_file_watchers
.find({pool_id
, nspace
, oid
});
228 if (it
== m_file_watchers
.end()) {
229 SharedWatcher
watcher(new Watcher(pool_id
, nspace
, oid
));
230 watcher
->object_handler
.reset(new ObjectHandler(
231 this, pool_id
, nspace
, oid
));
// NOTE(review): presumably a failing `r` is what triggers the early
// return below -- confirm against the full function body
232 int r
= m_test_cluster
->register_object_handler(
233 pool_id
, {nspace
, oid
}, watcher
->object_handler
.get());
235 // object doesn't exist
236 return SharedWatcher();
238 m_file_watchers
[{pool_id
, nspace
, oid
}] = watcher
;
// Drop `watcher` from m_file_watchers once it has neither watch handles nor
// notify handles, unregistering its object handler first. Caller must hold
// m_lock.
245 void TestWatchNotify::maybe_remove_watcher(SharedWatcher watcher
) {
246 ceph_assert(ceph_mutex_is_locked(m_lock
));
249 if (watcher
->watch_handles
.empty() && watcher
->notify_handles
.empty()) {
250 auto pool_id
= watcher
->pool_id
;
// nspace and oid are references to members of *watcher; `watcher` is a
// by-value parameter that outlives them
251 auto& nspace
= watcher
->nspace
;
252 auto& oid
= watcher
->oid
;
253 if (watcher
->object_handler
) {
254 m_test_cluster
->unregister_object_handler(pool_id
, {nspace
, oid
},
255 watcher
->object_handler
.get());
256 watcher
->object_handler
.reset();
259 m_file_watchers
.erase({pool_id
, nspace
, oid
});
// Deliver a notification carrying `bl` to every watcher of
// (pool_id, nspace, oid).
//  - allocates a new notify id
//  - if the object has no watcher entry, logs and completes on_notify with
//    -ENOENT
//  - otherwise records a NotifyHandle, marks each watcher as pending and
//    queues a callback on that watcher's own AIO finisher
//  - finally calls finish_notify()
// pbl (may be null) receives the encoded responses in finish_notify().
263 void TestWatchNotify::execute_notify(TestRadosClient
*rados_client
,
264 int64_t pool_id
, const std::string
& nspace
,
265 const std::string
&oid
,
266 const bufferlist
&bl
, bufferlist
*pbl
,
267 Context
*on_notify
) {
268 CephContext
*cct
= rados_client
->cct();
// the notify id is allocated before the watcher lookup
271 uint64_t notify_id
= ++m_notify_id
;
273 SharedWatcher watcher
= get_watcher(pool_id
, nspace
, oid
);
275 ldout(cct
, 1) << "oid=" << oid
<< ": not found" << dendl
;
277 on_notify
->complete(-ENOENT
);
281 ldout(cct
, 20) << "oid=" << oid
<< ": notify_id=" << notify_id
<< dendl
;
283 SharedNotifyHandle
notify_handle(new NotifyHandle());
284 notify_handle
->rados_client
= rados_client
;
285 notify_handle
->pbl
= pbl
;
286 notify_handle
->on_notify
= on_notify
;
288 WatchHandles
&watch_handles
= watcher
->watch_handles
;
289 for (auto &watch_handle_pair
: watch_handles
) {
290 WatchHandle
&watch_handle
= watch_handle_pair
.second
;
// every current watcher must ack before the notify can complete
291 notify_handle
->pending_watcher_ids
.insert(std::make_pair(
292 watch_handle
.gid
, watch_handle
.handle
));
// counted as an in-flight async op; paired with the finish_op() call below
294 m_async_op_tracker
.start_op();
295 uint64_t notifier_id
= rados_client
->get_instance_id();
// the callback runs on the *watcher's* finisher; watch_handle, bl and the
// strings are captured by value
296 watch_handle
.rados_client
->get_aio_finisher()->queue(new LambdaContext(
297 [this, pool_id
, nspace
, oid
, bl
, notify_id
, watch_handle
, notifier_id
](int r
) {
298 bufferlist notify_bl
;
299 notify_bl
.append(bl
);
// prefer the WatchCtx2 callback, fall back to the legacy WatchCtx one
301 if (watch_handle
.watch_ctx2
!= NULL
) {
302 watch_handle
.watch_ctx2
->handle_notify(notify_id
,
304 notifier_id
, notify_bl
);
305 } else if (watch_handle
.watch_ctx
!= NULL
) {
306 watch_handle
.watch_ctx
->notify(0, 0, notify_bl
);
308 // auto ack old-style watch/notify clients
309 ack_notify(watch_handle
.rados_client
, pool_id
, nspace
, oid
, notify_id
,
310 {watch_handle
.gid
, watch_handle
.handle
}, bufferlist());
313 m_async_op_tracker
.finish_op();
316 watcher
->notify_handles
[notify_id
] = notify_handle
;
// completes immediately if there were no watchers to wait for
318 finish_notify(rados_client
, pool_id
, nspace
, oid
, notify_id
);
// Record the response of `watcher_id` for notification `notify_id` and
// remove that watcher from the pending set. An unknown object or an unknown
// notify id is only logged (level 1). Caller must hold m_lock.
322 void TestWatchNotify::ack_notify(TestRadosClient
*rados_client
, int64_t pool_id
,
323 const std::string
& nspace
,
324 const std::string
&oid
, uint64_t notify_id
,
325 const WatcherID
&watcher_id
,
326 const bufferlist
&bl
) {
327 CephContext
*cct
= rados_client
->cct();
329 ceph_assert(ceph_mutex_is_locked(m_lock
));
330 SharedWatcher watcher
= get_watcher(pool_id
, nspace
, oid
);
332 ldout(cct
, 1) << "oid=" << oid
<< ": not found" << dendl
;
336 NotifyHandles::iterator it
= watcher
->notify_handles
.find(notify_id
);
337 if (it
== watcher
->notify_handles
.end()) {
338 ldout(cct
, 1) << "oid=" << oid
<< ", notify_id=" << notify_id
339 << ", WatcherID=" << watcher_id
<< ": not found" << dendl
;
343 ldout(cct
, 20) << "oid=" << oid
<< ", notify_id=" << notify_id
344 << ", WatcherID=" << watcher_id
<< dendl
;
349 SharedNotifyHandle notify_handle
= it
->second
;
// NOTE(review): `response` is presumably derived from `bl` -- confirm
// against the full function body
350 notify_handle
->notify_responses
[watcher_id
] = response
;
351 notify_handle
->pending_watcher_ids
.erase(watcher_id
);
// Complete notification `notify_id` if no watcher is still pending.
// On completion: encodes the responses followed by the pending watcher ids
// into pbl (when non-null), queues on_notify with result 0 on the
// notifier's AIO finisher, erases the notify handle and lets
// maybe_remove_watcher() clean up the watcher. Caller must hold m_lock.
354 void TestWatchNotify::finish_notify(TestRadosClient
*rados_client
,
355 int64_t pool_id
, const std::string
& nspace
,
356 const std::string
&oid
,
357 uint64_t notify_id
) {
358 CephContext
*cct
= rados_client
->cct();
360 ldout(cct
, 20) << "oid=" << oid
<< ", notify_id=" << notify_id
<< dendl
;
362 ceph_assert(ceph_mutex_is_locked(m_lock
));
363 SharedWatcher watcher
= get_watcher(pool_id
, nspace
, oid
);
365 ldout(cct
, 1) << "oid=" << oid
<< ": not found" << dendl
;
369 NotifyHandles::iterator it
= watcher
->notify_handles
.find(notify_id
);
370 if (it
== watcher
->notify_handles
.end()) {
371 ldout(cct
, 1) << "oid=" << oid
<< ", notify_id=" << notify_id
372 << ": not found" << dendl
;
376 SharedNotifyHandle notify_handle
= it
->second
;
// still waiting for at least one ack: leave the notify registered
377 if (!notify_handle
->pending_watcher_ids
.empty()) {
378 ldout(cct
, 10) << "oid=" << oid
<< ", notify_id=" << notify_id
379 << ": pending watchers, returning" << dendl
;
383 ldout(cct
, 20) << "oid=" << oid
<< ", notify_id=" << notify_id
384 << ": completing" << dendl
;
386 if (notify_handle
->pbl
!= NULL
) {
// reply payload layout: responses first, then the pending watcher ids
387 encode(notify_handle
->notify_responses
, *notify_handle
->pbl
);
388 encode(notify_handle
->pending_watcher_ids
, *notify_handle
->pbl
);
// completion is delivered on the notifier's finisher, not inline
391 notify_handle
->rados_client
->get_aio_finisher()->queue(
392 notify_handle
->on_notify
, 0);
393 watcher
->notify_handles
.erase(notify_id
);
394 maybe_remove_watcher(watcher
);
// Remove every watch handle whose client nonce equals `nonce`, across all
// objects, and let maybe_remove_watcher() drop watchers left empty.
// Runs with m_lock held.
397 void TestWatchNotify::blacklist(uint32_t nonce
) {
398 std::lock_guard locker
{m_lock
};
// no increment in either for-header: iterator advancement is not part of
// the loop headers
400 for (auto file_it
= m_file_watchers
.begin();
401 file_it
!= m_file_watchers
.end(); ) {
402 auto &watcher
= file_it
->second
;
403 for (auto w_it
= watcher
->watch_handles
.begin();
404 w_it
!= watcher
->watch_handles
.end();) {
405 if (w_it
->second
.nonce
== nonce
) {
// erase() returns the iterator following the removed element
406 w_it
= watcher
->watch_handles
.erase(w_it
);
413 maybe_remove_watcher(watcher
);
// React to the removal of object (pool_id, nspace, oid): queue -ENOENT for
// every in-flight notification, queue handle_error(handle, -ENOTCONN) for
// every watcher that has a WatchCtx2 callback, then drop the object's entry
// from m_file_watchers. Runs with m_lock held; all callbacks are queued on
// the respective client's AIO finisher rather than invoked inline.
417 void TestWatchNotify::handle_object_removed(int64_t pool_id
,
418 const std::string
& nspace
,
419 const std::string
& oid
) {
420 std::lock_guard locker
{m_lock
};
421 auto it
= m_file_watchers
.find({pool_id
, nspace
, oid
});
422 if (it
== m_file_watchers
.end()) {
// copies the map entry's watcher (it->second) by value
426 auto watcher
= it
->second
;
428 // cancel all in-flight notifications
429 for (auto& notify_handle_pair
: watcher
->notify_handles
) {
430 auto notify_handle
= notify_handle_pair
.second
;
431 notify_handle
->rados_client
->get_aio_finisher()->queue(
432 notify_handle
->on_notify
, -ENOENT
);
435 // alert all watchers of the loss of connection
436 for (auto& watch_handle_pair
: watcher
->watch_handles
) {
437 auto& watch_handle
= watch_handle_pair
.second
;
// copy what the callback needs; the lambda captures these by value
438 auto handle
= watch_handle
.handle
;
439 auto watch_ctx2
= watch_handle
.watch_ctx2
;
// only WatchCtx2 watchers get an error callback here
440 if (watch_ctx2
!= nullptr) {
441 auto ctx
= new LambdaContext([handle
, watch_ctx2
](int) {
442 watch_ctx2
->handle_error(handle
, -ENOTCONN
);
444 watch_handle
.rados_client
->get_aio_finisher()->queue(ctx
);
447 m_file_watchers
.erase(it
);
450 } // namespace librados