1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include <condition_variable>
9 #include <boost/asio.hpp>
10 #include <boost/asio/spawn.hpp>
12 #include <beast/core/placeholders.hpp>
13 #include <beast/http/read.hpp>
14 #include <beast/http/string_body.hpp>
15 #include <beast/http/write.hpp>
17 #include "rgw_asio_frontend.h"
18 #include "rgw_asio_client.h"
20 #define dout_subsys ceph_subsys_rgw
23 #define dout_prefix (*_dout << "asio: ")
29 std::condition_variable cond_ready
; // signaled on ready==true
30 std::condition_variable cond_paused
; // signaled on waiters==thread_count
34 template <typename Func
>
35 void pause(int thread_count
, Func
&& func
);
40 template <typename Func
>
41 void Pauser::pause(int thread_count
, Func
&& func
)
43 std::unique_lock
<std::mutex
> lock(mutex
);
49 // wait for all threads to pause
51 cond_paused
.wait(lock
, [=] { return waiters
== thread_count
; });
54 void Pauser::unpause()
56 std::lock_guard
<std::mutex
> lock(mutex
);
58 cond_ready
.notify_all();
63 std::unique_lock
<std::mutex
> lock(mutex
);
65 cond_paused
.notify_one(); // notify pause() that we're waiting
66 cond_ready
.wait(lock
, [this] { return ready
; }); // wait for unpause()
70 using tcp
= boost::asio::ip::tcp
;
72 // coroutine to handle a client connection to completion
73 static void handle_connection(RGWProcessEnv
& env
, tcp::socket socket
,
74 boost::asio::yield_context yield
)
76 auto cct
= env
.store
->ctx();
77 boost::system::error_code ec
;
79 beast::flat_streambuf buffer
{1024};
81 // read messages from the socket until eof
84 rgw::asio::parser_type parser
;
86 auto bytes
= beast::http::async_read_some(socket
, buffer
, parser
, yield
[ec
]);
87 buffer
.consume(bytes
);
88 } while (!ec
&& !parser
.got_header());
90 if (ec
== boost::asio::error::connection_reset
||
91 ec
== boost::asio::error::eof
) {
95 auto& message
= parser
.get();
96 ldout(cct
, 1) << "read failed: " << ec
.message() << dendl
;
97 ldout(cct
, 1) << "====== req done http_status=400 ======" << dendl
;
98 beast::http::response
<beast::http::string_body
> response
;
99 response
.status
= 400;
100 response
.reason
= "Bad Request";
101 response
.version
= message
.version
== 10 ? 10 : 11;
102 beast::http::prepare(response
);
103 beast::http::async_write(socket
, std::move(response
), yield
[ec
]);
108 // process the request
109 RGWRequest req
{env
.store
->get_new_req_id()};
111 rgw::asio::ClientIO real_client
{socket
, parser
, buffer
};
113 auto real_client_io
= rgw::io::add_reordering(
114 rgw::io::add_buffering(cct
,
115 rgw::io::add_chunking(
116 rgw::io::add_conlen_controlling(
118 RGWRestfulIO
client(cct
, &real_client_io
);
119 process_request(env
.store
, env
.rest
, &req
, env
.uri_prefix
,
120 *env
.auth_registry
, &client
, env
.olog
);
122 if (real_client
.get_conn_close()) {
130 boost::asio::io_service service
;
132 tcp::acceptor acceptor
;
133 tcp::socket peer_socket
;
135 std::vector
<std::thread
> threads
;
137 std::atomic
<bool> going_down
{false};
139 CephContext
* ctx() const { return env
.store
->ctx(); }
141 void accept(boost::system::error_code ec
);
144 AsioFrontend(const RGWProcessEnv
& env
)
145 : env(env
), acceptor(service
), peer_socket(service
) {}
152 void unpause(RGWRados
* store
, rgw_auth_registry_ptr_t
);
155 int AsioFrontend::init()
157 auto ep
= tcp::endpoint
{tcp::v4(), static_cast<unsigned short>(env
.port
)};
158 ldout(ctx(), 4) << "frontend listening on " << ep
<< dendl
;
160 boost::system::error_code ec
;
161 acceptor
.open(ep
.protocol(), ec
);
163 lderr(ctx()) << "failed to open socket: " << ec
.message() << dendl
;
166 acceptor
.set_option(tcp::acceptor::reuse_address(true));
167 acceptor
.bind(ep
, ec
);
169 lderr(ctx()) << "failed to bind address " << ep
<<
170 ": " << ec
.message() << dendl
;
173 acceptor
.listen(boost::asio::socket_base::max_connections
);
174 acceptor
.async_accept(peer_socket
,
175 [this] (boost::system::error_code ec
) {
181 void AsioFrontend::accept(boost::system::error_code ec
)
183 if (!acceptor
.is_open()) {
185 } else if (ec
== boost::asio::error::operation_aborted
) {
190 auto socket
= std::move(peer_socket
);
191 // spawn a coroutine to handle the connection
192 boost::asio::spawn(service
,
193 [&] (boost::asio::yield_context yield
) {
194 handle_connection(env
, std::move(socket
), yield
);
196 acceptor
.async_accept(peer_socket
,
197 [this] (boost::system::error_code ec
) {
202 int AsioFrontend::run()
205 const int thread_count
= cct
->_conf
->rgw_thread_pool_size
;
206 threads
.reserve(thread_count
);
208 ldout(cct
, 4) << "frontend spawning " << thread_count
<< " threads" << dendl
;
210 for (int i
= 0; i
< thread_count
; i
++) {
211 threads
.emplace_back([=] {
224 void AsioFrontend::stop()
226 ldout(ctx(), 4) << "frontend initiating shutdown..." << dendl
;
230 boost::system::error_code ec
;
231 acceptor
.close(ec
); // unblock the run() threads
234 void AsioFrontend::join()
239 ldout(ctx(), 4) << "frontend joining threads..." << dendl
;
240 for (auto& thread
: threads
) {
243 ldout(ctx(), 4) << "frontend done" << dendl
;
246 void AsioFrontend::pause()
248 ldout(ctx(), 4) << "frontend pausing threads..." << dendl
;
249 pauser
.pause(threads
.size(), [=] {
250 // stop accepting but leave the port open
251 boost::system::error_code ec
;
254 ldout(ctx(), 4) << "frontend paused" << dendl
;
257 void AsioFrontend::unpause(RGWRados
* const store
,
258 rgw_auth_registry_ptr_t auth_registry
)
261 env
.auth_registry
= std::move(auth_registry
);
262 ldout(ctx(), 4) << "frontend unpaused" << dendl
;
264 acceptor
.async_accept(peer_socket
,
265 [this] (boost::system::error_code ec
) {
271 } // anonymous namespace
273 class RGWAsioFrontend::Impl
: public AsioFrontend
{
275 Impl(const RGWProcessEnv
& env
) : AsioFrontend(env
) {}
278 RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv
& env
)
279 : impl(new Impl(env
))
283 RGWAsioFrontend::~RGWAsioFrontend() = default;
285 int RGWAsioFrontend::init()
290 int RGWAsioFrontend::run()
295 void RGWAsioFrontend::stop()
300 void RGWAsioFrontend::join()
305 void RGWAsioFrontend::pause_for_new_config()
310 void RGWAsioFrontend::unpause_with_new_config(
311 RGWRados
* const store
,
312 rgw_auth_registry_ptr_t auth_registry
314 impl
->unpause(store
, std::move(auth_registry
));