]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_asio_frontend.cc
58a7446db3c15f9f0a473f134c40acf77da6c4a0
[ceph.git] / ceph / src / rgw / rgw_asio_frontend.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <atomic>
5 #include <thread>
6 #include <vector>
7
8 #include <boost/asio.hpp>
9 #define BOOST_COROUTINES_NO_DEPRECATION_WARNING
10 #include <boost/asio/spawn.hpp>
11 #include <boost/intrusive/list.hpp>
12
13 #include "common/async/shared_mutex.h"
14 #include "common/errno.h"
15
16 #include "rgw_asio_client.h"
17 #include "rgw_asio_frontend.h"
18
19 #ifdef WITH_RADOSGW_BEAST_OPENSSL
20 #include <boost/asio/ssl.hpp>
21 #endif
22
23 #include "rgw_dmclock_async_scheduler.h"
24
25 #define dout_subsys ceph_subsys_rgw
26
27 namespace {
28
29 using tcp = boost::asio::ip::tcp;
30 namespace http = boost::beast::http;
31 #ifdef WITH_RADOSGW_BEAST_OPENSSL
32 namespace ssl = boost::asio::ssl;
33 #endif
34
// Adapts a Beast stream (a plain tcp::socket or an ssl::stream wrapper)
// to the rgw::asio::ClientIO interface used by the generic request
// pipeline: responses are written synchronously, and the request body
// is pulled on demand through the shared parser and read buffer.
template <typename Stream>
class StreamIO : public rgw::asio::ClientIO {
  CephContext* const cct;
  Stream& stream;                    // not owned; lives in the connection coroutine
  boost::beast::flat_buffer& buffer; // shared read buffer, not owned
 public:
  StreamIO(CephContext *cct, Stream& stream, rgw::asio::parser_type& parser,
           boost::beast::flat_buffer& buffer, bool is_ssl,
           const tcp::endpoint& local_endpoint,
           const tcp::endpoint& remote_endpoint)
      : ClientIO(parser, is_ssl, local_endpoint, remote_endpoint),
        cct(cct), stream(stream), buffer(buffer)
  {}

  // Write len bytes of response data to the stream. Returns the number
  // of bytes written; throws rgw::io::Exception on any stream error.
  size_t write_data(const char* buf, size_t len) override {
    boost::system::error_code ec;
    auto bytes = boost::asio::write(stream, boost::asio::buffer(buf, len), ec);
    if (ec) {
      ldout(cct, 4) << "write_data failed: " << ec.message() << dendl;
      throw rgw::io::Exception(ec.value(), std::system_category());
    }
    return bytes;
  }

  // Read up to max bytes of the request body into buf by driving the
  // parser. Returns the number of bytes received (may be short if the
  // message ends first); throws rgw::io::Exception on stream errors.
  size_t recv_body(char* buf, size_t max) override {
    // point the parser's buffer_body at the caller's buffer; the parser
    // decrements body_remaining.size as it fills it
    auto& message = parser.get();
    auto& body_remaining = message.body();
    body_remaining.data = buf;
    body_remaining.size = max;

    while (body_remaining.size && !parser.is_done()) {
      boost::system::error_code ec;
      http::read_some(stream, buffer, parser, ec);
      if (ec == http::error::partial_message ||
          ec == http::error::need_buffer) {
        // need_buffer: the caller's buffer is full; partial_message: the
        // message ended early. both are treated as a short read, not a failure
        break;
      }
      if (ec) {
        ldout(cct, 4) << "failed to read body: " << ec.message() << dendl;
        throw rgw::io::Exception(ec.value(), std::system_category());
      }
    }
    return max - body_remaining.size;
  }
};
80
81 using SharedMutex = ceph::async::SharedMutex<boost::asio::io_context::executor_type>;
82
// Coroutine body for one client connection: repeatedly parse an HTTP
// request from the stream, process it, and loop while keep-alive holds,
// until eof/error/shutdown. The final stream error (if any) is returned
// through ec so the caller can decide whether an ssl shutdown is safe.
template <typename Stream>
void handle_connection(RGWProcessEnv& env, Stream& stream,
                       boost::beast::flat_buffer& buffer, bool is_ssl,
                       SharedMutex& pause_mutex,
                       rgw::dmclock::Scheduler *scheduler,
                       boost::system::error_code& ec,
                       boost::asio::yield_context yield)
{
  // limit header to 4k, since we read it all into a single flat_buffer
  static constexpr size_t header_limit = 4096;
  // don't impose a limit on the body, since we read it in pieces
  static constexpr size_t body_limit = std::numeric_limits<size_t>::max();

  auto cct = env.store->ctx();

  // read messages from the stream until eof
  for (;;) {
    // configure the parser (a fresh one per request)
    rgw::asio::parser_type parser;
    parser.header_limit(header_limit);
    parser.body_limit(body_limit);

    // parse the header
    http::async_read_header(stream, buffer, parser, yield[ec]);
    if (ec == boost::asio::error::connection_reset ||
        ec == boost::asio::error::bad_descriptor ||
        ec == boost::asio::error::operation_aborted ||
#ifdef WITH_RADOSGW_BEAST_OPENSSL
        ec == ssl::error::stream_truncated ||
#endif
        ec == http::error::end_of_stream) {
      // expected disconnect cases: log quietly and end the connection
      ldout(cct, 20) << "failed to read header: " << ec.message() << dendl;
      return;
    }
    if (ec) {
      // malformed request: answer 400 Bad Request and close
      ldout(cct, 1) << "failed to read header: " << ec.message() << dendl;
      auto& message = parser.get();
      http::response<http::empty_body> response;
      response.result(http::status::bad_request);
      response.version(message.version() == 10 ? 10 : 11);
      response.prepare_payload();
      http::async_write(stream, response, yield[ec]);
      if (ec) {
        ldout(cct, 5) << "failed to write response: " << ec.message() << dendl;
      }
      ldout(cct, 1) << "====== req done http_status=400 ======" << dendl;
      return;
    }

    {
      // hold a shared lock while processing, so pause() (which takes the
      // mutex exclusively) waits for in-flight requests to drain
      auto lock = pause_mutex.async_lock_shared(yield[ec]);
      if (ec == boost::asio::error::operation_aborted) {
        return;
      } else if (ec) {
        ldout(cct, 1) << "failed to lock: " << ec.message() << dendl;
        return;
      }

      // process the request
      RGWRequest req{env.store->get_new_req_id()};

      auto& socket = stream.lowest_layer();
      StreamIO real_client{cct, stream, parser, buffer, is_ssl,
                           socket.local_endpoint(),
                           socket.remote_endpoint()};

      // stack the standard rgw output filters on top of the raw client
      auto real_client_io = rgw::io::add_reordering(
                              rgw::io::add_buffering(cct,
                                rgw::io::add_chunking(
                                  rgw::io::add_conlen_controlling(
                                    &real_client))));
      RGWRestfulIO client(cct, &real_client_io);
      auto y = optional_yield{socket.get_io_context(), yield};
      process_request(env.store, env.rest, &req, env.uri_prefix,
                      *env.auth_registry, &client, env.olog, y, scheduler);
    }

    if (!parser.keep_alive()) {
      return;
    }

    // if we failed before reading the entire message, discard any remaining
    // bytes before reading the next
    while (!parser.is_done()) {
      // shared between connections on purpose: the contents are thrown
      // away, we only need somewhere to park the discarded bytes
      static std::array<char, 1024> discard_buffer;

      auto& body = parser.get().body();
      body.size = discard_buffer.size();
      body.data = discard_buffer.data();

      http::async_read_some(stream, buffer, parser, yield[ec]);
      if (ec == boost::asio::error::connection_reset) {
        return;
      }
      if (ec) {
        ldout(cct, 5) << "failed to discard unread message: "
            << ec.message() << dendl;
        return;
      }
    }
  }
}
185
186 struct Connection : boost::intrusive::list_base_hook<> {
187 tcp::socket& socket;
188 Connection(tcp::socket& socket) : socket(socket) {}
189 };
190
191 class ConnectionList {
192 using List = boost::intrusive::list<Connection>;
193 List connections;
194 std::mutex mutex;
195
196 void remove(Connection& c) {
197 std::lock_guard lock{mutex};
198 if (c.is_linked()) {
199 connections.erase(List::s_iterator_to(c));
200 }
201 }
202 public:
203 class Guard {
204 ConnectionList *list;
205 Connection *conn;
206 public:
207 Guard(ConnectionList *list, Connection *conn) : list(list), conn(conn) {}
208 ~Guard() { list->remove(*conn); }
209 };
210 [[nodiscard]] Guard add(Connection& conn) {
211 std::lock_guard lock{mutex};
212 connections.push_back(conn);
213 return Guard{this, &conn};
214 }
215 void close(boost::system::error_code& ec) {
216 std::lock_guard lock{mutex};
217 for (auto& conn : connections) {
218 conn.socket.close(ec);
219 }
220 connections.clear();
221 }
222 };
223
namespace dmc = rgw::dmclock;

// Beast-based HTTP(S) frontend: owns the io_context, the listening
// sockets, the worker thread pool and the per-connection bookkeeping.
class AsioFrontend {
  RGWProcessEnv env;
  RGWFrontendConfig* conf;
  boost::asio::io_context context;
#ifdef WITH_RADOSGW_BEAST_OPENSSL
  boost::optional<ssl::context> ssl_context; // engaged only when a cert is configured
  int init_ssl();
#endif
  // held shared by in-flight requests, exclusively by pause()
  SharedMutex pause_mutex;
  std::unique_ptr<rgw::dmclock::Scheduler> scheduler;

  // one listening socket per configured port/endpoint
  struct Listener {
    tcp::endpoint endpoint;
    tcp::acceptor acceptor;
    tcp::socket socket; // staging socket for the next accepted connection
    bool use_ssl = false;
    bool use_nodelay = false;

    explicit Listener(boost::asio::io_context& context)
      : acceptor(context), socket(context) {}
  };
  std::vector<Listener> listeners;

  ConnectionList connections;

  // work guard to keep run() threads busy while listeners are paused
  using Executor = boost::asio::io_context::executor_type;
  std::optional<boost::asio::executor_work_guard<Executor>> work;

  std::vector<std::thread> threads;
  std::atomic<bool> going_down{false};

  CephContext* ctx() const { return env.store->ctx(); }
  std::optional<dmc::ClientCounters> client_counters;
  std::unique_ptr<dmc::ClientConfig> client_config;
  // accept-completion handler: re-arms the acceptor and spawns a
  // connection coroutine
  void accept(Listener& listener, boost::system::error_code ec);

 public:
  // Constructs the frontend and selects the request scheduler from
  // configuration: dmclock when configured, otherwise a simple
  // throttler (also the fallback for an invalid scheduler type).
  AsioFrontend(const RGWProcessEnv& env, RGWFrontendConfig* conf,
               dmc::SchedulerCtx& sched_ctx)
    : env(env), conf(conf), pause_mutex(context.get_executor())
  {
    auto sched_t = dmc::get_scheduler_t(ctx());
    switch(sched_t){
    case dmc::scheduler_t::dmclock:
      scheduler.reset(new dmc::AsyncScheduler(ctx(),
                                              context,
                                              std::ref(sched_ctx.get_dmc_client_counters()),
                                              sched_ctx.get_dmc_client_config(),
                                              *sched_ctx.get_dmc_client_config(),
                                              dmc::AtLimit::Reject));
      break;
    case dmc::scheduler_t::none:
      lderr(ctx()) << "Got invalid scheduler type for beast, defaulting to throttler" << dendl;
      [[fallthrough]];
    case dmc::scheduler_t::throttler:
      scheduler.reset(new dmc::SimpleThrottler(ctx()));

    }
  }

  int init();
  int run();
  void stop();
  void join();
  void pause();
  void unpause(RGWRados* store, rgw_auth_registry_ptr_t);
};
293
294 unsigned short parse_port(const char *input, boost::system::error_code& ec)
295 {
296 char *end = nullptr;
297 auto port = std::strtoul(input, &end, 10);
298 if (port > std::numeric_limits<unsigned short>::max()) {
299 ec.assign(ERANGE, boost::system::system_category());
300 } else if (port == 0 && end == input) {
301 ec.assign(EINVAL, boost::system::system_category());
302 }
303 return port;
304 }
305
306 tcp::endpoint parse_endpoint(boost::asio::string_view input,
307 unsigned short default_port,
308 boost::system::error_code& ec)
309 {
310 tcp::endpoint endpoint;
311
312 if (input.empty()) {
313 ec = boost::asio::error::invalid_argument;
314 return endpoint;
315 }
316
317 if (input[0] == '[') { // ipv6
318 const size_t addr_begin = 1;
319 const size_t addr_end = input.find(']');
320 if (addr_end == input.npos) { // no matching ]
321 ec = boost::asio::error::invalid_argument;
322 return endpoint;
323 }
324 if (addr_end + 1 < input.size()) {
325 // :port must must follow [ipv6]
326 if (input[addr_end + 1] != ':') {
327 ec = boost::asio::error::invalid_argument;
328 return endpoint;
329 } else {
330 auto port_str = input.substr(addr_end + 2);
331 endpoint.port(parse_port(port_str.data(), ec));
332 }
333 } else {
334 endpoint.port(default_port);
335 }
336 auto addr = input.substr(addr_begin, addr_end - addr_begin);
337 endpoint.address(boost::asio::ip::make_address_v6(addr, ec));
338 } else { // ipv4
339 auto colon = input.find(':');
340 if (colon != input.npos) {
341 auto port_str = input.substr(colon + 1);
342 endpoint.port(parse_port(port_str.data(), ec));
343 if (ec) {
344 return endpoint;
345 }
346 } else {
347 endpoint.port(default_port);
348 }
349 auto addr = input.substr(0, colon);
350 endpoint.address(boost::asio::ip::make_address_v4(addr, ec));
351 }
352 return endpoint;
353 }
354
355 static int drop_privileges(CephContext *ctx)
356 {
357 uid_t uid = ctx->get_set_uid();
358 gid_t gid = ctx->get_set_gid();
359 std::string uid_string = ctx->get_set_uid_string();
360 std::string gid_string = ctx->get_set_gid_string();
361 if (gid && setgid(gid) != 0) {
362 int err = errno;
363 ldout(ctx, -1) << "unable to setgid " << gid << ": " << cpp_strerror(err) << dendl;
364 return -err;
365 }
366 if (uid && setuid(uid) != 0) {
367 int err = errno;
368 ldout(ctx, -1) << "unable to setuid " << uid << ": " << cpp_strerror(err) << dendl;
369 return -err;
370 }
371 if (uid && gid) {
372 ldout(ctx, 0) << "set uid:gid to " << uid << ":" << gid
373 << " (" << uid_string << ":" << gid_string << ")" << dendl;
374 }
375 return 0;
376 }
377
// Parse the frontend configuration, open/bind/listen every configured
// socket, arm the first async_accept on each, and finally drop root
// privileges. Returns 0 on success or a negative error code.
int AsioFrontend::init()
{
  boost::system::error_code ec;
  auto& config = conf->get_config_map();

#ifdef WITH_RADOSGW_BEAST_OPENSSL
  int r = init_ssl();
  if (r < 0) {
    return r;
  }
#endif

  // parse endpoints
  auto ports = config.equal_range("port");
  for (auto i = ports.first; i != ports.second; ++i) {
    auto port = parse_port(i->second.c_str(), ec);
    if (ec) {
      lderr(ctx()) << "failed to parse port=" << i->second << dendl;
      return -ec.value();
    }
    // each bare "port" gets two listeners: one on the default endpoint
    // and one explicitly v6, so both address families are served
    listeners.emplace_back(context);
    listeners.back().endpoint.port(port);

    listeners.emplace_back(context);
    listeners.back().endpoint = tcp::endpoint(tcp::v6(), port);
  }

  auto endpoints = config.equal_range("endpoint");
  for (auto i = endpoints.first; i != endpoints.second; ++i) {
    auto endpoint = parse_endpoint(i->second, 80, ec); // 80 = default http port
    if (ec) {
      lderr(ctx()) << "failed to parse endpoint=" << i->second << dendl;
      return -ec.value();
    }
    listeners.emplace_back(context);
    listeners.back().endpoint = endpoint;
  }
  // parse tcp nodelay (applies to all listeners, ssl ones included)
  auto nodelay = config.find("tcp_nodelay");
  if (nodelay != config.end()) {
    for (auto& l : listeners) {
      l.use_nodelay = (nodelay->second == "1");
    }
  }


  bool socket_bound = false;
  // start listeners
  for (auto& l : listeners) {
    l.acceptor.open(l.endpoint.protocol(), ec);
    if (ec) {
      if (ec == boost::asio::error::address_family_not_supported) {
        // e.g. ipv6 disabled on this host: skip this listener, keep going
        ldout(ctx(), 0) << "WARNING: cannot open socket for endpoint=" << l.endpoint
            << ", " << ec.message() << dendl;
        continue;
      }

      lderr(ctx()) << "failed to open socket: " << ec.message() << dendl;
      return -ec.value();
    }

    if (l.endpoint.protocol() == tcp::v6()) {
      // keep v6 wildcard listeners from also claiming the v4 port
      l.acceptor.set_option(boost::asio::ip::v6_only(true), ec);
      if (ec) {
        lderr(ctx()) << "failed to set v6_only socket option: "
            << ec.message() << dendl;
        return -ec.value();
      }
    }

    l.acceptor.set_option(tcp::acceptor::reuse_address(true));
    l.acceptor.bind(l.endpoint, ec);
    if (ec) {
      lderr(ctx()) << "failed to bind address " << l.endpoint
          << ": " << ec.message() << dendl;
      return -ec.value();
    }

    l.acceptor.listen(boost::asio::socket_base::max_connections);
    // arm the first accept; accept() re-arms itself from then on
    l.acceptor.async_accept(l.socket,
                            [this, &l] (boost::system::error_code ec) {
                              accept(l, ec);
                            });

    ldout(ctx(), 4) << "frontend listening on " << l.endpoint << dendl;
    socket_bound = true;
  }
  if (!socket_bound) {
    lderr(ctx()) << "Unable to listen at any endpoints" << dendl;
    return -EINVAL;
  }

  // bind while still privileged, then drop to the configured uid/gid
  return drop_privileges(ctx());
}
472
#ifdef WITH_RADOSGW_BEAST_OPENSSL
// Initialize the ssl context from ssl_certificate/ssl_private_key and
// collect the ssl_port/ssl_endpoint listeners. Returns 0 on success or
// a negative error code; ssl_context stays disengaged when no
// certificate is configured.
int AsioFrontend::init_ssl()
{
  boost::system::error_code ec;
  auto& config = conf->get_config_map();

  // ssl configuration
  auto cert = config.find("ssl_certificate");
  const bool have_cert = cert != config.end();
  if (have_cert) {
    // only initialize the ssl context if it's going to be used
    ssl_context = boost::in_place(ssl::context::tls);
  }

  auto key = config.find("ssl_private_key");
  const bool have_private_key = key != config.end();
  if (have_private_key) {
    // a private key without a certificate is a configuration error
    if (!have_cert) {
      lderr(ctx()) << "no ssl_certificate configured for ssl_private_key" << dendl;
      return -EINVAL;
    }
    ssl_context->use_private_key_file(key->second, ssl::context::pem, ec);
    if (ec) {
      lderr(ctx()) << "failed to add ssl_private_key=" << key->second
          << ": " << ec.message() << dendl;
      return -ec.value();
    }
  }
  if (have_cert) {
    ssl_context->use_certificate_chain_file(cert->second, ec);
    if (ec) {
      lderr(ctx()) << "failed to use ssl_certificate=" << cert->second
          << ": " << ec.message() << dendl;
      return -ec.value();
    }
    if (!have_private_key) {
      // attempt to use it as a private key if a separate one wasn't provided
      ssl_context->use_private_key_file(cert->second, ssl::context::pem, ec);
      if (ec) {
        lderr(ctx()) << "failed to use ssl_certificate=" << cert->second
            << " as a private key: " << ec.message() << dendl;
        return -ec.value();
      }
    }
  }

  // parse ssl endpoints
  auto ports = config.equal_range("ssl_port");
  for (auto i = ports.first; i != ports.second; ++i) {
    if (!have_cert) {
      lderr(ctx()) << "no ssl_certificate configured for ssl_port" << dendl;
      return -EINVAL;
    }
    auto port = parse_port(i->second.c_str(), ec);
    if (ec) {
      lderr(ctx()) << "failed to parse ssl_port=" << i->second << dendl;
      return -ec.value();
    }
    // like "port": add both a default and an explicit v6 listener
    listeners.emplace_back(context);
    listeners.back().endpoint.port(port);
    listeners.back().use_ssl = true;

    listeners.emplace_back(context);
    listeners.back().endpoint = tcp::endpoint(tcp::v6(), port);
    listeners.back().use_ssl = true;
  }

  auto endpoints = config.equal_range("ssl_endpoint");
  for (auto i = endpoints.first; i != endpoints.second; ++i) {
    if (!have_cert) {
      lderr(ctx()) << "no ssl_certificate configured for ssl_endpoint" << dendl;
      return -EINVAL;
    }
    auto endpoint = parse_endpoint(i->second, 443, ec); // 443 = default https port
    if (ec) {
      lderr(ctx()) << "failed to parse ssl_endpoint=" << i->second << dendl;
      return -ec.value();
    }
    listeners.emplace_back(context);
    listeners.back().endpoint = endpoint;
    listeners.back().use_ssl = true;
  }
  return 0;
}
#endif // WITH_RADOSGW_BEAST_OPENSSL
558
// Completion handler for acceptor.async_accept(): take ownership of the
// newly accepted socket, immediately re-arm the acceptor, then spawn a
// coroutine (ssl or plain, per listener config) running
// handle_connection() on that socket.
void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
{
  if (!l.acceptor.is_open()) {
    // listener was closed by stop(); nothing more to accept
    return;
  } else if (ec == boost::asio::error::operation_aborted) {
    // accept canceled by pause(); unpause() re-arms the acceptor
    return;
  } else if (ec) {
    throw ec;
  }
  auto socket = std::move(l.socket);
  tcp::no_delay options(l.use_nodelay);
  socket.set_option(options,ec); // best effort; ec is intentionally ignored
  l.acceptor.async_accept(l.socket,
                          [this, &l] (boost::system::error_code ec) {
                            accept(l, ec);
                          });

  // spawn a coroutine to handle the connection
#ifdef WITH_RADOSGW_BEAST_OPENSSL
  if (l.use_ssl) {
    boost::asio::spawn(context,
      [this, s=std::move(socket)] (boost::asio::yield_context yield) mutable {
        // register with the connection list so stop() can close us;
        // the Guard unregisters when the coroutine exits
        Connection conn{s};
        auto c = connections.add(conn);
        // wrap the socket in an ssl stream
        ssl::stream<tcp::socket&> stream{s, *ssl_context};
        boost::beast::flat_buffer buffer;
        // do ssl handshake
        boost::system::error_code ec;
        auto bytes = stream.async_handshake(ssl::stream_base::server,
                                            buffer.data(), yield[ec]);
        if (ec) {
          ldout(ctx(), 1) << "ssl handshake failed: " << ec.message() << dendl;
          return;
        }
        // discard the bytes the handshake consumed from our buffer
        buffer.consume(bytes);
        handle_connection(env, stream, buffer, true, pause_mutex,
                          scheduler.get(), ec, yield);
        if (!ec) {
          // ssl shutdown (ignoring errors)
          stream.async_shutdown(yield[ec]);
        }
        s.shutdown(tcp::socket::shutdown_both, ec);
      });
  } else {
#else
  {
#endif // WITH_RADOSGW_BEAST_OPENSSL
    boost::asio::spawn(context,
      [this, s=std::move(socket)] (boost::asio::yield_context yield) mutable {
        // register with the connection list so stop() can close us
        Connection conn{s};
        auto c = connections.add(conn);
        boost::beast::flat_buffer buffer;
        boost::system::error_code ec;
        handle_connection(env, s, buffer, false, pause_mutex,
                          scheduler.get(), ec, yield);
        s.shutdown(tcp::socket::shutdown_both, ec);
      });
  }
}
619
620 int AsioFrontend::run()
621 {
622 auto cct = ctx();
623 const int thread_count = cct->_conf->rgw_thread_pool_size;
624 threads.reserve(thread_count);
625
626 ldout(cct, 4) << "frontend spawning " << thread_count << " threads" << dendl;
627
628 // the worker threads call io_context::run(), which will return when there's
629 // no work left. hold a work guard to keep these threads going until join()
630 work.emplace(boost::asio::make_work_guard(context));
631
632 for (int i = 0; i < thread_count; i++) {
633 threads.emplace_back([=] {
634 // request warnings on synchronous librados calls in this thread
635 is_asio_thread = true;
636 boost::system::error_code ec;
637 context.run(ec);
638 });
639 }
640 return 0;
641 }
642
643 void AsioFrontend::stop()
644 {
645 ldout(ctx(), 4) << "frontend initiating shutdown..." << dendl;
646
647 going_down = true;
648
649 boost::system::error_code ec;
650 // close all listeners
651 for (auto& listener : listeners) {
652 listener.acceptor.close(ec);
653 }
654 // close all connections
655 connections.close(ec);
656 pause_mutex.cancel();
657 }
658
659 void AsioFrontend::join()
660 {
661 if (!going_down) {
662 stop();
663 }
664 work.reset();
665
666 ldout(ctx(), 4) << "frontend joining threads..." << dendl;
667 for (auto& thread : threads) {
668 thread.join();
669 }
670 ldout(ctx(), 4) << "frontend done" << dendl;
671 }
672
673 void AsioFrontend::pause()
674 {
675 ldout(ctx(), 4) << "frontend pausing connections..." << dendl;
676
677 // cancel pending calls to accept(), but don't close the sockets
678 boost::system::error_code ec;
679 for (auto& l : listeners) {
680 l.acceptor.cancel(ec);
681 }
682
683 // pause and wait for outstanding requests to complete
684 pause_mutex.lock(ec);
685
686 if (ec) {
687 ldout(ctx(), 1) << "frontend failed to pause: " << ec.message() << dendl;
688 } else {
689 ldout(ctx(), 4) << "frontend paused" << dendl;
690 }
691 }
692
693 void AsioFrontend::unpause(RGWRados* const store,
694 rgw_auth_registry_ptr_t auth_registry)
695 {
696 env.store = store;
697 env.auth_registry = std::move(auth_registry);
698
699 // unpause to unblock connections
700 pause_mutex.unlock();
701
702 // start accepting connections again
703 for (auto& l : listeners) {
704 l.acceptor.async_accept(l.socket,
705 [this, &l] (boost::system::error_code ec) {
706 accept(l, ec);
707 });
708 }
709
710 ldout(ctx(), 4) << "frontend unpaused" << dendl;
711 }
712
713 } // anonymous namespace
714
715 class RGWAsioFrontend::Impl : public AsioFrontend {
716 public:
717 Impl(const RGWProcessEnv& env, RGWFrontendConfig* conf,
718 rgw::dmclock::SchedulerCtx& sched_ctx)
719 : AsioFrontend(env, conf, sched_ctx) {}
720 };
721
// Construct the public frontend by allocating its private Impl.
RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv& env,
                                 RGWFrontendConfig* conf,
                                 rgw::dmclock::SchedulerCtx& sched_ctx)
  : impl(new Impl(env, conf, sched_ctx))
{
}
728
// defaulted out-of-line so impl's destructor is instantiated here,
// where Impl is a complete type
RGWAsioFrontend::~RGWAsioFrontend() = default;
730
// Forward to the pimpl: parse config, bind listeners, drop privileges.
int RGWAsioFrontend::init()
{
  return impl->init();
}
735
// Forward to the pimpl: spawn the worker thread pool.
int RGWAsioFrontend::run()
{
  return impl->run();
}
740
// Forward to the pimpl: initiate shutdown.
void RGWAsioFrontend::stop()
{
  impl->stop();
}
745
// Forward to the pimpl: wait for the worker threads to finish.
void RGWAsioFrontend::join()
{
  impl->join();
}
750
// Quiesce the frontend so new configuration can be installed; resume
// with unpause_with_new_config().
void RGWAsioFrontend::pause_for_new_config()
{
  impl->pause();
}
755
// Install the new store and auth registry, then resume the frontend.
void RGWAsioFrontend::unpause_with_new_config(
  RGWRados* const store,
  rgw_auth_registry_ptr_t auth_registry
) {
  impl->unpause(store, std::move(auth_registry));
}