1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
18 #include <AvailabilityMacros.h>
22 #include <sys/socket.h>
25 #include <arpa/inet.h>
26 #include "include/Context.h"
27 #include "common/Mutex.h"
28 #include "common/Cond.h"
29 #include "global/global_init.h"
30 #include "common/ceph_argparse.h"
31 #include "msg/async/Event.h"
35 // We use epoll, kqueue, evport, select in descending order by performance.
36 #if defined(__linux__)
40 #if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
45 #include <sys/feature_tests.h>
46 #ifdef _DTRACE_VERSION
52 #include "msg/async/EventEpoll.h"
55 #include "msg/async/EventKqueue.h"
57 #include "msg/async/EventSelect.h"
59 #include <gtest/gtest.h>
62 #if GTEST_HAS_PARAM_TEST
64 class EventDriverTest
: public ::testing::TestWithParam
<const char*> {
68 EventDriverTest(): driver(0) {}
69 void SetUp() override
{
70 cerr
<< __func__
<< " start set up " << GetParam() << std::endl
;
72 if (strcmp(GetParam(), "epoll"))
73 driver
= new EpollDriver(g_ceph_context
);
76 if (strcmp(GetParam(), "kqueue"))
77 driver
= new KqueueDriver(g_ceph_context
);
79 if (strcmp(GetParam(), "select"))
80 driver
= new SelectDriver(g_ceph_context
);
81 driver
->init(NULL
, 100);
83 void TearDown() override
{
88 int set_nonblock(int sd
)
92 /* Set the socket nonblocking.
93 * Note that fcntl(2) for F_GETFL and F_SETFL can't be
94 * interrupted by a signal. */
95 if ((flags
= fcntl(sd
, F_GETFL
)) < 0 ) {
98 if (fcntl(sd
, F_SETFL
, flags
| O_NONBLOCK
) < 0) {
105 TEST_P(EventDriverTest
, PipeTest
) {
107 vector
<FiredFileEvent
> fired_events
;
115 r
= driver
->add_event(fds
[0], EVENT_NONE
, EVENT_READABLE
);
117 r
= driver
->event_wait(fired_events
, &tv
);
121 r
= write(fds
[1], &c
, sizeof(c
));
123 r
= driver
->event_wait(fired_events
, &tv
);
125 ASSERT_EQ(fired_events
[0].fd
, fds
[0]);
128 fired_events
.clear();
129 r
= write(fds
[1], &c
, sizeof(c
));
131 r
= driver
->event_wait(fired_events
, &tv
);
133 ASSERT_EQ(fired_events
[0].fd
, fds
[0]);
135 fired_events
.clear();
136 driver
->del_event(fds
[0], EVENT_READABLE
, EVENT_READABLE
);
137 r
= write(fds
[1], &c
, sizeof(c
));
139 r
= driver
->event_wait(fired_events
, &tv
);
143 void* echoclient(void *arg
)
145 intptr_t port
= (intptr_t)arg
;
146 struct sockaddr_in sa
;
147 memset(&sa
, 0, sizeof(sa
));
148 sa
.sin_family
= AF_INET
;
149 sa
.sin_port
= htons(port
);
150 char addr
[] = "127.0.0.1";
151 int r
= inet_pton(AF_INET
, addr
, &sa
.sin_addr
);
154 int connect_sd
= ::socket(AF_INET
, SOCK_STREAM
, 0);
155 if (connect_sd
>= 0) {
156 r
= connect(connect_sd
, (struct sockaddr
*)&sa
, sizeof(sa
));
161 r
= write(connect_sd
, c
, sizeof(c
));
163 r
= read(connect_sd
, d
, sizeof(d
));
174 TEST_P(EventDriverTest
, NetworkSocketTest
) {
175 int listen_sd
= ::socket(AF_INET
, SOCK_STREAM
, 0);
176 ASSERT_TRUE(listen_sd
> 0);
178 int r
= ::setsockopt(listen_sd
, SOL_SOCKET
, SO_REUSEADDR
, &on
, sizeof(on
));
180 r
= set_nonblock(listen_sd
);
182 struct sockaddr_in sa
;
184 for (port
= 38788; port
< 40000; port
++) {
185 memset(&sa
,0,sizeof(sa
));
186 sa
.sin_family
= AF_INET
;
187 sa
.sin_port
= htons(port
);
188 sa
.sin_addr
.s_addr
= htonl(INADDR_ANY
);
190 r
= ::bind(listen_sd
, (struct sockaddr
*)&sa
, sizeof(sa
));
196 r
= listen(listen_sd
, 511);
199 vector
<FiredFileEvent
> fired_events
;
203 r
= driver
->add_event(listen_sd
, EVENT_NONE
, EVENT_READABLE
);
205 r
= driver
->event_wait(fired_events
, &tv
);
208 fired_events
.clear();
210 r
= pthread_create(&thread1
, NULL
, echoclient
, (void*)(intptr_t)port
);
214 r
= driver
->event_wait(fired_events
, &tv
);
216 ASSERT_EQ(fired_events
[0].fd
, listen_sd
);
218 fired_events
.clear();
219 int client_sd
= ::accept(listen_sd
, NULL
, NULL
);
220 ASSERT_TRUE(client_sd
> 0);
221 r
= driver
->add_event(client_sd
, EVENT_NONE
, EVENT_READABLE
);
225 fired_events
.clear();
228 r
= driver
->event_wait(fired_events
, &tv
);
230 ASSERT_EQ(fired_events
[0].mask
, EVENT_READABLE
);
232 fired_events
.clear();
234 r
= ::read(client_sd
, data
, sizeof(data
));
238 r
= driver
->add_event(client_sd
, EVENT_READABLE
, EVENT_WRITABLE
);
239 r
= driver
->event_wait(fired_events
, &tv
);
241 ASSERT_EQ(fired_events
[0].mask
, EVENT_WRITABLE
);
242 r
= write(client_sd
, data
, strlen(data
));
243 ASSERT_EQ(r
, (int)strlen(data
));
244 driver
->del_event(client_sd
, EVENT_READABLE
|EVENT_WRITABLE
,
252 class FakeEvent
: public EventCallback
{
255 void do_request(int fd_or_id
) override
{}
258 TEST(EventCenterTest
, FileEventExpansion
) {
260 EventCenter
center(g_ceph_context
);
261 center
.init(100, 0, "posix");
263 EventCallbackRef
e(new FakeEvent());
264 for (int i
= 0; i
< 300; i
++) {
265 int sd
= ::socket(AF_INET
, SOCK_STREAM
, 0);
266 center
.create_file_event(sd
, EVENT_READABLE
, e
);
270 for (vector
<int>::iterator it
= sds
.begin(); it
!= sds
.end(); ++it
)
271 center
.delete_file_event(*it
, EVENT_READABLE
);
275 class Worker
: public Thread
{
281 explicit Worker(CephContext
*c
, int idx
): cct(c
), done(false), center(c
) {
282 center
.init(100, idx
, "posix");
288 void* entry() override
{
291 center
.process_events(1000000);
296 class CountEvent
: public EventCallback
{
297 std::atomic
<unsigned> *count
;
302 CountEvent(std::atomic
<unsigned> *atomic
, Mutex
*l
, Cond
*c
): count(atomic
), lock(l
), cond(c
) {}
303 void do_request(int id
) override
{
311 TEST(EventCenterTest
, DispatchTest
) {
312 Worker
worker1(g_ceph_context
, 1), worker2(g_ceph_context
, 2);
313 std::atomic
<unsigned> count
= { 0 };
314 Mutex
lock("DispatchTest::lock");
316 worker1
.create("worker_1");
317 worker2
.create("worker_2");
318 for (int i
= 0; i
< 10000; ++i
) {
320 worker1
.center
.dispatch_event_external(EventCallbackRef(new CountEvent(&count
, &lock
, &cond
)));
322 worker2
.center
.dispatch_event_external(EventCallbackRef(new CountEvent(&count
, &lock
, &cond
)));
323 Mutex::Locker
l(lock
);
333 INSTANTIATE_TEST_CASE_P(
349 // Google Test may not support value-parameterized tests with some
350 // compilers. If we use conditional compilation to compile out all
351 // code referring to the gtest_main library, MSVC linker will not link
352 // that library at all and consequently complain about missing entry
353 // point defined in that library (fatal error LNK1561: entry point
354 // must be defined). This dummy test keeps gtest_main linked in.
355 TEST(DummyTest
, ValueParameterizedTestsAreNotSupportedOnThisPlatform
) {}
362 * compile-command: "cd ../.. ; make ceph_test_async_driver &&
363 * ./ceph_test_async_driver