]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/src/http/httpd.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / seastar / src / http / httpd.cc
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright 2015 Cloudius Systems
20 */
21
22 #include <seastar/core/sstring.hh>
23 #include <seastar/core/app-template.hh>
24 #include <seastar/core/circular_buffer.hh>
25 #include <seastar/core/distributed.hh>
26 #include <seastar/core/queue.hh>
27 #include <seastar/core/when_all.hh>
28 #include <seastar/core/metrics.hh>
29 #include <seastar/core/print.hh>
30 #include <iostream>
31 #include <algorithm>
32 #include <unordered_map>
33 #include <queue>
34 #include <bitset>
35 #include <limits>
36 #include <cctype>
37 #include <vector>
38 #include <seastar/http/httpd.hh>
39 #include <seastar/http/internal/content_source.hh>
40 #include <seastar/http/reply.hh>
41 #include <seastar/util/short_streams.hh>
42 #include <seastar/util/log.hh>
43
44 using namespace std::chrono_literals;
45
46 namespace seastar {
47
48 logger hlogger("httpd");
49
50 namespace httpd {
51 http_stats::http_stats(http_server& server, const sstring& name)
52 {
53 namespace sm = seastar::metrics;
54 std::vector<sm::label_instance> labels;
55
56 labels.push_back(sm::label_instance("service", name));
57 _metric_groups.add_group("httpd", {
58 sm::make_derive("connections_total", [&server] { return server.total_connections(); }, sm::description("The total number of connections opened"), labels),
59 sm::make_gauge("connections_current", [&server] { return server.current_connections(); }, sm::description("The current number of open connections"), labels),
60 sm::make_derive("read_errors", [&server] { return server.read_errors(); }, sm::description("The total number of errors while reading http requests"), labels),
61 sm::make_derive("reply_errors", [&server] { return server.reply_errors(); }, sm::description("The total number of errors while replying to http"), labels),
62 sm::make_derive("requests_served", [&server] { return server.requests_served(); }, sm::description("The total number of http requests served"), labels)
63 });
64 }
65
66 sstring http_server_control::generate_server_name() {
67 static thread_local uint16_t idgen;
68 return seastar::format("http-{}", idgen++);
69 }
70
71 future<> connection::do_response_loop() {
72 return _replies.pop_eventually().then(
73 [this] (std::unique_ptr<reply> resp) {
74 if (!resp) {
75 // eof
76 return make_ready_future<>();
77 }
78 _resp = std::move(resp);
79 return start_response().then([this] {
80 return do_response_loop();
81 });
82 });
83 }
84
85 future<> connection::start_response() {
86 if (_resp->_body_writer) {
87 return _resp->write_reply_to_connection(*this).then_wrapped([this] (auto f) {
88 if (f.failed()) {
89 // In case of an error during the write close the connection
90 _server._respond_errors++;
91 _done = true;
92 _replies.abort(std::make_exception_ptr(std::logic_error("Unknown exception during body creation")));
93 _replies.push(std::unique_ptr<reply>());
94 f.ignore_ready_future();
95 return make_ready_future<>();
96 }
97 return _write_buf.write("0\r\n\r\n", 5);
98 }).then_wrapped([this ] (auto f) {
99 if (f.failed()) {
100 // We could not write the closing sequence
101 // Something is probably wrong with the connection,
102 // we should close it, so the client will disconnect
103 _done = true;
104 _replies.abort(std::make_exception_ptr(std::logic_error("Unknown exception during body creation")));
105 _replies.push(std::unique_ptr<reply>());
106 f.ignore_ready_future();
107 return make_ready_future<>();
108 } else {
109 return _write_buf.flush();
110 }
111 }).then_wrapped([this] (auto f) {
112 if (f.failed()) {
113 // flush failed. just close the connection
114 _done = true;
115 _replies.abort(std::make_exception_ptr(std::logic_error("Unknown exception during body creation")));
116 _replies.push(std::unique_ptr<reply>());
117 f.ignore_ready_future();
118 }
119 _resp.reset();
120 return make_ready_future<>();
121 });
122 }
123 set_headers(*_resp);
124 _resp->_headers["Content-Length"] = to_sstring(
125 _resp->_content.size());
126 return _write_buf.write(_resp->_response_line.data(),
127 _resp->_response_line.size()).then([this] {
128 return _resp->write_reply_headers(*this);
129 }).then([this] {
130 return _write_buf.write("\r\n", 2);
131 }).then([this] {
132 return write_body();
133 }).then([this] {
134 return _write_buf.flush();
135 }).then([this] {
136 _resp.reset();
137 });
138 }
139
140 connection::~connection() {
141 --_server._current_connections;
142 _server._connections.erase(_server._connections.iterator_to(*this));
143 }
144
145 bool connection::url_decode(const std::string_view& in, sstring& out) {
146 size_t pos = 0;
147 sstring buff(in.length(), 0);
148 for (size_t i = 0; i < in.length(); ++i) {
149 if (in[i] == '%') {
150 if (i + 3 <= in.size()) {
151 buff[pos++] = hexstr_to_char(in, i + 1);
152 i += 2;
153 } else {
154 return false;
155 }
156 } else if (in[i] == '+') {
157 buff[pos++] = ' ';
158 } else {
159 buff[pos++] = in[i];
160 }
161 }
162 buff.resize(pos);
163 out = buff;
164 return true;
165 }
166
167 void connection::on_new_connection() {
168 ++_server._total_connections;
169 ++_server._current_connections;
170 _fd.set_nodelay(true);
171 _server._connections.push_back(*this);
172 }
173
174 future<> connection::read() {
175 return do_until([this] {return _done;}, [this] {
176 return read_one();
177 }).then_wrapped([this] (future<> f) {
178 // swallow error
179 if (f.failed()) {
180 _server._read_errors++;
181 }
182 f.ignore_ready_future();
183 return _replies.push_eventually( {});
184 }).finally([this] {
185 return _read_buf.close();
186 });
187 }
188
189 static input_stream<char> make_content_stream(httpd::request* req, input_stream<char>& buf) {
190 // Create an input stream based on the requests body encoding or lack thereof
191 if (request::case_insensitive_cmp()(req->get_header("Transfer-Encoding"), "chunked")) {
192 return input_stream<char>(data_source(std::make_unique<internal::chunked_source_impl>(buf, req->chunk_extensions, req->trailing_headers)));
193 } else {
194 return input_stream<char>(data_source(std::make_unique<internal::content_length_source_impl>(buf, req->content_length)));
195 }
196 }
197
198 static future<std::unique_ptr<httpd::request>>
199 set_request_content(std::unique_ptr<httpd::request> req, input_stream<char>* content_stream, bool streaming) {
200 req->content_stream = content_stream;
201
202 if (streaming) {
203 return make_ready_future<std::unique_ptr<httpd::request>>(std::move(req));
204 } else {
205 // Read the entire content into the request content string
206 return util::read_entire_stream_contiguous(*content_stream).then([req = std::move(req)] (sstring content) mutable {
207 req->content = std::move(content);
208 return make_ready_future<std::unique_ptr<httpd::request>>(std::move(req));
209 });
210 }
211 }
212
213 void connection::generate_error_reply_and_close(std::unique_ptr<httpd::request> req, reply::status_type status, const sstring& msg) {
214 auto resp = std::make_unique<reply>();
215 // TODO: Handle HTTP/2.0 when it releases
216 resp->set_version(req->_version);
217 resp->set_status(status, msg);
218 resp->done();
219 _done = true;
220 _replies.push(std::move(resp));
221 }
222
/**
 * Reads and dispatches a single request.
 *
 * Steps, in order:
 *  - parse the request from _read_buf; on eof just set _done
 *  - on a parse failure queue a bad_request reply and close
 *  - reject a Content-Length above the server limit (payload_too_large)
 *  - reject any Transfer-Encoding other than "chunked" (not_implemented)
 *  - for HTTP/1.1 requests with "Expect: 100-continue", queue a continue reply
 *  - build the content stream, attach it to the request and call
 *    generate_reply(); its result becomes _done
 *  - if unread content is left in the stream afterwards, set _done
 *
 * A base_exception raised while the content is being set up or handled is
 * turned into an error reply carrying the exception's status and message.
 */
future<> connection::read_one() {
    _parser.init();
    return _read_buf.consume(_parser).then([this] () mutable {
        if (_parser.eof()) {
            _done = true;
            return make_ready_future<>();
        }
        // Counted before the parse result is checked, so unparsable requests
        // are included as well
        ++_server._requests_served;
        std::unique_ptr<httpd::request> req = _parser.get_parsed_request();
        if (_server._credentials) {
            req->protocol_name = "https";
        }
        if (_parser.failed()) {
            if (req->_version.empty()) {
                // we might have failed to parse even the version
                req->_version = "1.1";
            }
            generate_error_reply_and_close(std::move(req), reply::status_type::bad_request, "Can't parse the request");
            return make_ready_future<>();
        }

        size_t content_length_limit = _server.get_content_length_limit();
        sstring length_header = req->get_header("Content-Length");
        // NOTE(review): strtol() yields 0 for a missing or non-numeric header
        // and accepts negative values; how a negative value behaves depends
        // on the type of content_length, which is not visible here - confirm.
        req->content_length = strtol(length_header.c_str(), nullptr, 10);

        if (req->content_length > content_length_limit) {
            auto msg = format("Content length limit ({}) exceeded: {}", content_length_limit, req->content_length);
            generate_error_reply_and_close(std::move(req), reply::status_type::payload_too_large, std::move(msg));
            return make_ready_future<>();
        }

        sstring encoding = req->get_header("Transfer-Encoding");
        if (encoding.size() && !request::case_insensitive_cmp()(encoding, "chunked")){
            //TODO: add "identity", "gzip"("x-gzip"), "compress"("x-compress"), and "deflate" encodings and their combinations
            generate_error_reply_and_close(std::move(req), reply::status_type::not_implemented, format("Encodings other than \"chunked\" are not implemented (received encoding: \"{}\")", encoding));
            return make_ready_future<>();
        }

        // Takes ownership of the request and hands it back through the
        // returned future, after optionally queueing a "100 Continue" reply.
        auto maybe_reply_continue = [this, req = std::move(req)] () mutable {
            if (req->_version == "1.1" && request::case_insensitive_cmp()(req->get_header("Expect"), "100-continue")){
                // Wait for room in the reply queue before pushing
                return _replies.not_full().then([req = std::move(req), this] () mutable {
                    auto continue_reply = std::make_unique<reply>();
                    set_headers(*continue_reply);
                    continue_reply->set_version(req->_version);
                    continue_reply->set_status(reply::status_type::continue_).done();
                    this->_replies.push(std::move(continue_reply));
                    return make_ready_future<std::unique_ptr<httpd::request>>(std::move(req));
                });
            } else {
                return make_ready_future<std::unique_ptr<httpd::request>>(std::move(req));
            }
        };

        return maybe_reply_continue().then([this] (std::unique_ptr<httpd::request> req) {
            // do_with() holds the content stream, a copy of the version
            // string and the request for the continuations below, which
            // refer to the stream and the version by reference.
            return do_with(make_content_stream(req.get(), _read_buf), sstring(req->_version), std::move(req), [this] (input_stream<char>& content_stream, sstring& version, std::unique_ptr<httpd::request>& req) {
                return set_request_content(std::move(req), &content_stream, _server.get_content_streaming()).then([this, &content_stream] (std::unique_ptr<httpd::request> req) {
                    // generate_reply() pushes without checking for room, so
                    // wait until the reply queue has space first
                    return _replies.not_full().then([this, req = std::move(req)] () mutable {
                        return generate_reply(std::move(req));
                    }).then([this, &content_stream](bool done) {
                        _done = done;
                        // If the handler did not read the entire request
                        // content, this connection cannot be reused so we
                        // need to close it (via "_done = true"). But we can't
                        // just check content_stream.eof(): It may only become
                        // true after read(). Issue #907.
                        return content_stream.read().then([this] (temporary_buffer<char> buf) {
                            if (!buf.empty()) {
                                _done = true;
                            }
                        });
                    });
                }).handle_exception_type([this, &version] (const base_exception& e) mutable {
                    // If the request had a "Transfer-Encoding: chunked" header and content streaming wasn't enabled, we might have failed
                    // before passing the request to handler - when we were parsing chunks
                    // The original request has been moved from by now, so the
                    // error reply is built from a fresh request that only
                    // carries the saved version.
                    auto err_req = std::make_unique<httpd::request>();
                    err_req->_version = version;
                    generate_error_reply_and_close(std::move(err_req), e.status(), e.str());
                });
            });
        });
    });
}
305
306 future<> connection::process() {
307 // Launch read and write "threads" simultaneously:
308 return when_all(read(), respond()).then(
309 [] (std::tuple<future<>, future<>> joined) {
310 try {
311 std::get<0>(joined).get();
312 } catch (...) {
313 hlogger.debug("Read exception encountered: {}", std::current_exception());
314 }
315 try {
316 std::get<1>(joined).get();
317 } catch (...) {
318 hlogger.debug("Response exception encountered: {}", std::current_exception());
319 }
320 return make_ready_future<>();
321 });
322 }
323 void connection::shutdown() {
324 _fd.shutdown_input();
325 _fd.shutdown_output();
326 }
327
328 future<> connection::write_reply_headers(
329 std::unordered_map<sstring, sstring>::iterator hi) {
330 if (hi == _resp->_headers.end()) {
331 return make_ready_future<>();
332 }
333 return _write_buf.write(hi->first.data(), hi->first.size()).then(
334 [this] {
335 return _write_buf.write(": ", 2);
336 }).then([hi, this] {
337 return _write_buf.write(hi->second.data(), hi->second.size());
338 }).then([this] {
339 return _write_buf.write("\r\n", 2);
340 }).then([hi, this] () mutable {
341 return write_reply_headers(++hi);
342 });
343 }
344
345 short connection::hex_to_byte(char c) {
346 if (c >='a' && c <= 'z') {
347 return c - 'a' + 10;
348 } else if (c >='A' && c <= 'Z') {
349 return c - 'A' + 10;
350 }
351 return c - '0';
352 }
353
354 /**
355 * Convert a hex encoded 2 bytes substring to char
356 */
357 char connection::hexstr_to_char(const std::string_view& in, size_t from) {
358
359 return static_cast<char>(hex_to_byte(in[from]) * 16 + hex_to_byte(in[from + 1]));
360 }
361
362 void connection::add_param(request& req, const std::string_view& param) {
363 size_t split = param.find('=');
364
365 if (split >= param.length() - 1) {
366 sstring key;
367 if (url_decode(param.substr(0,split) , key)) {
368 req.query_parameters[key] = "";
369 }
370 } else {
371 sstring key;
372 sstring value;
373 if (url_decode(param.substr(0,split), key)
374 && url_decode(param.substr(split + 1), value)) {
375 req.query_parameters[key] = value;
376 }
377 }
378
379 }
380
381 sstring connection::set_query_param(request& req) {
382 size_t pos = req._url.find('?');
383 if (pos == sstring::npos) {
384 return req._url;
385 }
386 size_t curr = pos + 1;
387 size_t end_param;
388 std::string_view url = req._url;
389 while ((end_param = req._url.find('&', curr)) != sstring::npos) {
390 add_param(req, url.substr(curr, end_param - curr) );
391 curr = end_param + 1;
392 }
393 add_param(req, url.substr(curr));
394 return req._url.substr(0, pos);
395 }
396
397 output_stream<char>& connection::out() {
398 return _write_buf;
399 }
400
401 future<> connection::respond() {
402 return do_response_loop().then_wrapped([this] (future<> f) {
403 // swallow error
404 if (f.failed()) {
405 _server._respond_errors++;
406 }
407 f.ignore_ready_future();
408 return _write_buf.close();
409 });
410 }
411
412 future<> connection::write_body() {
413 return _write_buf.write(_resp->_content.data(),
414 _resp->_content.size());
415 }
416
417 void connection::set_headers(reply& resp) {
418 resp._headers["Server"] = "Seastar httpd";
419 resp._headers["Date"] = _server._date;
420 }
421
422 future<bool> connection::generate_reply(std::unique_ptr<request> req) {
423 auto resp = std::make_unique<reply>();
424 bool conn_keep_alive = false;
425 bool conn_close = false;
426 auto it = req->_headers.find("Connection");
427 if (it != req->_headers.end()) {
428 if (it->second == "Keep-Alive") {
429 conn_keep_alive = true;
430 } else if (it->second == "Close") {
431 conn_close = true;
432 }
433 }
434 bool should_close;
435 // TODO: Handle HTTP/2.0 when it releases
436 resp->set_version(req->_version);
437
438 if (req->_version == "1.0") {
439 if (conn_keep_alive) {
440 resp->_headers["Connection"] = "Keep-Alive";
441 }
442 should_close = !conn_keep_alive;
443 } else if (req->_version == "1.1") {
444 should_close = conn_close;
445 } else {
446 // HTTP/0.9 goes here
447 should_close = true;
448 }
449 sstring url = set_query_param(*req.get());
450 sstring version = req->_version;
451 set_headers(*resp);
452 return _server._routes.handle(url, std::move(req), std::move(resp)).
453 // Caller guarantees enough room
454 then([this, should_close, version = std::move(version)](std::unique_ptr<reply> rep) {
455 rep->set_version(version).done();
456 this->_replies.push(std::move(rep));
457 return make_ready_future<bool>(should_close);
458 });
459 }
460
461 void http_server::set_tls_credentials(shared_ptr<seastar::tls::server_credentials> credentials) {
462 _credentials = credentials;
463 }
464
465 size_t http_server::get_content_length_limit() const {
466 return _content_length_limit;
467 }
468
469 void http_server::set_content_length_limit(size_t limit) {
470 _content_length_limit = limit;
471 }
472
473 bool http_server::get_content_streaming() const {
474 return _content_streaming;
475 }
476
477 void http_server::set_content_streaming(bool b) {
478 _content_streaming = b;
479 }
480
481 future<> http_server::listen(socket_address addr, listen_options lo) {
482 if (_credentials) {
483 _listeners.push_back(seastar::tls::listen(_credentials, addr, lo));
484 } else {
485 _listeners.push_back(seastar::listen(addr, lo));
486 }
487 return do_accepts(_listeners.size() - 1);
488 }
489 future<> http_server::listen(socket_address addr) {
490 listen_options lo;
491 lo.reuse_address = true;
492 return listen(addr, lo);
493 }
494 future<> http_server::stop() {
495 future<> tasks_done = _task_gate.close();
496 for (auto&& l : _listeners) {
497 l.abort_accept();
498 }
499 for (auto&& c : _connections) {
500 c.shutdown();
501 }
502 return tasks_done;
503 }
504
505 // FIXME: This could return void
506 future<> http_server::do_accepts(int which) {
507 (void)try_with_gate(_task_gate, [this, which] {
508 return keep_doing([this, which] {
509 return try_with_gate(_task_gate, [this, which] {
510 return do_accept_one(which);
511 });
512 }).handle_exception_type([](const gate_closed_exception& e) {});
513 }).handle_exception_type([](const gate_closed_exception& e) {});
514 return make_ready_future<>();
515 }
516
517 future<> http_server::do_accept_one(int which) {
518 return _listeners[which].accept().then([this] (accept_result ar) mutable {
519 auto conn = std::make_unique<connection>(*this, std::move(ar.connection), std::move(ar.remote_address));
520 (void)try_with_gate(_task_gate, [conn = std::move(conn)]() mutable {
521 return conn->process().handle_exception([conn = std::move(conn)] (std::exception_ptr ex) {
522 hlogger.error("request error: {}", ex);
523 });
524 }).handle_exception_type([] (const gate_closed_exception& e) {});
525 }).handle_exception_type([] (const std::system_error &e) {
526 // We expect a ECONNABORTED when http_server::stop is called,
527 // no point in warning about that.
528 if (e.code().value() != ECONNABORTED) {
529 hlogger.error("accept failed: {}", e);
530 }
531 }).handle_exception([] (std::exception_ptr ex) {
532 hlogger.error("accept failed: {}", ex);
533 });
534 }
535
536 uint64_t http_server::total_connections() const {
537 return _total_connections;
538 }
539 uint64_t http_server::current_connections() const {
540 return _current_connections;
541 }
542 uint64_t http_server::requests_served() const {
543 return _requests_served;
544 }
545 uint64_t http_server::read_errors() const {
546 return _read_errors;
547 }
548 uint64_t http_server::reply_errors() const {
549 return _respond_errors;
550 }
551
552 // Write the current date in the specific "preferred format" defined in
553 // RFC 7231, Section 7.1.1.1, a.k.a. IMF (Internet Message Format) fixdate.
554 // For example: Sun, 06 Nov 1994 08:49:37 GMT
555 sstring http_server::http_date() {
556 auto t = ::time(nullptr);
557 struct tm tm;
558 gmtime_r(&t, &tm);
559 // Using strftime() would have been easier, but unfortunately relies on
560 // the current locale, and we need the month and day names in English.
561 static const char* days[] = {
562 "Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"
563 };
564 static const char* months[] = {
565 "Jan", "Feb", "Mar", "Apr", "May", "Jun",
566 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"
567 };
568 return seastar::format("{}, {:02d} {} {} {:02d}:{:02d}:{:02d} GMT",
569 days[tm.tm_wday], tm.tm_mday, months[tm.tm_mon], 1900 + tm.tm_year,
570 tm.tm_hour, tm.tm_min, tm.tm_sec);
571 }
572
573
574 future<> http_server_control::start(const sstring& name) {
575 return _server_dist->start(name);
576 }
577
578 future<> http_server_control::stop() {
579 return _server_dist->stop();
580 }
581
582 future<> http_server_control::set_routes(std::function<void(routes& r)> fun) {
583 return _server_dist->invoke_on_all([fun](http_server& server) {
584 fun(server._routes);
585 });
586 }
587
588 future<> http_server_control::listen(socket_address addr) {
589 return _server_dist->invoke_on_all<future<> (http_server::*)(socket_address)>(&http_server::listen, addr);
590 }
591
592 future<> http_server_control::listen(socket_address addr, listen_options lo) {
593 return _server_dist->invoke_on_all<future<> (http_server::*)(socket_address, listen_options)>(&http_server::listen, addr, lo);
594 }
595
596 distributed<http_server>& http_server_control::server() {
597 return *_server_dist;
598 }
599
600 }
601
602 }