]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/msgr/test_async_driver.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / test / msgr / test_async_driver.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #ifdef __APPLE__
18 #include <AvailabilityMacros.h>
19 #endif
20
21 #include <fcntl.h>
22 #include <sys/socket.h>
23 #include <pthread.h>
24 #include <stdint.h>
25 #include <arpa/inet.h>
26 #include "include/Context.h"
27 #include "common/ceph_mutex.h"
28 #include "common/Cond.h"
29 #include "global/global_init.h"
30 #include "common/ceph_argparse.h"
31 #include "msg/async/Event.h"
32
33 #include <atomic>
34
35 // We use epoll, kqueue, evport, select in descending order by performance.
36 #if defined(__linux__)
37 #define HAVE_EPOLL 1
38 #endif
39
40 #if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
41 #define HAVE_KQUEUE 1
42 #endif
43
44 #ifdef __sun
45 #include <sys/feature_tests.h>
46 #ifdef _DTRACE_VERSION
47 #define HAVE_EVPORT 1
48 #endif
49 #endif
50
51 #ifdef HAVE_EPOLL
52 #include "msg/async/EventEpoll.h"
53 #endif
54 #ifdef HAVE_KQUEUE
55 #include "msg/async/EventKqueue.h"
56 #endif
57 #include "msg/async/EventSelect.h"
58
59 #include <gtest/gtest.h>
60
61 using namespace std;
62
63 class EventDriverTest : public ::testing::TestWithParam<const char*> {
64 public:
65 EventDriver *driver;
66
67 EventDriverTest(): driver(0) {}
68 void SetUp() override {
69 cerr << __func__ << " start set up " << GetParam() << std::endl;
70 #ifdef HAVE_EPOLL
71 if (strcmp(GetParam(), "epoll"))
72 driver = new EpollDriver(g_ceph_context);
73 #endif
74 #ifdef HAVE_KQUEUE
75 if (strcmp(GetParam(), "kqueue"))
76 driver = new KqueueDriver(g_ceph_context);
77 #endif
78 if (strcmp(GetParam(), "select"))
79 driver = new SelectDriver(g_ceph_context);
80 driver->init(NULL, 100);
81 }
82 void TearDown() override {
83 delete driver;
84 }
85 };
86
87 int set_nonblock(int sd)
88 {
89 int flags;
90
91 /* Set the socket nonblocking.
92 * Note that fcntl(2) for F_GETFL and F_SETFL can't be
93 * interrupted by a signal. */
94 if ((flags = fcntl(sd, F_GETFL)) < 0 ) {
95 return -1;
96 }
97 if (fcntl(sd, F_SETFL, flags | O_NONBLOCK) < 0) {
98 return -1;
99 }
100 return 0;
101 }
102
103
104 TEST_P(EventDriverTest, PipeTest) {
105 int fds[2];
106 vector<FiredFileEvent> fired_events;
107 int r;
108 struct timeval tv;
109 tv.tv_sec = 0;
110 tv.tv_usec = 1;
111
112 r = pipe(fds);
113 ASSERT_EQ(r, 0);
114 r = driver->add_event(fds[0], EVENT_NONE, EVENT_READABLE);
115 ASSERT_EQ(r, 0);
116 r = driver->event_wait(fired_events, &tv);
117 ASSERT_EQ(r, 0);
118
119 char c = 'A';
120 r = write(fds[1], &c, sizeof(c));
121 ASSERT_EQ(r, 1);
122 r = driver->event_wait(fired_events, &tv);
123 ASSERT_EQ(r, 1);
124 ASSERT_EQ(fired_events[0].fd, fds[0]);
125
126
127 fired_events.clear();
128 r = write(fds[1], &c, sizeof(c));
129 ASSERT_EQ(r, 1);
130 r = driver->event_wait(fired_events, &tv);
131 ASSERT_EQ(r, 1);
132 ASSERT_EQ(fired_events[0].fd, fds[0]);
133
134 fired_events.clear();
135 driver->del_event(fds[0], EVENT_READABLE, EVENT_READABLE);
136 r = write(fds[1], &c, sizeof(c));
137 ASSERT_EQ(r, 1);
138 r = driver->event_wait(fired_events, &tv);
139 ASSERT_EQ(r, 0);
140 }
141
142 void* echoclient(void *arg)
143 {
144 intptr_t port = (intptr_t)arg;
145 struct sockaddr_in sa;
146 memset(&sa, 0, sizeof(sa));
147 sa.sin_family = AF_INET;
148 sa.sin_port = htons(port);
149 char addr[] = "127.0.0.1";
150 int r = inet_pton(AF_INET, addr, &sa.sin_addr);
151 ceph_assert(r == 1);
152
153 int connect_sd = ::socket(AF_INET, SOCK_STREAM, 0);
154 if (connect_sd >= 0) {
155 r = connect(connect_sd, (struct sockaddr*)&sa, sizeof(sa));
156 ceph_assert(r == 0);
157 int t = 0;
158
159 do {
160 char c[] = "banner";
161 r = write(connect_sd, c, sizeof(c));
162 char d[100];
163 r = read(connect_sd, d, sizeof(d));
164 if (r == 0)
165 break;
166 if (t++ == 30)
167 break;
168 } while (1);
169 ::close(connect_sd);
170 }
171 return 0;
172 }
173
174 TEST_P(EventDriverTest, NetworkSocketTest) {
175 int listen_sd = ::socket(AF_INET, SOCK_STREAM, 0);
176 ASSERT_TRUE(listen_sd > 0);
177 int on = 1;
178 int r = ::setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
179 ASSERT_EQ(r, 0);
180 r = set_nonblock(listen_sd);
181 ASSERT_EQ(r, 0);
182 struct sockaddr_in sa;
183 long port = 0;
184 for (port = 38788; port < 40000; port++) {
185 memset(&sa,0,sizeof(sa));
186 sa.sin_family = AF_INET;
187 sa.sin_port = htons(port);
188 sa.sin_addr.s_addr = htonl(INADDR_ANY);
189
190 r = ::bind(listen_sd, (struct sockaddr *)&sa, sizeof(sa));
191 if (r == 0) {
192 break;
193 }
194 }
195 ASSERT_EQ(r, 0);
196 r = listen(listen_sd, 511);
197 ASSERT_EQ(r, 0);
198
199 vector<FiredFileEvent> fired_events;
200 struct timeval tv;
201 tv.tv_sec = 0;
202 tv.tv_usec = 1;
203 r = driver->add_event(listen_sd, EVENT_NONE, EVENT_READABLE);
204 ASSERT_EQ(r, 0);
205 r = driver->event_wait(fired_events, &tv);
206 ASSERT_EQ(r, 0);
207
208 fired_events.clear();
209 pthread_t thread1;
210 r = pthread_create(&thread1, NULL, echoclient, (void*)(intptr_t)port);
211 ASSERT_EQ(r, 0);
212 tv.tv_sec = 5;
213 tv.tv_usec = 0;
214 r = driver->event_wait(fired_events, &tv);
215 ASSERT_EQ(r, 1);
216 ASSERT_EQ(fired_events[0].fd, listen_sd);
217
218 fired_events.clear();
219 int client_sd = ::accept(listen_sd, NULL, NULL);
220 ASSERT_TRUE(client_sd > 0);
221 r = driver->add_event(client_sd, EVENT_NONE, EVENT_READABLE);
222 ASSERT_EQ(r, 0);
223
224 do {
225 fired_events.clear();
226 tv.tv_sec = 5;
227 tv.tv_usec = 0;
228 r = driver->event_wait(fired_events, &tv);
229 ASSERT_EQ(1, r);
230 ASSERT_EQ(EVENT_READABLE, fired_events[0].mask);
231
232 fired_events.clear();
233 char data[100];
234 r = ::read(client_sd, data, sizeof(data));
235 if (r == 0)
236 break;
237 ASSERT_GT(r, 0);
238 r = driver->add_event(client_sd, EVENT_READABLE, EVENT_WRITABLE);
239 ASSERT_EQ(0, r);
240 r = driver->event_wait(fired_events, &tv);
241 ASSERT_EQ(1, r);
242 ASSERT_EQ(fired_events[0].mask, EVENT_WRITABLE);
243 r = write(client_sd, data, strlen(data));
244 ASSERT_EQ((int)strlen(data), r);
245 driver->del_event(client_sd, EVENT_READABLE|EVENT_WRITABLE,
246 EVENT_WRITABLE);
247 } while (1);
248
249 ::close(client_sd);
250 ::close(listen_sd);
251 }
252
253 class FakeEvent : public EventCallback {
254
255 public:
256 void do_request(uint64_t fd_or_id) override {}
257 };
258
259 TEST(EventCenterTest, FileEventExpansion) {
260 vector<int> sds;
261 EventCenter center(g_ceph_context);
262 center.init(100, 0, "posix");
263 center.set_owner();
264 EventCallbackRef e(new FakeEvent());
265 for (int i = 0; i < 300; i++) {
266 int sd = ::socket(AF_INET, SOCK_STREAM, 0);
267 center.create_file_event(sd, EVENT_READABLE, e);
268 sds.push_back(sd);
269 }
270
271 for (vector<int>::iterator it = sds.begin(); it != sds.end(); ++it)
272 center.delete_file_event(*it, EVENT_READABLE);
273 }
274
275
276 class Worker : public Thread {
277 CephContext *cct;
278 bool done;
279
280 public:
281 EventCenter center;
282 explicit Worker(CephContext *c, int idx): cct(c), done(false), center(c) {
283 center.init(100, idx, "posix");
284 }
285 void stop() {
286 done = true;
287 center.wakeup();
288 }
289 void* entry() override {
290 center.set_owner();
291 while (!done)
292 center.process_events(1000000);
293 return 0;
294 }
295 };
296
297 class CountEvent: public EventCallback {
298 std::atomic<unsigned> *count;
299 ceph::mutex *lock;
300 ceph::condition_variable *cond;
301
302 public:
303 CountEvent(std::atomic<unsigned> *atomic,
304 ceph::mutex *l, ceph::condition_variable *c)
305 : count(atomic), lock(l), cond(c) {}
306 void do_request(uint64_t id) override {
307 std::scoped_lock l{*lock};
308 (*count)--;
309 cond->notify_all();
310 }
311 };
312
313 TEST(EventCenterTest, DispatchTest) {
314 Worker worker1(g_ceph_context, 1), worker2(g_ceph_context, 2);
315 std::atomic<unsigned> count = { 0 };
316 ceph::mutex lock = ceph::make_mutex("DispatchTest::lock");
317 ceph::condition_variable cond;
318 worker1.create("worker_1");
319 worker2.create("worker_2");
320 for (int i = 0; i < 10000; ++i) {
321 count++;
322 worker1.center.dispatch_event_external(EventCallbackRef(new CountEvent(&count, &lock, &cond)));
323 count++;
324 worker2.center.dispatch_event_external(EventCallbackRef(new CountEvent(&count, &lock, &cond)));
325 std::unique_lock l{lock};
326 cond.wait(l, [&] { return count == 0; });
327 }
328 worker1.stop();
329 worker2.stop();
330 worker1.join();
331 worker2.join();
332 }
333
334 INSTANTIATE_TEST_SUITE_P(
335 AsyncMessenger,
336 EventDriverTest,
337 ::testing::Values(
338 #ifdef HAVE_EPOLL
339 "epoll",
340 #endif
341 #ifdef HAVE_KQUEUE
342 "kqueue",
343 #endif
344 "select"
345 )
346 );
347
348 /*
349 * Local Variables:
350 * compile-command: "cd ../.. ; make ceph_test_async_driver &&
351 * ./ceph_test_async_driver
352 *
353 * End:
354 */