1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "test/librados_test_stub/TestWatchNotify.h"
5 #include "include/Context.h"
6 #include "include/stringify.h"
7 #include "common/Finisher.h"
8 #include "test/librados_test_stub/TestRadosClient.h"
9 #include <boost/bind.hpp>
10 #include <boost/function.hpp>
11 #include "include/assert.h"
13 #define dout_subsys ceph_subsys_rados
15 #define dout_prefix *_dout << "TestWatchNotify::" << __func__ << ": "
19 std::ostream
& operator<<(std::ostream
& out
,
20 const TestWatchNotify::WatcherID
&watcher_id
) {
21 out
<< "(" << watcher_id
.first
<< "," << watcher_id
.second
<< ")";
25 TestWatchNotify::TestWatchNotify()
26 : m_lock("librados::TestWatchNotify::m_lock") {
29 void TestWatchNotify::flush(TestRadosClient
*rados_client
) {
30 CephContext
*cct
= rados_client
->cct();
32 ldout(cct
, 20) << "enter" << dendl
;
33 // block until we know no additional async notify callbacks will occur
34 Mutex::Locker
locker(m_lock
);
35 while (m_pending_notifies
> 0) {
36 m_file_watcher_cond
.Wait(m_lock
);
40 int TestWatchNotify::list_watchers(const std::string
& o
,
41 std::list
<obj_watch_t
> *out_watchers
) {
42 Mutex::Locker
lock(m_lock
);
43 SharedWatcher watcher
= get_watcher(o
);
45 out_watchers
->clear();
46 for (TestWatchNotify::WatchHandles::iterator it
=
47 watcher
->watch_handles
.begin();
48 it
!= watcher
->watch_handles
.end(); ++it
) {
50 strcpy(obj
.addr
, it
->second
.addr
.c_str());
51 obj
.watcher_id
= static_cast<int64_t>(it
->second
.gid
);
52 obj
.cookie
= it
->second
.handle
;
53 obj
.timeout_seconds
= 30;
54 out_watchers
->push_back(obj
);
59 void TestWatchNotify::aio_flush(TestRadosClient
*rados_client
,
61 rados_client
->get_aio_finisher()->queue(on_finish
);
64 void TestWatchNotify::aio_watch(TestRadosClient
*rados_client
,
65 const std::string
& o
, uint64_t gid
,
67 librados::WatchCtx2
*watch_ctx
,
69 int r
= watch(rados_client
, o
, gid
, handle
, nullptr, watch_ctx
);
70 rados_client
->get_aio_finisher()->queue(on_finish
, r
);
73 void TestWatchNotify::aio_unwatch(TestRadosClient
*rados_client
,
74 uint64_t handle
, Context
*on_finish
) {
75 unwatch(rados_client
, handle
);
76 rados_client
->get_aio_finisher()->queue(on_finish
);
79 void TestWatchNotify::aio_notify(TestRadosClient
*rados_client
,
80 const std::string
& oid
, bufferlist
& bl
,
81 uint64_t timeout_ms
, bufferlist
*pbl
,
83 CephContext
*cct
= rados_client
->cct();
85 Mutex::Locker
lock(m_lock
);
87 uint64_t notify_id
= ++m_notify_id
;
89 ldout(cct
, 20) << "oid=" << oid
<< ": notify_id=" << notify_id
<< dendl
;
91 SharedWatcher watcher
= get_watcher(oid
);
93 SharedNotifyHandle
notify_handle(new NotifyHandle());
94 notify_handle
->rados_client
= rados_client
;
95 notify_handle
->pbl
= pbl
;
96 notify_handle
->on_notify
= on_notify
;
97 for (auto &watch_handle_pair
: watcher
->watch_handles
) {
98 WatchHandle
&watch_handle
= watch_handle_pair
.second
;
99 notify_handle
->pending_watcher_ids
.insert(std::make_pair(
100 watch_handle
.gid
, watch_handle
.handle
));
102 watcher
->notify_handles
[notify_id
] = notify_handle
;
104 FunctionContext
*ctx
= new FunctionContext(
105 boost::bind(&TestWatchNotify::execute_notify
, this, rados_client
, oid
, bl
,
107 rados_client
->get_aio_finisher()->queue(ctx
);
110 int TestWatchNotify::notify(TestRadosClient
*rados_client
,
111 const std::string
& oid
, bufferlist
& bl
,
112 uint64_t timeout_ms
, bufferlist
*pbl
) {
114 aio_notify(rados_client
, oid
, bl
, timeout_ms
, pbl
, &cond
);
118 void TestWatchNotify::notify_ack(TestRadosClient
*rados_client
,
119 const std::string
& o
, uint64_t notify_id
,
120 uint64_t handle
, uint64_t gid
,
122 CephContext
*cct
= rados_client
->cct();
123 ldout(cct
, 20) << "notify_id=" << notify_id
<< ", handle=" << handle
124 << ", gid=" << gid
<< dendl
;
125 Mutex::Locker
lock(m_lock
);
126 WatcherID watcher_id
= std::make_pair(gid
, handle
);
127 ack_notify(rados_client
, o
, notify_id
, watcher_id
, bl
);
128 finish_notify(rados_client
, o
, notify_id
);
131 int TestWatchNotify::watch(TestRadosClient
*rados_client
,
132 const std::string
& o
, uint64_t gid
,
133 uint64_t *handle
, librados::WatchCtx
*ctx
,
134 librados::WatchCtx2
*ctx2
) {
135 CephContext
*cct
= rados_client
->cct();
137 Mutex::Locker
lock(m_lock
);
138 SharedWatcher watcher
= get_watcher(o
);
140 WatchHandle watch_handle
;
141 watch_handle
.rados_client
= rados_client
;
142 watch_handle
.addr
= "127.0.0.1:0/" + stringify(rados_client
->get_nonce());
143 watch_handle
.nonce
= rados_client
->get_nonce();
144 watch_handle
.gid
= gid
;
145 watch_handle
.handle
= ++m_handle
;
146 watch_handle
.watch_ctx
= ctx
;
147 watch_handle
.watch_ctx2
= ctx2
;
148 watcher
->watch_handles
[watch_handle
.handle
] = watch_handle
;
150 *handle
= watch_handle
.handle
;
152 ldout(cct
, 20) << "oid=" << o
<< ", gid=" << gid
<< ": handle=" << *handle
157 int TestWatchNotify::unwatch(TestRadosClient
*rados_client
,
159 CephContext
*cct
= rados_client
->cct();
161 ldout(cct
, 20) << "handle=" << handle
<< dendl
;
162 Mutex::Locker
locker(m_lock
);
163 for (FileWatchers::iterator it
= m_file_watchers
.begin();
164 it
!= m_file_watchers
.end(); ++it
) {
165 SharedWatcher watcher
= it
->second
;
167 WatchHandles::iterator w_it
= watcher
->watch_handles
.find(handle
);
168 if (w_it
!= watcher
->watch_handles
.end()) {
169 watcher
->watch_handles
.erase(w_it
);
170 if (watcher
->watch_handles
.empty() && watcher
->notify_handles
.empty()) {
171 m_file_watchers
.erase(it
);
179 TestWatchNotify::SharedWatcher
TestWatchNotify::get_watcher(
180 const std::string
& oid
) {
181 assert(m_lock
.is_locked());
182 SharedWatcher
&watcher
= m_file_watchers
[oid
];
184 watcher
.reset(new Watcher());
189 void TestWatchNotify::execute_notify(TestRadosClient
*rados_client
,
190 const std::string
&oid
,
191 bufferlist
&bl
, uint64_t notify_id
) {
192 CephContext
*cct
= rados_client
->cct();
194 ldout(cct
, 20) << "oid=" << oid
<< ", notify_id=" << notify_id
<< dendl
;
196 Mutex::Locker
lock(m_lock
);
197 SharedWatcher watcher
= get_watcher(oid
);
198 WatchHandles
&watch_handles
= watcher
->watch_handles
;
200 NotifyHandles::iterator n_it
= watcher
->notify_handles
.find(notify_id
);
201 if (n_it
== watcher
->notify_handles
.end()) {
202 ldout(cct
, 1) << "oid=" << oid
<< ", notify_id=" << notify_id
203 << ": not found" << dendl
;
207 SharedNotifyHandle notify_handle
= n_it
->second
;
208 WatcherIDs
watcher_ids(notify_handle
->pending_watcher_ids
);
209 for (WatcherIDs::iterator w_id_it
= watcher_ids
.begin();
210 w_id_it
!= watcher_ids
.end(); ++w_id_it
) {
211 WatcherID watcher_id
= *w_id_it
;
212 WatchHandles::iterator w_it
= watch_handles
.find(watcher_id
.second
);
213 if (w_it
== watch_handles
.end()) {
214 // client disconnected before notification processed
215 notify_handle
->pending_watcher_ids
.erase(watcher_id
);
217 WatchHandle watch_handle
= w_it
->second
;
218 assert(watch_handle
.gid
== watcher_id
.first
);
219 assert(watch_handle
.handle
== watcher_id
.second
);
221 uint64_t notifier_id
= rados_client
->get_instance_id();
222 watch_handle
.rados_client
->get_aio_finisher()->queue(new FunctionContext(
223 [this, oid
, bl
, notify_id
, watch_handle
, notifier_id
](int r
) {
224 bufferlist notify_bl
;
225 notify_bl
.append(bl
);
227 if (watch_handle
.watch_ctx2
!= NULL
) {
228 watch_handle
.watch_ctx2
->handle_notify(notify_id
,
230 notifier_id
, notify_bl
);
231 } else if (watch_handle
.watch_ctx
!= NULL
) {
232 watch_handle
.watch_ctx
->notify(0, 0, notify_bl
);
234 // auto ack old-style watch/notify clients
235 ack_notify(watch_handle
.rados_client
, oid
, notify_id
,
236 {watch_handle
.gid
, watch_handle
.handle
}, bufferlist());
242 finish_notify(rados_client
, oid
, notify_id
);
244 if (--m_pending_notifies
== 0) {
245 m_file_watcher_cond
.Signal();
249 void TestWatchNotify::ack_notify(TestRadosClient
*rados_client
,
250 const std::string
&oid
,
252 const WatcherID
&watcher_id
,
253 const bufferlist
&bl
) {
254 CephContext
*cct
= rados_client
->cct();
256 ldout(cct
, 20) << "oid=" << oid
<< ", notify_id=" << notify_id
257 << ", WatcherID=" << watcher_id
<< dendl
;
259 assert(m_lock
.is_locked());
260 SharedWatcher watcher
= get_watcher(oid
);
262 NotifyHandles::iterator it
= watcher
->notify_handles
.find(notify_id
);
263 if (it
== watcher
->notify_handles
.end()) {
264 ldout(cct
, 1) << "oid=" << oid
<< ", notify_id=" << notify_id
265 << ", WatcherID=" << watcher_id
<< ": not found" << dendl
;
272 SharedNotifyHandle notify_handle
= it
->second
;
273 notify_handle
->notify_responses
[watcher_id
] = response
;
274 notify_handle
->pending_watcher_ids
.erase(watcher_id
);
277 void TestWatchNotify::finish_notify(TestRadosClient
*rados_client
,
278 const std::string
&oid
,
279 uint64_t notify_id
) {
280 CephContext
*cct
= rados_client
->cct();
282 ldout(cct
, 20) << "oid=" << oid
<< ", notify_id=" << notify_id
<< dendl
;
284 assert(m_lock
.is_locked());
285 SharedWatcher watcher
= get_watcher(oid
);
287 NotifyHandles::iterator it
= watcher
->notify_handles
.find(notify_id
);
288 if (it
== watcher
->notify_handles
.end()) {
289 ldout(cct
, 1) << "oid=" << oid
<< ", notify_id=" << notify_id
290 << ": not found" << dendl
;
294 SharedNotifyHandle notify_handle
= it
->second
;
295 if (!notify_handle
->pending_watcher_ids
.empty()) {
296 ldout(cct
, 10) << "oid=" << oid
<< ", notify_id=" << notify_id
297 << ": pending watchers, returning" << dendl
;
301 ldout(cct
, 20) << "oid=" << oid
<< ", notify_id=" << notify_id
302 << ": completing" << dendl
;
304 if (notify_handle
->pbl
!= NULL
) {
305 ::encode(notify_handle
->notify_responses
, *notify_handle
->pbl
);
306 ::encode(notify_handle
->pending_watcher_ids
, *notify_handle
->pbl
);
309 notify_handle
->rados_client
->get_aio_finisher()->queue(
310 notify_handle
->on_notify
, 0);
311 watcher
->notify_handles
.erase(notify_id
);
312 if (watcher
->watch_handles
.empty() && watcher
->notify_handles
.empty()) {
313 m_file_watchers
.erase(oid
);
317 void TestWatchNotify::blacklist(uint32_t nonce
) {
318 Mutex::Locker
locker(m_lock
);
320 for (auto file_it
= m_file_watchers
.begin();
321 file_it
!= m_file_watchers
.end(); ) {
322 auto &watcher
= file_it
->second
;
323 for (auto w_it
= watcher
->watch_handles
.begin();
324 w_it
!= watcher
->watch_handles
.end();) {
325 if (w_it
->second
.nonce
== nonce
) {
326 w_it
= watcher
->watch_handles
.erase(w_it
);
331 if (watcher
->watch_handles
.empty() && watcher
->notify_handles
.empty()) {
332 file_it
= m_file_watchers
.erase(file_it
);
339 } // namespace librados