]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/net/Socket.h
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / crimson / net / Socket.h
index b6125eb8a02a03a60fd39b0d79632e3cdc2f7d49..478f2d630208cc699b4c0aad1a74019f35de6406 100644 (file)
@@ -6,7 +6,6 @@
 #include <seastar/core/gate.hh>
 #include <seastar/core/reactor.hh>
 #include <seastar/core/sharded.hh>
-#include <seastar/net/packet.hh>
 
 #include "include/buffer.h"
 
@@ -22,146 +21,98 @@ namespace crimson::net {
 
 class Socket;
 using SocketRef = std::unique_ptr<Socket>;
+using SocketFRef = seastar::foreign_ptr<SocketRef>;
 
-class Socket
-{
+class Socket {
   struct construct_tag {};
 
- public:
+public:
   // if acceptor side, peer is using a different port (ephemeral_port)
   // if connector side, I'm using a different port (ephemeral_port)
   enum class side_t {
     acceptor,
     connector
   };
+  Socket(seastar::connected_socket &&, side_t, uint16_t e_port, construct_tag);
 
-  Socket(seastar::connected_socket&& _socket, side_t _side, uint16_t e_port, construct_tag)
-    : sid{seastar::this_shard_id()},
-      socket(std::move(_socket)),
-      in(socket.input()),
-      // the default buffer size 8192 is too small that may impact our write
-      // performance. see seastar::net::connected_socket::output()
-      out(socket.output(65536)),
-      socket_is_shutdown(false),
-      side(_side),
-      ephemeral_port(e_port) {}
-
-  ~Socket() {
-#ifndef NDEBUG
-    assert(closed);
-#endif
-  }
+  ~Socket();
 
   Socket(Socket&& o) = delete;
 
-  static seastar::future<SocketRef>
-  connect(const entity_addr_t& peer_addr) {
-    inject_failure();
-    return inject_delay(
-    ).then([peer_addr] {
-      return seastar::connect(peer_addr.in4_addr());
-    }).then([] (seastar::connected_socket socket) {
-      return std::make_unique<Socket>(
-        std::move(socket), side_t::connector, 0, construct_tag{});
-    });
+  seastar::shard_id get_shard_id() const {
+    return sid;
   }
 
-  /// read the requested number of bytes into a bufferlist
-  seastar::future<bufferlist> read(size_t bytes);
-  using tmp_buf = seastar::temporary_buffer<char>;
-  using packet = seastar::net::packet;
-  seastar::future<tmp_buf> read_exactly(size_t bytes);
-
-  seastar::future<> write(packet&& buf) {
-#ifdef UNIT_TESTS_BUILT
-    return try_trap_pre(next_trap_write
-    ).then([buf = std::move(buf), this] () mutable {
-#endif
-      inject_failure();
-      return inject_delay(
-      ).then([buf = std::move(buf), this] () mutable {
-        return out.write(std::move(buf));
-      });
-#ifdef UNIT_TESTS_BUILT
-    }).then([this] {
-      return try_trap_post(next_trap_write);
-    });
-#endif
+  side_t get_side() const {
+    return side;
   }
-  seastar::future<> flush() {
-    inject_failure();
-    return inject_delay().then([this] {
-      return out.flush();
-    });
+
+  uint16_t get_ephemeral_port() const {
+    return ephemeral_port;
   }
-  seastar::future<> write_flush(packet&& buf) {
-#ifdef UNIT_TESTS_BUILT
-    return try_trap_pre(next_trap_write).then([buf = std::move(buf), this] () mutable {
-#endif
-      inject_failure();
-      return inject_delay(
-      ).then([buf = std::move(buf), this] () mutable {
-        return out.write(std::move(buf)).then([this] { return out.flush(); });
-      });
-#ifdef UNIT_TESTS_BUILT
-    }).then([this] {
-      return try_trap_post(next_trap_write);
-    });
-#endif
+
+  seastar::socket_address get_local_address() const {
+    return socket.local_address();
   }
 
   bool is_shutdown() const {
+    assert(seastar::this_shard_id() == sid);
     return socket_is_shutdown;
   }
 
+  // learn my ephemeral_port as connector.
+  // unfortunately, there's no way to identify which port I'm using as
+  // connector with current seastar interface.
+  void learn_ephemeral_port_as_connector(uint16_t port) {
+    assert(side == side_t::connector &&
+           (ephemeral_port == 0 || ephemeral_port == port));
+    ephemeral_port = port;
+  }
+
+  /// read the requested number of bytes into a bufferlist
+  seastar::future<bufferlist> read(size_t bytes);
+
+  seastar::future<bufferptr> read_exactly(size_t bytes);
+
+  seastar::future<> write(bufferlist);
+
+  seastar::future<> flush();
+
+  seastar::future<> write_flush(bufferlist);
+
   // preemptively disable further reads or writes, can only be shutdown once.
   void shutdown();
 
   /// Socket can only be closed once.
   seastar::future<> close();
 
-  static seastar::future<> inject_delay();
+  static seastar::future<SocketRef>
+  connect(const entity_addr_t& peer_addr);
 
-  static void inject_failure();
+  /*
+   * test interfaces
+   */
 
   // shutdown for tests
   void force_shutdown() {
+    assert(seastar::this_shard_id() == sid);
     socket.shutdown_input();
     socket.shutdown_output();
   }
 
   // shutdown input_stream only, for tests
   void force_shutdown_in() {
+    assert(seastar::this_shard_id() == sid);
     socket.shutdown_input();
   }
 
   // shutdown output_stream only, for tests
   void force_shutdown_out() {
+    assert(seastar::this_shard_id() == sid);
     socket.shutdown_output();
   }
 
-  side_t get_side() const {
-    return side;
-  }
-
-  uint16_t get_ephemeral_port() const {
-    return ephemeral_port;
-  }
-
-  // learn my ephemeral_port as connector.
-  // unfortunately, there's no way to identify which port I'm using as
-  // connector with current seastar interface.
-  void learn_ephemeral_port_as_connector(uint16_t port) {
-    assert(side == side_t::connector &&
-           (ephemeral_port == 0 || ephemeral_port == port));
-    ephemeral_port = port;
-  }
-
-  seastar::socket_address get_local_address() const {
-    return socket.local_address();
-  }
-
- private:
+private:
   const seastar::shard_id sid;
   seastar::connected_socket socket;
   seastar::input_stream<char> in;
@@ -181,18 +132,20 @@ class Socket
   } r;
 
 #ifdef UNIT_TESTS_BUILT
- public:
+public:
   void set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_);
 
- private:
+private:
+  seastar::future<> try_trap_pre(bp_action_t& trap);
+
+  seastar::future<> try_trap_post(bp_action_t& trap);
+
   bp_action_t next_trap_read = bp_action_t::CONTINUE;
   bp_action_t next_trap_write = bp_action_t::CONTINUE;
   socket_blocker* blocker = nullptr;
-  seastar::future<> try_trap_pre(bp_action_t& trap);
-  seastar::future<> try_trap_post(bp_action_t& trap);
 
 #endif
-  friend class FixedCPUServerSocket;
+  friend class ShardedServerSocket;
 };
 
 using listen_ertr = crimson::errorator<
@@ -200,105 +153,49 @@ using listen_ertr = crimson::errorator<
   crimson::ct_error::address_not_available // https://techoverflow.net/2021/08/06/how-i-fixed-python-oserror-errno-99-cannot-assign-requested-address/
   >;
 
-class FixedCPUServerSocket
-    : public seastar::peering_sharded_service<FixedCPUServerSocket> {
-  const seastar::shard_id cpu;
-  entity_addr_t addr;
-  std::optional<seastar::server_socket> listener;
-  seastar::gate shutdown_gate;
+class ShardedServerSocket
+    : public seastar::peering_sharded_service<ShardedServerSocket> {
+  struct construct_tag {};
 
-  using sharded_service_t = seastar::sharded<FixedCPUServerSocket>;
-  std::unique_ptr<sharded_service_t> service;
+public:
+  ShardedServerSocket(
+      seastar::shard_id sid,
+      bool dispatch_only_on_primary_sid,
+      construct_tag);
 
-  struct construct_tag {};
+  ~ShardedServerSocket();
 
-  static seastar::logger& logger() {
-    return crimson::get_logger(ceph_subsys_ms);
-  }
+  ShardedServerSocket(ShardedServerSocket&&) = delete;
+  ShardedServerSocket(const ShardedServerSocket&) = delete;
+  ShardedServerSocket& operator=(ShardedServerSocket&&) = delete;
+  ShardedServerSocket& operator=(const ShardedServerSocket&) = delete;
 
-  seastar::future<> reset() {
-    return container().invoke_on_all([] (auto& ss) {
-      assert(ss.shutdown_gate.is_closed());
-      ss.addr = entity_addr_t();
-      ss.listener.reset();
-    });
+  bool is_fixed_shard_dispatching() const {
+    return dispatch_only_on_primary_sid;
   }
 
-public:
-  FixedCPUServerSocket(seastar::shard_id cpu, construct_tag) : cpu{cpu} {}
-  ~FixedCPUServerSocket() {
-    assert(!listener);
-    // detect whether user have called destroy() properly
-    ceph_assert(!service);
-  }
+  listen_ertr::future<> listen(entity_addr_t addr);
 
-  FixedCPUServerSocket(FixedCPUServerSocket&&) = delete;
-  FixedCPUServerSocket(const FixedCPUServerSocket&) = delete;
-  FixedCPUServerSocket& operator=(const FixedCPUServerSocket&) = delete;
+  using accept_func_t =
+    std::function<seastar::future<>(SocketRef, entity_addr_t)>;
+  seastar::future<> accept(accept_func_t &&_fn_accept);
 
-  listen_ertr::future<> listen(entity_addr_t addr);
+  seastar::future<> shutdown_destroy();
 
-  // fn_accept should be a nothrow function of type
-  // seastar::future<>(SocketRef, entity_addr_t)
-  template <typename Func>
-  seastar::future<> accept(Func&& fn_accept) {
-    assert(seastar::this_shard_id() == cpu);
-    logger().trace("FixedCPUServerSocket({})::accept()...", addr);
-    return container().invoke_on_all(
-        [fn_accept = std::move(fn_accept)] (auto& ss) mutable {
-      assert(ss.listener);
-      // gate accepting
-      // FixedCPUServerSocket::shutdown() will drain the continuations in the gate
-      // so ignore the returned future
-      std::ignore = seastar::with_gate(ss.shutdown_gate,
-          [&ss, fn_accept = std::move(fn_accept)] () mutable {
-        return seastar::keep_doing([&ss, fn_accept = std::move(fn_accept)] () mutable {
-          return ss.listener->accept().then(
-              [&ss, fn_accept = std::move(fn_accept)]
-              (seastar::accept_result accept_result) mutable {
-            // assert seastar::listen_options::set_fixed_cpu() works
-            assert(seastar::this_shard_id() == ss.cpu);
-            auto [socket, paddr] = std::move(accept_result);
-            entity_addr_t peer_addr;
-            peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
-            peer_addr.set_type(ss.addr.get_type());
-            SocketRef _socket = std::make_unique<Socket>(
-                std::move(socket), Socket::side_t::acceptor,
-                peer_addr.get_port(), Socket::construct_tag{});
-            std::ignore = seastar::with_gate(ss.shutdown_gate,
-                [socket = std::move(_socket), peer_addr,
-                 &ss, fn_accept = std::move(fn_accept)] () mutable {
-              logger().trace("FixedCPUServerSocket({})::accept(): "
-                             "accepted peer {}", ss.addr, peer_addr);
-              return fn_accept(std::move(socket), peer_addr
-              ).handle_exception([&ss, peer_addr] (auto eptr) {
-                logger().error("FixedCPUServerSocket({})::accept(): "
-                               "fn_accept(s, {}) got unexpected exception {}",
-                               ss.addr, peer_addr, eptr);
-                ceph_abort();
-              });
-            });
-          });
-        }).handle_exception_type([&ss] (const std::system_error& e) {
-          if (e.code() == std::errc::connection_aborted ||
-              e.code() == std::errc::invalid_argument) {
-            logger().trace("FixedCPUServerSocket({})::accept(): stopped ({})",
-                           ss.addr, e);
-          } else {
-            throw;
-          }
-        }).handle_exception([&ss] (auto eptr) {
-          logger().error("FixedCPUServerSocket({})::accept(): "
-                         "got unexpected exception {}", ss.addr, eptr);
-          ceph_abort();
-        });
-      });
-    });
-  }
+  static seastar::future<ShardedServerSocket*> create(
+      bool dispatch_only_on_this_shard);
 
-  seastar::future<> shutdown();
-  seastar::future<> destroy();
-  static seastar::future<FixedCPUServerSocket*> create();
+private:
+  const seastar::shard_id primary_sid;
+  /// XXX: Remove once all infrastructure uses multi-core messenger
+  const bool dispatch_only_on_primary_sid;
+  entity_addr_t listen_addr;
+  std::optional<seastar::server_socket> listener;
+  seastar::gate shutdown_gate;
+  accept_func_t fn_accept;
+
+  using sharded_service_t = seastar::sharded<ShardedServerSocket>;
+  std::unique_ptr<sharded_service_t> service;
 };
 
 } // namespace crimson::net