2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
19 * Copyright (C) 2014 Cloudius Systems, Ltd.
23 #include <seastar/core/app-template.hh>
24 #include <seastar/core/future-util.hh>
25 #include <seastar/core/distributed.hh>
26 #include <seastar/core/print.hh>
28 using namespace seastar
;
30 using namespace std::chrono_literals
;
32 static int rx_msg_size
= 4 * 1024;
33 static int tx_msg_total_size
= 100 * 1024 * 1024;
34 static int tx_msg_size
= 4 * 1024;
35 static int tx_msg_nr
= tx_msg_total_size
/ tx_msg_size
;
36 static std::string
str_txbuf(tx_msg_size
, 'X');
39 distributed
<client
> clients
;
41 transport protocol
= transport::TCP
;
45 static constexpr unsigned _pings_per_connection
= 10000;
46 unsigned _total_pings
;
47 unsigned _concurrent_connections
;
48 ipv4_addr _server_addr
;
50 lowres_clock::time_point _earliest_started
;
51 lowres_clock::time_point _latest_finished
;
52 size_t _processed_bytes
;
53 unsigned _num_reported
;
57 input_stream
<char> _read_buf
;
58 output_stream
<char> _write_buf
;
59 size_t _bytes_read
= 0;
60 size_t _bytes_write
= 0;
62 connection(connected_socket
&& fd
)
64 , _read_buf(_fd
.input())
65 , _write_buf(_fd
.output()) {}
68 return _read_buf
.read_exactly(rx_msg_size
).then([this] (temporary_buffer
<char> buf
) {
69 _bytes_read
+= buf
.size();
70 if (buf
.size() == 0) {
71 return make_ready_future();
78 future
<> do_write(int end
) {
80 return make_ready_future();
82 return _write_buf
.write(str_txbuf
).then([this] {
83 _bytes_write
+= tx_msg_size
;
84 return _write_buf
.flush();
86 return do_write(end
- 1);
90 future
<> ping(int times
) {
91 return _write_buf
.write("ping").then([this] {
92 return _write_buf
.flush();
93 }).then([this, times
] {
94 return _read_buf
.read_exactly(4).then([this, times
] (temporary_buffer
<char> buf
) {
95 if (buf
.size() != 4) {
96 fprint(std::cerr
, "illegal packet received: %d\n", buf
.size());
97 return make_ready_future();
99 auto str
= std::string(buf
.get(), buf
.size());
101 fprint(std::cerr
, "illegal packet received: %d\n", buf
.size());
102 return make_ready_future();
105 return ping(times
- 1);
107 return make_ready_future();
113 future
<size_t> rxrx() {
114 return _write_buf
.write("rxrx").then([this] {
115 return _write_buf
.flush();
117 return do_write(tx_msg_nr
).then([this] {
118 return _write_buf
.close();
120 return make_ready_future
<size_t>(_bytes_write
);
125 future
<size_t> txtx() {
126 return _write_buf
.write("txtx").then([this] {
127 return _write_buf
.flush();
129 return do_read().then([this] {
130 return make_ready_future
<size_t>(_bytes_read
);
136 future
<> ping_test(connection
*conn
) {
137 auto started
= lowres_clock::now();
138 return conn
->ping(_pings_per_connection
).then([started
] {
139 auto finished
= lowres_clock::now();
140 (void)clients
.invoke_on(0, &client::ping_report
, started
, finished
);
144 future
<> rxrx_test(connection
*conn
) {
145 auto started
= lowres_clock::now();
146 return conn
->rxrx().then([started
] (size_t bytes
) {
147 auto finished
= lowres_clock::now();
148 (void)clients
.invoke_on(0, &client::rxtx_report
, started
, finished
, bytes
);
152 future
<> txtx_test(connection
*conn
) {
153 auto started
= lowres_clock::now();
154 return conn
->txtx().then([started
] (size_t bytes
) {
155 auto finished
= lowres_clock::now();
156 (void)clients
.invoke_on(0, &client::rxtx_report
, started
, finished
, bytes
);
160 void ping_report(lowres_clock::time_point started
, lowres_clock::time_point finished
) {
161 if (_earliest_started
> started
)
162 _earliest_started
= started
;
163 if (_latest_finished
< finished
)
164 _latest_finished
= finished
;
165 if (++_num_reported
== _concurrent_connections
) {
166 auto elapsed
= _latest_finished
- _earliest_started
;
167 auto usecs
= std::chrono::duration_cast
<std::chrono::microseconds
>(elapsed
).count();
168 auto secs
= static_cast<double>(usecs
) / static_cast<double>(1000 * 1000);
169 fprint(std::cout
, "========== ping ============\n");
170 fprint(std::cout
, "Server: %s\n", _server_addr
);
171 fprint(std::cout
,"Connections: %u\n", _concurrent_connections
);
172 fprint(std::cout
, "Total PingPong: %u\n", _total_pings
);
173 fprint(std::cout
, "Total Time(Secs): %f\n", secs
);
174 fprint(std::cout
, "Requests/Sec: %f\n",
175 static_cast<double>(_total_pings
) / secs
);
176 (void)clients
.stop().then([] {
182 void rxtx_report(lowres_clock::time_point started
, lowres_clock::time_point finished
, size_t bytes
) {
183 if (_earliest_started
> started
)
184 _earliest_started
= started
;
185 if (_latest_finished
< finished
)
186 _latest_finished
= finished
;
187 _processed_bytes
+= bytes
;
188 if (++_num_reported
== _concurrent_connections
) {
189 auto elapsed
= _latest_finished
- _earliest_started
;
190 auto usecs
= std::chrono::duration_cast
<std::chrono::microseconds
>(elapsed
).count();
191 auto secs
= static_cast<double>(usecs
) / static_cast<double>(1000 * 1000);
192 fprint(std::cout
, "========== %s ============\n", _test
);
193 fprint(std::cout
, "Server: %s\n", _server_addr
);
194 fprint(std::cout
, "Connections: %u\n", _concurrent_connections
);
195 fprint(std::cout
, "Bytes Received(MiB): %u\n", _processed_bytes
/1024/1024);
196 fprint(std::cout
, "Total Time(Secs): %f\n", secs
);
197 fprint(std::cout
, "Bandwidth(Gbits/Sec): %f\n",
198 static_cast<double>((_processed_bytes
* 8)) / (1000 * 1000 * 1000) / secs
);
199 (void)clients
.stop().then([] {
205 future
<> start(ipv4_addr server_addr
, std::string test
, unsigned ncon
) {
206 _server_addr
= server_addr
;
207 _concurrent_connections
= ncon
* smp::count
;
208 _total_pings
= _pings_per_connection
* _concurrent_connections
;
211 for (unsigned i
= 0; i
< ncon
; i
++) {
212 socket_address local
= socket_address(::sockaddr_in
{AF_INET
, INADDR_ANY
, {0}});
213 (void)engine().net().connect(make_ipv4_address(server_addr
), local
, protocol
).then([this, test
] (connected_socket fd
) {
214 auto conn
= new connection(std::move(fd
));
215 (void)(this->*tests
.at(test
))(conn
).then_wrapped([conn
] (auto&& f
) {
219 } catch (std::exception
& ex
) {
220 fprint(std::cerr
, "request error: %s\n", ex
.what());
225 return make_ready_future();
228 return make_ready_future();
231 typedef future
<> (client::*test_fn
)(connection
*conn
);
232 static const std::map
<std::string
, test_fn
> tests
;
235 namespace bpo
= boost::program_options
;
237 int main(int ac
, char ** av
) {
240 ("server", bpo::value
<std::string
>()->required(), "Server address")
241 ("test", bpo::value
<std::string
>()->default_value("ping"), "test type(ping | rxrx | txtx)")
242 ("conn", bpo::value
<unsigned>()->default_value(16), "nr connections per cpu")
243 ("proto", bpo::value
<std::string
>()->default_value("tcp"), "transport protocol tcp|sctp")
246 return app
.run_deprecated(ac
, av
, [&app
] {
247 auto&& config
= app
.configuration();
248 auto server
= config
["server"].as
<std::string
>();
249 auto test
= config
["test"].as
<std::string
>();
250 auto ncon
= config
["conn"].as
<unsigned>();
251 auto proto
= config
["proto"].as
<std::string
>();
253 if (proto
== "tcp") {
254 protocol
= transport::TCP
;
255 } else if (proto
== "sctp") {
256 protocol
= transport::SCTP
;
258 fprint(std::cerr
, "Error: --proto=tcp|sctp\n");
259 return engine().exit(1);
262 if (!client::tests
.count(test
)) {
263 fprint(std::cerr
, "Error: -test=ping | rxrx | txtx\n");
264 return engine().exit(1);
267 (void)clients
.start().then([server
, test
, ncon
] () {
268 return clients
.invoke_on_all(&client::start
, ipv4_addr
{server
}, test
, ncon
);
273 const std::map
<std::string
, client::test_fn
> client::tests
= {
274 {"ping", &client::ping_test
},
275 {"rxrx", &client::rxrx_test
},
276 {"txtx", &client::txtx_test
},