friend void intrusive_ptr_release(Connection *c) { c->put(); }
};
-
class AsioFrontend {
RGWProcessEnv env;
RGWFrontendConfig* conf;
boost::asio::io_service service;
- tcp::acceptor acceptor;
- tcp::socket peer_socket;
+ struct Listener {
+ tcp::endpoint endpoint;
+ tcp::acceptor acceptor;
+ tcp::socket socket;
+
+ Listener(boost::asio::io_service& service)
+ : acceptor(service), socket(service) {}
+ };
+ std::vector<Listener> listeners;
std::vector<std::thread> threads;
Pauser pauser;
CephContext* ctx() const { return env.store->ctx(); }
- void accept(boost::system::error_code ec);
+ void accept(Listener& listener, boost::system::error_code ec);
public:
AsioFrontend(const RGWProcessEnv& env, RGWFrontendConfig* conf)
- : env(env), conf(conf), acceptor(service), peer_socket(service) {}
+ : env(env), conf(conf) {}
int init();
int run();
void unpause(RGWRados* store, rgw_auth_registry_ptr_t);
};
-int AsioFrontend::init()
+unsigned short parse_port(const char *input, boost::system::error_code& ec)
+{
+ char *end = nullptr;
+ auto port = std::strtoul(input, &end, 10);
+ if (port > std::numeric_limits<unsigned short>::max()) {
+ ec.assign(ERANGE, boost::system::system_category());
+ } else if (port == 0 && end == input) {
+ ec.assign(EINVAL, boost::system::system_category());
+ }
+ return port;
+}
+
+tcp::endpoint parse_endpoint(BOOST_ASIO_STRING_VIEW_PARAM input,
+ boost::system::error_code& ec)
{
- std::string port_str;
- conf->get_val("port", "80", &port_str);
+ tcp::endpoint endpoint;
- unsigned short port;
- boost::asio::ip::address addr; // default to 'any'
+ auto colon = input.find(':');
+ if (colon != input.npos) {
+ auto port_str = input.substr(colon + 1);
+ endpoint.port(parse_port(port_str.data(), ec));
+ } else {
+ endpoint.port(80);
+ }
+ if (!ec) {
+ auto addr = input.substr(0, colon);
+ endpoint.address(boost::asio::ip::make_address(addr, ec));
+ }
+ return endpoint;
+}
+
+int AsioFrontend::init()
+{
boost::system::error_code ec;
+ auto& config = conf->get_config_map();
- auto colon = port_str.find(':');
- if (colon != port_str.npos) {
- addr = boost::asio::ip::make_address(port_str.substr(0, colon), ec);
+ // parse endpoints
+ auto range = config.equal_range("port");
+ for (auto i = range.first; i != range.second; ++i) {
+ auto port = parse_port(i->second.c_str(), ec);
if (ec) {
- lderr(ctx()) << "failed to parse address '" << port_str << "': " << ec.message() << dendl;
+ lderr(ctx()) << "failed to parse port=" << i->second << dendl;
return -ec.value();
}
- port = std::stoul(port_str.substr(colon + 1), nullptr, 0);
- } else {
- port = std::stoul(port_str, nullptr, 0);
+ listeners.emplace_back(service);
+ listeners.back().endpoint.port(port);
}
- tcp::endpoint ep = {addr, port};
- ldout(ctx(), 4) << "frontend listening on " << ep << dendl;
-
- acceptor.open(ep.protocol(), ec);
- if (ec) {
- lderr(ctx()) << "failed to open socket: " << ec.message() << dendl;
- return -ec.value();
+ range = config.equal_range("endpoint");
+ for (auto i = range.first; i != range.second; ++i) {
+ auto endpoint = parse_endpoint(i->second, ec);
+ if (ec) {
+ lderr(ctx()) << "failed to parse endpoint=" << i->second << dendl;
+ return -ec.value();
+ }
+ listeners.emplace_back(service);
+ listeners.back().endpoint = endpoint;
}
- acceptor.set_option(tcp::acceptor::reuse_address(true));
- acceptor.bind(ep, ec);
- if (ec) {
- lderr(ctx()) << "failed to bind address " << ep <<
- ": " << ec.message() << dendl;
- return -ec.value();
+
+ // start listeners
+ for (auto& l : listeners) {
+ l.acceptor.open(l.endpoint.protocol(), ec);
+ if (ec) {
+ lderr(ctx()) << "failed to open socket: " << ec.message() << dendl;
+ return -ec.value();
+ }
+ l.acceptor.set_option(tcp::acceptor::reuse_address(true));
+ l.acceptor.bind(l.endpoint, ec);
+ if (ec) {
+ lderr(ctx()) << "failed to bind address " << l.endpoint
+ << ": " << ec.message() << dendl;
+ return -ec.value();
+ }
+ l.acceptor.listen(boost::asio::socket_base::max_connections);
+ l.acceptor.async_accept(l.socket,
+ [this, &l] (boost::system::error_code ec) {
+ accept(l, ec);
+ });
+
+ ldout(ctx(), 4) << "frontend listening on " << l.endpoint << dendl;
}
- acceptor.listen(boost::asio::socket_base::max_connections);
- acceptor.async_accept(peer_socket,
- [this] (boost::system::error_code ec) {
- return accept(ec);
- });
return 0;
}
-void AsioFrontend::accept(boost::system::error_code ec)
+void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
{
- if (!acceptor.is_open()) {
+ if (!l.acceptor.is_open()) {
return;
} else if (ec == boost::asio::error::operation_aborted) {
return;
} else if (ec) {
throw ec;
}
- auto socket = std::move(peer_socket);
- acceptor.async_accept(peer_socket,
- [this] (boost::system::error_code ec) {
- return accept(ec);
- });
+ auto socket = std::move(l.socket);
+ l.acceptor.async_accept(l.socket,
+ [this, &l] (boost::system::error_code ec) {
+ accept(l, ec);
+ });
boost::intrusive_ptr<Connection> conn{new Connection(env, std::move(socket))};
conn->on_connect();
going_down = true;
boost::system::error_code ec;
- acceptor.close(ec);
+ // close all listeners
+ for (auto& listener : listeners) {
+ listener.acceptor.close(ec);
+ }
// unblock the run() threads
service.stop();