]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com> | |
7 | * | |
8 | * Author: Haomai Wang <haomaiwang@gmail.com> | |
9 | * | |
10 | * This is free software; you can redistribute it and/or | |
11 | * modify it under the terms of the GNU Lesser General Public | |
12 | * License version 2.1, as published by the Free Software | |
13 | * Foundation. See file COPYING. | |
14 | * | |
15 | */ | |
16 | ||
17 | #ifdef __APPLE__ | |
18 | #include <AvailabilityMacros.h> | |
19 | #endif | |
20 | ||
21 | #include <fcntl.h> | |
22 | #include <sys/socket.h> | |
23 | #include <pthread.h> | |
24 | #include <stdint.h> | |
25 | #include <arpa/inet.h> | |
26 | #include "include/Context.h" | |
7c673cae FG |
27 | #include "common/Mutex.h" |
28 | #include "common/Cond.h" | |
29 | #include "global/global_init.h" | |
30 | #include "common/ceph_argparse.h" | |
31 | #include "msg/async/Event.h" | |
32 | ||
31f18b77 FG |
33 | #include <atomic> |
34 | ||
7c673cae FG |
35 | // We use epoll, kqueue, evport, select in descending order by performance. |
36 | #if defined(__linux__) | |
37 | #define HAVE_EPOLL 1 | |
38 | #endif | |
39 | ||
40 | #if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__) | |
41 | #define HAVE_KQUEUE 1 | |
42 | #endif | |
43 | ||
44 | #ifdef __sun | |
45 | #include <sys/feature_tests.h> | |
46 | #ifdef _DTRACE_VERSION | |
47 | #define HAVE_EVPORT 1 | |
48 | #endif | |
49 | #endif | |
50 | ||
51 | #ifdef HAVE_EPOLL | |
52 | #include "msg/async/EventEpoll.h" | |
53 | #endif | |
54 | #ifdef HAVE_KQUEUE | |
55 | #include "msg/async/EventKqueue.h" | |
56 | #endif | |
57 | #include "msg/async/EventSelect.h" | |
58 | ||
59 | #include <gtest/gtest.h> | |
60 | ||
61 | ||
62 | #if GTEST_HAS_PARAM_TEST | |
63 | ||
64 | class EventDriverTest : public ::testing::TestWithParam<const char*> { | |
65 | public: | |
66 | EventDriver *driver; | |
67 | ||
68 | EventDriverTest(): driver(0) {} | |
69 | void SetUp() override { | |
70 | cerr << __func__ << " start set up " << GetParam() << std::endl; | |
71 | #ifdef HAVE_EPOLL | |
72 | if (strcmp(GetParam(), "epoll")) | |
73 | driver = new EpollDriver(g_ceph_context); | |
74 | #endif | |
75 | #ifdef HAVE_KQUEUE | |
76 | if (strcmp(GetParam(), "kqueue")) | |
77 | driver = new KqueueDriver(g_ceph_context); | |
78 | #endif | |
79 | if (strcmp(GetParam(), "select")) | |
80 | driver = new SelectDriver(g_ceph_context); | |
81 | driver->init(NULL, 100); | |
82 | } | |
83 | void TearDown() override { | |
84 | delete driver; | |
85 | } | |
86 | }; | |
87 | ||
88 | int set_nonblock(int sd) | |
89 | { | |
90 | int flags; | |
91 | ||
92 | /* Set the socket nonblocking. | |
93 | * Note that fcntl(2) for F_GETFL and F_SETFL can't be | |
94 | * interrupted by a signal. */ | |
95 | if ((flags = fcntl(sd, F_GETFL)) < 0 ) { | |
96 | return -1; | |
97 | } | |
98 | if (fcntl(sd, F_SETFL, flags | O_NONBLOCK) < 0) { | |
99 | return -1; | |
100 | } | |
101 | return 0; | |
102 | } | |
103 | ||
104 | ||
105 | TEST_P(EventDriverTest, PipeTest) { | |
106 | int fds[2]; | |
107 | vector<FiredFileEvent> fired_events; | |
108 | int r; | |
109 | struct timeval tv; | |
110 | tv.tv_sec = 0; | |
111 | tv.tv_usec = 1; | |
112 | ||
113 | r = pipe(fds); | |
114 | ASSERT_EQ(r, 0); | |
115 | r = driver->add_event(fds[0], EVENT_NONE, EVENT_READABLE); | |
116 | ASSERT_EQ(r, 0); | |
117 | r = driver->event_wait(fired_events, &tv); | |
118 | ASSERT_EQ(r, 0); | |
119 | ||
120 | char c = 'A'; | |
121 | r = write(fds[1], &c, sizeof(c)); | |
122 | ASSERT_EQ(r, 1); | |
123 | r = driver->event_wait(fired_events, &tv); | |
124 | ASSERT_EQ(r, 1); | |
125 | ASSERT_EQ(fired_events[0].fd, fds[0]); | |
126 | ||
127 | ||
128 | fired_events.clear(); | |
129 | r = write(fds[1], &c, sizeof(c)); | |
130 | ASSERT_EQ(r, 1); | |
131 | r = driver->event_wait(fired_events, &tv); | |
132 | ASSERT_EQ(r, 1); | |
133 | ASSERT_EQ(fired_events[0].fd, fds[0]); | |
134 | ||
135 | fired_events.clear(); | |
136 | driver->del_event(fds[0], EVENT_READABLE, EVENT_READABLE); | |
137 | r = write(fds[1], &c, sizeof(c)); | |
138 | ASSERT_EQ(r, 1); | |
139 | r = driver->event_wait(fired_events, &tv); | |
140 | ASSERT_EQ(r, 0); | |
141 | } | |
142 | ||
143 | void* echoclient(void *arg) | |
144 | { | |
145 | intptr_t port = (intptr_t)arg; | |
146 | struct sockaddr_in sa; | |
147 | memset(&sa, 0, sizeof(sa)); | |
148 | sa.sin_family = AF_INET; | |
149 | sa.sin_port = htons(port); | |
150 | char addr[] = "127.0.0.1"; | |
151 | int r = inet_pton(AF_INET, addr, &sa.sin_addr); | |
152 | assert(r == 1); | |
153 | ||
154 | int connect_sd = ::socket(AF_INET, SOCK_STREAM, 0); | |
155 | if (connect_sd >= 0) { | |
156 | r = connect(connect_sd, (struct sockaddr*)&sa, sizeof(sa)); | |
157 | int t = 0; | |
158 | ||
159 | do { | |
160 | char c[] = "banner"; | |
161 | r = write(connect_sd, c, sizeof(c)); | |
162 | char d[100]; | |
163 | r = read(connect_sd, d, sizeof(d)); | |
164 | if (r == 0) | |
165 | break; | |
166 | if (t++ == 30) | |
167 | break; | |
168 | } while (1); | |
169 | ::close(connect_sd); | |
170 | } | |
171 | return 0; | |
172 | } | |
173 | ||
174 | TEST_P(EventDriverTest, NetworkSocketTest) { | |
175 | int listen_sd = ::socket(AF_INET, SOCK_STREAM, 0); | |
176 | ASSERT_TRUE(listen_sd > 0); | |
177 | int on = 1; | |
178 | int r = ::setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); | |
179 | ASSERT_EQ(r, 0); | |
180 | r = set_nonblock(listen_sd); | |
181 | ASSERT_EQ(r, 0); | |
182 | struct sockaddr_in sa; | |
183 | long port = 0; | |
184 | for (port = 38788; port < 40000; port++) { | |
185 | memset(&sa,0,sizeof(sa)); | |
186 | sa.sin_family = AF_INET; | |
187 | sa.sin_port = htons(port); | |
188 | sa.sin_addr.s_addr = htonl(INADDR_ANY); | |
189 | ||
190 | r = ::bind(listen_sd, (struct sockaddr *)&sa, sizeof(sa)); | |
191 | if (r == 0) { | |
192 | break; | |
193 | } | |
194 | } | |
195 | ASSERT_EQ(r, 0); | |
196 | r = listen(listen_sd, 511); | |
197 | ASSERT_EQ(r, 0); | |
198 | ||
199 | vector<FiredFileEvent> fired_events; | |
200 | struct timeval tv; | |
201 | tv.tv_sec = 0; | |
202 | tv.tv_usec = 1; | |
203 | r = driver->add_event(listen_sd, EVENT_NONE, EVENT_READABLE); | |
204 | ASSERT_EQ(r, 0); | |
205 | r = driver->event_wait(fired_events, &tv); | |
206 | ASSERT_EQ(r, 0); | |
207 | ||
208 | fired_events.clear(); | |
209 | pthread_t thread1; | |
210 | r = pthread_create(&thread1, NULL, echoclient, (void*)(intptr_t)port); | |
211 | ASSERT_EQ(r, 0); | |
212 | tv.tv_sec = 5; | |
213 | tv.tv_usec = 0; | |
214 | r = driver->event_wait(fired_events, &tv); | |
215 | ASSERT_EQ(r, 1); | |
216 | ASSERT_EQ(fired_events[0].fd, listen_sd); | |
217 | ||
218 | fired_events.clear(); | |
219 | int client_sd = ::accept(listen_sd, NULL, NULL); | |
220 | ASSERT_TRUE(client_sd > 0); | |
221 | r = driver->add_event(client_sd, EVENT_NONE, EVENT_READABLE); | |
222 | ASSERT_EQ(r, 0); | |
223 | ||
224 | do { | |
225 | fired_events.clear(); | |
226 | tv.tv_sec = 5; | |
227 | tv.tv_usec = 0; | |
228 | r = driver->event_wait(fired_events, &tv); | |
229 | ASSERT_EQ(r, 1); | |
230 | ASSERT_EQ(fired_events[0].mask, EVENT_READABLE); | |
231 | ||
232 | fired_events.clear(); | |
233 | char data[100]; | |
234 | r = ::read(client_sd, data, sizeof(data)); | |
235 | if (r == 0) | |
236 | break; | |
237 | ASSERT_TRUE(r > 0); | |
238 | r = driver->add_event(client_sd, EVENT_READABLE, EVENT_WRITABLE); | |
239 | r = driver->event_wait(fired_events, &tv); | |
240 | ASSERT_EQ(r, 1); | |
241 | ASSERT_EQ(fired_events[0].mask, EVENT_WRITABLE); | |
242 | r = write(client_sd, data, strlen(data)); | |
243 | ASSERT_EQ(r, (int)strlen(data)); | |
244 | driver->del_event(client_sd, EVENT_READABLE|EVENT_WRITABLE, | |
245 | EVENT_WRITABLE); | |
246 | } while (1); | |
247 | ||
248 | ::close(client_sd); | |
249 | ::close(listen_sd); | |
250 | } | |
251 | ||
252 | class FakeEvent : public EventCallback { | |
253 | ||
254 | public: | |
255 | void do_request(int fd_or_id) override {} | |
256 | }; | |
257 | ||
258 | TEST(EventCenterTest, FileEventExpansion) { | |
259 | vector<int> sds; | |
260 | EventCenter center(g_ceph_context); | |
261 | center.init(100, 0, "posix"); | |
262 | center.set_owner(); | |
263 | EventCallbackRef e(new FakeEvent()); | |
264 | for (int i = 0; i < 300; i++) { | |
265 | int sd = ::socket(AF_INET, SOCK_STREAM, 0); | |
266 | center.create_file_event(sd, EVENT_READABLE, e); | |
267 | sds.push_back(sd); | |
268 | } | |
269 | ||
270 | for (vector<int>::iterator it = sds.begin(); it != sds.end(); ++it) | |
271 | center.delete_file_event(*it, EVENT_READABLE); | |
272 | } | |
273 | ||
274 | ||
275 | class Worker : public Thread { | |
276 | CephContext *cct; | |
277 | bool done; | |
278 | ||
279 | public: | |
280 | EventCenter center; | |
281 | explicit Worker(CephContext *c, int idx): cct(c), done(false), center(c) { | |
282 | center.init(100, idx, "posix"); | |
283 | } | |
284 | void stop() { | |
285 | done = true; | |
286 | center.wakeup(); | |
287 | } | |
288 | void* entry() override { | |
289 | center.set_owner(); | |
290 | while (!done) | |
291 | center.process_events(1000000); | |
292 | return 0; | |
293 | } | |
294 | }; | |
295 | ||
296 | class CountEvent: public EventCallback { | |
31f18b77 | 297 | std::atomic<unsigned> *count; |
7c673cae FG |
298 | Mutex *lock; |
299 | Cond *cond; | |
300 | ||
301 | public: | |
31f18b77 | 302 | CountEvent(std::atomic<unsigned> *atomic, Mutex *l, Cond *c): count(atomic), lock(l), cond(c) {} |
7c673cae FG |
303 | void do_request(int id) override { |
304 | lock->Lock(); | |
31f18b77 | 305 | (*count)--; |
7c673cae FG |
306 | cond->Signal(); |
307 | lock->Unlock(); | |
308 | } | |
309 | }; | |
310 | ||
311 | TEST(EventCenterTest, DispatchTest) { | |
312 | Worker worker1(g_ceph_context, 1), worker2(g_ceph_context, 2); | |
31f18b77 | 313 | std::atomic<unsigned> count = { 0 }; |
7c673cae FG |
314 | Mutex lock("DispatchTest::lock"); |
315 | Cond cond; | |
316 | worker1.create("worker_1"); | |
317 | worker2.create("worker_2"); | |
318 | for (int i = 0; i < 10000; ++i) { | |
31f18b77 | 319 | count++; |
7c673cae | 320 | worker1.center.dispatch_event_external(EventCallbackRef(new CountEvent(&count, &lock, &cond))); |
31f18b77 | 321 | count++; |
7c673cae FG |
322 | worker2.center.dispatch_event_external(EventCallbackRef(new CountEvent(&count, &lock, &cond))); |
323 | Mutex::Locker l(lock); | |
31f18b77 | 324 | while (count) |
7c673cae FG |
325 | cond.Wait(lock); |
326 | } | |
327 | worker1.stop(); | |
328 | worker2.stop(); | |
329 | worker1.join(); | |
330 | worker2.join(); | |
331 | } | |
332 | ||
333 | INSTANTIATE_TEST_CASE_P( | |
334 | AsyncMessenger, | |
335 | EventDriverTest, | |
336 | ::testing::Values( | |
337 | #ifdef HAVE_EPOLL | |
338 | "epoll", | |
339 | #endif | |
340 | #ifdef HAVE_KQUEUE | |
341 | "kqueue", | |
342 | #endif | |
343 | "select" | |
344 | ) | |
345 | ); | |
346 | ||
347 | #else | |
348 | ||
349 | // Google Test may not support value-parameterized tests with some | |
350 | // compilers. If we use conditional compilation to compile out all | |
351 | // code referring to the gtest_main library, MSVC linker will not link | |
352 | // that library at all and consequently complain about missing entry | |
353 | // point defined in that library (fatal error LNK1561: entry point | |
354 | // must be defined). This dummy test keeps gtest_main linked in. | |
355 | TEST(DummyTest, ValueParameterizedTestsAreNotSupportedOnThisPlatform) {} | |
356 | ||
357 | #endif | |
358 | ||
359 | ||
360 | /* | |
361 | * Local Variables: | |
362 | * compile-command: "cd ../.. ; make ceph_test_async_driver && | |
363 | * ./ceph_test_async_driver | |
364 | * | |
365 | * End: | |
366 | */ |