7 #include "gtest/gtest.h"
9 #include "include/encoding.h"
10 #include "include/rados/librados.hpp"
11 #include "include/rados/rados_types.h"
12 #include "test/librados/test_cxx.h"
13 #include "test/librados/testcase_cxx.h"
// Bring the librados C++ API names into scope for the tests in this file.
16 using namespace librados
;
// Fixture name used by the TEST_F cases in this file; it is RadosTestECPP
// under a watch/notify-specific alias.
18 typedef RadosTestECPP LibRadosWatchNotifyECPP
;
// Silence -Wpragmas, save the current diagnostic state, and then silence
// -Wdeprecated-declarations for the code that follows. The matching
// "diagnostic pop" appears later in the file, after the WatchNotifyTimeout tests.
22 #pragma GCC diagnostic ignored "-Wpragmas"
23 #pragma GCC diagnostic push
24 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
// Fixture for the TEST_P cases in this file, derived from RadosTestParamPP.
// The members visible here hold state shared between a test body and the
// watch callback (WatchNotifyTestCtx2, declared a friend below).
// NOTE(review): this listing elides lines; other members the tests use
// (e.g. notify_bl, notify_err, notify_ioctx, notify_sleep) are not visible here.
26 class LibRadosWatchNotifyPP
: public RadosTestParamPP
// Cookies the callback has been invoked with; tests clear this set up front
// and assert on its size/contents after notifying.
30 std::set
<uint64_t> notify_cookies
;
31 rados_ioctx_t notify_io
;
// Name of the object the tests write, watch and notify; the callback acks
// against this name. Starts out null.
32 const char *notify_oid
= nullptr;
// Lets the callback context read and write this fixture's members.
35 friend class WatchNotifyTestCtx2
;
// WatchCtx2 implementation used by the watch2 / watch3 / aio_watch tests.
// It records what each callback receives on the owning LibRadosWatchNotifyPP
// fixture so the test body can assert on it afterwards.
// NOTE(review): several lines of this class are elided in this listing
// (braces, the constructor body, the declaration of `reply`, closing of the
// member functions), so only the visible statements are described.
40 class WatchNotifyTestCtx2
: public WatchCtx2
// Fixture that receives the recorded notification state.
42 LibRadosWatchNotifyPP
*notify
;
// Takes the fixture whose members the callbacks will update.
45 WatchNotifyTestCtx2(LibRadosWatchNotifyPP
*notify
)
// Notification callback: logs the cookie, notify id and notifier gid, stores
// the payload and the cookie on the fixture, then acknowledges the notify
// with a fixed 5-byte "reply" payload.
49 void handle_notify(uint64_t notify_id
, uint64_t cookie
, uint64_t notifier_gid
,
50 bufferlist
& bl
) override
{
51 std::cout
<< __func__
<< " cookie " << cookie
<< " notify_id " << notify_id
52 << " notifier_gid " << notifier_gid
<< std::endl
;
// Keep the received payload and remember which watch cookie was notified.
53 notify
->notify_bl
= bl
;
54 notify
->notify_cookies
.insert(cookie
);
// The tests later expect exactly these 5 bytes back in the notify reply.
56 reply
.append("reply", 5);
59 notify_ioctx
->notify_ack(notify
->notify_oid
, notify_id
, cookie
, reply
);
// Error callback: logs the cookie and error code, asserts that the cookie is
// greater than 1000, and stores the error code on the fixture.
62 void handle_error(uint64_t cookie
, int err
) override
{
63 std::cout
<< __func__
<< " cookie " << cookie
64 << " err " << err
<< std::endl
;
65 ceph_assert(cookie
> 1000);
66 notify
->notify_err
= err
;
// WatchCtx implementation used by the watch()/notify() tests in this file.
// The only visible statement in its notify() callback prints the function name.
// NOTE(review): lines are elided in this listing; any further work done by the
// callback is not visible here.
73 class WatchNotifyTestCtx
: public WatchCtx
76 void notify(uint8_t opcode
, uint64_t ver
, bufferlist
& bl
) override
78 std::cout
<< __func__
<< std::endl
;
// Watch/notify round trip through the WatchCtx-based API on object "foo":
// write a 0xcc-filled buffer, register a watch, confirm exactly one watcher is
// listed, issue ten notifies, then unwatch.
// NOTE(review): this listing elides lines (the declarations of sem, buf, bl1,
// bl2 and handle, and whatever follows `int r = ...`), so the checks applied to
// each notify result are not visible here.
83 TEST_P(LibRadosWatchNotifyPP
, WatchNotify
) {
// Initialise the semaphore `sem` with a count of zero.
84 ASSERT_EQ(0, sem_init(&sem
, 0, 0));
// Create the object to watch: sizeof(buf) bytes of 0xcc written at offset 0.
86 memset(buf
, 0xcc, sizeof(buf
));
88 bl1
.append(buf
, sizeof(buf
));
89 ASSERT_EQ(0, ioctx
.write("foo", bl1
, sizeof(buf
), 0));
91 WatchNotifyTestCtx ctx
;
92 ASSERT_EQ(0, ioctx
.watch("foo", 0, &handle
, &ctx
));
// Exactly one watcher (the one just registered) must be reported.
93 std::list
<obj_watch_t
> watches
;
94 ASSERT_EQ(0, ioctx
.list_watchers("foo", &watches
));
95 ASSERT_EQ(1u, watches
.size());
// Send ten notifies on the watched object.
97 for (unsigned i
=0; i
<10; ++i
) {
98 int r
= ioctx
.notify("foo", 0, bl2
);
// Taken only when the ALLOW_TIMEOUTS environment variable is unset; the
// guarded statements are elided in this listing.
102 if (!getenv("ALLOW_TIMEOUTS")) {
108 ioctx
.unwatch("foo", handle
);
// Same visible sequence as the TEST_P WatchNotify case, run on the
// RadosTestECPP-based fixture: write a 0xcc-filled buffer to "foo", register a
// watch, confirm exactly one watcher is listed, issue ten notifies, then unwatch.
// NOTE(review): this listing elides lines (the declarations of sem, buf, bl1,
// bl2 and handle, and whatever follows `int r = ...`), so the checks applied to
// each notify result are not visible here.
112 TEST_F(LibRadosWatchNotifyECPP
, WatchNotify
) {
// Initialise the semaphore `sem` with a count of zero.
113 ASSERT_EQ(0, sem_init(&sem
, 0, 0));
// Create the object to watch: sizeof(buf) bytes of 0xcc written at offset 0.
115 memset(buf
, 0xcc, sizeof(buf
));
117 bl1
.append(buf
, sizeof(buf
));
118 ASSERT_EQ(0, ioctx
.write("foo", bl1
, sizeof(buf
), 0));
120 WatchNotifyTestCtx ctx
;
121 ASSERT_EQ(0, ioctx
.watch("foo", 0, &handle
, &ctx
));
// Exactly one watcher (the one just registered) must be reported.
122 std::list
<obj_watch_t
> watches
;
123 ASSERT_EQ(0, ioctx
.list_watchers("foo", &watches
));
124 ASSERT_EQ(1u, watches
.size());
// Send ten notifies on the watched object.
126 for (unsigned i
=0; i
<10; ++i
) {
127 int r
= ioctx
.notify("foo", 0, bl2
);
// Taken only when the ALLOW_TIMEOUTS environment variable is unset; the
// guarded statements are elided in this listing.
131 if (!getenv("ALLOW_TIMEOUTS")) {
137 ioctx
.unwatch("foo", handle
);
// Sets a notify timeout of 1 on the ioctx, writes a 0xcc-filled buffer to
// "foo", registers a watch and then removes it, asserting each call returns 0.
// NOTE(review): this listing elides lines (declarations of sem, buf, bl1 and
// handle, and possibly more), so any notify issued between watch and unwatch
// is not visible here.
143 TEST_P(LibRadosWatchNotifyPP
, WatchNotifyTimeout
) {
144 ASSERT_EQ(0, sem_init(&sem
, 0, 0));
// presumably seconds — TODO confirm against the librados API documentation
145 ioctx
.set_notify_timeout(1);
147 WatchNotifyTestCtx ctx
;
// Create the object to watch: sizeof(buf) bytes of 0xcc written at offset 0.
150 memset(buf
, 0xcc, sizeof(buf
));
152 bl1
.append(buf
, sizeof(buf
));
153 ASSERT_EQ(0, ioctx
.write("foo", bl1
, sizeof(buf
), 0));
155 ASSERT_EQ(0, ioctx
.watch("foo", 0, &handle
, &ctx
));
// Unlike the WatchNotify cases, the unwatch result is asserted here.
157 ASSERT_EQ(0, ioctx
.unwatch("foo", handle
));
// Same visible sequence as the TEST_P WatchNotifyTimeout case, run on the
// RadosTestECPP-based fixture: set a notify timeout of 1, write a 0xcc-filled
// buffer to "foo", register a watch and remove it, asserting each call returns 0.
// NOTE(review): this listing elides lines (declarations of sem, buf, bl1 and
// handle, and possibly more), so any notify issued between watch and unwatch
// is not visible here.
160 TEST_F(LibRadosWatchNotifyECPP
, WatchNotifyTimeout
) {
161 ASSERT_EQ(0, sem_init(&sem
, 0, 0));
// presumably seconds — TODO confirm against the librados API documentation
162 ioctx
.set_notify_timeout(1);
164 WatchNotifyTestCtx ctx
;
// Create the object to watch: sizeof(buf) bytes of 0xcc written at offset 0.
167 memset(buf
, 0xcc, sizeof(buf
));
169 bl1
.append(buf
, sizeof(buf
));
170 ASSERT_EQ(0, ioctx
.write("foo", bl1
, sizeof(buf
), 0));
172 ASSERT_EQ(0, ioctx
.watch("foo", 0, &handle
, &ctx
));
// Unlike the WatchNotify cases, the unwatch result is asserted here.
174 ASSERT_EQ(0, ioctx
.unwatch("foo", handle
));
// Restore the diagnostic state saved by the earlier "diagnostic push" (ending
// the -Wdeprecated-declarations suppression) and turn -Wpragmas back into a
// warning for the rest of the file.
177 #pragma GCC diagnostic pop
178 #pragma GCC diagnostic warning "-Wpragmas"
// watch2/notify2 round trip: register a WatchNotifyTestCtx2 watch on
// notify_oid, send one notify, and verify that the callback ran once for this
// watch handle and that its 5-byte "reply" ack came back with nothing missed.
// NOTE(review): this listing elides lines (e.g. the assignment of notify_oid
// and the declarations of buf, bl1 and handle).
180 TEST_P(LibRadosWatchNotifyPP
, WatchNotify2
) {
// Point the callback at this test's ioctx and start with no recorded cookies.
182 notify_ioctx
= &ioctx
;
183 notify_cookies
.clear();
// Create the object to watch: sizeof(buf) bytes of 0xcc written at offset 0.
185 memset(buf
, 0xcc, sizeof(buf
));
187 bl1
.append(buf
, sizeof(buf
));
188 ASSERT_EQ(0, ioctx
.write(notify_oid
, bl1
, sizeof(buf
), 0));
190 WatchNotifyTestCtx2
ctx(this);
191 ASSERT_EQ(0, ioctx
.watch2(notify_oid
, &handle
, &ctx
));
// The test requires a positive watch_check() result both here and after the notify.
192 ASSERT_GT(ioctx
.watch_check(handle
), 0);
193 std::list
<obj_watch_t
> watches
;
194 ASSERT_EQ(0, ioctx
.list_watchers(notify_oid
, &watches
));
195 ASSERT_EQ(watches
.size(), 1u);
196 bufferlist bl2
, bl_reply
;
// Timeout argument is 300000 — presumably milliseconds, since the
// WatchNotify2Timeout case annotates 1000 as "1s"; confirm.
197 ASSERT_EQ(0, ioctx
.notify2(notify_oid
, bl2
, 300000, &bl_reply
));
// The reply buffer is decoded as a map of replies followed by a set of
// missed entries.
198 auto p
= bl_reply
.cbegin();
199 std::map
<std::pair
<uint64_t,uint64_t>,bufferlist
> reply_map
;
200 std::set
<std::pair
<uint64_t,uint64_t> > missed_map
;
201 decode(reply_map
, p
);
202 decode(missed_map
, p
);
// The callback must have recorded exactly one cookie, equal to our watch handle.
203 ASSERT_EQ(1u, notify_cookies
.size());
204 ASSERT_EQ(1u, notify_cookies
.count(handle
));
// One reply whose payload is the 5 bytes "reply" appended by handle_notify().
205 ASSERT_EQ(1u, reply_map
.size());
206 ASSERT_EQ(5u, reply_map
.begin()->second
.length());
207 ASSERT_EQ(0, strncmp("reply", reply_map
.begin()->second
.c_str(), 5));
208 ASSERT_EQ(0u, missed_map
.size());
209 ASSERT_GT(ioctx
.watch_check(handle
), 0);
210 ioctx
.unwatch2(handle
);
// Same checks as WatchNotify2, but the watch is registered with aio_watch and
// removed with aio_unwatch, each waited on through an AioCompletion.
// NOTE(review): this listing elides lines (e.g. the assignment of notify_oid
// and the declarations of buf, bl1 and handle). No release of `comp` is
// visible here; confirm the elided lines release both completions.
213 TEST_P(LibRadosWatchNotifyPP
, AioWatchNotify2
) {
// Point the callback at this test's ioctx and start with no recorded cookies.
215 notify_ioctx
= &ioctx
;
216 notify_cookies
.clear();
// Create the object to watch: sizeof(buf) bytes of 0xcc written at offset 0.
218 memset(buf
, 0xcc, sizeof(buf
));
220 bl1
.append(buf
, sizeof(buf
));
221 ASSERT_EQ(0, ioctx
.write(notify_oid
, bl1
, sizeof(buf
), 0));
224 WatchNotifyTestCtx2
ctx(this);
// Register the watch asynchronously, then wait for completion and require
// a zero return value before continuing.
225 librados::AioCompletion
*comp
= cluster
.aio_create_completion();
226 ASSERT_EQ(0, ioctx
.aio_watch(notify_oid
, comp
, &handle
, &ctx
));
227 ASSERT_EQ(0, comp
->wait_for_complete());
228 ASSERT_EQ(0, comp
->get_return_value());
231 ASSERT_GT(ioctx
.watch_check(handle
), 0);
232 std::list
<obj_watch_t
> watches
;
233 ASSERT_EQ(0, ioctx
.list_watchers(notify_oid
, &watches
));
234 ASSERT_EQ(watches
.size(), 1u);
235 bufferlist bl2
, bl_reply
;
// Timeout argument is 300000 — presumably milliseconds, since the
// WatchNotify2Timeout case annotates 1000 as "1s"; confirm.
236 ASSERT_EQ(0, ioctx
.notify2(notify_oid
, bl2
, 300000, &bl_reply
));
// The reply buffer is decoded as a map of replies followed by a set of
// missed entries.
237 auto p
= bl_reply
.cbegin();
238 std::map
<std::pair
<uint64_t,uint64_t>,bufferlist
> reply_map
;
239 std::set
<std::pair
<uint64_t,uint64_t> > missed_map
;
240 decode(reply_map
, p
);
241 decode(missed_map
, p
);
// The callback must have recorded exactly one cookie, equal to our watch handle.
242 ASSERT_EQ(1u, notify_cookies
.size());
243 ASSERT_EQ(1u, notify_cookies
.count(handle
));
// One reply whose payload is the 5 bytes "reply" appended by handle_notify().
244 ASSERT_EQ(1u, reply_map
.size());
245 ASSERT_EQ(5u, reply_map
.begin()->second
.length());
246 ASSERT_EQ(0, strncmp("reply", reply_map
.begin()->second
.c_str(), 5));
247 ASSERT_EQ(0u, missed_map
.size());
248 ASSERT_GT(ioctx
.watch_check(handle
), 0);
// Remove the watch asynchronously using a fresh completion and wait for it.
250 comp
= cluster
.aio_create_completion();
251 ioctx
.aio_unwatch(handle
, comp
);
252 ASSERT_EQ(0, comp
->wait_for_complete());
// Registers a watch with watch2, sends the notify asynchronously with
// aio_notify, and checks the result through decode_notify_response(): one ack
// carrying the 5-byte "reply" payload and no timeouts.
// NOTE(review): this listing elides lines (e.g. the assignment of notify_oid
// and the declarations of buf, bl1 and handle). No release of `comp` is
// visible here; confirm the elided lines release it.
257 TEST_P(LibRadosWatchNotifyPP
, AioNotify
) {
// Point the callback at this test's ioctx and start with no recorded cookies.
259 notify_ioctx
= &ioctx
;
260 notify_cookies
.clear();
// Create the object to watch: sizeof(buf) bytes of 0xcc written at offset 0.
262 memset(buf
, 0xcc, sizeof(buf
));
264 bl1
.append(buf
, sizeof(buf
));
265 ASSERT_EQ(0, ioctx
.write(notify_oid
, bl1
, sizeof(buf
), 0));
267 WatchNotifyTestCtx2
ctx(this);
268 ASSERT_EQ(0, ioctx
.watch2(notify_oid
, &handle
, &ctx
));
269 ASSERT_GT(ioctx
.watch_check(handle
), 0);
270 std::list
<obj_watch_t
> watches
;
271 ASSERT_EQ(0, ioctx
.list_watchers(notify_oid
, &watches
));
272 ASSERT_EQ(watches
.size(), 1u);
273 bufferlist bl2
, bl_reply
;
// Send the notify asynchronously, then wait for completion and require a
// zero return value. Timeout argument is 300000 — presumably milliseconds; confirm.
274 librados::AioCompletion
*comp
= cluster
.aio_create_completion();
275 ASSERT_EQ(0, ioctx
.aio_notify(notify_oid
, comp
, bl2
, 300000, &bl_reply
));
276 ASSERT_EQ(0, comp
->wait_for_complete());
277 ASSERT_EQ(0, comp
->get_return_value());
// Split the reply buffer into acks and timeouts via the ioctx helper rather
// than decoding it by hand.
279 std::vector
<librados::notify_ack_t
> acks
;
280 std::vector
<librados::notify_timeout_t
> timeouts
;
281 ioctx
.decode_notify_response(bl_reply
, &acks
, &timeouts
);
// The callback must have recorded exactly one cookie, equal to our watch handle.
282 ASSERT_EQ(1u, notify_cookies
.size());
283 ASSERT_EQ(1u, notify_cookies
.count(handle
));
// One ack whose payload is the 5 bytes "reply" appended by handle_notify().
284 ASSERT_EQ(1u, acks
.size());
285 ASSERT_EQ(5u, acks
[0].payload_bl
.length());
286 ASSERT_EQ(0, strncmp("reply", acks
[0].payload_bl
.c_str(), acks
[0].payload_bl
.length()));
287 ASSERT_EQ(0u, timeouts
.size());
288 ASSERT_GT(ioctx
.watch_check(handle
), 0);
289 ioctx
.unwatch2(handle
);
290 cluster
.watch_flush();
// Expects notify2 to fail with -ETIMEDOUT when given a 1000 ("1s") timeout
// while notify_sleep is set to 3, then checks the watch is still valid,
// removes it, and flushes watch callbacks via aio_watch_flush.
// NOTE(review): the code that reads notify_sleep is not visible in this
// listing; presumably the callback delays its ack by that many seconds —
// confirm. Other elided lines include the assignment of notify_oid, the
// declarations of buf, bl1 and handle, and the tail of the notify2 call.
294 TEST_P(LibRadosWatchNotifyPP
, WatchNotify2Timeout
) {
296 notify_ioctx
= &ioctx
;
297 notify_sleep
= 3; // 3s
298 notify_cookies
.clear();
// Create the object to watch: sizeof(buf) bytes of 0xcc written at offset 0.
300 memset(buf
, 0xcc, sizeof(buf
));
302 bl1
.append(buf
, sizeof(buf
));
303 ASSERT_EQ(0, ioctx
.write(notify_oid
, bl1
, sizeof(buf
), 0));
305 WatchNotifyTestCtx2
ctx(this);
306 ASSERT_EQ(0, ioctx
.watch2(notify_oid
, &handle
, &ctx
));
307 ASSERT_GT(ioctx
.watch_check(handle
), 0);
308 std::list
<obj_watch_t
> watches
;
309 ASSERT_EQ(0, ioctx
.list_watchers(notify_oid
, &watches
));
310 ASSERT_EQ(watches
.size(), 1u);
// No callback may have fired before the notify is sent.
311 ASSERT_EQ(0u, notify_cookies
.size());
312 bufferlist bl2
, bl_reply
;
313 std::cout
<< " trying..." << std::endl
;
// The notify itself must report a timeout.
314 ASSERT_EQ(-ETIMEDOUT
, ioctx
.notify2(notify_oid
, bl2
, 1000 /* 1s */,
316 std::cout
<< " timed out" << std::endl
;
// The watch must still check out as valid after the timed-out notify.
317 ASSERT_GT(ioctx
.watch_check(handle
), 0);
318 ioctx
.unwatch2(handle
);
// Flush asynchronously and require the flush to complete with a zero result.
320 std::cout
<< " flushing" << std::endl
;
321 librados::AioCompletion
*comp
= cluster
.aio_create_completion();
322 cluster
.aio_watch_flush(comp
);
323 ASSERT_EQ(0, comp
->wait_for_complete());
324 ASSERT_EQ(0, comp
->get_return_value());
325 std::cout
<< " flushed" << std::endl
;
// watch3 variant of the WatchNotify2 round trip: registers the watch with an
// explicit timeout of 12, verifies every listed watcher reports that same
// timeout_seconds, then performs the same notify2/reply checks and unwatches.
// NOTE(review): this listing elides lines (e.g. the assignment of notify_oid,
// the declarations of buf, bl1 and handle, and the closing of the loop).
329 TEST_P(LibRadosWatchNotifyPP
, WatchNotify3
) {
// Point the callback at this test's ioctx and start with no recorded cookies.
331 notify_ioctx
= &ioctx
;
332 notify_cookies
.clear();
333 uint32_t timeout
= 12; // configured timeout
// Create the object to watch: sizeof(buf) bytes of 0xcc written at offset 0.
335 memset(buf
, 0xcc, sizeof(buf
));
337 bl1
.append(buf
, sizeof(buf
));
338 ASSERT_EQ(0, ioctx
.write(notify_oid
, bl1
, sizeof(buf
), 0));
340 WatchNotifyTestCtx2
ctx(this);
341 ASSERT_EQ(0, ioctx
.watch3(notify_oid
, &handle
, &ctx
, timeout
));
342 ASSERT_GT(ioctx
.watch_check(handle
), 0);
343 std::list
<obj_watch_t
> watches
;
344 ASSERT_EQ(0, ioctx
.list_watchers(notify_oid
, &watches
));
345 ASSERT_EQ(watches
.size(), 1u);
// The timeout passed to watch3 must be what list_watchers reports back.
346 std::cout
<< "List watches" << std::endl
;
347 for (std::list
<obj_watch_t
>::iterator it
= watches
.begin();
348 it
!= watches
.end(); ++it
) {
349 ASSERT_EQ(it
->timeout_seconds
, timeout
);
351 bufferlist bl2
, bl_reply
;
352 std::cout
<< "notify2" << std::endl
;
// Timeout argument is 300000 — presumably milliseconds, since the
// WatchNotify2Timeout case annotates 1000 as "1s"; confirm.
353 ASSERT_EQ(0, ioctx
.notify2(notify_oid
, bl2
, 300000, &bl_reply
));
354 std::cout
<< "notify2 done" << std::endl
;
// The reply buffer is decoded as a map of replies followed by a set of
// missed entries.
355 auto p
= bl_reply
.cbegin();
356 std::map
<std::pair
<uint64_t,uint64_t>,bufferlist
> reply_map
;
357 std::set
<std::pair
<uint64_t,uint64_t> > missed_map
;
358 decode(reply_map
, p
);
359 decode(missed_map
, p
);
// The callback must have recorded exactly one cookie, equal to our watch handle.
360 ASSERT_EQ(1u, notify_cookies
.size());
361 ASSERT_EQ(1u, notify_cookies
.count(handle
));
// One reply whose payload is the 5 bytes "reply" appended by handle_notify().
362 ASSERT_EQ(1u, reply_map
.size());
363 ASSERT_EQ(5u, reply_map
.begin()->second
.length());
364 ASSERT_EQ(0, strncmp("reply", reply_map
.begin()->second
.c_str(), 5));
365 ASSERT_EQ(0u, missed_map
.size());
366 std::cout
<< "watch_check" << std::endl
;
367 ASSERT_GT(ioctx
.watch_check(handle
), 0);
368 std::cout
<< "unwatch2" << std::endl
;
369 ioctx
.unwatch2(handle
);
371 std::cout
<< " flushing" << std::endl
;
372 cluster
.watch_flush();
373 std::cout
<< "done" << std::endl
;
// Instantiates every TEST_P case of LibRadosWatchNotifyPP once per parameter
// value: the empty string and "cache".
// NOTE(review): how the fixture interprets the parameter is not visible in
// this file excerpt — see RadosTestParamPP.
377 INSTANTIATE_TEST_SUITE_P(LibRadosWatchNotifyPPTests
, LibRadosWatchNotifyPP
,
378 ::testing::Values("", "cache"));