]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/net/SocketMessenger.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / crimson / net / SocketMessenger.cc
index a112b50800d4a0f6c6daca0127aa209496485112..382d08f986ce94bf2cc1c8f38ea39d6a756d9d5a 100644 (file)
@@ -34,21 +34,25 @@ namespace crimson::net {
 
 SocketMessenger::SocketMessenger(const entity_name_t& myname,
                                  const std::string& logic_name,
-                                 uint32_t nonce)
-  : master_sid{seastar::this_shard_id()},
+                                 uint32_t nonce,
+                                 bool dispatch_only_on_this_shard)
+  : sid{seastar::this_shard_id()},
     logic_name{logic_name},
     nonce{nonce},
+    dispatch_only_on_sid{dispatch_only_on_this_shard},
     my_name{myname}
 {}
 
 SocketMessenger::~SocketMessenger()
 {
   logger().debug("~SocketMessenger: {}", logic_name);
+  ceph_assert_always(seastar::this_shard_id() == sid);
   ceph_assert(!listener);
 }
 
 bool SocketMessenger::set_addr_unknowns(const entity_addrvec_t &addrs)
 {
+  assert(seastar::this_shard_id() == sid);
   bool ret = false;
 
   entity_addrvec_t newaddrs = my_addrs;
@@ -76,7 +80,7 @@ bool SocketMessenger::set_addr_unknowns(const entity_addrvec_t &addrs)
 
 void SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs)
 {
-  assert(seastar::this_shard_id() == master_sid);
+  assert(seastar::this_shard_id() == sid);
   my_addrs = addrs;
   for (auto& addr : my_addrs.v) {
     addr.nonce = nonce;
@@ -86,12 +90,12 @@ void SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs)
 crimson::net::listen_ertr::future<>
 SocketMessenger::do_listen(const entity_addrvec_t& addrs)
 {
-  assert(seastar::this_shard_id() == master_sid);
   ceph_assert(addrs.front().get_family() == AF_INET);
   set_myaddrs(addrs);
   return seastar::futurize_invoke([this] {
     if (!listener) {
-      return FixedCPUServerSocket::create().then([this] (auto _listener) {
+      return ShardedServerSocket::create(dispatch_only_on_sid
+      ).then([this] (auto _listener) {
         listener = _listener;
       });
     } else {
@@ -161,6 +165,7 @@ SocketMessenger::try_bind(const entity_addrvec_t& addrs,
 SocketMessenger::bind_ertr::future<>
 SocketMessenger::bind(const entity_addrvec_t& addrs)
 {
+  assert(seastar::this_shard_id() == sid);
   using crimson::common::local_conf;
   return seastar::do_with(int64_t{local_conf()->ms_bind_retry_count},
                           [this, addrs] (auto& count) {
@@ -204,9 +209,19 @@ SocketMessenger::bind(const entity_addrvec_t& addrs)
   });
 }
 
+seastar::future<> SocketMessenger::accept(
+    SocketFRef &&socket, const entity_addr_t &peer_addr)
+{
+  assert(seastar::this_shard_id() == sid);
+  SocketConnectionRef conn =
+    seastar::make_shared<SocketConnection>(*this, dispatchers);
+  conn->start_accept(std::move(socket), peer_addr);
+  return seastar::now();
+}
+
 seastar::future<> SocketMessenger::start(
     const dispatchers_t& _dispatchers) {
-  assert(seastar::this_shard_id() == master_sid);
+  assert(seastar::this_shard_id() == sid);
 
   dispatchers.assign(_dispatchers);
   if (listener) {
@@ -214,13 +229,17 @@ seastar::future<> SocketMessenger::start(
     ceph_assert(get_myaddr().is_msgr2());
     ceph_assert(get_myaddr().get_port() > 0);
 
-    return listener->accept([this] (SocketRef socket, entity_addr_t peer_addr) {
-      assert(seastar::this_shard_id() == master_sid);
+    return listener->accept([this](SocketRef _socket, entity_addr_t peer_addr) {
       assert(get_myaddr().is_msgr2());
-      SocketConnectionRef conn =
-        seastar::make_shared<SocketConnection>(*this, dispatchers);
-      conn->start_accept(std::move(socket), peer_addr);
-      return seastar::now();
+      SocketFRef socket = seastar::make_foreign(std::move(_socket));
+      if (listener->is_fixed_shard_dispatching()) {
+        return accept(std::move(socket), peer_addr);
+      } else {
+        return seastar::smp::submit_to(sid,
+            [this, peer_addr, socket = std::move(socket)]() mutable {
+          return accept(std::move(socket), peer_addr);
+        });
+      }
     });
   }
   return seastar::now();
@@ -229,7 +248,7 @@ seastar::future<> SocketMessenger::start(
 crimson::net::ConnectionRef
 SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_name_t& peer_name)
 {
-  assert(seastar::this_shard_id() == master_sid);
+  assert(seastar::this_shard_id() == sid);
 
   // make sure we connect to a valid peer_addr
   if (!peer_addr.is_msgr2()) {
@@ -249,13 +268,13 @@ SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_name_t& pe
 
 seastar::future<> SocketMessenger::shutdown()
 {
-  assert(seastar::this_shard_id() == master_sid);
+  assert(seastar::this_shard_id() == sid);
   return seastar::futurize_invoke([this] {
     assert(dispatchers.empty());
     if (listener) {
       auto d_listener = listener;
       listener = nullptr;
-      return d_listener->destroy();
+      return d_listener->shutdown_destroy();
     } else {
       return seastar::now();
     }
@@ -306,7 +325,7 @@ void SocketMessenger::learned_addr(
     const entity_addr_t &peer_addr_for_me,
     const SocketConnection& conn)
 {
-  assert(seastar::this_shard_id() == master_sid);
+  assert(seastar::this_shard_id() == sid);
   if (!need_addr) {
     if ((!get_myaddr().is_any() &&
          get_myaddr().get_type() != peer_addr_for_me.get_type()) ||
@@ -363,34 +382,40 @@ void SocketMessenger::learned_addr(
 
 SocketPolicy SocketMessenger::get_policy(entity_type_t peer_type) const
 {
+  assert(seastar::this_shard_id() == sid);
   return policy_set.get(peer_type);
 }
 
 SocketPolicy SocketMessenger::get_default_policy() const
 {
+  assert(seastar::this_shard_id() == sid);
   return policy_set.get_default();
 }
 
 void SocketMessenger::set_default_policy(const SocketPolicy& p)
 {
+  assert(seastar::this_shard_id() == sid);
   policy_set.set_default(p);
 }
 
 void SocketMessenger::set_policy(entity_type_t peer_type,
                                 const SocketPolicy& p)
 {
+  assert(seastar::this_shard_id() == sid);
   policy_set.set(peer_type, p);
 }
 
 void SocketMessenger::set_policy_throttler(entity_type_t peer_type,
                                           Throttle* throttle)
 {
+  assert(seastar::this_shard_id() == sid);
   // only byte throttler is used in OSD
   policy_set.set_throttlers(peer_type, throttle, nullptr);
 }
 
 crimson::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr)
 {
+  assert(seastar::this_shard_id() == sid);
   if (auto found = connections.find(addr);
       found != connections.end()) {
     return found->second;
@@ -401,16 +426,19 @@ crimson::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr
 
 void SocketMessenger::accept_conn(SocketConnectionRef conn)
 {
+  assert(seastar::this_shard_id() == sid);
   accepting_conns.insert(conn);
 }
 
 void SocketMessenger::unaccept_conn(SocketConnectionRef conn)
 {
+  assert(seastar::this_shard_id() == sid);
   accepting_conns.erase(conn);
 }
 
 void SocketMessenger::register_conn(SocketConnectionRef conn)
 {
+  assert(seastar::this_shard_id() == sid);
   auto [i, added] = connections.emplace(conn->get_peer_addr(), conn);
   std::ignore = i;
   ceph_assert(added);
@@ -418,6 +446,7 @@ void SocketMessenger::register_conn(SocketConnectionRef conn)
 
 void SocketMessenger::unregister_conn(SocketConnectionRef conn)
 {
+  assert(seastar::this_shard_id() == sid);
   ceph_assert(conn);
   auto found = connections.find(conn->get_peer_addr());
   ceph_assert(found != connections.end());
@@ -427,11 +456,13 @@ void SocketMessenger::unregister_conn(SocketConnectionRef conn)
 
 void SocketMessenger::closing_conn(SocketConnectionRef conn)
 {
+  assert(seastar::this_shard_id() == sid);
   closing_conns.push_back(conn);
 }
 
 void SocketMessenger::closed_conn(SocketConnectionRef conn)
 {
+  assert(seastar::this_shard_id() == sid);
   for (auto it = closing_conns.begin();
        it != closing_conns.end();) {
     if (*it == conn) {
@@ -444,6 +475,7 @@ void SocketMessenger::closed_conn(SocketConnectionRef conn)
 
 uint32_t SocketMessenger::get_global_seq(uint32_t old)
 {
+  assert(seastar::this_shard_id() == sid);
   if (old > global_seq) {
     global_seq = old;
   }