]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_asio_frontend.cc
update sources to 12.2.10
[ceph.git] / ceph / src / rgw / rgw_asio_frontend.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
b32b8144 4#include <atomic>
7c673cae
FG
5#include <condition_variable>
6#include <mutex>
7#include <thread>
8#include <vector>
9
10#include <boost/asio.hpp>
7c673cae 11
91327a77
AA
12#include "common/errno.h"
13
7c673cae 14#include "rgw_asio_client.h"
b32b8144 15#include "rgw_asio_frontend.h"
7c673cae
FG
16
17#define dout_subsys ceph_subsys_rgw
18
7c673cae
FG
19namespace {
20
// Coordinates a clean pause/resume cycle between the frontend and its
// worker threads. pause() blocks the caller until every worker has
// parked itself in wait(); unpause() releases the parked workers.
class Pauser {
  std::mutex mutex;
  std::condition_variable cond_ready;   // signaled once ready flips to true
  std::condition_variable cond_paused;  // signaled as workers arrive in wait()
  bool ready{false};  // workers may proceed when true
  int waiters{0};     // number of workers currently parked in wait()
 public:
  // Block until 'thread_count' workers are parked in wait(). 'func' runs
  // after the ready flag is cleared but before this thread starts waiting;
  // it is expected to kick the workers out of their event loop.
  template <typename Func>
  void pause(int thread_count, Func&& func) {
    std::unique_lock<std::mutex> lock(mutex);
    ready = false;
    lock.unlock();

    func();

    // park here until every worker has checked in
    lock.lock();
    cond_paused.wait(lock, [=] { return waiters == thread_count; });
  }

  // Release every worker currently parked in wait().
  void unpause() {
    std::lock_guard<std::mutex> lock(mutex);
    ready = true;
    cond_ready.notify_all();
  }

  // Called by each worker thread: report arrival to pause(), then sleep
  // until unpause() flips the ready flag.
  void wait() {
    std::unique_lock<std::mutex> lock(mutex);
    ++waiters;
    cond_paused.notify_one(); // let pause() re-evaluate its predicate
    cond_ready.wait(lock, [this] { return ready; });
    --waiters;
  }
};
63
64using tcp = boost::asio::ip::tcp;
b32b8144 65namespace beast = boost::beast;
7c673cae 66
b32b8144
FG
// Handles a single http connection: reads request headers, dispatches each
// request to rgw via process_request(), and supports http/1.1 keep-alive by
// discarding any unread body before starting the next message. Lifetime is
// managed by an intrusive refcount (see nref below).
class Connection {
  RGWProcessEnv& env;
  boost::asio::io_service::strand strand; // serializes this connection's handlers
  tcp::socket socket;

  // references are bound to callbacks for async operations. if a callback
  // function returns without issuing another operation, the reference is
  // dropped and the Connection is deleted/closed
  std::atomic<int> nref{0};
  using Ref = boost::intrusive_ptr<Connection>;

  // limit header to 4k, since we read it all into a single flat_buffer
  static constexpr size_t header_limit = 4096;
  // don't impose a limit on the body, since we read it in pieces
  static constexpr size_t body_limit = std::numeric_limits<size_t>::max();

  beast::flat_buffer buffer;
  boost::optional<rgw::asio::parser_type> parser;

  // response object used only for the 400 Bad Request error path
  using bad_response_type = beast::http::response<beast::http::empty_body>;
  boost::optional<bad_response_type> response;

  CephContext* ctx() const { return env.store->ctx(); }

  // start an async read of the next request's header; completion is
  // delivered to on_header()
  void read_header() {
    // configure a fresh parser for each message
    parser.emplace();
    parser->header_limit(header_limit);
    parser->body_limit(body_limit);

    // parse the header
    beast::http::async_read_header(socket, buffer, *parser, strand.wrap(
            std::bind(&Connection::on_header, Ref{this},
                      std::placeholders::_1)));
  }

  // consume and throw away whatever remains of the current request body,
  // then begin reading the next message
  void discard_unread_message() {
    if (parser->is_done()) {
      // nothing left to discard, start reading the next message
      read_header();
      return;
    }

    // read the rest of the request into a static buffer. multiple clients could
    // write at the same time, but this is okay because we never read it back
    static std::array<char, 1024> discard_buffer;

    auto& body = parser->get().body();
    body.size = discard_buffer.size();
    body.data = discard_buffer.data();

    beast::http::async_read_some(socket, buffer, *parser, strand.wrap(
            std::bind(&Connection::on_discard_unread, Ref{this},
                      std::placeholders::_1)));
  }

  // completion handler for discard_unread_message(); loops until the
  // message is fully consumed or the read fails
  void on_discard_unread(boost::system::error_code ec) {
    if (ec == boost::asio::error::connection_reset) {
      // peer hung up; drop the connection quietly
      return;
    }
    if (ec) {
      ldout(ctx(), 5) << "discard_unread_message failed: "
          << ec.message() << dendl;
      return;
    }
    discard_unread_message();
  }

  // completion handler for the 400 Bad Request reply; no further operation
  // is issued, so the connection closes when the last Ref drops
  void on_write_error(boost::system::error_code ec) {
    if (ec) {
      ldout(ctx(), 5) << "failed to write response: " << ec.message() << dendl;
    }
  }

  // completion handler for read_header(): on success, runs the request
  // synchronously on this thread, then re-arms for keep-alive if possible
  void on_header(boost::system::error_code ec) {
    if (ec == boost::asio::error::connection_reset ||
        ec == beast::http::error::end_of_stream) {
      // peer closed the connection; nothing to do
      return;
    }
    if (ec) {
      // malformed header: reply 400 and let the connection close
      auto& message = parser->get();
      ldout(ctx(), 1) << "failed to read header: " << ec.message() << dendl;
      ldout(ctx(), 1) << "====== req done http_status=400 ======" << dendl;
      response.emplace();
      response->result(beast::http::status::bad_request);
      response->version(message.version() == 10 ? 10 : 11);
      response->prepare_payload();
      beast::http::async_write(socket, *response, strand.wrap(
              std::bind(&Connection::on_write_error, Ref{this},
                        std::placeholders::_1)));
      return;
    }

    // process the request
    RGWRequest req{env.store->get_new_req_id()};

    rgw::asio::ClientIO real_client{socket, *parser, buffer};

    // stack the standard rgw io filters on top of the raw client
    auto real_client_io = rgw::io::add_reordering(
        rgw::io::add_buffering(ctx(),
            rgw::io::add_chunking(
                rgw::io::add_conlen_controlling(
                    &real_client))));
    RGWRestfulIO client(ctx(), &real_client_io);
    process_request(env.store, env.rest, &req, env.uri_prefix,
                    *env.auth_registry, &client, env.olog);

    if (parser->keep_alive()) {
      // parse any unread bytes from the previous message (in case we replied
      // before reading the entire body) before reading the next
      discard_unread_message();
    }
    // otherwise: no new operation issued, Ref drops, connection closes
  }

 public:
  Connection(RGWProcessEnv& env, tcp::socket&& socket)
    : env(env), strand(socket.get_io_service()), socket(std::move(socket)) {}

  // called once after accept to begin reading requests
  void on_connect() {
    read_header();
  }

  void get() { ++nref; }
  void put() { if (nref.fetch_sub(1) == 1) { delete this; } }

  friend void intrusive_ptr_add_ref(Connection *c) { c->get(); }
  friend void intrusive_ptr_release(Connection *c) { c->put(); }
};
195
7c673cae
FG
196class AsioFrontend {
197 RGWProcessEnv env;
94b18763 198 RGWFrontendConfig* conf;
7c673cae
FG
199 boost::asio::io_service service;
200
28e407b8
AA
201 struct Listener {
202 tcp::endpoint endpoint;
203 tcp::acceptor acceptor;
204 tcp::socket socket;
205
206 Listener(boost::asio::io_service& service)
207 : acceptor(service), socket(service) {}
208 };
209 std::vector<Listener> listeners;
7c673cae
FG
210
211 std::vector<std::thread> threads;
212 Pauser pauser;
213 std::atomic<bool> going_down{false};
214
215 CephContext* ctx() const { return env.store->ctx(); }
216
28e407b8 217 void accept(Listener& listener, boost::system::error_code ec);
7c673cae
FG
218
219 public:
94b18763 220 AsioFrontend(const RGWProcessEnv& env, RGWFrontendConfig* conf)
28e407b8 221 : env(env), conf(conf) {}
7c673cae
FG
222
223 int init();
224 int run();
225 void stop();
226 void join();
227 void pause();
228 void unpause(RGWRados* store, rgw_auth_registry_ptr_t);
229};
230
28e407b8
AA
231unsigned short parse_port(const char *input, boost::system::error_code& ec)
232{
233 char *end = nullptr;
234 auto port = std::strtoul(input, &end, 10);
235 if (port > std::numeric_limits<unsigned short>::max()) {
236 ec.assign(ERANGE, boost::system::system_category());
237 } else if (port == 0 && end == input) {
238 ec.assign(EINVAL, boost::system::system_category());
239 }
240 return port;
241}
242
243tcp::endpoint parse_endpoint(BOOST_ASIO_STRING_VIEW_PARAM input,
244 boost::system::error_code& ec)
7c673cae 245{
28e407b8 246 tcp::endpoint endpoint;
7c673cae 247
28e407b8
AA
248 auto colon = input.find(':');
249 if (colon != input.npos) {
250 auto port_str = input.substr(colon + 1);
251 endpoint.port(parse_port(port_str.data(), ec));
252 } else {
253 endpoint.port(80);
254 }
255 if (!ec) {
256 auto addr = input.substr(0, colon);
257 endpoint.address(boost::asio::ip::make_address(addr, ec));
258 }
259 return endpoint;
260}
261
91327a77
AA
262static int drop_privileges(CephContext *ctx)
263{
264 uid_t uid = ctx->get_set_uid();
265 gid_t gid = ctx->get_set_gid();
266 std::string uid_string = ctx->get_set_uid_string();
267 std::string gid_string = ctx->get_set_gid_string();
268 if (gid && setgid(gid) != 0) {
269 int err = errno;
270 ldout(ctx, -1) << "unable to setgid " << gid << ": " << cpp_strerror(err) << dendl;
271 return -err;
272 }
273 if (uid && setuid(uid) != 0) {
274 int err = errno;
275 ldout(ctx, -1) << "unable to setuid " << uid << ": " << cpp_strerror(err) << dendl;
276 return -err;
277 }
278 if (uid && gid) {
279 ldout(ctx, 0) << "set uid:gid to " << uid << ":" << gid
280 << " (" << uid_string << ":" << gid_string << ")" << dendl;
281 }
282 return 0;
283}
284
28e407b8
AA
285int AsioFrontend::init()
286{
7c673cae 287 boost::system::error_code ec;
28e407b8 288 auto& config = conf->get_config_map();
94b18763 289
28e407b8
AA
290 // parse endpoints
291 auto range = config.equal_range("port");
292 for (auto i = range.first; i != range.second; ++i) {
293 auto port = parse_port(i->second.c_str(), ec);
94b18763 294 if (ec) {
28e407b8 295 lderr(ctx()) << "failed to parse port=" << i->second << dendl;
94b18763
FG
296 return -ec.value();
297 }
28e407b8
AA
298 listeners.emplace_back(service);
299 listeners.back().endpoint.port(port);
94b18763
FG
300 }
301
28e407b8
AA
302 range = config.equal_range("endpoint");
303 for (auto i = range.first; i != range.second; ++i) {
304 auto endpoint = parse_endpoint(i->second, ec);
305 if (ec) {
306 lderr(ctx()) << "failed to parse endpoint=" << i->second << dendl;
307 return -ec.value();
308 }
309 listeners.emplace_back(service);
310 listeners.back().endpoint = endpoint;
7c673cae 311 }
28e407b8
AA
312
313 // start listeners
314 for (auto& l : listeners) {
315 l.acceptor.open(l.endpoint.protocol(), ec);
316 if (ec) {
317 lderr(ctx()) << "failed to open socket: " << ec.message() << dendl;
318 return -ec.value();
319 }
320 l.acceptor.set_option(tcp::acceptor::reuse_address(true));
321 l.acceptor.bind(l.endpoint, ec);
322 if (ec) {
323 lderr(ctx()) << "failed to bind address " << l.endpoint
324 << ": " << ec.message() << dendl;
325 return -ec.value();
326 }
327 l.acceptor.listen(boost::asio::socket_base::max_connections);
328 l.acceptor.async_accept(l.socket,
329 [this, &l] (boost::system::error_code ec) {
330 accept(l, ec);
331 });
332
333 ldout(ctx(), 4) << "frontend listening on " << l.endpoint << dendl;
7c673cae 334 }
91327a77 335 return drop_privileges(ctx());
7c673cae
FG
336}
337
28e407b8 338void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
7c673cae 339{
28e407b8 340 if (!l.acceptor.is_open()) {
7c673cae
FG
341 return;
342 } else if (ec == boost::asio::error::operation_aborted) {
343 return;
344 } else if (ec) {
345 throw ec;
346 }
28e407b8
AA
347 auto socket = std::move(l.socket);
348 l.acceptor.async_accept(l.socket,
349 [this, &l] (boost::system::error_code ec) {
350 accept(l, ec);
351 });
b32b8144
FG
352
353 boost::intrusive_ptr<Connection> conn{new Connection(env, std::move(socket))};
354 conn->on_connect();
355 // reference drops here, but on_connect() takes another
7c673cae
FG
356}
357
358int AsioFrontend::run()
359{
360 auto cct = ctx();
361 const int thread_count = cct->_conf->rgw_thread_pool_size;
362 threads.reserve(thread_count);
363
364 ldout(cct, 4) << "frontend spawning " << thread_count << " threads" << dendl;
365
366 for (int i = 0; i < thread_count; i++) {
367 threads.emplace_back([=] {
368 for (;;) {
369 service.run();
370 if (going_down) {
371 break;
372 }
373 pauser.wait();
374 }
375 });
376 }
377 return 0;
378}
379
380void AsioFrontend::stop()
381{
382 ldout(ctx(), 4) << "frontend initiating shutdown..." << dendl;
383
384 going_down = true;
385
386 boost::system::error_code ec;
28e407b8
AA
387 // close all listeners
388 for (auto& listener : listeners) {
389 listener.acceptor.close(ec);
390 }
b32b8144
FG
391
392 // unblock the run() threads
393 service.stop();
7c673cae
FG
394}
395
396void AsioFrontend::join()
397{
398 if (!going_down) {
399 stop();
400 }
401 ldout(ctx(), 4) << "frontend joining threads..." << dendl;
402 for (auto& thread : threads) {
403 thread.join();
404 }
405 ldout(ctx(), 4) << "frontend done" << dendl;
406}
407
408void AsioFrontend::pause()
409{
410 ldout(ctx(), 4) << "frontend pausing threads..." << dendl;
411 pauser.pause(threads.size(), [=] {
b32b8144
FG
412 // unblock the run() threads
413 service.stop();
7c673cae
FG
414 });
415 ldout(ctx(), 4) << "frontend paused" << dendl;
416}
417
418void AsioFrontend::unpause(RGWRados* const store,
419 rgw_auth_registry_ptr_t auth_registry)
420{
421 env.store = store;
422 env.auth_registry = std::move(auth_registry);
423 ldout(ctx(), 4) << "frontend unpaused" << dendl;
424 service.reset();
7c673cae
FG
425 pauser.unpause();
426}
427
428} // anonymous namespace
429
430class RGWAsioFrontend::Impl : public AsioFrontend {
431 public:
94b18763 432 Impl(const RGWProcessEnv& env, RGWFrontendConfig* conf) : AsioFrontend(env, conf) {}
7c673cae
FG
433};
434
94b18763
FG
435RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv& env,
436 RGWFrontendConfig* conf)
437 : impl(new Impl(env, conf))
7c673cae
FG
438{
439}
440
441RGWAsioFrontend::~RGWAsioFrontend() = default;
442
443int RGWAsioFrontend::init()
444{
445 return impl->init();
446}
447
448int RGWAsioFrontend::run()
449{
450 return impl->run();
451}
452
453void RGWAsioFrontend::stop()
454{
455 impl->stop();
456}
457
458void RGWAsioFrontend::join()
459{
460 impl->join();
461}
462
463void RGWAsioFrontend::pause_for_new_config()
464{
465 impl->pause();
466}
467
468void RGWAsioFrontend::unpause_with_new_config(
469 RGWRados* const store,
470 rgw_auth_registry_ptr_t auth_registry
471) {
472 impl->unpause(store, std::move(auth_registry));
473}