1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 XSky <haomai@xsky.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
24 #include <gtest/gtest.h>
27 #include "include/Context.h"
29 #include "msg/async/Event.h"
30 #include "msg/async/Stack.h"
33 #if GTEST_HAS_PARAM_TEST
35 class NetworkWorkerTest
: public ::testing::TestWithParam
<const char*> {
37 std::shared_ptr
<NetworkStack
> stack
;
38 string addr
, port_addr
;
40 NetworkWorkerTest() {}
41 void SetUp() override
{
42 cerr
<< __func__
<< " start set up " << GetParam() << std::endl
;
43 if (strncmp(GetParam(), "dpdk", 4)) {
44 g_ceph_context
->_conf
->set_val("ms_type", "async+posix", false);
45 addr
= "127.0.0.1:15000";
46 port_addr
= "127.0.0.1:15001";
48 g_ceph_context
->_conf
->set_val("ms_type", "async+dpdk", false);
49 g_ceph_context
->_conf
->set_val("ms_dpdk_debug_allow_loopback", "true", false);
50 g_ceph_context
->_conf
->set_val("ms_async_op_threads", "2", false);
51 g_ceph_context
->_conf
->set_val("ms_dpdk_coremask", "0x7", false);
52 g_ceph_context
->_conf
->set_val("ms_dpdk_host_ipv4_addr", "172.16.218.3", false);
53 g_ceph_context
->_conf
->set_val("ms_dpdk_gateway_ipv4_addr", "172.16.218.2", false);
54 g_ceph_context
->_conf
->set_val("ms_dpdk_netmask_ipv4_addr", "255.255.255.0", false);
55 addr
= "172.16.218.3:15000";
56 port_addr
= "172.16.218.3:15001";
58 stack
= NetworkStack::create(g_ceph_context
, GetParam());
61 void TearDown() override
{
64 string
get_addr() const {
67 string
get_ip_different_port() const {
70 string
get_different_ip() const {
71 return "10.0.123.100:4323";
73 EventCenter
*get_center(unsigned i
) {
74 return &stack
->get_worker(i
)->center
;
76 Worker
*get_worker(unsigned i
) {
77 return stack
->get_worker(i
);
79 template<typename func
>
80 class C_dispatch
: public EventCallback
{
83 std::atomic_bool done
;
85 C_dispatch(Worker
*w
, func
&&_f
): worker(w
), f(std::move(_f
)), done(false) {}
86 void do_request(int id
) override
{
91 int us
= 1000 * 1000 * 1000;
99 template<typename func
>
100 void exec_events(func
&&f
) {
101 std::vector
<C_dispatch
<func
>*> dis
;
102 for (unsigned i
= 0; i
< stack
->get_num_worker(); ++i
) {
103 Worker
*w
= stack
->get_worker(i
);
104 C_dispatch
<func
> *e
= new C_dispatch
<func
>(w
, std::move(f
));
105 stack
->get_worker(i
)->center
.dispatch_event_external(e
);
109 for (auto &&e
: dis
) {
116 class C_poll
: public EventCallback
{
118 std::atomic
<bool> woken
;
119 static const int sleepus
= 500;
122 C_poll(EventCenter
*c
): center(c
), woken(false) {}
123 void do_request(int r
) override
{
126 bool poll(int milliseconds
) {
127 auto start
= ceph::coarse_real_clock::now();
129 center
->process_events(sleepus
);
131 auto r
= std::chrono::duration_cast
<std::chrono::milliseconds
>(
132 ceph::coarse_real_clock::now() - start
);
133 if (r
>= std::chrono::milliseconds(milliseconds
))
143 TEST_P(NetworkWorkerTest
, SimpleTest
) {
144 entity_addr_t bind_addr
;
145 ASSERT_TRUE(bind_addr
.parse(get_addr().c_str()));
146 std::atomic_bool
accepted(false);
147 std::atomic_bool
*accepted_p
= &accepted
;
149 exec_events([this, accepted_p
, bind_addr
](Worker
*worker
) mutable {
150 entity_addr_t cli_addr
;
151 SocketOptions options
;
152 ServerSocket bind_socket
;
153 EventCenter
*center
= &worker
->center
;
155 if (stack
->support_local_listen_table() || worker
->id
== 0)
156 r
= worker
->listen(bind_addr
, options
, &bind_socket
);
159 ConnectedSocket cli_socket
, srv_socket
;
160 if (worker
->id
== 0) {
161 r
= worker
->connect(bind_addr
, options
, &cli_socket
);
165 bool is_my_accept
= false;
168 center
->create_file_event(bind_socket
.fd(), EVENT_READABLE
, &cb
);
173 ASSERT_TRUE(*accepted_p
);
174 center
->delete_file_event(bind_socket
.fd(), EVENT_READABLE
);
178 r
= bind_socket
.accept(&srv_socket
, options
, &cli_addr
, worker
);
180 ASSERT_TRUE(srv_socket
.fd() > 0);
183 if (worker
->id
== 0) {
185 center
->create_file_event(cli_socket
.fd(), EVENT_READABLE
, &cb
);
186 r
= cli_socket
.is_connected();
188 ASSERT_EQ(true, cb
.poll(500));
189 r
= cli_socket
.is_connected();
192 center
->delete_file_event(cli_socket
.fd(), EVENT_READABLE
);
195 const char *message
= "this is a new message";
196 int len
= strlen(message
);
198 bl
.append(message
, len
);
199 if (worker
->id
== 0) {
200 r
= cli_socket
.send(bl
, false);
207 center
->create_file_event(srv_socket
.fd(), EVENT_READABLE
, &cb
);
209 r
= srv_socket
.read(buf
, sizeof(buf
));
210 while (r
== -EAGAIN
) {
211 ASSERT_TRUE(cb
.poll(500));
212 r
= srv_socket
.read(buf
, sizeof(buf
));
216 ASSERT_EQ(0, memcmp(buf
, message
, len
));
218 bind_socket
.abort_accept();
220 if (worker
->id
== 0) {
221 cli_socket
.shutdown();
222 // ack delay is 200 ms
226 bl
.append(message
, len
);
227 if (worker
->id
== 0) {
228 r
= cli_socket
.send(bl
, false);
229 ASSERT_EQ(-EPIPE
, r
);
233 ASSERT_TRUE(cb
.poll(500));
234 r
= srv_socket
.read(buf
, sizeof(buf
));
237 ASSERT_TRUE(cb
.poll(1000*500));
238 r
= srv_socket
.read(buf
, sizeof(buf
));
241 center
->delete_file_event(srv_socket
.fd(), EVENT_READABLE
);
247 TEST_P(NetworkWorkerTest
, ConnectFailedTest
) {
248 entity_addr_t bind_addr
;
249 ASSERT_TRUE(bind_addr
.parse(get_addr().c_str()));
251 exec_events([this, bind_addr
](Worker
*worker
) mutable {
252 EventCenter
*center
= &worker
->center
;
253 entity_addr_t cli_addr
;
254 SocketOptions options
;
255 ServerSocket bind_socket
;
257 if (stack
->support_local_listen_table() || worker
->id
== 0)
258 r
= worker
->listen(bind_addr
, options
, &bind_socket
);
261 ConnectedSocket cli_socket1
, cli_socket2
;
262 if (worker
->id
== 0) {
263 ASSERT_TRUE(cli_addr
.parse(get_ip_different_port().c_str()));
264 r
= worker
->connect(cli_addr
, options
, &cli_socket1
);
267 center
->create_file_event(cli_socket1
.fd(), EVENT_READABLE
, &cb
);
268 r
= cli_socket1
.is_connected();
270 ASSERT_TRUE(cb
.poll(500));
271 r
= cli_socket1
.is_connected();
273 ASSERT_TRUE(r
== -ECONNREFUSED
|| r
== -ECONNRESET
);
276 if (worker
->id
== 1) {
277 ASSERT_TRUE(cli_addr
.parse(get_different_ip().c_str()));
278 r
= worker
->connect(cli_addr
, options
, &cli_socket2
);
281 center
->create_file_event(cli_socket2
.fd(), EVENT_READABLE
, &cb
);
282 r
= cli_socket2
.is_connected();
285 r
= cli_socket2
.is_connected();
288 center
->delete_file_event(cli_socket2
.fd(), EVENT_READABLE
);
293 TEST_P(NetworkWorkerTest
, ListenTest
) {
294 Worker
*worker
= get_worker(0);
295 entity_addr_t bind_addr
;
296 ASSERT_TRUE(bind_addr
.parse(get_addr().c_str()));
297 SocketOptions options
;
298 ServerSocket bind_socket1
, bind_socket2
;
299 int r
= worker
->listen(bind_addr
, options
, &bind_socket1
);
302 r
= worker
->listen(bind_addr
, options
, &bind_socket2
);
303 ASSERT_EQ(-EADDRINUSE
, r
);
306 TEST_P(NetworkWorkerTest
, AcceptAndCloseTest
) {
307 entity_addr_t bind_addr
;
308 ASSERT_TRUE(bind_addr
.parse(get_addr().c_str()));
309 std::atomic_bool
accepted(false);
310 std::atomic_bool
*accepted_p
= &accepted
;
311 std::atomic_int
unbind_count(stack
->get_num_worker());
312 std::atomic_int
*count_p
= &unbind_count
;
313 exec_events([this, bind_addr
, accepted_p
, count_p
](Worker
*worker
) mutable {
314 SocketOptions options
;
315 EventCenter
*center
= &worker
->center
;
316 entity_addr_t cli_addr
;
319 ServerSocket bind_socket
;
320 if (stack
->support_local_listen_table() || worker
->id
== 0)
321 r
= worker
->listen(bind_addr
, options
, &bind_socket
);
324 ConnectedSocket srv_socket
, cli_socket
;
326 r
= bind_socket
.accept(&srv_socket
, options
, &cli_addr
, worker
);
327 ASSERT_EQ(-EAGAIN
, r
);
331 if (worker
->id
== 0) {
332 center
->create_file_event(bind_socket
.fd(), EVENT_READABLE
, &cb
);
333 r
= worker
->connect(bind_addr
, options
, &cli_socket
);
335 ASSERT_TRUE(cb
.poll(500));
341 ConnectedSocket srv_socket2
;
343 r
= bind_socket
.accept(&srv_socket2
, options
, &cli_addr
, worker
);
345 } while (r
== -EAGAIN
&& !*accepted_p
);
348 ASSERT_TRUE(*accepted_p
);
349 // srv_socket2 closed
350 center
->delete_file_event(bind_socket
.fd(), EVENT_READABLE
);
353 if (worker
->id
== 0) {
356 center
->create_file_event(cli_socket
.fd(), EVENT_READABLE
, &cb
);
359 ASSERT_TRUE(cb
.poll(500));
360 r
= cli_socket
.read(buf
, sizeof(buf
));
365 center
->delete_file_event(cli_socket
.fd(), EVENT_READABLE
);
369 center
->create_file_event(bind_socket
.fd(), EVENT_READABLE
, &cb
);
370 if (worker
->id
== 0) {
372 r
= worker
->connect(bind_addr
, options
, &cli_socket
);
375 ASSERT_TRUE(cb
.poll(500));
381 r
= bind_socket
.accept(&srv_socket
, options
, &cli_addr
, worker
);
383 } while (r
== -EAGAIN
&& !*accepted_p
);
386 ASSERT_TRUE(*accepted_p
);
387 center
->delete_file_event(bind_socket
.fd(), EVENT_READABLE
);
396 ConnectedSocket cli_socket
;
397 r
= worker
->connect(bind_addr
, options
, &cli_socket
);
401 center
->create_file_event(cli_socket
.fd(), EVENT_READABLE
, &cb
);
402 r
= cli_socket
.is_connected();
404 ASSERT_TRUE(cb
.poll(500));
405 r
= cli_socket
.is_connected();
407 ASSERT_TRUE(r
== -ECONNREFUSED
|| r
== -ECONNRESET
);
412 TEST_P(NetworkWorkerTest
, ComplexTest
) {
413 entity_addr_t bind_addr
;
414 std::atomic_bool
listen_done(false);
415 std::atomic_bool
*listen_p
= &listen_done
;
416 std::atomic_bool
accepted(false);
417 std::atomic_bool
*accepted_p
= &accepted
;
418 std::atomic_bool
done(false);
419 std::atomic_bool
*done_p
= &done
;
420 ASSERT_TRUE(bind_addr
.parse(get_addr().c_str()));
421 exec_events([this, bind_addr
, listen_p
, accepted_p
, done_p
](Worker
*worker
) mutable {
422 entity_addr_t cli_addr
;
423 EventCenter
*center
= &worker
->center
;
424 SocketOptions options
;
425 ServerSocket bind_socket
;
427 if (stack
->support_local_listen_table() || worker
->id
== 0) {
428 r
= worker
->listen(bind_addr
, options
, &bind_socket
);
432 ConnectedSocket cli_socket
, srv_socket
;
433 if (worker
->id
== 1) {
436 r
= worker
->connect(bind_addr
, options
, &cli_socket
);
443 center
->create_file_event(bind_socket
.fd(), EVENT_READABLE
, &cb
);
447 r
= bind_socket
.accept(&srv_socket
, options
, &cli_addr
, worker
);
453 ASSERT_TRUE(*accepted_p
);
454 center
->delete_file_event(bind_socket
.fd(), EVENT_READABLE
);
457 if (worker
->id
== 1) {
459 center
->create_file_event(cli_socket
.fd(), EVENT_WRITABLE
, &cb
);
460 r
= cli_socket
.is_connected();
462 ASSERT_TRUE(cb
.poll(500));
463 r
= cli_socket
.is_connected();
466 center
->delete_file_event(cli_socket
.fd(), EVENT_WRITABLE
);
469 const size_t message_size
= 10240;
471 string
message(message_size
, '!');
472 for (size_t i
= 0; i
< message_size
; i
+= 100)
474 size_t len
= message_size
* count
;
477 center
->create_file_event(cli_socket
.fd(), EVENT_WRITABLE
, &cb
);
479 center
->create_file_event(srv_socket
.fd(), EVENT_READABLE
, &cb
);
486 for (size_t i
= 0; i
< count
; ++i
)
487 bl
.push_back(bufferptr((char*)message
.data(), message_size
));
490 if (worker
->id
== 1) {
495 r
= cli_socket
.send(bl
, false);
496 ASSERT_TRUE(r
>= 0 || r
== -EAGAIN
);
504 left
= message_size
* count
;
505 ASSERT_EQ(0U, bl
.length());
506 for (size_t i
= 0; i
< count
; ++i
)
507 bl
.push_back(bufferptr((char*)message
.data(), message_size
));
515 r
= srv_socket
.read(buf
, sizeof(buf
));
516 ASSERT_TRUE(r
> 0 || r
== -EAGAIN
);
518 read_string
.append(buf
, r
);
520 } else if (r
== -EAGAIN
) {
525 for (size_t i
= 0; i
< read_string
.size(); i
+= message_size
)
526 ASSERT_EQ(0, memcmp(read_string
.c_str()+i
, message
.c_str(), message_size
));
536 center
->delete_file_event(cli_socket
.fd(), EVENT_WRITABLE
);
538 center
->delete_file_event(srv_socket
.fd(), EVENT_READABLE
);
541 bind_socket
.abort_accept();
549 class StressFactory
{
554 std::set
<Client
*> clients
;
555 std::set
<Server
*> servers
;
557 for (auto && i
: clients
)
559 for (auto && i
: servers
)
564 struct RandomString
{
566 vector
<std::string
> strs
;
567 std::random_device rd
;
568 std::default_random_engine rng
;
570 RandomString(size_t s
): slen(s
), rng(rd()) {}
571 void prepare(size_t n
) {
572 static const char alphabet
[] =
573 "abcdefghijklmnopqrstuvwxyz"
574 "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
577 std::uniform_int_distribution
<> dist(
578 0, sizeof(alphabet
) / sizeof(*alphabet
) - 2);
582 std::back_inserter(strs
), strs
.capacity(), [&] {
585 std::generate_n(std::back_inserter(str
), slen
, [&]() {
586 return alphabet
[dist(rng
)];
592 std::string
&get_random_string() {
593 std::uniform_int_distribution
<> dist(
595 return strs
[dist(rng
)];
603 explicit Message(RandomString
&rs
, size_t i
, size_t l
): idx(i
) {
604 size_t slen
= rs
.slen
;
605 len
= std::max(slen
, l
);
607 std::vector
<std::string
> strs
;
608 strs
.reserve(len
/ slen
);
610 std::back_inserter(strs
), strs
.capacity(), [&] {
611 return rs
.get_random_string();
614 len
= slen
* strs
.size();
615 content
.reserve(len
);
616 for (auto &&s
: strs
)
619 bool verify(const char *b
, size_t len
= 0) const {
620 return content
.compare(0, len
, b
, 0, len
) == 0;
624 template <typename T
>
625 class C_delete
: public EventCallback
{
628 C_delete(T
*c
): ctxt(c
) {}
629 void do_request(int id
) override
{
636 StressFactory
*factory
;
638 ConnectedSocket socket
;
639 std::deque
<StressFactory::Message
*> acking
;
640 std::deque
<StressFactory::Message
*> writings
;
644 bool write_enabled
= false;
645 size_t read_offset
= 0, write_offset
= 0;
648 StressFactory::Message homeless_message
;
650 class Client_read_handle
: public EventCallback
{
653 Client_read_handle(Client
*_c
): c(_c
) {}
654 void do_request(int id
) override
{
655 c
->do_read_request();
659 class Client_write_handle
: public EventCallback
{
662 Client_write_handle(Client
*_c
): c(_c
) {}
663 void do_request(int id
) override
{
664 c
->do_write_request();
669 Client(StressFactory
*f
, EventCenter
*cen
, ConnectedSocket s
, size_t c
)
670 : factory(f
), center(cen
), socket(std::move(s
)), left(c
), homeless_message(factory
->rs
, -1, 1024),
671 read_ctxt(this), write_ctxt(this) {
672 center
->create_file_event(
673 socket
.fd(), EVENT_READABLE
, &read_ctxt
);
674 center
->dispatch_event_external(&read_ctxt
);
677 ASSERT_FALSE(write_enabled
);
680 center
->delete_file_event(socket
.fd(), EVENT_READABLE
);
681 center
->dispatch_event_external(new C_delete
<Client
>(this));
684 void do_read_request() {
687 ASSERT_TRUE(socket
.is_connected() >= 0);
688 if (!socket
.is_connected())
690 ASSERT_TRUE(!acking
.empty() || first
);
693 center
->dispatch_event_external(&write_ctxt
);
697 StressFactory::Message
*m
= acking
.front();
700 buffer
.resize(m
->len
);
701 bool must_no
= false;
703 r
= socket
.read((char*)buffer
.data() + read_offset
,
704 m
->len
- read_offset
);
705 ASSERT_TRUE(r
== -EAGAIN
|| r
> 0);
710 std::cerr
<< " client " << this << " receive " << m
->idx
<< " len " << r
<< " content: " << std::endl
;
711 ASSERT_FALSE(must_no
);
712 if ((m
->len
- read_offset
) == 0) {
713 ASSERT_TRUE(m
->verify(buffer
.data(), 0));
718 if (acking
.empty()) {
719 m
= &homeless_message
;
723 buffer
.resize(m
->len
);
727 if (acking
.empty()) {
728 center
->dispatch_event_external(&write_ctxt
);
733 void do_write_request() {
736 ASSERT_TRUE(socket
.is_connected() > 0);
738 while (left
> 0 && factory
->queue_depth
> writings
.size() + acking
.size()) {
739 StressFactory::Message
*m
= new StressFactory::Message(
740 factory
->rs
, ++index
,
741 factory
->rd() % factory
->max_message_length
);
742 std::cerr
<< " client " << this << " generate message " << m
->idx
<< " length " << m
->len
<< std::endl
;
743 ASSERT_EQ(m
->len
, m
->content
.size());
744 writings
.push_back(m
);
746 --factory
->message_left
;
749 while (!writings
.empty()) {
750 StressFactory::Message
*m
= writings
.front();
752 bl
.append(m
->content
.data() + write_offset
, m
->content
.size() - write_offset
);
753 ssize_t r
= socket
.send(bl
, false);
756 std::cerr
<< " client " << this << " send " << m
->idx
<< " len " << r
<< " content: " << std::endl
;
759 if (write_offset
== m
->content
.size()) {
761 writings
.pop_front();
765 if (writings
.empty() && write_enabled
) {
766 center
->delete_file_event(socket
.fd(), EVENT_WRITABLE
);
767 write_enabled
= false;
768 } else if (!writings
.empty() && !write_enabled
) {
769 ASSERT_EQ(0, center
->create_file_event(
770 socket
.fd(), EVENT_WRITABLE
, &write_ctxt
));
771 write_enabled
= true;
775 bool finish() const {
776 return left
== 0 && acking
.empty() && writings
.empty();
782 StressFactory
*factory
;
784 ConnectedSocket socket
;
785 std::deque
<std::string
> buffers
;
786 bool write_enabled
= false;
789 class Server_read_handle
: public EventCallback
{
792 Server_read_handle(Server
*_s
): s(_s
) {}
793 void do_request(int id
) override
{
794 s
->do_read_request();
798 class Server_write_handle
: public EventCallback
{
801 Server_write_handle(Server
*_s
): s(_s
) {}
802 void do_request(int id
) override
{
803 s
->do_write_request();
808 Server(StressFactory
*f
, EventCenter
*c
, ConnectedSocket s
):
809 factory(f
), center(c
), socket(std::move(s
)), read_ctxt(this), write_ctxt(this) {
810 center
->create_file_event(socket
.fd(), EVENT_READABLE
, &read_ctxt
);
811 center
->dispatch_event_external(&read_ctxt
);
814 ASSERT_FALSE(write_enabled
);
816 center
->delete_file_event(socket
.fd(), EVENT_READABLE
);
817 center
->dispatch_event_external(new C_delete
<Server
>(this));
819 void do_read_request() {
826 if (factory
->zero_copy_read
) {
827 r
= socket
.zero_copy_read(data
);
829 r
= socket
.read(buf
, sizeof(buf
));
831 ASSERT_TRUE(r
== -EAGAIN
|| (r
>= 0 && (size_t)r
<= sizeof(buf
)));
833 ASSERT_TRUE(buffers
.empty());
836 } else if (r
== -EAGAIN
)
838 if (factory
->zero_copy_read
) {
839 buffers
.emplace_back(data
.c_str(), 0, data
.length());
841 buffers
.emplace_back(buf
, 0, r
);
843 std::cerr
<< " server " << this << " receive " << r
<< " content: " << std::endl
;
845 if (!buffers
.empty() && !write_enabled
)
846 center
->dispatch_event_external(&write_ctxt
);
849 void do_write_request() {
853 while (!buffers
.empty()) {
855 auto it
= buffers
.begin();
856 for (size_t i
= 0; i
< buffers
.size(); ++i
) {
857 bl
.push_back(bufferptr((char*)it
->data(), it
->size()));
861 ssize_t r
= socket
.send(bl
, false);
862 std::cerr
<< " server " << this << " send " << r
<< std::endl
;
867 ASSERT_TRUE(!buffers
.empty());
868 string
&buffer
= buffers
.front();
869 if (r
>= (int)buffer
.size()) {
870 r
-= (int)buffer
.size();
873 std::cerr
<< " server " << this << " sent " << r
<< std::endl
;
874 buffer
= buffer
.substr(r
, buffer
.size());
879 if (buffers
.empty()) {
881 center
->delete_file_event(socket
.fd(), EVENT_WRITABLE
);
882 write_enabled
= false;
884 } else if (!write_enabled
) {
885 ASSERT_EQ(0, center
->create_file_event(
886 socket
.fd(), EVENT_WRITABLE
, &write_ctxt
));
887 write_enabled
= true;
897 class C_accept
: public EventCallback
{
898 StressFactory
*factory
;
899 ServerSocket bind_socket
;
904 C_accept(StressFactory
*f
, ServerSocket s
, ThreadData
*data
, Worker
*w
)
905 : factory(f
), bind_socket(std::move(s
)), t_data(data
), worker(w
) {}
906 void do_request(int id
) override
{
908 entity_addr_t cli_addr
;
909 ConnectedSocket srv_socket
;
910 SocketOptions options
;
911 int r
= bind_socket
.accept(&srv_socket
, options
, &cli_addr
, worker
);
916 ASSERT_TRUE(srv_socket
.fd() > 0);
917 Server
*cb
= new Server(factory
, &t_data
->worker
->center
, std::move(srv_socket
));
918 t_data
->servers
.insert(cb
);
922 friend class C_accept
;
925 static const size_t min_client_send_messages
= 100;
926 static const size_t max_client_send_messages
= 1000;
927 std::shared_ptr
<NetworkStack
> stack
;
929 std::random_device rd
;
930 const size_t client_num
, queue_depth
, max_message_length
;
931 atomic_int message_count
, message_left
;
932 entity_addr_t bind_addr
;
933 std::atomic_bool already_bind
= {false};
935 SocketOptions options
;
937 explicit StressFactory(std::shared_ptr
<NetworkStack
> s
, const string
&addr
,
938 size_t cli
, size_t qd
, size_t mc
, size_t l
, bool zero_copy
)
939 : stack(s
), rs(128), client_num(cli
), queue_depth(qd
),
940 max_message_length(l
), message_count(mc
), message_left(mc
),
941 zero_copy_read(zero_copy
) {
942 bind_addr
.parse(addr
.c_str());
948 void add_client(ThreadData
*t_data
) {
949 static Mutex
lock("add_client_lock");
950 Mutex::Locker
l(lock
);
951 ConnectedSocket sock
;
952 int r
= t_data
->worker
->connect(bind_addr
, options
, &sock
);
953 std::default_random_engine
rng(rd());
954 std::uniform_int_distribution
<> dist(
955 min_client_send_messages
, max_client_send_messages
);
958 if (c
> message_count
.load())
959 c
= message_count
.load();
960 Client
*cb
= new Client(this, &t_data
->worker
->center
, std::move(sock
), c
);
961 t_data
->clients
.insert(cb
);
965 void drop_client(ThreadData
*t_data
, Client
*c
) {
967 ASSERT_EQ(1U, t_data
->clients
.erase(c
));
970 void drop_server(ThreadData
*t_data
, Server
*s
) {
972 ASSERT_EQ(1U, t_data
->servers
.erase(s
));
975 void start(Worker
*worker
) {
978 t_data
.worker
= worker
;
979 ServerSocket bind_socket
;
980 if (stack
->support_local_listen_table() || worker
->id
== 0) {
981 r
= worker
->listen(bind_addr
, options
, &bind_socket
);
985 while (!already_bind
)
987 C_accept
*accept_handler
= nullptr;
990 bind_fd
= bind_socket
.fd();
991 accept_handler
= new C_accept(this, std::move(bind_socket
), &t_data
, worker
);
992 ASSERT_EQ(0, worker
->center
.create_file_event(
993 bind_fd
, EVENT_READABLE
, accept_handler
));
996 int echo_throttle
= message_count
;
997 while (message_count
> 0 || !t_data
.clients
.empty() || !t_data
.servers
.empty()) {
998 if (message_count
> 0 && t_data
.clients
.size() < client_num
&& t_data
.servers
.size() < client_num
)
1000 for (auto &&c
: t_data
.clients
) {
1002 drop_client(&t_data
, c
);
1006 for (auto &&s
: t_data
.servers
) {
1008 drop_server(&t_data
, s
);
1013 worker
->center
.process_events(1);
1014 if (echo_throttle
> message_left
) {
1015 std::cerr
<< " clients " << t_data
.clients
.size() << " servers " << t_data
.servers
.size()
1016 << " message count " << message_left
<< std::endl
;
1017 echo_throttle
-= 100;
1021 worker
->center
.delete_file_event(bind_fd
, EVENT_READABLE
);
1022 delete accept_handler
;
1026 TEST_P(NetworkWorkerTest
, StressTest
) {
1027 StressFactory
factory(stack
, get_addr(), 16, 16, 10000, 1024,
1028 strncmp(GetParam(), "dpdk", 4) == 0);
1029 StressFactory
*f
= &factory
;
1030 exec_events([f
](Worker
*worker
) mutable {
1033 ASSERT_EQ(0, factory
.message_left
);
1037 INSTANTIATE_TEST_CASE_P(
1050 // Google Test may not support value-parameterized tests with some
1051 // compilers. If we use conditional compilation to compile out all
1052 // code referring to the gtest_main library, MSVC linker will not link
1053 // that library at all and consequently complain about missing entry
1054 // point defined in that library (fatal error LNK1561: entry point
1055 // must be defined). This dummy test keeps gtest_main linked in.
1056 TEST(DummyTest
, ValueParameterizedTestsAreNotSupportedOnThisPlatform
) {}
1063 * compile-command: "cd ../.. ; make ceph_test_async_networkstack &&
1064 * ./ceph_test_async_networkstack