labels.push_back(sm::label_instance("service", name));
_metric_groups.add_group("httpd", {
- sm::make_derive("connections_total", [&server] { return server.total_connections(); }, sm::description("The total number of connections opened"), labels),
+ sm::make_counter("connections_total", [&server] { return server.total_connections(); }, sm::description("The total number of connections opened"), labels),
sm::make_gauge("connections_current", [&server] { return server.current_connections(); }, sm::description("The current number of open connections"), labels),
- sm::make_derive("read_errors", [&server] { return server.read_errors(); }, sm::description("The total number of errors while reading http requests"), labels),
- sm::make_derive("reply_errors", [&server] { return server.reply_errors(); }, sm::description("The total number of errors while replying to http"), labels),
- sm::make_derive("requests_served", [&server] { return server.requests_served(); }, sm::description("The total number of http requests served"), labels)
+ sm::make_counter("read_errors", [&server] { return server.read_errors(); }, sm::description("The total number of errors while reading http requests"), labels),
+ sm::make_counter("reply_errors", [&server] { return server.reply_errors(); }, sm::description("The total number of errors while replying to http"), labels),
+ sm::make_counter("requests_served", [&server] { return server.requests_served(); }, sm::description("The total number of http requests served"), labels)
});
}
future<> connection::do_response_loop() {
return _replies.pop_eventually().then(
- [this] (std::unique_ptr<reply> resp) {
+ [this] (std::unique_ptr<http::reply> resp) {
if (!resp) {
// eof
return make_ready_future<>();
_server._respond_errors++;
_done = true;
_replies.abort(std::make_exception_ptr(std::logic_error("Unknown exception during body creation")));
- _replies.push(std::unique_ptr<reply>());
+ _replies.push(std::unique_ptr<http::reply>());
f.ignore_ready_future();
return make_ready_future<>();
}
// we should close it, so the client will disconnect
_done = true;
_replies.abort(std::make_exception_ptr(std::logic_error("Unknown exception during body creation")));
- _replies.push(std::unique_ptr<reply>());
+ _replies.push(std::unique_ptr<http::reply>());
f.ignore_ready_future();
return make_ready_future<>();
} else {
// flush failed. just close the connection
_done = true;
_replies.abort(std::make_exception_ptr(std::logic_error("Unknown exception during body creation")));
- _replies.push(std::unique_ptr<reply>());
+ _replies.push(std::unique_ptr<http::reply>());
f.ignore_ready_future();
}
_resp.reset();
_server._connections.erase(_server._connections.iterator_to(*this));
}
-bool connection::url_decode(const std::string_view& in, sstring& out) {
- size_t pos = 0;
- sstring buff(in.length(), 0);
- for (size_t i = 0; i < in.length(); ++i) {
- if (in[i] == '%') {
- if (i + 3 <= in.size()) {
- buff[pos++] = hexstr_to_char(in, i + 1);
- i += 2;
- } else {
- return false;
- }
- } else if (in[i] == '+') {
- buff[pos++] = ' ';
- } else {
- buff[pos++] = in[i];
- }
- }
- buff.resize(pos);
- out = buff;
- return true;
-}
-
void connection::on_new_connection() {
++_server._total_connections;
++_server._current_connections;
}
f.ignore_ready_future();
return _replies.push_eventually( {});
- }).finally([this] {
- return _read_buf.close();
});
}
-static input_stream<char> make_content_stream(httpd::request* req, input_stream<char>& buf) {
+static input_stream<char> make_content_stream(http::request* req, input_stream<char>& buf) {
// Create an input stream based on the requests body encoding or lack thereof
- if (request::case_insensitive_cmp()(req->get_header("Transfer-Encoding"), "chunked")) {
+ if (http::request::case_insensitive_cmp()(req->get_header("Transfer-Encoding"), "chunked")) {
return input_stream<char>(data_source(std::make_unique<internal::chunked_source_impl>(buf, req->chunk_extensions, req->trailing_headers)));
} else {
return input_stream<char>(data_source(std::make_unique<internal::content_length_source_impl>(buf, req->content_length)));
}
}
-static future<std::unique_ptr<httpd::request>>
-set_request_content(std::unique_ptr<httpd::request> req, input_stream<char>* content_stream, bool streaming) {
+static future<std::unique_ptr<http::request>>
+set_request_content(std::unique_ptr<http::request> req, input_stream<char>* content_stream, bool streaming) {
req->content_stream = content_stream;
if (streaming) {
- return make_ready_future<std::unique_ptr<httpd::request>>(std::move(req));
+ return make_ready_future<std::unique_ptr<http::request>>(std::move(req));
} else {
// Read the entire content into the request content string
return util::read_entire_stream_contiguous(*content_stream).then([req = std::move(req)] (sstring content) mutable {
req->content = std::move(content);
- return make_ready_future<std::unique_ptr<httpd::request>>(std::move(req));
+ return make_ready_future<std::unique_ptr<http::request>>(std::move(req));
});
}
}
-void connection::generate_error_reply_and_close(std::unique_ptr<httpd::request> req, reply::status_type status, const sstring& msg) {
- auto resp = std::make_unique<reply>();
+void connection::generate_error_reply_and_close(std::unique_ptr<http::request> req, http::reply::status_type status, const sstring& msg) {
+ auto resp = std::make_unique<http::reply>();
// TODO: Handle HTTP/2.0 when it releases
resp->set_version(req->_version);
resp->set_status(status, msg);
return make_ready_future<>();
}
++_server._requests_served;
- std::unique_ptr<httpd::request> req = _parser.get_parsed_request();
+ std::unique_ptr<http::request> req = _parser.get_parsed_request();
if (_server._credentials) {
req->protocol_name = "https";
}
// we might have failed to parse even the version
req->_version = "1.1";
}
- generate_error_reply_and_close(std::move(req), reply::status_type::bad_request, "Can't parse the request");
+ generate_error_reply_and_close(std::move(req), http::reply::status_type::bad_request, "Can't parse the request");
return make_ready_future<>();
}
if (req->content_length > content_length_limit) {
auto msg = format("Content length limit ({}) exceeded: {}", content_length_limit, req->content_length);
- generate_error_reply_and_close(std::move(req), reply::status_type::payload_too_large, std::move(msg));
+ generate_error_reply_and_close(std::move(req), http::reply::status_type::payload_too_large, std::move(msg));
return make_ready_future<>();
}
sstring encoding = req->get_header("Transfer-Encoding");
- if (encoding.size() && !request::case_insensitive_cmp()(encoding, "chunked")){
+ if (encoding.size() && !http::request::case_insensitive_cmp()(encoding, "chunked")){
//TODO: add "identity", "gzip"("x-gzip"), "compress"("x-compress"), and "deflate" encodings and their combinations
- generate_error_reply_and_close(std::move(req), reply::status_type::not_implemented, format("Encodings other than \"chunked\" are not implemented (received encoding: \"{}\")", encoding));
+ generate_error_reply_and_close(std::move(req), http::reply::status_type::not_implemented, format("Encodings other than \"chunked\" are not implemented (received encoding: \"{}\")", encoding));
return make_ready_future<>();
}
auto maybe_reply_continue = [this, req = std::move(req)] () mutable {
- if (req->_version == "1.1" && request::case_insensitive_cmp()(req->get_header("Expect"), "100-continue")){
+ if (req->_version == "1.1" && http::request::case_insensitive_cmp()(req->get_header("Expect"), "100-continue")){
return _replies.not_full().then([req = std::move(req), this] () mutable {
- auto continue_reply = std::make_unique<reply>();
+ auto continue_reply = std::make_unique<http::reply>();
set_headers(*continue_reply);
continue_reply->set_version(req->_version);
- continue_reply->set_status(reply::status_type::continue_).done();
+ continue_reply->set_status(http::reply::status_type::continue_).done();
this->_replies.push(std::move(continue_reply));
- return make_ready_future<std::unique_ptr<httpd::request>>(std::move(req));
+ return make_ready_future<std::unique_ptr<http::request>>(std::move(req));
});
} else {
- return make_ready_future<std::unique_ptr<httpd::request>>(std::move(req));
+ return make_ready_future<std::unique_ptr<http::request>>(std::move(req));
}
};
- return maybe_reply_continue().then([this] (std::unique_ptr<httpd::request> req) {
- return do_with(make_content_stream(req.get(), _read_buf), sstring(req->_version), std::move(req), [this] (input_stream<char>& content_stream, sstring& version, std::unique_ptr<httpd::request>& req) {
- return set_request_content(std::move(req), &content_stream, _server.get_content_streaming()).then([this, &content_stream] (std::unique_ptr<httpd::request> req) {
+ return maybe_reply_continue().then([this] (std::unique_ptr<http::request> req) {
+ return do_with(make_content_stream(req.get(), _read_buf), sstring(req->_version), std::move(req), [this] (input_stream<char>& content_stream, sstring& version, std::unique_ptr<http::request>& req) {
+ return set_request_content(std::move(req), &content_stream, _server.get_content_streaming()).then([this, &content_stream] (std::unique_ptr<http::request> req) {
return _replies.not_full().then([this, req = std::move(req)] () mutable {
return generate_reply(std::move(req));
}).then([this, &content_stream](bool done) {
}).handle_exception_type([this, &version] (const base_exception& e) mutable {
// If the request had a "Transfer-Encoding: chunked" header and content streaming wasn't enabled, we might have failed
// before passing the request to handler - when we were parsing chunks
- auto err_req = std::make_unique<httpd::request>();
+ auto err_req = std::make_unique<http::request>();
err_req->_version = version;
generate_error_reply_and_close(std::move(err_req), e.status(), e.str());
});
hlogger.debug("Response exception encountered: {}", std::current_exception());
}
return make_ready_future<>();
+ }).finally([this]{
+ return _read_buf.close().handle_exception([](std::exception_ptr e) {
+ hlogger.debug("Close exception encountered: {}", e);
+ });
});
}
void connection::shutdown() {
_fd.shutdown_output();
}
-future<> connection::write_reply_headers(
- std::unordered_map<sstring, sstring>::iterator hi) {
- if (hi == _resp->_headers.end()) {
- return make_ready_future<>();
- }
- return _write_buf.write(hi->first.data(), hi->first.size()).then(
- [this] {
- return _write_buf.write(": ", 2);
- }).then([hi, this] {
- return _write_buf.write(hi->second.data(), hi->second.size());
- }).then([this] {
- return _write_buf.write("\r\n", 2);
- }).then([hi, this] () mutable {
- return write_reply_headers(++hi);
- });
-}
-
-short connection::hex_to_byte(char c) {
- if (c >='a' && c <= 'z') {
- return c - 'a' + 10;
- } else if (c >='A' && c <= 'Z') {
- return c - 'A' + 10;
- }
- return c - '0';
-}
-
-/**
- * Convert a hex encoded 2 bytes substring to char
- */
-char connection::hexstr_to_char(const std::string_view& in, size_t from) {
-
- return static_cast<char>(hex_to_byte(in[from]) * 16 + hex_to_byte(in[from + 1]));
-}
-
-void connection::add_param(request& req, const std::string_view& param) {
- size_t split = param.find('=');
-
- if (split >= param.length() - 1) {
- sstring key;
- if (url_decode(param.substr(0,split) , key)) {
- req.query_parameters[key] = "";
- }
- } else {
- sstring key;
- sstring value;
- if (url_decode(param.substr(0,split), key)
- && url_decode(param.substr(split + 1), value)) {
- req.query_parameters[key] = value;
- }
- }
-
-}
-
-sstring connection::set_query_param(request& req) {
- size_t pos = req._url.find('?');
- if (pos == sstring::npos) {
- return req._url;
- }
- size_t curr = pos + 1;
- size_t end_param;
- std::string_view url = req._url;
- while ((end_param = req._url.find('&', curr)) != sstring::npos) {
- add_param(req, url.substr(curr, end_param - curr) );
- curr = end_param + 1;
- }
- add_param(req, url.substr(curr));
- return req._url.substr(0, pos);
-}
-
output_stream<char>& connection::out() {
return _write_buf;
}
_resp->_content.size());
}
-void connection::set_headers(reply& resp) {
+void connection::set_headers(http::reply& resp) {
resp._headers["Server"] = "Seastar httpd";
resp._headers["Date"] = _server._date;
}
-future<bool> connection::generate_reply(std::unique_ptr<request> req) {
- auto resp = std::make_unique<reply>();
- bool conn_keep_alive = false;
- bool conn_close = false;
- auto it = req->_headers.find("Connection");
- if (it != req->_headers.end()) {
- if (it->second == "Keep-Alive") {
- conn_keep_alive = true;
- } else if (it->second == "Close") {
- conn_close = true;
- }
- }
- bool should_close;
- // TODO: Handle HTTP/2.0 when it releases
+future<bool> connection::generate_reply(std::unique_ptr<http::request> req) {
+ auto resp = std::make_unique<http::reply>();
resp->set_version(req->_version);
-
- if (req->_version == "1.0") {
- if (conn_keep_alive) {
- resp->_headers["Connection"] = "Keep-Alive";
- }
- should_close = !conn_keep_alive;
- } else if (req->_version == "1.1") {
- should_close = conn_close;
- } else {
- // HTTP/0.9 goes here
- should_close = true;
+ set_headers(*resp);
+ bool keep_alive = req->should_keep_alive();
+ if (keep_alive && req->_version == "1.0") {
+ resp->_headers["Connection"] = "Keep-Alive";
}
- sstring url = set_query_param(*req.get());
+
+ sstring url = req->parse_query_param();
sstring version = req->_version;
- set_headers(*resp);
return _server._routes.handle(url, std::move(req), std::move(resp)).
// Caller guarantees enough room
- then([this, should_close, version = std::move(version)](std::unique_ptr<reply> rep) {
+ then([this, keep_alive , version = std::move(version)](std::unique_ptr<http::reply> rep) {
rep->set_version(version).done();
this->_replies.push(std::move(rep));
- return make_ready_future<bool>(should_close);
+ return make_ready_future<bool>(!keep_alive);
});
}