7 #include "gtest/gtest.h"
9 #include "include/encoding.h"
10 #include "include/rados/librados.hpp"
11 #include "include/rados/rados_types.h"
12 #include "test/librados/test_cxx.h"
13 #include "test/librados/testcase_cxx.h"
14 #include "crimson_utils.h"
16 using namespace librados
;
// Alias RadosTestECPP under the fixture name used by the TEST_F cases in this file.
// NOTE(review): RadosTestECPP is presumably the erasure-coded-pool fixture from
// testcase_cxx.h — confirm there.
18 typedef RadosTestECPP LibRadosWatchNotifyECPP
;
// Silence -Wdeprecated-declarations for the code that follows; the matching
// `diagnostic pop` appears after the WatchNotifyTimeout tests.
// -Wpragmas is ignored first, presumably so that a compiler which does not
// recognise the option does not warn about the pragma itself.
22 #pragma GCC diagnostic ignored "-Wpragmas"
23 #pragma GCC diagnostic push
24 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
// Parameterised test fixture for the watch/notify tests; derives from RadosTestParamPP
// and carries the state that the WatchCtx2 callbacks update.
// NOTE(review): this excerpt appears to be a lossy extraction (the leading numbers
// look like original line numbers and they skip). The braces, any access specifier,
// and the `notify_bl` / `notify_err` members that the callbacks write to are not
// declared in the visible lines.
26 class LibRadosWatchNotifyPP
: public RadosTestParamPP
// Cookies recorded by handle_notify (inserted) and handle_error (erased).
30 std::set
<uint64_t> notify_cookies
;
// C-API ioctx handle; no use of it is visible in this excerpt.
31 rados_ioctx_t notify_io
;
// Object name the tests watch and notify on; starts out null.
32 const char *notify_oid
= nullptr;
// The callback classes need access to the members above.
35 friend class WatchNotifyTestCtx2
;
36 friend class WatchNotifyTestCtx2TimeOut
;
// WatchCtx2 callback object: records every notification on the owning fixture,
// acknowledges it with a 5-byte "reply" payload, and tries to re-establish the
// watch when an error is reported.
// NOTE(review): this excerpt appears to be a lossy extraction; the braces, the
// constructor's initialiser/body, the declaration of `reply`, and the declaration
// of `notify_ioctx` are not in the visible lines.
41 class WatchNotifyTestCtx2
: public WatchCtx2
// Fixture whose notify_* members the handlers update.
43 LibRadosWatchNotifyPP
*notify
;
// Takes the fixture pointer (how it is stored is not visible here).
46 WatchNotifyTestCtx2(LibRadosWatchNotifyPP
*notify
)
// Notification callback.
//   notify_id    - id passed back in the ack
//   cookie       - watch cookie; recorded in notify->notify_cookies
//   notifier_gid - only logged
//   bl           - notification payload; copied to notify->notify_bl
50 void handle_notify(uint64_t notify_id
, uint64_t cookie
, uint64_t notifier_gid
,
51 bufferlist
& bl
) override
{
52 std::cout
<< __func__
<< " cookie " << cookie
<< " notify_id " << notify_id
53 << " notifier_gid " << notifier_gid
<< std::endl
;
54 notify
->notify_bl
= bl
;
55 notify
->notify_cookies
.insert(cookie
);
// Build the ack payload: the 5 bytes "reply" (the tests assert on exactly this).
57 reply
.append("reply", 5);
// Acknowledge through notify_ioctx, which the tests point at their ioctx.
60 notify_ioctx
->notify_ack(notify
->notify_oid
, notify_id
, cookie
, reply
);
// Error callback: drop the failed watch and register a new one on the same object.
//   cookie - cookie of the watch that failed
//   err    - error code reported for it
63 void handle_error(uint64_t cookie
, int err
) override
{
64 std::cout
<< __func__
<< " cookie " << cookie
65 << " err " << err
<< std::endl
;
// Sanity check on the cookie value; the reason for the 1000 threshold is not
// visible in this file.
66 ceph_assert(cookie
> 1000);
67 notify_ioctx
->unwatch2(cookie
);
68 notify
->notify_cookies
.erase(cookie
);
// Re-watch; the result code is kept in notify->notify_err.
// NOTE(review): watch2 is handed &cookie, i.e. the by-value parameter, so any
// new handle it writes is local to this call and is not recorded in
// notify_cookies by the visible code — confirm this is intended.
69 notify
->notify_err
= notify_ioctx
->watch2(notify
->notify_oid
, &cookie
, this);
// Log when the re-watch result is smaller than the original error code.
70 if (notify
->notify_err
< err
) {
71 std::cout
<< "reconnect notify_err " << notify
->notify_err
<< " err " << err
<< std::endl
;
// WatchCtx2 callback object used by the timeout-oriented tests. Its handle_notify
// matches the visible part of WatchNotifyTestCtx2::handle_notify, but handle_error
// only records the error instead of re-watching.
// NOTE(review): this excerpt appears to be a lossy extraction; the braces, the
// constructor's initialiser/body, the declaration of `reply`, and the declaration
// of `notify_ioctx` are not in the visible lines.
76 class WatchNotifyTestCtx2TimeOut
: public WatchCtx2
// Fixture whose notify_* members the handlers update.
78 LibRadosWatchNotifyPP
*notify
;
// Takes the fixture pointer (how it is stored is not visible here).
81 WatchNotifyTestCtx2TimeOut(LibRadosWatchNotifyPP
*notify
)
// Notification callback: log the ids, copy the payload and cookie onto the
// fixture, then ack with the 5-byte "reply" payload.
85 void handle_notify(uint64_t notify_id
, uint64_t cookie
, uint64_t notifier_gid
,
86 bufferlist
& bl
) override
{
87 std::cout
<< __func__
<< " cookie " << cookie
<< " notify_id " << notify_id
88 << " notifier_gid " << notifier_gid
<< std::endl
;
89 notify
->notify_bl
= bl
;
90 notify
->notify_cookies
.insert(cookie
);
92 reply
.append("reply", 5);
95 notify_ioctx
->notify_ack(notify
->notify_oid
, notify_id
, cookie
, reply
);
// Error callback: log, sanity-check the cookie, and store err on the fixture.
// Unlike WatchNotifyTestCtx2, no unwatch/re-watch is attempted in the visible code.
98 void handle_error(uint64_t cookie
, int err
) override
{
99 std::cout
<< __func__
<< " cookie " << cookie
100 << " err " << err
<< std::endl
;
101 ceph_assert(cookie
> 1000);
102 notify
->notify_err
= err
;
// Callback object for the older WatchCtx interface, used by the watch()/notify()
// tests in the deprecated-declarations section of this file.
// NOTE(review): this excerpt appears to be a lossy extraction; the braces and
// anything notify() does besides logging are not in the visible lines.
109 class WatchNotifyTestCtx
: public WatchCtx
// Notification callback; the visible body only prints the function name.
// opcode, ver and bl are not used by the visible code.
112 void notify(uint8_t opcode
, uint64_t ver
, bufferlist
& bl
) override
114 std::cout
<< __func__
<< std::endl
;
// Legacy watch()/notify() round trip on object "foo" using the parameterised fixture.
// NOTE(review): this excerpt appears to be a lossy extraction; the declarations of
// `sem`, `buf`, `bl1`, `bl2` and `handle`, the closing braces, and the body of the
// ALLOW_TIMEOUTS branch are not in the visible lines.
119 TEST_P(LibRadosWatchNotifyPP
, WatchNotify
) {
120 ASSERT_EQ(0, sem_init(&sem
, 0, 0));
// Create "foo" with a buffer filled with 0xcc bytes.
122 memset(buf
, 0xcc, sizeof(buf
));
124 bl1
.append(buf
, sizeof(buf
));
125 ASSERT_EQ(0, ioctx
.write("foo", bl1
, sizeof(buf
), 0));
// Register the watch and check that exactly one watcher is listed.
127 WatchNotifyTestCtx ctx
;
128 ASSERT_EQ(0, ioctx
.watch("foo", 0, &handle
, &ctx
));
129 std::list
<obj_watch_t
> watches
;
130 ASSERT_EQ(0, ioctx
.list_watchers("foo", &watches
));
131 ASSERT_EQ(1u, watches
.size());
// Send ten notifications.
133 for (unsigned i
=0; i
<10; ++i
) {
134 int r
= ioctx
.notify("foo", 0, bl2
);
// Branch taken only when the ALLOW_TIMEOUTS environment variable is unset;
// its body is not visible here (presumably it checks r — TODO confirm).
138 if (!getenv("ALLOW_TIMEOUTS")) {
// Drop the watch; the return value is not checked here.
144 ioctx
.unwatch("foo", handle
);
// Same legacy watch()/notify() round trip on "foo" as the TEST_P variant, run on
// the LibRadosWatchNotifyECPP fixture.
// NOTE(review): this excerpt appears to be a lossy extraction; the declarations of
// `sem`, `buf`, `bl1`, `bl2` and `handle`, the closing braces, and the body of the
// ALLOW_TIMEOUTS branch are not in the visible lines.
148 TEST_F(LibRadosWatchNotifyECPP
, WatchNotify
) {
150 ASSERT_EQ(0, sem_init(&sem
, 0, 0));
// Create "foo" with a buffer filled with 0xcc bytes.
152 memset(buf
, 0xcc, sizeof(buf
));
154 bl1
.append(buf
, sizeof(buf
));
155 ASSERT_EQ(0, ioctx
.write("foo", bl1
, sizeof(buf
), 0));
// Register the watch and check that exactly one watcher is listed.
157 WatchNotifyTestCtx ctx
;
158 ASSERT_EQ(0, ioctx
.watch("foo", 0, &handle
, &ctx
));
159 std::list
<obj_watch_t
> watches
;
160 ASSERT_EQ(0, ioctx
.list_watchers("foo", &watches
));
161 ASSERT_EQ(1u, watches
.size());
// Send ten notifications.
163 for (unsigned i
=0; i
<10; ++i
) {
164 int r
= ioctx
.notify("foo", 0, bl2
);
// Branch taken only when the ALLOW_TIMEOUTS environment variable is unset;
// its body is not visible here (presumably it checks r — TODO confirm).
168 if (!getenv("ALLOW_TIMEOUTS")) {
// Drop the watch; the return value is not checked here.
174 ioctx
.unwatch("foo", handle
);
// Legacy watch()/unwatch() on "foo" after setting the ioctx notify timeout to 1.
// No notify call is visible in this excerpt.
// NOTE(review): this excerpt appears to be a lossy extraction; the declarations of
// `sem`, `buf`, `bl1` and `handle` and the closing brace are not in the visible
// lines. The unit of set_notify_timeout's argument is not shown here (presumably
// seconds — confirm in librados.hpp).
180 TEST_P(LibRadosWatchNotifyPP
, WatchNotifyTimeout
) {
181 ASSERT_EQ(0, sem_init(&sem
, 0, 0));
182 ioctx
.set_notify_timeout(1);
184 WatchNotifyTestCtx ctx
;
// Create "foo" with a buffer filled with 0xcc bytes.
187 memset(buf
, 0xcc, sizeof(buf
));
189 bl1
.append(buf
, sizeof(buf
));
190 ASSERT_EQ(0, ioctx
.write("foo", bl1
, sizeof(buf
), 0));
// Watch, then unwatch; both must succeed.
192 ASSERT_EQ(0, ioctx
.watch("foo", 0, &handle
, &ctx
));
194 ASSERT_EQ(0, ioctx
.unwatch("foo", handle
));
// Same watch()/unwatch() sequence with a notify timeout of 1 as the TEST_P
// variant, run on the LibRadosWatchNotifyECPP fixture.
// NOTE(review): this excerpt appears to be a lossy extraction; the declarations of
// `sem`, `buf`, `bl1` and `handle` and the closing brace are not in the visible
// lines. The unit of set_notify_timeout's argument is not shown here (presumably
// seconds — confirm in librados.hpp).
197 TEST_F(LibRadosWatchNotifyECPP
, WatchNotifyTimeout
) {
199 ASSERT_EQ(0, sem_init(&sem
, 0, 0));
200 ioctx
.set_notify_timeout(1);
202 WatchNotifyTestCtx ctx
;
// Create "foo" with a buffer filled with 0xcc bytes.
205 memset(buf
, 0xcc, sizeof(buf
));
207 bl1
.append(buf
, sizeof(buf
));
208 ASSERT_EQ(0, ioctx
.write("foo", bl1
, sizeof(buf
), 0));
// Watch, then unwatch; both must succeed.
210 ASSERT_EQ(0, ioctx
.watch("foo", 0, &handle
, &ctx
));
212 ASSERT_EQ(0, ioctx
.unwatch("foo", handle
));
// End of the section compiled with -Wdeprecated-declarations suppressed: restore
// the diagnostic state saved by the earlier `diagnostic push`, then turn
// -Wpragmas back on as a warning.
215 #pragma GCC diagnostic pop
216 #pragma GCC diagnostic warning "-Wpragmas"
// watch2()/notify2() round trip: one watcher acks with "reply" and the test
// checks the decoded reply and missed sets.
// NOTE(review): this excerpt appears to be a lossy extraction; the assignment of
// `notify_oid`, the declarations of `buf`, `bl1` and `handle`, and the closing
// brace are not in the visible lines.
218 TEST_P(LibRadosWatchNotifyPP
, WatchNotify2
) {
// Point the callbacks' notify_ioctx at this test's ioctx and reset recorded cookies.
220 notify_ioctx
= &ioctx
;
221 notify_cookies
.clear();
// Create the object with a buffer filled with 0xcc bytes.
223 memset(buf
, 0xcc, sizeof(buf
));
225 bl1
.append(buf
, sizeof(buf
));
226 ASSERT_EQ(0, ioctx
.write(notify_oid
, bl1
, sizeof(buf
), 0));
// Register the watch and verify it is healthy and is the only watcher.
228 WatchNotifyTestCtx2
ctx(this);
229 ASSERT_EQ(0, ioctx
.watch2(notify_oid
, &handle
, &ctx
));
230 ASSERT_GT(ioctx
.watch_check(handle
), 0);
231 std::list
<obj_watch_t
> watches
;
232 ASSERT_EQ(0, ioctx
.list_watchers(notify_oid
, &watches
));
233 ASSERT_EQ(watches
.size(), 1u);
// Notify with an empty payload; 300000 is the timeout argument (presumably
// milliseconds, going by the `1000 /* 1s */` annotation in WatchNotify2Timeout).
234 bufferlist bl2
, bl_reply
;
235 ASSERT_EQ(0, ioctx
.notify2(notify_oid
, bl2
, 300000, &bl_reply
));
// bl_reply is decoded as a reply map followed by a missed set, in that order.
236 auto p
= bl_reply
.cbegin();
237 std::map
<std::pair
<uint64_t,uint64_t>,bufferlist
> reply_map
;
238 std::set
<std::pair
<uint64_t,uint64_t> > missed_map
;
239 decode(reply_map
, p
);
240 decode(missed_map
, p
);
// Exactly one callback ran, for our handle, and its ack is the 5-byte "reply".
241 ASSERT_EQ(1u, notify_cookies
.size());
242 ASSERT_EQ(1u, notify_cookies
.count(handle
));
243 ASSERT_EQ(1u, reply_map
.size());
244 ASSERT_EQ(5u, reply_map
.begin()->second
.length());
245 ASSERT_EQ(0, strncmp("reply", reply_map
.begin()->second
.c_str(), 5));
246 ASSERT_EQ(0u, missed_map
.size());
247 ASSERT_GT(ioctx
.watch_check(handle
), 0);
// Drop the watch; the return value is not checked here.
248 ioctx
.unwatch2(handle
);
// Same round trip as WatchNotify2, but the watch is registered with aio_watch()
// and removed with aio_unwatch(), each waited on through an AioCompletion.
// NOTE(review): this excerpt appears to be a lossy extraction; the assignment of
// `notify_oid`, the declarations of `buf`, `bl1` and `handle`, the closing brace,
// and any release of `comp` are not in the visible lines — confirm against the
// full file that both completions are released.
251 TEST_P(LibRadosWatchNotifyPP
, AioWatchNotify2
) {
// Point the callbacks' notify_ioctx at this test's ioctx and reset recorded cookies.
253 notify_ioctx
= &ioctx
;
254 notify_cookies
.clear();
// Create the object with a buffer filled with 0xcc bytes.
256 memset(buf
, 0xcc, sizeof(buf
));
258 bl1
.append(buf
, sizeof(buf
));
259 ASSERT_EQ(0, ioctx
.write(notify_oid
, bl1
, sizeof(buf
), 0));
// Asynchronous watch registration; wait for it and require success.
262 WatchNotifyTestCtx2
ctx(this);
263 librados::AioCompletion
*comp
= cluster
.aio_create_completion();
264 ASSERT_EQ(0, ioctx
.aio_watch(notify_oid
, comp
, &handle
, &ctx
));
265 ASSERT_EQ(0, comp
->wait_for_complete());
266 ASSERT_EQ(0, comp
->get_return_value());
// The watch must be healthy and be the only watcher.
269 ASSERT_GT(ioctx
.watch_check(handle
), 0);
270 std::list
<obj_watch_t
> watches
;
271 ASSERT_EQ(0, ioctx
.list_watchers(notify_oid
, &watches
));
272 ASSERT_EQ(watches
.size(), 1u);
// Synchronous notify with an empty payload and a timeout argument of 300000.
273 bufferlist bl2
, bl_reply
;
274 ASSERT_EQ(0, ioctx
.notify2(notify_oid
, bl2
, 300000, &bl_reply
));
// bl_reply is decoded as a reply map followed by a missed set, in that order.
275 auto p
= bl_reply
.cbegin();
276 std::map
<std::pair
<uint64_t,uint64_t>,bufferlist
> reply_map
;
277 std::set
<std::pair
<uint64_t,uint64_t> > missed_map
;
278 decode(reply_map
, p
);
279 decode(missed_map
, p
);
// Exactly one callback ran, for our handle, and its ack is the 5-byte "reply".
280 ASSERT_EQ(1u, notify_cookies
.size());
281 ASSERT_EQ(1u, notify_cookies
.count(handle
));
282 ASSERT_EQ(1u, reply_map
.size());
283 ASSERT_EQ(5u, reply_map
.begin()->second
.length());
284 ASSERT_EQ(0, strncmp("reply", reply_map
.begin()->second
.c_str(), 5));
285 ASSERT_EQ(0u, missed_map
.size());
286 ASSERT_GT(ioctx
.watch_check(handle
), 0);
// Asynchronous unwatch with a fresh completion; aio_unwatch's own return value
// is not checked in the visible code, only the wait.
288 comp
= cluster
.aio_create_completion();
289 ioctx
.aio_unwatch(handle
, comp
);
290 ASSERT_EQ(0, comp
->wait_for_complete());
// watch2() followed by an asynchronous notify (aio_notify); the response is
// unpacked with decode_notify_response() into acks and timeouts.
// NOTE(review): this excerpt appears to be a lossy extraction; the assignment of
// `notify_oid`, the declarations of `buf`, `bl1` and `handle`, the closing brace,
// and any release of `comp` are not in the visible lines — confirm against the
// full file.
295 TEST_P(LibRadosWatchNotifyPP
, AioNotify
) {
// Point the callbacks' notify_ioctx at this test's ioctx and reset recorded cookies.
297 notify_ioctx
= &ioctx
;
298 notify_cookies
.clear();
// Create the object with a buffer filled with 0xcc bytes.
300 memset(buf
, 0xcc, sizeof(buf
));
302 bl1
.append(buf
, sizeof(buf
));
303 ASSERT_EQ(0, ioctx
.write(notify_oid
, bl1
, sizeof(buf
), 0));
// Register the watch and verify it is healthy and is the only watcher.
305 WatchNotifyTestCtx2
ctx(this);
306 ASSERT_EQ(0, ioctx
.watch2(notify_oid
, &handle
, &ctx
));
307 ASSERT_GT(ioctx
.watch_check(handle
), 0);
308 std::list
<obj_watch_t
> watches
;
309 ASSERT_EQ(0, ioctx
.list_watchers(notify_oid
, &watches
));
310 ASSERT_EQ(watches
.size(), 1u);
// Asynchronous notify with an empty payload; wait for completion and require success.
311 bufferlist bl2
, bl_reply
;
312 librados::AioCompletion
*comp
= cluster
.aio_create_completion();
313 ASSERT_EQ(0, ioctx
.aio_notify(notify_oid
, comp
, bl2
, 300000, &bl_reply
));
314 ASSERT_EQ(0, comp
->wait_for_complete());
315 ASSERT_EQ(0, comp
->get_return_value());
// Unpack the response; decode_notify_response's return value is not checked here.
317 std::vector
<librados::notify_ack_t
> acks
;
318 std::vector
<librados::notify_timeout_t
> timeouts
;
319 ioctx
.decode_notify_response(bl_reply
, &acks
, &timeouts
);
// Exactly one callback ran, for our handle; one ack carrying the 5-byte "reply";
// no timeouts.
320 ASSERT_EQ(1u, notify_cookies
.size());
321 ASSERT_EQ(1u, notify_cookies
.count(handle
));
322 ASSERT_EQ(1u, acks
.size());
323 ASSERT_EQ(5u, acks
[0].payload_bl
.length());
324 ASSERT_EQ(0, strncmp("reply", acks
[0].payload_bl
.c_str(), acks
[0].payload_bl
.length()));
325 ASSERT_EQ(0u, timeouts
.size());
326 ASSERT_GT(ioctx
.watch_check(handle
), 0);
// Drop the watch (return value unchecked) and flush watch callbacks on the cluster handle.
327 ioctx
.unwatch2(handle
);
328 cluster
.watch_flush();
// Checks that notify2() reports -ETIMEDOUT when given a 1s timeout while
// notify_sleep is set to 3, then unwatches and flushes asynchronously.
// NOTE(review): this excerpt appears to be a lossy extraction; the assignment of
// `notify_oid`, the declarations of `buf`, `bl1`, `handle` and `notify_sleep`,
// the closing brace, the tail of the notify2 call, and any release of `comp` are
// not in the visible lines. Where notify_sleep is consumed is not visible either
// (presumably the notify handler sleeps before acking — TODO confirm).
332 TEST_P(LibRadosWatchNotifyPP
, WatchNotify2Timeout
) {
// Point the callbacks' notify_ioctx at this test's ioctx, set the delay, reset cookies.
334 notify_ioctx
= &ioctx
;
335 notify_sleep
= 3; // 3s
336 notify_cookies
.clear();
// Create the object with a buffer filled with 0xcc bytes.
338 memset(buf
, 0xcc, sizeof(buf
));
340 bl1
.append(buf
, sizeof(buf
));
341 ASSERT_EQ(0, ioctx
.write(notify_oid
, bl1
, sizeof(buf
), 0));
// Register the watch and verify it is healthy and is the only watcher.
343 WatchNotifyTestCtx2TimeOut
ctx(this);
344 ASSERT_EQ(0, ioctx
.watch2(notify_oid
, &handle
, &ctx
));
345 ASSERT_GT(ioctx
.watch_check(handle
), 0);
346 std::list
<obj_watch_t
> watches
;
347 ASSERT_EQ(0, ioctx
.list_watchers(notify_oid
, &watches
));
348 ASSERT_EQ(watches
.size(), 1u);
// No callback has run yet.
349 ASSERT_EQ(0u, notify_cookies
.size());
350 bufferlist bl2
, bl_reply
;
351 std::cout
<< " trying..." << std::endl
;
// The notify must time out. The remaining argument(s) and closing parentheses of
// this call are cut off in this excerpt.
352 ASSERT_EQ(-ETIMEDOUT
, ioctx
.notify2(notify_oid
, bl2
, 1000 /* 1s */,
354 std::cout
<< " timed out" << std::endl
;
355 ASSERT_GT(ioctx
.watch_check(handle
), 0);
356 ioctx
.unwatch2(handle
);
358 std::cout
<< " flushing" << std::endl
;
359 librados::AioCompletion
*comp
= cluster
.aio_create_completion();
360 cluster
.aio_watch_flush(comp
);
361 ASSERT_EQ(0, comp
->wait_for_complete());
362 ASSERT_EQ(0, comp
->get_return_value());
363 std::cout
<< " flushed" << std::endl
;
// watch3() round trip: registers a watch with an explicit timeout, checks that
// list_watchers reports that timeout, then runs the same notify2/decode checks
// as WatchNotify2.
// NOTE(review): this excerpt appears to be a lossy extraction; the assignment of
// `notify_oid`, the declarations of `buf`, `bl1` and `handle`, and the closing
// braces are not in the visible lines.
367 TEST_P(LibRadosWatchNotifyPP
, WatchNotify3
) {
// Point the callbacks' notify_ioctx at this test's ioctx and reset recorded cookies.
369 notify_ioctx
= &ioctx
;
370 notify_cookies
.clear();
371 uint32_t timeout
= 12; // configured timeout
// Create the object with a buffer filled with 0xcc bytes.
373 memset(buf
, 0xcc, sizeof(buf
));
375 bl1
.append(buf
, sizeof(buf
));
376 ASSERT_EQ(0, ioctx
.write(notify_oid
, bl1
, sizeof(buf
), 0));
// Register the watch with the explicit timeout and verify it is healthy.
378 WatchNotifyTestCtx2TimeOut
ctx(this);
379 ASSERT_EQ(0, ioctx
.watch3(notify_oid
, &handle
, &ctx
, timeout
));
380 ASSERT_GT(ioctx
.watch_check(handle
), 0);
381 std::list
<obj_watch_t
> watches
;
382 ASSERT_EQ(0, ioctx
.list_watchers(notify_oid
, &watches
));
383 ASSERT_EQ(watches
.size(), 1u);
384 std::cout
<< "List watches" << std::endl
;
// Every listed watcher must report the timeout passed to watch3.
385 for (std::list
<obj_watch_t
>::iterator it
= watches
.begin();
386 it
!= watches
.end(); ++it
) {
387 ASSERT_EQ(it
->timeout_seconds
, timeout
);
// Notify with an empty payload and a timeout argument of 300000.
389 bufferlist bl2
, bl_reply
;
390 std::cout
<< "notify2" << std::endl
;
391 ASSERT_EQ(0, ioctx
.notify2(notify_oid
, bl2
, 300000, &bl_reply
));
392 std::cout
<< "notify2 done" << std::endl
;
// bl_reply is decoded as a reply map followed by a missed set, in that order.
393 auto p
= bl_reply
.cbegin();
394 std::map
<std::pair
<uint64_t,uint64_t>,bufferlist
> reply_map
;
395 std::set
<std::pair
<uint64_t,uint64_t> > missed_map
;
396 decode(reply_map
, p
);
397 decode(missed_map
, p
);
// Exactly one callback ran, for our handle, and its ack is the 5-byte "reply".
398 ASSERT_EQ(1u, notify_cookies
.size());
399 ASSERT_EQ(1u, notify_cookies
.count(handle
));
400 ASSERT_EQ(1u, reply_map
.size());
401 ASSERT_EQ(5u, reply_map
.begin()->second
.length());
402 ASSERT_EQ(0, strncmp("reply", reply_map
.begin()->second
.c_str(), 5));
403 ASSERT_EQ(0u, missed_map
.size());
404 std::cout
<< "watch_check" << std::endl
;
405 ASSERT_GT(ioctx
.watch_check(handle
), 0);
// Drop the watch (return value unchecked) and flush watch callbacks.
406 std::cout
<< "unwatch2" << std::endl
;
407 ioctx
.unwatch2(handle
);
409 std::cout
<< " flushing" << std::endl
;
410 cluster
.watch_flush();
411 std::cout
<< "done" << std::endl
;
// Instantiate every TEST_P(LibRadosWatchNotifyPP, ...) case twice, once with the
// parameter "" and once with "cache".
// NOTE(review): how the fixture interprets the parameter is defined in
// RadosTestParamPP, outside this file ("cache" presumably selects a cache-tier
// setup — confirm there).
415 INSTANTIATE_TEST_SUITE_P(LibRadosWatchNotifyPPTests
, LibRadosWatchNotifyPP
,
416 ::testing::Values("", "cache"));