]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/net/FrameAssemblerV2.h
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / crimson / net / FrameAssemblerV2.h
index 06c5cb25eee05fbc77affe2b45ab6e4abe412f67..9c89c144e80a1bf69964741be13a894065896fc5 100644 (file)
@@ -7,8 +7,13 @@
 #include "msg/async/crypto_onwire.h"
 #include "msg/async/compression_onwire.h"
 
+#include "crimson/common/gated.h"
 #include "crimson/net/Socket.h"
 
+#ifdef UNIT_TESTS_BUILT
+#include "Interceptor.h"
+#endif
+
 namespace crimson::net {
 
 class SocketConnection;
@@ -19,12 +24,22 @@ class FrameAssemblerV2 {
 public:
   FrameAssemblerV2(SocketConnection &conn);
 
-  ~FrameAssemblerV2() = default;
+  ~FrameAssemblerV2();
 
   FrameAssemblerV2(const FrameAssemblerV2 &) = delete;
 
   FrameAssemblerV2(FrameAssemblerV2 &&) = delete;
 
+  void set_shard_id(seastar::shard_id _sid) {
+    assert(seastar::this_shard_id() == sid);
+    clear();
+    sid = _sid;
+  }
+
+  seastar::shard_id get_shard_id() const {
+    return sid;
+  }
+
   void set_is_rev1(bool is_rev1);
 
   void create_session_stream_handlers(
@@ -38,7 +53,7 @@ public:
    */
 
   struct mover_t {
-    SocketRef socket;
+    SocketFRef socket;
     ceph::crypto::onwire::rxtx_t session_stream_handlers;
     ceph::compression::onwire::rxtx_t session_comp_handlers;
   };
@@ -66,13 +81,17 @@ public:
   // the socket exists and not shutdown
   bool is_socket_valid() const;
 
-  void set_socket(SocketRef &&);
+  seastar::shard_id get_socket_shard_id() const;
+
+  void set_socket(SocketFRef &&);
 
   void learn_socket_ephemeral_port_as_connector(uint16_t port);
 
-  void shutdown_socket();
+  // if may_cross_core == true, gate is required for cross-core shutdown
+  template <bool may_cross_core>
+  void shutdown_socket(crimson::common::Gated *gate);
 
-  seastar::future<> replace_shutdown_socket(SocketRef &&);
+  seastar::future<> replace_shutdown_socket(SocketFRef &&);
 
   seastar::future<> close_shutdown_socket();
 
@@ -80,15 +99,20 @@ public:
    * socket read and write interfaces
    */
 
-  seastar::future<Socket::tmp_buf> read_exactly(std::size_t bytes);
+  template <bool may_cross_core = true>
+  seastar::future<ceph::bufferptr> read_exactly(std::size_t bytes);
 
+  template <bool may_cross_core = true>
   seastar::future<ceph::bufferlist> read(std::size_t bytes);
 
-  seastar::future<> write(ceph::bufferlist &&);
+  template <bool may_cross_core = true>
+  seastar::future<> write(ceph::bufferlist);
 
+  template <bool may_cross_core = true>
   seastar::future<> flush();
 
-  seastar::future<> write_flush(ceph::bufferlist &&);
+  template <bool may_cross_core = true>
+  seastar::future<> write_flush(ceph::bufferlist);
 
   /*
    * frame read and write interfaces
@@ -99,46 +123,101 @@ public:
     ceph::msgr::v2::Tag tag;
     const ceph::msgr::v2::FrameAssembler *rx_frame_asm;
   };
+  template <bool may_cross_core = true>
   seastar::future<read_main_t> read_main_preamble();
 
   /// may throw negotiation_failure as fault
   using read_payload_t = ceph::msgr::v2::segment_bls_t;
   // FIXME: read_payload_t cannot be no-throw move constructible
+  template <bool may_cross_core = true>
   seastar::future<read_payload_t*> read_frame_payload();
 
   template <class F>
   ceph::bufferlist get_buffer(F &tx_frame) {
-#ifdef UNIT_TESTS_BUILT
-    intercept_frame(F::tag, true);
-#endif
+    assert(seastar::this_shard_id() == sid);
     auto bl = tx_frame.get_buffer(tx_frame_asm);
     log_main_preamble(bl);
     return bl;
   }
 
-  template <class F>
+  template <class F, bool may_cross_core = true>
   seastar::future<> write_flush_frame(F &tx_frame) {
+    assert(seastar::this_shard_id() == sid);
     auto bl = get_buffer(tx_frame);
-    return write_flush(std::move(bl));
+#ifdef UNIT_TESTS_BUILT
+    return intercept_frame(F::tag, true
+    ).then([this, bl=std::move(bl)]() mutable {
+      return write_flush<may_cross_core>(std::move(bl));
+    });
+#else
+    return write_flush<may_cross_core>(std::move(bl));
+#endif
   }
 
   static FrameAssemblerV2Ref create(SocketConnection &conn);
 
-private:
-  bool has_socket() const;
+#ifdef UNIT_TESTS_BUILT
+  seastar::future<> intercept_frames(
+      std::vector<ceph::msgr::v2::Tag> tags,
+      bool is_write) {
+    auto type = is_write ? bp_type_t::WRITE : bp_type_t::READ;
+    std::vector<Breakpoint> bps;
+    for (auto &tag : tags) {
+      bps.emplace_back(Breakpoint{tag, type});
+    }
+    return intercept_frames(bps, type);
+  }
 
-  void log_main_preamble(const ceph::bufferlist &bl);
+  seastar::future<> intercept_frame(
+      ceph::msgr::v2::Tag tag,
+      bool is_write) {
+    auto type = is_write ? bp_type_t::WRITE : bp_type_t::READ;
+    std::vector<Breakpoint> bps;
+    bps.emplace_back(Breakpoint{tag, type});
+    return intercept_frames(bps, type);
+  }
+
+  seastar::future<> intercept_frame(
+      custom_bp_t bp,
+      bool is_write) {
+    auto type = is_write ? bp_type_t::WRITE : bp_type_t::READ;
+    std::vector<Breakpoint> bps;
+    bps.emplace_back(Breakpoint{bp});
+    return intercept_frames(bps, type);
+  }
+#endif
 
+private:
 #ifdef UNIT_TESTS_BUILT
-  void intercept_frame(ceph::msgr::v2::Tag, bool is_write);
+  seastar::future<> intercept_frames(
+      std::vector<Breakpoint> bps,
+      bp_type_t type);
 #endif
 
+  bool has_socket() const;
+
+  SocketFRef move_socket();
+
+  void clear();
+
+  void log_main_preamble(const ceph::bufferlist &bl);
+
   SocketConnection &conn;
 
-  Socket *socket = nullptr;
+  SocketFRef socket;
+
+  // checking Socket::is_shutdown() synchronously is impossible when sid is
+  // different from the socket sid.
+  bool is_socket_shutdown = false;
+
+  // the current working shard, can be messenger or socket shard.
+  // if is messenger shard, should call interfaces with may_cross_core = true.
+  seastar::shard_id sid;
 
   /*
    * auth signature
+   *
+   * only in the messenger core
    */
 
   bool record_io = false;
@@ -166,6 +245,10 @@ private:
     &session_stream_handlers, is_rev1, common::local_conf()->ms_crc_data,
     &session_comp_handlers};
 
+  // in the messenger core during handshake,
+  // and in the socket core during open,
+  // must be cleaned before switching cores.
+
   ceph::bufferlist rx_preamble;
 
   read_payload_t rx_segments_data;