]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/msgr/test_async_driver.cc
update sources to v12.1.1
[ceph.git] / ceph / src / test / msgr / test_async_driver.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #ifdef __APPLE__
18 #include <AvailabilityMacros.h>
19 #endif
20
21 #include <fcntl.h>
22 #include <sys/socket.h>
23 #include <pthread.h>
24 #include <stdint.h>
25 #include <arpa/inet.h>
26 #include "include/Context.h"
27 #include "common/Mutex.h"
28 #include "common/Cond.h"
29 #include "global/global_init.h"
30 #include "common/ceph_argparse.h"
31 #include "msg/async/Event.h"
32
33 #include <atomic>
34
35 // We use epoll, kqueue, evport, select in descending order by performance.
36 #if defined(__linux__)
37 #define HAVE_EPOLL 1
38 #endif
39
40 #if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
41 #define HAVE_KQUEUE 1
42 #endif
43
44 #ifdef __sun
45 #include <sys/feature_tests.h>
46 #ifdef _DTRACE_VERSION
47 #define HAVE_EVPORT 1
48 #endif
49 #endif
50
51 #ifdef HAVE_EPOLL
52 #include "msg/async/EventEpoll.h"
53 #endif
54 #ifdef HAVE_KQUEUE
55 #include "msg/async/EventKqueue.h"
56 #endif
57 #include "msg/async/EventSelect.h"
58
59 #include <gtest/gtest.h>
60
61
62 #if GTEST_HAS_PARAM_TEST
63
64 class EventDriverTest : public ::testing::TestWithParam<const char*> {
65 public:
66 EventDriver *driver;
67
68 EventDriverTest(): driver(0) {}
69 void SetUp() override {
70 cerr << __func__ << " start set up " << GetParam() << std::endl;
71 #ifdef HAVE_EPOLL
72 if (strcmp(GetParam(), "epoll"))
73 driver = new EpollDriver(g_ceph_context);
74 #endif
75 #ifdef HAVE_KQUEUE
76 if (strcmp(GetParam(), "kqueue"))
77 driver = new KqueueDriver(g_ceph_context);
78 #endif
79 if (strcmp(GetParam(), "select"))
80 driver = new SelectDriver(g_ceph_context);
81 driver->init(NULL, 100);
82 }
83 void TearDown() override {
84 delete driver;
85 }
86 };
87
88 int set_nonblock(int sd)
89 {
90 int flags;
91
92 /* Set the socket nonblocking.
93 * Note that fcntl(2) for F_GETFL and F_SETFL can't be
94 * interrupted by a signal. */
95 if ((flags = fcntl(sd, F_GETFL)) < 0 ) {
96 return -1;
97 }
98 if (fcntl(sd, F_SETFL, flags | O_NONBLOCK) < 0) {
99 return -1;
100 }
101 return 0;
102 }
103
104
105 TEST_P(EventDriverTest, PipeTest) {
106 int fds[2];
107 vector<FiredFileEvent> fired_events;
108 int r;
109 struct timeval tv;
110 tv.tv_sec = 0;
111 tv.tv_usec = 1;
112
113 r = pipe(fds);
114 ASSERT_EQ(r, 0);
115 r = driver->add_event(fds[0], EVENT_NONE, EVENT_READABLE);
116 ASSERT_EQ(r, 0);
117 r = driver->event_wait(fired_events, &tv);
118 ASSERT_EQ(r, 0);
119
120 char c = 'A';
121 r = write(fds[1], &c, sizeof(c));
122 ASSERT_EQ(r, 1);
123 r = driver->event_wait(fired_events, &tv);
124 ASSERT_EQ(r, 1);
125 ASSERT_EQ(fired_events[0].fd, fds[0]);
126
127
128 fired_events.clear();
129 r = write(fds[1], &c, sizeof(c));
130 ASSERT_EQ(r, 1);
131 r = driver->event_wait(fired_events, &tv);
132 ASSERT_EQ(r, 1);
133 ASSERT_EQ(fired_events[0].fd, fds[0]);
134
135 fired_events.clear();
136 driver->del_event(fds[0], EVENT_READABLE, EVENT_READABLE);
137 r = write(fds[1], &c, sizeof(c));
138 ASSERT_EQ(r, 1);
139 r = driver->event_wait(fired_events, &tv);
140 ASSERT_EQ(r, 0);
141 }
142
143 void* echoclient(void *arg)
144 {
145 intptr_t port = (intptr_t)arg;
146 struct sockaddr_in sa;
147 memset(&sa, 0, sizeof(sa));
148 sa.sin_family = AF_INET;
149 sa.sin_port = htons(port);
150 char addr[] = "127.0.0.1";
151 int r = inet_pton(AF_INET, addr, &sa.sin_addr);
152 assert(r == 1);
153
154 int connect_sd = ::socket(AF_INET, SOCK_STREAM, 0);
155 if (connect_sd >= 0) {
156 r = connect(connect_sd, (struct sockaddr*)&sa, sizeof(sa));
157 assert(r == 0);
158 int t = 0;
159
160 do {
161 char c[] = "banner";
162 r = write(connect_sd, c, sizeof(c));
163 char d[100];
164 r = read(connect_sd, d, sizeof(d));
165 if (r == 0)
166 break;
167 if (t++ == 30)
168 break;
169 } while (1);
170 ::close(connect_sd);
171 }
172 return 0;
173 }
174
175 TEST_P(EventDriverTest, NetworkSocketTest) {
176 int listen_sd = ::socket(AF_INET, SOCK_STREAM, 0);
177 ASSERT_TRUE(listen_sd > 0);
178 int on = 1;
179 int r = ::setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
180 ASSERT_EQ(r, 0);
181 r = set_nonblock(listen_sd);
182 ASSERT_EQ(r, 0);
183 struct sockaddr_in sa;
184 long port = 0;
185 for (port = 38788; port < 40000; port++) {
186 memset(&sa,0,sizeof(sa));
187 sa.sin_family = AF_INET;
188 sa.sin_port = htons(port);
189 sa.sin_addr.s_addr = htonl(INADDR_ANY);
190
191 r = ::bind(listen_sd, (struct sockaddr *)&sa, sizeof(sa));
192 if (r == 0) {
193 break;
194 }
195 }
196 ASSERT_EQ(r, 0);
197 r = listen(listen_sd, 511);
198 ASSERT_EQ(r, 0);
199
200 vector<FiredFileEvent> fired_events;
201 struct timeval tv;
202 tv.tv_sec = 0;
203 tv.tv_usec = 1;
204 r = driver->add_event(listen_sd, EVENT_NONE, EVENT_READABLE);
205 ASSERT_EQ(r, 0);
206 r = driver->event_wait(fired_events, &tv);
207 ASSERT_EQ(r, 0);
208
209 fired_events.clear();
210 pthread_t thread1;
211 r = pthread_create(&thread1, NULL, echoclient, (void*)(intptr_t)port);
212 ASSERT_EQ(r, 0);
213 tv.tv_sec = 5;
214 tv.tv_usec = 0;
215 r = driver->event_wait(fired_events, &tv);
216 ASSERT_EQ(r, 1);
217 ASSERT_EQ(fired_events[0].fd, listen_sd);
218
219 fired_events.clear();
220 int client_sd = ::accept(listen_sd, NULL, NULL);
221 ASSERT_TRUE(client_sd > 0);
222 r = driver->add_event(client_sd, EVENT_NONE, EVENT_READABLE);
223 ASSERT_EQ(r, 0);
224
225 do {
226 fired_events.clear();
227 tv.tv_sec = 5;
228 tv.tv_usec = 0;
229 r = driver->event_wait(fired_events, &tv);
230 ASSERT_EQ(1, r);
231 ASSERT_EQ(EVENT_READABLE, fired_events[0].mask);
232
233 fired_events.clear();
234 char data[100];
235 r = ::read(client_sd, data, sizeof(data));
236 if (r == 0)
237 break;
238 ASSERT_GT(r, 0);
239 r = driver->add_event(client_sd, EVENT_READABLE, EVENT_WRITABLE);
240 ASSERT_EQ(0, r);
241 r = driver->event_wait(fired_events, &tv);
242 ASSERT_EQ(1, r);
243 ASSERT_EQ(fired_events[0].mask, EVENT_WRITABLE);
244 r = write(client_sd, data, strlen(data));
245 ASSERT_EQ((int)strlen(data), r);
246 driver->del_event(client_sd, EVENT_READABLE|EVENT_WRITABLE,
247 EVENT_WRITABLE);
248 } while (1);
249
250 ::close(client_sd);
251 ::close(listen_sd);
252 }
253
254 class FakeEvent : public EventCallback {
255
256 public:
257 void do_request(int fd_or_id) override {}
258 };
259
260 TEST(EventCenterTest, FileEventExpansion) {
261 vector<int> sds;
262 EventCenter center(g_ceph_context);
263 center.init(100, 0, "posix");
264 center.set_owner();
265 EventCallbackRef e(new FakeEvent());
266 for (int i = 0; i < 300; i++) {
267 int sd = ::socket(AF_INET, SOCK_STREAM, 0);
268 center.create_file_event(sd, EVENT_READABLE, e);
269 sds.push_back(sd);
270 }
271
272 for (vector<int>::iterator it = sds.begin(); it != sds.end(); ++it)
273 center.delete_file_event(*it, EVENT_READABLE);
274 }
275
276
277 class Worker : public Thread {
278 CephContext *cct;
279 bool done;
280
281 public:
282 EventCenter center;
283 explicit Worker(CephContext *c, int idx): cct(c), done(false), center(c) {
284 center.init(100, idx, "posix");
285 }
286 void stop() {
287 done = true;
288 center.wakeup();
289 }
290 void* entry() override {
291 center.set_owner();
292 while (!done)
293 center.process_events(1000000);
294 return 0;
295 }
296 };
297
298 class CountEvent: public EventCallback {
299 std::atomic<unsigned> *count;
300 Mutex *lock;
301 Cond *cond;
302
303 public:
304 CountEvent(std::atomic<unsigned> *atomic, Mutex *l, Cond *c): count(atomic), lock(l), cond(c) {}
305 void do_request(int id) override {
306 lock->Lock();
307 (*count)--;
308 cond->Signal();
309 lock->Unlock();
310 }
311 };
312
313 TEST(EventCenterTest, DispatchTest) {
314 Worker worker1(g_ceph_context, 1), worker2(g_ceph_context, 2);
315 std::atomic<unsigned> count = { 0 };
316 Mutex lock("DispatchTest::lock");
317 Cond cond;
318 worker1.create("worker_1");
319 worker2.create("worker_2");
320 for (int i = 0; i < 10000; ++i) {
321 count++;
322 worker1.center.dispatch_event_external(EventCallbackRef(new CountEvent(&count, &lock, &cond)));
323 count++;
324 worker2.center.dispatch_event_external(EventCallbackRef(new CountEvent(&count, &lock, &cond)));
325 Mutex::Locker l(lock);
326 while (count)
327 cond.Wait(lock);
328 }
329 worker1.stop();
330 worker2.stop();
331 worker1.join();
332 worker2.join();
333 }
334
335 INSTANTIATE_TEST_CASE_P(
336 AsyncMessenger,
337 EventDriverTest,
338 ::testing::Values(
339 #ifdef HAVE_EPOLL
340 "epoll",
341 #endif
342 #ifdef HAVE_KQUEUE
343 "kqueue",
344 #endif
345 "select"
346 )
347 );
348
349 #else
350
351 // Google Test may not support value-parameterized tests with some
352 // compilers. If we use conditional compilation to compile out all
353 // code referring to the gtest_main library, MSVC linker will not link
354 // that library at all and consequently complain about missing entry
355 // point defined in that library (fatal error LNK1561: entry point
356 // must be defined). This dummy test keeps gtest_main linked in.
357 TEST(DummyTest, ValueParameterizedTestsAreNotSupportedOnThisPlatform) {}
358
359 #endif
360
361
362 /*
363 * Local Variables:
364 * compile-command: "cd ../.. ; make ceph_test_async_driver &&
365 * ./ceph_test_async_driver
366 *
367 * End:
368 */