X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;ds=sidebyside;f=ceph%2Fsrc%2Fcrimson%2Fnet%2Fio_handler.h;fp=ceph%2Fsrc%2Fcrimson%2Fnet%2Fio_handler.h;h=f53c2ba646847e4706a925838322b4627fe7a97d;hb=aee94f6923ba628a85d855d0c5316d0da78bfa2a;hp=e04b6356e8674ea9d95854616ae99426d8276e80;hpb=27f45121cc74e31203777ad565f78d8aad9b92a2;p=ceph.git
diff --git a/ceph/src/crimson/net/io_handler.h b/ceph/src/crimson/net/io_handler.h
index e04b6356e..f53c2ba64 100644
--- a/ceph/src/crimson/net/io_handler.h
+++ b/ceph/src/crimson/net/io_handler.h
@@ -3,6 +3,9 @@
 
 #pragma once
 
+#include <vector>
+
+#include <seastar/core/shared_future.hh>
 #include <seastar/util/later.hh>
 
 #include "crimson/common/gated.h"
@@ -12,12 +15,106 @@
 
 namespace crimson::net {
 
+/**
+ * crosscore_t
+ *
+ * To preserve the event order across cores.
+ */
+class crosscore_t {
+public:
+  using seq_t = uint64_t;
+
+  crosscore_t() = default;
+  ~crosscore_t() = default;
+
+  seq_t get_in_seq() const {
+    return in_seq;
+  }
+
+  seq_t prepare_submit() {
+    ++out_seq;
+    return out_seq;
+  }
+
+  bool proceed_or_wait(seq_t seq) {
+    if (seq == in_seq + 1) {
+      ++in_seq;
+      if (unlikely(in_pr_wait.has_value())) {
+        in_pr_wait->set_value();
+        in_pr_wait = std::nullopt;
+      }
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  seastar::future<> wait(seq_t seq) {
+    assert(seq != in_seq + 1);
+    if (!in_pr_wait.has_value()) {
+      in_pr_wait = seastar::shared_promise<>();
+    }
+    return in_pr_wait->get_shared_future();
+  }
+
+private:
+  seq_t out_seq = 0;
+  seq_t in_seq = 0;
+  std::optional<seastar::shared_promise<>> in_pr_wait;
+};
+
+/**
+ * io_handler_state
+ *
+ * It is required to populate the states from IOHandler to ProtocolV2
+ * asynchronously.
+ */
+struct io_handler_state {
+  seq_num_t in_seq;
+  bool is_out_queued;
+  bool has_out_sent;
+
+  bool is_out_queued_or_sent() const {
+    return is_out_queued || has_out_sent;
+  }
+
+  /*
+   * should be consistent with the corresponding interfaces in IOHandler
+   */
+
+  void reset_session(bool full) {
+    in_seq = 0;
+    if (full) {
+      is_out_queued = false;
+      has_out_sent = false;
+    }
+  }
+
+  void reset_peer_state() {
+    in_seq = 0;
+    is_out_queued = is_out_queued_or_sent();
+    has_out_sent = false;
+  }
+
+  void requeue_out_sent_up_to() {
+    // noop since the information is insufficient
+  }
+
+  void requeue_out_sent() {
+    if (has_out_sent) {
+      has_out_sent = false;
+      is_out_queued = true;
+    }
+  }
+};
+
 /**
  * HandshakeListener
  *
- * The interface class for IOHandler to notify the ProtocolV2 for handshake.
+ * The interface class for IOHandler to notify the ProtocolV2.
  *
- * The notifications may be cross-core and asynchronous.
+ * The notifications may be cross-core and must be sent to
+ * SocketConnection::get_messenger_shard_id()
  */
 class HandshakeListener {
 public:
@@ -28,11 +125,17 @@ public:
   HandshakeListener &operator=(const HandshakeListener &) = delete;
   HandshakeListener &operator=(HandshakeListener &&) = delete;
 
-  virtual void notify_out() = 0;
+  virtual seastar::future<> notify_out(
+      crosscore_t::seq_t cc_seq) = 0;
 
-  virtual void notify_out_fault(const char *where, std::exception_ptr) = 0;
+  virtual seastar::future<> notify_out_fault(
+      crosscore_t::seq_t cc_seq,
+      const char *where,
+      std::exception_ptr,
+      io_handler_state) = 0;
 
-  virtual void notify_mark_down() = 0;
+  virtual seastar::future<> notify_mark_down(
+      crosscore_t::seq_t cc_seq) = 0;
 
 protected:
   HandshakeListener() = default;
@@ -60,24 +163,32 @@ public:
   /*
    * as ConnectionHandler
    */
-private:
+public:
+  seastar::shard_id get_shard_id() const final {
+    return shard_states->get_shard_id();
+  }
+
   bool is_connected() const final {
+    ceph_assert_always(seastar::this_shard_id() == get_shard_id());
     return protocol_is_connected;
   }
 
-  seastar::future<> send(MessageURef msg) final;
+  seastar::future<> send(MessageFRef msg) final;
 
   seastar::future<> send_keepalive() final;
 
   clock_t::time_point get_last_keepalive() const final {
+    ceph_assert_always(seastar::this_shard_id() == get_shard_id());
     return last_keepalive;
   }
 
   clock_t::time_point get_last_keepalive_ack() const final {
+    ceph_assert_always(seastar::this_shard_id() == get_shard_id());
     return last_keepalive_ack;
   }
 
   void set_last_keepalive_ack(clock_t::time_point when) final {
+    ceph_assert_always(seastar::this_shard_id() == get_shard_id());
     last_keepalive_ack = when;
   }
 
@@ -89,31 +200,39 @@ private:
    * The calls may be cross-core and asynchronous
    */
 public:
+  /*
+   * should not be called cross-core
+   */
+
   void set_handshake_listener(HandshakeListener &hl) {
+    assert(seastar::this_shard_id() == get_shard_id());
     ceph_assert_always(handshake_listener == nullptr);
     handshake_listener = &hl;
   }
 
+  io_handler_state get_states() const {
+    // might be called from prv_sid during wait_io_exit_dispatching()
+    return {in_seq, is_out_queued(), has_out_sent()};
+  }
+
   struct io_stat_printer {
     const IOHandler &io_handler;
   };
   void print_io_stat(std::ostream &out) const;
 
-  seastar::future<> close_io(
-      bool is_dispatch_reset,
-      bool is_replace) {
-    ceph_assert_always(io_state == io_state_t::drop);
-
-    if (is_dispatch_reset) {
-      dispatch_reset(is_replace);
-    }
+  seastar::future<> set_accepted_sid(
+      crosscore_t::seq_t cc_seq,
+      seastar::shard_id sid,
+      ConnectionFRef conn_fref);
 
-    ceph_assert_always(conn_ref);
-    conn_ref.reset();
+  /*
+   * may be called cross-core
+   */
 
-    assert(!gate.is_closed());
-    return gate.close();
-  }
+  seastar::future<> close_io(
+      crosscore_t::seq_t cc_seq,
+      bool is_dispatch_reset,
+      bool is_replace);
 
   /**
    * io_state_t
    *
@@ -122,36 +241,219 @@ public:
    * io behavior accordingly.
    */
   enum class io_state_t : uint8_t {
-    none,  // no IO is possible as the connection is not available to the user yet.
-    delay, // IO is delayed until open.
-    open,  // Dispatch In and Out concurrently.
-    drop   // Drop IO as the connection is closed.
+    none,    // no IO is possible as the connection is not available to the user yet.
+    delay,   // IO is delayed until open.
+    open,    // Dispatch In and Out concurrently.
+    drop,    // Drop IO as the connection is closed.
+    switched // IO is switched to a different core
+             // (is moved to maybe_prv_shard_states)
   };
   friend class fmt::formatter<io_state_t>;
 
-  void set_io_state(const io_state_t &new_state, FrameAssemblerV2Ref fa=nullptr);
+  seastar::future<> set_io_state(
+      crosscore_t::seq_t cc_seq,
+      io_state_t new_state,
+      FrameAssemblerV2Ref fa,
+      bool set_notify_out);
 
-  seastar::future<FrameAssemblerV2Ref> wait_io_exit_dispatching();
+  struct exit_dispatching_ret {
+    FrameAssemblerV2Ref frame_assembler;
+    io_handler_state io_states;
+  };
+  seastar::future<exit_dispatching_ret>
+  wait_io_exit_dispatching(
+      crosscore_t::seq_t cc_seq);
 
-  void reset_session(bool full);
+  seastar::future<> reset_session(
+      crosscore_t::seq_t cc_seq,
+      bool full);
 
-  void requeue_out_sent_up_to(seq_num_t seq);
+  seastar::future<> reset_peer_state(
+      crosscore_t::seq_t cc_seq);
 
-  void requeue_out_sent();
+  seastar::future<> requeue_out_sent_up_to(
+      crosscore_t::seq_t cc_seq,
+      seq_num_t msg_seq);
 
-  bool is_out_queued_or_sent() const {
-    return is_out_queued() || !out_sent_msgs.empty();
-  }
+  seastar::future<> requeue_out_sent(
+      crosscore_t::seq_t cc_seq);
 
-  seq_num_t get_in_seq() const {
-    return in_seq;
+  seastar::future<> dispatch_accept(
+      crosscore_t::seq_t cc_seq,
+      seastar::shard_id new_sid,
+      ConnectionFRef,
+      bool is_replace);
+
+  seastar::future<> dispatch_connect(
+      crosscore_t::seq_t cc_seq,
+      seastar::shard_id new_sid,
+      ConnectionFRef);
+
+ private:
+  class shard_states_t;
+  using shard_states_ref_t = std::unique_ptr<shard_states_t>;
+
+  class shard_states_t {
+  public:
+    shard_states_t(seastar::shard_id _sid, io_state_t state)
+      : sid{_sid}, io_state{state} {}
+
+    seastar::shard_id get_shard_id() const {
+      return sid;
+    }
+
+    io_state_t get_io_state() const {
+      assert(seastar::this_shard_id() == sid);
+      return io_state;
+    }
+
+    void set_io_state(io_state_t new_state) {
+      assert(seastar::this_shard_id() == sid);
+      assert(io_state != new_state);
+      pr_io_state_changed.set_value();
+      pr_io_state_changed = seastar::promise<>();
+      if (io_state == io_state_t::open) {
+        // from open
+        if (out_dispatching) {
+          ceph_assert_always(!out_exit_dispatching.has_value());
+          out_exit_dispatching = seastar::promise<>();
+        }
+      }
+      io_state = new_state;
+    }
+
+    seastar::future<> wait_state_change() {
+      assert(seastar::this_shard_id() == sid);
+      return pr_io_state_changed.get_future();
+    }
+
+    template <typename Func>
+    void dispatch_in_background(
+        const char *what, SocketConnection &who, Func &&func) {
+      assert(seastar::this_shard_id() == sid);
+      ceph_assert_always(!gate.is_closed());
+      gate.dispatch_in_background(what, who, std::move(func));
+    }
+
+    void enter_in_dispatching() {
+      assert(seastar::this_shard_id() == sid);
+      assert(io_state == io_state_t::open);
+      ceph_assert_always(!in_exit_dispatching.has_value());
+      in_exit_dispatching = seastar::promise<>();
+    }
+
+    void exit_in_dispatching() {
+      assert(seastar::this_shard_id() == sid);
+      assert(io_state != io_state_t::open);
+      ceph_assert_always(in_exit_dispatching.has_value());
+      in_exit_dispatching->set_value();
+      in_exit_dispatching = std::nullopt;
+    }
+
+    bool try_enter_out_dispatching() {
+      assert(seastar::this_shard_id() == sid);
+      if (out_dispatching) {
+        // already dispatching out
+        return false;
+      }
+      switch (io_state) {
+      case io_state_t::open:
+        [[fallthrough]];
+      case io_state_t::delay:
+        out_dispatching = true;
+        return true;
+      case io_state_t::drop:
+        [[fallthrough]];
+      case io_state_t::switched:
+        // do not dispatch out
+        return false;
+      default:
+        ceph_abort("impossible");
+      }
+    }
+
+    void notify_out_dispatching_stopped(
+        const char *what, SocketConnection &conn);
+
+    void exit_out_dispatching(
+        const char *what, SocketConnection &conn) {
+      assert(seastar::this_shard_id() == sid);
+      ceph_assert_always(out_dispatching);
+      out_dispatching = false;
+      notify_out_dispatching_stopped(what, conn);
+    }
+
+    seastar::future<> wait_io_exit_dispatching();
+
+    seastar::future<> close() {
+      assert(seastar::this_shard_id() == sid);
+      assert(!gate.is_closed());
+      return gate.close();
+    }
+
+    bool assert_closed_and_exit() const {
+      assert(seastar::this_shard_id() == sid);
+      if (gate.is_closed()) {
+        ceph_assert_always(io_state == io_state_t::drop ||
+                           io_state == io_state_t::switched);
+        ceph_assert_always(!out_dispatching);
+        ceph_assert_always(!out_exit_dispatching);
+        ceph_assert_always(!in_exit_dispatching);
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    static shard_states_ref_t create(
+        seastar::shard_id sid, io_state_t state) {
+      return std::make_unique<shard_states_t>(sid, state);
+    }
+
+    static shard_states_ref_t create_from_previous(
+        shard_states_t &prv_states, seastar::shard_id new_sid);
+
+  private:
+    const seastar::shard_id sid;
+    io_state_t io_state;
+
+    crimson::common::Gated gate;
+    seastar::promise<> pr_io_state_changed;
+    bool out_dispatching = false;
+    std::optional<seastar::promise<>> out_exit_dispatching;
+    std::optional<seastar::promise<>> in_exit_dispatching;
+  };
+
+  void do_set_io_state(
+      io_state_t new_state,
+      std::optional<crosscore_t::seq_t> cc_seq = std::nullopt,
+      FrameAssemblerV2Ref fa = nullptr,
+      bool set_notify_out = false);
+
+  io_state_t get_io_state() const {
+    return shard_states->get_io_state();
   }
 
-  void dispatch_accept();
+  void do_requeue_out_sent();
 
-  void dispatch_connect();
+  void do_requeue_out_sent_up_to(seq_num_t seq);
+
+  void assign_frame_assembler(FrameAssemblerV2Ref);
+
+  seastar::future<> send_redirected(MessageFRef msg);
+
+  seastar::future<> do_send(MessageFRef msg);
+
+  seastar::future<> send_keepalive_redirected();
+
+  seastar::future<> do_send_keepalive();
+
+  seastar::future<> to_new_sid(
+      crosscore_t::seq_t cc_seq,
+      seastar::shard_id new_sid,
+      ConnectionFRef,
+      std::optional<bool> is_replace);
 
- private:
   void dispatch_reset(bool is_replace);
 
   void dispatch_remote_reset();
 
@@ -163,26 +465,58 @@ public:
            next_keepalive_ack.has_value());
   }
 
+  bool has_out_sent() const {
+    return !out_sent_msgs.empty();
+  }
+
+  void reset_in();
+
   void reset_out();
 
-  seastar::future<stop_t> try_exit_out_dispatch();
+  void discard_out_sent();
 
-  seastar::future<> do_out_dispatch();
+  seastar::future<> do_out_dispatch(shard_states_t &ctx);
 
-  ceph::bufferlist sweep_out_pending_msgs_to_sent(
+#ifdef UNIT_TESTS_BUILT
+  struct sweep_ret {
+    ceph::bufferlist bl;
+    std::vector<ceph::msgr::v2::Tag> tags;
+  };
+  sweep_ret
+#else
+  ceph::bufferlist
+#endif
+  sweep_out_pending_msgs_to_sent(
       bool require_keepalive,
      std::optional<utime_t> maybe_keepalive_ack,
       bool require_ack);
 
+  void maybe_notify_out_dispatch();
+
   void notify_out_dispatch();
 
   void ack_out_sent(seq_num_t seq);
 
-  seastar::future<> read_message(utime_t throttle_stamp, std::size_t msg_size);
+  seastar::future<> read_message(
+      shard_states_t &ctx,
+      utime_t throttle_stamp,
+      std::size_t msg_size);
 
   void do_in_dispatch();
 
+  seastar::future<> cleanup_prv_shard(seastar::shard_id prv_sid);
+
 private:
+  shard_states_ref_t shard_states;
+
+  crosscore_t crosscore;
+
+  // drop was happening in the previous sid
+  std::optional<seastar::shard_id> maybe_dropped_sid;
+
+  // the remaining states in the previous sid for cleanup, see to_new_sid()
+  shard_states_ref_t maybe_prv_shard_states;
+
   ChainedDispatchers &dispatchers;
 
   SocketConnection &conn;
 
@@ -192,35 +526,24 @@ private:
   HandshakeListener *handshake_listener = nullptr;
 
-  crimson::common::Gated gate;
-
   FrameAssemblerV2Ref frame_assembler;
 
   bool protocol_is_connected = false;
 
   bool need_dispatch_reset = true;
 
-  io_state_t io_state = io_state_t::none;
-
-  // wait until current io_state changed
-  seastar::promise<> io_state_changed;
-
   /*
    * out states for writing
    */
 
-  bool out_dispatching = false;
-
-  std::optional<seastar::promise<>> out_exit_dispatching;
-
   /// the seq num of the last transmitted message
   seq_num_t out_seq = 0;
 
   // messages to be resent after connection gets reset
-  std::deque<MessageURef> out_pending_msgs;
+  std::deque<MessageFRef> out_pending_msgs;
 
   // messages sent, but not yet acked by peer
-  std::deque<MessageURef> out_sent_msgs;
+  std::deque<MessageFRef> out_sent_msgs;
 
   bool need_keepalive = false;
 
@@ -228,12 +551,12 @@ private:
 
   uint64_t ack_left = 0;
 
+  bool need_notify_out = false;
+
   /*
    * in states for reading
    */
 
-  std::optional<seastar::promise<>> in_exit_dispatching;
-
   /// the seq num of the last received message
   seq_num_t in_seq = 0;
 
@@ -250,6 +573,23 @@ inline std::ostream& operator<<(
 }
 
 } // namespace crimson::net
 
+template <>
+struct fmt::formatter<crimson::net::io_handler_state> {
+  constexpr auto parse(format_parse_context& ctx) {
+    return ctx.begin();
+  }
+
+  template <typename FormatContext>
+  auto format(crimson::net::io_handler_state state, FormatContext& ctx) {
+    return fmt::format_to(
+        ctx.out(),
+        "io(in_seq={}, is_out_queued={}, has_out_sent={})",
+        state.in_seq,
+        state.is_out_queued,
+        state.has_out_sent);
+  }
+};
+
 template <>
 struct fmt::formatter<crimson::net::IOHandler::io_state_t>
     : fmt::formatter<std::string_view> {
@@ -270,6 +610,9 @@ struct fmt::formatter<crimson::net::IOHandler::io_state_t>
     case drop:
       name = "drop";
       break;
+    case switched:
+      name = "switched";
+      break;
     }
     return formatter<std::string_view>::format(name, ctx);
   }
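
Editorial note (not part of the diff above): the sketch below is a single-threaded stand-in that illustrates the ordering idea behind the new crosscore_t. A sender stamps each cross-core submission with prepare_submit(); the receiving side runs a handler only when its sequence number is the next one expected, and parks it otherwise. The toy_crosscore name and the std::function-based parking are assumptions made to keep the demo self-contained; the real class instead keeps at most one seastar::shared_promise<> and lets a blocked submission retry through proceed_or_wait()/wait() on the Seastar reactor.

// toy_crosscore: hypothetical, simplified stand-in for crimson::net::crosscore_t,
// for demonstration only.
#include <cstdint>
#include <functional>
#include <iostream>
#include <map>

class toy_crosscore {
public:
  using seq_t = uint64_t;

  // sender side: stamp each cross-core submission with the next sequence number
  seq_t prepare_submit() { return ++out_seq; }

  // receiver side: run the handler now if it is the next expected event,
  // otherwise park it until the gap before it has been filled
  void proceed_or_wait(seq_t seq, std::function<void()> handler) {
    if (seq == in_seq + 1) {
      ++in_seq;
      handler();
      // drain any parked handlers that are now in order
      for (auto it = parked.find(in_seq + 1);
           it != parked.end();
           it = parked.find(in_seq + 1)) {
        ++in_seq;
        it->second();
        parked.erase(it);
      }
    } else {
      parked.emplace(seq, std::move(handler));
    }
  }

private:
  seq_t out_seq = 0;
  seq_t in_seq = 0;
  std::map<seq_t, std::function<void()>> parked;
};

int main() {
  toy_crosscore cc;
  auto s1 = cc.prepare_submit();  // 1
  auto s2 = cc.prepare_submit();  // 2
  auto s3 = cc.prepare_submit();  // 3

  // simulate out-of-order arrival on the receiving core: 2, 3, 1
  cc.proceed_or_wait(s2, [] { std::cout << "event 2\n"; });
  cc.proceed_or_wait(s3, [] { std::cout << "event 3\n"; });
  cc.proceed_or_wait(s1, [] { std::cout << "event 1\n"; });
  // prints: event 1, event 2, event 3
}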