#include "auth/Auth.h"
#include "Errors.h"
-#include "Dispatcher.h"
#include "Socket.h"
namespace {
const std::string& logic_name,
uint32_t nonce)
: Messenger{myname},
- master_sid{seastar::engine().cpu_id()},
+ master_sid{seastar::this_shard_id()},
logic_name{logic_name},
nonce{nonce}
{}
seastar::future<> SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs)
{
- assert(seastar::engine().cpu_id() == master_sid);
+ assert(seastar::this_shard_id() == master_sid);
auto my_addrs = addrs;
for (auto& addr : my_addrs.v) {
addr.nonce = nonce;
return Messenger::set_myaddrs(my_addrs);
}
-seastar::future<> SocketMessenger::do_bind(const entity_addrvec_t& addrs)
+SocketMessenger::bind_ertr::future<> SocketMessenger::do_bind(const entity_addrvec_t& addrs)
{
- assert(seastar::engine().cpu_id() == master_sid);
+ assert(seastar::this_shard_id() == master_sid);
ceph_assert(addrs.front().get_family() == AF_INET);
return set_myaddrs(addrs).then([this] {
if (!listener) {
} else {
return seastar::now();
}
- }).then([this] {
- auto listen_addr = get_myaddr();
- logger().debug("{} do_bind: try listen {}...", *this, listen_addr.in4_addr());
+ }).then([this] () -> bind_ertr::future<> {
+ const entity_addr_t listen_addr = get_myaddr();
+ logger().debug("{} do_bind: try listen {}...", *this, listen_addr);
if (!listener) {
logger().warn("{} do_bind: listener doesn't exist", *this);
- return seastar::now();
+ return bind_ertr::now();
}
return listener->listen(listen_addr);
});
}
-seastar::future<> SocketMessenger::bind(const entity_addrvec_t& addrs)
+SocketMessenger::bind_ertr::future<>
+SocketMessenger::bind(const entity_addrvec_t& addrs)
{
- return do_bind(addrs).then([this] {
+ return do_bind(addrs).safe_then([this] {
logger().info("{} bind: done", *this);
});
}
-seastar::future<>
+SocketMessenger::bind_ertr::future<>
SocketMessenger::try_bind(const entity_addrvec_t& addrs,
uint32_t min_port, uint32_t max_port)
{
auto addr = addrs.front();
if (addr.get_port() != 0) {
- return do_bind(addrs).then([this] {
+ return do_bind(addrs).safe_then([this] {
logger().info("{} try_bind: done", *this);
});
}
ceph_assert(min_port <= max_port);
return seastar::do_with(uint32_t(min_port),
[this, max_port, addr] (auto& port) {
- return seastar::repeat([this, max_port, addr, &port] {
+ return seastar::repeat_until_value([this, max_port, addr, &port] {
auto to_bind = addr;
to_bind.set_port(port);
- return do_bind(entity_addrvec_t{to_bind}).then([this] {
+ return do_bind(entity_addrvec_t{to_bind}
+ ).safe_then([this] () -> seastar::future<std::optional<bool>> {
logger().info("{} try_bind: done", *this);
- return stop_t::yes;
- }).handle_exception_type([this, max_port, &port] (const std::system_error& e) {
- assert(e.code() == std::errc::address_in_use);
+ return seastar::make_ready_future<std::optional<bool>>(
+ std::make_optional<bool>(true));
+ }, bind_ertr::all_same_way([this, max_port, &port]
+ (const std::error_code& e) mutable
+ -> seastar::future<std::optional<bool>> {
+ assert(e == std::errc::address_in_use);
logger().trace("{} try_bind: {} already used", *this, port);
if (port == max_port) {
- throw;
+ return seastar::make_ready_future<std::optional<bool>>(
+ std::make_optional<bool>(false));
}
++port;
- return stop_t::no;
- });
+ return seastar::make_ready_future<std::optional<bool>>();
+ }));
+ }).then([] (bool success) -> bind_ertr::future<> {
+ if (success) {
+ return bind_ertr::now();
+ } else {
+ return crimson::ct_error::address_in_use::make();
+ }
});
});
}
-seastar::future<> SocketMessenger::start(Dispatcher *disp) {
- assert(seastar::engine().cpu_id() == master_sid);
+seastar::future<> SocketMessenger::start(
+ const dispatchers_t& _dispatchers) {
+ assert(seastar::this_shard_id() == master_sid);
- dispatcher = disp;
+ dispatchers.assign(_dispatchers);
if (listener) {
// make sure we have already bound to a valid address
ceph_assert(get_myaddr().is_legacy() || get_myaddr().is_msgr2());
ceph_assert(get_myaddr().get_port() > 0);
return listener->accept([this] (SocketRef socket, entity_addr_t peer_addr) {
- assert(seastar::engine().cpu_id() == master_sid);
+ assert(seastar::this_shard_id() == master_sid);
SocketConnectionRef conn = seastar::make_shared<SocketConnection>(
- *this, *dispatcher, get_myaddr().is_msgr2());
+ *this, dispatchers, get_myaddr().is_msgr2());
conn->start_accept(std::move(socket), peer_addr);
return seastar::now();
});
}
crimson::net::ConnectionRef
-SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type)
+SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_name_t& peer_name)
{
- assert(seastar::engine().cpu_id() == master_sid);
+ assert(seastar::this_shard_id() == master_sid);
// make sure we connect to a valid peer_addr
ceph_assert(peer_addr.is_legacy() || peer_addr.is_msgr2());
ceph_assert(peer_addr.get_port() > 0);
if (auto found = lookup_conn(peer_addr); found) {
+ logger().debug("{} connect to existing", *found);
return found->shared_from_this();
}
SocketConnectionRef conn = seastar::make_shared<SocketConnection>(
- *this, *dispatcher, peer_addr.is_msgr2());
- conn->start_connect(peer_addr, peer_type);
+ *this, dispatchers, peer_addr.is_msgr2());
+ conn->start_connect(peer_addr, peer_name);
return conn->shared_from_this();
}
seastar::future<> SocketMessenger::shutdown()
{
- assert(seastar::engine().cpu_id() == master_sid);
- return seastar::futurize_apply([this] {
+ assert(seastar::this_shard_id() == master_sid);
+ return seastar::futurize_invoke([this] {
+ assert(dispatchers.empty());
if (listener) {
auto d_listener = listener;
listener = nullptr;
// close all connections
}).then([this] {
return seastar::parallel_for_each(accepting_conns, [] (auto conn) {
- return conn->close();
+ return conn->close_clean(false);
});
}).then([this] {
ceph_assert(accepting_conns.empty());
return seastar::parallel_for_each(connections, [] (auto conn) {
- return conn.second->close();
+ return conn.second->close_clean(false);
+ });
+ }).then([this] {
+ return seastar::parallel_for_each(closing_conns, [] (auto conn) {
+ return conn->close_clean(false);
});
}).then([this] {
ceph_assert(connections.empty());
seastar::future<> SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me, const SocketConnection& conn)
{
- assert(seastar::engine().cpu_id() == master_sid);
+ assert(seastar::this_shard_id() == master_sid);
if (!need_addr) {
if ((!get_myaddr().is_any() &&
get_myaddr().get_type() != peer_addr_for_me.get_type()) ||
}
return seastar::now();
}
- need_addr = false;
if (get_myaddr().get_type() == entity_addr_t::TYPE_NONE) {
// Not bound
entity_addr_t addr = peer_addr_for_me;
addr.set_type(entity_addr_t::TYPE_ANY);
addr.set_port(0);
+ need_addr = false;
return set_myaddrs(entity_addrvec_t{addr}
).then([this, &conn, peer_addr_for_me] {
logger().info("{} learned myaddr={} (unbound) from {}",
entity_addr_t addr = peer_addr_for_me;
addr.set_type(get_myaddr().get_type());
addr.set_port(get_myaddr().get_port());
+ need_addr = false;
return set_myaddrs(entity_addrvec_t{addr}
).then([this, &conn, peer_addr_for_me] {
logger().info("{} learned myaddr={} (blank IP) from {}",
throw std::system_error(
make_error_code(crimson::net::error::bad_peer_address));
} else {
+ need_addr = false;
return seastar::now();
}
}
connections.erase(found);
}
+void SocketMessenger::closing_conn(SocketConnectionRef conn)
+{
+ closing_conns.push_back(conn);
+}
+
+void SocketMessenger::closed_conn(SocketConnectionRef conn)
+{
+ for (auto it = closing_conns.begin();
+ it != closing_conns.end();) {
+ if (*it == conn) {
+ it = closing_conns.erase(it);
+ } else {
+ it++;
+ }
+ }
+}
+
seastar::future<uint32_t>
SocketMessenger::get_global_seq(uint32_t old)
{