2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
19 * Copyright 2015 Cloudius Systems
22 #include <seastar/core/sstring.hh>
23 #include <seastar/core/app-template.hh>
24 #include <seastar/core/circular_buffer.hh>
25 #include <seastar/core/distributed.hh>
26 #include <seastar/core/queue.hh>
27 #include <seastar/core/when_all.hh>
28 #include <seastar/core/metrics.hh>
29 #include <seastar/core/print.hh>
32 #include <unordered_map>
38 #include <seastar/http/httpd.hh>
39 #include <seastar/http/internal/content_source.hh>
40 #include <seastar/http/reply.hh>
41 #include <seastar/util/short_streams.hh>
42 #include <seastar/util/log.hh>
44 using namespace std::chrono_literals
;
// Logger object used by the functions in this file; it is constructed with the name "httpd".
48 logger
hlogger("httpd");
51 http_stats::http_stats(http_server
& server
, const sstring
& name
)
53 namespace sm
= seastar::metrics
;
54 std::vector
<sm::label_instance
> labels
;
56 labels
.push_back(sm::label_instance("service", name
));
57 _metric_groups
.add_group("httpd", {
58 sm::make_derive("connections_total", [&server
] { return server
.total_connections(); }, sm::description("The total number of connections opened"), labels
),
59 sm::make_gauge("connections_current", [&server
] { return server
.current_connections(); }, sm::description("The current number of open connections"), labels
),
60 sm::make_derive("read_errors", [&server
] { return server
.read_errors(); }, sm::description("The total number of errors while reading http requests"), labels
),
61 sm::make_derive("reply_errors", [&server
] { return server
.reply_errors(); }, sm::description("The total number of errors while replying to http"), labels
),
62 sm::make_derive("requests_served", [&server
] { return server
.requests_served(); }, sm::description("The total number of http requests served"), labels
)
66 sstring
http_server_control::generate_server_name() {
67 static thread_local
uint16_t idgen
;
68 return seastar::format("http-{}", idgen
++);
// Takes the next reply from the _replies queue, stores it in _resp, sends it
// through start_response() and then calls itself again for the next reply.
// NOTE(review): some lines of this definition are not visible here (the
// condition guarding the early "ready future" return, and the closing
// braces). Presumably an empty reply pointer ends the loop - confirm against
// the complete file.
71 future
<> connection::do_response_loop() {
72 return _replies
.pop_eventually().then(
73 [this] (std::unique_ptr
<reply
> resp
) {
76 return make_ready_future
<>();
// Keep the reply in the member so start_response() can work on it.
78 _resp
= std::move(resp
);
79 return start_response().then([this] {
80 return do_response_loop();
// Writes the reply currently held in _resp to the connection.
//
// Two paths are visible:
//  * _resp->_body_writer is set: the reply writes itself through
//    write_reply_to_connection(), then the "0\r\n\r\n" terminator is written
//    and the output is flushed. Each stage is followed by a then_wrapped()
//    continuation whose error section aborts the reply queue and pushes an
//    empty reply.
//  * otherwise: a Content-Length header is set from _resp->_content.size(),
//    then the response line, the headers and a "\r\n" separator are written
//    and the output is flushed.
// NOTE(review): several lines are not visible here (for example the
// conditions that select the error sections, and whatever is written between
// the "\r\n" separator and the final flush). All three error sections reuse
// the message "Unknown exception during body creation", including the ones
// that handle terminator-write and flush failures - confirm that is intended.
85 future
<> connection::start_response() {
86 if (_resp
->_body_writer
) {
87 return _resp
->write_reply_to_connection(*this).then_wrapped([this] (auto f
) {
89 // In case of an error during the write close the connection
90 _server
._respond_errors
++;
92 _replies
.abort(std::make_exception_ptr(std::logic_error("Unknown exception during body creation")));
93 _replies
.push(std::unique_ptr
<reply
>());
94 f
.ignore_ready_future();
95 return make_ready_future
<>();
97 return _write_buf
.write("0\r\n\r\n", 5);
98 }).then_wrapped([this ] (auto f
) {
100 // We could not write the closing sequence
101 // Something is probably wrong with the connection,
102 // we should close it, so the client will disconnect
104 _replies
.abort(std::make_exception_ptr(std::logic_error("Unknown exception during body creation")));
105 _replies
.push(std::unique_ptr
<reply
>());
106 f
.ignore_ready_future();
107 return make_ready_future
<>();
109 return _write_buf
.flush();
111 }).then_wrapped([this] (auto f
) {
113 // flush failed. just close the connection
115 _replies
.abort(std::make_exception_ptr(std::logic_error("Unknown exception during body creation")));
116 _replies
.push(std::unique_ptr
<reply
>());
117 f
.ignore_ready_future();
120 return make_ready_future
<>();
// Path without a body writer: the length of the stored content is announced.
124 _resp
->_headers
["Content-Length"] = to_sstring(
125 _resp
->_content
.size());
126 return _write_buf
.write(_resp
->_response_line
.data(),
127 _resp
->_response_line
.size()).then([this] {
128 return _resp
->write_reply_headers(*this);
130 return _write_buf
.write("\r\n", 2);
134 return _write_buf
.flush();
140 connection::~connection() {
141 --_server
._current_connections
;
142 _server
._connections
.erase(_server
._connections
.iterator_to(*this));
// Decodes the URL-encoded text `in`; `out` is the output parameter.
// A scratch string `buff` of the same length as `in` is filled through a
// write index `pos`. The visible branches handle a hex escape (decoded with
// hexstr_to_char() from the two characters after position i, only when at
// least three characters remain) and the '+' character.
// NOTE(review): the declaration of `pos`, the test that opens the escape
// branch, the bodies of the remaining branches, the assignment to `out` and
// the return statements are not visible here - confirm against the complete
// file before relying on this description.
145 bool connection::url_decode(const std::string_view
& in
, sstring
& out
) {
147 sstring
buff(in
.length(), 0);
148 for (size_t i
= 0; i
< in
.length(); ++i
) {
// Bounds check: the escape needs the current character plus two hex digits.
150 if (i
+ 3 <= in
.size()) {
151 buff
[pos
++] = hexstr_to_char(in
, i
+ 1);
156 } else if (in
[i
] == '+') {
167 void connection::on_new_connection() {
168 ++_server
._total_connections
;
169 ++_server
._current_connections
;
170 _fd
.set_nodelay(true);
171 _server
._connections
.push_back(*this);
// Read side of the connection: loops with do_until() until _done is set,
// then, in the continuation, the visible code counts a read error on the
// server, pushes a default-constructed entry onto _replies and closes
// _read_buf.
// NOTE(review): the loop body, the condition around the error counter and
// the closing lines are not visible here. Presumably the loop body calls
// read_one() and the pushed empty entry tells the response loop to stop -
// confirm against the complete file.
174 future
<> connection::read() {
175 return do_until([this] {return _done
;}, [this] {
177 }).then_wrapped([this] (future
<> f
) {
180 _server
._read_errors
++;
182 f
.ignore_ready_future();
183 return _replies
.push_eventually( {});
185 return _read_buf
.close();
189 static input_stream
<char> make_content_stream(httpd::request
* req
, input_stream
<char>& buf
) {
190 // Create an input stream based on the requests body encoding or lack thereof
191 if (request::case_insensitive_cmp()(req
->get_header("Transfer-Encoding"), "chunked")) {
192 return input_stream
<char>(data_source(std::make_unique
<internal::chunked_source_impl
>(buf
, req
->chunk_extensions
, req
->trailing_headers
)));
194 return input_stream
<char>(data_source(std::make_unique
<internal::content_length_source_impl
>(buf
, req
->content_length
)));
// Attaches `content_stream` to the request and returns the request through a
// future. Two return paths are visible: one returns the request immediately,
// the other first reads the whole stream with
// util::read_entire_stream_contiguous() and stores the result in
// req->content.
// NOTE(review): the condition choosing between the two paths is not visible
// here; presumably it tests `streaming` (the only use the parameter could
// have) - confirm against the complete file.
198 static future
<std::unique_ptr
<httpd::request
>>
199 set_request_content(std::unique_ptr
<httpd::request
> req
, input_stream
<char>* content_stream
, bool streaming
) {
200 req
->content_stream
= content_stream
;
203 return make_ready_future
<std::unique_ptr
<httpd::request
>>(std::move(req
));
205 // Read the entire content into the request content string
206 return util::read_entire_stream_contiguous(*content_stream
).then([req
= std::move(req
)] (sstring content
) mutable {
207 req
->content
= std::move(content
);
208 return make_ready_future
<std::unique_ptr
<httpd::request
>>(std::move(req
));
// Builds a reply carrying `status` and `msg`, using the HTTP version of the
// given request, and pushes it onto the _replies queue.
// NOTE(review): lines between set_status() and the push are not visible
// here; given the function name they presumably finalize the reply and mark
// the connection as done - confirm against the complete file.
213 void connection::generate_error_reply_and_close(std::unique_ptr
<httpd::request
> req
, reply::status_type status
, const sstring
& msg
) {
214 auto resp
= std::make_unique
<reply
>();
215 // TODO: Handle HTTP/2.0 when it releases
216 resp
->set_version(req
->_version
);
217 resp
->set_status(status
, msg
);
220 _replies
.push(std::move(resp
));
// Reads and dispatches a single request from _read_buf.
//
// Visible steps, in order:
//  1. Feed _read_buf to _parser and take the parsed request.
//  2. Mark the request as "https" when the server has TLS credentials.
//  3. Parser failure -> 400 reply (version defaults to "1.1" if none parsed).
//  4. Content-Length is parsed with strtol() and compared with the server's
//     limit -> 413 reply when exceeded.
//  5. A Transfer-Encoding other than "chunked" -> 501 reply.
//  6. For HTTP/1.1 requests with "Expect: 100-continue", a 100 reply is
//     queued once _replies has room.
//  7. The content stream is created, the request content is set up, the
//     reply is generated, and the content stream is read once more
//     afterwards (see the comment about issue #907 below).
//  8. A base_exception raised along the way is turned into an error reply
//     that carries the exception's status and text.
// NOTE(review): a number of lines are not visible here (early-return
// conditions, handling of `done` and of the final read result, closing
// braces). Also note that strtol() is called with a null end pointer, so a
// malformed Content-Length value is not detected at this point; how a
// negative value compares against the size_t limit depends on the type of
// content_length, which is declared elsewhere.
223 future
<> connection::read_one() {
225 return _read_buf
.consume(_parser
).then([this] () mutable {
228 return make_ready_future
<>();
230 ++_server
._requests_served
;
231 std::unique_ptr
<httpd::request
> req
= _parser
.get_parsed_request();
232 if (_server
._credentials
) {
233 req
->protocol_name
= "https";
235 if (_parser
.failed()) {
236 if (req
->_version
.empty()) {
237 // we might have failed to parse even the version
238 req
->_version
= "1.1";
240 generate_error_reply_and_close(std::move(req
), reply::status_type::bad_request
, "Can't parse the request");
241 return make_ready_future
<>();
244 size_t content_length_limit
= _server
.get_content_length_limit();
245 sstring length_header
= req
->get_header("Content-Length");
246 req
->content_length
= strtol(length_header
.c_str(), nullptr, 10);
248 if (req
->content_length
> content_length_limit
) {
249 auto msg
= format("Content length limit ({}) exceeded: {}", content_length_limit
, req
->content_length
);
250 generate_error_reply_and_close(std::move(req
), reply::status_type::payload_too_large
, std::move(msg
));
251 return make_ready_future
<>();
254 sstring encoding
= req
->get_header("Transfer-Encoding");
255 if (encoding
.size() && !request::case_insensitive_cmp()(encoding
, "chunked")){
256 //TODO: add "identity", "gzip"("x-gzip"), "compress"("x-compress"), and "deflate" encodings and their combinations
257 generate_error_reply_and_close(std::move(req
), reply::status_type::not_implemented
, format("Encodings other than \"chunked\" are not implemented (received encoding: \"{}\")", encoding
));
258 return make_ready_future
<>();
// The request is moved into this helper; it is handed back through the
// returned future in both branches.
261 auto maybe_reply_continue
= [this, req
= std::move(req
)] () mutable {
262 if (req
->_version
== "1.1" && request::case_insensitive_cmp()(req
->get_header("Expect"), "100-continue")){
263 return _replies
.not_full().then([req
= std::move(req
), this] () mutable {
264 auto continue_reply
= std::make_unique
<reply
>();
265 set_headers(*continue_reply
);
266 continue_reply
->set_version(req
->_version
);
267 continue_reply
->set_status(reply::status_type::continue_
).done();
268 this->_replies
.push(std::move(continue_reply
));
269 return make_ready_future
<std::unique_ptr
<httpd::request
>>(std::move(req
));
272 return make_ready_future
<std::unique_ptr
<httpd::request
>>(std::move(req
));
// do_with() keeps the content stream, a copy of the version string and the
// request alive until the future returned by the inner lambda resolves.
276 return maybe_reply_continue().then([this] (std::unique_ptr
<httpd::request
> req
) {
277 return do_with(make_content_stream(req
.get(), _read_buf
), sstring(req
->_version
), std::move(req
), [this] (input_stream
<char>& content_stream
, sstring
& version
, std::unique_ptr
<httpd::request
>& req
) {
278 return set_request_content(std::move(req
), &content_stream
, _server
.get_content_streaming()).then([this, &content_stream
] (std::unique_ptr
<httpd::request
> req
) {
279 return _replies
.not_full().then([this, req
= std::move(req
)] () mutable {
280 return generate_reply(std::move(req
));
281 }).then([this, &content_stream
](bool done
) {
283 // If the handler did not read the entire request
284 // content, this connection cannot be reused so we
285 // need to close it (via "_done = true"). But we can't
286 // just check content_stream.eof(): It may only become
287 // true after read(). Issue #907.
288 return content_stream
.read().then([this] (temporary_buffer
<char> buf
) {
294 }).handle_exception_type([this, &version
] (const base_exception
& e
) mutable {
295 // If the request had a "Transfer-Encoding: chunked" header and content streaming wasn't enabled, we might have failed
296 // before passing the request to handler - when we were parsing chunks
297 auto err_req
= std::make_unique
<httpd::request
>();
298 err_req
->_version
= version
;
299 generate_error_reply_and_close(std::move(err_req
), e
.status(), e
.str());
// Runs the read side (read()) and the write side (respond()) of the
// connection concurrently and waits for both with when_all(). The result of
// each is then collected with get(); the two debug log lines report an
// exception from the read side and from the response side respectively.
// NOTE(review): the try/catch lines surrounding the get() calls are not
// visible here - confirm against the complete file.
306 future
<> connection::process() {
307 // Launch read and write "threads" simultaneously:
308 return when_all(read(), respond()).then(
309 [] (std::tuple
<future
<>, future
<>> joined
) {
311 std::get
<0>(joined
).get();
313 hlogger
.debug("Read exception encountered: {}", std::current_exception());
316 std::get
<1>(joined
).get();
318 hlogger
.debug("Response exception encountered: {}", std::current_exception());
320 return make_ready_future
<>();
323 void connection::shutdown() {
324 _fd
.shutdown_input();
325 _fd
.shutdown_output();
// Writes the headers of _resp one at a time, starting at iterator `hi`:
// header name, ": ", header value, "\r\n", and then recurses with the next
// iterator. Resolves immediately when `hi` is already the end of
// _resp->_headers.
// NOTE(review): the heads of the intermediate continuations and the closing
// lines are not visible here - confirm against the complete file.
328 future
<> connection::write_reply_headers(
329 std::unordered_map
<sstring
, sstring
>::iterator hi
) {
330 if (hi
== _resp
->_headers
.end()) {
331 return make_ready_future
<>();
333 return _write_buf
.write(hi
->first
.data(), hi
->first
.size()).then(
335 return _write_buf
.write(": ", 2);
337 return _write_buf
.write(hi
->second
.data(), hi
->second
.size());
339 return _write_buf
.write("\r\n", 2);
340 }).then([hi
, this] () mutable {
341 return write_reply_headers(++hi
);
// Converts one hexadecimal digit character to a number. The visible code
// distinguishes lower-case letters, upper-case letters and everything else.
// NOTE(review): the return statements are not visible here. The letter
// ranges tested are 'a'..'z' and 'A'..'Z', which are wider than the hex
// digits 'a'..'f' / 'A'..'F' - letters beyond 'f'/'F' are therefore not
// rejected by these conditions; confirm whether that is intended.
345 short connection::hex_to_byte(char c
) {
346 if (c
>='a' && c
<= 'z') {
348 } else if (c
>='A' && c
<= 'Z') {
355 * Convert a hex encoded 2 bytes substring to char
357 char connection::hexstr_to_char(const std::string_view
& in
, size_t from
) {
359 return static_cast<char>(hex_to_byte(in
[from
]) * 16 + hex_to_byte(in
[from
+ 1]));
// Parses one "key=value" query parameter and stores it, URL-decoded, in
// req.query_parameters.
// When there is no '=' or nothing follows it, the key is stored with an
// empty value; otherwise both parts are decoded and stored. Nothing is
// stored when url_decode() reports failure.
// NOTE(review): the declarations of `key` and `value` and the early return
// after the first branch are not visible here. For an empty `param`,
// `param.length() - 1` wraps around as an unsigned value; find() then returns
// npos, so the first branch is still taken.
362 void connection::add_param(request
& req
, const std::string_view
& param
) {
363 size_t split
= param
.find('=');
365 if (split
>= param
.length() - 1) {
367 if (url_decode(param
.substr(0,split
) , key
)) {
368 req
.query_parameters
[key
] = "";
373 if (url_decode(param
.substr(0,split
), key
)
374 && url_decode(param
.substr(split
+ 1), value
)) {
375 req
.query_parameters
[key
] = value
;
// Splits the query string off req._url: every '&'-separated parameter after
// the first '?' is passed to add_param(), and the part of the URL before the
// '?' is returned.
// NOTE(review): the declaration of `end_param` and the return taken when the
// URL contains no '?' are not visible here - confirm against the complete
// file.
381 sstring
connection::set_query_param(request
& req
) {
382 size_t pos
= req
._url
.find('?');
383 if (pos
== sstring::npos
) {
386 size_t curr
= pos
+ 1;
388 std::string_view url
= req
._url
;
389 while ((end_param
= req
._url
.find('&', curr
)) != sstring::npos
) {
390 add_param(req
, url
.substr(curr
, end_param
- curr
) );
391 curr
= end_param
+ 1;
// The last parameter has no trailing '&'.
393 add_param(req
, url
.substr(curr
));
394 return req
._url
.substr(0, pos
);
// Accessor returning a reference to an output stream of characters.
// NOTE(review): the body is not visible here; presumably it returns
// _write_buf - confirm against the complete file.
397 output_stream
<char>& connection::out() {
// Write side of the connection: runs do_response_loop() and, once it ends,
// the visible code counts a respond error on the server, discards the
// loop's result and closes _write_buf.
// NOTE(review): the condition around the error counter is not visible here;
// presumably it is only incremented when the loop failed - confirm against
// the complete file.
401 future
<> connection::respond() {
402 return do_response_loop().then_wrapped([this] (future
<> f
) {
405 _server
._respond_errors
++;
407 f
.ignore_ready_future();
408 return _write_buf
.close();
412 future
<> connection::write_body() {
413 return _write_buf
.write(_resp
->_content
.data(),
414 _resp
->_content
.size());
417 void connection::set_headers(reply
& resp
) {
418 resp
._headers
["Server"] = "Seastar httpd";
419 resp
._headers
["Date"] = _server
._date
;
// Produces the reply for `req` by passing it to the server's routes, queues
// the reply on _replies and resolves to whether the connection should be
// closed afterwards.
//
// The close decision is derived from the request's "Connection" header and
// HTTP version: for "1.0" the connection is closed unless keep-alive was
// requested (and the reply then echoes "Connection: Keep-Alive"); for "1.1"
// it is closed only when close was requested.
// NOTE(review): some lines are not visible here (the declaration of
// `should_close`, the assignment in the "Close" branch and the branch for
// other versions). The header value is compared against the exact strings
// "Keep-Alive" and "Close"; whether that comparison ignores case depends on
// the string type's operator== - other spellings such as "keep-alive" may
// not match; confirm.
422 future
<bool> connection::generate_reply(std::unique_ptr
<request
> req
) {
423 auto resp
= std::make_unique
<reply
>();
424 bool conn_keep_alive
= false;
425 bool conn_close
= false;
426 auto it
= req
->_headers
.find("Connection");
427 if (it
!= req
->_headers
.end()) {
428 if (it
->second
== "Keep-Alive") {
429 conn_keep_alive
= true;
430 } else if (it
->second
== "Close") {
435 // TODO: Handle HTTP/2.0 when it releases
436 resp
->set_version(req
->_version
);
438 if (req
->_version
== "1.0") {
439 if (conn_keep_alive
) {
440 resp
->_headers
["Connection"] = "Keep-Alive";
442 should_close
= !conn_keep_alive
;
443 } else if (req
->_version
== "1.1") {
444 should_close
= conn_close
;
446 // HTTP/0.9 goes here
// The URL without its query string and the version are copied out before
// the request is moved into the route handler.
449 sstring url
= set_query_param(*req
.get());
450 sstring version
= req
->_version
;
452 return _server
._routes
.handle(url
, std::move(req
), std::move(resp
)).
453 // Caller guarantees enough room
454 then([this, should_close
, version
= std::move(version
)](std::unique_ptr
<reply
> rep
) {
455 rep
->set_version(version
).done();
456 this->_replies
.push(std::move(rep
));
457 return make_ready_future
<bool>(should_close
);
461 void http_server::set_tls_credentials(shared_ptr
<seastar::tls::server_credentials
> credentials
) {
462 _credentials
= credentials
;
465 size_t http_server::get_content_length_limit() const {
466 return _content_length_limit
;
469 void http_server::set_content_length_limit(size_t limit
) {
470 _content_length_limit
= limit
;
473 bool http_server::get_content_streaming() const {
474 return _content_streaming
;
477 void http_server::set_content_streaming(bool b
) {
478 _content_streaming
= b
;
// Opens a listening socket on `addr` with options `lo`, appends it to
// _listeners and starts accepting on it through do_accepts(), passing the
// index of the newly added listener.
// Two ways of creating the listener are visible: a TLS listener built with
// _credentials and a plain listener.
// NOTE(review): the condition choosing between the two is not visible here;
// presumably it tests whether _credentials is set - confirm against the
// complete file.
481 future
<> http_server::listen(socket_address addr
, listen_options lo
) {
483 _listeners
.push_back(seastar::tls::listen(_credentials
, addr
, lo
));
485 _listeners
.push_back(seastar::listen(addr
, lo
));
487 return do_accepts(_listeners
.size() - 1);
// Convenience overload: listens on `addr` with the reuse_address option
// turned on, delegating to the two-argument listen().
// NOTE(review): the declaration of `lo` is not visible here; presumably it
// is a default-constructed listen_options - confirm against the complete
// file.
489 future
<> http_server::listen(socket_address addr
) {
491 lo
.reuse_address
= true;
492 return listen(addr
, lo
);
// Stops the server: closes _task_gate (keeping the resulting future in
// `tasks_done`) and then iterates over all listeners and all connections.
// NOTE(review): the loop bodies and the return statement are not visible
// here; presumably listeners are aborted, connections are shut down and
// `tasks_done` is returned - confirm against the complete file.
494 future
<> http_server::stop() {
495 future
<> tasks_done
= _task_gate
.close();
496 for (auto&& l
: _listeners
) {
499 for (auto&& c
: _connections
) {
505 // FIXME: This could return void
506 future
<> http_server::do_accepts(int which
) {
507 (void)try_with_gate(_task_gate
, [this, which
] {
508 return keep_doing([this, which
] {
509 return try_with_gate(_task_gate
, [this, which
] {
510 return do_accept_one(which
);
512 }).handle_exception_type([](const gate_closed_exception
& e
) {});
513 }).handle_exception_type([](const gate_closed_exception
& e
) {});
514 return make_ready_future
<>();
517 future
<> http_server::do_accept_one(int which
) {
518 return _listeners
[which
].accept().then([this] (accept_result ar
) mutable {
519 auto conn
= std::make_unique
<connection
>(*this, std::move(ar
.connection
), std::move(ar
.remote_address
));
520 (void)try_with_gate(_task_gate
, [conn
= std::move(conn
)]() mutable {
521 return conn
->process().handle_exception([conn
= std::move(conn
)] (std::exception_ptr ex
) {
522 hlogger
.error("request error: {}", ex
);
524 }).handle_exception_type([] (const gate_closed_exception
& e
) {});
525 }).handle_exception_type([] (const std::system_error
&e
) {
526 // We expect a ECONNABORTED when http_server::stop is called,
527 // no point in warning about that.
528 if (e
.code().value() != ECONNABORTED
) {
529 hlogger
.error("accept failed: {}", e
);
531 }).handle_exception([] (std::exception_ptr ex
) {
532 hlogger
.error("accept failed: {}", ex
);
536 uint64_t http_server::total_connections() const {
537 return _total_connections
;
539 uint64_t http_server::current_connections() const {
540 return _current_connections
;
542 uint64_t http_server::requests_served() const {
543 return _requests_served
;
// Accessor for the number of read errors.
// NOTE(review): the body is not visible here; presumably it returns
// _read_errors - confirm against the complete file.
545 uint64_t http_server::read_errors() const {
548 uint64_t http_server::reply_errors() const {
549 return _respond_errors
;
// Returns the current time formatted as
// "<day>, <dd> <month> <yyyy> <hh>:<mm>:<ss> GMT", with English day and
// month abbreviations taken from the two fixed tables below.
// NOTE(review): the declaration and filling of `tm` from `t` are not visible
// here; presumably the time is broken down as UTC, as the "GMT" suffix
// requires - confirm against the complete file.
552 // Write the current date in the specific "preferred format" defined in
553 // RFC 7231, Section 7.1.1.1, a.k.a. IMF (Internet Message Format) fixdate.
554 // For example: Sun, 06 Nov 1994 08:49:37 GMT
555 sstring
http_server::http_date() {
556 auto t
= ::time(nullptr);
559 // Using strftime() would have been easier, but unfortunately relies on
560 // the current locale, and we need the month and day names in English.
561 static const char* days
[] = {
562 "Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"
564 static const char* months
[] = {
565 "Jan", "Feb", "Mar", "Apr", "May", "Jun",
566 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"
// tm_year counts years since 1900, hence the offset.
568 return seastar::format("{}, {:02d} {} {} {:02d}:{:02d}:{:02d} GMT",
569 days
[tm
.tm_wday
], tm
.tm_mday
, months
[tm
.tm_mon
], 1900 + tm
.tm_year
,
570 tm
.tm_hour
, tm
.tm_min
, tm
.tm_sec
);
574 future
<> http_server_control::start(const sstring
& name
) {
575 return _server_dist
->start(name
);
578 future
<> http_server_control::stop() {
579 return _server_dist
->stop();
// Runs a lambda holding a copy of `fun` against every http_server instance
// of the distributed server object.
// NOTE(review): the lambda body is not visible here; presumably it calls
// `fun` with the server's routes - confirm against the complete file.
582 future
<> http_server_control::set_routes(std::function
<void(routes
& r
)> fun
) {
583 return _server_dist
->invoke_on_all([fun
](http_server
& server
) {
588 future
<> http_server_control::listen(socket_address addr
) {
589 return _server_dist
->invoke_on_all
<future
<> (http_server::*)(socket_address
)>(&http_server::listen
, addr
);
592 future
<> http_server_control::listen(socket_address addr
, listen_options lo
) {
593 return _server_dist
->invoke_on_all
<future
<> (http_server::*)(socket_address
, listen_options
)>(&http_server::listen
, addr
, lo
);
596 distributed
<http_server
>& http_server_control::server() {
597 return *_server_dist
;