X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=ceph%2Fsrc%2Fcrimson%2Fnet%2FSocketMessenger.cc;h=382d08f986ce94bf2cc1c8f38ea39d6a756d9d5a;hb=aee94f6923ba628a85d855d0c5316d0da78bfa2a;hp=a112b50800d4a0f6c6daca0127aa209496485112;hpb=27f45121cc74e31203777ad565f78d8aad9b92a2;p=ceph.git diff --git a/ceph/src/crimson/net/SocketMessenger.cc b/ceph/src/crimson/net/SocketMessenger.cc index a112b5080..382d08f98 100644 --- a/ceph/src/crimson/net/SocketMessenger.cc +++ b/ceph/src/crimson/net/SocketMessenger.cc @@ -34,21 +34,25 @@ namespace crimson::net { SocketMessenger::SocketMessenger(const entity_name_t& myname, const std::string& logic_name, - uint32_t nonce) - : master_sid{seastar::this_shard_id()}, + uint32_t nonce, + bool dispatch_only_on_this_shard) + : sid{seastar::this_shard_id()}, logic_name{logic_name}, nonce{nonce}, + dispatch_only_on_sid{dispatch_only_on_this_shard}, my_name{myname} {} SocketMessenger::~SocketMessenger() { logger().debug("~SocketMessenger: {}", logic_name); + ceph_assert_always(seastar::this_shard_id() == sid); ceph_assert(!listener); } bool SocketMessenger::set_addr_unknowns(const entity_addrvec_t &addrs) { + assert(seastar::this_shard_id() == sid); bool ret = false; entity_addrvec_t newaddrs = my_addrs; @@ -76,7 +80,7 @@ bool SocketMessenger::set_addr_unknowns(const entity_addrvec_t &addrs) void SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs) { - assert(seastar::this_shard_id() == master_sid); + assert(seastar::this_shard_id() == sid); my_addrs = addrs; for (auto& addr : my_addrs.v) { addr.nonce = nonce; @@ -86,12 +90,12 @@ void SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs) crimson::net::listen_ertr::future<> SocketMessenger::do_listen(const entity_addrvec_t& addrs) { - assert(seastar::this_shard_id() == master_sid); ceph_assert(addrs.front().get_family() == AF_INET); set_myaddrs(addrs); return seastar::futurize_invoke([this] { if (!listener) { - return FixedCPUServerSocket::create().then([this] (auto _listener) { + return ShardedServerSocket::create(dispatch_only_on_sid + ).then([this] (auto _listener) { listener = _listener; }); } else { @@ -161,6 +165,7 @@ SocketMessenger::try_bind(const entity_addrvec_t& addrs, SocketMessenger::bind_ertr::future<> SocketMessenger::bind(const entity_addrvec_t& addrs) { + assert(seastar::this_shard_id() == sid); using crimson::common::local_conf; return seastar::do_with(int64_t{local_conf()->ms_bind_retry_count}, [this, addrs] (auto& count) { @@ -204,9 +209,19 @@ SocketMessenger::bind(const entity_addrvec_t& addrs) }); } +seastar::future<> SocketMessenger::accept( + SocketFRef &&socket, const entity_addr_t &peer_addr) +{ + assert(seastar::this_shard_id() == sid); + SocketConnectionRef conn = + seastar::make_shared(*this, dispatchers); + conn->start_accept(std::move(socket), peer_addr); + return seastar::now(); +} + seastar::future<> SocketMessenger::start( const dispatchers_t& _dispatchers) { - assert(seastar::this_shard_id() == master_sid); + assert(seastar::this_shard_id() == sid); dispatchers.assign(_dispatchers); if (listener) { @@ -214,13 +229,17 @@ seastar::future<> SocketMessenger::start( ceph_assert(get_myaddr().is_msgr2()); ceph_assert(get_myaddr().get_port() > 0); - return listener->accept([this] (SocketRef socket, entity_addr_t peer_addr) { - assert(seastar::this_shard_id() == master_sid); + return listener->accept([this](SocketRef _socket, entity_addr_t peer_addr) { assert(get_myaddr().is_msgr2()); - SocketConnectionRef conn = - seastar::make_shared(*this, dispatchers); - conn->start_accept(std::move(socket), peer_addr); - return seastar::now(); + SocketFRef socket = seastar::make_foreign(std::move(_socket)); + if (listener->is_fixed_shard_dispatching()) { + return accept(std::move(socket), peer_addr); + } else { + return seastar::smp::submit_to(sid, + [this, peer_addr, socket = std::move(socket)]() mutable { + return accept(std::move(socket), peer_addr); + }); + } }); } return seastar::now(); @@ -229,7 +248,7 @@ seastar::future<> SocketMessenger::start( crimson::net::ConnectionRef SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_name_t& peer_name) { - assert(seastar::this_shard_id() == master_sid); + assert(seastar::this_shard_id() == sid); // make sure we connect to a valid peer_addr if (!peer_addr.is_msgr2()) { @@ -249,13 +268,13 @@ SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_name_t& pe seastar::future<> SocketMessenger::shutdown() { - assert(seastar::this_shard_id() == master_sid); + assert(seastar::this_shard_id() == sid); return seastar::futurize_invoke([this] { assert(dispatchers.empty()); if (listener) { auto d_listener = listener; listener = nullptr; - return d_listener->destroy(); + return d_listener->shutdown_destroy(); } else { return seastar::now(); } @@ -306,7 +325,7 @@ void SocketMessenger::learned_addr( const entity_addr_t &peer_addr_for_me, const SocketConnection& conn) { - assert(seastar::this_shard_id() == master_sid); + assert(seastar::this_shard_id() == sid); if (!need_addr) { if ((!get_myaddr().is_any() && get_myaddr().get_type() != peer_addr_for_me.get_type()) || @@ -363,34 +382,40 @@ void SocketMessenger::learned_addr( SocketPolicy SocketMessenger::get_policy(entity_type_t peer_type) const { + assert(seastar::this_shard_id() == sid); return policy_set.get(peer_type); } SocketPolicy SocketMessenger::get_default_policy() const { + assert(seastar::this_shard_id() == sid); return policy_set.get_default(); } void SocketMessenger::set_default_policy(const SocketPolicy& p) { + assert(seastar::this_shard_id() == sid); policy_set.set_default(p); } void SocketMessenger::set_policy(entity_type_t peer_type, const SocketPolicy& p) { + assert(seastar::this_shard_id() == sid); policy_set.set(peer_type, p); } void SocketMessenger::set_policy_throttler(entity_type_t peer_type, Throttle* throttle) { + assert(seastar::this_shard_id() == sid); // only byte throttler is used in OSD policy_set.set_throttlers(peer_type, throttle, nullptr); } crimson::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr) { + assert(seastar::this_shard_id() == sid); if (auto found = connections.find(addr); found != connections.end()) { return found->second; @@ -401,16 +426,19 @@ crimson::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr void SocketMessenger::accept_conn(SocketConnectionRef conn) { + assert(seastar::this_shard_id() == sid); accepting_conns.insert(conn); } void SocketMessenger::unaccept_conn(SocketConnectionRef conn) { + assert(seastar::this_shard_id() == sid); accepting_conns.erase(conn); } void SocketMessenger::register_conn(SocketConnectionRef conn) { + assert(seastar::this_shard_id() == sid); auto [i, added] = connections.emplace(conn->get_peer_addr(), conn); std::ignore = i; ceph_assert(added); @@ -418,6 +446,7 @@ void SocketMessenger::register_conn(SocketConnectionRef conn) void SocketMessenger::unregister_conn(SocketConnectionRef conn) { + assert(seastar::this_shard_id() == sid); ceph_assert(conn); auto found = connections.find(conn->get_peer_addr()); ceph_assert(found != connections.end()); @@ -427,11 +456,13 @@ void SocketMessenger::unregister_conn(SocketConnectionRef conn) void SocketMessenger::closing_conn(SocketConnectionRef conn) { + assert(seastar::this_shard_id() == sid); closing_conns.push_back(conn); } void SocketMessenger::closed_conn(SocketConnectionRef conn) { + assert(seastar::this_shard_id() == sid); for (auto it = closing_conns.begin(); it != closing_conns.end();) { if (*it == conn) { @@ -444,6 +475,7 @@ void SocketMessenger::closed_conn(SocketConnectionRef conn) uint32_t SocketMessenger::get_global_seq(uint32_t old) { + assert(seastar::this_shard_id() == sid); if (old > global_seq) { global_seq = old; }