]>
git.proxmox.com Git - ceph.git/blob - ceph/src/test/msgr/test_async_driver.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
18 #include <AvailabilityMacros.h>
22 #include <sys/socket.h>
25 #include <arpa/inet.h>
26 #include "include/Context.h"
27 #include "common/ceph_mutex.h"
28 #include "common/Cond.h"
29 #include "global/global_init.h"
30 #include "common/ceph_argparse.h"
31 #include "msg/async/Event.h"
35 // We use epoll, kqueue, evport, select in descending order by performance.
36 #if defined(__linux__)
40 #if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
45 #include <sys/feature_tests.h>
46 #ifdef _DTRACE_VERSION
52 #include "msg/async/EventEpoll.h"
55 #include "msg/async/EventKqueue.h"
57 #include "msg/async/EventSelect.h"
59 #include <gtest/gtest.h>
63 class EventDriverTest
: public ::testing::TestWithParam
<const char*> {
67 EventDriverTest(): driver(0) {}
68 void SetUp() override
{
69 cerr
<< __func__
<< " start set up " << GetParam() << std::endl
;
71 if (strcmp(GetParam(), "epoll"))
72 driver
= new EpollDriver(g_ceph_context
);
75 if (strcmp(GetParam(), "kqueue"))
76 driver
= new KqueueDriver(g_ceph_context
);
78 if (strcmp(GetParam(), "select"))
79 driver
= new SelectDriver(g_ceph_context
);
80 driver
->init(NULL
, 100);
82 void TearDown() override
{
87 int set_nonblock(int sd
)
91 /* Set the socket nonblocking.
92 * Note that fcntl(2) for F_GETFL and F_SETFL can't be
93 * interrupted by a signal. */
94 if ((flags
= fcntl(sd
, F_GETFL
)) < 0 ) {
97 if (fcntl(sd
, F_SETFL
, flags
| O_NONBLOCK
) < 0) {
104 TEST_P(EventDriverTest
, PipeTest
) {
106 vector
<FiredFileEvent
> fired_events
;
114 r
= driver
->add_event(fds
[0], EVENT_NONE
, EVENT_READABLE
);
116 r
= driver
->event_wait(fired_events
, &tv
);
120 r
= write(fds
[1], &c
, sizeof(c
));
122 r
= driver
->event_wait(fired_events
, &tv
);
124 ASSERT_EQ(fired_events
[0].fd
, fds
[0]);
127 fired_events
.clear();
128 r
= write(fds
[1], &c
, sizeof(c
));
130 r
= driver
->event_wait(fired_events
, &tv
);
132 ASSERT_EQ(fired_events
[0].fd
, fds
[0]);
134 fired_events
.clear();
135 driver
->del_event(fds
[0], EVENT_READABLE
, EVENT_READABLE
);
136 r
= write(fds
[1], &c
, sizeof(c
));
138 r
= driver
->event_wait(fired_events
, &tv
);
142 void* echoclient(void *arg
)
144 intptr_t port
= (intptr_t)arg
;
145 struct sockaddr_in sa
;
146 memset(&sa
, 0, sizeof(sa
));
147 sa
.sin_family
= AF_INET
;
148 sa
.sin_port
= htons(port
);
149 char addr
[] = "127.0.0.1";
150 int r
= inet_pton(AF_INET
, addr
, &sa
.sin_addr
);
153 int connect_sd
= ::socket(AF_INET
, SOCK_STREAM
, 0);
154 if (connect_sd
>= 0) {
155 r
= connect(connect_sd
, (struct sockaddr
*)&sa
, sizeof(sa
));
161 r
= write(connect_sd
, c
, sizeof(c
));
163 r
= read(connect_sd
, d
, sizeof(d
));
174 TEST_P(EventDriverTest
, NetworkSocketTest
) {
175 int listen_sd
= ::socket(AF_INET
, SOCK_STREAM
, 0);
176 ASSERT_TRUE(listen_sd
> 0);
178 int r
= ::setsockopt(listen_sd
, SOL_SOCKET
, SO_REUSEADDR
, &on
, sizeof(on
));
180 r
= set_nonblock(listen_sd
);
182 struct sockaddr_in sa
;
184 for (port
= 38788; port
< 40000; port
++) {
185 memset(&sa
,0,sizeof(sa
));
186 sa
.sin_family
= AF_INET
;
187 sa
.sin_port
= htons(port
);
188 sa
.sin_addr
.s_addr
= htonl(INADDR_ANY
);
190 r
= ::bind(listen_sd
, (struct sockaddr
*)&sa
, sizeof(sa
));
196 r
= listen(listen_sd
, 511);
199 vector
<FiredFileEvent
> fired_events
;
203 r
= driver
->add_event(listen_sd
, EVENT_NONE
, EVENT_READABLE
);
205 r
= driver
->event_wait(fired_events
, &tv
);
208 fired_events
.clear();
210 r
= pthread_create(&thread1
, NULL
, echoclient
, (void*)(intptr_t)port
);
214 r
= driver
->event_wait(fired_events
, &tv
);
216 ASSERT_EQ(fired_events
[0].fd
, listen_sd
);
218 fired_events
.clear();
219 int client_sd
= ::accept(listen_sd
, NULL
, NULL
);
220 ASSERT_TRUE(client_sd
> 0);
221 r
= driver
->add_event(client_sd
, EVENT_NONE
, EVENT_READABLE
);
225 fired_events
.clear();
228 r
= driver
->event_wait(fired_events
, &tv
);
230 ASSERT_EQ(EVENT_READABLE
, fired_events
[0].mask
);
232 fired_events
.clear();
234 r
= ::read(client_sd
, data
, sizeof(data
));
238 r
= driver
->add_event(client_sd
, EVENT_READABLE
, EVENT_WRITABLE
);
240 r
= driver
->event_wait(fired_events
, &tv
);
242 ASSERT_EQ(fired_events
[0].mask
, EVENT_WRITABLE
);
243 r
= write(client_sd
, data
, strlen(data
));
244 ASSERT_EQ((int)strlen(data
), r
);
245 driver
->del_event(client_sd
, EVENT_READABLE
|EVENT_WRITABLE
,
253 class FakeEvent
: public EventCallback
{
256 void do_request(uint64_t fd_or_id
) override
{}
259 TEST(EventCenterTest
, FileEventExpansion
) {
261 EventCenter
center(g_ceph_context
);
262 center
.init(100, 0, "posix");
264 EventCallbackRef
e(new FakeEvent());
265 for (int i
= 0; i
< 300; i
++) {
266 int sd
= ::socket(AF_INET
, SOCK_STREAM
, 0);
267 center
.create_file_event(sd
, EVENT_READABLE
, e
);
271 for (vector
<int>::iterator it
= sds
.begin(); it
!= sds
.end(); ++it
)
272 center
.delete_file_event(*it
, EVENT_READABLE
);
276 class Worker
: public Thread
{
282 explicit Worker(CephContext
*c
, int idx
): cct(c
), done(false), center(c
) {
283 center
.init(100, idx
, "posix");
289 void* entry() override
{
292 center
.process_events(1000000);
297 class CountEvent
: public EventCallback
{
298 std::atomic
<unsigned> *count
;
300 ceph::condition_variable
*cond
;
303 CountEvent(std::atomic
<unsigned> *atomic
,
304 ceph::mutex
*l
, ceph::condition_variable
*c
)
305 : count(atomic
), lock(l
), cond(c
) {}
306 void do_request(uint64_t id
) override
{
307 std::scoped_lock l
{*lock
};
313 TEST(EventCenterTest
, DispatchTest
) {
314 Worker
worker1(g_ceph_context
, 1), worker2(g_ceph_context
, 2);
315 std::atomic
<unsigned> count
= { 0 };
316 ceph::mutex lock
= ceph::make_mutex("DispatchTest::lock");
317 ceph::condition_variable cond
;
318 worker1
.create("worker_1");
319 worker2
.create("worker_2");
320 for (int i
= 0; i
< 10000; ++i
) {
322 worker1
.center
.dispatch_event_external(EventCallbackRef(new CountEvent(&count
, &lock
, &cond
)));
324 worker2
.center
.dispatch_event_external(EventCallbackRef(new CountEvent(&count
, &lock
, &cond
)));
325 std::unique_lock l
{lock
};
326 cond
.wait(l
, [&] { return count
== 0; });
334 INSTANTIATE_TEST_SUITE_P(
350 * compile-command: "cd ../.. ; make ceph_test_async_driver &&
351 * ./ceph_test_async_driver