1 #include "include/rados/librados.h"
2 #include "include/rados/librados.hpp"
3 #include "include/rados/rados_types.h"
4 #include "test/librados/test.h"
5 #include "test/librados/TestCase.h"
10 #include "gtest/gtest.h"
11 #include "include/encoding.h"
15 using namespace librados
;
17 typedef RadosTestEC LibRadosWatchNotifyEC
;
18 typedef RadosTestECPP LibRadosWatchNotifyECPP
;
25 static void watch_notify_test_cb(uint8_t opcode
, uint64_t ver
, void *arg
)
27 std::cout
<< __func__
<< std::endl
;
31 class WatchNotifyTestCtx
: public WatchCtx
34 void notify(uint8_t opcode
, uint64_t ver
, bufferlist
& bl
) override
36 std::cout
<< __func__
<< std::endl
;
41 class LibRadosWatchNotify
: public RadosTest
46 std::set
<uint64_t> notify_cookies
;
47 rados_ioctx_t notify_io
;
48 const char *notify_oid
= nullptr;
51 static void watch_notify2_test_cb(void *arg
,
54 uint64_t notifier_gid
,
57 static void watch_notify2_test_errcb(void *arg
, uint64_t cookie
, int err
);
61 void LibRadosWatchNotify::watch_notify2_test_cb(void *arg
,
64 uint64_t notifier_gid
,
68 std::cout
<< __func__
<< " from " << notifier_gid
<< " notify_id " << notify_id
69 << " cookie " << cookie
<< std::endl
;
70 assert(notifier_gid
> 0);
71 auto thiz
= reinterpret_cast<LibRadosWatchNotify
*>(arg
);
73 thiz
->notify_cookies
.insert(cookie
);
74 thiz
->notify_bl
.clear();
75 thiz
->notify_bl
.append((char*)data
, data_len
);
78 rados_notify_ack(thiz
->notify_io
, thiz
->notify_oid
, notify_id
, cookie
,
82 void LibRadosWatchNotify::watch_notify2_test_errcb(void *arg
,
86 std::cout
<< __func__
<< " cookie " << cookie
<< " err " << err
<< std::endl
;
87 assert(cookie
> 1000);
88 auto thiz
= reinterpret_cast<LibRadosWatchNotify
*>(arg
);
90 thiz
->notify_err
= err
;
93 class WatchNotifyTestCtx2
;
94 class LibRadosWatchNotifyPP
: public RadosTestParamPP
98 std::set
<uint64_t> notify_cookies
;
99 rados_ioctx_t notify_io
;
100 const char *notify_oid
= nullptr;
103 friend class WatchNotifyTestCtx2
;
108 class WatchNotifyTestCtx2
: public WatchCtx2
110 LibRadosWatchNotifyPP
*notify
;
113 WatchNotifyTestCtx2(LibRadosWatchNotifyPP
*notify
)
117 void handle_notify(uint64_t notify_id
, uint64_t cookie
, uint64_t notifier_gid
,
118 bufferlist
& bl
) override
{
119 std::cout
<< __func__
<< " cookie " << cookie
<< " notify_id " << notify_id
120 << " notifier_gid " << notifier_gid
<< std::endl
;
121 notify
->notify_bl
= bl
;
122 notify
->notify_cookies
.insert(cookie
);
124 reply
.append("reply", 5);
127 notify_ioctx
->notify_ack(notify
->notify_oid
, notify_id
, cookie
, reply
);
130 void handle_error(uint64_t cookie
, int err
) override
{
131 std::cout
<< __func__
<< " cookie " << cookie
132 << " err " << err
<< std::endl
;
133 assert(cookie
> 1000);
134 notify
->notify_err
= err
;
140 #pragma GCC diagnostic ignored "-Wpragmas"
141 #pragma GCC diagnostic push
142 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
144 TEST_F(LibRadosWatchNotify
, WatchNotify
) {
145 ASSERT_NE(SEM_FAILED
, (sem
= sem_open("/test_watch_notify_sem", O_CREAT
, 0644, 0)));
147 memset(buf
, 0xcc, sizeof(buf
));
148 ASSERT_EQ(0, rados_write(ioctx
, "foo", buf
, sizeof(buf
), 0));
151 rados_watch(ioctx
, "foo", 0, &handle
, watch_notify_test_cb
, NULL
));
152 ASSERT_EQ(0, rados_notify(ioctx
, "foo", 0, NULL
, 0));
155 rados_unwatch(ioctx
, "foo", handle
);
159 rados_watch(ioctx
, "dne", 0, &handle
, watch_notify_test_cb
, NULL
));
164 TEST_P(LibRadosWatchNotifyPP
, WatchNotify
) {
165 ASSERT_NE(SEM_FAILED
, (sem
= sem_open("/test_watch_notify_sem", O_CREAT
, 0644, 0)));
167 memset(buf
, 0xcc, sizeof(buf
));
169 bl1
.append(buf
, sizeof(buf
));
170 ASSERT_EQ(0, ioctx
.write("foo", bl1
, sizeof(buf
), 0));
172 WatchNotifyTestCtx ctx
;
173 ASSERT_EQ(0, ioctx
.watch("foo", 0, &handle
, &ctx
));
174 std::list
<obj_watch_t
> watches
;
175 ASSERT_EQ(0, ioctx
.list_watchers("foo", &watches
));
176 ASSERT_EQ(watches
.size(), 1u);
178 ASSERT_EQ(0, ioctx
.notify("foo", 0, bl2
));
181 ioctx
.unwatch("foo", handle
);
185 TEST_F(LibRadosWatchNotifyEC
, WatchNotify
) {
186 ASSERT_NE(SEM_FAILED
, (sem
= sem_open("/test_watch_notify_sem", O_CREAT
, 0644, 0)));
188 memset(buf
, 0xcc, sizeof(buf
));
189 ASSERT_EQ(0, rados_write(ioctx
, "foo", buf
, sizeof(buf
), 0));
192 rados_watch(ioctx
, "foo", 0, &handle
, watch_notify_test_cb
, NULL
));
193 ASSERT_EQ(0, rados_notify(ioctx
, "foo", 0, NULL
, 0));
196 rados_unwatch(ioctx
, "foo", handle
);
200 TEST_F(LibRadosWatchNotifyECPP
, WatchNotify
) {
201 ASSERT_NE(SEM_FAILED
, (sem
= sem_open("/test_watch_notify_sem", O_CREAT
, 0644, 0)));
203 memset(buf
, 0xcc, sizeof(buf
));
205 bl1
.append(buf
, sizeof(buf
));
206 ASSERT_EQ(0, ioctx
.write("foo", bl1
, sizeof(buf
), 0));
208 WatchNotifyTestCtx ctx
;
209 ASSERT_EQ(0, ioctx
.watch("foo", 0, &handle
, &ctx
));
210 std::list
<obj_watch_t
> watches
;
211 ASSERT_EQ(0, ioctx
.list_watchers("foo", &watches
));
212 ASSERT_EQ(watches
.size(), 1u);
214 ASSERT_EQ(0, ioctx
.notify("foo", 0, bl2
));
217 ioctx
.unwatch("foo", handle
);
223 TEST_P(LibRadosWatchNotifyPP
, WatchNotifyTimeout
) {
224 ASSERT_NE(SEM_FAILED
, (sem
= sem_open("/test_watch_notify_sem", O_CREAT
, 0644, 0)));
225 ioctx
.set_notify_timeout(1);
227 WatchNotifyTestCtx ctx
;
230 memset(buf
, 0xcc, sizeof(buf
));
232 bl1
.append(buf
, sizeof(buf
));
233 ASSERT_EQ(0, ioctx
.write("foo", bl1
, sizeof(buf
), 0));
235 ASSERT_EQ(0, ioctx
.watch("foo", 0, &handle
, &ctx
));
237 ASSERT_EQ(0, ioctx
.unwatch("foo", handle
));
240 TEST_F(LibRadosWatchNotifyECPP
, WatchNotifyTimeout
) {
241 ASSERT_NE(SEM_FAILED
, (sem
= sem_open("/test_watch_notify_sem", O_CREAT
, 0644, 0)));
242 ioctx
.set_notify_timeout(1);
244 WatchNotifyTestCtx ctx
;
247 memset(buf
, 0xcc, sizeof(buf
));
249 bl1
.append(buf
, sizeof(buf
));
250 ASSERT_EQ(0, ioctx
.write("foo", bl1
, sizeof(buf
), 0));
252 ASSERT_EQ(0, ioctx
.watch("foo", 0, &handle
, &ctx
));
254 ASSERT_EQ(0, ioctx
.unwatch("foo", handle
));
257 #pragma GCC diagnostic pop
258 #pragma GCC diagnostic warning "-Wpragmas"
263 TEST_F(LibRadosWatchNotify
, Watch2Delete
) {
268 memset(buf
, 0xcc, sizeof(buf
));
269 ASSERT_EQ(0, rados_write(ioctx
, notify_oid
, buf
, sizeof(buf
), 0));
272 rados_watch2(ioctx
, notify_oid
, &handle
,
273 watch_notify2_test_cb
,
274 watch_notify2_test_errcb
, this));
275 ASSERT_EQ(0, rados_remove(ioctx
, notify_oid
));
277 std::cout
<< "waiting up to " << left
<< " for disconnect notification ..."
279 while (notify_err
== 0 && --left
) {
282 ASSERT_TRUE(left
> 0);
283 ASSERT_EQ(-ENOTCONN
, notify_err
);
284 ASSERT_EQ(-ENOTCONN
, rados_watch_check(ioctx
, handle
));
285 rados_unwatch2(ioctx
, handle
);
286 rados_watch_flush(cluster
);
289 TEST_F(LibRadosWatchNotify
, AioWatchDelete
) {
294 memset(buf
, 0xcc, sizeof(buf
));
295 ASSERT_EQ(0, rados_write(ioctx
, notify_oid
, buf
, sizeof(buf
), 0));
298 rados_completion_t comp
;
300 ASSERT_EQ(0, rados_aio_create_completion(NULL
, NULL
, NULL
, &comp
));
301 rados_aio_watch(ioctx
, notify_oid
, comp
, &handle
,
302 watch_notify2_test_cb
, watch_notify2_test_errcb
, this);
303 ASSERT_EQ(0, rados_aio_wait_for_complete(comp
));
304 ASSERT_EQ(0, rados_aio_get_return_value(comp
));
305 rados_aio_release(comp
);
306 ASSERT_EQ(0, rados_remove(ioctx
, notify_oid
));
308 std::cout
<< "waiting up to " << left
<< " for disconnect notification ..."
310 while (notify_err
== 0 && --left
) {
313 ASSERT_TRUE(left
> 0);
314 ASSERT_EQ(-ENOTCONN
, notify_err
);
315 ASSERT_EQ(-ENOTCONN
, rados_watch_check(ioctx
, handle
));
316 ASSERT_EQ(0, rados_aio_create_completion(NULL
, NULL
, NULL
, &comp
));
317 rados_aio_unwatch(ioctx
, handle
, comp
);
318 ASSERT_EQ(0, rados_aio_wait_for_complete(comp
));
319 ASSERT_EQ(-ENOENT
, rados_aio_get_return_value(comp
));
320 rados_aio_release(comp
);
325 TEST_F(LibRadosWatchNotify
, WatchNotify2
) {
328 notify_cookies
.clear();
330 memset(buf
, 0xcc, sizeof(buf
));
331 ASSERT_EQ(0, rados_write(ioctx
, notify_oid
, buf
, sizeof(buf
), 0));
334 rados_watch2(ioctx
, notify_oid
, &handle
,
335 watch_notify2_test_cb
,
336 watch_notify2_test_errcb
, this));
337 ASSERT_GT(rados_watch_check(ioctx
, handle
), 0);
339 size_t reply_buf_len
;
340 ASSERT_EQ(0, rados_notify2(ioctx
, notify_oid
,
342 &reply_buf
, &reply_buf_len
));
344 reply
.append(reply_buf
, reply_buf_len
);
345 std::map
<std::pair
<uint64_t,uint64_t>, bufferlist
> reply_map
;
346 std::set
<std::pair
<uint64_t,uint64_t> > missed_map
;
347 bufferlist::iterator reply_p
= reply
.begin();
348 ::decode(reply_map
, reply_p
);
349 ::decode(missed_map
, reply_p
);
350 ASSERT_EQ(1u, reply_map
.size());
351 ASSERT_EQ(0u, missed_map
.size());
352 ASSERT_EQ(1u, notify_cookies
.size());
353 ASSERT_EQ(1u, notify_cookies
.count(handle
));
354 ASSERT_EQ(5u, reply_map
.begin()->second
.length());
355 ASSERT_EQ(0, strncmp("reply", reply_map
.begin()->second
.c_str(), 5));
356 ASSERT_GT(rados_watch_check(ioctx
, handle
), 0);
357 rados_buffer_free(reply_buf
);
359 // try it on a non-existent object ... our buffer pointers
360 // should get zeroed.
361 ASSERT_EQ(-ENOENT
, rados_notify2(ioctx
, "doesnotexist",
363 &reply_buf
, &reply_buf_len
));
364 ASSERT_EQ((char*)0, reply_buf
);
365 ASSERT_EQ(0u, reply_buf_len
);
367 rados_unwatch2(ioctx
, handle
);
368 rados_watch_flush(cluster
);
371 TEST_F(LibRadosWatchNotify
, AioWatchNotify2
) {
374 notify_cookies
.clear();
376 memset(buf
, 0xcc, sizeof(buf
));
377 ASSERT_EQ(0, rados_write(ioctx
, notify_oid
, buf
, sizeof(buf
), 0));
379 rados_completion_t comp
;
381 ASSERT_EQ(0, rados_aio_create_completion(NULL
, NULL
, NULL
, &comp
));
382 rados_aio_watch(ioctx
, notify_oid
, comp
, &handle
,
383 watch_notify2_test_cb
, watch_notify2_test_errcb
, this);
384 ASSERT_EQ(0, rados_aio_wait_for_complete(comp
));
385 ASSERT_EQ(0, rados_aio_get_return_value(comp
));
386 rados_aio_release(comp
);
388 ASSERT_GT(rados_watch_check(ioctx
, handle
), 0);
390 size_t reply_buf_len
;
391 ASSERT_EQ(0, rados_notify2(ioctx
, notify_oid
,
393 &reply_buf
, &reply_buf_len
));
395 reply
.append(reply_buf
, reply_buf_len
);
396 std::map
<std::pair
<uint64_t,uint64_t>, bufferlist
> reply_map
;
397 std::set
<std::pair
<uint64_t,uint64_t> > missed_map
;
398 bufferlist::iterator reply_p
= reply
.begin();
399 ::decode(reply_map
, reply_p
);
400 ::decode(missed_map
, reply_p
);
401 ASSERT_EQ(1u, reply_map
.size());
402 ASSERT_EQ(0u, missed_map
.size());
403 ASSERT_EQ(1u, notify_cookies
.size());
404 ASSERT_EQ(1u, notify_cookies
.count(handle
));
405 ASSERT_EQ(5u, reply_map
.begin()->second
.length());
406 ASSERT_EQ(0, strncmp("reply", reply_map
.begin()->second
.c_str(), 5));
407 ASSERT_GT(rados_watch_check(ioctx
, handle
), 0);
408 rados_buffer_free(reply_buf
);
410 // try it on a non-existent object ... our buffer pointers
411 // should get zeroed.
412 ASSERT_EQ(-ENOENT
, rados_notify2(ioctx
, "doesnotexist",
414 &reply_buf
, &reply_buf_len
));
415 ASSERT_EQ((char*)0, reply_buf
);
416 ASSERT_EQ(0u, reply_buf_len
);
418 ASSERT_EQ(0, rados_aio_create_completion(NULL
, NULL
, NULL
, &comp
));
419 rados_aio_unwatch(ioctx
, handle
, comp
);
420 ASSERT_EQ(0, rados_aio_wait_for_complete(comp
));
421 ASSERT_EQ(0, rados_aio_get_return_value(comp
));
422 rados_aio_release(comp
);
425 TEST_F(LibRadosWatchNotify
, AioNotify
) {
428 notify_cookies
.clear();
430 memset(buf
, 0xcc, sizeof(buf
));
431 ASSERT_EQ(0, rados_write(ioctx
, notify_oid
, buf
, sizeof(buf
), 0));
434 rados_watch2(ioctx
, notify_oid
, &handle
,
435 watch_notify2_test_cb
,
436 watch_notify2_test_errcb
, this));
437 ASSERT_GT(rados_watch_check(ioctx
, handle
), 0);
439 size_t reply_buf_len
;
440 rados_completion_t comp
;
441 ASSERT_EQ(0, rados_aio_create_completion(NULL
, NULL
, NULL
, &comp
));
442 ASSERT_EQ(0, rados_aio_notify(ioctx
, "foo", comp
, "notify", 6, 300000,
443 &reply_buf
, &reply_buf_len
));
444 ASSERT_EQ(0, rados_aio_wait_for_complete(comp
));
445 ASSERT_EQ(0, rados_aio_get_return_value(comp
));
446 rados_aio_release(comp
);
449 reply
.append(reply_buf
, reply_buf_len
);
450 std::map
<std::pair
<uint64_t,uint64_t>, bufferlist
> reply_map
;
451 std::set
<std::pair
<uint64_t,uint64_t> > missed_map
;
452 bufferlist::iterator reply_p
= reply
.begin();
453 ::decode(reply_map
, reply_p
);
454 ::decode(missed_map
, reply_p
);
455 ASSERT_EQ(1u, reply_map
.size());
456 ASSERT_EQ(0u, missed_map
.size());
457 ASSERT_EQ(1u, notify_cookies
.size());
458 ASSERT_EQ(1u, notify_cookies
.count(handle
));
459 ASSERT_EQ(5u, reply_map
.begin()->second
.length());
460 ASSERT_EQ(0, strncmp("reply", reply_map
.begin()->second
.c_str(), 5));
461 ASSERT_GT(rados_watch_check(ioctx
, handle
), 0);
462 rados_buffer_free(reply_buf
);
464 // try it on a non-existent object ... our buffer pointers
465 // should get zeroed.
466 ASSERT_EQ(0, rados_aio_create_completion(NULL
, NULL
, NULL
, &comp
));
467 ASSERT_EQ(0, rados_aio_notify(ioctx
, "doesnotexist", comp
, "notify", 6,
468 300000, &reply_buf
, &reply_buf_len
));
469 ASSERT_EQ(0, rados_aio_wait_for_complete(comp
));
470 ASSERT_EQ(-ENOENT
, rados_aio_get_return_value(comp
));
471 rados_aio_release(comp
);
472 ASSERT_EQ((char*)0, reply_buf
);
473 ASSERT_EQ(0u, reply_buf_len
);
475 rados_unwatch2(ioctx
, handle
);
476 rados_watch_flush(cluster
);
479 TEST_P(LibRadosWatchNotifyPP
, WatchNotify2
) {
481 notify_ioctx
= &ioctx
;
482 notify_cookies
.clear();
484 memset(buf
, 0xcc, sizeof(buf
));
486 bl1
.append(buf
, sizeof(buf
));
487 ASSERT_EQ(0, ioctx
.write(notify_oid
, bl1
, sizeof(buf
), 0));
489 WatchNotifyTestCtx2
ctx(this);
490 ASSERT_EQ(0, ioctx
.watch2(notify_oid
, &handle
, &ctx
));
491 ASSERT_GT(ioctx
.watch_check(handle
), 0);
492 std::list
<obj_watch_t
> watches
;
493 ASSERT_EQ(0, ioctx
.list_watchers(notify_oid
, &watches
));
494 ASSERT_EQ(watches
.size(), 1u);
495 bufferlist bl2
, bl_reply
;
496 ASSERT_EQ(0, ioctx
.notify2(notify_oid
, bl2
, 300000, &bl_reply
));
497 bufferlist::iterator p
= bl_reply
.begin();
498 std::map
<std::pair
<uint64_t,uint64_t>,bufferlist
> reply_map
;
499 std::set
<std::pair
<uint64_t,uint64_t> > missed_map
;
500 ::decode(reply_map
, p
);
501 ::decode(missed_map
, p
);
502 ASSERT_EQ(1u, notify_cookies
.size());
503 ASSERT_EQ(1u, notify_cookies
.count(handle
));
504 ASSERT_EQ(1u, reply_map
.size());
505 ASSERT_EQ(5u, reply_map
.begin()->second
.length());
506 ASSERT_EQ(0, strncmp("reply", reply_map
.begin()->second
.c_str(), 5));
507 ASSERT_EQ(0u, missed_map
.size());
508 ASSERT_GT(ioctx
.watch_check(handle
), 0);
509 ioctx
.unwatch2(handle
);
512 TEST_P(LibRadosWatchNotifyPP
, AioWatchNotify2
) {
514 notify_ioctx
= &ioctx
;
515 notify_cookies
.clear();
517 memset(buf
, 0xcc, sizeof(buf
));
519 bl1
.append(buf
, sizeof(buf
));
520 ASSERT_EQ(0, ioctx
.write(notify_oid
, bl1
, sizeof(buf
), 0));
523 WatchNotifyTestCtx2
ctx(this);
524 librados::AioCompletion
*comp
= cluster
.aio_create_completion();
525 ASSERT_EQ(0, ioctx
.aio_watch(notify_oid
, comp
, &handle
, &ctx
));
526 ASSERT_EQ(0, comp
->wait_for_complete());
527 ASSERT_EQ(0, comp
->get_return_value());
530 ASSERT_GT(ioctx
.watch_check(handle
), 0);
531 std::list
<obj_watch_t
> watches
;
532 ASSERT_EQ(0, ioctx
.list_watchers(notify_oid
, &watches
));
533 ASSERT_EQ(watches
.size(), 1u);
534 bufferlist bl2
, bl_reply
;
535 ASSERT_EQ(0, ioctx
.notify2(notify_oid
, bl2
, 300000, &bl_reply
));
536 bufferlist::iterator p
= bl_reply
.begin();
537 std::map
<std::pair
<uint64_t,uint64_t>,bufferlist
> reply_map
;
538 std::set
<std::pair
<uint64_t,uint64_t> > missed_map
;
539 ::decode(reply_map
, p
);
540 ::decode(missed_map
, p
);
541 ASSERT_EQ(1u, notify_cookies
.size());
542 ASSERT_EQ(1u, notify_cookies
.count(handle
));
543 ASSERT_EQ(1u, reply_map
.size());
544 ASSERT_EQ(5u, reply_map
.begin()->second
.length());
545 ASSERT_EQ(0, strncmp("reply", reply_map
.begin()->second
.c_str(), 5));
546 ASSERT_EQ(0u, missed_map
.size());
547 ASSERT_GT(ioctx
.watch_check(handle
), 0);
549 comp
= cluster
.aio_create_completion();
550 ioctx
.aio_unwatch(handle
, comp
);
551 ASSERT_EQ(0, comp
->wait_for_complete());
556 TEST_P(LibRadosWatchNotifyPP
, AioNotify
) {
558 notify_ioctx
= &ioctx
;
559 notify_cookies
.clear();
561 memset(buf
, 0xcc, sizeof(buf
));
563 bl1
.append(buf
, sizeof(buf
));
564 ASSERT_EQ(0, ioctx
.write(notify_oid
, bl1
, sizeof(buf
), 0));
566 WatchNotifyTestCtx2
ctx(this);
567 ASSERT_EQ(0, ioctx
.watch2(notify_oid
, &handle
, &ctx
));
568 ASSERT_GT(ioctx
.watch_check(handle
), 0);
569 std::list
<obj_watch_t
> watches
;
570 ASSERT_EQ(0, ioctx
.list_watchers(notify_oid
, &watches
));
571 ASSERT_EQ(watches
.size(), 1u);
572 bufferlist bl2
, bl_reply
;
573 librados::AioCompletion
*comp
= cluster
.aio_create_completion();
574 ASSERT_EQ(0, ioctx
.aio_notify(notify_oid
, comp
, bl2
, 300000, &bl_reply
));
575 ASSERT_EQ(0, comp
->wait_for_complete());
576 ASSERT_EQ(0, comp
->get_return_value());
578 bufferlist::iterator p
= bl_reply
.begin();
579 std::map
<std::pair
<uint64_t,uint64_t>,bufferlist
> reply_map
;
580 std::set
<std::pair
<uint64_t,uint64_t> > missed_map
;
581 ::decode(reply_map
, p
);
582 ::decode(missed_map
, p
);
583 ASSERT_EQ(1u, notify_cookies
.size());
584 ASSERT_EQ(1u, notify_cookies
.count(handle
));
585 ASSERT_EQ(1u, reply_map
.size());
586 ASSERT_EQ(5u, reply_map
.begin()->second
.length());
587 ASSERT_EQ(0, strncmp("reply", reply_map
.begin()->second
.c_str(), 5));
588 ASSERT_EQ(0u, missed_map
.size());
589 ASSERT_GT(ioctx
.watch_check(handle
), 0);
590 ioctx
.unwatch2(handle
);
591 cluster
.watch_flush();
596 TEST_F(LibRadosWatchNotify
, WatchNotify2Multi
) {
599 notify_cookies
.clear();
601 memset(buf
, 0xcc, sizeof(buf
));
602 ASSERT_EQ(0, rados_write(ioctx
, notify_oid
, buf
, sizeof(buf
), 0));
603 uint64_t handle1
, handle2
;
605 rados_watch2(ioctx
, notify_oid
, &handle1
,
606 watch_notify2_test_cb
,
607 watch_notify2_test_errcb
, this));
609 rados_watch2(ioctx
, notify_oid
, &handle2
,
610 watch_notify2_test_cb
,
611 watch_notify2_test_errcb
, this));
612 ASSERT_GT(rados_watch_check(ioctx
, handle1
), 0);
613 ASSERT_GT(rados_watch_check(ioctx
, handle2
), 0);
614 ASSERT_NE(handle1
, handle2
);
616 size_t reply_buf_len
;
617 ASSERT_EQ(0, rados_notify2(ioctx
, notify_oid
,
619 &reply_buf
, &reply_buf_len
));
621 reply
.append(reply_buf
, reply_buf_len
);
622 std::map
<std::pair
<uint64_t,uint64_t>, bufferlist
> reply_map
;
623 std::set
<std::pair
<uint64_t,uint64_t> > missed_map
;
624 bufferlist::iterator reply_p
= reply
.begin();
625 ::decode(reply_map
, reply_p
);
626 ::decode(missed_map
, reply_p
);
627 ASSERT_EQ(2u, reply_map
.size());
628 ASSERT_EQ(5u, reply_map
.begin()->second
.length());
629 ASSERT_EQ(0u, missed_map
.size());
630 ASSERT_EQ(2u, notify_cookies
.size());
631 ASSERT_EQ(1u, notify_cookies
.count(handle1
));
632 ASSERT_EQ(1u, notify_cookies
.count(handle2
));
633 ASSERT_EQ(0, strncmp("reply", reply_map
.begin()->second
.c_str(), 5));
634 ASSERT_GT(rados_watch_check(ioctx
, handle1
), 0);
635 ASSERT_GT(rados_watch_check(ioctx
, handle2
), 0);
636 rados_buffer_free(reply_buf
);
637 rados_unwatch2(ioctx
, handle1
);
638 rados_unwatch2(ioctx
, handle2
);
639 rados_watch_flush(cluster
);
644 TEST_F(LibRadosWatchNotify
, WatchNotify2Timeout
) {
647 notify_sleep
= 3; // 3s
648 notify_cookies
.clear();
650 memset(buf
, 0xcc, sizeof(buf
));
651 ASSERT_EQ(0, rados_write(ioctx
, notify_oid
, buf
, sizeof(buf
), 0));
654 rados_watch2(ioctx
, notify_oid
, &handle
,
655 watch_notify2_test_cb
,
656 watch_notify2_test_errcb
, this));
657 ASSERT_GT(rados_watch_check(ioctx
, handle
), 0);
659 size_t reply_buf_len
;
660 ASSERT_EQ(-ETIMEDOUT
, rados_notify2(ioctx
, notify_oid
,
661 "notify", 6, 1000, // 1s
662 &reply_buf
, &reply_buf_len
));
663 ASSERT_EQ(1u, notify_cookies
.size());
666 reply
.append(reply_buf
, reply_buf_len
);
667 std::map
<std::pair
<uint64_t,uint64_t>, bufferlist
> reply_map
;
668 std::set
<std::pair
<uint64_t,uint64_t> > missed_map
;
669 bufferlist::iterator reply_p
= reply
.begin();
670 ::decode(reply_map
, reply_p
);
671 ::decode(missed_map
, reply_p
);
672 ASSERT_EQ(0u, reply_map
.size());
673 ASSERT_EQ(1u, missed_map
.size());
675 rados_buffer_free(reply_buf
);
677 // we should get the next notify, though!
679 notify_cookies
.clear();
680 ASSERT_EQ(0, rados_notify2(ioctx
, notify_oid
,
681 "notify", 6, 300000, // 300s
682 &reply_buf
, &reply_buf_len
));
683 ASSERT_EQ(1u, notify_cookies
.size());
684 ASSERT_GT(rados_watch_check(ioctx
, handle
), 0);
686 rados_unwatch2(ioctx
, handle
);
688 rados_completion_t comp
;
689 ASSERT_EQ(0, rados_aio_create_completion(NULL
, NULL
, NULL
, &comp
));
690 rados_aio_watch_flush(cluster
, comp
);
691 ASSERT_EQ(0, rados_aio_wait_for_complete(comp
));
692 ASSERT_EQ(0, rados_aio_get_return_value(comp
));
693 rados_aio_release(comp
);
694 rados_buffer_free(reply_buf
);
698 TEST_P(LibRadosWatchNotifyPP
, WatchNotify2Timeout
) {
700 notify_ioctx
= &ioctx
;
701 notify_sleep
= 3; // 3s
702 notify_cookies
.clear();
704 memset(buf
, 0xcc, sizeof(buf
));
706 bl1
.append(buf
, sizeof(buf
));
707 ASSERT_EQ(0, ioctx
.write(notify_oid
, bl1
, sizeof(buf
), 0));
709 WatchNotifyTestCtx2
ctx(this);
710 ASSERT_EQ(0, ioctx
.watch2(notify_oid
, &handle
, &ctx
));
711 ASSERT_GT(ioctx
.watch_check(handle
), 0);
712 std::list
<obj_watch_t
> watches
;
713 ASSERT_EQ(0, ioctx
.list_watchers(notify_oid
, &watches
));
714 ASSERT_EQ(watches
.size(), 1u);
715 ASSERT_EQ(0u, notify_cookies
.size());
716 bufferlist bl2
, bl_reply
;
717 std::cout
<< " trying..." << std::endl
;
718 ASSERT_EQ(-ETIMEDOUT
, ioctx
.notify2(notify_oid
, bl2
, 1000 /* 1s */,
720 std::cout
<< " timed out" << std::endl
;
721 ASSERT_GT(ioctx
.watch_check(handle
), 0);
722 ioctx
.unwatch2(handle
);
724 std::cout
<< " flushing" << std::endl
;
725 librados::AioCompletion
*comp
= cluster
.aio_create_completion();
726 cluster
.aio_watch_flush(comp
);
727 ASSERT_EQ(0, comp
->wait_for_complete());
728 ASSERT_EQ(0, comp
->get_return_value());
729 std::cout
<< " flushed" << std::endl
;
733 TEST_P(LibRadosWatchNotifyPP
, WatchNotify3
) {
735 notify_ioctx
= &ioctx
;
736 notify_cookies
.clear();
737 uint32_t timeout
= 12; // configured timeout
739 memset(buf
, 0xcc, sizeof(buf
));
741 bl1
.append(buf
, sizeof(buf
));
742 ASSERT_EQ(0, ioctx
.write(notify_oid
, bl1
, sizeof(buf
), 0));
744 WatchNotifyTestCtx2
ctx(this);
745 ASSERT_EQ(0, ioctx
.watch3(notify_oid
, &handle
, &ctx
, timeout
));
746 ASSERT_GT(ioctx
.watch_check(handle
), 0);
747 std::list
<obj_watch_t
> watches
;
748 ASSERT_EQ(0, ioctx
.list_watchers(notify_oid
, &watches
));
749 ASSERT_EQ(watches
.size(), 1u);
750 std::cout
<< "List watches" << std::endl
;
751 for (std::list
<obj_watch_t
>::iterator it
= watches
.begin();
752 it
!= watches
.end(); ++it
) {
753 ASSERT_EQ(it
->timeout_seconds
, timeout
);
755 bufferlist bl2
, bl_reply
;
756 std::cout
<< "notify2" << std::endl
;
757 ASSERT_EQ(0, ioctx
.notify2(notify_oid
, bl2
, 300000, &bl_reply
));
758 std::cout
<< "notify2 done" << std::endl
;
759 bufferlist::iterator p
= bl_reply
.begin();
760 std::map
<std::pair
<uint64_t,uint64_t>,bufferlist
> reply_map
;
761 std::set
<std::pair
<uint64_t,uint64_t> > missed_map
;
762 ::decode(reply_map
, p
);
763 ::decode(missed_map
, p
);
764 ASSERT_EQ(1u, notify_cookies
.size());
765 ASSERT_EQ(1u, notify_cookies
.count(handle
));
766 ASSERT_EQ(1u, reply_map
.size());
767 ASSERT_EQ(5u, reply_map
.begin()->second
.length());
768 ASSERT_EQ(0, strncmp("reply", reply_map
.begin()->second
.c_str(), 5));
769 ASSERT_EQ(0u, missed_map
.size());
770 std::cout
<< "watch_check" << std::endl
;
771 ASSERT_GT(ioctx
.watch_check(handle
), 0);
772 std::cout
<< "unwatch2" << std::endl
;
773 ioctx
.unwatch2(handle
);
775 std::cout
<< " flushing" << std::endl
;
776 cluster
.watch_flush();
777 std::cout
<< "done" << std::endl
;
780 TEST_F(LibRadosWatchNotify
, Watch3Timeout
) {
783 notify_cookies
.clear();
786 memset(buf
, 0xcc, sizeof(buf
));
787 ASSERT_EQ(0, rados_write(ioctx
, notify_oid
, buf
, sizeof(buf
), 0));
789 time_t start
= time(0);
790 const uint32_t timeout
= 4;
792 // make sure i timeout before the messenger reconnects to the OSD,
793 // it will resend a watch request on behalf of the client, and the
794 // timer of timeout on OSD side will be reset by the new request.
796 ASSERT_EQ(0, rados_conf_get(cluster
,
797 "ms_tcp_read_timeout",
798 conf
, sizeof(conf
)));
799 auto tcp_read_timeout
= std::stoll(conf
);
800 ASSERT_LT(timeout
, tcp_read_timeout
);
803 rados_watch3(ioctx
, notify_oid
, &handle
,
804 watch_notify2_test_cb
, watch_notify2_test_errcb
,
806 int age
= rados_watch_check(ioctx
, handle
);
807 time_t age_bound
= time(0) + 1 - start
;
808 ASSERT_LT(age
, age_bound
* 1000);
810 rados_conf_set(cluster
, "objecter_inject_no_watch_ping", "true");
811 // allow a long time here since an osd peering event will renew our
813 int left
= 16 * timeout
;
814 std::cout
<< "waiting up to " << left
<< " for osd to time us out ..."
816 while (notify_err
== 0 && --left
) {
820 rados_conf_set(cluster
, "objecter_inject_no_watch_ping", "false");
821 ASSERT_EQ(-ENOTCONN
, notify_err
);
822 ASSERT_EQ(-ENOTCONN
, rados_watch_check(ioctx
, handle
));
824 // a subsequent notify should not reach us
825 char *reply_buf
= nullptr;
826 size_t reply_buf_len
;
827 ASSERT_EQ(0, rados_notify2(ioctx
, notify_oid
,
829 &reply_buf
, &reply_buf_len
));
832 reply
.append(reply_buf
, reply_buf_len
);
833 std::map
<std::pair
<uint64_t,uint64_t>, bufferlist
> reply_map
;
834 std::set
<std::pair
<uint64_t,uint64_t> > missed_map
;
835 bufferlist::iterator reply_p
= reply
.begin();
836 ::decode(reply_map
, reply_p
);
837 ::decode(missed_map
, reply_p
);
838 ASSERT_EQ(0u, reply_map
.size());
839 ASSERT_EQ(0u, missed_map
.size());
841 ASSERT_EQ(0u, notify_cookies
.size());
842 ASSERT_EQ(-ENOTCONN
, rados_watch_check(ioctx
, handle
));
843 rados_buffer_free(reply_buf
);
846 rados_unwatch2(ioctx
, handle
);
847 rados_watch_flush(cluster
);
851 rados_watch2(ioctx
, notify_oid
, &handle
,
852 watch_notify2_test_cb
,
853 watch_notify2_test_errcb
, this));
854 ASSERT_GT(rados_watch_check(ioctx
, handle
), 0);
856 // and now a notify will work.
857 ASSERT_EQ(0, rados_notify2(ioctx
, notify_oid
,
859 &reply_buf
, &reply_buf_len
));
862 reply
.append(reply_buf
, reply_buf_len
);
863 std::map
<std::pair
<uint64_t,uint64_t>, bufferlist
> reply_map
;
864 std::set
<std::pair
<uint64_t,uint64_t> > missed_map
;
865 bufferlist::iterator reply_p
= reply
.begin();
866 ::decode(reply_map
, reply_p
);
867 ::decode(missed_map
, reply_p
);
868 ASSERT_EQ(1u, reply_map
.size());
869 ASSERT_EQ(0u, missed_map
.size());
870 ASSERT_EQ(1u, notify_cookies
.count(handle
));
871 ASSERT_EQ(5u, reply_map
.begin()->second
.length());
872 ASSERT_EQ(0, strncmp("reply", reply_map
.begin()->second
.c_str(), 5));
874 ASSERT_EQ(1u, notify_cookies
.size());
875 ASSERT_GT(rados_watch_check(ioctx
, handle
), 0);
877 rados_buffer_free(reply_buf
);
878 rados_unwatch2(ioctx
, handle
);
879 rados_watch_flush(cluster
);
882 TEST_F(LibRadosWatchNotify
, AioWatchDelete2
) {
887 uint32_t timeout
= 3;
888 memset(buf
, 0xcc, sizeof(buf
));
889 ASSERT_EQ(0, rados_write(ioctx
, notify_oid
, buf
, sizeof(buf
), 0));
892 rados_completion_t comp
;
894 ASSERT_EQ(0, rados_aio_create_completion(NULL
, NULL
, NULL
, &comp
));
895 rados_aio_watch2(ioctx
, notify_oid
, comp
, &handle
,
896 watch_notify2_test_cb
, watch_notify2_test_errcb
, timeout
, this);
897 ASSERT_EQ(0, rados_aio_wait_for_complete(comp
));
898 ASSERT_EQ(0, rados_aio_get_return_value(comp
));
899 rados_aio_release(comp
);
900 ASSERT_EQ(0, rados_remove(ioctx
, notify_oid
));
902 std::cout
<< "waiting up to " << left
<< " for disconnect notification ..."
904 while (notify_err
== 0 && --left
) {
907 ASSERT_TRUE(left
> 0);
908 ASSERT_EQ(-ENOTCONN
, notify_err
);
909 ASSERT_EQ(-ENOTCONN
, rados_watch_check(ioctx
, handle
));
910 ASSERT_EQ(0, rados_aio_create_completion(NULL
, NULL
, NULL
, &comp
));
911 rados_aio_unwatch(ioctx
, handle
, comp
);
912 ASSERT_EQ(0, rados_aio_wait_for_complete(comp
));
913 ASSERT_EQ(-ENOENT
, rados_aio_get_return_value(comp
));
914 rados_aio_release(comp
);
918 INSTANTIATE_TEST_CASE_P(LibRadosWatchNotifyPPTests
, LibRadosWatchNotifyPP
,
919 ::testing::Values("", "cache"));