SocketMessenger::SocketMessenger(const entity_name_t& myname,
const std::string& logic_name,
- uint32_t nonce)
- : master_sid{seastar::this_shard_id()},
+ uint32_t nonce,
+ bool dispatch_only_on_this_shard)
+ : sid{seastar::this_shard_id()},
logic_name{logic_name},
nonce{nonce},
+ dispatch_only_on_sid{dispatch_only_on_this_shard},
my_name{myname}
{}
SocketMessenger::~SocketMessenger()
{
logger().debug("~SocketMessenger: {}", logic_name);
+ ceph_assert_always(seastar::this_shard_id() == sid);
ceph_assert(!listener);
}
bool SocketMessenger::set_addr_unknowns(const entity_addrvec_t &addrs)
{
+ assert(seastar::this_shard_id() == sid);
bool ret = false;
entity_addrvec_t newaddrs = my_addrs;
void SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs)
{
- assert(seastar::this_shard_id() == master_sid);
+ assert(seastar::this_shard_id() == sid);
my_addrs = addrs;
for (auto& addr : my_addrs.v) {
addr.nonce = nonce;
crimson::net::listen_ertr::future<>
SocketMessenger::do_listen(const entity_addrvec_t& addrs)
{
- assert(seastar::this_shard_id() == master_sid);
ceph_assert(addrs.front().get_family() == AF_INET);
set_myaddrs(addrs);
return seastar::futurize_invoke([this] {
if (!listener) {
- return FixedCPUServerSocket::create().then([this] (auto _listener) {
+ return ShardedServerSocket::create(dispatch_only_on_sid
+ ).then([this] (auto _listener) {
listener = _listener;
});
} else {
SocketMessenger::bind_ertr::future<>
SocketMessenger::bind(const entity_addrvec_t& addrs)
{
+ assert(seastar::this_shard_id() == sid);
using crimson::common::local_conf;
return seastar::do_with(int64_t{local_conf()->ms_bind_retry_count},
[this, addrs] (auto& count) {
});
}
+seastar::future<> SocketMessenger::accept(
+ SocketFRef &&socket, const entity_addr_t &peer_addr)
+{
+ assert(seastar::this_shard_id() == sid);
+ SocketConnectionRef conn =
+ seastar::make_shared<SocketConnection>(*this, dispatchers);
+ conn->start_accept(std::move(socket), peer_addr);
+ return seastar::now();
+}
+
seastar::future<> SocketMessenger::start(
const dispatchers_t& _dispatchers) {
- assert(seastar::this_shard_id() == master_sid);
+ assert(seastar::this_shard_id() == sid);
dispatchers.assign(_dispatchers);
if (listener) {
ceph_assert(get_myaddr().is_msgr2());
ceph_assert(get_myaddr().get_port() > 0);
- return listener->accept([this] (SocketRef socket, entity_addr_t peer_addr) {
- assert(seastar::this_shard_id() == master_sid);
+ return listener->accept([this](SocketRef _socket, entity_addr_t peer_addr) {
assert(get_myaddr().is_msgr2());
- SocketConnectionRef conn =
- seastar::make_shared<SocketConnection>(*this, dispatchers);
- conn->start_accept(std::move(socket), peer_addr);
- return seastar::now();
+ SocketFRef socket = seastar::make_foreign(std::move(_socket));
+ if (listener->is_fixed_shard_dispatching()) {
+ return accept(std::move(socket), peer_addr);
+ } else {
+ return seastar::smp::submit_to(sid,
+ [this, peer_addr, socket = std::move(socket)]() mutable {
+ return accept(std::move(socket), peer_addr);
+ });
+ }
});
}
return seastar::now();
crimson::net::ConnectionRef
SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_name_t& peer_name)
{
- assert(seastar::this_shard_id() == master_sid);
+ assert(seastar::this_shard_id() == sid);
// make sure we connect to a valid peer_addr
if (!peer_addr.is_msgr2()) {
seastar::future<> SocketMessenger::shutdown()
{
- assert(seastar::this_shard_id() == master_sid);
+ assert(seastar::this_shard_id() == sid);
return seastar::futurize_invoke([this] {
assert(dispatchers.empty());
if (listener) {
auto d_listener = listener;
listener = nullptr;
- return d_listener->destroy();
+ return d_listener->shutdown_destroy();
} else {
return seastar::now();
}
const entity_addr_t &peer_addr_for_me,
const SocketConnection& conn)
{
- assert(seastar::this_shard_id() == master_sid);
+ assert(seastar::this_shard_id() == sid);
if (!need_addr) {
if ((!get_myaddr().is_any() &&
get_myaddr().get_type() != peer_addr_for_me.get_type()) ||
SocketPolicy SocketMessenger::get_policy(entity_type_t peer_type) const
{
+ assert(seastar::this_shard_id() == sid);
return policy_set.get(peer_type);
}
SocketPolicy SocketMessenger::get_default_policy() const
{
+ assert(seastar::this_shard_id() == sid);
return policy_set.get_default();
}
void SocketMessenger::set_default_policy(const SocketPolicy& p)
{
+ assert(seastar::this_shard_id() == sid);
policy_set.set_default(p);
}
void SocketMessenger::set_policy(entity_type_t peer_type,
const SocketPolicy& p)
{
+ assert(seastar::this_shard_id() == sid);
policy_set.set(peer_type, p);
}
void SocketMessenger::set_policy_throttler(entity_type_t peer_type,
Throttle* throttle)
{
+ assert(seastar::this_shard_id() == sid);
// only byte throttler is used in OSD
policy_set.set_throttlers(peer_type, throttle, nullptr);
}
crimson::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr)
{
+ assert(seastar::this_shard_id() == sid);
if (auto found = connections.find(addr);
found != connections.end()) {
return found->second;
void SocketMessenger::accept_conn(SocketConnectionRef conn)
{
+ assert(seastar::this_shard_id() == sid);
accepting_conns.insert(conn);
}
void SocketMessenger::unaccept_conn(SocketConnectionRef conn)
{
+ assert(seastar::this_shard_id() == sid);
accepting_conns.erase(conn);
}
void SocketMessenger::register_conn(SocketConnectionRef conn)
{
+ assert(seastar::this_shard_id() == sid);
auto [i, added] = connections.emplace(conn->get_peer_addr(), conn);
std::ignore = i;
ceph_assert(added);
void SocketMessenger::unregister_conn(SocketConnectionRef conn)
{
+ assert(seastar::this_shard_id() == sid);
ceph_assert(conn);
auto found = connections.find(conn->get_peer_addr());
ceph_assert(found != connections.end());
void SocketMessenger::closing_conn(SocketConnectionRef conn)
{
+ assert(seastar::this_shard_id() == sid);
closing_conns.push_back(conn);
}
void SocketMessenger::closed_conn(SocketConnectionRef conn)
{
+ assert(seastar::this_shard_id() == sid);
for (auto it = closing_conns.begin();
it != closing_conns.end();) {
if (*it == conn) {
uint32_t SocketMessenger::get_global_seq(uint32_t old)
{
+ assert(seastar::this_shard_id() == sid);
if (old > global_seq) {
global_seq = old;
}