]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/net/io_handler.h
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / crimson / net / io_handler.h
index e04b6356e8674ea9d95854616ae99426d8276e80..f53c2ba646847e4706a925838322b4627fe7a97d 100644 (file)
@@ -3,6 +3,9 @@
 
 #pragma once
 
+#include <vector>
+
+#include <seastar/core/shared_future.hh>
 #include <seastar/util/later.hh>
 
 #include "crimson/common/gated.h"
 
 namespace crimson::net {
 
+/**
+ * crosscore_t
+ *
+ * To preserve the event order across cores.
+ *
+ * The submitting side tags each event with prepare_submit(); the receiving
+ * side admits events strictly in that order via proceed_or_wait(), parking
+ * out-of-order arrivals on a shared promise until their turn comes.
+ */
+class crosscore_t {
+public:
+  using seq_t = uint64_t;
+
+  crosscore_t() = default;
+  ~crosscore_t() = default;
+
+  /// Sequence number of the last event admitted on the receiving side.
+  seq_t get_in_seq() const {
+    return in_seq;
+  }
+
+  /// Allocate the next submit-side sequence number (first call returns 1).
+  seq_t prepare_submit() {
+    ++out_seq;
+    return out_seq;
+  }
+
+  /// Admit the event if seq is the next expected one, waking any parked
+  /// waiters, and return true; return false if the event must wait.
+  bool proceed_or_wait(seq_t seq) {
+    if (seq == in_seq + 1) {
+      ++in_seq;
+      if (unlikely(in_pr_wait.has_value())) {
+        in_pr_wait->set_value();
+        in_pr_wait = std::nullopt;
+      }
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /// Wait until another event advances in_seq; the caller is expected to
+  /// retry proceed_or_wait(seq) once the returned future resolves.
+  seastar::future<> wait(seq_t seq) {
+    // only reachable after proceed_or_wait(seq) returned false
+    assert(seq != in_seq + 1);
+    if (!in_pr_wait.has_value()) {
+      in_pr_wait = seastar::shared_promise<>();
+    }
+    return in_pr_wait->get_shared_future();
+  }
+
+private:
+  seq_t out_seq = 0;
+  seq_t in_seq = 0;
+  // engaged while at least one out-of-order event is parked in wait()
+  std::optional<seastar::shared_promise<>> in_pr_wait;
+};
+
+/**
+ * io_handler_state
+ *
+ * It is required to populate the states from IOHandler to ProtocolV2
+ * asynchronously.
+ */
+struct io_handler_state {
+  seq_num_t in_seq;    // seq num of the last received message
+  bool is_out_queued;  // whether messages are queued for sending
+  bool has_out_sent;   // whether messages were sent but not yet acked
+
+  bool is_out_queued_or_sent() const {
+    return is_out_queued || has_out_sent;
+  }
+
+  /*
+   * should be consistent with the corresponding interfaces in IOHandler
+   */
+
+  // A full reset discards the out queue as well; a non-full reset only
+  // restarts the incoming sequence.
+  void reset_session(bool full) {
+    in_seq = 0;
+    if (full) {
+      is_out_queued = false;
+      has_out_sent = false;
+    }
+  }
+
+  void reset_peer_state() {
+    in_seq = 0;
+    // anything sent-but-unacked counts as queued again
+    is_out_queued = is_out_queued_or_sent();
+    has_out_sent = false;
+  }
+
+  void requeue_out_sent_up_to() {
+    // noop since the information is insufficient
+  }
+
+  // Move all sent-but-unacked messages back to the queued state.
+  void requeue_out_sent() {
+    if (has_out_sent) {
+      has_out_sent = false;
+      is_out_queued = true;
+    }
+  }
+};
+
 /**
  * HandshakeListener
  *
- * The interface class for IOHandler to notify the ProtocolV2 for handshake.
+ * The interface class for IOHandler to notify the ProtocolV2.
  *
- * The notifications may be cross-core and asynchronous.
+ * The notifications may be cross-core and must be sent to
+ * SocketConnection::get_messenger_shard_id()
  */
 class HandshakeListener {
 public:
@@ -28,11 +125,17 @@ public:
   HandshakeListener &operator=(const HandshakeListener &) = delete;
   HandshakeListener &operator=(HandshakeListener &&) = delete;
 
-  virtual void notify_out() = 0;
+  virtual seastar::future<> notify_out(
+      crosscore_t::seq_t cc_seq) = 0;
 
-  virtual void notify_out_fault(const char *where, std::exception_ptr) = 0;
+  virtual seastar::future<> notify_out_fault(
+      crosscore_t::seq_t cc_seq,
+      const char *where,
+      std::exception_ptr,
+      io_handler_state) = 0;
 
-  virtual void notify_mark_down() = 0;
+  virtual seastar::future<> notify_mark_down(
+      crosscore_t::seq_t cc_seq) = 0;
 
 protected:
   HandshakeListener() = default;
@@ -60,24 +163,32 @@ public:
 /*
  * as ConnectionHandler
  */
-private:
+public:
+  seastar::shard_id get_shard_id() const final {
+    return shard_states->get_shard_id();
+  }
+
   bool is_connected() const final {
+    ceph_assert_always(seastar::this_shard_id() == get_shard_id());
     return protocol_is_connected;
   }
 
-  seastar::future<> send(MessageURef msg) final;
+  seastar::future<> send(MessageFRef msg) final;
 
   seastar::future<> send_keepalive() final;
 
   clock_t::time_point get_last_keepalive() const final {
+    ceph_assert_always(seastar::this_shard_id() == get_shard_id());
     return last_keepalive;
   }
 
   clock_t::time_point get_last_keepalive_ack() const final {
+    ceph_assert_always(seastar::this_shard_id() == get_shard_id());
     return last_keepalive_ack;
   }
 
   void set_last_keepalive_ack(clock_t::time_point when) final {
+    ceph_assert_always(seastar::this_shard_id() == get_shard_id());
     last_keepalive_ack = when;
   }
 
@@ -89,31 +200,39 @@ private:
  * The calls may be cross-core and asynchronous
  */
 public:
+  /*
+   * should not be called cross-core
+   */
+
   void set_handshake_listener(HandshakeListener &hl) {
+    assert(seastar::this_shard_id() == get_shard_id());
     ceph_assert_always(handshake_listener == nullptr);
     handshake_listener = &hl;
   }
 
+  io_handler_state get_states() const {
+    // might be called from prv_sid during wait_io_exit_dispatching()
+    return {in_seq, is_out_queued(), has_out_sent()};
+  }
+
   struct io_stat_printer {
     const IOHandler &io_handler;
   };
   void print_io_stat(std::ostream &out) const;
 
-  seastar::future<> close_io(
-      bool is_dispatch_reset,
-      bool is_replace) {
-    ceph_assert_always(io_state == io_state_t::drop);
-
-    if (is_dispatch_reset) {
-      dispatch_reset(is_replace);
-    }
+  seastar::future<> set_accepted_sid(
+      crosscore_t::seq_t cc_seq,
+      seastar::shard_id sid,
+      ConnectionFRef conn_fref);
 
-    ceph_assert_always(conn_ref);
-    conn_ref.reset();
+  /*
+   * may be called cross-core
+   */
 
-    assert(!gate.is_closed());
-    return gate.close();
-  }
+  seastar::future<> close_io(
+      crosscore_t::seq_t cc_seq,
+      bool is_dispatch_reset,
+      bool is_replace);
 
   /**
    * io_state_t
@@ -122,36 +241,219 @@ public:
    * io behavior accordingly.
    */
   enum class io_state_t : uint8_t {
-    none,  // no IO is possible as the connection is not available to the user yet.
-    delay, // IO is delayed until open.
-    open,  // Dispatch In and Out concurrently.
-    drop   // Drop IO as the connection is closed.
+    none,    // no IO is possible as the connection is not available to the user yet.
+    delay,   // IO is delayed until open.
+    open,    // Dispatch In and Out concurrently.
+    drop,    // Drop IO as the connection is closed.
+    switched // IO is switched to a different core
+             // (is moved to maybe_prv_shard_states)
   };
   friend class fmt::formatter<io_state_t>;
 
-  void set_io_state(const io_state_t &new_state, FrameAssemblerV2Ref fa=nullptr);
+  seastar::future<> set_io_state(
+      crosscore_t::seq_t cc_seq,
+      io_state_t new_state,
+      FrameAssemblerV2Ref fa,
+      bool set_notify_out);
 
-  seastar::future<FrameAssemblerV2Ref> wait_io_exit_dispatching();
+  struct exit_dispatching_ret {
+    FrameAssemblerV2Ref frame_assembler;
+    io_handler_state io_states;
+  };
+  seastar::future<exit_dispatching_ret>
+  wait_io_exit_dispatching(
+      crosscore_t::seq_t cc_seq);
 
-  void reset_session(bool full);
+  seastar::future<> reset_session(
+      crosscore_t::seq_t cc_seq,
+      bool full);
 
-  void requeue_out_sent_up_to(seq_num_t seq);
+  seastar::future<> reset_peer_state(
+      crosscore_t::seq_t cc_seq);
 
-  void requeue_out_sent();
+  seastar::future<> requeue_out_sent_up_to(
+      crosscore_t::seq_t cc_seq,
+      seq_num_t msg_seq);
 
-  bool is_out_queued_or_sent() const {
-    return is_out_queued() || !out_sent_msgs.empty();
-  }
+  seastar::future<> requeue_out_sent(
+      crosscore_t::seq_t cc_seq);
 
-  seq_num_t get_in_seq() const {
-    return in_seq;
+  seastar::future<> dispatch_accept(
+      crosscore_t::seq_t cc_seq,
+      seastar::shard_id new_sid,
+      ConnectionFRef,
+      bool is_replace);
+
+  seastar::future<> dispatch_connect(
+      crosscore_t::seq_t cc_seq,
+      seastar::shard_id new_sid,
+      ConnectionFRef);
+
+ private:
+  class shard_states_t;
+  using shard_states_ref_t = std::unique_ptr<shard_states_t>;
+
+  /**
+   * shard_states_t
+   *
+   * Per-shard IO state plus the promises/gate used to coordinate the in-
+   * and out-dispatch loops.  Apart from get_shard_id() and the static
+   * factories, members assert they are invoked on the owning shard (sid).
+   */
+  class shard_states_t {
+  public:
+    shard_states_t(seastar::shard_id _sid, io_state_t state)
+      : sid{_sid}, io_state{state} {}
+
+    seastar::shard_id get_shard_id() const {
+      return sid;
+    }
+
+    io_state_t get_io_state() const {
+      assert(seastar::this_shard_id() == sid);
+      return io_state;
+    }
+
+    // Transition to a different io_state_t, waking wait_state_change()
+    // waiters.  When leaving open with an out-dispatch still running, arm
+    // out_exit_dispatching so its exit can be awaited.
+    void set_io_state(io_state_t new_state) {
+      assert(seastar::this_shard_id() == sid);
+      assert(io_state != new_state);
+      pr_io_state_changed.set_value();
+      pr_io_state_changed = seastar::promise<>();
+      if (io_state == io_state_t::open) {
+        // from open
+        if (out_dispatching) {
+          ceph_assert_always(!out_exit_dispatching.has_value());
+          out_exit_dispatching = seastar::promise<>();
+        }
+      }
+      io_state = new_state;
+    }
+
+    // Resolves at the next set_io_state() call.
+    seastar::future<> wait_state_change() {
+      assert(seastar::this_shard_id() == sid);
+      return pr_io_state_changed.get_future();
+    }
+
+    // Run func under this shard's gate; must not be called once closed.
+    template <typename Func>
+    void dispatch_in_background(
+        const char *what, SocketConnection &who, Func &&func) {
+      assert(seastar::this_shard_id() == sid);
+      ceph_assert_always(!gate.is_closed());
+      gate.dispatch_in_background(what, who, std::move(func));
+    }
+
+    // Mark the in-dispatch loop as running (only legal while open).
+    void enter_in_dispatching() {
+      assert(seastar::this_shard_id() == sid);
+      assert(io_state == io_state_t::open);
+      ceph_assert_always(!in_exit_dispatching.has_value());
+      in_exit_dispatching = seastar::promise<>();
+    }
+
+    // Mark the in-dispatch loop as stopped, waking anyone waiting on it
+    // (only legal after the state has left open).
+    void exit_in_dispatching() {
+      assert(seastar::this_shard_id() == sid);
+      assert(io_state != io_state_t::open);
+      ceph_assert_always(in_exit_dispatching.has_value());
+      in_exit_dispatching->set_value();
+      in_exit_dispatching = std::nullopt;
+    }
+
+    // Claim the (single) out-dispatch slot; returns false if already
+    // dispatching or if the current state forbids dispatching out.
+    bool try_enter_out_dispatching() {
+      assert(seastar::this_shard_id() == sid);
+      if (out_dispatching) {
+        // already dispatching out
+        return false;
+      }
+      switch (io_state) {
+      case io_state_t::open:
+        [[fallthrough]];
+      case io_state_t::delay:
+        out_dispatching = true;
+        return true;
+      case io_state_t::drop:
+        [[fallthrough]];
+      case io_state_t::switched:
+        // do not dispatch out
+        return false;
+      default:
+        ceph_abort("impossible");
+      }
+    }
+
+    void notify_out_dispatching_stopped(
+        const char *what, SocketConnection &conn);
+
+    // Release the out-dispatch slot and notify waiters.
+    void exit_out_dispatching(
+        const char *what, SocketConnection &conn) {
+      assert(seastar::this_shard_id() == sid);
+      ceph_assert_always(out_dispatching);
+      out_dispatching = false;
+      notify_out_dispatching_stopped(what, conn);
+    }
+
+    seastar::future<> wait_io_exit_dispatching();
+
+    seastar::future<> close() {
+      assert(seastar::this_shard_id() == sid);
+      assert(!gate.is_closed());
+      return gate.close();
+    }
+
+    // Returns true iff the gate is closed; in that case also sanity-check
+    // that both dispatch loops have fully stopped.
+    bool assert_closed_and_exit() const {
+      assert(seastar::this_shard_id() == sid);
+      if (gate.is_closed()) {
+        ceph_assert_always(io_state == io_state_t::drop ||
+                           io_state == io_state_t::switched);
+        ceph_assert_always(!out_dispatching);
+        ceph_assert_always(!out_exit_dispatching);
+        ceph_assert_always(!in_exit_dispatching);
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    static shard_states_ref_t create(
+        seastar::shard_id sid, io_state_t state) {
+      return std::make_unique<shard_states_t>(sid, state);
+    }
+
+    static shard_states_ref_t create_from_previous(
+        shard_states_t &prv_states, seastar::shard_id new_sid);
+
+  private:
+    const seastar::shard_id sid;
+    io_state_t io_state;
+
+    crimson::common::Gated gate;
+    // re-armed on every state change; see wait_state_change()
+    seastar::promise<> pr_io_state_changed;
+    bool out_dispatching = false;
+    // engaged while waiting for the out-dispatch loop to stop
+    std::optional<seastar::promise<>> out_exit_dispatching;
+    // engaged while the in-dispatch loop is running
+    std::optional<seastar::promise<>> in_exit_dispatching;
+  };
+
+  void do_set_io_state(
+      io_state_t new_state,
+      std::optional<crosscore_t::seq_t> cc_seq = std::nullopt,
+      FrameAssemblerV2Ref fa = nullptr,
+      bool set_notify_out = false);
+
+  io_state_t get_io_state() const {
+    return shard_states->get_io_state();
   }
 
-  void dispatch_accept();
+  void do_requeue_out_sent();
 
-  void dispatch_connect();
+  void do_requeue_out_sent_up_to(seq_num_t seq);
+
+  void assign_frame_assembler(FrameAssemblerV2Ref);
+
+  seastar::future<> send_redirected(MessageFRef msg);
+
+  seastar::future<> do_send(MessageFRef msg);
+
+  seastar::future<> send_keepalive_redirected();
+
+  seastar::future<> do_send_keepalive();
+
+  seastar::future<> to_new_sid(
+      crosscore_t::seq_t cc_seq,
+      seastar::shard_id new_sid,
+      ConnectionFRef,
+      std::optional<bool> is_replace);
 
- private:
   void dispatch_reset(bool is_replace);
 
   void dispatch_remote_reset();
@@ -163,26 +465,58 @@ public:
             next_keepalive_ack.has_value());
   }
 
+  bool has_out_sent() const {
+    return !out_sent_msgs.empty();
+  }
+
+  void reset_in();
+
   void reset_out();
 
-  seastar::future<stop_t> try_exit_out_dispatch();
+  void discard_out_sent();
 
-  seastar::future<> do_out_dispatch();
+  seastar::future<> do_out_dispatch(shard_states_t &ctx);
 
-  ceph::bufferlist sweep_out_pending_msgs_to_sent(
+#ifdef UNIT_TESTS_BUILT
+  struct sweep_ret {
+    ceph::bufferlist bl;
+    std::vector<ceph::msgr::v2::Tag> tags;
+  };
+  sweep_ret
+#else
+  ceph::bufferlist
+#endif
+  sweep_out_pending_msgs_to_sent(
       bool require_keepalive,
       std::optional<utime_t> maybe_keepalive_ack,
       bool require_ack);
 
+  void maybe_notify_out_dispatch();
+
   void notify_out_dispatch();
 
   void ack_out_sent(seq_num_t seq);
 
-  seastar::future<> read_message(utime_t throttle_stamp, std::size_t msg_size);
+  seastar::future<> read_message(
+      shard_states_t &ctx,
+      utime_t throttle_stamp,
+      std::size_t msg_size);
 
   void do_in_dispatch();
 
+  seastar::future<> cleanup_prv_shard(seastar::shard_id prv_sid);
+
 private:
+  shard_states_ref_t shard_states;
+
+  crosscore_t crosscore;
+
+  // drop was happening in the previous sid
+  std::optional<seastar::shard_id> maybe_dropped_sid;
+
+  // the remaining states in the previous sid for cleanup, see to_new_sid()
+  shard_states_ref_t maybe_prv_shard_states;
+
   ChainedDispatchers &dispatchers;
 
   SocketConnection &conn;
@@ -192,35 +526,24 @@ private:
 
   HandshakeListener *handshake_listener = nullptr;
 
-  crimson::common::Gated gate;
-
   FrameAssemblerV2Ref frame_assembler;
 
   bool protocol_is_connected = false;
 
   bool need_dispatch_reset = true;
 
-  io_state_t io_state = io_state_t::none;
-
-  // wait until current io_state changed
-  seastar::promise<> io_state_changed;
-
   /*
    * out states for writing
    */
 
-  bool out_dispatching = false;
-
-  std::optional<seastar::promise<>> out_exit_dispatching;
-
   /// the seq num of the last transmitted message
   seq_num_t out_seq = 0;
 
   // messages to be resent after connection gets reset
-  std::deque<MessageURef> out_pending_msgs;
+  std::deque<MessageFRef> out_pending_msgs;
 
   // messages sent, but not yet acked by peer
-  std::deque<MessageURef> out_sent_msgs;
+  std::deque<MessageFRef> out_sent_msgs;
 
   bool need_keepalive = false;
 
@@ -228,12 +551,12 @@ private:
 
   uint64_t ack_left = 0;
 
+  bool need_notify_out = false;
+
   /*
    * in states for reading
    */
 
-  std::optional<seastar::promise<>> in_exit_dispatching;
-
   /// the seq num of the last received message
   seq_num_t in_seq = 0;
 
@@ -250,6 +573,23 @@ inline std::ostream& operator<<(
 
 } // namespace crimson::net
 
+// fmt support: renders io_handler_state as
+// "io(in_seq=..., is_out_queued=..., has_out_sent=...)"
+template <>
+struct fmt::formatter<crimson::net::io_handler_state> {
+  constexpr auto parse(format_parse_context& pctx) {
+    return pctx.begin();
+  }
+
+  template <typename FormatContext>
+  auto format(crimson::net::io_handler_state s, FormatContext& fctx) {
+    return fmt::format_to(
+        fctx.out(),
+        "io(in_seq={}, is_out_queued={}, has_out_sent={})",
+        s.in_seq, s.is_out_queued, s.has_out_sent);
+  }
+};
+
 template <>
 struct fmt::formatter<crimson::net::IOHandler::io_state_t>
   : fmt::formatter<std::string_view> {
@@ -270,6 +610,9 @@ struct fmt::formatter<crimson::net::IOHandler::io_state_t>
     case drop:
       name = "drop";
       break;
+    case switched:
+      name = "switched";
+      break;
     }
     return formatter<string_view>::format(name, ctx);
   }