]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/msgr/test_async_driver.cc
update sources to v12.1.0
[ceph.git] / ceph / src / test / msgr / test_async_driver.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17#ifdef __APPLE__
18#include <AvailabilityMacros.h>
19#endif
20
21#include <fcntl.h>
22#include <sys/socket.h>
23#include <pthread.h>
24#include <stdint.h>
25#include <arpa/inet.h>
26#include "include/Context.h"
7c673cae
FG
27#include "common/Mutex.h"
28#include "common/Cond.h"
29#include "global/global_init.h"
30#include "common/ceph_argparse.h"
31#include "msg/async/Event.h"
32
31f18b77
FG
33#include <atomic>
34
7c673cae
FG
35// We use epoll, kqueue, evport, select in descending order by performance.
36#if defined(__linux__)
37#define HAVE_EPOLL 1
38#endif
39
40#if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
41#define HAVE_KQUEUE 1
42#endif
43
44#ifdef __sun
45#include <sys/feature_tests.h>
46#ifdef _DTRACE_VERSION
47#define HAVE_EVPORT 1
48#endif
49#endif
50
51#ifdef HAVE_EPOLL
52#include "msg/async/EventEpoll.h"
53#endif
54#ifdef HAVE_KQUEUE
55#include "msg/async/EventKqueue.h"
56#endif
57#include "msg/async/EventSelect.h"
58
59#include <gtest/gtest.h>
60
61
62#if GTEST_HAS_PARAM_TEST
63
64class EventDriverTest : public ::testing::TestWithParam<const char*> {
65 public:
66 EventDriver *driver;
67
68 EventDriverTest(): driver(0) {}
69 void SetUp() override {
70 cerr << __func__ << " start set up " << GetParam() << std::endl;
71#ifdef HAVE_EPOLL
72 if (strcmp(GetParam(), "epoll"))
73 driver = new EpollDriver(g_ceph_context);
74#endif
75#ifdef HAVE_KQUEUE
76 if (strcmp(GetParam(), "kqueue"))
77 driver = new KqueueDriver(g_ceph_context);
78#endif
79 if (strcmp(GetParam(), "select"))
80 driver = new SelectDriver(g_ceph_context);
81 driver->init(NULL, 100);
82 }
83 void TearDown() override {
84 delete driver;
85 }
86};
87
/**
 * Switch descriptor `sd` to non-blocking mode.
 *
 * Reads the current file status flags and writes them back with O_NONBLOCK
 * added. fcntl(2) with F_GETFL / F_SETFL cannot be interrupted by a signal,
 * so no retry loop is needed.
 *
 * @param sd descriptor to modify
 * @return 0 on success, -1 if either fcntl() call fails
 */
int set_nonblock(int sd)
{
  const int current = fcntl(sd, F_GETFL);
  if (current < 0)
    return -1;

  const int rc = fcntl(sd, F_SETFL, current | O_NONBLOCK);
  return rc < 0 ? -1 : 0;
}
103
104
105TEST_P(EventDriverTest, PipeTest) {
106 int fds[2];
107 vector<FiredFileEvent> fired_events;
108 int r;
109 struct timeval tv;
110 tv.tv_sec = 0;
111 tv.tv_usec = 1;
112
113 r = pipe(fds);
114 ASSERT_EQ(r, 0);
115 r = driver->add_event(fds[0], EVENT_NONE, EVENT_READABLE);
116 ASSERT_EQ(r, 0);
117 r = driver->event_wait(fired_events, &tv);
118 ASSERT_EQ(r, 0);
119
120 char c = 'A';
121 r = write(fds[1], &c, sizeof(c));
122 ASSERT_EQ(r, 1);
123 r = driver->event_wait(fired_events, &tv);
124 ASSERT_EQ(r, 1);
125 ASSERT_EQ(fired_events[0].fd, fds[0]);
126
127
128 fired_events.clear();
129 r = write(fds[1], &c, sizeof(c));
130 ASSERT_EQ(r, 1);
131 r = driver->event_wait(fired_events, &tv);
132 ASSERT_EQ(r, 1);
133 ASSERT_EQ(fired_events[0].fd, fds[0]);
134
135 fired_events.clear();
136 driver->del_event(fds[0], EVENT_READABLE, EVENT_READABLE);
137 r = write(fds[1], &c, sizeof(c));
138 ASSERT_EQ(r, 1);
139 r = driver->event_wait(fired_events, &tv);
140 ASSERT_EQ(r, 0);
141}
142
/**
 * Thread entry point for the client side of NetworkSocketTest.
 *
 * Connects to 127.0.0.1 on the TCP port passed through `arg` (an intptr_t
 * cast to void*), then performs up to 31 rounds of "write the NUL-terminated
 * string "banner", read the reply". The exchange stops early when the peer
 * closes the connection or when a write/read fails.
 *
 * @param arg port number, encoded as (void*)(intptr_t)port
 * @return always a null pointer
 */
void* echoclient(void *arg)
{
  intptr_t port = (intptr_t)arg;
  struct sockaddr_in sa;
  memset(&sa, 0, sizeof(sa));
  sa.sin_family = AF_INET;
  sa.sin_port = htons(port);
  char addr[] = "127.0.0.1";
  int r = inet_pton(AF_INET, addr, &sa.sin_addr);
  assert(r == 1);
  (void)r;  // keeps builds that compile assert() away free of warnings

  int connect_sd = ::socket(AF_INET, SOCK_STREAM, 0);
  if (connect_sd < 0)
    return 0;

  // Only talk to the peer when the connection was actually established;
  // writing to an unconnected stream socket fails and may raise SIGPIPE.
  if (connect(connect_sd, (struct sockaddr*)&sa, sizeof(sa)) == 0) {
    for (int round = 0; round <= 30; ++round) {
      char c[] = "banner";
      if (write(connect_sd, c, sizeof(c)) < 0)
        break;
      char d[100];
      // 0 means the peer closed the connection, < 0 is an error; either way
      // there is nothing more to exchange.
      if (read(connect_sd, d, sizeof(d)) <= 0)
        break;
    }
  }
  ::close(connect_sd);
  return 0;
}
173
174TEST_P(EventDriverTest, NetworkSocketTest) {
175 int listen_sd = ::socket(AF_INET, SOCK_STREAM, 0);
176 ASSERT_TRUE(listen_sd > 0);
177 int on = 1;
178 int r = ::setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
179 ASSERT_EQ(r, 0);
180 r = set_nonblock(listen_sd);
181 ASSERT_EQ(r, 0);
182 struct sockaddr_in sa;
183 long port = 0;
184 for (port = 38788; port < 40000; port++) {
185 memset(&sa,0,sizeof(sa));
186 sa.sin_family = AF_INET;
187 sa.sin_port = htons(port);
188 sa.sin_addr.s_addr = htonl(INADDR_ANY);
189
190 r = ::bind(listen_sd, (struct sockaddr *)&sa, sizeof(sa));
191 if (r == 0) {
192 break;
193 }
194 }
195 ASSERT_EQ(r, 0);
196 r = listen(listen_sd, 511);
197 ASSERT_EQ(r, 0);
198
199 vector<FiredFileEvent> fired_events;
200 struct timeval tv;
201 tv.tv_sec = 0;
202 tv.tv_usec = 1;
203 r = driver->add_event(listen_sd, EVENT_NONE, EVENT_READABLE);
204 ASSERT_EQ(r, 0);
205 r = driver->event_wait(fired_events, &tv);
206 ASSERT_EQ(r, 0);
207
208 fired_events.clear();
209 pthread_t thread1;
210 r = pthread_create(&thread1, NULL, echoclient, (void*)(intptr_t)port);
211 ASSERT_EQ(r, 0);
212 tv.tv_sec = 5;
213 tv.tv_usec = 0;
214 r = driver->event_wait(fired_events, &tv);
215 ASSERT_EQ(r, 1);
216 ASSERT_EQ(fired_events[0].fd, listen_sd);
217
218 fired_events.clear();
219 int client_sd = ::accept(listen_sd, NULL, NULL);
220 ASSERT_TRUE(client_sd > 0);
221 r = driver->add_event(client_sd, EVENT_NONE, EVENT_READABLE);
222 ASSERT_EQ(r, 0);
223
224 do {
225 fired_events.clear();
226 tv.tv_sec = 5;
227 tv.tv_usec = 0;
228 r = driver->event_wait(fired_events, &tv);
229 ASSERT_EQ(r, 1);
230 ASSERT_EQ(fired_events[0].mask, EVENT_READABLE);
231
232 fired_events.clear();
233 char data[100];
234 r = ::read(client_sd, data, sizeof(data));
235 if (r == 0)
236 break;
237 ASSERT_TRUE(r > 0);
238 r = driver->add_event(client_sd, EVENT_READABLE, EVENT_WRITABLE);
239 r = driver->event_wait(fired_events, &tv);
240 ASSERT_EQ(r, 1);
241 ASSERT_EQ(fired_events[0].mask, EVENT_WRITABLE);
242 r = write(client_sd, data, strlen(data));
243 ASSERT_EQ(r, (int)strlen(data));
244 driver->del_event(client_sd, EVENT_READABLE|EVENT_WRITABLE,
245 EVENT_WRITABLE);
246 } while (1);
247
248 ::close(client_sd);
249 ::close(listen_sd);
250}
251
252class FakeEvent : public EventCallback {
253
254 public:
255 void do_request(int fd_or_id) override {}
256};
257
258TEST(EventCenterTest, FileEventExpansion) {
259 vector<int> sds;
260 EventCenter center(g_ceph_context);
261 center.init(100, 0, "posix");
262 center.set_owner();
263 EventCallbackRef e(new FakeEvent());
264 for (int i = 0; i < 300; i++) {
265 int sd = ::socket(AF_INET, SOCK_STREAM, 0);
266 center.create_file_event(sd, EVENT_READABLE, e);
267 sds.push_back(sd);
268 }
269
270 for (vector<int>::iterator it = sds.begin(); it != sds.end(); ++it)
271 center.delete_file_event(*it, EVENT_READABLE);
272}
273
274
275class Worker : public Thread {
276 CephContext *cct;
277 bool done;
278
279 public:
280 EventCenter center;
281 explicit Worker(CephContext *c, int idx): cct(c), done(false), center(c) {
282 center.init(100, idx, "posix");
283 }
284 void stop() {
285 done = true;
286 center.wakeup();
287 }
288 void* entry() override {
289 center.set_owner();
290 while (!done)
291 center.process_events(1000000);
292 return 0;
293 }
294};
295
296class CountEvent: public EventCallback {
31f18b77 297 std::atomic<unsigned> *count;
7c673cae
FG
298 Mutex *lock;
299 Cond *cond;
300
301 public:
31f18b77 302 CountEvent(std::atomic<unsigned> *atomic, Mutex *l, Cond *c): count(atomic), lock(l), cond(c) {}
7c673cae
FG
303 void do_request(int id) override {
304 lock->Lock();
31f18b77 305 (*count)--;
7c673cae
FG
306 cond->Signal();
307 lock->Unlock();
308 }
309};
310
311TEST(EventCenterTest, DispatchTest) {
312 Worker worker1(g_ceph_context, 1), worker2(g_ceph_context, 2);
31f18b77 313 std::atomic<unsigned> count = { 0 };
7c673cae
FG
314 Mutex lock("DispatchTest::lock");
315 Cond cond;
316 worker1.create("worker_1");
317 worker2.create("worker_2");
318 for (int i = 0; i < 10000; ++i) {
31f18b77 319 count++;
7c673cae 320 worker1.center.dispatch_event_external(EventCallbackRef(new CountEvent(&count, &lock, &cond)));
31f18b77 321 count++;
7c673cae
FG
322 worker2.center.dispatch_event_external(EventCallbackRef(new CountEvent(&count, &lock, &cond)));
323 Mutex::Locker l(lock);
31f18b77 324 while (count)
7c673cae
FG
325 cond.Wait(lock);
326 }
327 worker1.stop();
328 worker2.stop();
329 worker1.join();
330 worker2.join();
331}
332
333INSTANTIATE_TEST_CASE_P(
334 AsyncMessenger,
335 EventDriverTest,
336 ::testing::Values(
337#ifdef HAVE_EPOLL
338 "epoll",
339#endif
340#ifdef HAVE_KQUEUE
341 "kqueue",
342#endif
343 "select"
344 )
345);
346
347#else
348
349// Google Test may not support value-parameterized tests with some
350// compilers. If we use conditional compilation to compile out all
351// code referring to the gtest_main library, MSVC linker will not link
352// that library at all and consequently complain about missing entry
353// point defined in that library (fatal error LNK1561: entry point
354// must be defined). This dummy test keeps gtest_main linked in.
355TEST(DummyTest, ValueParameterizedTestsAreNotSupportedOnThisPlatform) {}
356
357#endif
358
359
360/*
361 * Local Variables:
362 * compile-command: "cd ../.. ; make ceph_test_async_driver &&
363 * ./ceph_test_async_driver
364 *
365 * End:
366 */