]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_asio_frontend.cc
update sources to v12.2.5
[ceph.git] / ceph / src / rgw / rgw_asio_frontend.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
#include <atomic>
#include <cerrno>
#include <condition_variable>
#include <limits>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <vector>

#include <boost/asio.hpp>

#include "rgw_asio_client.h"
#include "rgw_asio_frontend.h"
14
15 #define dout_subsys ceph_subsys_rgw
16
17 namespace {
18
// Rendezvous point that lets a coordinator park a fixed set of worker
// threads, perform some work while they are all quiescent, and then
// release them again.
class Pauser {
  std::mutex mutex;
  std::condition_variable cond_ready;  // signaled when ready flips to true
  std::condition_variable cond_paused; // signaled as each worker checks in
  bool ready{false};
  int waiters{0};
 public:
  // Clear the ready flag, invoke 'func' (which is expected to unblock the
  // workers so they can reach wait()), then block until all 'thread_count'
  // workers are parked in wait().
  template <typename Func>
  void pause(int thread_count, Func&& func) {
    {
      std::lock_guard<std::mutex> guard(mutex);
      ready = false;
    }

    func();

    // wait for all threads to pause
    std::unique_lock<std::mutex> lock(mutex);
    cond_paused.wait(lock, [this, thread_count] {
      return waiters == thread_count;
    });
  }

  // Release every worker currently blocked in wait().
  void unpause() {
    std::lock_guard<std::mutex> guard(mutex);
    ready = true;
    cond_ready.notify_all();
  }

  // Called by each worker thread: register as paused, then block until
  // unpause() sets the ready flag.
  void wait() {
    std::unique_lock<std::mutex> lock(mutex);
    ++waiters;
    cond_paused.notify_one(); // tell pause() that we've arrived
    cond_ready.wait(lock, [this] { return ready; }); // wait for unpause()
    --waiters;
  }
};
61
62 using tcp = boost::asio::ip::tcp;
63 namespace beast = boost::beast;
64
// Handles a single accepted HTTP connection: repeatedly parses a request
// header, runs the request through process_request(), and (on keep-alive)
// loops back for the next message. Lifetime is intrusive-refcounted: every
// pending async operation binds a Ref, so the object is destroyed (and the
// socket closed) as soon as no completion handler is outstanding.
class Connection {
  RGWProcessEnv& env;
  boost::asio::io_service::strand strand; // serializes this connection's handlers
  tcp::socket socket;

  // references are bound to callbacks for async operations. if a callback
  // function returns without issuing another operation, the reference is
  // dropped and the Connection is deleted/closed
  std::atomic<int> nref{0};
  using Ref = boost::intrusive_ptr<Connection>;

  // limit header to 4k, since we read it all into a single flat_buffer
  static constexpr size_t header_limit = 4096;
  // don't impose a limit on the body, since we read it in pieces
  static constexpr size_t body_limit = std::numeric_limits<size_t>::max();

  beast::flat_buffer buffer; // bytes read from the socket, consumed by the parser
  boost::optional<rgw::asio::parser_type> parser; // re-emplaced per message in read_header()

  // headers-only response used to reject requests whose header fails to parse
  using bad_response_type = beast::http::response<beast::http::empty_body>;
  boost::optional<bad_response_type> response;

  CephContext* ctx() const { return env.store->ctx(); }

  // start the next request: reset the parser and issue an async header
  // read that completes in on_header()
  void read_header() {
    // configure the parser
    parser.emplace();
    parser->header_limit(header_limit);
    parser->body_limit(body_limit);

    // parse the header
    beast::http::async_read_header(socket, buffer, *parser, strand.wrap(
        std::bind(&Connection::on_header, Ref{this},
                  std::placeholders::_1)));
  }

  // consume whatever portion of the request body process_request() left
  // unread, so the stream is positioned at the start of the next message
  void discard_unread_message() {
    if (parser->is_done()) {
      // nothing left to discard, start reading the next message
      read_header();
      return;
    }

    // read the rest of the request into a static buffer. multiple clients could
    // write at the same time, but this is okay because we never read it back
    static std::array<char, 1024> discard_buffer;

    auto& body = parser->get().body();
    body.size = discard_buffer.size();
    body.data = discard_buffer.data();

    beast::http::async_read_some(socket, buffer, *parser, strand.wrap(
        std::bind(&Connection::on_discard_unread, Ref{this},
                  std::placeholders::_1)));
  }

  // completion handler for discard_unread_message(); keeps discarding
  // until the parser reports the message done, then reads the next header
  void on_discard_unread(boost::system::error_code ec) {
    if (ec == boost::asio::error::connection_reset) {
      // peer went away; return without rearming, dropping the connection
      return;
    }
    if (ec) {
      ldout(ctx(), 5) << "discard_unread_message failed: "
          << ec.message() << dendl;
      return;
    }
    discard_unread_message();
  }

  // completion handler for the 400-response write issued from on_header()
  void on_write_error(boost::system::error_code ec) {
    if (ec) {
      ldout(ctx(), 5) << "failed to write response: " << ec.message() << dendl;
    }
  }

  // completion handler for the header read. on a parse error, replies with
  // 400 Bad Request; otherwise runs the request inline through
  // process_request() and, if the client requested keep-alive, continues
  // with the next message. any path that issues no further async operation
  // lets the last Ref drop, closing the connection.
  void on_header(boost::system::error_code ec) {
    if (ec == boost::asio::error::connection_reset ||
        ec == beast::http::error::end_of_stream) {
      return;
    }
    if (ec) {
      auto& message = parser->get();
      ldout(ctx(), 1) << "failed to read header: " << ec.message() << dendl;
      ldout(ctx(), 1) << "====== req done http_status=400 ======" << dendl;
      response.emplace();
      response->result(beast::http::status::bad_request);
      // answer HTTP/1.0 clients as 1.0, everyone else as 1.1
      response->version(message.version() == 10 ? 10 : 11);
      response->prepare_payload();
      beast::http::async_write(socket, *response, strand.wrap(
          std::bind(&Connection::on_write_error, Ref{this},
                    std::placeholders::_1)));
      return;
    }

    // process the request
    RGWRequest req{env.store->get_new_req_id()};

    rgw::asio::ClientIO real_client{socket, *parser, buffer};

    // stack the rgw i/o filters on top of the raw asio client
    auto real_client_io = rgw::io::add_reordering(
        rgw::io::add_buffering(ctx(),
            rgw::io::add_chunking(
                rgw::io::add_conlen_controlling(
                    &real_client))));
    RGWRestfulIO client(ctx(), &real_client_io);
    process_request(env.store, env.rest, &req, env.uri_prefix,
                    *env.auth_registry, &client, env.olog);

    if (parser->keep_alive()) {
      // parse any unread bytes from the previous message (in case we replied
      // before reading the entire body) before reading the next
      discard_unread_message();
    }
  }

 public:
  Connection(RGWProcessEnv& env, tcp::socket&& socket)
    : env(env), strand(socket.get_io_service()), socket(std::move(socket)) {}

  // called once after accept to start reading the first request
  void on_connect() {
    read_header();
  }

  // intrusive reference count; put() deletes this (closing the socket)
  // when the last reference drops
  void get() { ++nref; }
  void put() { if (nref.fetch_sub(1) == 1) { delete this; } }

  friend void intrusive_ptr_add_ref(Connection *c) { c->get(); }
  friend void intrusive_ptr_release(Connection *c) { c->put(); }
};
193
194
// boost::asio-based rgw frontend: owns the io_service, the listening
// acceptor, and the pool of worker threads that run the service.
class AsioFrontend {
  RGWProcessEnv env;
  RGWFrontendConfig* conf;
  boost::asio::io_service service;

  tcp::acceptor acceptor;
  tcp::socket peer_socket; // receives the next incoming connection

  std::vector<std::thread> threads;
  Pauser pauser; // rendezvous used by pause()/unpause() for config swaps
  std::atomic<bool> going_down{false};

  CephContext* ctx() const { return env.store->ctx(); }

  // completion handler for acceptor.async_accept(); re-arms itself
  void accept(boost::system::error_code ec);

 public:
  AsioFrontend(const RGWProcessEnv& env, RGWFrontendConfig* conf)
    : env(env), conf(conf), acceptor(service), peer_socket(service) {}

  int init();    // parse config, bind/listen, start the accept loop
  int run();     // spawn the worker threads; returns immediately
  void stop();   // initiate shutdown: close acceptor, stop the service
  void join();   // block until all worker threads exit
  void pause();  // park all workers (used while config is swapped)
  void unpause(RGWRados* store, rgw_auth_registry_ptr_t);
};
222
223 int AsioFrontend::init()
224 {
225 std::string port_str;
226 conf->get_val("port", "80", &port_str);
227
228 unsigned short port;
229 boost::asio::ip::address addr; // default to 'any'
230 boost::system::error_code ec;
231
232 auto colon = port_str.find(':');
233 if (colon != port_str.npos) {
234 addr = boost::asio::ip::make_address(port_str.substr(0, colon), ec);
235 if (ec) {
236 lderr(ctx()) << "failed to parse address '" << port_str << "': " << ec.message() << dendl;
237 return -ec.value();
238 }
239 port = std::stoul(port_str.substr(colon + 1), nullptr, 0);
240 } else {
241 port = std::stoul(port_str, nullptr, 0);
242 }
243
244 tcp::endpoint ep = {addr, port};
245 ldout(ctx(), 4) << "frontend listening on " << ep << dendl;
246
247 acceptor.open(ep.protocol(), ec);
248 if (ec) {
249 lderr(ctx()) << "failed to open socket: " << ec.message() << dendl;
250 return -ec.value();
251 }
252 acceptor.set_option(tcp::acceptor::reuse_address(true));
253 acceptor.bind(ep, ec);
254 if (ec) {
255 lderr(ctx()) << "failed to bind address " << ep <<
256 ": " << ec.message() << dendl;
257 return -ec.value();
258 }
259 acceptor.listen(boost::asio::socket_base::max_connections);
260 acceptor.async_accept(peer_socket,
261 [this] (boost::system::error_code ec) {
262 return accept(ec);
263 });
264 return 0;
265 }
266
267 void AsioFrontend::accept(boost::system::error_code ec)
268 {
269 if (!acceptor.is_open()) {
270 return;
271 } else if (ec == boost::asio::error::operation_aborted) {
272 return;
273 } else if (ec) {
274 throw ec;
275 }
276 auto socket = std::move(peer_socket);
277 acceptor.async_accept(peer_socket,
278 [this] (boost::system::error_code ec) {
279 return accept(ec);
280 });
281
282 boost::intrusive_ptr<Connection> conn{new Connection(env, std::move(socket))};
283 conn->on_connect();
284 // reference drops here, but on_connect() takes another
285 }
286
287 int AsioFrontend::run()
288 {
289 auto cct = ctx();
290 const int thread_count = cct->_conf->rgw_thread_pool_size;
291 threads.reserve(thread_count);
292
293 ldout(cct, 4) << "frontend spawning " << thread_count << " threads" << dendl;
294
295 for (int i = 0; i < thread_count; i++) {
296 threads.emplace_back([=] {
297 for (;;) {
298 service.run();
299 if (going_down) {
300 break;
301 }
302 pauser.wait();
303 }
304 });
305 }
306 return 0;
307 }
308
// Initiate shutdown. Safe to call more than once.
void AsioFrontend::stop()
{
  ldout(ctx(), 4) << "frontend initiating shutdown..." << dendl;

  // must be set before service.stop(): the workers test going_down as soon
  // as service.run() returns, and would otherwise park in pauser.wait()
  going_down = true;

  boost::system::error_code ec;
  acceptor.close(ec); // stop accepting; ignore errors from a closed acceptor

  // unblock the run() threads
  service.stop();
}
321
322 void AsioFrontend::join()
323 {
324 if (!going_down) {
325 stop();
326 }
327 ldout(ctx(), 4) << "frontend joining threads..." << dendl;
328 for (auto& thread : threads) {
329 thread.join();
330 }
331 ldout(ctx(), 4) << "frontend done" << dendl;
332 }
333
334 void AsioFrontend::pause()
335 {
336 ldout(ctx(), 4) << "frontend pausing threads..." << dendl;
337 pauser.pause(threads.size(), [=] {
338 // unblock the run() threads
339 service.stop();
340 });
341 ldout(ctx(), 4) << "frontend paused" << dendl;
342 }
343
// Install the new store and auth registry, then release the workers
// parked by pause().
void AsioFrontend::unpause(RGWRados* const store,
                           rgw_auth_registry_ptr_t auth_registry)
{
  // swap in the new configuration before waking any worker, so no request
  // can observe the old store/registry
  env.store = store;
  env.auth_registry = std::move(auth_registry);
  ldout(ctx(), 4) << "frontend unpaused" << dendl;
  service.reset();  // clear the stopped flag so service.run() resumes
  pauser.unpause(); // release the worker threads
}
353
354 } // anonymous namespace
355
356 class RGWAsioFrontend::Impl : public AsioFrontend {
357 public:
358 Impl(const RGWProcessEnv& env, RGWFrontendConfig* conf) : AsioFrontend(env, conf) {}
359 };
360
RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv& env,
                                 RGWFrontendConfig* conf)
  : impl(new Impl(env, conf))
{
}

// defined out-of-line: Impl is only complete in this translation unit
RGWAsioFrontend::~RGWAsioFrontend() = default;

// the remaining members simply forward to the pimpl

int RGWAsioFrontend::init()
{
  return impl->init();
}

int RGWAsioFrontend::run()
{
  return impl->run();
}

void RGWAsioFrontend::stop()
{
  impl->stop();
}

void RGWAsioFrontend::join()
{
  impl->join();
}

void RGWAsioFrontend::pause_for_new_config()
{
  impl->pause();
}

void RGWAsioFrontend::unpause_with_new_config(
    RGWRados* const store,
    rgw_auth_registry_ptr_t auth_registry
) {
  impl->unpause(store, std::move(auth_registry));
}