]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/librados/watch_notify.cc
update sources to v12.1.2
[ceph.git] / ceph / src / test / librados / watch_notify.cc
1 #include "include/rados/librados.h"
2 #include "include/rados/librados.hpp"
3 #include "include/rados/rados_types.h"
4 #include "test/librados/test.h"
5 #include "test/librados/TestCase.h"
6
7 #include <errno.h>
8 #include <fcntl.h>
9 #include <semaphore.h>
10 #include "gtest/gtest.h"
11 #include "include/encoding.h"
12 #include <set>
13 #include <map>
14
15 using namespace librados;
16
17 typedef RadosTestEC LibRadosWatchNotifyEC;
18 typedef RadosTestECPP LibRadosWatchNotifyECPP;
19
20 int notify_sleep = 0;
21
22 // notify
23 static sem_t *sem;
24
25 static void watch_notify_test_cb(uint8_t opcode, uint64_t ver, void *arg)
26 {
27 std::cout << __func__ << std::endl;
28 sem_post(sem);
29 }
30
31 class WatchNotifyTestCtx : public WatchCtx
32 {
33 public:
34 void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) override
35 {
36 std::cout << __func__ << std::endl;
37 sem_post(sem);
38 }
39 };
40
41 class LibRadosWatchNotify : public RadosTest
42 {
43 protected:
44 // notify 2
45 bufferlist notify_bl;
46 std::set<uint64_t> notify_cookies;
47 rados_ioctx_t notify_io;
48 const char *notify_oid = nullptr;
49 int notify_err = 0;
50
51 static void watch_notify2_test_cb(void *arg,
52 uint64_t notify_id,
53 uint64_t cookie,
54 uint64_t notifier_gid,
55 void *data,
56 size_t data_len);
57 static void watch_notify2_test_errcb(void *arg, uint64_t cookie, int err);
58 };
59
60
61 void LibRadosWatchNotify::watch_notify2_test_cb(void *arg,
62 uint64_t notify_id,
63 uint64_t cookie,
64 uint64_t notifier_gid,
65 void *data,
66 size_t data_len)
67 {
68 std::cout << __func__ << " from " << notifier_gid << " notify_id " << notify_id
69 << " cookie " << cookie << std::endl;
70 assert(notifier_gid > 0);
71 auto thiz = reinterpret_cast<LibRadosWatchNotify*>(arg);
72 assert(thiz);
73 thiz->notify_cookies.insert(cookie);
74 thiz->notify_bl.clear();
75 thiz->notify_bl.append((char*)data, data_len);
76 if (notify_sleep)
77 sleep(notify_sleep);
78 rados_notify_ack(thiz->notify_io, thiz->notify_oid, notify_id, cookie,
79 "reply", 5);
80 }
81
82 void LibRadosWatchNotify::watch_notify2_test_errcb(void *arg,
83 uint64_t cookie,
84 int err)
85 {
86 std::cout << __func__ << " cookie " << cookie << " err " << err << std::endl;
87 assert(cookie > 1000);
88 auto thiz = reinterpret_cast<LibRadosWatchNotify*>(arg);
89 assert(thiz);
90 thiz->notify_err = err;
91 }
92
93 class WatchNotifyTestCtx2;
94 class LibRadosWatchNotifyPP : public RadosTestParamPP
95 {
96 protected:
97 bufferlist notify_bl;
98 std::set<uint64_t> notify_cookies;
99 rados_ioctx_t notify_io;
100 const char *notify_oid = nullptr;
101 int notify_err = 0;
102
103 friend class WatchNotifyTestCtx2;
104 };
105
106 IoCtx *notify_ioctx;
107
108 class WatchNotifyTestCtx2 : public WatchCtx2
109 {
110 LibRadosWatchNotifyPP *notify;
111
112 public:
113 WatchNotifyTestCtx2(LibRadosWatchNotifyPP *notify)
114 : notify(notify)
115 {}
116
117 void handle_notify(uint64_t notify_id, uint64_t cookie, uint64_t notifier_gid,
118 bufferlist& bl) override {
119 std::cout << __func__ << " cookie " << cookie << " notify_id " << notify_id
120 << " notifier_gid " << notifier_gid << std::endl;
121 notify->notify_bl = bl;
122 notify->notify_cookies.insert(cookie);
123 bufferlist reply;
124 reply.append("reply", 5);
125 if (notify_sleep)
126 sleep(notify_sleep);
127 notify_ioctx->notify_ack(notify->notify_oid, notify_id, cookie, reply);
128 }
129
130 void handle_error(uint64_t cookie, int err) override {
131 std::cout << __func__ << " cookie " << cookie
132 << " err " << err << std::endl;
133 assert(cookie > 1000);
134 notify->notify_err = err;
135 }
136 };
137
138 // --
139
140 #pragma GCC diagnostic ignored "-Wpragmas"
141 #pragma GCC diagnostic push
142 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
143
144 TEST_F(LibRadosWatchNotify, WatchNotify) {
145 ASSERT_NE(SEM_FAILED, (sem = sem_open("/test_watch_notify_sem", O_CREAT, 0644, 0)));
146 char buf[128];
147 memset(buf, 0xcc, sizeof(buf));
148 ASSERT_EQ(0, rados_write(ioctx, "foo", buf, sizeof(buf), 0));
149 uint64_t handle;
150 ASSERT_EQ(0,
151 rados_watch(ioctx, "foo", 0, &handle, watch_notify_test_cb, NULL));
152 ASSERT_EQ(0, rados_notify(ioctx, "foo", 0, NULL, 0));
153 TestAlarm alarm;
154 sem_wait(sem);
155 rados_unwatch(ioctx, "foo", handle);
156
157 // when dne ...
158 ASSERT_EQ(-ENOENT,
159 rados_watch(ioctx, "dne", 0, &handle, watch_notify_test_cb, NULL));
160
161 sem_close(sem);
162 }
163
164 TEST_P(LibRadosWatchNotifyPP, WatchNotify) {
165 ASSERT_NE(SEM_FAILED, (sem = sem_open("/test_watch_notify_sem", O_CREAT, 0644, 0)));
166 char buf[128];
167 memset(buf, 0xcc, sizeof(buf));
168 bufferlist bl1;
169 bl1.append(buf, sizeof(buf));
170 ASSERT_EQ(0, ioctx.write("foo", bl1, sizeof(buf), 0));
171 uint64_t handle;
172 WatchNotifyTestCtx ctx;
173 ASSERT_EQ(0, ioctx.watch("foo", 0, &handle, &ctx));
174 std::list<obj_watch_t> watches;
175 ASSERT_EQ(0, ioctx.list_watchers("foo", &watches));
176 ASSERT_EQ(watches.size(), 1u);
177 bufferlist bl2;
178 ASSERT_EQ(0, ioctx.notify("foo", 0, bl2));
179 TestAlarm alarm;
180 sem_wait(sem);
181 ioctx.unwatch("foo", handle);
182 sem_close(sem);
183 }
184
185 TEST_F(LibRadosWatchNotifyEC, WatchNotify) {
186 ASSERT_NE(SEM_FAILED, (sem = sem_open("/test_watch_notify_sem", O_CREAT, 0644, 0)));
187 char buf[128];
188 memset(buf, 0xcc, sizeof(buf));
189 ASSERT_EQ(0, rados_write(ioctx, "foo", buf, sizeof(buf), 0));
190 uint64_t handle;
191 ASSERT_EQ(0,
192 rados_watch(ioctx, "foo", 0, &handle, watch_notify_test_cb, NULL));
193 ASSERT_EQ(0, rados_notify(ioctx, "foo", 0, NULL, 0));
194 TestAlarm alarm;
195 sem_wait(sem);
196 rados_unwatch(ioctx, "foo", handle);
197 sem_close(sem);
198 }
199
200 TEST_F(LibRadosWatchNotifyECPP, WatchNotify) {
201 ASSERT_NE(SEM_FAILED, (sem = sem_open("/test_watch_notify_sem", O_CREAT, 0644, 0)));
202 char buf[128];
203 memset(buf, 0xcc, sizeof(buf));
204 bufferlist bl1;
205 bl1.append(buf, sizeof(buf));
206 ASSERT_EQ(0, ioctx.write("foo", bl1, sizeof(buf), 0));
207 uint64_t handle;
208 WatchNotifyTestCtx ctx;
209 ASSERT_EQ(0, ioctx.watch("foo", 0, &handle, &ctx));
210 std::list<obj_watch_t> watches;
211 ASSERT_EQ(0, ioctx.list_watchers("foo", &watches));
212 ASSERT_EQ(watches.size(), 1u);
213 bufferlist bl2;
214 ASSERT_EQ(0, ioctx.notify("foo", 0, bl2));
215 TestAlarm alarm;
216 sem_wait(sem);
217 ioctx.unwatch("foo", handle);
218 sem_close(sem);
219 }
220
221 // --
222
223 TEST_P(LibRadosWatchNotifyPP, WatchNotifyTimeout) {
224 ASSERT_NE(SEM_FAILED, (sem = sem_open("/test_watch_notify_sem", O_CREAT, 0644, 0)));
225 ioctx.set_notify_timeout(1);
226 uint64_t handle;
227 WatchNotifyTestCtx ctx;
228
229 char buf[128];
230 memset(buf, 0xcc, sizeof(buf));
231 bufferlist bl1;
232 bl1.append(buf, sizeof(buf));
233 ASSERT_EQ(0, ioctx.write("foo", bl1, sizeof(buf), 0));
234
235 ASSERT_EQ(0, ioctx.watch("foo", 0, &handle, &ctx));
236 sem_close(sem);
237 ASSERT_EQ(0, ioctx.unwatch("foo", handle));
238 }
239
240 TEST_F(LibRadosWatchNotifyECPP, WatchNotifyTimeout) {
241 ASSERT_NE(SEM_FAILED, (sem = sem_open("/test_watch_notify_sem", O_CREAT, 0644, 0)));
242 ioctx.set_notify_timeout(1);
243 uint64_t handle;
244 WatchNotifyTestCtx ctx;
245
246 char buf[128];
247 memset(buf, 0xcc, sizeof(buf));
248 bufferlist bl1;
249 bl1.append(buf, sizeof(buf));
250 ASSERT_EQ(0, ioctx.write("foo", bl1, sizeof(buf), 0));
251
252 ASSERT_EQ(0, ioctx.watch("foo", 0, &handle, &ctx));
253 sem_close(sem);
254 ASSERT_EQ(0, ioctx.unwatch("foo", handle));
255 }
256
257 #pragma GCC diagnostic pop
258 #pragma GCC diagnostic warning "-Wpragmas"
259
260
261 // --
262
263 TEST_F(LibRadosWatchNotify, Watch2Delete) {
264 notify_io = ioctx;
265 notify_oid = "foo";
266 notify_err = 0;
267 char buf[128];
268 memset(buf, 0xcc, sizeof(buf));
269 ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
270 uint64_t handle;
271 ASSERT_EQ(0,
272 rados_watch2(ioctx, notify_oid, &handle,
273 watch_notify2_test_cb,
274 watch_notify2_test_errcb, this));
275 ASSERT_EQ(0, rados_remove(ioctx, notify_oid));
276 int left = 300;
277 std::cout << "waiting up to " << left << " for disconnect notification ..."
278 << std::endl;
279 while (notify_err == 0 && --left) {
280 sleep(1);
281 }
282 ASSERT_TRUE(left > 0);
283 ASSERT_EQ(-ENOTCONN, notify_err);
284 ASSERT_EQ(-ENOTCONN, rados_watch_check(ioctx, handle));
285 rados_unwatch2(ioctx, handle);
286 rados_watch_flush(cluster);
287 }
288
289 TEST_F(LibRadosWatchNotify, AioWatchDelete) {
290 notify_io = ioctx;
291 notify_oid = "foo";
292 notify_err = 0;
293 char buf[128];
294 memset(buf, 0xcc, sizeof(buf));
295 ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
296
297
298 rados_completion_t comp;
299 uint64_t handle;
300 ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
301 rados_aio_watch(ioctx, notify_oid, comp, &handle,
302 watch_notify2_test_cb, watch_notify2_test_errcb, this);
303 ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
304 ASSERT_EQ(0, rados_aio_get_return_value(comp));
305 rados_aio_release(comp);
306 ASSERT_EQ(0, rados_remove(ioctx, notify_oid));
307 int left = 300;
308 std::cout << "waiting up to " << left << " for disconnect notification ..."
309 << std::endl;
310 while (notify_err == 0 && --left) {
311 sleep(1);
312 }
313 ASSERT_TRUE(left > 0);
314 ASSERT_EQ(-ENOTCONN, notify_err);
315 ASSERT_EQ(-ENOTCONN, rados_watch_check(ioctx, handle));
316 ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
317 rados_aio_unwatch(ioctx, handle, comp);
318 ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
319 ASSERT_EQ(-ENOENT, rados_aio_get_return_value(comp));
320 rados_aio_release(comp);
321 }
322
323 // --
324
325 TEST_F(LibRadosWatchNotify, WatchNotify2) {
326 notify_io = ioctx;
327 notify_oid = "foo";
328 notify_cookies.clear();
329 char buf[128];
330 memset(buf, 0xcc, sizeof(buf));
331 ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
332 uint64_t handle;
333 ASSERT_EQ(0,
334 rados_watch2(ioctx, notify_oid, &handle,
335 watch_notify2_test_cb,
336 watch_notify2_test_errcb, this));
337 ASSERT_GT(rados_watch_check(ioctx, handle), 0);
338 char *reply_buf = 0;
339 size_t reply_buf_len;
340 ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
341 "notify", 6, 300000,
342 &reply_buf, &reply_buf_len));
343 bufferlist reply;
344 reply.append(reply_buf, reply_buf_len);
345 std::map<std::pair<uint64_t,uint64_t>, bufferlist> reply_map;
346 std::set<std::pair<uint64_t,uint64_t> > missed_map;
347 bufferlist::iterator reply_p = reply.begin();
348 ::decode(reply_map, reply_p);
349 ::decode(missed_map, reply_p);
350 ASSERT_EQ(1u, reply_map.size());
351 ASSERT_EQ(0u, missed_map.size());
352 ASSERT_EQ(1u, notify_cookies.size());
353 ASSERT_EQ(1u, notify_cookies.count(handle));
354 ASSERT_EQ(5u, reply_map.begin()->second.length());
355 ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
356 ASSERT_GT(rados_watch_check(ioctx, handle), 0);
357 rados_buffer_free(reply_buf);
358
359 // try it on a non-existent object ... our buffer pointers
360 // should get zeroed.
361 ASSERT_EQ(-ENOENT, rados_notify2(ioctx, "doesnotexist",
362 "notify", 6, 300000,
363 &reply_buf, &reply_buf_len));
364 ASSERT_EQ((char*)0, reply_buf);
365 ASSERT_EQ(0u, reply_buf_len);
366
367 rados_unwatch2(ioctx, handle);
368 rados_watch_flush(cluster);
369 }
370
371 TEST_F(LibRadosWatchNotify, AioWatchNotify2) {
372 notify_io = ioctx;
373 notify_oid = "foo";
374 notify_cookies.clear();
375 char buf[128];
376 memset(buf, 0xcc, sizeof(buf));
377 ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
378
379 rados_completion_t comp;
380 uint64_t handle;
381 ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
382 rados_aio_watch(ioctx, notify_oid, comp, &handle,
383 watch_notify2_test_cb, watch_notify2_test_errcb, this);
384 ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
385 ASSERT_EQ(0, rados_aio_get_return_value(comp));
386 rados_aio_release(comp);
387
388 ASSERT_GT(rados_watch_check(ioctx, handle), 0);
389 char *reply_buf = 0;
390 size_t reply_buf_len;
391 ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
392 "notify", 6, 300000,
393 &reply_buf, &reply_buf_len));
394 bufferlist reply;
395 reply.append(reply_buf, reply_buf_len);
396 std::map<std::pair<uint64_t,uint64_t>, bufferlist> reply_map;
397 std::set<std::pair<uint64_t,uint64_t> > missed_map;
398 bufferlist::iterator reply_p = reply.begin();
399 ::decode(reply_map, reply_p);
400 ::decode(missed_map, reply_p);
401 ASSERT_EQ(1u, reply_map.size());
402 ASSERT_EQ(0u, missed_map.size());
403 ASSERT_EQ(1u, notify_cookies.size());
404 ASSERT_EQ(1u, notify_cookies.count(handle));
405 ASSERT_EQ(5u, reply_map.begin()->second.length());
406 ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
407 ASSERT_GT(rados_watch_check(ioctx, handle), 0);
408 rados_buffer_free(reply_buf);
409
410 // try it on a non-existent object ... our buffer pointers
411 // should get zeroed.
412 ASSERT_EQ(-ENOENT, rados_notify2(ioctx, "doesnotexist",
413 "notify", 6, 300000,
414 &reply_buf, &reply_buf_len));
415 ASSERT_EQ((char*)0, reply_buf);
416 ASSERT_EQ(0u, reply_buf_len);
417
418 ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
419 rados_aio_unwatch(ioctx, handle, comp);
420 ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
421 ASSERT_EQ(0, rados_aio_get_return_value(comp));
422 rados_aio_release(comp);
423 }
424
425 TEST_F(LibRadosWatchNotify, AioNotify) {
426 notify_io = ioctx;
427 notify_oid = "foo";
428 notify_cookies.clear();
429 char buf[128];
430 memset(buf, 0xcc, sizeof(buf));
431 ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
432 uint64_t handle;
433 ASSERT_EQ(0,
434 rados_watch2(ioctx, notify_oid, &handle,
435 watch_notify2_test_cb,
436 watch_notify2_test_errcb, this));
437 ASSERT_GT(rados_watch_check(ioctx, handle), 0);
438 char *reply_buf = 0;
439 size_t reply_buf_len;
440 rados_completion_t comp;
441 ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
442 ASSERT_EQ(0, rados_aio_notify(ioctx, "foo", comp, "notify", 6, 300000,
443 &reply_buf, &reply_buf_len));
444 ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
445 ASSERT_EQ(0, rados_aio_get_return_value(comp));
446 rados_aio_release(comp);
447
448 bufferlist reply;
449 reply.append(reply_buf, reply_buf_len);
450 std::map<std::pair<uint64_t,uint64_t>, bufferlist> reply_map;
451 std::set<std::pair<uint64_t,uint64_t> > missed_map;
452 bufferlist::iterator reply_p = reply.begin();
453 ::decode(reply_map, reply_p);
454 ::decode(missed_map, reply_p);
455 ASSERT_EQ(1u, reply_map.size());
456 ASSERT_EQ(0u, missed_map.size());
457 ASSERT_EQ(1u, notify_cookies.size());
458 ASSERT_EQ(1u, notify_cookies.count(handle));
459 ASSERT_EQ(5u, reply_map.begin()->second.length());
460 ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
461 ASSERT_GT(rados_watch_check(ioctx, handle), 0);
462 rados_buffer_free(reply_buf);
463
464 // try it on a non-existent object ... our buffer pointers
465 // should get zeroed.
466 ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
467 ASSERT_EQ(0, rados_aio_notify(ioctx, "doesnotexist", comp, "notify", 6,
468 300000, &reply_buf, &reply_buf_len));
469 ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
470 ASSERT_EQ(-ENOENT, rados_aio_get_return_value(comp));
471 rados_aio_release(comp);
472 ASSERT_EQ((char*)0, reply_buf);
473 ASSERT_EQ(0u, reply_buf_len);
474
475 rados_unwatch2(ioctx, handle);
476 rados_watch_flush(cluster);
477 }
478
479 TEST_P(LibRadosWatchNotifyPP, WatchNotify2) {
480 notify_oid = "foo";
481 notify_ioctx = &ioctx;
482 notify_cookies.clear();
483 char buf[128];
484 memset(buf, 0xcc, sizeof(buf));
485 bufferlist bl1;
486 bl1.append(buf, sizeof(buf));
487 ASSERT_EQ(0, ioctx.write(notify_oid, bl1, sizeof(buf), 0));
488 uint64_t handle;
489 WatchNotifyTestCtx2 ctx(this);
490 ASSERT_EQ(0, ioctx.watch2(notify_oid, &handle, &ctx));
491 ASSERT_GT(ioctx.watch_check(handle), 0);
492 std::list<obj_watch_t> watches;
493 ASSERT_EQ(0, ioctx.list_watchers(notify_oid, &watches));
494 ASSERT_EQ(watches.size(), 1u);
495 bufferlist bl2, bl_reply;
496 ASSERT_EQ(0, ioctx.notify2(notify_oid, bl2, 300000, &bl_reply));
497 bufferlist::iterator p = bl_reply.begin();
498 std::map<std::pair<uint64_t,uint64_t>,bufferlist> reply_map;
499 std::set<std::pair<uint64_t,uint64_t> > missed_map;
500 ::decode(reply_map, p);
501 ::decode(missed_map, p);
502 ASSERT_EQ(1u, notify_cookies.size());
503 ASSERT_EQ(1u, notify_cookies.count(handle));
504 ASSERT_EQ(1u, reply_map.size());
505 ASSERT_EQ(5u, reply_map.begin()->second.length());
506 ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
507 ASSERT_EQ(0u, missed_map.size());
508 ASSERT_GT(ioctx.watch_check(handle), 0);
509 ioctx.unwatch2(handle);
510 }
511
512 TEST_P(LibRadosWatchNotifyPP, AioWatchNotify2) {
513 notify_oid = "foo";
514 notify_ioctx = &ioctx;
515 notify_cookies.clear();
516 char buf[128];
517 memset(buf, 0xcc, sizeof(buf));
518 bufferlist bl1;
519 bl1.append(buf, sizeof(buf));
520 ASSERT_EQ(0, ioctx.write(notify_oid, bl1, sizeof(buf), 0));
521
522 uint64_t handle;
523 WatchNotifyTestCtx2 ctx(this);
524 librados::AioCompletion *comp = cluster.aio_create_completion();
525 ASSERT_EQ(0, ioctx.aio_watch(notify_oid, comp, &handle, &ctx));
526 ASSERT_EQ(0, comp->wait_for_complete());
527 ASSERT_EQ(0, comp->get_return_value());
528 comp->release();
529
530 ASSERT_GT(ioctx.watch_check(handle), 0);
531 std::list<obj_watch_t> watches;
532 ASSERT_EQ(0, ioctx.list_watchers(notify_oid, &watches));
533 ASSERT_EQ(watches.size(), 1u);
534 bufferlist bl2, bl_reply;
535 ASSERT_EQ(0, ioctx.notify2(notify_oid, bl2, 300000, &bl_reply));
536 bufferlist::iterator p = bl_reply.begin();
537 std::map<std::pair<uint64_t,uint64_t>,bufferlist> reply_map;
538 std::set<std::pair<uint64_t,uint64_t> > missed_map;
539 ::decode(reply_map, p);
540 ::decode(missed_map, p);
541 ASSERT_EQ(1u, notify_cookies.size());
542 ASSERT_EQ(1u, notify_cookies.count(handle));
543 ASSERT_EQ(1u, reply_map.size());
544 ASSERT_EQ(5u, reply_map.begin()->second.length());
545 ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
546 ASSERT_EQ(0u, missed_map.size());
547 ASSERT_GT(ioctx.watch_check(handle), 0);
548
549 comp = cluster.aio_create_completion();
550 ioctx.aio_unwatch(handle, comp);
551 ASSERT_EQ(0, comp->wait_for_complete());
552 comp->release();
553 }
554
555
556 TEST_P(LibRadosWatchNotifyPP, AioNotify) {
557 notify_oid = "foo";
558 notify_ioctx = &ioctx;
559 notify_cookies.clear();
560 char buf[128];
561 memset(buf, 0xcc, sizeof(buf));
562 bufferlist bl1;
563 bl1.append(buf, sizeof(buf));
564 ASSERT_EQ(0, ioctx.write(notify_oid, bl1, sizeof(buf), 0));
565 uint64_t handle;
566 WatchNotifyTestCtx2 ctx(this);
567 ASSERT_EQ(0, ioctx.watch2(notify_oid, &handle, &ctx));
568 ASSERT_GT(ioctx.watch_check(handle), 0);
569 std::list<obj_watch_t> watches;
570 ASSERT_EQ(0, ioctx.list_watchers(notify_oid, &watches));
571 ASSERT_EQ(watches.size(), 1u);
572 bufferlist bl2, bl_reply;
573 librados::AioCompletion *comp = cluster.aio_create_completion();
574 ASSERT_EQ(0, ioctx.aio_notify(notify_oid, comp, bl2, 300000, &bl_reply));
575 ASSERT_EQ(0, comp->wait_for_complete());
576 ASSERT_EQ(0, comp->get_return_value());
577 comp->release();
578 bufferlist::iterator p = bl_reply.begin();
579 std::map<std::pair<uint64_t,uint64_t>,bufferlist> reply_map;
580 std::set<std::pair<uint64_t,uint64_t> > missed_map;
581 ::decode(reply_map, p);
582 ::decode(missed_map, p);
583 ASSERT_EQ(1u, notify_cookies.size());
584 ASSERT_EQ(1u, notify_cookies.count(handle));
585 ASSERT_EQ(1u, reply_map.size());
586 ASSERT_EQ(5u, reply_map.begin()->second.length());
587 ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
588 ASSERT_EQ(0u, missed_map.size());
589 ASSERT_GT(ioctx.watch_check(handle), 0);
590 ioctx.unwatch2(handle);
591 cluster.watch_flush();
592 }
593
594 // --
595
596 TEST_F(LibRadosWatchNotify, WatchNotify2Multi) {
597 notify_io = ioctx;
598 notify_oid = "foo";
599 notify_cookies.clear();
600 char buf[128];
601 memset(buf, 0xcc, sizeof(buf));
602 ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
603 uint64_t handle1, handle2;
604 ASSERT_EQ(0,
605 rados_watch2(ioctx, notify_oid, &handle1,
606 watch_notify2_test_cb,
607 watch_notify2_test_errcb, this));
608 ASSERT_EQ(0,
609 rados_watch2(ioctx, notify_oid, &handle2,
610 watch_notify2_test_cb,
611 watch_notify2_test_errcb, this));
612 ASSERT_GT(rados_watch_check(ioctx, handle1), 0);
613 ASSERT_GT(rados_watch_check(ioctx, handle2), 0);
614 ASSERT_NE(handle1, handle2);
615 char *reply_buf = 0;
616 size_t reply_buf_len;
617 ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
618 "notify", 6, 300000,
619 &reply_buf, &reply_buf_len));
620 bufferlist reply;
621 reply.append(reply_buf, reply_buf_len);
622 std::map<std::pair<uint64_t,uint64_t>, bufferlist> reply_map;
623 std::set<std::pair<uint64_t,uint64_t> > missed_map;
624 bufferlist::iterator reply_p = reply.begin();
625 ::decode(reply_map, reply_p);
626 ::decode(missed_map, reply_p);
627 ASSERT_EQ(2u, reply_map.size());
628 ASSERT_EQ(5u, reply_map.begin()->second.length());
629 ASSERT_EQ(0u, missed_map.size());
630 ASSERT_EQ(2u, notify_cookies.size());
631 ASSERT_EQ(1u, notify_cookies.count(handle1));
632 ASSERT_EQ(1u, notify_cookies.count(handle2));
633 ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
634 ASSERT_GT(rados_watch_check(ioctx, handle1), 0);
635 ASSERT_GT(rados_watch_check(ioctx, handle2), 0);
636 rados_buffer_free(reply_buf);
637 rados_unwatch2(ioctx, handle1);
638 rados_unwatch2(ioctx, handle2);
639 rados_watch_flush(cluster);
640 }
641
642 // --
643
644 TEST_F(LibRadosWatchNotify, WatchNotify2Timeout) {
645 notify_io = ioctx;
646 notify_oid = "foo";
647 notify_sleep = 3; // 3s
648 notify_cookies.clear();
649 char buf[128];
650 memset(buf, 0xcc, sizeof(buf));
651 ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
652 uint64_t handle;
653 ASSERT_EQ(0,
654 rados_watch2(ioctx, notify_oid, &handle,
655 watch_notify2_test_cb,
656 watch_notify2_test_errcb, this));
657 ASSERT_GT(rados_watch_check(ioctx, handle), 0);
658 char *reply_buf = 0;
659 size_t reply_buf_len;
660 ASSERT_EQ(-ETIMEDOUT, rados_notify2(ioctx, notify_oid,
661 "notify", 6, 1000, // 1s
662 &reply_buf, &reply_buf_len));
663 ASSERT_EQ(1u, notify_cookies.size());
664 {
665 bufferlist reply;
666 reply.append(reply_buf, reply_buf_len);
667 std::map<std::pair<uint64_t,uint64_t>, bufferlist> reply_map;
668 std::set<std::pair<uint64_t,uint64_t> > missed_map;
669 bufferlist::iterator reply_p = reply.begin();
670 ::decode(reply_map, reply_p);
671 ::decode(missed_map, reply_p);
672 ASSERT_EQ(0u, reply_map.size());
673 ASSERT_EQ(1u, missed_map.size());
674 }
675 rados_buffer_free(reply_buf);
676
677 // we should get the next notify, though!
678 notify_sleep = 0;
679 notify_cookies.clear();
680 ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
681 "notify", 6, 300000, // 300s
682 &reply_buf, &reply_buf_len));
683 ASSERT_EQ(1u, notify_cookies.size());
684 ASSERT_GT(rados_watch_check(ioctx, handle), 0);
685
686 rados_unwatch2(ioctx, handle);
687
688 rados_completion_t comp;
689 ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
690 rados_aio_watch_flush(cluster, comp);
691 ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
692 ASSERT_EQ(0, rados_aio_get_return_value(comp));
693 rados_aio_release(comp);
694 rados_buffer_free(reply_buf);
695
696 }
697
698 TEST_P(LibRadosWatchNotifyPP, WatchNotify2Timeout) {
699 notify_oid = "foo";
700 notify_ioctx = &ioctx;
701 notify_sleep = 3; // 3s
702 notify_cookies.clear();
703 char buf[128];
704 memset(buf, 0xcc, sizeof(buf));
705 bufferlist bl1;
706 bl1.append(buf, sizeof(buf));
707 ASSERT_EQ(0, ioctx.write(notify_oid, bl1, sizeof(buf), 0));
708 uint64_t handle;
709 WatchNotifyTestCtx2 ctx(this);
710 ASSERT_EQ(0, ioctx.watch2(notify_oid, &handle, &ctx));
711 ASSERT_GT(ioctx.watch_check(handle), 0);
712 std::list<obj_watch_t> watches;
713 ASSERT_EQ(0, ioctx.list_watchers(notify_oid, &watches));
714 ASSERT_EQ(watches.size(), 1u);
715 ASSERT_EQ(0u, notify_cookies.size());
716 bufferlist bl2, bl_reply;
717 std::cout << " trying..." << std::endl;
718 ASSERT_EQ(-ETIMEDOUT, ioctx.notify2(notify_oid, bl2, 1000 /* 1s */,
719 &bl_reply));
720 std::cout << " timed out" << std::endl;
721 ASSERT_GT(ioctx.watch_check(handle), 0);
722 ioctx.unwatch2(handle);
723
724 std::cout << " flushing" << std::endl;
725 librados::AioCompletion *comp = cluster.aio_create_completion();
726 cluster.aio_watch_flush(comp);
727 ASSERT_EQ(0, comp->wait_for_complete());
728 ASSERT_EQ(0, comp->get_return_value());
729 std::cout << " flushed" << std::endl;
730 comp->release();
731 }
732
733 TEST_P(LibRadosWatchNotifyPP, WatchNotify3) {
734 notify_oid = "foo";
735 notify_ioctx = &ioctx;
736 notify_cookies.clear();
737 uint32_t timeout = 12; // configured timeout
738 char buf[128];
739 memset(buf, 0xcc, sizeof(buf));
740 bufferlist bl1;
741 bl1.append(buf, sizeof(buf));
742 ASSERT_EQ(0, ioctx.write(notify_oid, bl1, sizeof(buf), 0));
743 uint64_t handle;
744 WatchNotifyTestCtx2 ctx(this);
745 ASSERT_EQ(0, ioctx.watch3(notify_oid, &handle, &ctx, timeout));
746 ASSERT_GT(ioctx.watch_check(handle), 0);
747 std::list<obj_watch_t> watches;
748 ASSERT_EQ(0, ioctx.list_watchers(notify_oid, &watches));
749 ASSERT_EQ(watches.size(), 1u);
750 std::cout << "List watches" << std::endl;
751 for (std::list<obj_watch_t>::iterator it = watches.begin();
752 it != watches.end(); ++it) {
753 ASSERT_EQ(it->timeout_seconds, timeout);
754 }
755 bufferlist bl2, bl_reply;
756 std::cout << "notify2" << std::endl;
757 ASSERT_EQ(0, ioctx.notify2(notify_oid, bl2, 300000, &bl_reply));
758 std::cout << "notify2 done" << std::endl;
759 bufferlist::iterator p = bl_reply.begin();
760 std::map<std::pair<uint64_t,uint64_t>,bufferlist> reply_map;
761 std::set<std::pair<uint64_t,uint64_t> > missed_map;
762 ::decode(reply_map, p);
763 ::decode(missed_map, p);
764 ASSERT_EQ(1u, notify_cookies.size());
765 ASSERT_EQ(1u, notify_cookies.count(handle));
766 ASSERT_EQ(1u, reply_map.size());
767 ASSERT_EQ(5u, reply_map.begin()->second.length());
768 ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
769 ASSERT_EQ(0u, missed_map.size());
770 std::cout << "watch_check" << std::endl;
771 ASSERT_GT(ioctx.watch_check(handle), 0);
772 std::cout << "unwatch2" << std::endl;
773 ioctx.unwatch2(handle);
774
775 std::cout << " flushing" << std::endl;
776 cluster.watch_flush();
777 std::cout << "done" << std::endl;
778 }
779
780 TEST_F(LibRadosWatchNotify, Watch3Timeout) {
781 notify_io = ioctx;
782 notify_oid = "foo";
783 notify_cookies.clear();
784 notify_err = 0;
785 char buf[128];
786 memset(buf, 0xcc, sizeof(buf));
787 ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
788 uint64_t handle;
789 time_t start = time(0);
790 const uint32_t timeout = 4;
791 {
792 // make sure i timeout before the messenger reconnects to the OSD,
793 // it will resend a watch request on behalf of the client, and the
794 // timer of timeout on OSD side will be reset by the new request.
795 char conf[128];
796 ASSERT_EQ(0, rados_conf_get(cluster,
797 "ms_tcp_read_timeout",
798 conf, sizeof(conf)));
799 auto tcp_read_timeout = std::stoll(conf);
800 ASSERT_LT(timeout, tcp_read_timeout);
801 }
802 ASSERT_EQ(0,
803 rados_watch3(ioctx, notify_oid, &handle,
804 watch_notify2_test_cb, watch_notify2_test_errcb,
805 timeout, this));
806 int age = rados_watch_check(ioctx, handle);
807 time_t age_bound = time(0) + 1 - start;
808 ASSERT_LT(age, age_bound * 1000);
809 ASSERT_GT(age, 0);
810 rados_conf_set(cluster, "objecter_inject_no_watch_ping", "true");
811 // allow a long time here since an osd peering event will renew our
812 // watch.
813 int left = 16 * timeout;
814 std::cout << "waiting up to " << left << " for osd to time us out ..."
815 << std::endl;
816 while (notify_err == 0 && --left) {
817 sleep(1);
818 }
819 ASSERT_GT(left, 0);
820 rados_conf_set(cluster, "objecter_inject_no_watch_ping", "false");
821 ASSERT_EQ(-ENOTCONN, notify_err);
822 ASSERT_EQ(-ENOTCONN, rados_watch_check(ioctx, handle));
823
824 // a subsequent notify should not reach us
825 char *reply_buf = nullptr;
826 size_t reply_buf_len;
827 ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
828 "notify", 6, 300000,
829 &reply_buf, &reply_buf_len));
830 {
831 bufferlist reply;
832 reply.append(reply_buf, reply_buf_len);
833 std::map<std::pair<uint64_t,uint64_t>, bufferlist> reply_map;
834 std::set<std::pair<uint64_t,uint64_t> > missed_map;
835 bufferlist::iterator reply_p = reply.begin();
836 ::decode(reply_map, reply_p);
837 ::decode(missed_map, reply_p);
838 ASSERT_EQ(0u, reply_map.size());
839 ASSERT_EQ(0u, missed_map.size());
840 }
841 ASSERT_EQ(0u, notify_cookies.size());
842 ASSERT_EQ(-ENOTCONN, rados_watch_check(ioctx, handle));
843 rados_buffer_free(reply_buf);
844
845 // re-watch
846 rados_unwatch2(ioctx, handle);
847 rados_watch_flush(cluster);
848
849 handle = 0;
850 ASSERT_EQ(0,
851 rados_watch2(ioctx, notify_oid, &handle,
852 watch_notify2_test_cb,
853 watch_notify2_test_errcb, this));
854 ASSERT_GT(rados_watch_check(ioctx, handle), 0);
855
856 // and now a notify will work.
857 ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
858 "notify", 6, 300000,
859 &reply_buf, &reply_buf_len));
860 {
861 bufferlist reply;
862 reply.append(reply_buf, reply_buf_len);
863 std::map<std::pair<uint64_t,uint64_t>, bufferlist> reply_map;
864 std::set<std::pair<uint64_t,uint64_t> > missed_map;
865 bufferlist::iterator reply_p = reply.begin();
866 ::decode(reply_map, reply_p);
867 ::decode(missed_map, reply_p);
868 ASSERT_EQ(1u, reply_map.size());
869 ASSERT_EQ(0u, missed_map.size());
870 ASSERT_EQ(1u, notify_cookies.count(handle));
871 ASSERT_EQ(5u, reply_map.begin()->second.length());
872 ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
873 }
874 ASSERT_EQ(1u, notify_cookies.size());
875 ASSERT_GT(rados_watch_check(ioctx, handle), 0);
876
877 rados_buffer_free(reply_buf);
878 rados_unwatch2(ioctx, handle);
879 rados_watch_flush(cluster);
880 }
881
882 TEST_F(LibRadosWatchNotify, AioWatchDelete2) {
883 notify_io = ioctx;
884 notify_oid = "foo";
885 notify_err = 0;
886 char buf[128];
887 uint32_t timeout = 3;
888 memset(buf, 0xcc, sizeof(buf));
889 ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
890
891
892 rados_completion_t comp;
893 uint64_t handle;
894 ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
895 rados_aio_watch2(ioctx, notify_oid, comp, &handle,
896 watch_notify2_test_cb, watch_notify2_test_errcb, timeout, this);
897 ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
898 ASSERT_EQ(0, rados_aio_get_return_value(comp));
899 rados_aio_release(comp);
900 ASSERT_EQ(0, rados_remove(ioctx, notify_oid));
901 int left = 30;
902 std::cout << "waiting up to " << left << " for disconnect notification ..."
903 << std::endl;
904 while (notify_err == 0 && --left) {
905 sleep(1);
906 }
907 ASSERT_TRUE(left > 0);
908 ASSERT_EQ(-ENOTCONN, notify_err);
909 ASSERT_EQ(-ENOTCONN, rados_watch_check(ioctx, handle));
910 ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &comp));
911 rados_aio_unwatch(ioctx, handle, comp);
912 ASSERT_EQ(0, rados_aio_wait_for_complete(comp));
913 ASSERT_EQ(-ENOENT, rados_aio_get_return_value(comp));
914 rados_aio_release(comp);
915 }
916 // --
917
918 INSTANTIATE_TEST_CASE_P(LibRadosWatchNotifyPPTests, LibRadosWatchNotifyPP,
919 ::testing::Values("", "cache"));