]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/net/SocketConnection.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / crimson / net / SocketConnection.cc
index 38e2748738f7b2b39980a92f7167c98c8c71bcad..57e5c12c1aed433e89df4c3d4b7c15348b226e08 100644 (file)
@@ -28,8 +28,7 @@ namespace crimson::net {
 
 SocketConnection::SocketConnection(SocketMessenger& messenger,
                                    ChainedDispatchers& dispatchers)
-  : core(messenger.shard_id()),
-    messenger(messenger)
+  : msgr_sid{messenger.get_shard_id()}, messenger(messenger)
 {
   auto ret = create_handlers(dispatchers, *this);
   io_handler = std::move(ret.io_handler);
@@ -37,7 +36,7 @@ SocketConnection::SocketConnection(SocketMessenger& messenger,
 #ifdef UNIT_TESTS_BUILT
   if (messenger.interceptor) {
     interceptor = messenger.interceptor;
-    interceptor->register_conn(*this);
+    interceptor->register_conn(this->get_local_shared_foreign_from_this());
   }
 #endif
 }
@@ -46,45 +45,51 @@ SocketConnection::~SocketConnection() {}
 
 bool SocketConnection::is_connected() const
 {
-  assert(seastar::this_shard_id() == shard_id());
   return io_handler->is_connected();
 }
 
 #ifdef UNIT_TESTS_BUILT
-bool SocketConnection::is_closed() const
+bool SocketConnection::is_protocol_ready() const
 {
-  assert(seastar::this_shard_id() == shard_id());
+  assert(seastar::this_shard_id() == msgr_sid);
+  return protocol->is_ready();
+}
+
+bool SocketConnection::is_protocol_standby() const {
+  assert(seastar::this_shard_id() == msgr_sid);
+  return protocol->is_standby();
+}
+
+bool SocketConnection::is_protocol_closed() const
+{
+  assert(seastar::this_shard_id() == msgr_sid);
   return protocol->is_closed();
 }
 
-bool SocketConnection::is_closed_clean() const
+bool SocketConnection::is_protocol_closed_clean() const
 {
-  assert(seastar::this_shard_id() == shard_id());
+  assert(seastar::this_shard_id() == msgr_sid);
   return protocol->is_closed_clean();
 }
 
 #endif
 bool SocketConnection::peer_wins() const
 {
+  assert(seastar::this_shard_id() == msgr_sid);
   return (messenger.get_myaddr() > peer_addr || policy.server);
 }
 
-seastar::future<> SocketConnection::send(MessageURef msg)
+seastar::future<> SocketConnection::send(MessageURef _msg)
 {
-  return seastar::smp::submit_to(
-    shard_id(),
-    [this, msg=std::move(msg)]() mutable {
-      return io_handler->send(std::move(msg));
-    });
+  // may be invoked from any core
+  MessageFRef msg = seastar::make_foreign(std::move(_msg));
+  return io_handler->send(std::move(msg));
 }
 
 seastar::future<> SocketConnection::send_keepalive()
 {
-  return seastar::smp::submit_to(
-    shard_id(),
-    [this] {
-      return io_handler->send_keepalive();
-    });
+  // may be invoked from any core
+  return io_handler->send_keepalive();
 }
 
 SocketConnection::clock_t::time_point
@@ -106,7 +111,6 @@ void SocketConnection::set_last_keepalive_ack(clock_t::time_point when)
 
 void SocketConnection::mark_down()
 {
-  assert(seastar::this_shard_id() == shard_id());
   io_handler->mark_down();
 }
 
@@ -114,50 +118,103 @@ void
 SocketConnection::start_connect(const entity_addr_t& _peer_addr,
                                 const entity_name_t& _peer_name)
 {
+  assert(seastar::this_shard_id() == msgr_sid);
   protocol->start_connect(_peer_addr, _peer_name);
 }
 
 void
-SocketConnection::start_accept(SocketRef&& sock,
+SocketConnection::start_accept(SocketFRef&& sock,
                                const entity_addr_t& _peer_addr)
 {
+  assert(seastar::this_shard_id() == msgr_sid);
   protocol->start_accept(std::move(sock), _peer_addr);
 }
 
 seastar::future<>
 SocketConnection::close_clean_yielded()
 {
+  assert(seastar::this_shard_id() == msgr_sid);
   return protocol->close_clean_yielded();
 }
 
-seastar::shard_id SocketConnection::shard_id() const {
-  return core;
-}
-
 seastar::socket_address SocketConnection::get_local_address() const {
+  assert(seastar::this_shard_id() == msgr_sid);
   return socket->get_local_address();
 }
 
 ConnectionRef
 SocketConnection::get_local_shared_foreign_from_this()
 {
-  assert(seastar::this_shard_id() == shard_id());
+  assert(seastar::this_shard_id() == msgr_sid);
   return make_local_shared_foreign(
       seastar::make_foreign(shared_from_this()));
 }
 
+SocketMessenger &
+SocketConnection::get_messenger() const
+{
+  assert(seastar::this_shard_id() == msgr_sid);
+  return messenger;
+}
+
+seastar::shard_id
+SocketConnection::get_messenger_shard_id() const
+{
+  return msgr_sid;
+}
+
+void SocketConnection::set_peer_type(entity_type_t peer_type) {
+  assert(seastar::this_shard_id() == msgr_sid);
+  // it is not allowed to assign an unknown value when the current
+  // value is known
+  assert(!(peer_type == 0 &&
+           peer_name.type() != 0));
+  // it is not allowed to assign a different known value when the
+  // current value is also known.
+  assert(!(peer_type != 0 &&
+           peer_name.type() != 0 &&
+           peer_type != peer_name.type()));
+  peer_name._type = peer_type;
+}
+
+void SocketConnection::set_peer_id(int64_t peer_id) {
+  assert(seastar::this_shard_id() == msgr_sid);
+  // it is not allowed to assign an unknown value when the current
+  // value is known
+  assert(!(peer_id == entity_name_t::NEW &&
+           peer_name.num() != entity_name_t::NEW));
+  // it is not allowed to assign a different known value when the
+  // current value is also known.
+  assert(!(peer_id != entity_name_t::NEW &&
+           peer_name.num() != entity_name_t::NEW &&
+           peer_id != peer_name.num()));
+  peer_name._num = peer_id;
+}
+
+void SocketConnection::set_features(uint64_t f) {
+  assert(seastar::this_shard_id() == msgr_sid);
+  features = f;
+}
+
+void SocketConnection::set_socket(Socket *s) {
+  assert(seastar::this_shard_id() == msgr_sid);
+  socket = s;
+}
+
 void SocketConnection::print(ostream& out) const {
-    out << (void*)this << " ";
-    messenger.print(out);
-    if (!socket) {
-      out << " >> " << get_peer_name() << " " << peer_addr;
-    } else if (socket->get_side() == Socket::side_t::acceptor) {
-      out << " >> " << get_peer_name() << " " << peer_addr
-          << "@" << socket->get_ephemeral_port();
-    } else { // socket->get_side() == Socket::side_t::connector
-      out << "@" << socket->get_ephemeral_port()
-          << " >> " << get_peer_name() << " " << peer_addr;
-    }
+  out << (void*)this << " ";
+  messenger.print(out);
+  if (seastar::this_shard_id() != msgr_sid) {
+    out << " >> " << get_peer_name() << " " << peer_addr;
+  } else if (!socket) {
+    out << " >> " << get_peer_name() << " " << peer_addr;
+  } else if (socket->get_side() == Socket::side_t::acceptor) {
+    out << " >> " << get_peer_name() << " " << peer_addr
+        << "@" << socket->get_ephemeral_port();
+  } else { // socket->get_side() == Socket::side_t::connector
+    out << "@" << socket->get_ephemeral_port()
+        << " >> " << get_peer_name() << " " << peer_addr;
+  }
 }
 
 } // namespace crimson::net