1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 #include <condition_variable>
10 #include <boost/asio.hpp>
12 #include "rgw_asio_client.h"
13 #include "rgw_asio_frontend.h"
15 #define dout_subsys ceph_subsys_rgw
// Pauser synchronization state: worker threads park in wait() during a
// reconfiguration and are released by unpause().
// NOTE(review): the guarding mutex and the ready/waiters flags these
// predicates reference are declared on lines outside this view.
21 std::condition_variable cond_ready
; // signaled on ready==true
22 std::condition_variable cond_paused
; // signaled on waiters==thread_count
// Declaration: block until all thread_count worker threads are parked,
// invoking func as part of the pause sequence (definition below).
26 template <typename Func
>
27 void pause(int thread_count
, Func
&& func
);
// Pauser::pause(): takes the mutex, then blocks until every one of the
// thread_count workers has announced itself via cond_paused.
// NOTE(review): several original lines are elided from this view (e.g.
// where 'ready' is cleared and func is invoked) -- confirm against the
// full file before relying on the exact ordering.
32 template <typename Func
>
33 void Pauser::pause(int thread_count
, Func
&& func
)
35 std::unique_lock
<std::mutex
> lock(mutex
);
41 // wait for all threads to pause
43 cond_paused
.wait(lock
, [=] { return waiters
== thread_count
; }); // predicate guards against spurious/missed wakeups
// Pauser::unpause(): wakes every thread blocked in wait().
// NOTE(review): the line setting 'ready' is elided from this view;
// notify_all pairs with the 'return ready' predicate in wait().
46 void Pauser::unpause()
48 std::lock_guard
<std::mutex
> lock(mutex
); // hold the mutex so the notify cannot race the waiters' predicate check
50 cond_ready
.notify_all();
// Body of the Pauser wait routine (its signature is on an elided line):
// a worker thread announces that it has parked, then sleeps until
// unpause() sets ready and broadcasts cond_ready.
55 std::unique_lock
<std::mutex
> lock(mutex
);
57 cond_paused
.notify_one(); // notify pause() that we're waiting
58 cond_ready
.wait(lock
, [this] { return ready
; }); // wait for unpause()
// Short aliases for the asio/beast namespaces used throughout this file.
62 using tcp
= boost::asio::ip::tcp
;
63 namespace beast
= boost::beast
;
// Connection data members (the class header is on elided lines).
// strand serializes all async completion handlers for this connection.
67 boost::asio::io_service::strand strand
;
70 // references are bound to callbacks for async operations. if a callback
71 // function returns without issuing another operation, the reference is
72 // dropped and the Connection is deleted/closed
73 std::atomic
<int> nref
{0};
74 using Ref
= boost::intrusive_ptr
<Connection
>;
76 // limit header to 4k, since we read it all into a single flat_buffer
77 static constexpr size_t header_limit
= 4096;
78 // don't impose a limit on the body, since we read it in pieces
79 static constexpr size_t body_limit
= std::numeric_limits
<size_t>::max();
// read buffer plus the incremental http request parser it feeds
81 beast::flat_buffer buffer
;
82 boost::optional
<rgw::asio::parser_type
> parser
;
// bodyless response used for the 400 error path (see on_header)
84 using bad_response_type
= beast::http::response
<beast::http::empty_body
>;
85 boost::optional
<bad_response_type
> response
;
// convenience accessor for the CephContext required by ldout/lderr
87 CephContext
* ctx() const { return env
.store
->ctx(); }
// Fragment of the header-reading method (its signature is on elided
// lines): applies the parser size limits, then starts an async header
// read whose completion invokes on_header() on this connection's strand.
90 // configure the parser
92 parser
->header_limit(header_limit
);
93 parser
->body_limit(body_limit
);
// Ref{this} bumps the intrusive refcount so the Connection stays alive
// until the bound callback runs
96 beast::http::async_read_header(socket
, buffer
, *parser
, strand
.wrap(
97 std::bind(&Connection::on_header
, Ref
{this},
98 std::placeholders::_1
)));
// Drain any unread request body so a keep-alive connection can move on
// to the next message. NOTE(review): several lines are elided from this
// view, including what follows the is_done() early branch.
101 void discard_unread_message() {
102 if (parser
->is_done()) {
103 // nothing left to discard, start reading the next message
108 // read the rest of the request into a static buffer. multiple clients could
109 // write at the same time, but this is okay because we never read it back
110 static std::array
<char, 1024> discard_buffer
;
// point the parser's body sink at the shared scratch buffer
112 auto& body
= parser
->get().body();
113 body
.size
= discard_buffer
.size();
114 body
.data
= discard_buffer
.data();
// async partial read; completion re-enters on_discard_unread() which
// loops back here until the body is fully drained
116 beast::http::async_read_some(socket
, buffer
, *parser
, strand
.wrap(
117 std::bind(&Connection::on_discard_unread
, Ref
{this},
118 std::placeholders::_1
)));
// Completion handler for the discard reads: a connection reset ends the
// connection (body elided), other errors are logged, and otherwise we
// keep discarding until the parser reports the body is done.
121 void on_discard_unread(boost::system::error_code ec
) {
122 if (ec
== boost::asio::error::connection_reset
) {
126 ldout(ctx(), 5) << "discard_unread_message failed: "
127 << ec
.message() << dendl
;
130 discard_unread_message();
// Completion handler for the error-response write: nothing to retry,
// just log the failure; the Connection is released when the last Ref
// bound into a callback drops.
133 void on_write_error(boost::system::error_code ec
) {
135 ldout(ctx(), 5) << "failed to write response: " << ec
.message() << dendl
;
// Completion handler for async_read_header. Plain disconnects
// (connection_reset / end_of_stream) are handled on elided lines; other
// parse failures get a logged 400 Bad Request reply. The success path
// is also elided from this view.
139 void on_header(boost::system::error_code ec
) {
140 if (ec
== boost::asio::error::connection_reset
||
141 ec
== beast::http::error::end_of_stream
) {
145 auto& message
= parser
->get();
146 ldout(ctx(), 1) << "failed to read header: " << ec
.message() << dendl
;
147 ldout(ctx(), 1) << "====== req done http_status=400 ======" << dendl
;
// build a bodyless 400 response, mirroring the client's http version
// (1.0 stays 1.0, everything else is answered as 1.1)
149 response
->result(beast::http::status::bad_request
);
150 response
->version(message
.version() == 10 ? 10 : 11);
151 response
->prepare_payload();
152 beast::http::async_write(socket
, *response
, strand
.wrap(
153 std::bind(&Connection::on_write_error
, Ref
{this},
154 std::placeholders::_1
)));
// Fragment of the request-processing method (its signature is on elided
// lines): builds the layered ClientIO stack around the raw asio client,
// dispatches to the generic rgw request processor, then -- for
// keep-alive connections -- drains any unread body before the next read.
158 // process the request
159 RGWRequest req
{env
.store
->get_new_req_id()};
161 rgw::asio::ClientIO real_client
{socket
, *parser
, buffer
};
// io filter stack, outermost first: reordering > buffering > chunking
// > content-length control, wrapping the raw client
163 auto real_client_io
= rgw::io::add_reordering(
164 rgw::io::add_buffering(ctx(),
165 rgw::io::add_chunking(
166 rgw::io::add_conlen_controlling(
168 RGWRestfulIO
client(ctx(), &real_client_io
);
169 process_request(env
.store
, env
.rest
, &req
, env
.uri_prefix
,
170 *env
.auth_registry
, &client
, env
.olog
);
172 if (parser
->keep_alive()) {
173 // parse any unread bytes from the previous message (in case we replied
174 // before reading the entire body) before reading the next
175 discard_unread_message();
// Construct a Connection over an accepted socket. The strand is built
// from the socket's io_service before the socket itself is moved into
// place. NOTE(review): this relies on member declaration order (strand
// before socket), which is partly outside this view -- confirm.
180 Connection(RGWProcessEnv
& env
, tcp::socket
&& socket
)
181 : env(env
), strand(socket
.get_io_service()), socket(std::move(socket
)) {}
// Intrusive reference counting: each Ref bound into an async callback
// holds a count; the Connection deletes itself when the last one drops.
187 void get() { ++nref
; }
188 void put() { if (nref
.fetch_sub(1) == 1) { delete this; } } // last ref -> self-delete
// adapters required by boost::intrusive_ptr
190 friend void intrusive_ptr_add_ref(Connection
*c
) { c
->get(); }
191 friend void intrusive_ptr_release(Connection
*c
) { c
->put(); }
// AsioFrontend data members (the class header is on elided lines).
197 boost::asio::io_service service
; // event loop serviced by all worker threads
199 tcp::acceptor acceptor
;
200 tcp::socket peer_socket
; // staging socket filled by the pending async_accept
202 std::vector
<std::thread
> threads
;
204 std::atomic
<bool> going_down
{false}; // set during shutdown so run() loops can exit
206 CephContext
* ctx() const { return env
.store
->ctx(); }
// completion handler for async_accept (definition below)
208 void accept(boost::system::error_code ec
);
// Construct with the process environment; the acceptor and staging
// socket attach to the io_service here, binding happens later in init().
211 AsioFrontend(const RGWProcessEnv
& env
)
212 : env(env
), acceptor(service
), peer_socket(service
) {}
// install a new store/auth registry while paused, then resume workers
219 void unpause(RGWRados
* store
, rgw_auth_registry_ptr_t
);
// Open, bind and listen on the configured IPv4 port, then queue the
// first async accept. NOTE(review): the error-return statements after
// each lderr are on elided lines.
222 int AsioFrontend::init()
224 auto ep
= tcp::endpoint
{tcp::v4(), static_cast<unsigned short>(env
.port
)};
225 ldout(ctx(), 4) << "frontend listening on " << ep
<< dendl
;
227 boost::system::error_code ec
;
228 acceptor
.open(ep
.protocol(), ec
);
230 lderr(ctx()) << "failed to open socket: " << ec
.message() << dendl
;
// allow quick restart without waiting out TIME_WAIT on the port
233 acceptor
.set_option(tcp::acceptor::reuse_address(true));
234 acceptor
.bind(ep
, ec
);
236 lderr(ctx()) << "failed to bind address " << ep
<<
237 ": " << ec
.message() << dendl
;
240 acceptor
.listen(boost::asio::socket_base::max_connections
);
// arm the accept loop; the lambda (body elided) forwards into accept()
241 acceptor
.async_accept(peer_socket
,
242 [this] (boost::system::error_code ec
) {
// Accept-loop completion handler: bail out if the acceptor was closed
// or the operation aborted (shutdown), otherwise take ownership of the
// accepted socket, immediately re-arm the accept on peer_socket, and
// hand the socket to a fresh refcounted Connection.
248 void AsioFrontend::accept(boost::system::error_code ec
)
250 if (!acceptor
.is_open()) {
252 } else if (ec
== boost::asio::error::operation_aborted
) {
// move the accepted socket out so peer_socket can stage the next one
257 auto socket
= std::move(peer_socket
);
258 acceptor
.async_accept(peer_socket
,
259 [this] (boost::system::error_code ec
) {
263 boost::intrusive_ptr
<Connection
> conn
{new Connection(env
, std::move(socket
))};
265 // reference drops here, but on_connect() takes another
// Spawn rgw_thread_pool_size worker threads; each thread's body (elided
// from this view) services the shared io_service.
268 int AsioFrontend::run()
271 const int thread_count
= cct
->_conf
->rgw_thread_pool_size
;
272 threads
.reserve(thread_count
); // single allocation before the threads start
274 ldout(cct
, 4) << "frontend spawning " << thread_count
<< " threads" << dendl
;
276 for (int i
= 0; i
< thread_count
; i
++) {
277 threads
.emplace_back([=] {
// Initiate shutdown; most of the body (closing the acceptor, stopping
// the io_service so run() threads unblock) is on elided lines.
290 void AsioFrontend::stop()
292 ldout(ctx(), 4) << "frontend initiating shutdown..." << dendl
;
296 boost::system::error_code ec
;
299 // unblock the run() threads
// Join every worker thread spawned by run(); the per-thread join call
// inside the loop is on an elided line.
303 void AsioFrontend::join()
308 ldout(ctx(), 4) << "frontend joining threads..." << dendl
;
309 for (auto& thread
: threads
) {
312 ldout(ctx(), 4) << "frontend done" << dendl
;
// Quiesce all worker threads via the Pauser so configuration can be
// swapped; the lambda body that unblocks the run() threads is elided.
315 void AsioFrontend::pause()
317 ldout(ctx(), 4) << "frontend pausing threads..." << dendl
;
318 pauser
.pause(threads
.size(), [=] {
319 // unblock the run() threads
322 ldout(ctx(), 4) << "frontend paused" << dendl
;
// Install the freshly built auth registry (handling of the new store
// and the Pauser release are on elided lines) and resume the workers.
325 void AsioFrontend::unpause(RGWRados
* const store
,
326 rgw_auth_registry_ptr_t auth_registry
)
329 env
.auth_registry
= std::move(auth_registry
);
330 ldout(ctx(), 4) << "frontend unpaused" << dendl
;
335 } // anonymous namespace
// Pimpl: the public RGWAsioFrontend delegates everything to this
// file-local AsioFrontend subclass.
337 class RGWAsioFrontend::Impl
: public AsioFrontend
{
339 Impl(const RGWProcessEnv
& env
) : AsioFrontend(env
) {}
// Construct the pimpl. The destructor is defaulted here in the .cc,
// where Impl is a complete type -- presumably required because 'impl'
// is a smart pointer member declared with an incomplete Impl (verify
// against the header, which is outside this view).
342 RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv
& env
)
343 : impl(new Impl(env
))
347 RGWAsioFrontend::~RGWAsioFrontend() = default;
// Public API: thin forwarders into the pimpl. All bodies except the
// final unpause_with_new_config are on elided lines.
349 int RGWAsioFrontend::init()
354 int RGWAsioFrontend::run()
359 void RGWAsioFrontend::stop()
364 void RGWAsioFrontend::join()
369 void RGWAsioFrontend::pause_for_new_config()
374 void RGWAsioFrontend::unpause_with_new_config(
375 RGWRados
* const store
,
376 rgw_auth_registry_ptr_t auth_registry
// forward, transferring ownership of the auth registry to the impl
378 impl
->unpause(store
, std::move(auth_registry
));