1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
18 #include <AvailabilityMacros.h>
22 #include <sys/socket.h>
25 #include <arpa/inet.h>
26 #include "include/Context.h"
27 #include "common/Mutex.h"
28 #include "common/Cond.h"
29 #include "global/global_init.h"
30 #include "common/ceph_argparse.h"
31 #include "msg/async/Event.h"
35 // We use epoll, kqueue, evport, select in descending order by performance.
36 #if defined(__linux__)
40 #if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
45 #include <sys/feature_tests.h>
46 #ifdef _DTRACE_VERSION
52 #include "msg/async/EventEpoll.h"
55 #include "msg/async/EventKqueue.h"
57 #include "msg/async/EventSelect.h"
59 #include <gtest/gtest.h>
62 #if GTEST_HAS_PARAM_TEST
64 class EventDriverTest
: public ::testing::TestWithParam
<const char*> {
68 EventDriverTest(): driver(0) {}
69 void SetUp() override
{
70 cerr
<< __func__
<< " start set up " << GetParam() << std::endl
;
72 if (strcmp(GetParam(), "epoll"))
73 driver
= new EpollDriver(g_ceph_context
);
76 if (strcmp(GetParam(), "kqueue"))
77 driver
= new KqueueDriver(g_ceph_context
);
79 if (strcmp(GetParam(), "select"))
80 driver
= new SelectDriver(g_ceph_context
);
81 driver
->init(NULL
, 100);
// GoogleTest fixture hook that runs after each test body; its statements are
// not part of this listing.
83 void TearDown() override
{
// Switch descriptor `sd` to non-blocking mode by adding O_NONBLOCK to its
// fcntl(2) file status flags. What happens when either fcntl() call fails,
// and the value returned, are on lines not shown in this listing.
88 int set_nonblock(int sd
)
92 /* Set the socket nonblocking.
93 * Note that fcntl(2) for F_GETFL and F_SETFL can't be
94 * interrupted by a signal. */
// Fetch the current file status flags.
95 if ((flags
= fcntl(sd
, F_GETFL
)) < 0 ) {
// Store them back with O_NONBLOCK OR-ed in.
98 if (fcntl(sd
, F_SETFL
, flags
| O_NONBLOCK
) < 0) {
// Exercises a driver with a descriptor pair: fds[0] is watched for
// readability while bytes are written to fds[1].
// NOTE(review): fds is presumably filled by pipe(2) (hence the test name);
// its declaration is not visible in this listing -- confirm.
105 TEST_P(EventDriverTest
, PipeTest
) {
// Receives the events reported by each event_wait() call.
107 vector
<FiredFileEvent
> fired_events
;
// Start watching fds[0] for EVENT_READABLE.
// NOTE(review): the arguments look like (fd, current mask, mask to add) -- confirm.
115 r
= driver
->add_event(fds
[0], EVENT_NONE
, EVENT_READABLE
);
// Poll once before anything has been written.
117 r
= driver
->event_wait(fired_events
, &tv
);
// Write sizeof(c) bytes to fds[1], then poll again.
121 r
= write(fds
[1], &c
, sizeof(c
));
123 r
= driver
->event_wait(fired_events
, &tv
);
// The first reported event must belong to the watched descriptor.
125 ASSERT_EQ(fired_events
[0].fd
, fds
[0]);
// Second round: clear the results, write again, poll again, same check.
128 fired_events
.clear();
129 r
= write(fds
[1], &c
, sizeof(c
));
131 r
= driver
->event_wait(fired_events
, &tv
);
133 ASSERT_EQ(fired_events
[0].fd
, fds
[0]);
135 fired_events
.clear();
// Remove the EVENT_READABLE registration for fds[0], then write and poll
// once more.
136 driver
->del_event(fds
[0], EVENT_READABLE
, EVENT_READABLE
);
137 r
= write(fds
[1], &c
, sizeof(c
));
139 r
= driver
->event_wait(fired_events
, &tv
);
// Client side of NetworkSocketTest. It has a pthread start-routine signature;
// NetworkSocketTest hands it to pthread_create() with the listening port
// packed into `arg`.
143 void* echoclient(void *arg
)
// Recover the port number that was passed through the void* argument.
145 intptr_t port
= (intptr_t)arg
;
// Build the IPv4 address 127.0.0.1:<port>.
146 struct sockaddr_in sa
;
147 memset(&sa
, 0, sizeof(sa
));
148 sa
.sin_family
= AF_INET
;
149 sa
.sin_port
= htons(port
);
150 char addr
[] = "127.0.0.1";
151 int r
= inet_pton(AF_INET
, addr
, &sa
.sin_addr
);
// Open an AF_INET stream socket and, if that succeeded, connect it to the
// address built above.
154 int connect_sd
= ::socket(AF_INET
, SOCK_STREAM
, 0);
155 if (connect_sd
>= 0) {
156 r
= connect(connect_sd
, (struct sockaddr
*)&sa
, sizeof(sa
));
// Send the contents of `c`, then read the peer's reply into `d`
// (both buffers are declared on lines not shown here).
162 r
= write(connect_sd
, c
, sizeof(c
));
164 r
= read(connect_sd
, d
, sizeof(d
));
// Drives the event driver through a listening socket and one accepted
// connection whose peer is the echoclient() thread.
175 TEST_P(EventDriverTest
, NetworkSocketTest
) {
// Create the listening AF_INET stream socket.
176 int listen_sd
= ::socket(AF_INET
, SOCK_STREAM
, 0);
177 ASSERT_TRUE(listen_sd
> 0);
// Enable SO_REUSEADDR on it and make it non-blocking.
179 int r
= ::setsockopt(listen_sd
, SOL_SOCKET
, SO_REUSEADDR
, &on
, sizeof(on
));
181 r
= set_nonblock(listen_sd
);
183 struct sockaddr_in sa
;
// Walk ports 38788..39999, trying to bind INADDR_ANY:<port> each time.
// NOTE(review): presumably the loop stops at the first successful bind; the
// exit condition is on lines not shown here -- confirm.
185 for (port
= 38788; port
< 40000; port
++) {
186 memset(&sa
,0,sizeof(sa
));
187 sa
.sin_family
= AF_INET
;
188 sa
.sin_port
= htons(port
);
189 sa
.sin_addr
.s_addr
= htonl(INADDR_ANY
);
191 r
= ::bind(listen_sd
, (struct sockaddr
*)&sa
, sizeof(sa
));
// Start listening with a backlog of 511.
197 r
= listen(listen_sd
, 511);
// Receives the events reported by each event_wait() call.
200 vector
<FiredFileEvent
> fired_events
;
// Watch the listening socket for EVENT_READABLE and poll once before any
// client has been started.
204 r
= driver
->add_event(listen_sd
, EVENT_NONE
, EVENT_READABLE
);
206 r
= driver
->event_wait(fired_events
, &tv
);
209 fired_events
.clear();
// Start the client thread, passing the chosen port through the void* argument.
211 r
= pthread_create(&thread1
, NULL
, echoclient
, (void*)(intptr_t)port
);
// Poll again; the first reported event must be for the listening socket.
215 r
= driver
->event_wait(fired_events
, &tv
);
217 ASSERT_EQ(fired_events
[0].fd
, listen_sd
);
219 fired_events
.clear();
// Accept the connection and watch the new descriptor for EVENT_READABLE.
220 int client_sd
= ::accept(listen_sd
, NULL
, NULL
);
221 ASSERT_TRUE(client_sd
> 0);
222 r
= driver
->add_event(client_sd
, EVENT_NONE
, EVENT_READABLE
);
226 fired_events
.clear();
// Poll; the first reported event must carry exactly the EVENT_READABLE mask.
229 r
= driver
->event_wait(fired_events
, &tv
);
231 ASSERT_EQ(EVENT_READABLE
, fired_events
[0].mask
);
233 fired_events
.clear();
// Read what the client sent into `data`.
235 r
= ::read(client_sd
, data
, sizeof(data
));
// Additionally register EVENT_WRITABLE (passing EVENT_READABLE as the mask
// already in place); the next poll must report EVENT_WRITABLE first.
239 r
= driver
->add_event(client_sd
, EVENT_READABLE
, EVENT_WRITABLE
);
241 r
= driver
->event_wait(fired_events
, &tv
);
243 ASSERT_EQ(fired_events
[0].mask
, EVENT_WRITABLE
);
// Write strlen(data) bytes of `data` back and check that all were written.
244 r
= write(client_sd
, data
, strlen(data
));
245 ASSERT_EQ((int)strlen(data
), r
);
// Drop both registrations for client_sd (the remaining arguments of this
// call are on a line not shown here).
246 driver
->del_event(client_sd
, EVENT_READABLE
|EVENT_WRITABLE
,
// EventCallback whose handler does nothing. FileEventExpansion uses it as a
// placeholder callback when registering file events.
254 class FakeEvent
: public EventCallback
{
// Intentionally empty: the argument is ignored.
257 void do_request(int fd_or_id
) override
{}
// Registers 300 socket descriptors with an EventCenter that was initialised
// with 100, then removes the registrations again.
// NOTE(review): presumably 100 is the initial file-event capacity, so 300
// descriptors force the table to grow (hence "Expansion") -- confirm.
260 TEST(EventCenterTest
, FileEventExpansion
) {
262 EventCenter
center(g_ceph_context
);
263 center
.init(100, 0, "posix");
// One shared no-op callback is used for every registration.
265 EventCallbackRef
e(new FakeEvent());
// Open 300 stream sockets and register each for EVENT_READABLE.
266 for (int i
= 0; i
< 300; i
++) {
267 int sd
= ::socket(AF_INET
, SOCK_STREAM
, 0);
268 center
.create_file_event(sd
, EVENT_READABLE
, e
);
// Remove the EVENT_READABLE registration of every descriptor held in `sds`
// (the container is filled on lines not shown here).
272 for (vector
<int>::iterator it
= sds
.begin(); it
!= sds
.end(); ++it
)
273 center
.delete_file_event(*it
, EVENT_READABLE
);
// Thread subclass that carries its own EventCenter (`center`) and calls
// process_events() on it from entry(). DispatchTest posts events to
// `center` from outside the thread.
277 class Worker
: public Thread
{
// Stores the context, clears `done`, and initialises the center with 100,
// the caller-supplied index `idx`, and the "posix" type string.
283 explicit Worker(CephContext
*c
, int idx
): cct(c
), done(false), center(c
) {
284 center
.init(100, idx
, "posix");
// Thread body. Any loop around this call is on lines not shown here.
290 void* entry() override
{
// NOTE(review): 1000000 is presumably a timeout in microseconds -- confirm
// against EventCenter::process_events.
293 center
.process_events(1000000);
// EventCallback that is handed a shared atomic counter, a Mutex and a Cond
// by pointer. What do_request() does with them is on lines not shown here.
298 class CountEvent
: public EventCallback
{
// Counter shared with the code that created the event (not owned).
299 std::atomic
<unsigned> *count
;
// Stores the three pointers; nothing is copied or allocated.
304 CountEvent(std::atomic
<unsigned> *atomic
, Mutex
*l
, Cond
*c
): count(atomic
), lock(l
), cond(c
) {}
// Handler invoked by the event center.
305 void do_request(int id
) override
{
// Posts CountEvent callbacks to two Worker threads' event centers from the
// test thread, sharing one counter, one Mutex and one Cond among all of them.
313 TEST(EventCenterTest
, DispatchTest
) {
// Two workers with center indexes 1 and 2.
314 Worker
worker1(g_ceph_context
, 1), worker2(g_ceph_context
, 2);
// Shared counter, starting at zero.
315 std::atomic
<unsigned> count
= { 0 };
316 Mutex
lock("DispatchTest::lock");
// Start both worker threads.
318 worker1
.create("worker_1");
319 worker2
.create("worker_2");
// 10000 rounds, each dispatching one new CountEvent to each worker.
320 for (int i
= 0; i
< 10000; ++i
) {
322 worker1
.center
.dispatch_event_external(EventCallbackRef(new CountEvent(&count
, &lock
, &cond
)));
324 worker2
.center
.dispatch_event_external(EventCallbackRef(new CountEvent(&count
, &lock
, &cond
)));
// Take the shared lock; the code that runs under it is not shown here.
325 Mutex::Locker
l(lock
);
335 INSTANTIATE_TEST_CASE_P(
351 // Google Test may not support value-parameterized tests with some
352 // compilers. If we use conditional compilation to compile out all
353 // code referring to the gtest_main library, MSVC linker will not link
354 // that library at all and consequently complain about missing entry
355 // point defined in that library (fatal error LNK1561: entry point
356 // must be defined). This dummy test keeps gtest_main linked in.
357 TEST(DummyTest
, ValueParameterizedTestsAreNotSupportedOnThisPlatform
) {}
364 * compile-command: "cd ../.. ; make ceph_test_async_driver &&
365 * ./ceph_test_async_driver