#pragma once
+#include <vector>
+
+#include <seastar/core/shared_future.hh>
#include <seastar/util/later.hh>
#include "crimson/common/gated.h"
namespace crimson::net {
+/**
+ * crosscore_t
+ *
+ * Preserves the order of events submitted across cores.
+ */
+class crosscore_t {
+public:
+ using seq_t = uint64_t;
+
+ crosscore_t() = default;
+ ~crosscore_t() = default;
+
+ seq_t get_in_seq() const {
+ return in_seq;
+ }
+
+ seq_t prepare_submit() {
+ ++out_seq;
+ return out_seq;
+ }
+
+ bool proceed_or_wait(seq_t seq) {
+ if (seq == in_seq + 1) {
+ ++in_seq;
+ if (unlikely(in_pr_wait.has_value())) {
+ in_pr_wait->set_value();
+ in_pr_wait = std::nullopt;
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ seastar::future<> wait(seq_t seq) {
+ assert(seq != in_seq + 1);
+ if (!in_pr_wait.has_value()) {
+ in_pr_wait = seastar::shared_promise<>();
+ }
+ return in_pr_wait->get_shared_future();
+ }
+
+private:
+ seq_t out_seq = 0;
+ seq_t in_seq = 0;
+ std::optional<seastar::shared_promise<>> in_pr_wait;
+};
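+
+// A minimal usage sketch (the handle_event() wrapper is a hypothetical name;
+// only the ordering pattern is implied by the interface above): the
+// submitting core allocates a sequence with prepare_submit() and attaches it
+// to the cross-core task, and the receiving core retries until its sequence
+// can proceed in order.
+//
+//   // submitting core
+//   auto cc_seq = crosscore.prepare_submit();
+//   // ... submit a task carrying cc_seq to the peer core ...
+//
+//   // receiving core
+//   seastar::future<> handle_event(crosscore_t::seq_t cc_seq) {
+//     if (!crosscore.proceed_or_wait(cc_seq)) {
+//       return crosscore.wait(cc_seq).then([this, cc_seq] {
+//         // re-check the order after being woken up
+//         return handle_event(cc_seq);
+//       });
+//     }
+//     // ... handle the event, now in submit order ...
+//     return seastar::now();
+//   }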
+
+/**
+ * io_handler_state
+ *
+ * A snapshot of the IOHandler states that needs to be populated to
+ * ProtocolV2 asynchronously.
+ */
+struct io_handler_state {
+ seq_num_t in_seq;
+ bool is_out_queued;
+ bool has_out_sent;
+
+ bool is_out_queued_or_sent() const {
+ return is_out_queued || has_out_sent;
+ }
+
+ /*
+ * should be consistent with the corresponding interfaces in IOHandler
+ */
+
+ void reset_session(bool full) {
+ in_seq = 0;
+ if (full) {
+ is_out_queued = false;
+ has_out_sent = false;
+ }
+ }
+
+ void reset_peer_state() {
+ in_seq = 0;
+ is_out_queued = is_out_queued_or_sent();
+ has_out_sent = false;
+ }
+
+ void requeue_out_sent_up_to() {
+ // noop since this snapshot lacks the needed per-message information
+ }
+
+ void requeue_out_sent() {
+ if (has_out_sent) {
+ has_out_sent = false;
+ is_out_queued = true;
+ }
+ }
+};
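+
+// A sketch of the intended flow (the io_states member, the caller-side
+// crosscore and gate, and the exact call site are assumptions; the real
+// callers live in ProtocolV2): the caller keeps a local copy of this
+// snapshot and mutates it synchronously, then asks IOHandler to apply the
+// same transition asynchronously on the IO shard, e.g.:
+//
+//   io_states.reset_peer_state();
+//   auto cc_seq = crosscore.prepare_submit();
+//   gate.dispatch_in_background("reset_peer_state", conn, [this, cc_seq] {
+//     return seastar::smp::submit_to(
+//         io_handler.get_shard_id(), [this, cc_seq] {
+//       return io_handler.reset_peer_state(cc_seq);
+//     });
+//   });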
+
/**
* HandshakeListener
*
- * The interface class for IOHandler to notify the ProtocolV2 for handshake.
+ * The interface class for IOHandler to notify ProtocolV2.
*
- * The notifications may be cross-core and asynchronous.
+ * The notifications may be cross-core and must be sent to
+ * SocketConnection::get_messenger_shard_id().
*/
class HandshakeListener {
public:
HandshakeListener &operator=(const HandshakeListener &) = delete;
HandshakeListener &operator=(HandshakeListener &&) = delete;
- virtual void notify_out() = 0;
+ virtual seastar::future<> notify_out(
+ crosscore_t::seq_t cc_seq) = 0;
- virtual void notify_out_fault(const char *where, std::exception_ptr) = 0;
+ virtual seastar::future<> notify_out_fault(
+ crosscore_t::seq_t cc_seq,
+ const char *where,
+ std::exception_ptr,
+ io_handler_state) = 0;
- virtual void notify_mark_down() = 0;
+ virtual seastar::future<> notify_mark_down(
+ crosscore_t::seq_t cc_seq) = 0;
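+
+ // A sketch of how IOHandler is expected to deliver such a notification
+ // (the submission wrapper below is an assumption; the interface only
+ // mandates the destination shard and the crosscore sequencing):
+ //
+ //   auto cc_seq = crosscore.prepare_submit();
+ //   return seastar::smp::submit_to(
+ //       conn.get_messenger_shard_id(), [this, cc_seq] {
+ //     return handshake_listener->notify_out(cc_seq);
+ //   });
+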
protected:
HandshakeListener() = default;
/*
* as ConnectionHandler
*/
-private:
+public:
+ seastar::shard_id get_shard_id() const final {
+ return shard_states->get_shard_id();
+ }
+
bool is_connected() const final {
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
return protocol_is_connected;
}
- seastar::future<> send(MessageURef msg) final;
+ seastar::future<> send(MessageFRef msg) final;
seastar::future<> send_keepalive() final;
clock_t::time_point get_last_keepalive() const final {
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
return last_keepalive;
}
clock_t::time_point get_last_keepalive_ack() const final {
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
return last_keepalive_ack;
}
void set_last_keepalive_ack(clock_t::time_point when) final {
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
last_keepalive_ack = when;
}
* The calls may be cross-core and asynchronous
*/
public:
+ /*
+ * should not be called cross-core
+ */
+
void set_handshake_listener(HandshakeListener &hl) {
+ assert(seastar::this_shard_id() == get_shard_id());
ceph_assert_always(handshake_listener == nullptr);
handshake_listener = &hl;
}
+ io_handler_state get_states() const {
+ // might be called from prv_sid during wait_io_exit_dispatching()
+ return {in_seq, is_out_queued(), has_out_sent()};
+ }
+
struct io_stat_printer {
const IOHandler &io_handler;
};
void print_io_stat(std::ostream &out) const;
- seastar::future<> close_io(
- bool is_dispatch_reset,
- bool is_replace) {
- ceph_assert_always(io_state == io_state_t::drop);
-
- if (is_dispatch_reset) {
- dispatch_reset(is_replace);
- }
+ seastar::future<> set_accepted_sid(
+ crosscore_t::seq_t cc_seq,
+ seastar::shard_id sid,
+ ConnectionFRef conn_fref);
- ceph_assert_always(conn_ref);
- conn_ref.reset();
+ /*
+ * may be called cross-core
+ */
- assert(!gate.is_closed());
- return gate.close();
- }
+ seastar::future<> close_io(
+ crosscore_t::seq_t cc_seq,
+ bool is_dispatch_reset,
+ bool is_replace);
/**
* io_state_t
* io behavior accordingly.
*/
enum class io_state_t : uint8_t {
- none, // no IO is possible as the connection is not available to the user yet.
- delay, // IO is delayed until open.
- open, // Dispatch In and Out concurrently.
- drop // Drop IO as the connection is closed.
+ none, // no IO is possible as the connection is not available to the user yet.
+ delay, // IO is delayed until open.
+ open, // Dispatch In and Out concurrently.
+ drop, // Drop IO as the connection is closed.
+ switched // IO is switched to a different core
+ // (is moved to maybe_prv_shard_states)
};
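+
+ // A rough lifecycle sketch (maintained by ProtocolV2, not enforced by this
+ // enum): none -> delay -> open, back to delay on a fault before reconnect,
+ // to drop once the connection is closed, and to switched on the previous
+ // core when the IO is migrated (see to_new_sid()).
+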
friend class fmt::formatter<io_state_t>;
- void set_io_state(const io_state_t &new_state, FrameAssemblerV2Ref fa=nullptr);
+ seastar::future<> set_io_state(
+ crosscore_t::seq_t cc_seq,
+ io_state_t new_state,
+ FrameAssemblerV2Ref fa,
+ bool set_notify_out);
- seastar::future<FrameAssemblerV2Ref> wait_io_exit_dispatching();
+ struct exit_dispatching_ret {
+ FrameAssemblerV2Ref frame_assembler;
+ io_handler_state io_states;
+ };
+ seastar::future<exit_dispatching_ret>
+ wait_io_exit_dispatching(
+ crosscore_t::seq_t cc_seq);
- void reset_session(bool full);
+ seastar::future<> reset_session(
+ crosscore_t::seq_t cc_seq,
+ bool full);
- void requeue_out_sent_up_to(seq_num_t seq);
+ seastar::future<> reset_peer_state(
+ crosscore_t::seq_t cc_seq);
- void requeue_out_sent();
+ seastar::future<> requeue_out_sent_up_to(
+ crosscore_t::seq_t cc_seq,
+ seq_num_t msg_seq);
- bool is_out_queued_or_sent() const {
- return is_out_queued() || !out_sent_msgs.empty();
- }
+ seastar::future<> requeue_out_sent(
+ crosscore_t::seq_t cc_seq);
- seq_num_t get_in_seq() const {
- return in_seq;
+ seastar::future<> dispatch_accept(
+ crosscore_t::seq_t cc_seq,
+ seastar::shard_id new_sid,
+ ConnectionFRef,
+ bool is_replace);
+
+ seastar::future<> dispatch_connect(
+ crosscore_t::seq_t cc_seq,
+ seastar::shard_id new_sid,
+ ConnectionFRef);
+
+ private:
+ class shard_states_t;
+ using shard_states_ref_t = std::unique_ptr<shard_states_t>;
+
+ class shard_states_t {
+ public:
+ shard_states_t(seastar::shard_id _sid, io_state_t state)
+ : sid{_sid}, io_state{state} {}
+
+ seastar::shard_id get_shard_id() const {
+ return sid;
+ }
+
+ io_state_t get_io_state() const {
+ assert(seastar::this_shard_id() == sid);
+ return io_state;
+ }
+
+ void set_io_state(io_state_t new_state) {
+ assert(seastar::this_shard_id() == sid);
+ assert(io_state != new_state);
+ pr_io_state_changed.set_value();
+ pr_io_state_changed = seastar::promise<>();
+ if (io_state == io_state_t::open) {
+ // from open
+ if (out_dispatching) {
+ ceph_assert_always(!out_exit_dispatching.has_value());
+ out_exit_dispatching = seastar::promise<>();
+ }
+ }
+ io_state = new_state;
+ }
+
+ seastar::future<> wait_state_change() {
+ assert(seastar::this_shard_id() == sid);
+ return pr_io_state_changed.get_future();
+ }
+
+ template <typename Func>
+ void dispatch_in_background(
+ const char *what, SocketConnection &who, Func &&func) {
+ assert(seastar::this_shard_id() == sid);
+ ceph_assert_always(!gate.is_closed());
+ gate.dispatch_in_background(what, who, std::move(func));
+ }
+
+ void enter_in_dispatching() {
+ assert(seastar::this_shard_id() == sid);
+ assert(io_state == io_state_t::open);
+ ceph_assert_always(!in_exit_dispatching.has_value());
+ in_exit_dispatching = seastar::promise<>();
+ }
+
+ void exit_in_dispatching() {
+ assert(seastar::this_shard_id() == sid);
+ assert(io_state != io_state_t::open);
+ ceph_assert_always(in_exit_dispatching.has_value());
+ in_exit_dispatching->set_value();
+ in_exit_dispatching = std::nullopt;
+ }
+
+ bool try_enter_out_dispatching() {
+ assert(seastar::this_shard_id() == sid);
+ if (out_dispatching) {
+ // already dispatching out
+ return false;
+ }
+ switch (io_state) {
+ case io_state_t::open:
+ [[fallthrough]];
+ case io_state_t::delay:
+ out_dispatching = true;
+ return true;
+ case io_state_t::drop:
+ [[fallthrough]];
+ case io_state_t::switched:
+ // do not dispatch out
+ return false;
+ default:
+ ceph_abort("impossible");
+ }
+ }
+
+ void notify_out_dispatching_stopped(
+ const char *what, SocketConnection &conn);
+
+ void exit_out_dispatching(
+ const char *what, SocketConnection &conn) {
+ assert(seastar::this_shard_id() == sid);
+ ceph_assert_always(out_dispatching);
+ out_dispatching = false;
+ notify_out_dispatching_stopped(what, conn);
+ }
+
+ seastar::future<> wait_io_exit_dispatching();
+
+ seastar::future<> close() {
+ assert(seastar::this_shard_id() == sid);
+ assert(!gate.is_closed());
+ return gate.close();
+ }
+
+ bool assert_closed_and_exit() const {
+ assert(seastar::this_shard_id() == sid);
+ if (gate.is_closed()) {
+ ceph_assert_always(io_state == io_state_t::drop ||
+ io_state == io_state_t::switched);
+ ceph_assert_always(!out_dispatching);
+ ceph_assert_always(!out_exit_dispatching);
+ ceph_assert_always(!in_exit_dispatching);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ static shard_states_ref_t create(
+ seastar::shard_id sid, io_state_t state) {
+ return std::make_unique<shard_states_t>(sid, state);
+ }
+
+ static shard_states_ref_t create_from_previous(
+ shard_states_t &prv_states, seastar::shard_id new_sid);
+
+ private:
+ const seastar::shard_id sid;
+ io_state_t io_state;
+
+ crimson::common::Gated gate;
+ seastar::promise<> pr_io_state_changed;
+ bool out_dispatching = false;
+ std::optional<seastar::promise<>> out_exit_dispatching;
+ std::optional<seastar::promise<>> in_exit_dispatching;
+ };
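+
+ // A sketch of how the dispatch guards above are expected to pair up (the
+ // surrounding calls are assumptions; the real dispatch loops live in the
+ // IOHandler implementation):
+ //
+ //   if (shard_states->try_enter_out_dispatching()) {
+ //     shard_states->dispatch_in_background(
+ //         "do_out_dispatch", conn, [this] {
+ //       return do_out_dispatch(*shard_states);
+ //     });
+ //   }
+ //   // do_out_dispatch() is then expected to call exit_out_dispatching()
+ //   // once it stops writing, which helps resolve
+ //   // wait_io_exit_dispatching() for a waiting ProtocolV2.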
+
+ void do_set_io_state(
+ io_state_t new_state,
+ std::optional<crosscore_t::seq_t> cc_seq = std::nullopt,
+ FrameAssemblerV2Ref fa = nullptr,
+ bool set_notify_out = false);
+
+ io_state_t get_io_state() const {
+ return shard_states->get_io_state();
}
- void dispatch_accept();
+ void do_requeue_out_sent();
- void dispatch_connect();
+ void do_requeue_out_sent_up_to(seq_num_t seq);
+
+ void assign_frame_assembler(FrameAssemblerV2Ref);
+
+ seastar::future<> send_redirected(MessageFRef msg);
+
+ seastar::future<> do_send(MessageFRef msg);
+
+ seastar::future<> send_keepalive_redirected();
+
+ seastar::future<> do_send_keepalive();
+
+ seastar::future<> to_new_sid(
+ crosscore_t::seq_t cc_seq,
+ seastar::shard_id new_sid,
+ ConnectionFRef,
+ std::optional<bool> is_replace);
- private:
void dispatch_reset(bool is_replace);
void dispatch_remote_reset();
next_keepalive_ack.has_value());
}
+ bool has_out_sent() const {
+ return !out_sent_msgs.empty();
+ }
+
+ void reset_in();
+
void reset_out();
- seastar::future<stop_t> try_exit_out_dispatch();
+ void discard_out_sent();
- seastar::future<> do_out_dispatch();
+ seastar::future<> do_out_dispatch(shard_states_t &ctx);
- ceph::bufferlist sweep_out_pending_msgs_to_sent(
+#ifdef UNIT_TESTS_BUILT
+ struct sweep_ret {
+ ceph::bufferlist bl;
+ std::vector<ceph::msgr::v2::Tag> tags;
+ };
+ sweep_ret
+#else
+ ceph::bufferlist
+#endif
+ sweep_out_pending_msgs_to_sent(
bool require_keepalive,
std::optional<utime_t> maybe_keepalive_ack,
bool require_ack);
+ void maybe_notify_out_dispatch();
+
void notify_out_dispatch();
void ack_out_sent(seq_num_t seq);
- seastar::future<> read_message(utime_t throttle_stamp, std::size_t msg_size);
+ seastar::future<> read_message(
+ shard_states_t &ctx,
+ utime_t throttle_stamp,
+ std::size_t msg_size);
void do_in_dispatch();
+ seastar::future<> cleanup_prv_shard(seastar::shard_id prv_sid);
+
private:
+ shard_states_ref_t shard_states;
+
+ crosscore_t crosscore;
+
+ // the previous sid on which the drop happened, if any
+ std::optional<seastar::shard_id> maybe_dropped_sid;
+
+ // the remaining states in the previous sid for cleanup, see to_new_sid()
+ shard_states_ref_t maybe_prv_shard_states;
+
ChainedDispatchers &dispatchers;
SocketConnection &conn;
HandshakeListener *handshake_listener = nullptr;
- crimson::common::Gated gate;
-
FrameAssemblerV2Ref frame_assembler;
bool protocol_is_connected = false;
bool need_dispatch_reset = true;
- io_state_t io_state = io_state_t::none;
-
- // wait until current io_state changed
- seastar::promise<> io_state_changed;
-
/*
* out states for writing
*/
- bool out_dispatching = false;
-
- std::optional<seastar::promise<>> out_exit_dispatching;
-
/// the seq num of the last transmitted message
seq_num_t out_seq = 0;
// messages to be resent after connection gets reset
- std::deque<MessageURef> out_pending_msgs;
+ std::deque<MessageFRef> out_pending_msgs;
// messages sent, but not yet acked by peer
- std::deque<MessageURef> out_sent_msgs;
+ std::deque<MessageFRef> out_sent_msgs;
bool need_keepalive = false;
uint64_t ack_left = 0;
+ bool need_notify_out = false;
+
/*
* in states for reading
*/
- std::optional<seastar::promise<>> in_exit_dispatching;
-
/// the seq num of the last received message
seq_num_t in_seq = 0;
} // namespace crimson::net
+template <>
+struct fmt::formatter<crimson::net::io_handler_state> {
+ constexpr auto parse(format_parse_context& ctx) {
+ return ctx.begin();
+ }
+
+ template <typename FormatContext>
+ auto format(crimson::net::io_handler_state state, FormatContext& ctx) {
+ return fmt::format_to(
+ ctx.out(),
+ "io(in_seq={}, is_out_queued={}, has_out_sent={})",
+ state.in_seq,
+ state.is_out_queued,
+ state.has_out_sent);
+ }
+};
+
template <>
struct fmt::formatter<crimson::net::IOHandler::io_state_t>
: fmt::formatter<std::string_view> {
case drop:
name = "drop";
break;
+ case switched:
+ name = "switched";
+ break;
}
return formatter<string_view>::format(name, ctx);
}