]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/demos/tcp_sctp_client_demo.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / seastar / demos / tcp_sctp_client_demo.cc
CommitLineData
11fdf7f2
TL
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2014 Cloudius Systems, Ltd.
20 */
21
22#include <iostream>
23#include <seastar/core/app-template.hh>
f67539c2 24#include <seastar/core/reactor.hh>
11fdf7f2 25#include <seastar/core/distributed.hh>
9f95a23c 26#include <seastar/core/print.hh>
11fdf7f2
TL
27
28using namespace seastar;
29using namespace net;
30using namespace std::chrono_literals;
31
32static int rx_msg_size = 4 * 1024;
33static int tx_msg_total_size = 100 * 1024 * 1024;
34static int tx_msg_size = 4 * 1024;
35static int tx_msg_nr = tx_msg_total_size / tx_msg_size;
36static std::string str_txbuf(tx_msg_size, 'X');
37
38class client;
39distributed<client> clients;
40
41transport protocol = transport::TCP;
42
43class client {
44private:
45 static constexpr unsigned _pings_per_connection = 10000;
46 unsigned _total_pings;
47 unsigned _concurrent_connections;
48 ipv4_addr _server_addr;
49 std::string _test;
50 lowres_clock::time_point _earliest_started;
51 lowres_clock::time_point _latest_finished;
52 size_t _processed_bytes;
53 unsigned _num_reported;
54public:
55 class connection {
56 connected_socket _fd;
57 input_stream<char> _read_buf;
58 output_stream<char> _write_buf;
59 size_t _bytes_read = 0;
60 size_t _bytes_write = 0;
61 public:
62 connection(connected_socket&& fd)
63 : _fd(std::move(fd))
64 , _read_buf(_fd.input())
65 , _write_buf(_fd.output()) {}
66
67 future<> do_read() {
68 return _read_buf.read_exactly(rx_msg_size).then([this] (temporary_buffer<char> buf) {
69 _bytes_read += buf.size();
70 if (buf.size() == 0) {
71 return make_ready_future();
72 } else {
73 return do_read();
74 }
75 });
76 }
77
78 future<> do_write(int end) {
79 if (end == 0) {
80 return make_ready_future();
81 }
82 return _write_buf.write(str_txbuf).then([this] {
83 _bytes_write += tx_msg_size;
84 return _write_buf.flush();
85 }).then([this, end] {
86 return do_write(end - 1);
87 });
88 }
89
90 future<> ping(int times) {
91 return _write_buf.write("ping").then([this] {
92 return _write_buf.flush();
93 }).then([this, times] {
94 return _read_buf.read_exactly(4).then([this, times] (temporary_buffer<char> buf) {
95 if (buf.size() != 4) {
96 fprint(std::cerr, "illegal packet received: %d\n", buf.size());
97 return make_ready_future();
98 }
99 auto str = std::string(buf.get(), buf.size());
100 if (str != "pong") {
101 fprint(std::cerr, "illegal packet received: %d\n", buf.size());
102 return make_ready_future();
103 }
104 if (times > 0) {
105 return ping(times - 1);
106 } else {
107 return make_ready_future();
108 }
109 });
110 });
111 }
112
113 future<size_t> rxrx() {
114 return _write_buf.write("rxrx").then([this] {
115 return _write_buf.flush();
116 }).then([this] {
117 return do_write(tx_msg_nr).then([this] {
118 return _write_buf.close();
119 }).then([this] {
120 return make_ready_future<size_t>(_bytes_write);
121 });
122 });
123 }
124
125 future<size_t> txtx() {
126 return _write_buf.write("txtx").then([this] {
127 return _write_buf.flush();
128 }).then([this] {
129 return do_read().then([this] {
130 return make_ready_future<size_t>(_bytes_read);
131 });
132 });
133 }
134 };
135
136 future<> ping_test(connection *conn) {
137 auto started = lowres_clock::now();
138 return conn->ping(_pings_per_connection).then([started] {
139 auto finished = lowres_clock::now();
9f95a23c 140 (void)clients.invoke_on(0, &client::ping_report, started, finished);
11fdf7f2
TL
141 });
142 }
143
144 future<> rxrx_test(connection *conn) {
145 auto started = lowres_clock::now();
146 return conn->rxrx().then([started] (size_t bytes) {
147 auto finished = lowres_clock::now();
9f95a23c 148 (void)clients.invoke_on(0, &client::rxtx_report, started, finished, bytes);
11fdf7f2
TL
149 });
150 }
151
152 future<> txtx_test(connection *conn) {
153 auto started = lowres_clock::now();
154 return conn->txtx().then([started] (size_t bytes) {
155 auto finished = lowres_clock::now();
9f95a23c 156 (void)clients.invoke_on(0, &client::rxtx_report, started, finished, bytes);
11fdf7f2
TL
157 });
158 }
159
160 void ping_report(lowres_clock::time_point started, lowres_clock::time_point finished) {
161 if (_earliest_started > started)
162 _earliest_started = started;
163 if (_latest_finished < finished)
164 _latest_finished = finished;
165 if (++_num_reported == _concurrent_connections) {
166 auto elapsed = _latest_finished - _earliest_started;
167 auto usecs = std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count();
168 auto secs = static_cast<double>(usecs) / static_cast<double>(1000 * 1000);
169 fprint(std::cout, "========== ping ============\n");
170 fprint(std::cout, "Server: %s\n", _server_addr);
171 fprint(std::cout,"Connections: %u\n", _concurrent_connections);
172 fprint(std::cout, "Total PingPong: %u\n", _total_pings);
173 fprint(std::cout, "Total Time(Secs): %f\n", secs);
174 fprint(std::cout, "Requests/Sec: %f\n",
175 static_cast<double>(_total_pings) / secs);
9f95a23c 176 (void)clients.stop().then([] {
11fdf7f2
TL
177 engine().exit(0);
178 });
179 }
180 }
181
182 void rxtx_report(lowres_clock::time_point started, lowres_clock::time_point finished, size_t bytes) {
183 if (_earliest_started > started)
184 _earliest_started = started;
185 if (_latest_finished < finished)
186 _latest_finished = finished;
187 _processed_bytes += bytes;
188 if (++_num_reported == _concurrent_connections) {
189 auto elapsed = _latest_finished - _earliest_started;
190 auto usecs = std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count();
191 auto secs = static_cast<double>(usecs) / static_cast<double>(1000 * 1000);
192 fprint(std::cout, "========== %s ============\n", _test);
193 fprint(std::cout, "Server: %s\n", _server_addr);
194 fprint(std::cout, "Connections: %u\n", _concurrent_connections);
195 fprint(std::cout, "Bytes Received(MiB): %u\n", _processed_bytes/1024/1024);
196 fprint(std::cout, "Total Time(Secs): %f\n", secs);
197 fprint(std::cout, "Bandwidth(Gbits/Sec): %f\n",
198 static_cast<double>((_processed_bytes * 8)) / (1000 * 1000 * 1000) / secs);
9f95a23c 199 (void)clients.stop().then([] {
11fdf7f2
TL
200 engine().exit(0);
201 });
202 }
203 }
204
205 future<> start(ipv4_addr server_addr, std::string test, unsigned ncon) {
206 _server_addr = server_addr;
207 _concurrent_connections = ncon * smp::count;
208 _total_pings = _pings_per_connection * _concurrent_connections;
209 _test = test;
210
211 for (unsigned i = 0; i < ncon; i++) {
212 socket_address local = socket_address(::sockaddr_in{AF_INET, INADDR_ANY, {0}});
f67539c2 213 (void)connect(make_ipv4_address(server_addr), local, protocol).then([this, test] (connected_socket fd) {
11fdf7f2 214 auto conn = new connection(std::move(fd));
9f95a23c 215 (void)(this->*tests.at(test))(conn).then_wrapped([conn] (auto&& f) {
11fdf7f2
TL
216 delete conn;
217 try {
218 f.get();
219 } catch (std::exception& ex) {
220 fprint(std::cerr, "request error: %s\n", ex.what());
221 }
222 });
223 });
224 }
225 return make_ready_future();
226 }
227 future<> stop() {
228 return make_ready_future();
229 }
230
231 typedef future<> (client::*test_fn)(connection *conn);
232 static const std::map<std::string, test_fn> tests;
233};
234
235namespace bpo = boost::program_options;
236
237int main(int ac, char ** av) {
238 app_template app;
239 app.add_options()
240 ("server", bpo::value<std::string>()->required(), "Server address")
241 ("test", bpo::value<std::string>()->default_value("ping"), "test type(ping | rxrx | txtx)")
242 ("conn", bpo::value<unsigned>()->default_value(16), "nr connections per cpu")
243 ("proto", bpo::value<std::string>()->default_value("tcp"), "transport protocol tcp|sctp")
244 ;
245
246 return app.run_deprecated(ac, av, [&app] {
247 auto&& config = app.configuration();
248 auto server = config["server"].as<std::string>();
249 auto test = config["test"].as<std::string>();
250 auto ncon = config["conn"].as<unsigned>();
251 auto proto = config["proto"].as<std::string>();
252
253 if (proto == "tcp") {
254 protocol = transport::TCP;
255 } else if (proto == "sctp") {
256 protocol = transport::SCTP;
257 } else {
258 fprint(std::cerr, "Error: --proto=tcp|sctp\n");
259 return engine().exit(1);
260 }
261
262 if (!client::tests.count(test)) {
263 fprint(std::cerr, "Error: -test=ping | rxrx | txtx\n");
264 return engine().exit(1);
265 }
266
9f95a23c
TL
267 (void)clients.start().then([server, test, ncon] () {
268 return clients.invoke_on_all(&client::start, ipv4_addr{server}, test, ncon);
11fdf7f2
TL
269 });
270 });
271}
272
273const std::map<std::string, client::test_fn> client::tests = {
274 {"ping", &client::ping_test},
275 {"rxrx", &client::rxrx_test},
276 {"txtx", &client::txtx_test},
277};
278