]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/librados/watch_notify_cxx.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / test / librados / watch_notify_cxx.cc
1 #include <errno.h>
2 #include <fcntl.h>
3 #include <semaphore.h>
4 #include <set>
5 #include <map>
6
7 #include "gtest/gtest.h"
8
9 #include "include/encoding.h"
10 #include "include/rados/librados.hpp"
11 #include "include/rados/rados_types.h"
12 #include "test/librados/test_cxx.h"
13 #include "test/librados/testcase_cxx.h"
14 #include "crimson_utils.h"
15
16 using namespace librados;
17
18 typedef RadosTestECPP LibRadosWatchNotifyECPP;
19
20 int notify_sleep = 0;
21
22 #pragma GCC diagnostic ignored "-Wpragmas"
23 #pragma GCC diagnostic push
24 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
25
26 class LibRadosWatchNotifyPP : public RadosTestParamPP
27 {
28 protected:
29 bufferlist notify_bl;
30 std::set<uint64_t> notify_cookies;
31 rados_ioctx_t notify_io;
32 const char *notify_oid = nullptr;
33 int notify_err = 0;
34
35 friend class WatchNotifyTestCtx2;
36 friend class WatchNotifyTestCtx2TimeOut;
37 };
38
39 IoCtx *notify_ioctx;
40
41 class WatchNotifyTestCtx2 : public WatchCtx2
42 {
43 LibRadosWatchNotifyPP *notify;
44
45 public:
46 WatchNotifyTestCtx2(LibRadosWatchNotifyPP *notify)
47 : notify(notify)
48 {}
49
50 void handle_notify(uint64_t notify_id, uint64_t cookie, uint64_t notifier_gid,
51 bufferlist& bl) override {
52 std::cout << __func__ << " cookie " << cookie << " notify_id " << notify_id
53 << " notifier_gid " << notifier_gid << std::endl;
54 notify->notify_bl = bl;
55 notify->notify_cookies.insert(cookie);
56 bufferlist reply;
57 reply.append("reply", 5);
58 if (notify_sleep)
59 sleep(notify_sleep);
60 notify_ioctx->notify_ack(notify->notify_oid, notify_id, cookie, reply);
61 }
62
63 void handle_error(uint64_t cookie, int err) override {
64 std::cout << __func__ << " cookie " << cookie
65 << " err " << err << std::endl;
66 ceph_assert(cookie > 1000);
67 notify_ioctx->unwatch2(cookie);
68 notify->notify_cookies.erase(cookie);
69 notify->notify_err = notify_ioctx->watch2(notify->notify_oid, &cookie, this);
70 if (notify->notify_err < err ) {
71 std::cout << "reconnect notify_err " << notify->notify_err << " err " << err << std::endl;
72 }
73 }
74 };
75
76 class WatchNotifyTestCtx2TimeOut : public WatchCtx2
77 {
78 LibRadosWatchNotifyPP *notify;
79
80 public:
81 WatchNotifyTestCtx2TimeOut(LibRadosWatchNotifyPP *notify)
82 : notify(notify)
83 {}
84
85 void handle_notify(uint64_t notify_id, uint64_t cookie, uint64_t notifier_gid,
86 bufferlist& bl) override {
87 std::cout << __func__ << " cookie " << cookie << " notify_id " << notify_id
88 << " notifier_gid " << notifier_gid << std::endl;
89 notify->notify_bl = bl;
90 notify->notify_cookies.insert(cookie);
91 bufferlist reply;
92 reply.append("reply", 5);
93 if (notify_sleep)
94 sleep(notify_sleep);
95 notify_ioctx->notify_ack(notify->notify_oid, notify_id, cookie, reply);
96 }
97
98 void handle_error(uint64_t cookie, int err) override {
99 std::cout << __func__ << " cookie " << cookie
100 << " err " << err << std::endl;
101 ceph_assert(cookie > 1000);
102 notify->notify_err = err;
103 }
104 };
105
106 // notify
107 static sem_t sem;
108
109 class WatchNotifyTestCtx : public WatchCtx
110 {
111 public:
112 void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) override
113 {
114 std::cout << __func__ << std::endl;
115 sem_post(&sem);
116 }
117 };
118
119 TEST_P(LibRadosWatchNotifyPP, WatchNotify) {
120 ASSERT_EQ(0, sem_init(&sem, 0, 0));
121 char buf[128];
122 memset(buf, 0xcc, sizeof(buf));
123 bufferlist bl1;
124 bl1.append(buf, sizeof(buf));
125 ASSERT_EQ(0, ioctx.write("foo", bl1, sizeof(buf), 0));
126 uint64_t handle;
127 WatchNotifyTestCtx ctx;
128 ASSERT_EQ(0, ioctx.watch("foo", 0, &handle, &ctx));
129 std::list<obj_watch_t> watches;
130 ASSERT_EQ(0, ioctx.list_watchers("foo", &watches));
131 ASSERT_EQ(1u, watches.size());
132 bufferlist bl2;
133 for (unsigned i=0; i<10; ++i) {
134 int r = ioctx.notify("foo", 0, bl2);
135 if (r == 0) {
136 break;
137 }
138 if (!getenv("ALLOW_TIMEOUTS")) {
139 ASSERT_EQ(0, r);
140 }
141 }
142 TestAlarm alarm;
143 sem_wait(&sem);
144 ioctx.unwatch("foo", handle);
145 sem_destroy(&sem);
146 }
147
148 TEST_F(LibRadosWatchNotifyECPP, WatchNotify) {
149 SKIP_IF_CRIMSON();
150 ASSERT_EQ(0, sem_init(&sem, 0, 0));
151 char buf[128];
152 memset(buf, 0xcc, sizeof(buf));
153 bufferlist bl1;
154 bl1.append(buf, sizeof(buf));
155 ASSERT_EQ(0, ioctx.write("foo", bl1, sizeof(buf), 0));
156 uint64_t handle;
157 WatchNotifyTestCtx ctx;
158 ASSERT_EQ(0, ioctx.watch("foo", 0, &handle, &ctx));
159 std::list<obj_watch_t> watches;
160 ASSERT_EQ(0, ioctx.list_watchers("foo", &watches));
161 ASSERT_EQ(1u, watches.size());
162 bufferlist bl2;
163 for (unsigned i=0; i<10; ++i) {
164 int r = ioctx.notify("foo", 0, bl2);
165 if (r == 0) {
166 break;
167 }
168 if (!getenv("ALLOW_TIMEOUTS")) {
169 ASSERT_EQ(0, r);
170 }
171 }
172 TestAlarm alarm;
173 sem_wait(&sem);
174 ioctx.unwatch("foo", handle);
175 sem_destroy(&sem);
176 }
177
178 // --
179
180 TEST_P(LibRadosWatchNotifyPP, WatchNotifyTimeout) {
181 ASSERT_EQ(0, sem_init(&sem, 0, 0));
182 ioctx.set_notify_timeout(1);
183 uint64_t handle;
184 WatchNotifyTestCtx ctx;
185
186 char buf[128];
187 memset(buf, 0xcc, sizeof(buf));
188 bufferlist bl1;
189 bl1.append(buf, sizeof(buf));
190 ASSERT_EQ(0, ioctx.write("foo", bl1, sizeof(buf), 0));
191
192 ASSERT_EQ(0, ioctx.watch("foo", 0, &handle, &ctx));
193 sem_destroy(&sem);
194 ASSERT_EQ(0, ioctx.unwatch("foo", handle));
195 }
196
197 TEST_F(LibRadosWatchNotifyECPP, WatchNotifyTimeout) {
198 SKIP_IF_CRIMSON();
199 ASSERT_EQ(0, sem_init(&sem, 0, 0));
200 ioctx.set_notify_timeout(1);
201 uint64_t handle;
202 WatchNotifyTestCtx ctx;
203
204 char buf[128];
205 memset(buf, 0xcc, sizeof(buf));
206 bufferlist bl1;
207 bl1.append(buf, sizeof(buf));
208 ASSERT_EQ(0, ioctx.write("foo", bl1, sizeof(buf), 0));
209
210 ASSERT_EQ(0, ioctx.watch("foo", 0, &handle, &ctx));
211 sem_destroy(&sem);
212 ASSERT_EQ(0, ioctx.unwatch("foo", handle));
213 }
214
215 #pragma GCC diagnostic pop
216 #pragma GCC diagnostic warning "-Wpragmas"
217
218 TEST_P(LibRadosWatchNotifyPP, WatchNotify2) {
219 notify_oid = "foo";
220 notify_ioctx = &ioctx;
221 notify_cookies.clear();
222 char buf[128];
223 memset(buf, 0xcc, sizeof(buf));
224 bufferlist bl1;
225 bl1.append(buf, sizeof(buf));
226 ASSERT_EQ(0, ioctx.write(notify_oid, bl1, sizeof(buf), 0));
227 uint64_t handle;
228 WatchNotifyTestCtx2 ctx(this);
229 ASSERT_EQ(0, ioctx.watch2(notify_oid, &handle, &ctx));
230 ASSERT_GT(ioctx.watch_check(handle), 0);
231 std::list<obj_watch_t> watches;
232 ASSERT_EQ(0, ioctx.list_watchers(notify_oid, &watches));
233 ASSERT_EQ(watches.size(), 1u);
234 bufferlist bl2, bl_reply;
235 ASSERT_EQ(0, ioctx.notify2(notify_oid, bl2, 300000, &bl_reply));
236 auto p = bl_reply.cbegin();
237 std::map<std::pair<uint64_t,uint64_t>,bufferlist> reply_map;
238 std::set<std::pair<uint64_t,uint64_t> > missed_map;
239 decode(reply_map, p);
240 decode(missed_map, p);
241 ASSERT_EQ(1u, notify_cookies.size());
242 ASSERT_EQ(1u, notify_cookies.count(handle));
243 ASSERT_EQ(1u, reply_map.size());
244 ASSERT_EQ(5u, reply_map.begin()->second.length());
245 ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
246 ASSERT_EQ(0u, missed_map.size());
247 ASSERT_GT(ioctx.watch_check(handle), 0);
248 ioctx.unwatch2(handle);
249 }
250
251 TEST_P(LibRadosWatchNotifyPP, AioWatchNotify2) {
252 notify_oid = "foo";
253 notify_ioctx = &ioctx;
254 notify_cookies.clear();
255 char buf[128];
256 memset(buf, 0xcc, sizeof(buf));
257 bufferlist bl1;
258 bl1.append(buf, sizeof(buf));
259 ASSERT_EQ(0, ioctx.write(notify_oid, bl1, sizeof(buf), 0));
260
261 uint64_t handle;
262 WatchNotifyTestCtx2 ctx(this);
263 librados::AioCompletion *comp = cluster.aio_create_completion();
264 ASSERT_EQ(0, ioctx.aio_watch(notify_oid, comp, &handle, &ctx));
265 ASSERT_EQ(0, comp->wait_for_complete());
266 ASSERT_EQ(0, comp->get_return_value());
267 comp->release();
268
269 ASSERT_GT(ioctx.watch_check(handle), 0);
270 std::list<obj_watch_t> watches;
271 ASSERT_EQ(0, ioctx.list_watchers(notify_oid, &watches));
272 ASSERT_EQ(watches.size(), 1u);
273 bufferlist bl2, bl_reply;
274 ASSERT_EQ(0, ioctx.notify2(notify_oid, bl2, 300000, &bl_reply));
275 auto p = bl_reply.cbegin();
276 std::map<std::pair<uint64_t,uint64_t>,bufferlist> reply_map;
277 std::set<std::pair<uint64_t,uint64_t> > missed_map;
278 decode(reply_map, p);
279 decode(missed_map, p);
280 ASSERT_EQ(1u, notify_cookies.size());
281 ASSERT_EQ(1u, notify_cookies.count(handle));
282 ASSERT_EQ(1u, reply_map.size());
283 ASSERT_EQ(5u, reply_map.begin()->second.length());
284 ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
285 ASSERT_EQ(0u, missed_map.size());
286 ASSERT_GT(ioctx.watch_check(handle), 0);
287
288 comp = cluster.aio_create_completion();
289 ioctx.aio_unwatch(handle, comp);
290 ASSERT_EQ(0, comp->wait_for_complete());
291 comp->release();
292 }
293
294
295 TEST_P(LibRadosWatchNotifyPP, AioNotify) {
296 notify_oid = "foo";
297 notify_ioctx = &ioctx;
298 notify_cookies.clear();
299 char buf[128];
300 memset(buf, 0xcc, sizeof(buf));
301 bufferlist bl1;
302 bl1.append(buf, sizeof(buf));
303 ASSERT_EQ(0, ioctx.write(notify_oid, bl1, sizeof(buf), 0));
304 uint64_t handle;
305 WatchNotifyTestCtx2 ctx(this);
306 ASSERT_EQ(0, ioctx.watch2(notify_oid, &handle, &ctx));
307 ASSERT_GT(ioctx.watch_check(handle), 0);
308 std::list<obj_watch_t> watches;
309 ASSERT_EQ(0, ioctx.list_watchers(notify_oid, &watches));
310 ASSERT_EQ(watches.size(), 1u);
311 bufferlist bl2, bl_reply;
312 librados::AioCompletion *comp = cluster.aio_create_completion();
313 ASSERT_EQ(0, ioctx.aio_notify(notify_oid, comp, bl2, 300000, &bl_reply));
314 ASSERT_EQ(0, comp->wait_for_complete());
315 ASSERT_EQ(0, comp->get_return_value());
316 comp->release();
317 std::vector<librados::notify_ack_t> acks;
318 std::vector<librados::notify_timeout_t> timeouts;
319 ioctx.decode_notify_response(bl_reply, &acks, &timeouts);
320 ASSERT_EQ(1u, notify_cookies.size());
321 ASSERT_EQ(1u, notify_cookies.count(handle));
322 ASSERT_EQ(1u, acks.size());
323 ASSERT_EQ(5u, acks[0].payload_bl.length());
324 ASSERT_EQ(0, strncmp("reply", acks[0].payload_bl.c_str(), acks[0].payload_bl.length()));
325 ASSERT_EQ(0u, timeouts.size());
326 ASSERT_GT(ioctx.watch_check(handle), 0);
327 ioctx.unwatch2(handle);
328 cluster.watch_flush();
329 }
330
331 // --
332 TEST_P(LibRadosWatchNotifyPP, WatchNotify2Timeout) {
333 notify_oid = "foo";
334 notify_ioctx = &ioctx;
335 notify_sleep = 3; // 3s
336 notify_cookies.clear();
337 char buf[128];
338 memset(buf, 0xcc, sizeof(buf));
339 bufferlist bl1;
340 bl1.append(buf, sizeof(buf));
341 ASSERT_EQ(0, ioctx.write(notify_oid, bl1, sizeof(buf), 0));
342 uint64_t handle;
343 WatchNotifyTestCtx2TimeOut ctx(this);
344 ASSERT_EQ(0, ioctx.watch2(notify_oid, &handle, &ctx));
345 ASSERT_GT(ioctx.watch_check(handle), 0);
346 std::list<obj_watch_t> watches;
347 ASSERT_EQ(0, ioctx.list_watchers(notify_oid, &watches));
348 ASSERT_EQ(watches.size(), 1u);
349 ASSERT_EQ(0u, notify_cookies.size());
350 bufferlist bl2, bl_reply;
351 std::cout << " trying..." << std::endl;
352 ASSERT_EQ(-ETIMEDOUT, ioctx.notify2(notify_oid, bl2, 1000 /* 1s */,
353 &bl_reply));
354 std::cout << " timed out" << std::endl;
355 ASSERT_GT(ioctx.watch_check(handle), 0);
356 ioctx.unwatch2(handle);
357
358 std::cout << " flushing" << std::endl;
359 librados::AioCompletion *comp = cluster.aio_create_completion();
360 cluster.aio_watch_flush(comp);
361 ASSERT_EQ(0, comp->wait_for_complete());
362 ASSERT_EQ(0, comp->get_return_value());
363 std::cout << " flushed" << std::endl;
364 comp->release();
365 }
366
367 TEST_P(LibRadosWatchNotifyPP, WatchNotify3) {
368 notify_oid = "foo";
369 notify_ioctx = &ioctx;
370 notify_cookies.clear();
371 uint32_t timeout = 12; // configured timeout
372 char buf[128];
373 memset(buf, 0xcc, sizeof(buf));
374 bufferlist bl1;
375 bl1.append(buf, sizeof(buf));
376 ASSERT_EQ(0, ioctx.write(notify_oid, bl1, sizeof(buf), 0));
377 uint64_t handle;
378 WatchNotifyTestCtx2TimeOut ctx(this);
379 ASSERT_EQ(0, ioctx.watch3(notify_oid, &handle, &ctx, timeout));
380 ASSERT_GT(ioctx.watch_check(handle), 0);
381 std::list<obj_watch_t> watches;
382 ASSERT_EQ(0, ioctx.list_watchers(notify_oid, &watches));
383 ASSERT_EQ(watches.size(), 1u);
384 std::cout << "List watches" << std::endl;
385 for (std::list<obj_watch_t>::iterator it = watches.begin();
386 it != watches.end(); ++it) {
387 ASSERT_EQ(it->timeout_seconds, timeout);
388 }
389 bufferlist bl2, bl_reply;
390 std::cout << "notify2" << std::endl;
391 ASSERT_EQ(0, ioctx.notify2(notify_oid, bl2, 300000, &bl_reply));
392 std::cout << "notify2 done" << std::endl;
393 auto p = bl_reply.cbegin();
394 std::map<std::pair<uint64_t,uint64_t>,bufferlist> reply_map;
395 std::set<std::pair<uint64_t,uint64_t> > missed_map;
396 decode(reply_map, p);
397 decode(missed_map, p);
398 ASSERT_EQ(1u, notify_cookies.size());
399 ASSERT_EQ(1u, notify_cookies.count(handle));
400 ASSERT_EQ(1u, reply_map.size());
401 ASSERT_EQ(5u, reply_map.begin()->second.length());
402 ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
403 ASSERT_EQ(0u, missed_map.size());
404 std::cout << "watch_check" << std::endl;
405 ASSERT_GT(ioctx.watch_check(handle), 0);
406 std::cout << "unwatch2" << std::endl;
407 ioctx.unwatch2(handle);
408
409 std::cout << " flushing" << std::endl;
410 cluster.watch_flush();
411 std::cout << "done" << std::endl;
412 }
413 // --
414
415 INSTANTIATE_TEST_SUITE_P(LibRadosWatchNotifyPPTests, LibRadosWatchNotifyPP,
416 ::testing::Values("", "cache"));