]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/librados/watch_notify.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / test / librados / watch_notify.cc
1 #include "include/rados/librados.h"
2 #include "include/rados/librados.hpp"
3 #include "include/rados/rados_types.h"
4 #include "test/librados/test.h"
5 #include "test/librados/TestCase.h"
6
7 #include <errno.h>
8 #include <fcntl.h>
9 #include <semaphore.h>
10 #include "gtest/gtest.h"
11 #include "include/encoding.h"
12 #include <set>
13 #include <map>
14
15 using namespace librados;
16
17 typedef RadosTestEC LibRadosWatchNotifyEC;
18 typedef RadosTestECPP LibRadosWatchNotifyECPP;
19
20 int notify_sleep = 0;
21
22 // notify
23 static sem_t *sem;
24
25 static void watch_notify_test_cb(uint8_t opcode, uint64_t ver, void *arg)
26 {
27 std::cout << __func__ << std::endl;
28 sem_post(sem);
29 }
30
31 class WatchNotifyTestCtx : public WatchCtx
32 {
33 public:
34 void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) override
35 {
36 std::cout << __func__ << std::endl;
37 sem_post(sem);
38 }
39 };
40
41 class LibRadosWatchNotify : public RadosTest
42 {
43 protected:
44 // notify 2
45 bufferlist notify_bl;
46 std::set<uint64_t> notify_cookies;
47 rados_ioctx_t notify_io;
48 const char *notify_oid = nullptr;
49 int notify_err = 0;
50
51 static void watch_notify2_test_cb(void *arg,
52 uint64_t notify_id,
53 uint64_t cookie,
54 uint64_t notifier_gid,
55 void *data,
56 size_t data_len);
57 static void watch_notify2_test_errcb(void *arg, uint64_t cookie, int err);
58 };
59
60
61 void LibRadosWatchNotify::watch_notify2_test_cb(void *arg,
62 uint64_t notify_id,
63 uint64_t cookie,
64 uint64_t notifier_gid,
65 void *data,
66 size_t data_len)
67 {
68 std::cout << __func__ << " from " << notifier_gid << " notify_id " << notify_id
69 << " cookie " << cookie << std::endl;
70 assert(notifier_gid > 0);
71 auto thiz = reinterpret_cast<LibRadosWatchNotify*>(arg);
72 assert(thiz);
73 thiz->notify_cookies.insert(cookie);
74 thiz->notify_bl.clear();
75 thiz->notify_bl.append((char*)data, data_len);
76 if (notify_sleep)
77 sleep(notify_sleep);
78 rados_notify_ack(thiz->notify_io, thiz->notify_oid, notify_id, cookie,
79 "reply", 5);
80 }
81
82 void LibRadosWatchNotify::watch_notify2_test_errcb(void *arg,
83 uint64_t cookie,
84 int err)
85 {
86 std::cout << __func__ << " cookie " << cookie << " err " << err << std::endl;
87 assert(cookie > 1000);
88 auto thiz = reinterpret_cast<LibRadosWatchNotify*>(arg);
89 assert(thiz);
90 thiz->notify_err = err;
91 }
92
93 class WatchNotifyTestCtx2;
94 class LibRadosWatchNotifyPP : public RadosTestParamPP
95 {
96 protected:
97 bufferlist notify_bl;
98 std::set<uint64_t> notify_cookies;
99 rados_ioctx_t notify_io;
100 const char *notify_oid = nullptr;
101 int notify_err = 0;
102
103 friend class WatchNotifyTestCtx2;
104 };
105
106 IoCtx *notify_ioctx;
107
108 class WatchNotifyTestCtx2 : public WatchCtx2
109 {
110 LibRadosWatchNotifyPP *notify;
111
112 public:
113 WatchNotifyTestCtx2(LibRadosWatchNotifyPP *notify)
114 : notify(notify)
115 {}
116
117 void handle_notify(uint64_t notify_id, uint64_t cookie, uint64_t notifier_gid,
118 bufferlist& bl) override {
119 std::cout << __func__ << " cookie " << cookie << " notify_id " << notify_id
120 << " notifier_gid " << notifier_gid << std::endl;
121 notify->notify_bl = bl;
122 notify->notify_cookies.insert(cookie);
123 bufferlist reply;
124 reply.append("reply", 5);
125 if (notify_sleep)
126 sleep(notify_sleep);
127 notify_ioctx->notify_ack(notify->notify_oid, notify_id, cookie, reply);
128 }
129
130 void handle_error(uint64_t cookie, int err) override {
131 std::cout << __func__ << " cookie " << cookie
132 << " err " << err << std::endl;
133 assert(cookie > 1000);
134 notify->notify_err = err;
135 }
136 };
137
138 // --
139
140 #pragma GCC diagnostic ignored "-Wpragmas"
141 #pragma GCC diagnostic push
142 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
143
144 TEST_F(LibRadosWatchNotify, WatchNotify) {
145 ASSERT_NE(SEM_FAILED, (sem = sem_open("/test_watch_notify_sem", O_CREAT, 0644, 0)));
146 char buf[128];
147 memset(buf, 0xcc, sizeof(buf));
148 ASSERT_EQ(0, rados_write(ioctx, "foo", buf, sizeof(buf), 0));
149 uint64_t handle;
150 ASSERT_EQ(0,
151 rados_watch(ioctx, "foo", 0, &handle, watch_notify_test_cb, NULL));
152 ASSERT_EQ(0, rados_notify(ioctx, "foo", 0, NULL, 0));
153 TestAlarm alarm;
154 sem_wait(sem);
155 rados_unwatch(ioctx, "foo", handle);
156
157 // when dne ...
158 ASSERT_EQ(-ENOENT,
159 rados_watch(ioctx, "dne", 0, &handle, watch_notify_test_cb, NULL));
160
161 sem_close(sem);
162 }
163
164 TEST_P(LibRadosWatchNotifyPP, WatchNotify) {
165 ASSERT_NE(SEM_FAILED, (sem = sem_open("/test_watch_notify_sem", O_CREAT, 0644, 0)));
166 char buf[128];
167 memset(buf, 0xcc, sizeof(buf));
168 bufferlist bl1;
169 bl1.append(buf, sizeof(buf));
170 ASSERT_EQ(0, ioctx.write("foo", bl1, sizeof(buf), 0));
171 uint64_t handle;
172 WatchNotifyTestCtx ctx;
173 ASSERT_EQ(0, ioctx.watch("foo", 0, &handle, &ctx));
174 std::list<obj_watch_t> watches;
175 ASSERT_EQ(0, ioctx.list_watchers("foo", &watches));
176 ASSERT_EQ(watches.size(), 1u);
177 bufferlist bl2;
178 ASSERT_EQ(0, ioctx.notify("foo", 0, bl2));
179 TestAlarm alarm;
180 sem_wait(sem);
181 ioctx.unwatch("foo", handle);
182 sem_close(sem);
183 }
184
185 TEST_F(LibRadosWatchNotifyEC, WatchNotify) {
186 ASSERT_NE(SEM_FAILED, (sem = sem_open("/test_watch_notify_sem", O_CREAT, 0644, 0)));
187 char buf[128];
188 memset(buf, 0xcc, sizeof(buf));
189 ASSERT_EQ(0, rados_write(ioctx, "foo", buf, sizeof(buf), 0));
190 uint64_t handle;
191 ASSERT_EQ(0,
192 rados_watch(ioctx, "foo", 0, &handle, watch_notify_test_cb, NULL));
193 ASSERT_EQ(0, rados_notify(ioctx, "foo", 0, NULL, 0));
194 TestAlarm alarm;
195 sem_wait(sem);
196 rados_unwatch(ioctx, "foo", handle);
197 sem_close(sem);
198 }
199
200 TEST_F(LibRadosWatchNotifyECPP, WatchNotify) {
201 ASSERT_NE(SEM_FAILED, (sem = sem_open("/test_watch_notify_sem", O_CREAT, 0644, 0)));
202 char buf[128];
203 memset(buf, 0xcc, sizeof(buf));
204 bufferlist bl1;
205 bl1.append(buf, sizeof(buf));
206 ASSERT_EQ(0, ioctx.write("foo", bl1, sizeof(buf), 0));
207 uint64_t handle;
208 WatchNotifyTestCtx ctx;
209 ASSERT_EQ(0, ioctx.watch("foo", 0, &handle, &ctx));
210 std::list<obj_watch_t> watches;
211 ASSERT_EQ(0, ioctx.list_watchers("foo", &watches));
212 ASSERT_EQ(watches.size(), 1u);
213 bufferlist bl2;
214 ASSERT_EQ(0, ioctx.notify("foo", 0, bl2));
215 TestAlarm alarm;
216 sem_wait(sem);
217 ioctx.unwatch("foo", handle);
218 sem_close(sem);
219 }
220
221 // --
222
223 TEST_P(LibRadosWatchNotifyPP, WatchNotifyTimeout) {
224 ASSERT_NE(SEM_FAILED, (sem = sem_open("/test_watch_notify_sem", O_CREAT, 0644, 0)));
225 ioctx.set_notify_timeout(1);
226 uint64_t handle;
227 WatchNotifyTestCtx ctx;
228
229 char buf[128];
230 memset(buf, 0xcc, sizeof(buf));
231 bufferlist bl1;
232 bl1.append(buf, sizeof(buf));
233 ASSERT_EQ(0, ioctx.write("foo", bl1, sizeof(buf), 0));
234
235 ASSERT_EQ(0, ioctx.watch("foo", 0, &handle, &ctx));
236 sem_close(sem);
237 ASSERT_EQ(0, ioctx.unwatch("foo", handle));
238 }
239
240 TEST_F(LibRadosWatchNotifyECPP, WatchNotifyTimeout) {
241 ASSERT_NE(SEM_FAILED, (sem = sem_open("/test_watch_notify_sem", O_CREAT, 0644, 0)));
242 ioctx.set_notify_timeout(1);
243 uint64_t handle;
244 WatchNotifyTestCtx ctx;
245
246 char buf[128];
247 memset(buf, 0xcc, sizeof(buf));
248 bufferlist bl1;
249 bl1.append(buf, sizeof(buf));
250 ASSERT_EQ(0, ioctx.write("foo", bl1, sizeof(buf), 0));
251
252 ASSERT_EQ(0, ioctx.watch("foo", 0, &handle, &ctx));
253 sem_close(sem);
254 ASSERT_EQ(0, ioctx.unwatch("foo", handle));
255 }
256
257 #pragma GCC diagnostic pop
258 #pragma GCC diagnostic warning "-Wpragmas"
259
260
261 // --
262
263 TEST_F(LibRadosWatchNotify, Watch2Delete) {
264 notify_io = ioctx;
265 notify_oid = "foo";
266 notify_err = 0;
267 char buf[128];
268 memset(buf, 0xcc, sizeof(buf));
269 ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
270 uint64_t handle;
271 ASSERT_EQ(0,
272 rados_watch2(ioctx, notify_oid, &handle,
273 watch_notify2_test_cb,
274 watch_notify2_test_errcb, this));
275 ASSERT_EQ(0, rados_remove(ioctx, notify_oid));
276 int left = 300;
277 std::cout << "waiting up to " << left << " for disconnect notification ..."
278 << std::endl;
279 while (notify_err == 0 && --left) {
280 sleep(1);
281 }
282 ASSERT_TRUE(left > 0);
283 ASSERT_EQ(-ENOTCONN, notify_err);
284 ASSERT_EQ(-ENOTCONN, rados_watch_check(ioctx, handle));
285 rados_unwatch2(ioctx, handle);
286 }
287
288 TEST_F(LibRadosWatchNotify, AioWatchDelete) {
289 notify_io = ioctx;
290 notify_oid = "foo";
291 notify_err = 0;
292 char buf[128];
293 memset(buf, 0xcc, sizeof(buf));
294 ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
295
296
297 rados_completion_t comp;
298 uint64_t handle;
299 ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
300 rados_aio_watch(ioctx, notify_oid, comp, &handle,
301 watch_notify2_test_cb, watch_notify2_test_errcb, this);
302 ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
303 ASSERT_EQ(0, rados_aio_get_return_value(comp));
304 rados_aio_release(comp);
305 ASSERT_EQ(0, rados_remove(ioctx, notify_oid));
306 int left = 300;
307 std::cout << "waiting up to " << left << " for disconnect notification ..."
308 << std::endl;
309 while (notify_err == 0 && --left) {
310 sleep(1);
311 }
312 ASSERT_TRUE(left > 0);
313 ASSERT_EQ(-ENOTCONN, notify_err);
314 ASSERT_EQ(-ENOTCONN, rados_watch_check(ioctx, handle));
315 ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
316 rados_aio_unwatch(ioctx, handle, comp);
317 ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
318 ASSERT_EQ(-ENOENT, rados_aio_get_return_value(comp));
319 rados_aio_release(comp);
320 }
321
322 // --
323
324 TEST_F(LibRadosWatchNotify, WatchNotify2) {
325 notify_io = ioctx;
326 notify_oid = "foo";
327 notify_cookies.clear();
328 char buf[128];
329 memset(buf, 0xcc, sizeof(buf));
330 ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
331 uint64_t handle;
332 ASSERT_EQ(0,
333 rados_watch2(ioctx, notify_oid, &handle,
334 watch_notify2_test_cb,
335 watch_notify2_test_errcb, this));
336 ASSERT_GT(rados_watch_check(ioctx, handle), 0);
337 char *reply_buf = 0;
338 size_t reply_buf_len;
339 ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
340 "notify", 6, 300000,
341 &reply_buf, &reply_buf_len));
342 bufferlist reply;
343 reply.append(reply_buf, reply_buf_len);
344 std::map<std::pair<uint64_t,uint64_t>, bufferlist> reply_map;
345 std::set<std::pair<uint64_t,uint64_t> > missed_map;
346 bufferlist::iterator reply_p = reply.begin();
347 ::decode(reply_map, reply_p);
348 ::decode(missed_map, reply_p);
349 ASSERT_EQ(1u, reply_map.size());
350 ASSERT_EQ(0u, missed_map.size());
351 ASSERT_EQ(1u, notify_cookies.size());
352 ASSERT_EQ(1u, notify_cookies.count(handle));
353 ASSERT_EQ(5u, reply_map.begin()->second.length());
354 ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
355 ASSERT_GT(rados_watch_check(ioctx, handle), 0);
356 rados_buffer_free(reply_buf);
357
358 // try it on a non-existent object ... our buffer pointers
359 // should get zeroed.
360 ASSERT_EQ(-ENOENT, rados_notify2(ioctx, "doesnotexist",
361 "notify", 6, 300000,
362 &reply_buf, &reply_buf_len));
363 ASSERT_EQ((char*)0, reply_buf);
364 ASSERT_EQ(0u, reply_buf_len);
365
366 rados_unwatch2(ioctx, handle);
367 }
368
369 TEST_F(LibRadosWatchNotify, AioWatchNotify2) {
370 notify_io = ioctx;
371 notify_oid = "foo";
372 notify_cookies.clear();
373 char buf[128];
374 memset(buf, 0xcc, sizeof(buf));
375 ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
376
377 rados_completion_t comp;
378 uint64_t handle;
379 ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
380 rados_aio_watch(ioctx, notify_oid, comp, &handle,
381 watch_notify2_test_cb, watch_notify2_test_errcb, this);
382 ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
383 ASSERT_EQ(0, rados_aio_get_return_value(comp));
384 rados_aio_release(comp);
385
386 ASSERT_GT(rados_watch_check(ioctx, handle), 0);
387 char *reply_buf = 0;
388 size_t reply_buf_len;
389 ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
390 "notify", 6, 300000,
391 &reply_buf, &reply_buf_len));
392 bufferlist reply;
393 reply.append(reply_buf, reply_buf_len);
394 std::map<std::pair<uint64_t,uint64_t>, bufferlist> reply_map;
395 std::set<std::pair<uint64_t,uint64_t> > missed_map;
396 bufferlist::iterator reply_p = reply.begin();
397 ::decode(reply_map, reply_p);
398 ::decode(missed_map, reply_p);
399 ASSERT_EQ(1u, reply_map.size());
400 ASSERT_EQ(0u, missed_map.size());
401 ASSERT_EQ(1u, notify_cookies.size());
402 ASSERT_EQ(1u, notify_cookies.count(handle));
403 ASSERT_EQ(5u, reply_map.begin()->second.length());
404 ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
405 ASSERT_GT(rados_watch_check(ioctx, handle), 0);
406 rados_buffer_free(reply_buf);
407
408 // try it on a non-existent object ... our buffer pointers
409 // should get zeroed.
410 ASSERT_EQ(-ENOENT, rados_notify2(ioctx, "doesnotexist",
411 "notify", 6, 300000,
412 &reply_buf, &reply_buf_len));
413 ASSERT_EQ((char*)0, reply_buf);
414 ASSERT_EQ(0u, reply_buf_len);
415
416 ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
417 rados_aio_unwatch(ioctx, handle, comp);
418 ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
419 ASSERT_EQ(0, rados_aio_get_return_value(comp));
420 rados_aio_release(comp);
421 }
422
423 TEST_F(LibRadosWatchNotify, AioNotify) {
424 notify_io = ioctx;
425 notify_oid = "foo";
426 notify_cookies.clear();
427 char buf[128];
428 memset(buf, 0xcc, sizeof(buf));
429 ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
430 uint64_t handle;
431 ASSERT_EQ(0,
432 rados_watch2(ioctx, notify_oid, &handle,
433 watch_notify2_test_cb,
434 watch_notify2_test_errcb, this));
435 ASSERT_GT(rados_watch_check(ioctx, handle), 0);
436 char *reply_buf = 0;
437 size_t reply_buf_len;
438 rados_completion_t comp;
439 ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
440 ASSERT_EQ(0, rados_aio_notify(ioctx, "foo", comp, "notify", 6, 300000,
441 &reply_buf, &reply_buf_len));
442 ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
443 ASSERT_EQ(0, rados_aio_get_return_value(comp));
444 rados_aio_release(comp);
445
446 bufferlist reply;
447 reply.append(reply_buf, reply_buf_len);
448 std::map<std::pair<uint64_t,uint64_t>, bufferlist> reply_map;
449 std::set<std::pair<uint64_t,uint64_t> > missed_map;
450 bufferlist::iterator reply_p = reply.begin();
451 ::decode(reply_map, reply_p);
452 ::decode(missed_map, reply_p);
453 ASSERT_EQ(1u, reply_map.size());
454 ASSERT_EQ(0u, missed_map.size());
455 ASSERT_EQ(1u, notify_cookies.size());
456 ASSERT_EQ(1u, notify_cookies.count(handle));
457 ASSERT_EQ(5u, reply_map.begin()->second.length());
458 ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
459 ASSERT_GT(rados_watch_check(ioctx, handle), 0);
460 rados_buffer_free(reply_buf);
461
462 // try it on a non-existent object ... our buffer pointers
463 // should get zeroed.
464 ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
465 ASSERT_EQ(0, rados_aio_notify(ioctx, "doesnotexist", comp, "notify", 6,
466 300000, &reply_buf, &reply_buf_len));
467 ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
468 ASSERT_EQ(-ENOENT, rados_aio_get_return_value(comp));
469 rados_aio_release(comp);
470 ASSERT_EQ((char*)0, reply_buf);
471 ASSERT_EQ(0u, reply_buf_len);
472
473 rados_unwatch2(ioctx, handle);
474 }
475
476 TEST_P(LibRadosWatchNotifyPP, WatchNotify2) {
477 notify_oid = "foo";
478 notify_ioctx = &ioctx;
479 notify_cookies.clear();
480 char buf[128];
481 memset(buf, 0xcc, sizeof(buf));
482 bufferlist bl1;
483 bl1.append(buf, sizeof(buf));
484 ASSERT_EQ(0, ioctx.write(notify_oid, bl1, sizeof(buf), 0));
485 uint64_t handle;
486 WatchNotifyTestCtx2 ctx(this);
487 ASSERT_EQ(0, ioctx.watch2(notify_oid, &handle, &ctx));
488 ASSERT_GT(ioctx.watch_check(handle), 0);
489 std::list<obj_watch_t> watches;
490 ASSERT_EQ(0, ioctx.list_watchers(notify_oid, &watches));
491 ASSERT_EQ(watches.size(), 1u);
492 bufferlist bl2, bl_reply;
493 ASSERT_EQ(0, ioctx.notify2(notify_oid, bl2, 300000, &bl_reply));
494 bufferlist::iterator p = bl_reply.begin();
495 std::map<std::pair<uint64_t,uint64_t>,bufferlist> reply_map;
496 std::set<std::pair<uint64_t,uint64_t> > missed_map;
497 ::decode(reply_map, p);
498 ::decode(missed_map, p);
499 ASSERT_EQ(1u, notify_cookies.size());
500 ASSERT_EQ(1u, notify_cookies.count(handle));
501 ASSERT_EQ(1u, reply_map.size());
502 ASSERT_EQ(5u, reply_map.begin()->second.length());
503 ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
504 ASSERT_EQ(0u, missed_map.size());
505 ASSERT_GT(ioctx.watch_check(handle), 0);
506 ioctx.unwatch2(handle);
507 }
508
509 TEST_P(LibRadosWatchNotifyPP, AioWatchNotify2) {
510 notify_oid = "foo";
511 notify_ioctx = &ioctx;
512 notify_cookies.clear();
513 char buf[128];
514 memset(buf, 0xcc, sizeof(buf));
515 bufferlist bl1;
516 bl1.append(buf, sizeof(buf));
517 ASSERT_EQ(0, ioctx.write(notify_oid, bl1, sizeof(buf), 0));
518
519 uint64_t handle;
520 WatchNotifyTestCtx2 ctx(this);
521 librados::AioCompletion *comp = cluster.aio_create_completion();
522 ASSERT_EQ(0, ioctx.aio_watch(notify_oid, comp, &handle, &ctx));
523 ASSERT_EQ(0, comp->wait_for_complete());
524 ASSERT_EQ(0, comp->get_return_value());
525 comp->release();
526
527 ASSERT_GT(ioctx.watch_check(handle), 0);
528 std::list<obj_watch_t> watches;
529 ASSERT_EQ(0, ioctx.list_watchers(notify_oid, &watches));
530 ASSERT_EQ(watches.size(), 1u);
531 bufferlist bl2, bl_reply;
532 ASSERT_EQ(0, ioctx.notify2(notify_oid, bl2, 300000, &bl_reply));
533 bufferlist::iterator p = bl_reply.begin();
534 std::map<std::pair<uint64_t,uint64_t>,bufferlist> reply_map;
535 std::set<std::pair<uint64_t,uint64_t> > missed_map;
536 ::decode(reply_map, p);
537 ::decode(missed_map, p);
538 ASSERT_EQ(1u, notify_cookies.size());
539 ASSERT_EQ(1u, notify_cookies.count(handle));
540 ASSERT_EQ(1u, reply_map.size());
541 ASSERT_EQ(5u, reply_map.begin()->second.length());
542 ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
543 ASSERT_EQ(0u, missed_map.size());
544 ASSERT_GT(ioctx.watch_check(handle), 0);
545
546 comp = cluster.aio_create_completion();
547 ioctx.aio_unwatch(handle, comp);
548 ASSERT_EQ(0, comp->wait_for_complete());
549 comp->release();
550 }
551
552
553 TEST_P(LibRadosWatchNotifyPP, AioNotify) {
554 notify_oid = "foo";
555 notify_ioctx = &ioctx;
556 notify_cookies.clear();
557 char buf[128];
558 memset(buf, 0xcc, sizeof(buf));
559 bufferlist bl1;
560 bl1.append(buf, sizeof(buf));
561 ASSERT_EQ(0, ioctx.write(notify_oid, bl1, sizeof(buf), 0));
562 uint64_t handle;
563 WatchNotifyTestCtx2 ctx(this);
564 ASSERT_EQ(0, ioctx.watch2(notify_oid, &handle, &ctx));
565 ASSERT_GT(ioctx.watch_check(handle), 0);
566 std::list<obj_watch_t> watches;
567 ASSERT_EQ(0, ioctx.list_watchers(notify_oid, &watches));
568 ASSERT_EQ(watches.size(), 1u);
569 bufferlist bl2, bl_reply;
570 librados::AioCompletion *comp = cluster.aio_create_completion();
571 ASSERT_EQ(0, ioctx.aio_notify(notify_oid, comp, bl2, 300000, &bl_reply));
572 ASSERT_EQ(0, comp->wait_for_complete());
573 ASSERT_EQ(0, comp->get_return_value());
574 comp->release();
575 bufferlist::iterator p = bl_reply.begin();
576 std::map<std::pair<uint64_t,uint64_t>,bufferlist> reply_map;
577 std::set<std::pair<uint64_t,uint64_t> > missed_map;
578 ::decode(reply_map, p);
579 ::decode(missed_map, p);
580 ASSERT_EQ(1u, notify_cookies.size());
581 ASSERT_EQ(1u, notify_cookies.count(handle));
582 ASSERT_EQ(1u, reply_map.size());
583 ASSERT_EQ(5u, reply_map.begin()->second.length());
584 ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
585 ASSERT_EQ(0u, missed_map.size());
586 ASSERT_GT(ioctx.watch_check(handle), 0);
587 ioctx.unwatch2(handle);
588 }
589
590 // --
591
592 TEST_F(LibRadosWatchNotify, WatchNotify2Multi) {
593 notify_io = ioctx;
594 notify_oid = "foo";
595 notify_cookies.clear();
596 char buf[128];
597 memset(buf, 0xcc, sizeof(buf));
598 ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
599 uint64_t handle1, handle2;
600 ASSERT_EQ(0,
601 rados_watch2(ioctx, notify_oid, &handle1,
602 watch_notify2_test_cb,
603 watch_notify2_test_errcb, this));
604 ASSERT_EQ(0,
605 rados_watch2(ioctx, notify_oid, &handle2,
606 watch_notify2_test_cb,
607 watch_notify2_test_errcb, this));
608 ASSERT_GT(rados_watch_check(ioctx, handle1), 0);
609 ASSERT_GT(rados_watch_check(ioctx, handle2), 0);
610 ASSERT_NE(handle1, handle2);
611 char *reply_buf = 0;
612 size_t reply_buf_len;
613 ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
614 "notify", 6, 300000,
615 &reply_buf, &reply_buf_len));
616 bufferlist reply;
617 reply.append(reply_buf, reply_buf_len);
618 std::map<std::pair<uint64_t,uint64_t>, bufferlist> reply_map;
619 std::set<std::pair<uint64_t,uint64_t> > missed_map;
620 bufferlist::iterator reply_p = reply.begin();
621 ::decode(reply_map, reply_p);
622 ::decode(missed_map, reply_p);
623 ASSERT_EQ(2u, reply_map.size());
624 ASSERT_EQ(5u, reply_map.begin()->second.length());
625 ASSERT_EQ(0u, missed_map.size());
626 ASSERT_EQ(2u, notify_cookies.size());
627 ASSERT_EQ(1u, notify_cookies.count(handle1));
628 ASSERT_EQ(1u, notify_cookies.count(handle2));
629 ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
630 ASSERT_GT(rados_watch_check(ioctx, handle1), 0);
631 ASSERT_GT(rados_watch_check(ioctx, handle2), 0);
632 rados_buffer_free(reply_buf);
633 rados_unwatch2(ioctx, handle1);
634 rados_unwatch2(ioctx, handle2);
635 }
636
637 // --
638
639 TEST_F(LibRadosWatchNotify, WatchNotify2Timeout) {
640 notify_io = ioctx;
641 notify_oid = "foo";
642 notify_sleep = 3; // 3s
643 notify_cookies.clear();
644 char buf[128];
645 memset(buf, 0xcc, sizeof(buf));
646 ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
647 uint64_t handle;
648 ASSERT_EQ(0,
649 rados_watch2(ioctx, notify_oid, &handle,
650 watch_notify2_test_cb,
651 watch_notify2_test_errcb, this));
652 ASSERT_GT(rados_watch_check(ioctx, handle), 0);
653 char *reply_buf = 0;
654 size_t reply_buf_len;
655 ASSERT_EQ(-ETIMEDOUT, rados_notify2(ioctx, notify_oid,
656 "notify", 6, 1000, // 1s
657 &reply_buf, &reply_buf_len));
658 ASSERT_EQ(1u, notify_cookies.size());
659 {
660 bufferlist reply;
661 reply.append(reply_buf, reply_buf_len);
662 std::map<std::pair<uint64_t,uint64_t>, bufferlist> reply_map;
663 std::set<std::pair<uint64_t,uint64_t> > missed_map;
664 bufferlist::iterator reply_p = reply.begin();
665 ::decode(reply_map, reply_p);
666 ::decode(missed_map, reply_p);
667 ASSERT_EQ(0u, reply_map.size());
668 ASSERT_EQ(1u, missed_map.size());
669 }
670 rados_buffer_free(reply_buf);
671
672 // we should get the next notify, though!
673 notify_sleep = 0;
674 notify_cookies.clear();
675 ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
676 "notify", 6, 300000, // 300s
677 &reply_buf, &reply_buf_len));
678 ASSERT_EQ(1u, notify_cookies.size());
679 ASSERT_GT(rados_watch_check(ioctx, handle), 0);
680
681 rados_unwatch2(ioctx, handle);
682
683 rados_completion_t comp;
684 ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
685 rados_aio_watch_flush(cluster, comp);
686 ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
687 ASSERT_EQ(0, rados_aio_get_return_value(comp));
688 rados_aio_release(comp);
689 rados_buffer_free(reply_buf);
690
691 }
692
693 TEST_P(LibRadosWatchNotifyPP, WatchNotify2Timeout) {
694 notify_oid = "foo";
695 notify_ioctx = &ioctx;
696 notify_sleep = 3; // 3s
697 notify_cookies.clear();
698 char buf[128];
699 memset(buf, 0xcc, sizeof(buf));
700 bufferlist bl1;
701 bl1.append(buf, sizeof(buf));
702 ASSERT_EQ(0, ioctx.write(notify_oid, bl1, sizeof(buf), 0));
703 uint64_t handle;
704 WatchNotifyTestCtx2 ctx(this);
705 ASSERT_EQ(0, ioctx.watch2(notify_oid, &handle, &ctx));
706 ASSERT_GT(ioctx.watch_check(handle), 0);
707 std::list<obj_watch_t> watches;
708 ASSERT_EQ(0, ioctx.list_watchers(notify_oid, &watches));
709 ASSERT_EQ(watches.size(), 1u);
710 ASSERT_EQ(0u, notify_cookies.size());
711 bufferlist bl2, bl_reply;
712 std::cout << " trying..." << std::endl;
713 ASSERT_EQ(-ETIMEDOUT, ioctx.notify2(notify_oid, bl2, 1000 /* 1s */,
714 &bl_reply));
715 std::cout << " timed out" << std::endl;
716 ASSERT_GT(ioctx.watch_check(handle), 0);
717 ioctx.unwatch2(handle);
718
719 std::cout << " flushing" << std::endl;
720 librados::AioCompletion *comp = cluster.aio_create_completion();
721 cluster.aio_watch_flush(comp);
722 ASSERT_EQ(0, comp->wait_for_complete());
723 ASSERT_EQ(0, comp->get_return_value());
724 std::cout << " flushed" << std::endl;
725 comp->release();
726 }
727
728 TEST_P(LibRadosWatchNotifyPP, WatchNotify3) {
729 notify_oid = "foo";
730 notify_ioctx = &ioctx;
731 notify_cookies.clear();
732 uint32_t timeout = 12; // configured timeout
733 char buf[128];
734 memset(buf, 0xcc, sizeof(buf));
735 bufferlist bl1;
736 bl1.append(buf, sizeof(buf));
737 ASSERT_EQ(0, ioctx.write(notify_oid, bl1, sizeof(buf), 0));
738 uint64_t handle;
739 WatchNotifyTestCtx2 ctx(this);
740 ASSERT_EQ(0, ioctx.watch3(notify_oid, &handle, &ctx, timeout));
741 ASSERT_GT(ioctx.watch_check(handle), 0);
742 std::list<obj_watch_t> watches;
743 ASSERT_EQ(0, ioctx.list_watchers(notify_oid, &watches));
744 ASSERT_EQ(watches.size(), 1u);
745 std::cout << "List watches" << std::endl;
746 for (std::list<obj_watch_t>::iterator it = watches.begin();
747 it != watches.end(); ++it) {
748 ASSERT_EQ(it->timeout_seconds, timeout);
749 }
750 bufferlist bl2, bl_reply;
751 ASSERT_EQ(0, ioctx.notify2(notify_oid, bl2, 300000, &bl_reply));
752 bufferlist::iterator p = bl_reply.begin();
753 std::map<std::pair<uint64_t,uint64_t>,bufferlist> reply_map;
754 std::set<std::pair<uint64_t,uint64_t> > missed_map;
755 ::decode(reply_map, p);
756 ::decode(missed_map, p);
757 ASSERT_EQ(1u, notify_cookies.size());
758 ASSERT_EQ(1u, notify_cookies.count(handle));
759 ASSERT_EQ(1u, reply_map.size());
760 ASSERT_EQ(5u, reply_map.begin()->second.length());
761 ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
762 ASSERT_EQ(0u, missed_map.size());
763 ASSERT_GT(ioctx.watch_check(handle), 0);
764 ioctx.unwatch2(handle);
765 }
766
767 TEST_F(LibRadosWatchNotify, Watch3Timeout) {
768 notify_io = ioctx;
769 notify_oid = "foo";
770 notify_cookies.clear();
771 notify_err = 0;
772 char buf[128];
773 memset(buf, 0xcc, sizeof(buf));
774 ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
775 uint64_t handle;
776 time_t start = time(0);
777 const uint32_t timeout = 4;
778 {
779 // make sure i timeout before the messenger reconnects to the OSD,
780 // it will resend a watch request on behalf of the client, and the
781 // timer of timeout on OSD side will be reset by the new request.
782 char conf[128];
783 ASSERT_EQ(0, rados_conf_get(cluster,
784 "ms_tcp_read_timeout",
785 conf, sizeof(conf)));
786 auto tcp_read_timeout = std::stoll(conf);
787 ASSERT_LT(timeout, tcp_read_timeout);
788 }
789 ASSERT_EQ(0,
790 rados_watch3(ioctx, notify_oid, &handle,
791 watch_notify2_test_cb, watch_notify2_test_errcb,
792 timeout, this));
793 int age = rados_watch_check(ioctx, handle);
794 time_t age_bound = time(0) + 1 - start;
795 ASSERT_LT(age, age_bound * 1000);
796 ASSERT_GT(age, 0);
797 rados_conf_set(cluster, "objecter_inject_no_watch_ping", "true");
798 // allow a long time here since an osd peering event will renew our
799 // watch.
800 int left = 16 * timeout;
801 std::cout << "waiting up to " << left << " for osd to time us out ..."
802 << std::endl;
803 while (notify_err == 0 && --left) {
804 sleep(1);
805 }
806 ASSERT_GT(left, 0);
807 rados_conf_set(cluster, "objecter_inject_no_watch_ping", "false");
808 ASSERT_EQ(-ENOTCONN, notify_err);
809 ASSERT_EQ(-ENOTCONN, rados_watch_check(ioctx, handle));
810
811 // a subsequent notify should not reach us
812 char *reply_buf = nullptr;
813 size_t reply_buf_len;
814 ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
815 "notify", 6, 300000,
816 &reply_buf, &reply_buf_len));
817 {
818 bufferlist reply;
819 reply.append(reply_buf, reply_buf_len);
820 std::map<std::pair<uint64_t,uint64_t>, bufferlist> reply_map;
821 std::set<std::pair<uint64_t,uint64_t> > missed_map;
822 bufferlist::iterator reply_p = reply.begin();
823 ::decode(reply_map, reply_p);
824 ::decode(missed_map, reply_p);
825 ASSERT_EQ(0u, reply_map.size());
826 ASSERT_EQ(0u, missed_map.size());
827 }
828 ASSERT_EQ(0u, notify_cookies.size());
829 ASSERT_EQ(-ENOTCONN, rados_watch_check(ioctx, handle));
830 rados_buffer_free(reply_buf);
831
832 // re-watch
833 rados_unwatch2(ioctx, handle);
834 handle = 0;
835 ASSERT_EQ(0,
836 rados_watch2(ioctx, notify_oid, &handle,
837 watch_notify2_test_cb,
838 watch_notify2_test_errcb, this));
839 ASSERT_GT(rados_watch_check(ioctx, handle), 0);
840
841 // and now a notify will work.
842 ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
843 "notify", 6, 300000,
844 &reply_buf, &reply_buf_len));
845 {
846 bufferlist reply;
847 reply.append(reply_buf, reply_buf_len);
848 std::map<std::pair<uint64_t,uint64_t>, bufferlist> reply_map;
849 std::set<std::pair<uint64_t,uint64_t> > missed_map;
850 bufferlist::iterator reply_p = reply.begin();
851 ::decode(reply_map, reply_p);
852 ::decode(missed_map, reply_p);
853 ASSERT_EQ(1u, reply_map.size());
854 ASSERT_EQ(0u, missed_map.size());
855 ASSERT_EQ(1u, notify_cookies.count(handle));
856 ASSERT_EQ(5u, reply_map.begin()->second.length());
857 ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
858 }
859 ASSERT_EQ(1u, notify_cookies.size());
860 ASSERT_GT(rados_watch_check(ioctx, handle), 0);
861
862 rados_buffer_free(reply_buf);
863 rados_unwatch2(ioctx, handle);
864 }
865
866 TEST_F(LibRadosWatchNotify, AioWatchDelete2) {
867 notify_io = ioctx;
868 notify_oid = "foo";
869 notify_err = 0;
870 char buf[128];
871 uint32_t timeout = 3;
872 memset(buf, 0xcc, sizeof(buf));
873 ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
874
875
876 rados_completion_t comp;
877 uint64_t handle;
878 ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
879 rados_aio_watch2(ioctx, notify_oid, comp, &handle,
880 watch_notify2_test_cb, watch_notify2_test_errcb, timeout, this);
881 ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
882 ASSERT_EQ(0, rados_aio_get_return_value(comp));
883 rados_aio_release(comp);
884 ASSERT_EQ(0, rados_remove(ioctx, notify_oid));
885 int left = 30;
886 std::cout << "waiting up to " << left << " for disconnect notification ..."
887 << std::endl;
888 while (notify_err == 0 && --left) {
889 sleep(1);
890 }
891 ASSERT_TRUE(left > 0);
892 ASSERT_EQ(-ENOTCONN, notify_err);
893 ASSERT_EQ(-ENOTCONN, rados_watch_check(ioctx, handle));
894 ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
895 rados_aio_unwatch(ioctx, handle, comp);
896 ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
897 ASSERT_EQ(-ENOENT, rados_aio_get_return_value(comp));
898 rados_aio_release(comp);
899 }
900 // --
901
902 INSTANTIATE_TEST_CASE_P(LibRadosWatchNotifyPPTests, LibRadosWatchNotifyPP,
903 ::testing::Values("", "cache"));