]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | /* | |
19 | * Copyright (C) 2014 Cloudius Systems, Ltd. | |
20 | */ | |
21 | ||
22 | #include <iostream> | |
23 | #include <seastar/core/app-template.hh> | |
f67539c2 | 24 | #include <seastar/core/reactor.hh> |
11fdf7f2 | 25 | #include <seastar/core/distributed.hh> |
9f95a23c | 26 | #include <seastar/core/print.hh> |
11fdf7f2 TL |
27 | |
28 | using namespace seastar; | |
29 | using namespace net; | |
30 | using namespace std::chrono_literals; | |
31 | ||
32 | static int rx_msg_size = 4 * 1024; | |
33 | static int tx_msg_total_size = 100 * 1024 * 1024; | |
34 | static int tx_msg_size = 4 * 1024; | |
35 | static int tx_msg_nr = tx_msg_total_size / tx_msg_size; | |
36 | static std::string str_txbuf(tx_msg_size, 'X'); | |
37 | ||
38 | class client; | |
39 | distributed<client> clients; | |
40 | ||
41 | transport protocol = transport::TCP; | |
42 | ||
43 | class client { | |
44 | private: | |
45 | static constexpr unsigned _pings_per_connection = 10000; | |
46 | unsigned _total_pings; | |
47 | unsigned _concurrent_connections; | |
48 | ipv4_addr _server_addr; | |
49 | std::string _test; | |
50 | lowres_clock::time_point _earliest_started; | |
51 | lowres_clock::time_point _latest_finished; | |
52 | size_t _processed_bytes; | |
53 | unsigned _num_reported; | |
54 | public: | |
55 | class connection { | |
56 | connected_socket _fd; | |
57 | input_stream<char> _read_buf; | |
58 | output_stream<char> _write_buf; | |
59 | size_t _bytes_read = 0; | |
60 | size_t _bytes_write = 0; | |
61 | public: | |
62 | connection(connected_socket&& fd) | |
63 | : _fd(std::move(fd)) | |
64 | , _read_buf(_fd.input()) | |
65 | , _write_buf(_fd.output()) {} | |
66 | ||
67 | future<> do_read() { | |
68 | return _read_buf.read_exactly(rx_msg_size).then([this] (temporary_buffer<char> buf) { | |
69 | _bytes_read += buf.size(); | |
70 | if (buf.size() == 0) { | |
71 | return make_ready_future(); | |
72 | } else { | |
73 | return do_read(); | |
74 | } | |
75 | }); | |
76 | } | |
77 | ||
78 | future<> do_write(int end) { | |
79 | if (end == 0) { | |
80 | return make_ready_future(); | |
81 | } | |
82 | return _write_buf.write(str_txbuf).then([this] { | |
83 | _bytes_write += tx_msg_size; | |
84 | return _write_buf.flush(); | |
85 | }).then([this, end] { | |
86 | return do_write(end - 1); | |
87 | }); | |
88 | } | |
89 | ||
90 | future<> ping(int times) { | |
91 | return _write_buf.write("ping").then([this] { | |
92 | return _write_buf.flush(); | |
93 | }).then([this, times] { | |
94 | return _read_buf.read_exactly(4).then([this, times] (temporary_buffer<char> buf) { | |
95 | if (buf.size() != 4) { | |
96 | fprint(std::cerr, "illegal packet received: %d\n", buf.size()); | |
97 | return make_ready_future(); | |
98 | } | |
99 | auto str = std::string(buf.get(), buf.size()); | |
100 | if (str != "pong") { | |
101 | fprint(std::cerr, "illegal packet received: %d\n", buf.size()); | |
102 | return make_ready_future(); | |
103 | } | |
104 | if (times > 0) { | |
105 | return ping(times - 1); | |
106 | } else { | |
107 | return make_ready_future(); | |
108 | } | |
109 | }); | |
110 | }); | |
111 | } | |
112 | ||
113 | future<size_t> rxrx() { | |
114 | return _write_buf.write("rxrx").then([this] { | |
115 | return _write_buf.flush(); | |
116 | }).then([this] { | |
117 | return do_write(tx_msg_nr).then([this] { | |
118 | return _write_buf.close(); | |
119 | }).then([this] { | |
120 | return make_ready_future<size_t>(_bytes_write); | |
121 | }); | |
122 | }); | |
123 | } | |
124 | ||
125 | future<size_t> txtx() { | |
126 | return _write_buf.write("txtx").then([this] { | |
127 | return _write_buf.flush(); | |
128 | }).then([this] { | |
129 | return do_read().then([this] { | |
130 | return make_ready_future<size_t>(_bytes_read); | |
131 | }); | |
132 | }); | |
133 | } | |
134 | }; | |
135 | ||
136 | future<> ping_test(connection *conn) { | |
137 | auto started = lowres_clock::now(); | |
138 | return conn->ping(_pings_per_connection).then([started] { | |
139 | auto finished = lowres_clock::now(); | |
9f95a23c | 140 | (void)clients.invoke_on(0, &client::ping_report, started, finished); |
11fdf7f2 TL |
141 | }); |
142 | } | |
143 | ||
144 | future<> rxrx_test(connection *conn) { | |
145 | auto started = lowres_clock::now(); | |
146 | return conn->rxrx().then([started] (size_t bytes) { | |
147 | auto finished = lowres_clock::now(); | |
9f95a23c | 148 | (void)clients.invoke_on(0, &client::rxtx_report, started, finished, bytes); |
11fdf7f2 TL |
149 | }); |
150 | } | |
151 | ||
152 | future<> txtx_test(connection *conn) { | |
153 | auto started = lowres_clock::now(); | |
154 | return conn->txtx().then([started] (size_t bytes) { | |
155 | auto finished = lowres_clock::now(); | |
9f95a23c | 156 | (void)clients.invoke_on(0, &client::rxtx_report, started, finished, bytes); |
11fdf7f2 TL |
157 | }); |
158 | } | |
159 | ||
160 | void ping_report(lowres_clock::time_point started, lowres_clock::time_point finished) { | |
161 | if (_earliest_started > started) | |
162 | _earliest_started = started; | |
163 | if (_latest_finished < finished) | |
164 | _latest_finished = finished; | |
165 | if (++_num_reported == _concurrent_connections) { | |
166 | auto elapsed = _latest_finished - _earliest_started; | |
167 | auto usecs = std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count(); | |
168 | auto secs = static_cast<double>(usecs) / static_cast<double>(1000 * 1000); | |
169 | fprint(std::cout, "========== ping ============\n"); | |
170 | fprint(std::cout, "Server: %s\n", _server_addr); | |
171 | fprint(std::cout,"Connections: %u\n", _concurrent_connections); | |
172 | fprint(std::cout, "Total PingPong: %u\n", _total_pings); | |
173 | fprint(std::cout, "Total Time(Secs): %f\n", secs); | |
174 | fprint(std::cout, "Requests/Sec: %f\n", | |
175 | static_cast<double>(_total_pings) / secs); | |
9f95a23c | 176 | (void)clients.stop().then([] { |
11fdf7f2 TL |
177 | engine().exit(0); |
178 | }); | |
179 | } | |
180 | } | |
181 | ||
182 | void rxtx_report(lowres_clock::time_point started, lowres_clock::time_point finished, size_t bytes) { | |
183 | if (_earliest_started > started) | |
184 | _earliest_started = started; | |
185 | if (_latest_finished < finished) | |
186 | _latest_finished = finished; | |
187 | _processed_bytes += bytes; | |
188 | if (++_num_reported == _concurrent_connections) { | |
189 | auto elapsed = _latest_finished - _earliest_started; | |
190 | auto usecs = std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count(); | |
191 | auto secs = static_cast<double>(usecs) / static_cast<double>(1000 * 1000); | |
192 | fprint(std::cout, "========== %s ============\n", _test); | |
193 | fprint(std::cout, "Server: %s\n", _server_addr); | |
194 | fprint(std::cout, "Connections: %u\n", _concurrent_connections); | |
195 | fprint(std::cout, "Bytes Received(MiB): %u\n", _processed_bytes/1024/1024); | |
196 | fprint(std::cout, "Total Time(Secs): %f\n", secs); | |
197 | fprint(std::cout, "Bandwidth(Gbits/Sec): %f\n", | |
198 | static_cast<double>((_processed_bytes * 8)) / (1000 * 1000 * 1000) / secs); | |
9f95a23c | 199 | (void)clients.stop().then([] { |
11fdf7f2 TL |
200 | engine().exit(0); |
201 | }); | |
202 | } | |
203 | } | |
204 | ||
205 | future<> start(ipv4_addr server_addr, std::string test, unsigned ncon) { | |
206 | _server_addr = server_addr; | |
207 | _concurrent_connections = ncon * smp::count; | |
208 | _total_pings = _pings_per_connection * _concurrent_connections; | |
209 | _test = test; | |
210 | ||
211 | for (unsigned i = 0; i < ncon; i++) { | |
212 | socket_address local = socket_address(::sockaddr_in{AF_INET, INADDR_ANY, {0}}); | |
f67539c2 | 213 | (void)connect(make_ipv4_address(server_addr), local, protocol).then([this, test] (connected_socket fd) { |
11fdf7f2 | 214 | auto conn = new connection(std::move(fd)); |
9f95a23c | 215 | (void)(this->*tests.at(test))(conn).then_wrapped([conn] (auto&& f) { |
11fdf7f2 TL |
216 | delete conn; |
217 | try { | |
218 | f.get(); | |
219 | } catch (std::exception& ex) { | |
220 | fprint(std::cerr, "request error: %s\n", ex.what()); | |
221 | } | |
222 | }); | |
223 | }); | |
224 | } | |
225 | return make_ready_future(); | |
226 | } | |
227 | future<> stop() { | |
228 | return make_ready_future(); | |
229 | } | |
230 | ||
231 | typedef future<> (client::*test_fn)(connection *conn); | |
232 | static const std::map<std::string, test_fn> tests; | |
233 | }; | |
234 | ||
235 | namespace bpo = boost::program_options; | |
236 | ||
237 | int main(int ac, char ** av) { | |
238 | app_template app; | |
239 | app.add_options() | |
240 | ("server", bpo::value<std::string>()->required(), "Server address") | |
241 | ("test", bpo::value<std::string>()->default_value("ping"), "test type(ping | rxrx | txtx)") | |
242 | ("conn", bpo::value<unsigned>()->default_value(16), "nr connections per cpu") | |
243 | ("proto", bpo::value<std::string>()->default_value("tcp"), "transport protocol tcp|sctp") | |
244 | ; | |
245 | ||
246 | return app.run_deprecated(ac, av, [&app] { | |
247 | auto&& config = app.configuration(); | |
248 | auto server = config["server"].as<std::string>(); | |
249 | auto test = config["test"].as<std::string>(); | |
250 | auto ncon = config["conn"].as<unsigned>(); | |
251 | auto proto = config["proto"].as<std::string>(); | |
252 | ||
253 | if (proto == "tcp") { | |
254 | protocol = transport::TCP; | |
255 | } else if (proto == "sctp") { | |
256 | protocol = transport::SCTP; | |
257 | } else { | |
258 | fprint(std::cerr, "Error: --proto=tcp|sctp\n"); | |
259 | return engine().exit(1); | |
260 | } | |
261 | ||
262 | if (!client::tests.count(test)) { | |
263 | fprint(std::cerr, "Error: -test=ping | rxrx | txtx\n"); | |
264 | return engine().exit(1); | |
265 | } | |
266 | ||
9f95a23c TL |
267 | (void)clients.start().then([server, test, ncon] () { |
268 | return clients.invoke_on_all(&client::start, ipv4_addr{server}, test, ncon); | |
11fdf7f2 TL |
269 | }); |
270 | }); | |
271 | } | |
272 | ||
273 | const std::map<std::string, client::test_fn> client::tests = { | |
274 | {"ping", &client::ping_test}, | |
275 | {"rxrx", &client::rxrx_test}, | |
276 | {"txtx", &client::txtx_test}, | |
277 | }; | |
278 |