]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/msgr/test_async_driver.cc
e68e57b6ba83e428883883da10cca4b9d8e1dbbc
[ceph.git] / ceph / src / test / msgr / test_async_driver.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #ifdef __APPLE__
18 #include <AvailabilityMacros.h>
19 #endif
20
21 #include <fcntl.h>
22 #include <sys/socket.h>
23 #include <pthread.h>
24 #include <stdint.h>
25 #include <arpa/inet.h>
26 #include "include/Context.h"
27 #include "common/Mutex.h"
28 #include "common/Cond.h"
29 #include "global/global_init.h"
30 #include "common/ceph_argparse.h"
31 #include "msg/async/Event.h"
32
33 #include <atomic>
34
35 // We use epoll, kqueue, evport, select in descending order by performance.
36 #if defined(__linux__)
37 #define HAVE_EPOLL 1
38 #endif
39
40 #if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
41 #define HAVE_KQUEUE 1
42 #endif
43
44 #ifdef __sun
45 #include <sys/feature_tests.h>
46 #ifdef _DTRACE_VERSION
47 #define HAVE_EVPORT 1
48 #endif
49 #endif
50
51 #ifdef HAVE_EPOLL
52 #include "msg/async/EventEpoll.h"
53 #endif
54 #ifdef HAVE_KQUEUE
55 #include "msg/async/EventKqueue.h"
56 #endif
57 #include "msg/async/EventSelect.h"
58
59 #include <gtest/gtest.h>
60
61
62 #if GTEST_HAS_PARAM_TEST
63
64 class EventDriverTest : public ::testing::TestWithParam<const char*> {
65 public:
66 EventDriver *driver;
67
68 EventDriverTest(): driver(0) {}
69 void SetUp() override {
70 cerr << __func__ << " start set up " << GetParam() << std::endl;
71 #ifdef HAVE_EPOLL
72 if (strcmp(GetParam(), "epoll"))
73 driver = new EpollDriver(g_ceph_context);
74 #endif
75 #ifdef HAVE_KQUEUE
76 if (strcmp(GetParam(), "kqueue"))
77 driver = new KqueueDriver(g_ceph_context);
78 #endif
79 if (strcmp(GetParam(), "select"))
80 driver = new SelectDriver(g_ceph_context);
81 driver->init(NULL, 100);
82 }
83 void TearDown() override {
84 delete driver;
85 }
86 };
87
/**
 * Switch descriptor @p sd to non-blocking mode.
 *
 * @param sd  open file descriptor
 * @return 0 on success, -1 if the file status flags could not be read or
 *         updated.
 */
int set_nonblock(int sd)
{
  // fcntl(2) with F_GETFL / F_SETFL cannot be interrupted by a signal,
  // so there is no EINTR case to retry.
  const int current = fcntl(sd, F_GETFL);
  if (current < 0)
    return -1;

  return fcntl(sd, F_SETFL, current | O_NONBLOCK) < 0 ? -1 : 0;
}
103
104
105 TEST_P(EventDriverTest, PipeTest) {
106 int fds[2];
107 vector<FiredFileEvent> fired_events;
108 int r;
109 struct timeval tv;
110 tv.tv_sec = 0;
111 tv.tv_usec = 1;
112
113 r = pipe(fds);
114 ASSERT_EQ(r, 0);
115 r = driver->add_event(fds[0], EVENT_NONE, EVENT_READABLE);
116 ASSERT_EQ(r, 0);
117 r = driver->event_wait(fired_events, &tv);
118 ASSERT_EQ(r, 0);
119
120 char c = 'A';
121 r = write(fds[1], &c, sizeof(c));
122 ASSERT_EQ(r, 1);
123 r = driver->event_wait(fired_events, &tv);
124 ASSERT_EQ(r, 1);
125 ASSERT_EQ(fired_events[0].fd, fds[0]);
126
127
128 fired_events.clear();
129 r = write(fds[1], &c, sizeof(c));
130 ASSERT_EQ(r, 1);
131 r = driver->event_wait(fired_events, &tv);
132 ASSERT_EQ(r, 1);
133 ASSERT_EQ(fired_events[0].fd, fds[0]);
134
135 fired_events.clear();
136 driver->del_event(fds[0], EVENT_READABLE, EVENT_READABLE);
137 r = write(fds[1], &c, sizeof(c));
138 ASSERT_EQ(r, 1);
139 r = driver->event_wait(fired_events, &tv);
140 ASSERT_EQ(r, 0);
141 }
142
/**
 * Thread entry point for the client side of NetworkSocketTest.
 *
 * Connects to 127.0.0.1 on the given TCP port, then repeatedly sends the
 * string "banner" (including its terminating NUL) and reads the reply.
 * It stops after 31 exchanges, when the peer closes the connection, or on
 * the first I/O error.
 *
 * @param arg  TCP port number smuggled through the pointer as an intptr_t
 * @return always a null pointer
 */
void* echoclient(void *arg)
{
  intptr_t port = (intptr_t)arg;
  struct sockaddr_in sa;
  memset(&sa, 0, sizeof(sa));
  sa.sin_family = AF_INET;
  sa.sin_port = htons(port);
  char addr[] = "127.0.0.1";
  int r = inet_pton(AF_INET, addr, &sa.sin_addr);
  assert(r == 1);

  int connect_sd = ::socket(AF_INET, SOCK_STREAM, 0);
  if (connect_sd < 0)
    return 0;

  // Only talk to the peer when the connection was actually established:
  // writing to an unconnected stream socket fails with EPIPE and may raise
  // SIGPIPE.
  r = connect(connect_sd, (struct sockaddr*)&sa, sizeof(sa));
  if (r == 0) {
    for (int t = 0; t <= 30; ++t) {
      char c[] = "banner";
      r = write(connect_sd, c, sizeof(c));
      if (r < 0)
        break;
      char d[100];
      r = read(connect_sd, d, sizeof(d));
      // 0 means the peer closed the connection; a negative value is an
      // error. Either way there is nothing more to exchange.
      if (r <= 0)
        break;
    }
  }
  ::close(connect_sd);
  return 0;
}
173
174 TEST_P(EventDriverTest, NetworkSocketTest) {
175 int listen_sd = ::socket(AF_INET, SOCK_STREAM, 0);
176 ASSERT_TRUE(listen_sd > 0);
177 int on = 1;
178 int r = ::setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
179 ASSERT_EQ(r, 0);
180 r = set_nonblock(listen_sd);
181 ASSERT_EQ(r, 0);
182 struct sockaddr_in sa;
183 long port = 0;
184 for (port = 38788; port < 40000; port++) {
185 memset(&sa,0,sizeof(sa));
186 sa.sin_family = AF_INET;
187 sa.sin_port = htons(port);
188 sa.sin_addr.s_addr = htonl(INADDR_ANY);
189
190 r = ::bind(listen_sd, (struct sockaddr *)&sa, sizeof(sa));
191 if (r == 0) {
192 break;
193 }
194 }
195 ASSERT_EQ(r, 0);
196 r = listen(listen_sd, 511);
197 ASSERT_EQ(r, 0);
198
199 vector<FiredFileEvent> fired_events;
200 struct timeval tv;
201 tv.tv_sec = 0;
202 tv.tv_usec = 1;
203 r = driver->add_event(listen_sd, EVENT_NONE, EVENT_READABLE);
204 ASSERT_EQ(r, 0);
205 r = driver->event_wait(fired_events, &tv);
206 ASSERT_EQ(r, 0);
207
208 fired_events.clear();
209 pthread_t thread1;
210 r = pthread_create(&thread1, NULL, echoclient, (void*)(intptr_t)port);
211 ASSERT_EQ(r, 0);
212 tv.tv_sec = 5;
213 tv.tv_usec = 0;
214 r = driver->event_wait(fired_events, &tv);
215 ASSERT_EQ(r, 1);
216 ASSERT_EQ(fired_events[0].fd, listen_sd);
217
218 fired_events.clear();
219 int client_sd = ::accept(listen_sd, NULL, NULL);
220 ASSERT_TRUE(client_sd > 0);
221 r = driver->add_event(client_sd, EVENT_NONE, EVENT_READABLE);
222 ASSERT_EQ(r, 0);
223
224 do {
225 fired_events.clear();
226 tv.tv_sec = 5;
227 tv.tv_usec = 0;
228 r = driver->event_wait(fired_events, &tv);
229 ASSERT_EQ(r, 1);
230 ASSERT_EQ(fired_events[0].mask, EVENT_READABLE);
231
232 fired_events.clear();
233 char data[100];
234 r = ::read(client_sd, data, sizeof(data));
235 if (r == 0)
236 break;
237 ASSERT_TRUE(r > 0);
238 r = driver->add_event(client_sd, EVENT_READABLE, EVENT_WRITABLE);
239 r = driver->event_wait(fired_events, &tv);
240 ASSERT_EQ(r, 1);
241 ASSERT_EQ(fired_events[0].mask, EVENT_WRITABLE);
242 r = write(client_sd, data, strlen(data));
243 ASSERT_EQ(r, (int)strlen(data));
244 driver->del_event(client_sd, EVENT_READABLE|EVENT_WRITABLE,
245 EVENT_WRITABLE);
246 } while (1);
247
248 ::close(client_sd);
249 ::close(listen_sd);
250 }
251
252 class FakeEvent : public EventCallback {
253
254 public:
255 void do_request(int fd_or_id) override {}
256 };
257
258 TEST(EventCenterTest, FileEventExpansion) {
259 vector<int> sds;
260 EventCenter center(g_ceph_context);
261 center.init(100, 0, "posix");
262 center.set_owner();
263 EventCallbackRef e(new FakeEvent());
264 for (int i = 0; i < 300; i++) {
265 int sd = ::socket(AF_INET, SOCK_STREAM, 0);
266 center.create_file_event(sd, EVENT_READABLE, e);
267 sds.push_back(sd);
268 }
269
270 for (vector<int>::iterator it = sds.begin(); it != sds.end(); ++it)
271 center.delete_file_event(*it, EVENT_READABLE);
272 }
273
274
275 class Worker : public Thread {
276 CephContext *cct;
277 bool done;
278
279 public:
280 EventCenter center;
281 explicit Worker(CephContext *c, int idx): cct(c), done(false), center(c) {
282 center.init(100, idx, "posix");
283 }
284 void stop() {
285 done = true;
286 center.wakeup();
287 }
288 void* entry() override {
289 center.set_owner();
290 while (!done)
291 center.process_events(1000000);
292 return 0;
293 }
294 };
295
296 class CountEvent: public EventCallback {
297 std::atomic<unsigned> *count;
298 Mutex *lock;
299 Cond *cond;
300
301 public:
302 CountEvent(std::atomic<unsigned> *atomic, Mutex *l, Cond *c): count(atomic), lock(l), cond(c) {}
303 void do_request(int id) override {
304 lock->Lock();
305 (*count)--;
306 cond->Signal();
307 lock->Unlock();
308 }
309 };
310
311 TEST(EventCenterTest, DispatchTest) {
312 Worker worker1(g_ceph_context, 1), worker2(g_ceph_context, 2);
313 std::atomic<unsigned> count = { 0 };
314 Mutex lock("DispatchTest::lock");
315 Cond cond;
316 worker1.create("worker_1");
317 worker2.create("worker_2");
318 for (int i = 0; i < 10000; ++i) {
319 count++;
320 worker1.center.dispatch_event_external(EventCallbackRef(new CountEvent(&count, &lock, &cond)));
321 count++;
322 worker2.center.dispatch_event_external(EventCallbackRef(new CountEvent(&count, &lock, &cond)));
323 Mutex::Locker l(lock);
324 while (count)
325 cond.Wait(lock);
326 }
327 worker1.stop();
328 worker2.stop();
329 worker1.join();
330 worker2.join();
331 }
332
333 INSTANTIATE_TEST_CASE_P(
334 AsyncMessenger,
335 EventDriverTest,
336 ::testing::Values(
337 #ifdef HAVE_EPOLL
338 "epoll",
339 #endif
340 #ifdef HAVE_KQUEUE
341 "kqueue",
342 #endif
343 "select"
344 )
345 );
346
347 #else
348
349 // Google Test may not support value-parameterized tests with some
350 // compilers. If we use conditional compilation to compile out all
351 // code referring to the gtest_main library, MSVC linker will not link
352 // that library at all and consequently complain about missing entry
353 // point defined in that library (fatal error LNK1561: entry point
354 // must be defined). This dummy test keeps gtest_main linked in.
355 TEST(DummyTest, ValueParameterizedTestsAreNotSupportedOnThisPlatform) {}
356
357 #endif
358
359
360 /*
361 * Local Variables:
 * compile-command: "cd ../.. ; make ceph_test_async_driver &&
 *    ./ceph_test_async_driver"
364 *
365 * End:
366 */