]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/src/http/httpd.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / seastar / src / http / httpd.cc
CommitLineData
11fdf7f2
TL
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright 2015 Cloudius Systems
20 */
21
11fdf7f2
TL
22#include <seastar/core/sstring.hh>
23#include <seastar/core/app-template.hh>
24#include <seastar/core/circular_buffer.hh>
25#include <seastar/core/distributed.hh>
26#include <seastar/core/queue.hh>
f67539c2 27#include <seastar/core/when_all.hh>
11fdf7f2 28#include <seastar/core/metrics.hh>
9f95a23c 29#include <seastar/core/print.hh>
11fdf7f2
TL
30#include <iostream>
31#include <algorithm>
32#include <unordered_map>
33#include <queue>
34#include <bitset>
35#include <limits>
36#include <cctype>
37#include <vector>
38#include <seastar/http/httpd.hh>
20effc67 39#include <seastar/http/internal/content_source.hh>
11fdf7f2 40#include <seastar/http/reply.hh>
20effc67 41#include <seastar/util/short_streams.hh>
f67539c2 42#include <seastar/util/log.hh>
11fdf7f2
TL
43
44using namespace std::chrono_literals;
45
46namespace seastar {
47
f67539c2
TL
48logger hlogger("httpd");
49
11fdf7f2
TL
50namespace httpd {
51http_stats::http_stats(http_server& server, const sstring& name)
52 {
53 namespace sm = seastar::metrics;
54 std::vector<sm::label_instance> labels;
55
56 labels.push_back(sm::label_instance("service", name));
57 _metric_groups.add_group("httpd", {
58 sm::make_derive("connections_total", [&server] { return server.total_connections(); }, sm::description("The total number of connections opened"), labels),
59 sm::make_gauge("connections_current", [&server] { return server.current_connections(); }, sm::description("The current number of open connections"), labels),
60 sm::make_derive("read_errors", [&server] { return server.read_errors(); }, sm::description("The total number of errors while reading http requests"), labels),
61 sm::make_derive("reply_errors", [&server] { return server.reply_errors(); }, sm::description("The total number of errors while replying to http"), labels),
62 sm::make_derive("requests_served", [&server] { return server.requests_served(); }, sm::description("The total number of http requests served"), labels)
63 });
64}
65
66sstring http_server_control::generate_server_name() {
67 static thread_local uint16_t idgen;
68 return seastar::format("http-{}", idgen++);
69}
70
71future<> connection::do_response_loop() {
72 return _replies.pop_eventually().then(
73 [this] (std::unique_ptr<reply> resp) {
74 if (!resp) {
75 // eof
76 return make_ready_future<>();
77 }
78 _resp = std::move(resp);
79 return start_response().then([this] {
80 return do_response_loop();
81 });
82 });
83}
84
85future<> connection::start_response() {
86 if (_resp->_body_writer) {
87 return _resp->write_reply_to_connection(*this).then_wrapped([this] (auto f) {
88 if (f.failed()) {
89 // In case of an error during the write close the connection
90 _server._respond_errors++;
91 _done = true;
92 _replies.abort(std::make_exception_ptr(std::logic_error("Unknown exception during body creation")));
93 _replies.push(std::unique_ptr<reply>());
94 f.ignore_ready_future();
95 return make_ready_future<>();
96 }
97 return _write_buf.write("0\r\n\r\n", 5);
98 }).then_wrapped([this ] (auto f) {
99 if (f.failed()) {
100 // We could not write the closing sequence
101 // Something is probably wrong with the connection,
102 // we should close it, so the client will disconnect
103 _done = true;
104 _replies.abort(std::make_exception_ptr(std::logic_error("Unknown exception during body creation")));
105 _replies.push(std::unique_ptr<reply>());
106 f.ignore_ready_future();
107 return make_ready_future<>();
108 } else {
109 return _write_buf.flush();
110 }
111 }).then_wrapped([this] (auto f) {
112 if (f.failed()) {
113 // flush failed. just close the connection
114 _done = true;
115 _replies.abort(std::make_exception_ptr(std::logic_error("Unknown exception during body creation")));
116 _replies.push(std::unique_ptr<reply>());
117 f.ignore_ready_future();
118 }
119 _resp.reset();
120 return make_ready_future<>();
121 });
122 }
123 set_headers(*_resp);
124 _resp->_headers["Content-Length"] = to_sstring(
125 _resp->_content.size());
f67539c2 126 return _write_buf.write(_resp->_response_line.data(),
11fdf7f2
TL
127 _resp->_response_line.size()).then([this] {
128 return _resp->write_reply_headers(*this);
129 }).then([this] {
130 return _write_buf.write("\r\n", 2);
131 }).then([this] {
132 return write_body();
133 }).then([this] {
134 return _write_buf.flush();
135 }).then([this] {
136 _resp.reset();
137 });
138}
139
140connection::~connection() {
141 --_server._current_connections;
142 _server._connections.erase(_server._connections.iterator_to(*this));
11fdf7f2
TL
143}
144
f67539c2 145bool connection::url_decode(const std::string_view& in, sstring& out) {
9f95a23c
TL
146 size_t pos = 0;
147 sstring buff(in.length(), 0);
148 for (size_t i = 0; i < in.length(); ++i) {
149 if (in[i] == '%') {
150 if (i + 3 <= in.size()) {
151 buff[pos++] = hexstr_to_char(in, i + 1);
152 i += 2;
153 } else {
154 return false;
155 }
156 } else if (in[i] == '+') {
157 buff[pos++] = ' ';
158 } else {
159 buff[pos++] = in[i];
160 }
161 }
162 buff.resize(pos);
163 out = buff;
164 return true;
165}
166
11fdf7f2
TL
167void connection::on_new_connection() {
168 ++_server._total_connections;
169 ++_server._current_connections;
20effc67 170 _fd.set_nodelay(true);
11fdf7f2
TL
171 _server._connections.push_back(*this);
172}
173
174future<> connection::read() {
175 return do_until([this] {return _done;}, [this] {
176 return read_one();
177 }).then_wrapped([this] (future<> f) {
178 // swallow error
179 if (f.failed()) {
180 _server._read_errors++;
181 }
182 f.ignore_ready_future();
183 return _replies.push_eventually( {});
184 }).finally([this] {
185 return _read_buf.close();
186 });
187}
9f95a23c 188
20effc67
TL
189static input_stream<char> make_content_stream(httpd::request* req, input_stream<char>& buf) {
190 // Create an input stream based on the requests body encoding or lack thereof
191 if (request::case_insensitive_cmp()(req->get_header("Transfer-Encoding"), "chunked")) {
192 return input_stream<char>(data_source(std::make_unique<internal::chunked_source_impl>(buf, req->chunk_extensions, req->trailing_headers)));
193 } else {
194 return input_stream<char>(data_source(std::make_unique<internal::content_length_source_impl>(buf, req->content_length)));
195 }
196}
197
9f95a23c 198static future<std::unique_ptr<httpd::request>>
20effc67
TL
199set_request_content(std::unique_ptr<httpd::request> req, input_stream<char>* content_stream, bool streaming) {
200 req->content_stream = content_stream;
201
202 if (streaming) {
9f95a23c 203 return make_ready_future<std::unique_ptr<httpd::request>>(std::move(req));
20effc67
TL
204 } else {
205 // Read the entire content into the request content string
206 return util::read_entire_stream_contiguous(*content_stream).then([req = std::move(req)] (sstring content) mutable {
207 req->content = std::move(content);
208 return make_ready_future<std::unique_ptr<httpd::request>>(std::move(req));
209 });
9f95a23c 210 }
9f95a23c
TL
211}
212
213void connection::generate_error_reply_and_close(std::unique_ptr<httpd::request> req, reply::status_type status, const sstring& msg) {
214 auto resp = std::make_unique<reply>();
215 // TODO: Handle HTTP/2.0 when it releases
216 resp->set_version(req->_version);
217 resp->set_status(status, msg);
218 resp->done();
219 _done = true;
220 _replies.push(std::move(resp));
221}
222
11fdf7f2
TL
223future<> connection::read_one() {
224 _parser.init();
225 return _read_buf.consume(_parser).then([this] () mutable {
226 if (_parser.eof()) {
227 _done = true;
228 return make_ready_future<>();
229 }
230 ++_server._requests_served;
231 std::unique_ptr<httpd::request> req = _parser.get_parsed_request();
9f95a23c
TL
232 if (_server._credentials) {
233 req->protocol_name = "https";
234 }
f67539c2
TL
235 if (_parser.failed()) {
236 if (req->_version.empty()) {
237 // we might have failed to parse even the version
238 req->_version = "1.1";
239 }
240 generate_error_reply_and_close(std::move(req), reply::status_type::bad_request, "Can't parse the request");
241 return make_ready_future<>();
242 }
9f95a23c
TL
243
244 size_t content_length_limit = _server.get_content_length_limit();
245 sstring length_header = req->get_header("Content-Length");
246 req->content_length = strtol(length_header.c_str(), nullptr, 10);
11fdf7f2 247
9f95a23c 248 if (req->content_length > content_length_limit) {
f67539c2
TL
249 auto msg = format("Content length limit ({}) exceeded: {}", content_length_limit, req->content_length);
250 generate_error_reply_and_close(std::move(req), reply::status_type::payload_too_large, std::move(msg));
9f95a23c
TL
251 return make_ready_future<>();
252 }
f67539c2 253
20effc67
TL
254 sstring encoding = req->get_header("Transfer-Encoding");
255 if (encoding.size() && !request::case_insensitive_cmp()(encoding, "chunked")){
256 //TODO: add "identity", "gzip"("x-gzip"), "compress"("x-compress"), and "deflate" encodings and their combinations
257 generate_error_reply_and_close(std::move(req), reply::status_type::not_implemented, format("Encodings other than \"chunked\" are not implemented (received encoding: \"{}\")", encoding));
258 return make_ready_future<>();
259 }
260
f67539c2
TL
261 auto maybe_reply_continue = [this, req = std::move(req)] () mutable {
262 if (req->_version == "1.1" && request::case_insensitive_cmp()(req->get_header("Expect"), "100-continue")){
263 return _replies.not_full().then([req = std::move(req), this] () mutable {
264 auto continue_reply = std::make_unique<reply>();
265 set_headers(*continue_reply);
266 continue_reply->set_version(req->_version);
267 continue_reply->set_status(reply::status_type::continue_).done();
268 this->_replies.push(std::move(continue_reply));
269 return make_ready_future<std::unique_ptr<httpd::request>>(std::move(req));
270 });
271 } else {
272 return make_ready_future<std::unique_ptr<httpd::request>>(std::move(req));
273 }
274 };
275
276 return maybe_reply_continue().then([this] (std::unique_ptr<httpd::request> req) {
20effc67
TL
277 return do_with(make_content_stream(req.get(), _read_buf), sstring(req->_version), std::move(req), [this] (input_stream<char>& content_stream, sstring& version, std::unique_ptr<httpd::request>& req) {
278 return set_request_content(std::move(req), &content_stream, _server.get_content_streaming()).then([this, &content_stream] (std::unique_ptr<httpd::request> req) {
279 return _replies.not_full().then([this, req = std::move(req)] () mutable {
280 return generate_reply(std::move(req));
281 }).then([this, &content_stream](bool done) {
282 _done = done;
283 // If the handler did not read the entire request
284 // content, this connection cannot be reused so we
285 // need to close it (via "_done = true"). But we can't
286 // just check content_stream.eof(): It may only become
287 // true after read(). Issue #907.
288 return content_stream.read().then([this] (temporary_buffer<char> buf) {
289 if (!buf.empty()) {
290 _done = true;
291 }
292 });
293 });
294 }).handle_exception_type([this, &version] (const base_exception& e) mutable {
295 // If the request had a "Transfer-Encoding: chunked" header and content streaming wasn't enabled, we might have failed
296 // before passing the request to handler - when we were parsing chunks
297 auto err_req = std::make_unique<httpd::request>();
298 err_req->_version = version;
299 generate_error_reply_and_close(std::move(err_req), e.status(), e.str());
f67539c2 300 });
9f95a23c 301 });
11fdf7f2
TL
302 });
303 });
304}
305
f67539c2
TL
306future<> connection::process() {
307 // Launch read and write "threads" simultaneously:
308 return when_all(read(), respond()).then(
309 [] (std::tuple<future<>, future<>> joined) {
310 try {
311 std::get<0>(joined).get();
312 } catch (...) {
313 hlogger.debug("Read exception encountered: {}", std::current_exception());
314 }
315 try {
316 std::get<1>(joined).get();
317 } catch (...) {
318 hlogger.debug("Response exception encountered: {}", std::current_exception());
319 }
320 return make_ready_future<>();
321 });
322}
323void connection::shutdown() {
324 _fd.shutdown_input();
325 _fd.shutdown_output();
326}
327
328future<> connection::write_reply_headers(
329 std::unordered_map<sstring, sstring>::iterator hi) {
330 if (hi == _resp->_headers.end()) {
331 return make_ready_future<>();
332 }
333 return _write_buf.write(hi->first.data(), hi->first.size()).then(
334 [this] {
335 return _write_buf.write(": ", 2);
336 }).then([hi, this] {
337 return _write_buf.write(hi->second.data(), hi->second.size());
338 }).then([this] {
339 return _write_buf.write("\r\n", 2);
340 }).then([hi, this] () mutable {
341 return write_reply_headers(++hi);
342 });
343}
344
345short connection::hex_to_byte(char c) {
346 if (c >='a' && c <= 'z') {
347 return c - 'a' + 10;
348 } else if (c >='A' && c <= 'Z') {
349 return c - 'A' + 10;
350 }
351 return c - '0';
352}
353
354/**
355 * Convert a hex encoded 2 bytes substring to char
356 */
357char connection::hexstr_to_char(const std::string_view& in, size_t from) {
358
359 return static_cast<char>(hex_to_byte(in[from]) * 16 + hex_to_byte(in[from + 1]));
360}
361
362void connection::add_param(request& req, const std::string_view& param) {
363 size_t split = param.find('=');
364
365 if (split >= param.length() - 1) {
366 sstring key;
367 if (url_decode(param.substr(0,split) , key)) {
368 req.query_parameters[key] = "";
369 }
370 } else {
371 sstring key;
372 sstring value;
373 if (url_decode(param.substr(0,split), key)
374 && url_decode(param.substr(split + 1), value)) {
375 req.query_parameters[key] = value;
376 }
377 }
378
379}
380
381sstring connection::set_query_param(request& req) {
382 size_t pos = req._url.find('?');
383 if (pos == sstring::npos) {
384 return req._url;
385 }
386 size_t curr = pos + 1;
387 size_t end_param;
388 std::string_view url = req._url;
389 while ((end_param = req._url.find('&', curr)) != sstring::npos) {
390 add_param(req, url.substr(curr, end_param - curr) );
391 curr = end_param + 1;
392 }
393 add_param(req, url.substr(curr));
394 return req._url.substr(0, pos);
395}
396
397output_stream<char>& connection::out() {
398 return _write_buf;
399}
400
11fdf7f2
TL
401future<> connection::respond() {
402 return do_response_loop().then_wrapped([this] (future<> f) {
403 // swallow error
404 if (f.failed()) {
405 _server._respond_errors++;
406 }
407 f.ignore_ready_future();
408 return _write_buf.close();
409 });
410}
411
412future<> connection::write_body() {
f67539c2 413 return _write_buf.write(_resp->_content.data(),
11fdf7f2
TL
414 _resp->_content.size());
415}
416
417void connection::set_headers(reply& resp) {
418 resp._headers["Server"] = "Seastar httpd";
419 resp._headers["Date"] = _server._date;
420}
421
422future<bool> connection::generate_reply(std::unique_ptr<request> req) {
423 auto resp = std::make_unique<reply>();
424 bool conn_keep_alive = false;
425 bool conn_close = false;
426 auto it = req->_headers.find("Connection");
427 if (it != req->_headers.end()) {
428 if (it->second == "Keep-Alive") {
429 conn_keep_alive = true;
430 } else if (it->second == "Close") {
431 conn_close = true;
432 }
433 }
434 bool should_close;
435 // TODO: Handle HTTP/2.0 when it releases
436 resp->set_version(req->_version);
437
438 if (req->_version == "1.0") {
439 if (conn_keep_alive) {
440 resp->_headers["Connection"] = "Keep-Alive";
441 }
442 should_close = !conn_keep_alive;
443 } else if (req->_version == "1.1") {
444 should_close = conn_close;
445 } else {
446 // HTTP/0.9 goes here
447 should_close = true;
448 }
449 sstring url = set_query_param(*req.get());
450 sstring version = req->_version;
451 set_headers(*resp);
11fdf7f2
TL
452 return _server._routes.handle(url, std::move(req), std::move(resp)).
453 // Caller guarantees enough room
454 then([this, should_close, version = std::move(version)](std::unique_ptr<reply> rep) {
455 rep->set_version(version).done();
456 this->_replies.push(std::move(rep));
457 return make_ready_future<bool>(should_close);
458 });
459}
f67539c2
TL
460
461void http_server::set_tls_credentials(shared_ptr<seastar::tls::server_credentials> credentials) {
462 _credentials = credentials;
463}
464
465size_t http_server::get_content_length_limit() const {
466 return _content_length_limit;
467}
468
469void http_server::set_content_length_limit(size_t limit) {
470 _content_length_limit = limit;
471}
472
20effc67
TL
473bool http_server::get_content_streaming() const {
474 return _content_streaming;
475}
476
477void http_server::set_content_streaming(bool b) {
478 _content_streaming = b;
479}
480
f67539c2
TL
481future<> http_server::listen(socket_address addr, listen_options lo) {
482 if (_credentials) {
483 _listeners.push_back(seastar::tls::listen(_credentials, addr, lo));
484 } else {
485 _listeners.push_back(seastar::listen(addr, lo));
486 }
487 return do_accepts(_listeners.size() - 1);
488}
489future<> http_server::listen(socket_address addr) {
490 listen_options lo;
491 lo.reuse_address = true;
492 return listen(addr, lo);
493}
494future<> http_server::stop() {
495 future<> tasks_done = _task_gate.close();
496 for (auto&& l : _listeners) {
497 l.abort_accept();
498 }
499 for (auto&& c : _connections) {
500 c.shutdown();
501 }
502 return tasks_done;
503}
504
505// FIXME: This could return void
506future<> http_server::do_accepts(int which) {
507 (void)try_with_gate(_task_gate, [this, which] {
508 return keep_doing([this, which] {
509 return try_with_gate(_task_gate, [this, which] {
510 return do_accept_one(which);
511 });
512 }).handle_exception_type([](const gate_closed_exception& e) {});
513 }).handle_exception_type([](const gate_closed_exception& e) {});
514 return make_ready_future<>();
515}
516
517future<> http_server::do_accept_one(int which) {
518 return _listeners[which].accept().then([this] (accept_result ar) mutable {
519 auto conn = std::make_unique<connection>(*this, std::move(ar.connection), std::move(ar.remote_address));
520 (void)try_with_gate(_task_gate, [conn = std::move(conn)]() mutable {
521 return conn->process().handle_exception([conn = std::move(conn)] (std::exception_ptr ex) {
522 hlogger.error("request error: {}", ex);
523 });
524 }).handle_exception_type([] (const gate_closed_exception& e) {});
525 }).handle_exception_type([] (const std::system_error &e) {
526 // We expect a ECONNABORTED when http_server::stop is called,
527 // no point in warning about that.
528 if (e.code().value() != ECONNABORTED) {
529 hlogger.error("accept failed: {}", e);
530 }
531 }).handle_exception([] (std::exception_ptr ex) {
532 hlogger.error("accept failed: {}", ex);
533 });
534}
535
536uint64_t http_server::total_connections() const {
537 return _total_connections;
538}
539uint64_t http_server::current_connections() const {
540 return _current_connections;
541}
542uint64_t http_server::requests_served() const {
543 return _requests_served;
544}
545uint64_t http_server::read_errors() const {
546 return _read_errors;
547}
548uint64_t http_server::reply_errors() const {
549 return _respond_errors;
550}
551
552// Write the current date in the specific "preferred format" defined in
553// RFC 7231, Section 7.1.1.1, a.k.a. IMF (Internet Message Format) fixdate.
554// For example: Sun, 06 Nov 1994 08:49:37 GMT
555sstring http_server::http_date() {
556 auto t = ::time(nullptr);
557 struct tm tm;
558 gmtime_r(&t, &tm);
559 // Using strftime() would have been easier, but unfortunately relies on
560 // the current locale, and we need the month and day names in English.
561 static const char* days[] = {
562 "Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"
563 };
564 static const char* months[] = {
565 "Jan", "Feb", "Mar", "Apr", "May", "Jun",
566 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"
567 };
568 return seastar::format("{}, {:02d} {} {} {:02d}:{:02d}:{:02d} GMT",
569 days[tm.tm_wday], tm.tm_mday, months[tm.tm_mon], 1900 + tm.tm_year,
570 tm.tm_hour, tm.tm_min, tm.tm_sec);
571}
572
573
574future<> http_server_control::start(const sstring& name) {
575 return _server_dist->start(name);
576}
577
578future<> http_server_control::stop() {
579 return _server_dist->stop();
580}
581
582future<> http_server_control::set_routes(std::function<void(routes& r)> fun) {
583 return _server_dist->invoke_on_all([fun](http_server& server) {
584 fun(server._routes);
585 });
586}
587
588future<> http_server_control::listen(socket_address addr) {
589 return _server_dist->invoke_on_all<future<> (http_server::*)(socket_address)>(&http_server::listen, addr);
590}
591
592future<> http_server_control::listen(socket_address addr, listen_options lo) {
593 return _server_dist->invoke_on_all<future<> (http_server::*)(socket_address, listen_options)>(&http_server::listen, addr, lo);
594}
595
596distributed<http_server>& http_server_control::server() {
597 return *_server_dist;
598}
599
11fdf7f2
TL
600}
601
602}