2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
19 * Copyright (C) 2015 Cloudius Systems, Ltd.
22 #include <seastar/http/response_parser.hh>
23 #include <seastar/core/print.hh>
24 #include <seastar/core/reactor.hh>
25 #include <seastar/core/app-template.hh>
26 #include <seastar/core/future-util.hh>
27 #include <seastar/core/distributed.hh>
28 #include <seastar/core/semaphore.hh>
29 #include <seastar/core/future-util.hh>
32 using namespace seastar
;
34 template <typename
... Args
>
35 void http_debug(const char* fmt
, Args
&&... args
) {
37 print(fmt
, std::forward
<Args
>(args
)...);
44 unsigned _conn_per_core
;
45 unsigned _reqs_per_conn
;
46 std::vector
<connected_socket
> _sockets
;
47 semaphore _conn_connected
{0};
48 semaphore _conn_finished
{0};
51 bool _timer_done
{false};
52 uint64_t _total_reqs
{0};
54 http_client(unsigned duration
, unsigned total_conn
, unsigned reqs_per_conn
)
56 , _conn_per_core(total_conn
/ smp::count
)
57 , _reqs_per_conn(reqs_per_conn
)
58 , _run_timer([this] { _timer_done
= true; })
59 , _timer_based(reqs_per_conn
== 0) {
65 input_stream
<char> _read_buf
;
66 output_stream
<char> _write_buf
;
67 http_response_parser _parser
;
68 http_client
* _http_client
;
71 connection(connected_socket
&& fd
, http_client
* client
)
73 , _read_buf(_fd
.input())
74 , _write_buf(_fd
.output())
75 , _http_client(client
){
83 return _write_buf
.write("GET / HTTP/1.1\r\nHost: 127.0.0.1:10000\r\n\r\n").then([this] {
84 return _write_buf
.flush();
87 return _read_buf
.consume(_parser
).then([this] {
88 // Read HTTP response header first
90 return make_ready_future
<>();
92 auto _rsp
= _parser
.get_parsed_response();
93 auto it
= _rsp
->_headers
.find("Content-Length");
94 if (it
== _rsp
->_headers
.end()) {
95 fmt::print("Error: HTTP response does not contain: Content-Length\n");
96 return make_ready_future
<>();
98 auto content_len
= std::stoi(it
->second
);
99 http_debug("Content-Length = %d\n", content_len
);
100 // Read HTTP response body
101 return _read_buf
.read_exactly(content_len
).then([this] (temporary_buffer
<char> buf
) {
103 http_debug("%s\n", buf
.get());
104 if (_http_client
->done(_nr_done
)) {
105 return make_ready_future();
115 future
<uint64_t> total_reqs() {
116 fmt::print("Requests on cpu {:2d}: {:d}\n", engine().cpu_id(), _total_reqs
);
117 return make_ready_future
<uint64_t>(_total_reqs
);
120 bool done(uint64_t nr_done
) {
124 return nr_done
>= _reqs_per_conn
;
128 future
<> connect(ipv4_addr server_addr
) {
129 // Establish all the TCP connections first
130 for (unsigned i
= 0; i
< _conn_per_core
; i
++) {
131 engine().net().connect(make_ipv4_address(server_addr
)).then([this] (connected_socket fd
) {
132 _sockets
.push_back(std::move(fd
));
133 http_debug("Established connection %6d on cpu %3d\n", _conn_connected
.current(), engine().cpu_id());
134 _conn_connected
.signal();
137 return _conn_connected
.wait(_conn_per_core
);
141 // All connected, start HTTP request
142 http_debug("Established all %6d tcp connections on cpu %3d\n", _conn_per_core
, engine().cpu_id());
144 _run_timer
.arm(std::chrono::seconds(_duration
));
146 for (auto&& fd
: _sockets
) {
147 auto conn
= new connection(std::move(fd
), this);
148 conn
->do_req().then_wrapped([this, conn
] (auto&& f
) {
149 http_debug("Finished connection %6d on cpu %3d\n", _conn_finished
.current(), engine().cpu_id());
150 _total_reqs
+= conn
->nr_done();
151 _conn_finished
.signal();
155 } catch (std::exception
& ex
) {
156 fmt::print("http request error: {}\n", ex
.what());
162 return _conn_finished
.wait(_conn_per_core
);
165 return make_ready_future();
169 namespace bpo
= boost::program_options
;
171 int main(int ac
, char** av
) {
174 ("server,s", bpo::value
<std::string
>()->default_value("192.168.66.100:10000"), "Server address")
175 ("conn,c", bpo::value
<unsigned>()->default_value(100), "total connections")
176 ("reqs,r", bpo::value
<unsigned>()->default_value(0), "reqs per connection")
177 ("duration,d", bpo::value
<unsigned>()->default_value(10), "duration of the test in seconds)");
179 return app
.run(ac
, av
, [&app
] () -> future
<int> {
180 auto& config
= app
.configuration();
181 auto server
= config
["server"].as
<std::string
>();
182 auto reqs_per_conn
= config
["reqs"].as
<unsigned>();
183 auto total_conn
= config
["conn"].as
<unsigned>();
184 auto duration
= config
["duration"].as
<unsigned>();
186 if (total_conn
% smp::count
!= 0) {
187 fmt::print("Error: conn needs to be n * cpu_nr\n");
188 return make_ready_future
<int>(-1);
191 auto http_clients
= new distributed
<http_client
>;
193 // Start http requests on all the cores
194 auto started
= steady_clock_type::now();
195 fmt::print("========== http_client ============\n");
196 fmt::print("Server: {}\n", server
);
197 fmt::print("Connections: {:d}\n", total_conn
);
198 fmt::print("Requests/connection: {}\n", reqs_per_conn
== 0 ? "dynamic (timer based)" : std::to_string(reqs_per_conn
));
199 return http_clients
->start(std::move(duration
), std::move(total_conn
), std::move(reqs_per_conn
)).then([http_clients
, server
] {
200 return http_clients
->invoke_on_all(&http_client::connect
, ipv4_addr
{server
});
201 }).then([http_clients
] {
202 return http_clients
->invoke_on_all(&http_client::run
);
203 }).then([http_clients
] {
204 return http_clients
->map_reduce(adder
<uint64_t>(), &http_client::total_reqs
);
205 }).then([http_clients
, started
] (auto total_reqs
) {
206 // All the http requests are finished
207 auto finished
= steady_clock_type::now();
208 auto elapsed
= finished
- started
;
209 auto secs
= static_cast<double>(elapsed
.count() / 1000000000.0);
210 fmt::print("Total cpus: {:d}\n", smp::count
);
211 fmt::print("Total requests: {:d}\n", total_reqs
);
212 fmt::print("Total time: {:f}\n", secs
);
213 fmt::print("Requests/sec: {:f}\n", static_cast<double>(total_reqs
) / secs
);
214 fmt::print("========== done ============\n");
215 return http_clients
->stop().then([http_clients
] {
216 // FIXME: If we call engine().exit(0) here to exit when
217 // requests are done. The tcp connection will not be closed
218 // properly, becasue we exit too earily and the FIN packets are
221 return make_ready_future
<int>(0);