]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/apps/seawreck/seawreck.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / seastar / apps / seawreck / seawreck.cc
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright (C) 2015 Cloudius Systems, Ltd.
20 */
21
22 #include <seastar/http/response_parser.hh>
23 #include <seastar/core/print.hh>
24 #include <seastar/core/reactor.hh>
25 #include <seastar/core/app-template.hh>
26 #include <seastar/core/future-util.hh>
27 #include <seastar/core/distributed.hh>
28 #include <seastar/core/semaphore.hh>
29 #include <seastar/core/future-util.hh>
30 #include <chrono>
31
32 using namespace seastar;
33
34 template <typename... Args>
35 void http_debug(const char* fmt, Args&&... args) {
36 #if HTTP_DEBUG
37 print(fmt, std::forward<Args>(args)...);
38 #endif
39 }
40
41 class http_client {
42 private:
43 unsigned _duration;
44 unsigned _conn_per_core;
45 unsigned _reqs_per_conn;
46 std::vector<connected_socket> _sockets;
47 semaphore _conn_connected{0};
48 semaphore _conn_finished{0};
49 timer<> _run_timer;
50 bool _timer_based;
51 bool _timer_done{false};
52 uint64_t _total_reqs{0};
53 public:
54 http_client(unsigned duration, unsigned total_conn, unsigned reqs_per_conn)
55 : _duration(duration)
56 , _conn_per_core(total_conn / smp::count)
57 , _reqs_per_conn(reqs_per_conn)
58 , _run_timer([this] { _timer_done = true; })
59 , _timer_based(reqs_per_conn == 0) {
60 }
61
62 class connection {
63 private:
64 connected_socket _fd;
65 input_stream<char> _read_buf;
66 output_stream<char> _write_buf;
67 http_response_parser _parser;
68 http_client* _http_client;
69 uint64_t _nr_done{0};
70 public:
71 connection(connected_socket&& fd, http_client* client)
72 : _fd(std::move(fd))
73 , _read_buf(_fd.input())
74 , _write_buf(_fd.output())
75 , _http_client(client){
76 }
77
78 uint64_t nr_done() {
79 return _nr_done;
80 }
81
82 future<> do_req() {
83 return _write_buf.write("GET / HTTP/1.1\r\nHost: 127.0.0.1:10000\r\n\r\n").then([this] {
84 return _write_buf.flush();
85 }).then([this] {
86 _parser.init();
87 return _read_buf.consume(_parser).then([this] {
88 // Read HTTP response header first
89 if (_parser.eof()) {
90 return make_ready_future<>();
91 }
92 auto _rsp = _parser.get_parsed_response();
93 auto it = _rsp->_headers.find("Content-Length");
94 if (it == _rsp->_headers.end()) {
95 fmt::print("Error: HTTP response does not contain: Content-Length\n");
96 return make_ready_future<>();
97 }
98 auto content_len = std::stoi(it->second);
99 http_debug("Content-Length = %d\n", content_len);
100 // Read HTTP response body
101 return _read_buf.read_exactly(content_len).then([this] (temporary_buffer<char> buf) {
102 _nr_done++;
103 http_debug("%s\n", buf.get());
104 if (_http_client->done(_nr_done)) {
105 return make_ready_future();
106 } else {
107 return do_req();
108 }
109 });
110 });
111 });
112 }
113 };
114
115 future<uint64_t> total_reqs() {
116 fmt::print("Requests on cpu {:2d}: {:d}\n", engine().cpu_id(), _total_reqs);
117 return make_ready_future<uint64_t>(_total_reqs);
118 }
119
120 bool done(uint64_t nr_done) {
121 if (_timer_based) {
122 return _timer_done;
123 } else {
124 return nr_done >= _reqs_per_conn;
125 }
126 }
127
128 future<> connect(ipv4_addr server_addr) {
129 // Establish all the TCP connections first
130 for (unsigned i = 0; i < _conn_per_core; i++) {
131 engine().net().connect(make_ipv4_address(server_addr)).then([this] (connected_socket fd) {
132 _sockets.push_back(std::move(fd));
133 http_debug("Established connection %6d on cpu %3d\n", _conn_connected.current(), engine().cpu_id());
134 _conn_connected.signal();
135 }).or_terminate();
136 }
137 return _conn_connected.wait(_conn_per_core);
138 }
139
140 future<> run() {
141 // All connected, start HTTP request
142 http_debug("Established all %6d tcp connections on cpu %3d\n", _conn_per_core, engine().cpu_id());
143 if (_timer_based) {
144 _run_timer.arm(std::chrono::seconds(_duration));
145 }
146 for (auto&& fd : _sockets) {
147 auto conn = new connection(std::move(fd), this);
148 conn->do_req().then_wrapped([this, conn] (auto&& f) {
149 http_debug("Finished connection %6d on cpu %3d\n", _conn_finished.current(), engine().cpu_id());
150 _total_reqs += conn->nr_done();
151 _conn_finished.signal();
152 delete conn;
153 try {
154 f.get();
155 } catch (std::exception& ex) {
156 fmt::print("http request error: {}\n", ex.what());
157 }
158 });
159 }
160
161 // All finished
162 return _conn_finished.wait(_conn_per_core);
163 }
164 future<> stop() {
165 return make_ready_future();
166 }
167 };
168
169 namespace bpo = boost::program_options;
170
171 int main(int ac, char** av) {
172 app_template app;
173 app.add_options()
174 ("server,s", bpo::value<std::string>()->default_value("192.168.66.100:10000"), "Server address")
175 ("conn,c", bpo::value<unsigned>()->default_value(100), "total connections")
176 ("reqs,r", bpo::value<unsigned>()->default_value(0), "reqs per connection")
177 ("duration,d", bpo::value<unsigned>()->default_value(10), "duration of the test in seconds)");
178
179 return app.run(ac, av, [&app] () -> future<int> {
180 auto& config = app.configuration();
181 auto server = config["server"].as<std::string>();
182 auto reqs_per_conn = config["reqs"].as<unsigned>();
183 auto total_conn= config["conn"].as<unsigned>();
184 auto duration = config["duration"].as<unsigned>();
185
186 if (total_conn % smp::count != 0) {
187 fmt::print("Error: conn needs to be n * cpu_nr\n");
188 return make_ready_future<int>(-1);
189 }
190
191 auto http_clients = new distributed<http_client>;
192
193 // Start http requests on all the cores
194 auto started = steady_clock_type::now();
195 fmt::print("========== http_client ============\n");
196 fmt::print("Server: {}\n", server);
197 fmt::print("Connections: {:d}\n", total_conn);
198 fmt::print("Requests/connection: {}\n", reqs_per_conn == 0 ? "dynamic (timer based)" : std::to_string(reqs_per_conn));
199 return http_clients->start(std::move(duration), std::move(total_conn), std::move(reqs_per_conn)).then([http_clients, server] {
200 return http_clients->invoke_on_all(&http_client::connect, ipv4_addr{server});
201 }).then([http_clients] {
202 return http_clients->invoke_on_all(&http_client::run);
203 }).then([http_clients] {
204 return http_clients->map_reduce(adder<uint64_t>(), &http_client::total_reqs);
205 }).then([http_clients, started] (auto total_reqs) {
206 // All the http requests are finished
207 auto finished = steady_clock_type::now();
208 auto elapsed = finished - started;
209 auto secs = static_cast<double>(elapsed.count() / 1000000000.0);
210 fmt::print("Total cpus: {:d}\n", smp::count);
211 fmt::print("Total requests: {:d}\n", total_reqs);
212 fmt::print("Total time: {:f}\n", secs);
213 fmt::print("Requests/sec: {:f}\n", static_cast<double>(total_reqs) / secs);
214 fmt::print("========== done ============\n");
215 return http_clients->stop().then([http_clients] {
216 // FIXME: If we call engine().exit(0) here to exit when
217 // requests are done. The tcp connection will not be closed
218 // properly, becasue we exit too earily and the FIN packets are
219 // not exchanged.
220 delete http_clients;
221 return make_ready_future<int>(0);
222 });
223 });
224 });
225 }