1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 #include <condition_variable>
10 #include <boost/asio.hpp>
12 #include "rgw_asio_client.h"
13 #include "rgw_asio_frontend.h"
15 #define dout_subsys ceph_subsys_rgw
// Pauser condition variables: the mutex and the ready/waiters state they
// guard are declared outside this fragment.
std::condition_variable cond_ready; // signaled on ready==true
std::condition_variable cond_paused; // signaled on waiters==thread_count

// Block the caller until thread_count threads are parked in wait().
// `func` is presumably invoked while the threads are quiesced — confirm
// against the out-of-line definition.
template <typename Func>
void pause(int thread_count, Func&& func);
// Pause rendezvous: waits (under the Pauser mutex) until every one of the
// thread_count worker threads has signaled cond_paused from wait().
// NOTE(review): the statements that clear `ready` and run func() are not
// visible in this fragment — verify against the full definition.
template <typename Func>
void Pauser::pause(int thread_count, Func&& func)
  std::unique_lock<std::mutex> lock(mutex);
  // wait for all threads to pause
  cond_paused.wait(lock, [=] { return waiters == thread_count; });
// Release all threads parked in wait(). The notify is done while holding
// the mutex; the statement that sets `ready` is not visible here —
// presumably ready=true precedes the notify (confirm upstream).
void Pauser::unpause()
  std::lock_guard<std::mutex> lock(mutex);
  cond_ready.notify_all();
  // Body fragment of what is presumably Pauser::wait(): each worker thread
  // announces itself to pause() and then blocks until unpause() sets ready.
  std::unique_lock<std::mutex> lock(mutex);
  cond_paused.notify_one(); // notify pause() that we're waiting
  cond_ready.wait(lock, [this] { return ready; }); // wait for unpause()
62 using tcp
= boost::asio::ip::tcp
;
63 namespace beast
= boost::beast
;
  // serializes the async completion handlers for this connection
  boost::asio::io_service::strand strand;

  // references are bound to callbacks for async operations. if a callback
  // function returns without issuing another operation, the reference is
  // dropped and the Connection is deleted/closed
  std::atomic<int> nref{0};
  using Ref = boost::intrusive_ptr<Connection>;

  // limit header to 4k, since we read it all into a single flat_buffer
  static constexpr size_t header_limit = 4096;
  // don't impose a limit on the body, since we read it in pieces
  static constexpr size_t body_limit = std::numeric_limits<size_t>::max();

  // bytes read from the socket but not yet consumed by the parser
  beast::flat_buffer buffer;
  // http request parser; optional so it can be (re)constructed per message
  boost::optional<rgw::asio::parser_type> parser;

  // empty-body response used to reject requests whose header failed to parse
  using bad_response_type = beast::http::response<beast::http::empty_body>;
  boost::optional<bad_response_type> response;

  // CephContext for logging, taken from the store held by env
  CephContext* ctx() const { return env.store->ctx(); }
  // Body fragment of the header-read step: configures parser limits and
  // issues an async header read whose completion (on_header) runs on the
  // strand and holds a Ref to keep this Connection alive.
  // configure the parser
  parser->header_limit(header_limit);
  parser->body_limit(body_limit);

  beast::http::async_read_header(socket, buffer, *parser, strand.wrap(
      std::bind(&Connection::on_header, Ref{this},
                std::placeholders::_1)));
  // Drain any unread request body so the connection can be reused for the
  // next message. NOTE(review): the is_done() branch body (which presumably
  // starts the next header read) is not visible in this fragment.
  void discard_unread_message() {
    if (parser->is_done()) {
      // nothing left to discard, start reading the next message
    // read the rest of the request into a static buffer. multiple clients could
    // write at the same time, but this is okay because we never read it back
    static std::array<char, 1024> discard_buffer;

    // point the parser's body sink at the shared scratch buffer
    auto& body = parser->get().body();
    body.size = discard_buffer.size();
    body.data = discard_buffer.data();

    // read some body bytes; on_discard_unread continues the drain loop
    beast::http::async_read_some(socket, buffer, *parser, strand.wrap(
        std::bind(&Connection::on_discard_unread, Ref{this},
                  std::placeholders::_1)));
  // Completion handler for the body-drain read: bails on connection reset,
  // logs other errors, and otherwise keeps discarding until the parser is
  // done. (Early-return statements are not visible in this fragment.)
  void on_discard_unread(boost::system::error_code ec) {
    if (ec == boost::asio::error::connection_reset) {
    ldout(ctx(), 5) << "discard_unread_message failed: "
                    << ec.message() << dendl;
    // loop: attempt to discard the rest of the message
    discard_unread_message();
  // Completion handler for the bad-request response write: failures are
  // only logged; no further operation is issued here, so the Connection's
  // last Ref drops and it closes.
  void on_write_error(boost::system::error_code ec) {
    ldout(ctx(), 5) << "failed to write response: " << ec.message() << dendl;
  // Completion handler for async_read_header. On parse failure it replies
  // 400 Bad Request; on success it builds the layered ClientIO stack, runs
  // process_request(), and (for keep-alive) drains leftover body bytes
  // before the next message. Several early-return/brace lines are not
  // visible in this fragment.
  void on_header(boost::system::error_code ec) {
    // peer went away: nothing to reply to
    if (ec == boost::asio::error::connection_reset ||
        ec == beast::http::error::end_of_stream) {
      auto& message = parser->get();
      ldout(ctx(), 1) << "failed to read header: " << ec.message() << dendl;
      ldout(ctx(), 1) << "====== req done http_status=400 ======" << dendl;
      // reply with an empty-body 400, echoing the client's http version
      response->result(beast::http::status::bad_request);
      response->version(message.version() == 10 ? 10 : 11);
      response->prepare_payload();
      beast::http::async_write(socket, *response, strand.wrap(
          std::bind(&Connection::on_write_error, Ref{this},
                    std::placeholders::_1)));

    // process the request
    RGWRequest req{env.store->get_new_req_id()};

    rgw::asio::ClientIO real_client{socket, *parser, buffer};

    // wrap the raw client io with reordering/buffering/chunking/conlen filters
    auto real_client_io = rgw::io::add_reordering(
        rgw::io::add_buffering(ctx(),
            rgw::io::add_chunking(
                rgw::io::add_conlen_controlling(
    RGWRestfulIO client(ctx(), &real_client_io);
    // synchronous request dispatch into the rgw REST machinery
    process_request(env.store, env.rest, &req, env.uri_prefix,
                    *env.auth_registry, &client, env.olog);

    if (parser->keep_alive()) {
      // parse any unread bytes from the previous message (in case we replied
      // before reading the entire body) before reading the next
      discard_unread_message();
  // Construct from the accepted socket. The strand is created from the
  // socket's io_service before the socket itself is moved into place —
  // safe here because the parameter is still valid during the init list,
  // but the actual run order follows member declaration order.
  Connection(RGWProcessEnv& env, tcp::socket&& socket)
    : env(env), strand(socket.get_io_service()), socket(std::move(socket)) {}
187 void get() { ++nref
; }
188 void put() { if (nref
.fetch_sub(1) == 1) { delete this; } }
190 friend void intrusive_ptr_add_ref(Connection
*c
) { c
->get(); }
191 friend void intrusive_ptr_release(Connection
*c
) { c
->put(); }
  // frontend configuration (ports/endpoints parsed in init())
  RGWFrontendConfig* conf;
  // io_service shared by all listeners and worker threads
  boost::asio::io_service service;

  // per-listener state: endpoint to bind and the accepting socket.
  // NOTE(review): the enclosing `struct Listener {` and its tcp::socket
  // member are outside this fragment.
  tcp::endpoint endpoint;
  tcp::acceptor acceptor;

  Listener(boost::asio::io_service& service)
    : acceptor(service), socket(service) {}

  std::vector<Listener> listeners;

  // one worker thread per rgw_thread_pool_size entry, spawned in run()
  std::vector<std::thread> threads;
  // set on stop() to tell worker threads to exit
  std::atomic<bool> going_down{false};

  CephContext* ctx() const { return env.store->ctx(); }
  // accept-loop completion handler, defined out of line below
  void accept(Listener& listener, boost::system::error_code ec);

  AsioFrontend(const RGWProcessEnv& env, RGWFrontendConfig* conf)
    : env(env), conf(conf) {}

  // swap in a new auth registry while the frontend is paused
  void unpause(RGWRados* store, rgw_auth_registry_ptr_t);
// Parse a decimal port number from `input`, reporting failures through
// `ec`: ERANGE when the value exceeds 65535, EINVAL when no digits were
// consumed. NOTE(review): the declaration of `end` and the return
// statement are outside this fragment.
unsigned short parse_port(const char *input, boost::system::error_code& ec)
  auto port = std::strtoul(input, &end, 10);
  if (port > std::numeric_limits<unsigned short>::max()) {
    ec.assign(ERANGE, boost::system::system_category());
  } else if (port == 0 && end == input) {
    // strtoul consumed nothing: not a number
    ec.assign(EINVAL, boost::system::system_category());
// Parse an "address[:port]" string into a tcp::endpoint, reporting parse
// failures through `ec`. (Early returns between the port and address
// parsing are outside this fragment.)
tcp::endpoint parse_endpoint(BOOST_ASIO_STRING_VIEW_PARAM input,
                             boost::system::error_code& ec)
  tcp::endpoint endpoint;

  // optional ":port" suffix
  auto colon = input.find(':');
  if (colon != input.npos) {
    auto port_str = input.substr(colon + 1);
    endpoint.port(parse_port(port_str.data(), ec));

  // everything before the colon (or the whole string) is the address
  auto addr = input.substr(0, colon);
  endpoint.address(boost::asio::ip::make_address(addr, ec));
// Build the listener list from the frontend config ("port" and "endpoint"
// entries), then open/bind/listen each acceptor and start the first
// async_accept. Error-return statements between the lderr() calls are not
// visible in this fragment.
int AsioFrontend::init()
  boost::system::error_code ec;
  auto& config = conf->get_config_map();

  // one listener per "port=N" config entry (default address)
  auto range = config.equal_range("port");
  for (auto i = range.first; i != range.second; ++i) {
    auto port = parse_port(i->second.c_str(), ec);
      lderr(ctx()) << "failed to parse port=" << i->second << dendl;
    listeners.emplace_back(service);
    listeners.back().endpoint.port(port);

  // one listener per "endpoint=addr[:port]" config entry
  range = config.equal_range("endpoint");
  for (auto i = range.first; i != range.second; ++i) {
    auto endpoint = parse_endpoint(i->second, ec);
      lderr(ctx()) << "failed to parse endpoint=" << i->second << dendl;
    listeners.emplace_back(service);
    listeners.back().endpoint = endpoint;

  // start each listener: open, bind, listen, then kick off the accept loop
  for (auto& l : listeners) {
    l.acceptor.open(l.endpoint.protocol(), ec);
      lderr(ctx()) << "failed to open socket: " << ec.message() << dendl;
    // allow quick restart without waiting out TIME_WAIT
    l.acceptor.set_option(tcp::acceptor::reuse_address(true));
    l.acceptor.bind(l.endpoint, ec);
      lderr(ctx()) << "failed to bind address " << l.endpoint
                   << ": " << ec.message() << dendl;
    l.acceptor.listen(boost::asio::socket_base::max_connections);
    // first async accept; the handler (AsioFrontend::accept) re-arms itself
    l.acceptor.async_accept(l.socket,
        [this, &l] (boost::system::error_code ec) {
    ldout(ctx(), 4) << "frontend listening on " << l.endpoint << dendl;
// Accept-loop handler: ignores shutdown/abort, takes ownership of the
// accepted socket, immediately re-arms async_accept on the listener, and
// spins up a refcounted Connection for the new socket.
void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
  // acceptor already closed by stop(): shut the loop down
  if (!l.acceptor.is_open()) {
  } else if (ec == boost::asio::error::operation_aborted) {

  // move the accepted socket out so l.socket can receive the next one
  auto socket = std::move(l.socket);
  l.acceptor.async_accept(l.socket,
      [this, &l] (boost::system::error_code ec) {

  boost::intrusive_ptr<Connection> conn{new Connection(env, std::move(socket))};
  // reference drops here, but on_connect() takes another
// Spawn rgw_thread_pool_size worker threads that drive the io_service.
// (`cct` and the thread body are defined on lines outside this fragment.)
int AsioFrontend::run()
  const int thread_count = cct->_conf->rgw_thread_pool_size;
  threads.reserve(thread_count);

  ldout(cct, 4) << "frontend spawning " << thread_count << " threads" << dendl;

  for (int i = 0; i < thread_count; i++) {
    threads.emplace_back([=] {
// Initiate shutdown: close every listening acceptor (errors collected in
// `ec` and ignored) so no new connections arrive, then unblock the worker
// threads for join().
void AsioFrontend::stop()
  ldout(ctx(), 4) << "frontend initiating shutdown..." << dendl;

  boost::system::error_code ec;
  // close all listeners
  for (auto& listener : listeners) {
    listener.acceptor.close(ec);

  // unblock the run() threads
// Join all worker threads spawned by run(). (The thread.join() call inside
// the loop is not visible in this fragment.)
void AsioFrontend::join()
  ldout(ctx(), 4) << "frontend joining threads..." << dendl;
  for (auto& thread : threads) {
  ldout(ctx(), 4) << "frontend done" << dendl;
// Pause request processing: rendezvous with every worker thread via the
// Pauser, using a callback that unblocks the run() threads so they can
// reach their wait point.
void AsioFrontend::pause()
  ldout(ctx(), 4) << "frontend pausing threads..." << dendl;
  pauser.pause(threads.size(), [=] {
    // unblock the run() threads
  ldout(ctx(), 4) << "frontend paused" << dendl;
// Resume after a pause with a fresh auth registry; installing it while
// paused means no request thread can observe a half-updated env. (The
// store update and pauser.unpause() call are outside this fragment.)
void AsioFrontend::unpause(RGWRados* const store,
                           rgw_auth_registry_ptr_t auth_registry)
  env.auth_registry = std::move(auth_registry);
  ldout(ctx(), 4) << "frontend unpaused" << dendl;
403 } // anonymous namespace
// Pimpl for RGWAsioFrontend: the public class forwards every call to this
// AsioFrontend subclass, keeping boost::asio out of the public header.
class RGWAsioFrontend::Impl : public AsioFrontend {
  Impl(const RGWProcessEnv& env, RGWFrontendConfig* conf) : AsioFrontend(env, conf) {}
// Construct the pimpl; the destructor is defaulted here (not in the
// header) so that Impl is a complete type when it is destroyed.
RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv& env,
                                 RGWFrontendConfig* conf)
  : impl(new Impl(env, conf))

RGWAsioFrontend::~RGWAsioFrontend() = default;
// Public API: thin forwarders to the pimpl. Only the unpause forwarding
// call is visible in this fragment; the other bodies lie on lines outside
// this view.
int RGWAsioFrontend::init()

int RGWAsioFrontend::run()

void RGWAsioFrontend::stop()

void RGWAsioFrontend::join()

void RGWAsioFrontend::pause_for_new_config()

void RGWAsioFrontend::unpause_with_new_config(
    RGWRados* const store,
    rgw_auth_registry_ptr_t auth_registry
  impl->unpause(store, std::move(auth_registry));