1 #include "include/rados/librados.h"
2 #include "include/rados/librados.hpp"
3 #include "include/rados/rados_types.h"
4 #include "test/librados/test.h"
5 #include "test/librados/TestCase.h"
10 #include "gtest/gtest.h"
11 #include "include/encoding.h"
// Bring the librados namespace into scope and alias the RadosTestEC /
// RadosTestECPP fixtures under the names the TEST_F cases below use.
15 using namespace librados
;
17 typedef RadosTestEC LibRadosWatchNotifyEC
;
18 typedef RadosTestECPP LibRadosWatchNotifyECPP
;
// Callback handed to rados_watch() by the C-API WatchNotify tests below.
// The visible body only logs the function's own name to stdout; opcode,
// ver and arg are not used in the lines shown here.
// NOTE(review): this listing omits source lines, so any further work the
// callback does is not visible -- confirm against the full file.
25 static void watch_notify_test_cb(uint8_t opcode
, uint64_t ver
, void *arg
)
27 std::cout
<< __func__
<< std::endl
;
// WatchCtx implementation passed to ioctx.watch() by the C++ WatchNotify
// and WatchNotifyTimeout tests below.
31 class WatchNotifyTestCtx
: public WatchCtx
// notify(): the visible body logs the method name to stdout.
// NOTE(review): lines are missing from this listing; any additional
// behaviour of the override is not shown here.
34 void notify(uint8_t opcode
, uint64_t ver
, bufferlist
& bl
) override
36 std::cout
<< __func__
<< std::endl
;
// Test fixture (derived from RadosTest) for the C-API watch2/notify2 tests.
// It carries the state that the static callbacks below update through
// their `arg` pointer.
// NOTE(review): notify_bl, notify_err and notify_sleep are used elsewhere in
// this file but their declarations are not visible in this listing.
41 class LibRadosWatchNotify
: public RadosTest
// Cookies recorded by watch_notify2_test_cb for each notify it receives.
46 std::set
<uint64_t> notify_cookies
;
// io context and object name the notify callback uses when acking.
47 rados_ioctx_t notify_io
;
48 const char *notify_oid
= nullptr;
// Notify callback registered with the watch2/watch3/aio_watch calls.
// (Some parameters of this declaration are not shown in this listing.)
51 static void watch_notify2_test_cb(void *arg
,
54 uint64_t notifier_gid
,
// Error callback registered alongside the notify callback.
57 static void watch_notify2_test_errcb(void *arg
, uint64_t cookie
, int err
);
// Notify callback for the C-API fixture.
// Visible behaviour: logs notifier_gid, notify_id and cookie; asserts the
// notifier gid is positive; treats `arg` as the LibRadosWatchNotify fixture;
// records the cookie; replaces notify_bl with the notify payload; and acks
// the notify via rados_notify_ack().
// NOTE(review): the parameter list and the tail of the ack call are
// truncated in this listing.
61 void LibRadosWatchNotify::watch_notify2_test_cb(void *arg
,
64 uint64_t notifier_gid
,
68 std::cout
<< __func__
<< " from " << notifier_gid
<< " notify_id " << notify_id
69 << " cookie " << cookie
<< std::endl
;
70 assert(notifier_gid
> 0);
// `arg` is the `this` pointer the tests pass when registering the watch.
71 auto thiz
= reinterpret_cast<LibRadosWatchNotify
*>(arg
);
// Remember which watch handle (cookie) was notified so tests can check it.
73 thiz
->notify_cookies
.insert(cookie
);
// Keep only the most recent payload.
74 thiz
->notify_bl
.clear();
75 thiz
->notify_bl
.append((char*)data
, data_len
);
// Acknowledge on the io context / object the test stored in the fixture.
78 rados_notify_ack(thiz
->notify_io
, thiz
->notify_oid
, notify_id
, cookie
,
// Watch error callback for the C-API fixture.
// Visible behaviour: logs cookie and err, asserts cookie > 1000, and stores
// err in the fixture's notify_err, which the tests poll and compare against
// -ENOTCONN.
82 void LibRadosWatchNotify::watch_notify2_test_errcb(void *arg
,
86 std::cout
<< __func__
<< " cookie " << cookie
<< " err " << err
<< std::endl
;
// NOTE(review): the reason for the 1000 threshold is not shown in this file.
87 assert(cookie
> 1000);
// `arg` is the fixture pointer supplied at watch registration.
88 auto thiz
= reinterpret_cast<LibRadosWatchNotify
*>(arg
);
90 thiz
->notify_err
= err
;
// Forward declaration so the fixture below can befriend the watch context.
93 class WatchNotifyTestCtx2
;
// Parameterised test fixture (derived from RadosTestParamPP) for the C++ API
// watch2/notify2 tests. WatchNotifyTestCtx2 is a friend and updates the
// fixture's state from its callbacks.
// NOTE(review): notify_bl and notify_err are used by WatchNotifyTestCtx2 but
// their declarations are not visible in this listing.
94 class LibRadosWatchNotifyPP
: public RadosTestParamPP
// Cookies recorded by WatchNotifyTestCtx2::handle_notify.
98 std::set
<uint64_t> notify_cookies
;
99 rados_ioctx_t notify_io
;
// Object name used when acking a notify.
100 const char *notify_oid
= nullptr;
103 friend class WatchNotifyTestCtx2
;
// WatchCtx2 implementation used by the C++ watch2/watch3/aio_watch tests.
// It holds a pointer to the owning LibRadosWatchNotifyPP fixture and writes
// the results of each callback into it.
108 class WatchNotifyTestCtx2
: public WatchCtx2
// Fixture that receives the callback results.
110 LibRadosWatchNotifyPP
*notify
;
113 WatchNotifyTestCtx2(LibRadosWatchNotifyPP
*notify
)
// handle_notify(): logs cookie, notify_id and notifier_gid; stores the
// payload and the cookie in the fixture; then acks with the 5-byte payload
// "reply" through notify_ioctx.
// NOTE(review): the declarations of `reply` and `notify_ioctx` are not
// visible in this listing.
117 void handle_notify(uint64_t notify_id
, uint64_t cookie
, uint64_t notifier_gid
,
118 bufferlist
& bl
) override
{
119 std::cout
<< __func__
<< " cookie " << cookie
<< " notify_id " << notify_id
120 << " notifier_gid " << notifier_gid
<< std::endl
;
121 notify
->notify_bl
= bl
;
122 notify
->notify_cookies
.insert(cookie
);
// The tests assert on this exact reply payload ("reply", length 5).
124 reply
.append("reply", 5);
127 notify_ioctx
->notify_ack(notify
->notify_oid
, notify_id
, cookie
, reply
);
// handle_error(): logs cookie and err, asserts cookie > 1000, and records
// err in the fixture's notify_err.
130 void handle_error(uint64_t cookie
, int err
) override
{
131 std::cout
<< __func__
<< " cookie " << cookie
132 << " err " << err
<< std::endl
;
133 assert(cookie
> 1000);
134 notify
->notify_err
= err
;
140 #pragma GCC diagnostic ignored "-Wpragmas"
141 #pragma GCC diagnostic push
142 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
// C API, original watch/notify calls: write object "foo", watch it with
// watch_notify_test_cb, send a notify, unwatch, then attempt to watch the
// object "dne".
// NOTE(review): the expected result of the "dne" watch and any use of the
// semaphore after sem_open() are on lines missing from this listing.
144 TEST_F(LibRadosWatchNotify
, WatchNotify
) {
// Open (creating if necessary) the named semaphore used by this test.
145 ASSERT_NE(SEM_FAILED
, (sem
= sem_open("/test_watch_notify_sem", O_CREAT
, 0644, 0)));
// Fill the payload with a recognisable byte pattern and create the object.
147 memset(buf
, 0xcc, sizeof(buf
));
148 ASSERT_EQ(0, rados_write(ioctx
, "foo", buf
, sizeof(buf
), 0));
151 rados_watch(ioctx
, "foo", 0, &handle
, watch_notify_test_cb
, NULL
));
152 ASSERT_EQ(0, rados_notify(ioctx
, "foo", 0, NULL
, 0));
155 rados_unwatch(ioctx
, "foo", handle
);
// Watch attempt on a different object name, "dne".
159 rados_watch(ioctx
, "dne", 0, &handle
, watch_notify_test_cb
, NULL
));
// C++ API, original watch/notify calls: write object "foo", watch it with a
// WatchNotifyTestCtx, check that exactly one watcher is listed, send a
// notify, then unwatch.
164 TEST_P(LibRadosWatchNotifyPP
, WatchNotify
) {
165 ASSERT_NE(SEM_FAILED
, (sem
= sem_open("/test_watch_notify_sem", O_CREAT
, 0644, 0)));
// Build the object payload from a 0xcc-filled buffer.
167 memset(buf
, 0xcc, sizeof(buf
));
169 bl1
.append(buf
, sizeof(buf
));
170 ASSERT_EQ(0, ioctx
.write("foo", bl1
, sizeof(buf
), 0));
172 WatchNotifyTestCtx ctx
;
173 ASSERT_EQ(0, ioctx
.watch("foo", 0, &handle
, &ctx
));
// The watch registered above should be the only watcher on the object.
174 std::list
<obj_watch_t
> watches
;
175 ASSERT_EQ(0, ioctx
.list_watchers("foo", &watches
));
176 ASSERT_EQ(watches
.size(), 1u);
178 ASSERT_EQ(0, ioctx
.notify("foo", 0, bl2
));
181 ioctx
.unwatch("foo", handle
);
// Same visible sequence as the LibRadosWatchNotify WatchNotify test (write
// "foo", watch, notify, unwatch), run on the LibRadosWatchNotifyEC fixture.
185 TEST_F(LibRadosWatchNotifyEC
, WatchNotify
) {
186 ASSERT_NE(SEM_FAILED
, (sem
= sem_open("/test_watch_notify_sem", O_CREAT
, 0644, 0)));
188 memset(buf
, 0xcc, sizeof(buf
));
189 ASSERT_EQ(0, rados_write(ioctx
, "foo", buf
, sizeof(buf
), 0));
192 rados_watch(ioctx
, "foo", 0, &handle
, watch_notify_test_cb
, NULL
));
193 ASSERT_EQ(0, rados_notify(ioctx
, "foo", 0, NULL
, 0));
196 rados_unwatch(ioctx
, "foo", handle
);
// Same visible sequence as the LibRadosWatchNotifyPP WatchNotify test
// (write "foo", watch, expect one listed watcher, notify, unwatch), run on
// the LibRadosWatchNotifyECPP fixture.
200 TEST_F(LibRadosWatchNotifyECPP
, WatchNotify
) {
201 ASSERT_NE(SEM_FAILED
, (sem
= sem_open("/test_watch_notify_sem", O_CREAT
, 0644, 0)));
203 memset(buf
, 0xcc, sizeof(buf
));
205 bl1
.append(buf
, sizeof(buf
));
206 ASSERT_EQ(0, ioctx
.write("foo", bl1
, sizeof(buf
), 0));
208 WatchNotifyTestCtx ctx
;
209 ASSERT_EQ(0, ioctx
.watch("foo", 0, &handle
, &ctx
));
210 std::list
<obj_watch_t
> watches
;
211 ASSERT_EQ(0, ioctx
.list_watchers("foo", &watches
));
212 ASSERT_EQ(watches
.size(), 1u);
214 ASSERT_EQ(0, ioctx
.notify("foo", 0, bl2
));
217 ioctx
.unwatch("foo", handle
);
// C++ API: set the io context's notify timeout to 1, write "foo", register
// a watch and remove it again, asserting that both calls return 0.
// NOTE(review): no notify call is visible in this listing for this test.
223 TEST_P(LibRadosWatchNotifyPP
, WatchNotifyTimeout
) {
224 ASSERT_NE(SEM_FAILED
, (sem
= sem_open("/test_watch_notify_sem", O_CREAT
, 0644, 0)));
225 ioctx
.set_notify_timeout(1);
227 WatchNotifyTestCtx ctx
;
230 memset(buf
, 0xcc, sizeof(buf
));
232 bl1
.append(buf
, sizeof(buf
));
233 ASSERT_EQ(0, ioctx
.write("foo", bl1
, sizeof(buf
), 0));
235 ASSERT_EQ(0, ioctx
.watch("foo", 0, &handle
, &ctx
));
237 ASSERT_EQ(0, ioctx
.unwatch("foo", handle
));
// Same visible sequence as the LibRadosWatchNotifyPP WatchNotifyTimeout
// test (notify timeout 1, write "foo", watch, unwatch), run on the
// LibRadosWatchNotifyECPP fixture.
240 TEST_F(LibRadosWatchNotifyECPP
, WatchNotifyTimeout
) {
241 ASSERT_NE(SEM_FAILED
, (sem
= sem_open("/test_watch_notify_sem", O_CREAT
, 0644, 0)));
242 ioctx
.set_notify_timeout(1);
244 WatchNotifyTestCtx ctx
;
247 memset(buf
, 0xcc, sizeof(buf
));
249 bl1
.append(buf
, sizeof(buf
));
250 ASSERT_EQ(0, ioctx
.write("foo", bl1
, sizeof(buf
), 0));
252 ASSERT_EQ(0, ioctx
.watch("foo", 0, &handle
, &ctx
));
254 ASSERT_EQ(0, ioctx
.unwatch("foo", handle
));
257 #pragma GCC diagnostic pop
258 #pragma GCC diagnostic warning "-Wpragmas"
// C API: watch an object with rados_watch2(), delete the object, and expect
// the error callback to report -ENOTCONN before the `left` counter runs out.
// NOTE(review): the initial value of `left` and the body of the wait loop
// are on lines missing from this listing.
263 TEST_F(LibRadosWatchNotify
, Watch2Delete
) {
268 memset(buf
, 0xcc, sizeof(buf
));
269 ASSERT_EQ(0, rados_write(ioctx
, notify_oid
, buf
, sizeof(buf
), 0));
// `this` is passed as the callback argument so the callbacks can update
// the fixture.
272 rados_watch2(ioctx
, notify_oid
, &handle
,
273 watch_notify2_test_cb
,
274 watch_notify2_test_errcb
, this));
// Removing the watched object is what should trigger the error callback.
275 ASSERT_EQ(0, rados_remove(ioctx
, notify_oid
));
277 std::cout
<< "waiting up to " << left
<< " for disconnect notification ..."
// Poll until the error callback has stored a non-zero notify_err.
279 while (notify_err
== 0 && --left
) {
// left > 0 means the loop ended because an error arrived, not by
// exhausting the counter.
282 ASSERT_TRUE(left
> 0);
283 ASSERT_EQ(-ENOTCONN
, notify_err
);
284 ASSERT_EQ(-ENOTCONN
, rados_watch_check(ioctx
, handle
));
285 rados_unwatch2(ioctx
, handle
);
// C API, asynchronous variant of Watch2Delete: register the watch with
// rados_aio_watch(), delete the object, expect -ENOTCONN from the error
// callback and from rados_watch_check(), then expect the asynchronous
// unwatch to complete with -ENOENT.
288 TEST_F(LibRadosWatchNotify
, AioWatchDelete
) {
293 memset(buf
, 0xcc, sizeof(buf
));
294 ASSERT_EQ(0, rados_write(ioctx
, notify_oid
, buf
, sizeof(buf
), 0));
297 rados_completion_t comp
;
299 ASSERT_EQ(0, rados_aio_create_completion(NULL
, NULL
, NULL
, &comp
));
300 rados_aio_watch(ioctx
, notify_oid
, comp
, &handle
,
301 watch_notify2_test_cb
, watch_notify2_test_errcb
, this);
// Wait for the watch registration to finish and check it succeeded.
302 ASSERT_EQ(0, rados_aio_wait_for_complete(comp
));
303 ASSERT_EQ(0, rados_aio_get_return_value(comp
));
304 rados_aio_release(comp
);
305 ASSERT_EQ(0, rados_remove(ioctx
, notify_oid
));
307 std::cout
<< "waiting up to " << left
<< " for disconnect notification ..."
// Poll until the error callback has stored a non-zero notify_err.
309 while (notify_err
== 0 && --left
) {
312 ASSERT_TRUE(left
> 0);
313 ASSERT_EQ(-ENOTCONN
, notify_err
);
314 ASSERT_EQ(-ENOTCONN
, rados_watch_check(ioctx
, handle
));
// A fresh completion is needed because the first one was released above.
315 ASSERT_EQ(0, rados_aio_create_completion(NULL
, NULL
, NULL
, &comp
));
316 rados_aio_unwatch(ioctx
, handle
, comp
);
317 ASSERT_EQ(0, rados_aio_wait_for_complete(comp
));
// The object was removed, so the unwatch is expected to report -ENOENT.
318 ASSERT_EQ(-ENOENT
, rados_aio_get_return_value(comp
));
319 rados_aio_release(comp
);
// C API: watch with rados_watch2(), send a notify with rados_notify2(), and
// decode the reply buffer into a map of replies and a set of missed
// watchers. Expects one "reply" answer from this watch and nothing missed,
// then checks that notifying a non-existent object returns -ENOENT with the
// output buffer pointer and length zeroed.
// NOTE(review): some arguments of the rados_notify2() calls are on lines
// missing from this listing.
324 TEST_F(LibRadosWatchNotify
, WatchNotify2
) {
327 notify_cookies
.clear();
329 memset(buf
, 0xcc, sizeof(buf
));
330 ASSERT_EQ(0, rados_write(ioctx
, notify_oid
, buf
, sizeof(buf
), 0));
333 rados_watch2(ioctx
, notify_oid
, &handle
,
334 watch_notify2_test_cb
,
335 watch_notify2_test_errcb
, this));
// A positive return from rados_watch_check() is what this test treats as a
// healthy watch.
336 ASSERT_GT(rados_watch_check(ioctx
, handle
), 0);
338 size_t reply_buf_len
;
339 ASSERT_EQ(0, rados_notify2(ioctx
, notify_oid
,
341 &reply_buf
, &reply_buf_len
));
// Wrap the raw reply in a bufferlist so it can be decoded.
343 reply
.append(reply_buf
, reply_buf_len
);
344 std::map
<std::pair
<uint64_t,uint64_t>, bufferlist
> reply_map
;
345 std::set
<std::pair
<uint64_t,uint64_t> > missed_map
;
346 bufferlist::iterator reply_p
= reply
.begin();
// The reply encodes the reply map first, then the missed set.
347 ::decode(reply_map
, reply_p
);
348 ::decode(missed_map
, reply_p
);
349 ASSERT_EQ(1u, reply_map
.size());
350 ASSERT_EQ(0u, missed_map
.size());
// The callback must have run exactly once, for this watch handle.
351 ASSERT_EQ(1u, notify_cookies
.size());
352 ASSERT_EQ(1u, notify_cookies
.count(handle
));
353 ASSERT_EQ(5u, reply_map
.begin()->second
.length());
354 ASSERT_EQ(0, strncmp("reply", reply_map
.begin()->second
.c_str(), 5));
355 ASSERT_GT(rados_watch_check(ioctx
, handle
), 0);
356 rados_buffer_free(reply_buf
);
358 // try it on a non-existent object ... our buffer pointers
359 // should get zeroed.
360 ASSERT_EQ(-ENOENT
, rados_notify2(ioctx
, "doesnotexist",
362 &reply_buf
, &reply_buf_len
));
363 ASSERT_EQ((char*)0, reply_buf
);
364 ASSERT_EQ(0u, reply_buf_len
);
366 rados_unwatch2(ioctx
, handle
);
// C API: same checks as WatchNotify2, but the watch is registered with
// rados_aio_watch() and removed with rados_aio_unwatch(), each waited on
// through a completion that is expected to return 0.
369 TEST_F(LibRadosWatchNotify
, AioWatchNotify2
) {
372 notify_cookies
.clear();
374 memset(buf
, 0xcc, sizeof(buf
));
375 ASSERT_EQ(0, rados_write(ioctx
, notify_oid
, buf
, sizeof(buf
), 0));
377 rados_completion_t comp
;
379 ASSERT_EQ(0, rados_aio_create_completion(NULL
, NULL
, NULL
, &comp
));
380 rados_aio_watch(ioctx
, notify_oid
, comp
, &handle
,
381 watch_notify2_test_cb
, watch_notify2_test_errcb
, this);
// Wait for the asynchronous watch registration and check it succeeded.
382 ASSERT_EQ(0, rados_aio_wait_for_complete(comp
));
383 ASSERT_EQ(0, rados_aio_get_return_value(comp
));
384 rados_aio_release(comp
);
386 ASSERT_GT(rados_watch_check(ioctx
, handle
), 0);
388 size_t reply_buf_len
;
389 ASSERT_EQ(0, rados_notify2(ioctx
, notify_oid
,
391 &reply_buf
, &reply_buf_len
));
393 reply
.append(reply_buf
, reply_buf_len
);
394 std::map
<std::pair
<uint64_t,uint64_t>, bufferlist
> reply_map
;
395 std::set
<std::pair
<uint64_t,uint64_t> > missed_map
;
396 bufferlist::iterator reply_p
= reply
.begin();
// Decode the reply map first, then the missed set.
397 ::decode(reply_map
, reply_p
);
398 ::decode(missed_map
, reply_p
);
399 ASSERT_EQ(1u, reply_map
.size());
400 ASSERT_EQ(0u, missed_map
.size());
401 ASSERT_EQ(1u, notify_cookies
.size());
402 ASSERT_EQ(1u, notify_cookies
.count(handle
));
403 ASSERT_EQ(5u, reply_map
.begin()->second
.length());
404 ASSERT_EQ(0, strncmp("reply", reply_map
.begin()->second
.c_str(), 5));
405 ASSERT_GT(rados_watch_check(ioctx
, handle
), 0);
406 rados_buffer_free(reply_buf
);
408 // try it on a non-existent object ... our buffer pointers
409 // should get zeroed.
410 ASSERT_EQ(-ENOENT
, rados_notify2(ioctx
, "doesnotexist",
412 &reply_buf
, &reply_buf_len
));
413 ASSERT_EQ((char*)0, reply_buf
);
414 ASSERT_EQ(0u, reply_buf_len
);
// Asynchronous unwatch; the object still exists, so 0 is expected.
416 ASSERT_EQ(0, rados_aio_create_completion(NULL
, NULL
, NULL
, &comp
));
417 rados_aio_unwatch(ioctx
, handle
, comp
);
418 ASSERT_EQ(0, rados_aio_wait_for_complete(comp
));
419 ASSERT_EQ(0, rados_aio_get_return_value(comp
));
420 rados_aio_release(comp
);
// C API: watch with rados_watch2(), then send the notify asynchronously with
// rados_aio_notify() and check the decoded reply exactly as WatchNotify2
// does. Also checks that an asynchronous notify on a non-existent object
// completes with -ENOENT and zeroes the output buffer pointer and length.
423 TEST_F(LibRadosWatchNotify
, AioNotify
) {
426 notify_cookies
.clear();
428 memset(buf
, 0xcc, sizeof(buf
));
429 ASSERT_EQ(0, rados_write(ioctx
, notify_oid
, buf
, sizeof(buf
), 0));
432 rados_watch2(ioctx
, notify_oid
, &handle
,
433 watch_notify2_test_cb
,
434 watch_notify2_test_errcb
, this));
435 ASSERT_GT(rados_watch_check(ioctx
, handle
), 0);
437 size_t reply_buf_len
;
438 rados_completion_t comp
;
439 ASSERT_EQ(0, rados_aio_create_completion(NULL
, NULL
, NULL
, &comp
));
// NOTE(review): this call names the object with the literal "foo" while the
// write and watch above use notify_oid; the assignment of notify_oid is not
// visible in this listing -- confirm both refer to the same object.
440 ASSERT_EQ(0, rados_aio_notify(ioctx
, "foo", comp
, "notify", 6, 300000,
441 &reply_buf
, &reply_buf_len
));
// The reply buffer is only read after the completion has finished.
442 ASSERT_EQ(0, rados_aio_wait_for_complete(comp
));
443 ASSERT_EQ(0, rados_aio_get_return_value(comp
));
444 rados_aio_release(comp
);
447 reply
.append(reply_buf
, reply_buf_len
);
448 std::map
<std::pair
<uint64_t,uint64_t>, bufferlist
> reply_map
;
449 std::set
<std::pair
<uint64_t,uint64_t> > missed_map
;
450 bufferlist::iterator reply_p
= reply
.begin();
451 ::decode(reply_map
, reply_p
);
452 ::decode(missed_map
, reply_p
);
453 ASSERT_EQ(1u, reply_map
.size());
454 ASSERT_EQ(0u, missed_map
.size());
455 ASSERT_EQ(1u, notify_cookies
.size());
456 ASSERT_EQ(1u, notify_cookies
.count(handle
));
457 ASSERT_EQ(5u, reply_map
.begin()->second
.length());
458 ASSERT_EQ(0, strncmp("reply", reply_map
.begin()->second
.c_str(), 5));
459 ASSERT_GT(rados_watch_check(ioctx
, handle
), 0);
460 rados_buffer_free(reply_buf
);
462 // try it on a non-existent object ... our buffer pointers
463 // should get zeroed.
464 ASSERT_EQ(0, rados_aio_create_completion(NULL
, NULL
, NULL
, &comp
));
// Submitting the notify succeeds; the -ENOENT shows up as the completion's
// return value.
465 ASSERT_EQ(0, rados_aio_notify(ioctx
, "doesnotexist", comp
, "notify", 6,
466 300000, &reply_buf
, &reply_buf_len
));
467 ASSERT_EQ(0, rados_aio_wait_for_complete(comp
));
468 ASSERT_EQ(-ENOENT
, rados_aio_get_return_value(comp
));
469 rados_aio_release(comp
);
470 ASSERT_EQ((char*)0, reply_buf
);
471 ASSERT_EQ(0u, reply_buf_len
);
473 rados_unwatch2(ioctx
, handle
);
// C++ API: watch with ioctx.watch2() and a WatchNotifyTestCtx2, verify one
// watcher is listed, send notify2(), and decode the reply. Expects one
// 5-byte "reply" answer, nothing missed, and the callback recorded for this
// handle.
476 TEST_P(LibRadosWatchNotifyPP
, WatchNotify2
) {
// WatchNotifyTestCtx2::handle_notify acks through notify_ioctx.
478 notify_ioctx
= &ioctx
;
479 notify_cookies
.clear();
481 memset(buf
, 0xcc, sizeof(buf
));
483 bl1
.append(buf
, sizeof(buf
));
484 ASSERT_EQ(0, ioctx
.write(notify_oid
, bl1
, sizeof(buf
), 0));
486 WatchNotifyTestCtx2
ctx(this);
487 ASSERT_EQ(0, ioctx
.watch2(notify_oid
, &handle
, &ctx
));
488 ASSERT_GT(ioctx
.watch_check(handle
), 0);
489 std::list
<obj_watch_t
> watches
;
490 ASSERT_EQ(0, ioctx
.list_watchers(notify_oid
, &watches
));
491 ASSERT_EQ(watches
.size(), 1u);
492 bufferlist bl2
, bl_reply
;
493 ASSERT_EQ(0, ioctx
.notify2(notify_oid
, bl2
, 300000, &bl_reply
));
494 bufferlist::iterator p
= bl_reply
.begin();
495 std::map
<std::pair
<uint64_t,uint64_t>,bufferlist
> reply_map
;
496 std::set
<std::pair
<uint64_t,uint64_t> > missed_map
;
// Decode the reply map first, then the missed set.
497 ::decode(reply_map
, p
);
498 ::decode(missed_map
, p
);
499 ASSERT_EQ(1u, notify_cookies
.size());
500 ASSERT_EQ(1u, notify_cookies
.count(handle
));
501 ASSERT_EQ(1u, reply_map
.size());
502 ASSERT_EQ(5u, reply_map
.begin()->second
.length());
503 ASSERT_EQ(0, strncmp("reply", reply_map
.begin()->second
.c_str(), 5));
504 ASSERT_EQ(0u, missed_map
.size());
505 ASSERT_GT(ioctx
.watch_check(handle
), 0);
506 ioctx
.unwatch2(handle
);
// C++ API: same checks as the PP WatchNotify2 test, but the watch is
// registered with ioctx.aio_watch() and removed with ioctx.aio_unwatch(),
// each through an AioCompletion.
// NOTE(review): no release of either completion is visible in this listing;
// confirm against the full file that both are released.
509 TEST_P(LibRadosWatchNotifyPP
, AioWatchNotify2
) {
511 notify_ioctx
= &ioctx
;
512 notify_cookies
.clear();
514 memset(buf
, 0xcc, sizeof(buf
));
516 bl1
.append(buf
, sizeof(buf
));
517 ASSERT_EQ(0, ioctx
.write(notify_oid
, bl1
, sizeof(buf
), 0));
520 WatchNotifyTestCtx2
ctx(this);
521 librados::AioCompletion
*comp
= cluster
.aio_create_completion();
522 ASSERT_EQ(0, ioctx
.aio_watch(notify_oid
, comp
, &handle
, &ctx
));
// Wait for the asynchronous watch registration and check it succeeded.
523 ASSERT_EQ(0, comp
->wait_for_complete());
524 ASSERT_EQ(0, comp
->get_return_value());
527 ASSERT_GT(ioctx
.watch_check(handle
), 0);
528 std::list
<obj_watch_t
> watches
;
529 ASSERT_EQ(0, ioctx
.list_watchers(notify_oid
, &watches
));
530 ASSERT_EQ(watches
.size(), 1u);
531 bufferlist bl2
, bl_reply
;
532 ASSERT_EQ(0, ioctx
.notify2(notify_oid
, bl2
, 300000, &bl_reply
));
533 bufferlist::iterator p
= bl_reply
.begin();
534 std::map
<std::pair
<uint64_t,uint64_t>,bufferlist
> reply_map
;
535 std::set
<std::pair
<uint64_t,uint64_t> > missed_map
;
536 ::decode(reply_map
, p
);
537 ::decode(missed_map
, p
);
538 ASSERT_EQ(1u, notify_cookies
.size());
539 ASSERT_EQ(1u, notify_cookies
.count(handle
));
540 ASSERT_EQ(1u, reply_map
.size());
541 ASSERT_EQ(5u, reply_map
.begin()->second
.length());
542 ASSERT_EQ(0, strncmp("reply", reply_map
.begin()->second
.c_str(), 5));
543 ASSERT_EQ(0u, missed_map
.size());
544 ASSERT_GT(ioctx
.watch_check(handle
), 0);
// Second completion, used for the asynchronous unwatch.
546 comp
= cluster
.aio_create_completion();
547 ioctx
.aio_unwatch(handle
, comp
);
548 ASSERT_EQ(0, comp
->wait_for_complete());
// C++ API: watch with ioctx.watch2(), then send the notify asynchronously
// with ioctx.aio_notify() and check the decoded reply exactly as the PP
// WatchNotify2 test does.
// NOTE(review): no release of the completion is visible in this listing.
553 TEST_P(LibRadosWatchNotifyPP
, AioNotify
) {
555 notify_ioctx
= &ioctx
;
556 notify_cookies
.clear();
558 memset(buf
, 0xcc, sizeof(buf
));
560 bl1
.append(buf
, sizeof(buf
));
561 ASSERT_EQ(0, ioctx
.write(notify_oid
, bl1
, sizeof(buf
), 0));
563 WatchNotifyTestCtx2
ctx(this);
564 ASSERT_EQ(0, ioctx
.watch2(notify_oid
, &handle
, &ctx
));
565 ASSERT_GT(ioctx
.watch_check(handle
), 0);
566 std::list
<obj_watch_t
> watches
;
567 ASSERT_EQ(0, ioctx
.list_watchers(notify_oid
, &watches
));
568 ASSERT_EQ(watches
.size(), 1u);
569 bufferlist bl2
, bl_reply
;
570 librados::AioCompletion
*comp
= cluster
.aio_create_completion();
571 ASSERT_EQ(0, ioctx
.aio_notify(notify_oid
, comp
, bl2
, 300000, &bl_reply
));
// bl_reply is only decoded after the completion has finished.
572 ASSERT_EQ(0, comp
->wait_for_complete());
573 ASSERT_EQ(0, comp
->get_return_value());
575 bufferlist::iterator p
= bl_reply
.begin();
576 std::map
<std::pair
<uint64_t,uint64_t>,bufferlist
> reply_map
;
577 std::set
<std::pair
<uint64_t,uint64_t> > missed_map
;
578 ::decode(reply_map
, p
);
579 ::decode(missed_map
, p
);
580 ASSERT_EQ(1u, notify_cookies
.size());
581 ASSERT_EQ(1u, notify_cookies
.count(handle
));
582 ASSERT_EQ(1u, reply_map
.size());
583 ASSERT_EQ(5u, reply_map
.begin()->second
.length());
584 ASSERT_EQ(0, strncmp("reply", reply_map
.begin()->second
.c_str(), 5));
585 ASSERT_EQ(0u, missed_map
.size());
586 ASSERT_GT(ioctx
.watch_check(handle
), 0);
587 ioctx
.unwatch2(handle
);
// C API: register two watches on the same object and send one notify.
// Expects two distinct handles, two replies, nothing missed, and the
// callback recorded once for each handle.
592 TEST_F(LibRadosWatchNotify
, WatchNotify2Multi
) {
595 notify_cookies
.clear();
597 memset(buf
, 0xcc, sizeof(buf
));
598 ASSERT_EQ(0, rados_write(ioctx
, notify_oid
, buf
, sizeof(buf
), 0));
599 uint64_t handle1
, handle2
;
601 rados_watch2(ioctx
, notify_oid
, &handle1
,
602 watch_notify2_test_cb
,
603 watch_notify2_test_errcb
, this));
605 rados_watch2(ioctx
, notify_oid
, &handle2
,
606 watch_notify2_test_cb
,
607 watch_notify2_test_errcb
, this));
608 ASSERT_GT(rados_watch_check(ioctx
, handle1
), 0);
609 ASSERT_GT(rados_watch_check(ioctx
, handle2
), 0);
// The two registrations must yield different handles.
610 ASSERT_NE(handle1
, handle2
);
612 size_t reply_buf_len
;
613 ASSERT_EQ(0, rados_notify2(ioctx
, notify_oid
,
615 &reply_buf
, &reply_buf_len
));
617 reply
.append(reply_buf
, reply_buf_len
);
618 std::map
<std::pair
<uint64_t,uint64_t>, bufferlist
> reply_map
;
619 std::set
<std::pair
<uint64_t,uint64_t> > missed_map
;
620 bufferlist::iterator reply_p
= reply
.begin();
621 ::decode(reply_map
, reply_p
);
622 ::decode(missed_map
, reply_p
);
// One reply per watch.
623 ASSERT_EQ(2u, reply_map
.size());
624 ASSERT_EQ(5u, reply_map
.begin()->second
.length());
625 ASSERT_EQ(0u, missed_map
.size());
626 ASSERT_EQ(2u, notify_cookies
.size());
627 ASSERT_EQ(1u, notify_cookies
.count(handle1
));
628 ASSERT_EQ(1u, notify_cookies
.count(handle2
));
629 ASSERT_EQ(0, strncmp("reply", reply_map
.begin()->second
.c_str(), 5));
630 ASSERT_GT(rados_watch_check(ioctx
, handle1
), 0);
631 ASSERT_GT(rados_watch_check(ioctx
, handle2
), 0);
632 rados_buffer_free(reply_buf
);
633 rados_unwatch2(ioctx
, handle1
);
634 rados_unwatch2(ioctx
, handle2
);
// C API: with notify_sleep set to 3, a notify given a shorter timeout is
// expected to return -ETIMEDOUT and to list this watcher as missed rather
// than replied. A second notify with a long timeout must then succeed.
// Finishes by unwatching and flushing watch callbacks through
// rados_aio_watch_flush().
// NOTE(review): the code that consumes notify_sleep is not visible in this
// listing.
639 TEST_F(LibRadosWatchNotify
, WatchNotify2Timeout
) {
642 notify_sleep
= 3; // 3s
643 notify_cookies
.clear();
645 memset(buf
, 0xcc, sizeof(buf
));
646 ASSERT_EQ(0, rados_write(ioctx
, notify_oid
, buf
, sizeof(buf
), 0));
649 rados_watch2(ioctx
, notify_oid
, &handle
,
650 watch_notify2_test_cb
,
651 watch_notify2_test_errcb
, this));
652 ASSERT_GT(rados_watch_check(ioctx
, handle
), 0);
654 size_t reply_buf_len
;
655 ASSERT_EQ(-ETIMEDOUT
, rados_notify2(ioctx
, notify_oid
,
656 "notify", 6, 1000, // 1s
657 &reply_buf
, &reply_buf_len
));
// The callback was still invoked once even though the notify timed out.
658 ASSERT_EQ(1u, notify_cookies
.size());
661 reply
.append(reply_buf
, reply_buf_len
);
662 std::map
<std::pair
<uint64_t,uint64_t>, bufferlist
> reply_map
;
663 std::set
<std::pair
<uint64_t,uint64_t> > missed_map
;
664 bufferlist::iterator reply_p
= reply
.begin();
665 ::decode(reply_map
, reply_p
);
666 ::decode(missed_map
, reply_p
);
// No reply arrived in time, so the watcher is reported as missed.
667 ASSERT_EQ(0u, reply_map
.size());
668 ASSERT_EQ(1u, missed_map
.size());
670 rados_buffer_free(reply_buf
);
672 // we should get the next notify, though!
674 notify_cookies
.clear();
675 ASSERT_EQ(0, rados_notify2(ioctx
, notify_oid
,
676 "notify", 6, 300000, // 300s
677 &reply_buf
, &reply_buf_len
));
678 ASSERT_EQ(1u, notify_cookies
.size());
679 ASSERT_GT(rados_watch_check(ioctx
, handle
), 0);
681 rados_unwatch2(ioctx
, handle
);
// Flush watch callbacks through a completion and check it succeeded.
683 rados_completion_t comp
;
684 ASSERT_EQ(0, rados_aio_create_completion(NULL
, NULL
, NULL
, &comp
));
685 rados_aio_watch_flush(cluster
, comp
);
686 ASSERT_EQ(0, rados_aio_wait_for_complete(comp
));
687 ASSERT_EQ(0, rados_aio_get_return_value(comp
));
688 rados_aio_release(comp
);
// Frees the buffer returned by the second notify.
689 rados_buffer_free(reply_buf
);
// C++ API: with notify_sleep set to 3, a notify2() given a shorter timeout
// is expected to return -ETIMEDOUT while the watch itself stays valid. Then
// unwatches and flushes watch callbacks through cluster.aio_watch_flush().
// NOTE(review): the tail of the notify2() call and any release of the
// completion are on lines missing from this listing.
693 TEST_P(LibRadosWatchNotifyPP
, WatchNotify2Timeout
) {
695 notify_ioctx
= &ioctx
;
696 notify_sleep
= 3; // 3s
697 notify_cookies
.clear();
699 memset(buf
, 0xcc, sizeof(buf
));
701 bl1
.append(buf
, sizeof(buf
));
702 ASSERT_EQ(0, ioctx
.write(notify_oid
, bl1
, sizeof(buf
), 0));
704 WatchNotifyTestCtx2
ctx(this);
705 ASSERT_EQ(0, ioctx
.watch2(notify_oid
, &handle
, &ctx
));
706 ASSERT_GT(ioctx
.watch_check(handle
), 0);
707 std::list
<obj_watch_t
> watches
;
708 ASSERT_EQ(0, ioctx
.list_watchers(notify_oid
, &watches
));
709 ASSERT_EQ(watches
.size(), 1u);
// No notify has been delivered yet.
710 ASSERT_EQ(0u, notify_cookies
.size());
711 bufferlist bl2
, bl_reply
;
712 std::cout
<< " trying..." << std::endl
;
713 ASSERT_EQ(-ETIMEDOUT
, ioctx
.notify2(notify_oid
, bl2
, 1000 /* 1s */,
715 std::cout
<< " timed out" << std::endl
;
// The timed-out notify must not have invalidated the watch.
716 ASSERT_GT(ioctx
.watch_check(handle
), 0);
717 ioctx
.unwatch2(handle
);
719 std::cout
<< " flushing" << std::endl
;
720 librados::AioCompletion
*comp
= cluster
.aio_create_completion();
721 cluster
.aio_watch_flush(comp
);
722 ASSERT_EQ(0, comp
->wait_for_complete());
723 ASSERT_EQ(0, comp
->get_return_value());
724 std::cout
<< " flushed" << std::endl
;
// C++ API: register a watch with ioctx.watch3() and an explicit timeout,
// verify that every listed watcher reports that timeout in timeout_seconds,
// then run the same notify2()/reply checks as the PP WatchNotify2 test.
728 TEST_P(LibRadosWatchNotifyPP
, WatchNotify3
) {
730 notify_ioctx
= &ioctx
;
731 notify_cookies
.clear();
732 uint32_t timeout
= 12; // configured timeout
734 memset(buf
, 0xcc, sizeof(buf
));
736 bl1
.append(buf
, sizeof(buf
));
737 ASSERT_EQ(0, ioctx
.write(notify_oid
, bl1
, sizeof(buf
), 0));
739 WatchNotifyTestCtx2
ctx(this);
740 ASSERT_EQ(0, ioctx
.watch3(notify_oid
, &handle
, &ctx
, timeout
));
741 ASSERT_GT(ioctx
.watch_check(handle
), 0);
742 std::list
<obj_watch_t
> watches
;
743 ASSERT_EQ(0, ioctx
.list_watchers(notify_oid
, &watches
));
744 ASSERT_EQ(watches
.size(), 1u);
745 std::cout
<< "List watches" << std::endl
;
// Every listed watcher must carry the timeout passed to watch3().
746 for (std::list
<obj_watch_t
>::iterator it
= watches
.begin();
747 it
!= watches
.end(); ++it
) {
748 ASSERT_EQ(it
->timeout_seconds
, timeout
);
750 bufferlist bl2
, bl_reply
;
751 ASSERT_EQ(0, ioctx
.notify2(notify_oid
, bl2
, 300000, &bl_reply
));
752 bufferlist::iterator p
= bl_reply
.begin();
753 std::map
<std::pair
<uint64_t,uint64_t>,bufferlist
> reply_map
;
754 std::set
<std::pair
<uint64_t,uint64_t> > missed_map
;
755 ::decode(reply_map
, p
);
756 ::decode(missed_map
, p
);
757 ASSERT_EQ(1u, notify_cookies
.size());
758 ASSERT_EQ(1u, notify_cookies
.count(handle
));
759 ASSERT_EQ(1u, reply_map
.size());
760 ASSERT_EQ(5u, reply_map
.begin()->second
.length());
761 ASSERT_EQ(0, strncmp("reply", reply_map
.begin()->second
.c_str(), 5));
762 ASSERT_EQ(0u, missed_map
.size());
763 ASSERT_GT(ioctx
.watch_check(handle
), 0);
764 ioctx
.unwatch2(handle
);
// C API: register a watch with rados_watch3() and a short timeout, set the
// objecter_inject_no_watch_ping option, and wait for the error callback to
// report -ENOTCONN. While the watch is disconnected a notify succeeds but
// yields no replies, no missed watchers and no callback. After unwatching
// and re-watching with rados_watch2(), a notify is delivered normally.
// NOTE(review): the initial declarations, the wait-loop body and parts of
// several calls are on lines missing from this listing.
767 TEST_F(LibRadosWatchNotify
, Watch3Timeout
) {
770 notify_cookies
.clear();
773 memset(buf
, 0xcc, sizeof(buf
));
774 ASSERT_EQ(0, rados_write(ioctx
, notify_oid
, buf
, sizeof(buf
), 0));
// Start time, used below to bound the age reported by rados_watch_check().
776 time_t start
= time(0);
777 const uint32_t timeout
= 4;
779 // make sure the watch times out before the messenger reconnects to the
780 // OSD: on reconnect it resends a watch request on behalf of the client,
781 // and the timeout timer on the OSD side is reset by that new request.
783 ASSERT_EQ(0, rados_conf_get(cluster
,
784 "ms_tcp_read_timeout",
785 conf
, sizeof(conf
)));
786 auto tcp_read_timeout
= std::stoll(conf
);
787 ASSERT_LT(timeout
, tcp_read_timeout
);
790 rados_watch3(ioctx
, notify_oid
, &handle
,
791 watch_notify2_test_cb
, watch_notify2_test_errcb
,
// The value from rados_watch_check() is compared against elapsed wall time
// scaled by 1000.
793 int age
= rados_watch_check(ioctx
, handle
);
794 time_t age_bound
= time(0) + 1 - start
;
795 ASSERT_LT(age
, age_bound
* 1000);
797 rados_conf_set(cluster
, "objecter_inject_no_watch_ping", "true");
798 // allow a long time here since an osd peering event will renew our
800 int left
= 16 * timeout
;
801 std::cout
<< "waiting up to " << left
<< " for osd to time us out ..."
// Poll until the error callback has stored a non-zero notify_err.
803 while (notify_err
== 0 && --left
) {
// Restore the option before asserting on the outcome.
807 rados_conf_set(cluster
, "objecter_inject_no_watch_ping", "false");
808 ASSERT_EQ(-ENOTCONN
, notify_err
);
809 ASSERT_EQ(-ENOTCONN
, rados_watch_check(ioctx
, handle
));
811 // a subsequent notify should not reach us
812 char *reply_buf
= nullptr;
813 size_t reply_buf_len
;
814 ASSERT_EQ(0, rados_notify2(ioctx
, notify_oid
,
816 &reply_buf
, &reply_buf_len
));
819 reply
.append(reply_buf
, reply_buf_len
);
820 std::map
<std::pair
<uint64_t,uint64_t>, bufferlist
> reply_map
;
821 std::set
<std::pair
<uint64_t,uint64_t> > missed_map
;
822 bufferlist::iterator reply_p
= reply
.begin();
823 ::decode(reply_map
, reply_p
);
824 ::decode(missed_map
, reply_p
);
// Nobody replied and nobody was missed: the timed-out watch is not counted
// as a watcher at all.
825 ASSERT_EQ(0u, reply_map
.size());
826 ASSERT_EQ(0u, missed_map
.size());
828 ASSERT_EQ(0u, notify_cookies
.size());
829 ASSERT_EQ(-ENOTCONN
, rados_watch_check(ioctx
, handle
));
830 rados_buffer_free(reply_buf
);
// Drop the disconnected watch and register a new one.
833 rados_unwatch2(ioctx
, handle
);
836 rados_watch2(ioctx
, notify_oid
, &handle
,
837 watch_notify2_test_cb
,
838 watch_notify2_test_errcb
, this));
839 ASSERT_GT(rados_watch_check(ioctx
, handle
), 0);
841 // and now a notify will work.
842 ASSERT_EQ(0, rados_notify2(ioctx
, notify_oid
,
844 &reply_buf
, &reply_buf_len
));
847 reply
.append(reply_buf
, reply_buf_len
);
848 std::map
<std::pair
<uint64_t,uint64_t>, bufferlist
> reply_map
;
849 std::set
<std::pair
<uint64_t,uint64_t> > missed_map
;
850 bufferlist::iterator reply_p
= reply
.begin();
851 ::decode(reply_map
, reply_p
);
852 ::decode(missed_map
, reply_p
);
853 ASSERT_EQ(1u, reply_map
.size());
854 ASSERT_EQ(0u, missed_map
.size());
855 ASSERT_EQ(1u, notify_cookies
.count(handle
));
856 ASSERT_EQ(5u, reply_map
.begin()->second
.length());
857 ASSERT_EQ(0, strncmp("reply", reply_map
.begin()->second
.c_str(), 5));
859 ASSERT_EQ(1u, notify_cookies
.size());
860 ASSERT_GT(rados_watch_check(ioctx
, handle
), 0);
862 rados_buffer_free(reply_buf
);
863 rados_unwatch2(ioctx
, handle
);
// C API: same scenario as AioWatchDelete, but the watch is registered with
// rados_aio_watch2(), which additionally takes a timeout. Deleting the
// watched object must produce -ENOTCONN, and the asynchronous unwatch is
// then expected to complete with -ENOENT.
866 TEST_F(LibRadosWatchNotify
, AioWatchDelete2
) {
871 uint32_t timeout
= 3;
872 memset(buf
, 0xcc, sizeof(buf
));
873 ASSERT_EQ(0, rados_write(ioctx
, notify_oid
, buf
, sizeof(buf
), 0));
876 rados_completion_t comp
;
878 ASSERT_EQ(0, rados_aio_create_completion(NULL
, NULL
, NULL
, &comp
));
879 rados_aio_watch2(ioctx
, notify_oid
, comp
, &handle
,
880 watch_notify2_test_cb
, watch_notify2_test_errcb
, timeout
, this);
// Wait for the asynchronous watch registration and check it succeeded.
881 ASSERT_EQ(0, rados_aio_wait_for_complete(comp
));
882 ASSERT_EQ(0, rados_aio_get_return_value(comp
));
883 rados_aio_release(comp
);
884 ASSERT_EQ(0, rados_remove(ioctx
, notify_oid
));
886 std::cout
<< "waiting up to " << left
<< " for disconnect notification ..."
// Poll until the error callback has stored a non-zero notify_err.
888 while (notify_err
== 0 && --left
) {
891 ASSERT_TRUE(left
> 0);
892 ASSERT_EQ(-ENOTCONN
, notify_err
);
893 ASSERT_EQ(-ENOTCONN
, rados_watch_check(ioctx
, handle
));
// A fresh completion is needed because the first one was released above.
894 ASSERT_EQ(0, rados_aio_create_completion(NULL
, NULL
, NULL
, &comp
));
895 rados_aio_unwatch(ioctx
, handle
, comp
);
896 ASSERT_EQ(0, rados_aio_wait_for_complete(comp
));
897 ASSERT_EQ(-ENOENT
, rados_aio_get_return_value(comp
));
898 rados_aio_release(comp
);
// Instantiate every TEST_P(LibRadosWatchNotifyPP, ...) case once per
// parameter value: "" and "cache".
// NOTE(review): how the fixture interprets these strings is defined in
// RadosTestParamPP, which is not part of this file.
902 INSTANTIATE_TEST_CASE_P(LibRadosWatchNotifyPPTests
, LibRadosWatchNotifyPP
,
903 ::testing::Values("", "cache"));