// ceph/src/rgw/rgw_asio_frontend.cc (ceph 12.2.7)
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

#include <boost/asio.hpp>
#include <boost/system/system_error.hpp>

#include "rgw_asio_client.h"
#include "rgw_asio_frontend.h"
14
15 #define dout_subsys ceph_subsys_rgw
16
17 namespace {
18
// Rendezvous helper for pausing a fixed-size worker pool: pause() runs a
// callback and then blocks the controller until 'thread_count' workers have
// parked in wait(); unpause() releases every parked worker.
class Pauser {
  std::mutex mutex;
  std::condition_variable cond_ready;  // notified when ready flips to true
  std::condition_variable cond_paused; // notified each time a worker parks
  bool ready{false};  // workers may leave wait() only while this is true
  int waiters{0};     // number of workers currently parked in wait()

 public:
  // Clear the ready flag, invoke func() (typically to make the workers
  // return from their event loop), then block until all thread_count
  // workers are parked in wait().
  template <typename Func>
  void pause(int thread_count, Func&& func) {
    {
      std::lock_guard<std::mutex> guard(mutex);
      ready = false;
    }

    func();

    // wait for every worker to check in
    std::unique_lock<std::mutex> lock(mutex);
    cond_paused.wait(lock, [this, thread_count] {
        return waiters == thread_count;
      });
  }

  // Release all workers parked in wait().
  void unpause() {
    std::lock_guard<std::mutex> guard(mutex);
    ready = true;
    cond_ready.notify_all();
  }

  // Park the calling worker until unpause(); lets pause() know we're here.
  void wait() {
    std::unique_lock<std::mutex> lock(mutex);
    ++waiters;
    cond_paused.notify_one(); // tell pause() another worker has parked
    cond_ready.wait(lock, [this] { return ready; }); // sleep until unpause()
    --waiters;
  }
};
61
62 using tcp = boost::asio::ip::tcp;
63 namespace beast = boost::beast;
64
// A single http connection. Reads requests with beast, runs them through
// rgw's process_request(), and pipelines the next read on keep-alive.
class Connection {
  RGWProcessEnv& env;
  boost::asio::io_service::strand strand; // serializes this connection's handlers
  tcp::socket socket;

  // references are bound to callbacks for async operations. if a callback
  // function returns without issuing another operation, the reference is
  // dropped and the Connection is deleted/closed
  std::atomic<int> nref{0};
  using Ref = boost::intrusive_ptr<Connection>;

  // limit header to 4k, since we read it all into a single flat_buffer
  static constexpr size_t header_limit = 4096;
  // don't impose a limit on the body, since we read it in pieces
  static constexpr size_t body_limit = std::numeric_limits<size_t>::max();

  beast::flat_buffer buffer; // bytes read ahead of what the parser consumed
  boost::optional<rgw::asio::parser_type> parser; // re-emplaced per request

  // empty-body response, used only to reply 400 on header parse failures
  using bad_response_type = beast::http::response<beast::http::empty_body>;
  boost::optional<bad_response_type> response;

  CephContext* ctx() const { return env.store->ctx(); }

  // Start reading the next request: reset the parser and issue an async
  // header read; on_header() continues once the full header has arrived.
  void read_header() {
    // configure the parser
    parser.emplace();
    parser->header_limit(header_limit);
    parser->body_limit(body_limit);

    // parse the header
    beast::http::async_read_header(socket, buffer, *parser, strand.wrap(
        std::bind(&Connection::on_header, Ref{this},
                  std::placeholders::_1)));
  }

  // Drain any request body left unread by process_request() so the next
  // pipelined request starts at a message boundary.
  void discard_unread_message() {
    if (parser->is_done()) {
      // nothing left to discard, start reading the next message
      read_header();
      return;
    }

    // read the rest of the request into a static buffer. multiple clients could
    // write at the same time, but this is okay because we never read it back
    static std::array<char, 1024> discard_buffer;

    auto& body = parser->get().body();
    body.size = discard_buffer.size();
    body.data = discard_buffer.data();

    beast::http::async_read_some(socket, buffer, *parser, strand.wrap(
        std::bind(&Connection::on_discard_unread, Ref{this},
                  std::placeholders::_1)));
  }

  // Completion handler for a discard read: keep draining until done. On any
  // error the handler returns without a new operation, closing the connection.
  void on_discard_unread(boost::system::error_code ec) {
    if (ec == boost::asio::error::connection_reset) {
      return; // client hung up; drop the connection silently
    }
    if (ec) {
      ldout(ctx(), 5) << "discard_unread_message failed: "
          << ec.message() << dendl;
      return;
    }
    discard_unread_message();
  }

  // Completion handler for the 400 error response write. Log-only; no new
  // operation is issued, so the last Ref drops and the connection closes.
  void on_write_error(boost::system::error_code ec) {
    if (ec) {
      ldout(ctx(), 5) << "failed to write response: " << ec.message() << dendl;
    }
  }

  // Completion handler for the header read: on parse failure reply 400;
  // otherwise run the request synchronously on this thread, then (if the
  // client requested keep-alive) drain leftovers and read the next request.
  void on_header(boost::system::error_code ec) {
    if (ec == boost::asio::error::connection_reset ||
        ec == beast::http::error::end_of_stream) {
      return; // clean disconnect between requests
    }
    if (ec) {
      auto& message = parser->get();
      ldout(ctx(), 1) << "failed to read header: " << ec.message() << dendl;
      ldout(ctx(), 1) << "====== req done http_status=400 ======" << dendl;
      response.emplace();
      response->result(beast::http::status::bad_request);
      // mirror the request's http version: 1.0 stays 1.0, everything else 1.1
      response->version(message.version() == 10 ? 10 : 11);
      response->prepare_payload();
      beast::http::async_write(socket, *response, strand.wrap(
          std::bind(&Connection::on_write_error, Ref{this},
                    std::placeholders::_1)));
      return;
    }

    // process the request
    RGWRequest req{env.store->get_new_req_id()};

    rgw::asio::ClientIO real_client{socket, *parser, buffer};

    auto real_client_io = rgw::io::add_reordering(
        rgw::io::add_buffering(ctx(),
            rgw::io::add_chunking(
                rgw::io::add_conlen_controlling(
                    &real_client))));
    RGWRestfulIO client(ctx(), &real_client_io);
    process_request(env.store, env.rest, &req, env.uri_prefix,
                    *env.auth_registry, &client, env.olog);

    if (parser->keep_alive()) {
      // parse any unread bytes from the previous message (in case we replied
      // before reading the entire body) before reading the next
      discard_unread_message();
    }
    // without keep-alive no further operation is issued here, so the last
    // Ref drops and the connection is deleted/closed
  }

 public:
  Connection(RGWProcessEnv& env, tcp::socket&& socket)
    : env(env), strand(socket.get_io_service()), socket(std::move(socket)) {}

  // Kick off the first read; the pending async op holds its own Ref.
  void on_connect() {
    read_header();
  }

  // intrusive refcount: deleting on the last put() closes the socket
  void get() { ++nref; }
  void put() { if (nref.fetch_sub(1) == 1) { delete this; } }

  friend void intrusive_ptr_add_ref(Connection *c) { c->get(); }
  friend void intrusive_ptr_release(Connection *c) { c->put(); }
};
193
// Asio-based rgw frontend: owns the io_service, the listening sockets, and
// the worker thread pool, and supports pause/unpause for config reloads.
class AsioFrontend {
  RGWProcessEnv env;
  RGWFrontendConfig* conf;
  boost::asio::io_service service;

  // one listening endpoint; 'socket' receives the next accepted connection
  struct Listener {
    tcp::endpoint endpoint;
    tcp::acceptor acceptor;
    tcp::socket socket;

    Listener(boost::asio::io_service& service)
      : acceptor(service), socket(service) {}
  };
  // NOTE: the accept lambdas capture references to elements of this vector,
  // so it must not be resized after init() completes
  std::vector<Listener> listeners;

  std::vector<std::thread> threads;
  Pauser pauser; // rendezvous used by pause()/unpause()
  std::atomic<bool> going_down{false};

  CephContext* ctx() const { return env.store->ctx(); }

  // completion handler for async_accept on the given listener
  void accept(Listener& listener, boost::system::error_code ec);

 public:
  AsioFrontend(const RGWProcessEnv& env, RGWFrontendConfig* conf)
    : env(env), conf(conf) {}

  int init();   // parse config, open/bind/listen on all endpoints
  int run();    // spawn the worker thread pool
  void stop();  // close listeners and stop the io_service
  void join();  // join workers (calls stop() first if not already going down)
  void pause(); // block until all workers are parked in the Pauser
  void unpause(RGWRados* store, rgw_auth_registry_ptr_t); // swap env, resume
};
228
229 unsigned short parse_port(const char *input, boost::system::error_code& ec)
230 {
231 char *end = nullptr;
232 auto port = std::strtoul(input, &end, 10);
233 if (port > std::numeric_limits<unsigned short>::max()) {
234 ec.assign(ERANGE, boost::system::system_category());
235 } else if (port == 0 && end == input) {
236 ec.assign(EINVAL, boost::system::system_category());
237 }
238 return port;
239 }
240
// Parse "address" or "address:port" into a tcp endpoint (port defaults to
// 80). On failure 'ec' is set and the partially-filled endpoint is returned.
// NOTE(review): the split is on the *first* ':', so bracketed ipv6 literals
// such as "[::1]:80" cannot be parsed by this function.
tcp::endpoint parse_endpoint(BOOST_ASIO_STRING_VIEW_PARAM input,
                             boost::system::error_code& ec)
{
  tcp::endpoint endpoint;

  auto colon = input.find(':');
  if (colon != input.npos) {
    // parse_port() reads a c-string; this assumes input.data() is
    // null-terminated just past the view (holds for std::string sources
    // as used by init()) — TODO confirm for other callers
    auto port_str = input.substr(colon + 1);
    endpoint.port(parse_port(port_str.data(), ec));
  } else {
    endpoint.port(80);
  }
  if (!ec) {
    // substr(0, npos) yields the whole input when no colon was found
    auto addr = input.substr(0, colon);
    endpoint.address(boost::asio::ip::make_address(addr, ec));
  }
  return endpoint;
}
259
// Parse the "port" and "endpoint" frontend options, then open, bind,
// listen, and arm the first async_accept on every configured listener.
// Returns 0 on success or a negated errno value.
int AsioFrontend::init()
{
  boost::system::error_code ec;
  auto& config = conf->get_config_map();

  // parse endpoints
  auto range = config.equal_range("port");
  for (auto i = range.first; i != range.second; ++i) {
    auto port = parse_port(i->second.c_str(), ec);
    if (ec) {
      lderr(ctx()) << "failed to parse port=" << i->second << dendl;
      return -ec.value();
    }
    // address is left default-constructed (presumably the wildcard address)
    listeners.emplace_back(service);
    listeners.back().endpoint.port(port);
  }

  range = config.equal_range("endpoint");
  for (auto i = range.first; i != range.second; ++i) {
    auto endpoint = parse_endpoint(i->second, ec);
    if (ec) {
      lderr(ctx()) << "failed to parse endpoint=" << i->second << dendl;
      return -ec.value();
    }
    listeners.emplace_back(service);
    listeners.back().endpoint = endpoint;
  }

  // start listeners
  for (auto& l : listeners) {
    l.acceptor.open(l.endpoint.protocol(), ec);
    if (ec) {
      lderr(ctx()) << "failed to open socket: " << ec.message() << dendl;
      return -ec.value();
    }
    l.acceptor.set_option(tcp::acceptor::reuse_address(true));
    l.acceptor.bind(l.endpoint, ec);
    if (ec) {
      lderr(ctx()) << "failed to bind address " << l.endpoint
          << ": " << ec.message() << dendl;
      return -ec.value();
    }
    l.acceptor.listen(boost::asio::socket_base::max_connections);
    // the lambda captures a reference into 'listeners'; safe because the
    // vector is never resized after this loop
    l.acceptor.async_accept(l.socket,
                            [this, &l] (boost::system::error_code ec) {
                              accept(l, ec);
                            });

    ldout(ctx(), 4) << "frontend listening on " << l.endpoint << dendl;
  }
  return 0;
}
312
313 void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
314 {
315 if (!l.acceptor.is_open()) {
316 return;
317 } else if (ec == boost::asio::error::operation_aborted) {
318 return;
319 } else if (ec) {
320 throw ec;
321 }
322 auto socket = std::move(l.socket);
323 l.acceptor.async_accept(l.socket,
324 [this, &l] (boost::system::error_code ec) {
325 accept(l, ec);
326 });
327
328 boost::intrusive_ptr<Connection> conn{new Connection(env, std::move(socket))};
329 conn->on_connect();
330 // reference drops here, but on_connect() takes another
331 }
332
333 int AsioFrontend::run()
334 {
335 auto cct = ctx();
336 const int thread_count = cct->_conf->rgw_thread_pool_size;
337 threads.reserve(thread_count);
338
339 ldout(cct, 4) << "frontend spawning " << thread_count << " threads" << dendl;
340
341 for (int i = 0; i < thread_count; i++) {
342 threads.emplace_back([=] {
343 for (;;) {
344 service.run();
345 if (going_down) {
346 break;
347 }
348 pauser.wait();
349 }
350 });
351 }
352 return 0;
353 }
354
355 void AsioFrontend::stop()
356 {
357 ldout(ctx(), 4) << "frontend initiating shutdown..." << dendl;
358
359 going_down = true;
360
361 boost::system::error_code ec;
362 // close all listeners
363 for (auto& listener : listeners) {
364 listener.acceptor.close(ec);
365 }
366
367 // unblock the run() threads
368 service.stop();
369 }
370
371 void AsioFrontend::join()
372 {
373 if (!going_down) {
374 stop();
375 }
376 ldout(ctx(), 4) << "frontend joining threads..." << dendl;
377 for (auto& thread : threads) {
378 thread.join();
379 }
380 ldout(ctx(), 4) << "frontend done" << dendl;
381 }
382
// Pause request processing: stop the io_service so the run() threads fall
// out of service.run(), then block until all of them are parked in the
// Pauser. Resume with unpause().
void AsioFrontend::pause()
{
  ldout(ctx(), 4) << "frontend pausing threads..." << dendl;
  pauser.pause(threads.size(), [=] {
    // unblock the run() threads
    service.stop();
  });
  ldout(ctx(), 4) << "frontend paused" << dendl;
}
392
// Resume after pause(): install the (possibly reloaded) store and auth
// registry, then release the worker threads. service.reset() must come
// before pauser.unpause(), because released workers immediately re-enter
// service.run() (see the loop in run()).
void AsioFrontend::unpause(RGWRados* const store,
                           rgw_auth_registry_ptr_t auth_registry)
{
  env.store = store;
  env.auth_registry = std::move(auth_registry);
  ldout(ctx(), 4) << "frontend unpaused" << dendl;
  service.reset();
  pauser.unpause();
}
402
403 } // anonymous namespace
404
// Pimpl: RGWAsioFrontend forwards all calls to AsioFrontend, which lives in
// the anonymous namespace above.
class RGWAsioFrontend::Impl : public AsioFrontend {
 public:
  Impl(const RGWProcessEnv& env, RGWFrontendConfig* conf) : AsioFrontend(env, conf) {}
};
409
// Construct the pimpl; the real work starts in init()/run().
RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv& env,
                                 RGWFrontendConfig* conf)
  : impl(new Impl(env, conf))
{
}
415
// Defined out-of-line so Impl is a complete type here — presumably 'impl'
// is a smart pointer declared with an incomplete Impl in the header (TODO
// confirm against rgw_asio_frontend.h).
RGWAsioFrontend::~RGWAsioFrontend() = default;
417
// The public RGWAsioFrontend interface: thin forwarders to the pimpl.

int RGWAsioFrontend::init()
{
  return impl->init();
}

int RGWAsioFrontend::run()
{
  return impl->run();
}

void RGWAsioFrontend::stop()
{
  impl->stop();
}

void RGWAsioFrontend::join()
{
  impl->join();
}

// pause/unpause bracket a configuration reload; unpause installs the new
// store and auth registry before resuming the workers
void RGWAsioFrontend::pause_for_new_config()
{
  impl->pause();
}

void RGWAsioFrontend::unpause_with_new_config(
  RGWRados* const store,
  rgw_auth_registry_ptr_t auth_registry
) {
  impl->unpause(store, std::move(auth_registry));
}