IOHandler::IOHandler(ChainedDispatchers &dispatchers,
SocketConnection &conn)
- : dispatchers(dispatchers),
+ : shard_states(shard_states_t::create(
+ seastar::this_shard_id(), io_state_t::none)),
+ dispatchers(dispatchers),
conn(conn),
conn_ref(conn.get_local_shared_foreign_from_this())
{}
IOHandler::~IOHandler()
{
- ceph_assert(gate.is_closed());
- assert(!out_exit_dispatching);
+ // close_io() must have finished
+ ceph_assert_always(maybe_prv_shard_states == nullptr);
+ // should be true in the corresponding shard
+ // ceph_assert_always(shard_states->assert_closed_and_exit());
+ assert(!conn_ref);
}
-ceph::bufferlist IOHandler::sweep_out_pending_msgs_to_sent(
+#ifdef UNIT_TESTS_BUILT
+IOHandler::sweep_ret
+#else
+ceph::bufferlist
+#endif
+IOHandler::sweep_out_pending_msgs_to_sent(
bool require_keepalive,
std::optional<utime_t> maybe_keepalive_ack,
bool require_ack)
std::size_t num_msgs = out_pending_msgs.size();
ceph::bufferlist bl;
+#ifdef UNIT_TESTS_BUILT
+ std::vector<Tag> tags;
+#endif
+
if (unlikely(require_keepalive)) {
auto keepalive_frame = KeepAliveFrame::Encode();
bl.append(frame_assembler->get_buffer(keepalive_frame));
+#ifdef UNIT_TESTS_BUILT
+ auto tag = KeepAliveFrame::tag;
+ tags.push_back(tag);
+#endif
}
if (unlikely(maybe_keepalive_ack.has_value())) {
auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*maybe_keepalive_ack);
bl.append(frame_assembler->get_buffer(keepalive_ack_frame));
+#ifdef UNIT_TESTS_BUILT
+ auto tag = KeepAliveFrameAck::tag;
+ tags.push_back(tag);
+#endif
}
if (require_ack && num_msgs == 0u) {
- auto ack_frame = AckFrame::Encode(get_in_seq());
+ auto ack_frame = AckFrame::Encode(in_seq);
bl.append(frame_assembler->get_buffer(ack_frame));
+#ifdef UNIT_TESTS_BUILT
+ auto tag = AckFrame::tag;
+ tags.push_back(tag);
+#endif
}
std::for_each(
out_pending_msgs.begin(),
out_pending_msgs.begin()+num_msgs,
- [this, &bl](const MessageURef& msg) {
+ [this, &bl
+#ifdef UNIT_TESTS_BUILT
+ , &tags
+#endif
+ ](const MessageFRef& msg) {
// set priority
msg->get_header().src = conn.messenger.get_myname();
header.type, header.priority,
header.version,
ceph_le32(0), header.data_off,
- ceph_le64(get_in_seq()),
+ ceph_le64(in_seq),
footer.flags, header.compat_version,
header.reserved};
logger().debug("{} --> #{} === {} ({})",
conn, msg->get_seq(), *msg, msg->get_type());
bl.append(frame_assembler->get_buffer(message));
+#ifdef UNIT_TESTS_BUILT
+ auto tag = MessageFrame::tag;
+ tags.push_back(tag);
+#endif
});
if (!conn.policy.lossy) {
std::make_move_iterator(out_pending_msgs.end()));
}
out_pending_msgs.clear();
+
+#ifdef UNIT_TESTS_BUILT
+ return sweep_ret{std::move(bl), tags};
+#else
return bl;
+#endif
}
-seastar::future<> IOHandler::send(MessageURef msg)
+seastar::future<> IOHandler::send(MessageFRef msg)
{
- if (io_state != io_state_t::drop) {
+ // sid may be changed on-the-fly during the submission
+ if (seastar::this_shard_id() == get_shard_id()) {
+ return do_send(std::move(msg));
+ } else {
+ logger().trace("{} send() is directed to {} -- {}",
+ conn, get_shard_id(), *msg);
+ return seastar::smp::submit_to(
+ get_shard_id(), [this, msg=std::move(msg)]() mutable {
+ return send_redirected(std::move(msg));
+ });
+ }
+}
+
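+// Same as send() except for the log wording/level, so that repeated
+// redirections (the sid can keep changing while the submission hops
+// between cores) are visible at debug level.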
+seastar::future<> IOHandler::send_redirected(MessageFRef msg)
+{
+ // sid may be changed on-the-fly during the submission
+ if (seastar::this_shard_id() == get_shard_id()) {
+ return do_send(std::move(msg));
+ } else {
+ logger().debug("{} send() is redirected to {} -- {}",
+ conn, get_shard_id(), *msg);
+ return seastar::smp::submit_to(
+ get_shard_id(), [this, msg=std::move(msg)]() mutable {
+ return send_redirected(std::move(msg));
+ });
+ }
+}
+
+seastar::future<> IOHandler::do_send(MessageFRef msg)
+{
+ assert(seastar::this_shard_id() == get_shard_id());
+ logger().trace("{} do_send() got message -- {}", conn, *msg);
+ if (get_io_state() != io_state_t::drop) {
out_pending_msgs.push_back(std::move(msg));
notify_out_dispatch();
}
seastar::future<> IOHandler::send_keepalive()
{
+ // sid may be changed on-the-fly during the submission
+ if (seastar::this_shard_id() == get_shard_id()) {
+ return do_send_keepalive();
+ } else {
+ logger().trace("{} send_keepalive() is directed to {}", conn, get_shard_id());
+ return seastar::smp::submit_to(
+ get_shard_id(), [this] {
+ return send_keepalive_redirected();
+ });
+ }
+}
+
+seastar::future<> IOHandler::send_keepalive_redirected()
+{
+ // sid may be changed on-the-fly during the submission
+ if (seastar::this_shard_id() == get_shard_id()) {
+ return do_send_keepalive();
+ } else {
+ logger().debug("{} send_keepalive() is redirected to {}", conn, get_shard_id());
+ return seastar::smp::submit_to(
+ get_shard_id(), [this] {
+ return send_keepalive_redirected();
+ });
+ }
+}
+
+seastar::future<> IOHandler::do_send_keepalive()
+{
+ assert(seastar::this_shard_id() == get_shard_id());
+ logger().trace("{} do_send_keeplive(): need_keepalive={}", conn, need_keepalive);
if (!need_keepalive) {
need_keepalive = true;
notify_out_dispatch();
void IOHandler::mark_down()
{
- ceph_assert_always(io_state != io_state_t::none);
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+ ceph_assert_always(get_io_state() != io_state_t::none);
need_dispatch_reset = false;
- if (io_state == io_state_t::drop) {
+ if (get_io_state() == io_state_t::drop) {
return;
}
- logger().info("{} mark_down() with {}",
- conn, io_stat_printer{*this});
- set_io_state(io_state_t::drop);
- handshake_listener->notify_mark_down();
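+ // reserve a crosscore sequence number so that notify_mark_down()
+ // is admitted in submission order on the messenger core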
+ auto cc_seq = crosscore.prepare_submit();
+ logger().info("{} mark_down() at {}, send {} notify_mark_down()",
+ conn, io_stat_printer{*this}, cc_seq);
+ do_set_io_state(io_state_t::drop);
+ shard_states->dispatch_in_background(
+ "notify_mark_down", conn, [this, cc_seq] {
+ return seastar::smp::submit_to(
+ conn.get_messenger_shard_id(), [this, cc_seq] {
+ return handshake_listener->notify_mark_down(cc_seq);
+ });
+ });
}
void IOHandler::print_io_stat(std::ostream &out) const
{
+ assert(seastar::this_shard_id() == get_shard_id());
out << "io_stat("
- << "io_state=" << fmt::format("{}", io_state)
+ << "io_state=" << fmt::format("{}", get_io_state())
<< ", in_seq=" << in_seq
<< ", out_seq=" << out_seq
<< ", out_pending_msgs_size=" << out_pending_msgs.size()
<< ")";
}
-void IOHandler::set_io_state(
- const IOHandler::io_state_t &new_state,
- FrameAssemblerV2Ref fa)
+void IOHandler::assign_frame_assembler(FrameAssemblerV2Ref fa)
+{
+ assert(fa != nullptr);
+ ceph_assert_always(frame_assembler == nullptr);
+ frame_assembler = std::move(fa);
+ ceph_assert_always(
+ frame_assembler->get_shard_id() == get_shard_id());
+ // should have been set through dispatch_accept/connect()
+ ceph_assert_always(
+ frame_assembler->get_socket_shard_id() == get_shard_id());
+ ceph_assert_always(frame_assembler->is_socket_valid());
+}
+
+void IOHandler::do_set_io_state(
+ io_state_t new_state,
+ std::optional<crosscore_t::seq_t> cc_seq,
+ FrameAssemblerV2Ref fa,
+ bool set_notify_out)
{
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+ auto prv_state = get_io_state();
+ logger().debug("{} got {}do_set_io_state(): prv_state={}, new_state={}, "
+ "fa={}, set_notify_out={}, at {}",
+ conn,
+ cc_seq.has_value() ? fmt::format("{} ", *cc_seq) : "",
+ prv_state, new_state,
+ fa ? "present" : "N/A", set_notify_out,
+ io_stat_printer{*this});
ceph_assert_always(!(
- (new_state == io_state_t::none && io_state != io_state_t::none) ||
- (new_state == io_state_t::open && io_state == io_state_t::open) ||
- (new_state != io_state_t::drop && io_state == io_state_t::drop)
+ (new_state == io_state_t::none && prv_state != io_state_t::none) ||
+ (new_state == io_state_t::open && prv_state == io_state_t::open)
));
+ if (prv_state == io_state_t::drop) {
+ // only possible due to a racing mark_down() from the user
+ if (new_state == io_state_t::open) {
+ assign_frame_assembler(std::move(fa));
+ frame_assembler->shutdown_socket<false>(nullptr);
+ } else {
+ assert(fa == nullptr);
+ }
+ return;
+ }
+
bool dispatch_in = false;
if (new_state == io_state_t::open) {
// to open
ceph_assert_always(protocol_is_connected == true);
- assert(fa != nullptr);
- ceph_assert_always(frame_assembler == nullptr);
- frame_assembler = std::move(fa);
- ceph_assert_always(frame_assembler->is_socket_valid());
+ assign_frame_assembler(std::move(fa));
dispatch_in = true;
-#ifdef UNIT_TESTS_BUILT
- if (conn.interceptor) {
- conn.interceptor->register_conn_ready(conn);
- }
-#endif
- } else if (io_state == io_state_t::open) {
+ } else if (prv_state == io_state_t::open) {
// from open
ceph_assert_always(protocol_is_connected == true);
protocol_is_connected = false;
assert(fa == nullptr);
ceph_assert_always(frame_assembler->is_socket_valid());
- frame_assembler->shutdown_socket();
- if (out_dispatching) {
- ceph_assert_always(!out_exit_dispatching.has_value());
- out_exit_dispatching = seastar::promise<>();
- }
+ frame_assembler->shutdown_socket<false>(nullptr);
} else {
assert(fa == nullptr);
}
- if (io_state != new_state) {
- io_state = new_state;
- io_state_changed.set_value();
- io_state_changed = seastar::promise<>();
+ if (new_state == io_state_t::delay) {
+ need_notify_out = set_notify_out;
+ if (need_notify_out) {
+ maybe_notify_out_dispatch();
+ }
+ } else {
+ assert(set_notify_out == false);
+ need_notify_out = false;
+ }
+
+ // FIXME: simplify and drop the prv_state == new_state case
+ if (prv_state != new_state) {
+ shard_states->set_io_state(new_state);
}
/*
}
}
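+// Every cross-core entry point below follows the same crosscore pattern:
+// the sender reserves a sequence number via crosscore.prepare_submit(),
+// and the receiving shard admits calls strictly in that order through
+// crosscore.proceed_or_wait(); a call that arrives early waits on
+// crosscore.wait(cc_seq) and then re-invokes itself.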
-seastar::future<FrameAssemblerV2Ref> IOHandler::wait_io_exit_dispatching()
+seastar::future<> IOHandler::set_io_state(
+ crosscore_t::seq_t cc_seq,
+ io_state_t new_state,
+ FrameAssemblerV2Ref fa,
+ bool set_notify_out)
{
- ceph_assert_always(io_state != io_state_t::open);
+ assert(seastar::this_shard_id() == get_shard_id());
+ if (!crosscore.proceed_or_wait(cc_seq)) {
+ logger().debug("{} got {} set_io_state(), wait at {}",
+ conn, cc_seq, crosscore.get_in_seq());
+ return crosscore.wait(cc_seq
+ ).then([this, cc_seq, new_state,
+ fa=std::move(fa), set_notify_out]() mutable {
+ return set_io_state(cc_seq, new_state, std::move(fa), set_notify_out);
+ });
+ }
+
+ do_set_io_state(new_state, cc_seq, std::move(fa), set_notify_out);
+ return seastar::now();
+}
+
+seastar::future<IOHandler::exit_dispatching_ret>
+IOHandler::wait_io_exit_dispatching(
+ crosscore_t::seq_t cc_seq)
+{
+ assert(seastar::this_shard_id() == get_shard_id());
+ if (!crosscore.proceed_or_wait(cc_seq)) {
+ logger().debug("{} got {} wait_io_exit_dispatching(), wait at {}",
+ conn, cc_seq, crosscore.get_in_seq());
+ return crosscore.wait(cc_seq
+ ).then([this, cc_seq] {
+ return wait_io_exit_dispatching(cc_seq);
+ });
+ }
+
+ logger().debug("{} got {} wait_io_exit_dispatching()",
+ conn, cc_seq);
+ ceph_assert_always(get_io_state() != io_state_t::open);
ceph_assert_always(frame_assembler != nullptr);
ceph_assert_always(!frame_assembler->is_socket_valid());
- return seastar::when_all(
- [this] {
- if (out_exit_dispatching) {
- return out_exit_dispatching->get_future();
- } else {
- return seastar::now();
- }
- }(),
- [this] {
- if (in_exit_dispatching) {
- return in_exit_dispatching->get_future();
- } else {
- return seastar::now();
- }
- }()
- ).discard_result().then([this] {
- return std::move(frame_assembler);
+ return seastar::futurize_invoke([this] {
+ // cannot be running in parallel with to_new_sid()
+ if (maybe_dropped_sid.has_value()) {
+ ceph_assert_always(get_io_state() == io_state_t::drop);
+ assert(shard_states->assert_closed_and_exit());
+ auto prv_sid = *maybe_dropped_sid;
+ return seastar::smp::submit_to(prv_sid, [this] {
+ logger().debug("{} got wait_io_exit_dispatching from prv_sid", conn);
+ assert(maybe_prv_shard_states != nullptr);
+ return maybe_prv_shard_states->wait_io_exit_dispatching();
+ });
+ } else {
+ return shard_states->wait_io_exit_dispatching();
+ }
+ }).then([this] {
+ logger().debug("{} finish wait_io_exit_dispatching at {}",
+ conn, io_stat_printer{*this});
+ ceph_assert_always(frame_assembler != nullptr);
+ ceph_assert_always(!frame_assembler->is_socket_valid());
+ frame_assembler->set_shard_id(conn.get_messenger_shard_id());
+ return exit_dispatching_ret{
+ std::move(frame_assembler),
+ get_states()};
});
}
-void IOHandler::reset_session(bool full)
+seastar::future<> IOHandler::reset_session(
+ crosscore_t::seq_t cc_seq,
+ bool full)
{
- // reset in
- in_seq = 0;
+ assert(seastar::this_shard_id() == get_shard_id());
+ if (!crosscore.proceed_or_wait(cc_seq)) {
+ logger().debug("{} got {} reset_session(), wait at {}",
+ conn, cc_seq, crosscore.get_in_seq());
+ return crosscore.wait(cc_seq
+ ).then([this, cc_seq, full] {
+ return reset_session(cc_seq, full);
+ });
+ }
+
+ logger().debug("{} got {} reset_session({})",
+ conn, cc_seq, full);
+ assert(get_io_state() != io_state_t::open);
+ reset_in();
if (full) {
reset_out();
dispatch_remote_reset();
}
+ return seastar::now();
}
-void IOHandler::requeue_out_sent()
+seastar::future<> IOHandler::reset_peer_state(
+ crosscore_t::seq_t cc_seq)
{
- assert(io_state != io_state_t::open);
+ assert(seastar::this_shard_id() == get_shard_id());
+ if (!crosscore.proceed_or_wait(cc_seq)) {
+ logger().debug("{} got {} reset_peer_state(), wait at {}",
+ conn, cc_seq, crosscore.get_in_seq());
+ return crosscore.wait(cc_seq
+ ).then([this, cc_seq] {
+ return reset_peer_state(cc_seq);
+ });
+ }
+
+ logger().debug("{} got {} reset_peer_state()",
+ conn, cc_seq);
+ assert(get_io_state() != io_state_t::open);
+ reset_in();
+ do_requeue_out_sent_up_to(0);
+ discard_out_sent();
+ return seastar::now();
+}
+
+seastar::future<> IOHandler::requeue_out_sent(
+ crosscore_t::seq_t cc_seq)
+{
+ assert(seastar::this_shard_id() == get_shard_id());
+ if (!crosscore.proceed_or_wait(cc_seq)) {
+ logger().debug("{} got {} requeue_out_sent(), wait at {}",
+ conn, cc_seq, crosscore.get_in_seq());
+ return crosscore.wait(cc_seq
+ ).then([this, cc_seq] {
+ return requeue_out_sent(cc_seq);
+ });
+ }
+
+ logger().debug("{} got {} requeue_out_sent()",
+ conn, cc_seq);
+ do_requeue_out_sent();
+ return seastar::now();
+}
+
+void IOHandler::do_requeue_out_sent()
+{
+ assert(get_io_state() != io_state_t::open);
if (out_sent_msgs.empty()) {
return;
}
out_seq -= out_sent_msgs.size();
logger().debug("{} requeue {} items, revert out_seq to {}",
conn, out_sent_msgs.size(), out_seq);
- for (MessageURef& msg : out_sent_msgs) {
+ for (MessageFRef& msg : out_sent_msgs) {
msg->clear_payload();
msg->set_seq(0);
}
std::make_move_iterator(out_sent_msgs.begin()),
std::make_move_iterator(out_sent_msgs.end()));
out_sent_msgs.clear();
- notify_out_dispatch();
+ maybe_notify_out_dispatch();
}
-void IOHandler::requeue_out_sent_up_to(seq_num_t seq)
+seastar::future<> IOHandler::requeue_out_sent_up_to(
+ crosscore_t::seq_t cc_seq,
+ seq_num_t msg_seq)
{
- assert(io_state != io_state_t::open);
+ assert(seastar::this_shard_id() == get_shard_id());
+ if (!crosscore.proceed_or_wait(cc_seq)) {
+ logger().debug("{} got {} requeue_out_sent_up_to(), wait at {}",
+ conn, cc_seq, crosscore.get_in_seq());
+ return crosscore.wait(cc_seq
+ ).then([this, cc_seq, msg_seq] {
+ return requeue_out_sent_up_to(cc_seq, msg_seq);
+ });
+ }
+
+ logger().debug("{} got {} requeue_out_sent_up_to({})",
+ conn, cc_seq, msg_seq);
+ do_requeue_out_sent_up_to(msg_seq);
+ return seastar::now();
+}
+
+void IOHandler::do_requeue_out_sent_up_to(seq_num_t seq)
+{
+ assert(get_io_state() != io_state_t::open);
if (out_sent_msgs.empty() && out_pending_msgs.empty()) {
logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}",
conn, out_seq, seq);
out_sent_msgs.pop_front();
}
}
- requeue_out_sent();
+ do_requeue_out_sent();
+}
+
+void IOHandler::reset_in()
+{
+ assert(get_io_state() != io_state_t::open);
+ in_seq = 0;
}
void IOHandler::reset_out()
{
- assert(io_state != io_state_t::open);
- out_seq = 0;
+ assert(get_io_state() != io_state_t::open);
+ discard_out_sent();
out_pending_msgs.clear();
- out_sent_msgs.clear();
need_keepalive = false;
next_keepalive_ack = std::nullopt;
ack_left = 0;
}
-void IOHandler::dispatch_accept()
+void IOHandler::discard_out_sent()
{
- if (io_state == io_state_t::drop) {
- return;
- }
- // protocol_is_connected can be from true to true here if the replacing is
- // happening to a connected connection.
- protocol_is_connected = true;
- dispatchers.ms_handle_accept(conn_ref);
+ assert(get_io_state() != io_state_t::open);
+ out_seq = 0;
+ out_sent_msgs.clear();
}
-void IOHandler::dispatch_connect()
+seastar::future<>
+IOHandler::dispatch_accept(
+ crosscore_t::seq_t cc_seq,
+ seastar::shard_id new_sid,
+ ConnectionFRef conn_fref,
+ bool is_replace)
{
- if (io_state == io_state_t::drop) {
- return;
+ return to_new_sid(cc_seq, new_sid, std::move(conn_fref), is_replace);
+}
+
+seastar::future<>
+IOHandler::dispatch_connect(
+ crosscore_t::seq_t cc_seq,
+ seastar::shard_id new_sid,
+ ConnectionFRef conn_fref)
+{
+ return to_new_sid(cc_seq, new_sid, std::move(conn_fref), std::nullopt);
+}
+
+seastar::future<>
+IOHandler::cleanup_prv_shard(seastar::shard_id prv_sid)
+{
+ assert(seastar::this_shard_id() == get_shard_id());
+ return seastar::smp::submit_to(prv_sid, [this] {
+ logger().debug("{} got cleanup_prv_shard()", conn);
+ assert(maybe_prv_shard_states != nullptr);
+ auto ref_prv_states = std::move(maybe_prv_shard_states);
+ auto &prv_states = *ref_prv_states;
+ return prv_states.close(
+ ).then([ref_prv_states=std::move(ref_prv_states)] {
+ ceph_assert_always(ref_prv_states->assert_closed_and_exit());
+ });
+ }).then([this] {
+ ceph_assert_always(maybe_prv_shard_states == nullptr);
+ });
+}
+
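+// The shard switch is done in two phases: phase 1 (below, on the current
+// shard) detaches conn_ref and parks the current shard_states as
+// maybe_prv_shard_states; phase 2 (submitted to new_sid) re-attaches
+// conn_ref and either cleans up the previous shard in the background or,
+// if already dropped, defers that cleanup to the follow-up close_io().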
+seastar::future<>
+IOHandler::to_new_sid(
+ crosscore_t::seq_t cc_seq,
+ seastar::shard_id new_sid,
+ ConnectionFRef conn_fref,
+ std::optional<bool> is_replace)
+{
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+ if (!crosscore.proceed_or_wait(cc_seq)) {
+ logger().debug("{} got {} to_new_sid(), wait at {}",
+ conn, cc_seq, crosscore.get_in_seq());
+ return crosscore.wait(cc_seq
+ ).then([this, cc_seq, new_sid, is_replace,
+ conn_fref=std::move(conn_fref)]() mutable {
+ return to_new_sid(cc_seq, new_sid, std::move(conn_fref), is_replace);
+ });
}
- ceph_assert_always(protocol_is_connected == false);
- protocol_is_connected = true;
- dispatchers.ms_handle_connect(conn_ref);
+
+ bool is_accept_or_connect = is_replace.has_value();
+ logger().debug("{} got {} to_new_sid_1(new_sid={}, {}) at {}",
+ conn, cc_seq, new_sid,
+ fmt::format("{}",
+ is_accept_or_connect ?
+ (*is_replace ? "accept(replace)" : "accept(!replace)") :
+ "connect"),
+ io_stat_printer{*this});
+ auto next_cc_seq = ++cc_seq;
+
+ if (get_io_state() != io_state_t::drop) {
+ ceph_assert_always(conn_ref);
+ if (new_sid != seastar::this_shard_id()) {
+ dispatchers.ms_handle_shard_change(conn_ref, new_sid, is_accept_or_connect);
+ // user can make changes
+ }
+ } else {
+ // it is possible that both io_handler and protocolv2 are
+ // trying to close each other from different cores simultaneously.
+ assert(!protocol_is_connected);
+ }
+
+ if (get_io_state() != io_state_t::drop) {
+ if (is_accept_or_connect) {
+ // protocol_is_connected can stay true here if an already connected
+ // connection is being replaced.
+ } else {
+ ceph_assert_always(protocol_is_connected == false);
+ }
+ protocol_is_connected = true;
+ } else {
+ assert(!protocol_is_connected);
+ }
+
+ bool is_dropped = false;
+ if (get_io_state() == io_state_t::drop) {
+ is_dropped = true;
+ }
+ ceph_assert_always(get_io_state() != io_state_t::open);
+
+ // apply the switching atomically
+ ceph_assert_always(conn_ref);
+ conn_ref.reset();
+ auto prv_sid = get_shard_id();
+ ceph_assert_always(maybe_prv_shard_states == nullptr);
+ maybe_prv_shard_states = std::move(shard_states);
+ shard_states = shard_states_t::create_from_previous(
+ *maybe_prv_shard_states, new_sid);
+ assert(new_sid == get_shard_id());
+
+ return seastar::smp::submit_to(new_sid,
+ [this, next_cc_seq, is_dropped, prv_sid, is_replace, conn_fref=std::move(conn_fref)]() mutable {
+ logger().debug("{} got {} to_new_sid_2(prv_sid={}, is_dropped={}, {}) at {}",
+ conn, next_cc_seq, prv_sid, is_dropped,
+ fmt::format("{}",
+ is_replace.has_value() ?
+ (*is_replace ? "accept(replace)" : "accept(!replace)") :
+ "connect"),
+ io_stat_printer{*this});
+
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+ ceph_assert_always(get_io_state() != io_state_t::open);
+ ceph_assert_always(!maybe_dropped_sid.has_value());
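+ // cc_seq was already admitted in the first phase, so next_cc_seq
+ // (cc_seq + 1) is the next expected sequence and must not wait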
+ ceph_assert_always(crosscore.proceed_or_wait(next_cc_seq));
+
+ if (is_dropped) {
+ ceph_assert_always(get_io_state() == io_state_t::drop);
+ ceph_assert_always(shard_states->assert_closed_and_exit());
+ maybe_dropped_sid = prv_sid;
+ // cleanup_prv_shard() will be done in a follow-up close_io()
+ } else {
+ // may be at io_state_t::drop by now due to a racing mark_down()
+
+ // the previous shard has not been cleaned up yet, and close_io() is
+ // only responsible for cleaning up the current shard, so clean up the
+ // previous shard here.
+ shard_states->dispatch_in_background(
+ "cleanup_prv_sid", conn, [this, prv_sid] {
+ return cleanup_prv_shard(prv_sid);
+ });
+ maybe_notify_out_dispatch();
+ }
+
+ ceph_assert_always(!conn_ref);
+ // assign even if already dropping
+ conn_ref = make_local_shared_foreign(std::move(conn_fref));
+
+ if (get_io_state() != io_state_t::drop) {
+ if (is_replace.has_value()) {
+ dispatchers.ms_handle_accept(conn_ref, prv_sid, *is_replace);
+ } else {
+ dispatchers.ms_handle_connect(conn_ref, prv_sid);
+ }
+ // user can make changes
+ }
+ });
+}
+
+seastar::future<> IOHandler::set_accepted_sid(
+ crosscore_t::seq_t cc_seq,
+ seastar::shard_id sid,
+ ConnectionFRef conn_fref)
+{
+ assert(seastar::this_shard_id() == get_shard_id());
+ assert(get_io_state() == io_state_t::none);
+ ceph_assert_always(conn_ref);
+ conn_ref.reset();
+ assert(maybe_prv_shard_states == nullptr);
+ shard_states.reset();
+ shard_states = shard_states_t::create(sid, io_state_t::none);
+ return seastar::smp::submit_to(sid,
+ [this, cc_seq, conn_fref=std::move(conn_fref)]() mutable {
+ // must be the first to proceed
+ ceph_assert_always(crosscore.proceed_or_wait(cc_seq));
+
+ logger().debug("{} set accepted sid", conn);
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+ ceph_assert_always(get_io_state() == io_state_t::none);
+ assert(maybe_prv_shard_states == nullptr);
+ ceph_assert_always(!conn_ref);
+ conn_ref = make_local_shared_foreign(std::move(conn_fref));
+ });
}
void IOHandler::dispatch_reset(bool is_replace)
{
- ceph_assert_always(io_state == io_state_t::drop);
+ ceph_assert_always(get_io_state() == io_state_t::drop);
if (!need_dispatch_reset) {
return;
}
need_dispatch_reset = false;
+ ceph_assert_always(conn_ref);
+
dispatchers.ms_handle_reset(conn_ref, is_replace);
+ // user can make changes
}
void IOHandler::dispatch_remote_reset()
{
- if (io_state == io_state_t::drop) {
+ if (get_io_state() == io_state_t::drop) {
return;
}
+ ceph_assert_always(conn_ref);
+
dispatchers.ms_handle_remote_reset(conn_ref);
+ // user can make changes
}
void IOHandler::ack_out_sent(seq_num_t seq)
}
}
-seastar::future<stop_t> IOHandler::try_exit_out_dispatch() {
- assert(!is_out_queued());
- return frame_assembler->flush(
- ).then([this] {
- if (!is_out_queued()) {
- // still nothing pending to send after flush,
- // the dispatching can ONLY stop now
- ceph_assert(out_dispatching);
- out_dispatching = false;
- if (unlikely(out_exit_dispatching.has_value())) {
- out_exit_dispatching->set_value();
- out_exit_dispatching = std::nullopt;
- logger().info("{} do_out_dispatch: nothing queued at {},"
- " set out_exit_dispatching",
- conn, io_state);
- }
- return seastar::make_ready_future<stop_t>(stop_t::yes);
- } else {
- // something is pending to send during flushing
- return seastar::make_ready_future<stop_t>(stop_t::no);
- }
- });
-}
-
-seastar::future<> IOHandler::do_out_dispatch()
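+// The dispatch loops run against the shard_states (ctx) they were started
+// with: after a shard switch, the old loop observes io_state_t::switched
+// in its own ctx and exits, while fresh loops start on the new shard.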
+seastar::future<>
+IOHandler::do_out_dispatch(shard_states_t &ctx)
{
- return seastar::repeat([this] {
- switch (io_state) {
+ return seastar::repeat([this, &ctx] {
+ switch (ctx.get_io_state()) {
case io_state_t::open: {
- bool still_queued = is_out_queued();
- if (unlikely(!still_queued)) {
- return try_exit_out_dispatch();
+ if (unlikely(!is_out_queued())) {
+ // try exit open dispatching
+ return frame_assembler->flush<false>(
+ ).then([this, &ctx] {
+ if (ctx.get_io_state() != io_state_t::open || is_out_queued()) {
+ return seastar::make_ready_future<stop_t>(stop_t::no);
+ }
+ // still nothing pending to send after flush,
+ // open dispatching can ONLY stop now
+ ctx.exit_out_dispatching("exit-open", conn);
+ return seastar::make_ready_future<stop_t>(stop_t::yes);
+ });
}
+
+ auto require_keepalive = need_keepalive;
+ need_keepalive = false;
+ auto maybe_keepalive_ack = next_keepalive_ack;
+ next_keepalive_ack = std::nullopt;
auto to_ack = ack_left;
assert(to_ack == 0 || in_seq > 0);
- return frame_assembler->write(
- sweep_out_pending_msgs_to_sent(
- need_keepalive, next_keepalive_ack, to_ack > 0)
- ).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack] {
- need_keepalive = false;
- if (next_keepalive_ack == prv_keepalive_ack) {
- next_keepalive_ack = std::nullopt;
- }
- assert(ack_left >= to_ack);
- ack_left -= to_ack;
- if (!is_out_queued()) {
- return try_exit_out_dispatch();
- } else {
- // messages were enqueued during socket write
- return seastar::make_ready_future<stop_t>(stop_t::no);
+ ack_left = 0;
+#ifdef UNIT_TESTS_BUILT
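+ // under unit tests, report the swept frame tags to the interceptor
+ // before performing the actual socket write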
+ auto ret = sweep_out_pending_msgs_to_sent(
+ require_keepalive, maybe_keepalive_ack, to_ack > 0);
+ return frame_assembler->intercept_frames(ret.tags, true
+ ).then([this, bl=std::move(ret.bl)]() mutable {
+ return frame_assembler->write<false>(std::move(bl));
+ }
+#else
+ auto bl = sweep_out_pending_msgs_to_sent(
+ require_keepalive, maybe_keepalive_ack, to_ack > 0);
+ return frame_assembler->write<false>(std::move(bl)
+#endif
+ ).then([this, &ctx] {
+ if (ctx.get_io_state() != io_state_t::open) {
+ return frame_assembler->flush<false>(
+ ).then([] {
+ return seastar::make_ready_future<stop_t>(stop_t::no);
+ });
}
+
+ // FIXME: may leak a flush if state is changed after return and before
+ // the next repeat body.
+ return seastar::make_ready_future<stop_t>(stop_t::no);
});
}
case io_state_t::delay:
// delay out dispatching until open
- if (out_exit_dispatching) {
- out_exit_dispatching->set_value();
- out_exit_dispatching = std::nullopt;
- logger().info("{} do_out_dispatch: delay and set out_exit_dispatching ...", conn);
- } else {
- logger().info("{} do_out_dispatch: delay ...", conn);
- }
- return io_state_changed.get_future(
+ ctx.notify_out_dispatching_stopped("delay...", conn);
+ return ctx.wait_state_change(
).then([] { return stop_t::no; });
case io_state_t::drop:
- ceph_assert(out_dispatching);
- out_dispatching = false;
- if (out_exit_dispatching) {
- out_exit_dispatching->set_value();
- out_exit_dispatching = std::nullopt;
- logger().info("{} do_out_dispatch: dropped and set out_exit_dispatching", conn);
- } else {
- logger().info("{} do_out_dispatch: dropped", conn);
- }
+ ctx.exit_out_dispatching("dropped", conn);
+ return seastar::make_ready_future<stop_t>(stop_t::yes);
+ case io_state_t::switched:
+ ctx.exit_out_dispatching("switched", conn);
return seastar::make_ready_future<stop_t>(stop_t::yes);
default:
- ceph_assert(false);
+ ceph_abort("impossible");
}
- }).handle_exception_type([this] (const std::system_error& e) {
+ }).handle_exception_type([this, &ctx](const std::system_error& e) {
+ auto io_state = ctx.get_io_state();
if (e.code() != std::errc::broken_pipe &&
e.code() != std::errc::connection_reset &&
e.code() != error::negotiation_failure) {
}
if (io_state == io_state_t::open) {
- logger().info("{} do_out_dispatch(): fault at {}, going to delay -- {}",
- conn, io_state, e.what());
+ auto cc_seq = crosscore.prepare_submit();
+ logger().info("{} do_out_dispatch(): fault at {}, {}, going to delay -- {}, "
+ "send {} notify_out_fault()",
+ conn, io_state, io_stat_printer{*this}, e.what(), cc_seq);
std::exception_ptr eptr;
try {
throw e;
} catch(...) {
eptr = std::current_exception();
}
- set_io_state(io_state_t::delay);
- handshake_listener->notify_out_fault("do_out_dispatch", eptr);
+ do_set_io_state(io_state_t::delay);
+ shard_states->dispatch_in_background(
+ "notify_out_fault(out)", conn, [this, cc_seq, eptr] {
+ auto states = get_states();
+ return seastar::smp::submit_to(
+ conn.get_messenger_shard_id(), [this, cc_seq, eptr, states] {
+ return handshake_listener->notify_out_fault(
+ cc_seq, "do_out_dispatch", eptr, states);
+ });
+ });
} else {
- logger().info("{} do_out_dispatch(): fault at {} -- {}",
- conn, io_state, e.what());
+ if (io_state != io_state_t::switched) {
+ logger().info("{} do_out_dispatch(): fault at {}, {} -- {}",
+ conn, io_state, io_stat_printer{*this}, e.what());
+ } else {
+ logger().info("{} do_out_dispatch(): fault at {} -- {}",
+ conn, io_state, e.what());
+ }
}
- return do_out_dispatch();
+ return do_out_dispatch(ctx);
});
}
+void IOHandler::maybe_notify_out_dispatch()
+{
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+ if (is_out_queued()) {
+ notify_out_dispatch();
+ }
+}
+
void IOHandler::notify_out_dispatch()
{
- handshake_listener->notify_out();
- if (out_dispatching) {
- // already dispatching
- return;
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+ assert(is_out_queued());
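+ // need_notify_out is only set while at io_state_t::delay (see
+ // do_set_io_state()), in which case the handshake_listener on the
+ // messenger core needs a notify_out()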
+ if (need_notify_out) {
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} send {} notify_out()",
+ conn, cc_seq);
+ shard_states->dispatch_in_background(
+ "notify_out", conn, [this, cc_seq] {
+ return seastar::smp::submit_to(
+ conn.get_messenger_shard_id(), [this, cc_seq] {
+ return handshake_listener->notify_out(cc_seq);
+ });
+ });
}
- out_dispatching = true;
- switch (io_state) {
- case io_state_t::open:
- [[fallthrough]];
- case io_state_t::delay:
- assert(!gate.is_closed());
- gate.dispatch_in_background("do_out_dispatch", conn, [this] {
- return do_out_dispatch();
+ if (shard_states->try_enter_out_dispatching()) {
+ shard_states->dispatch_in_background(
+ "do_out_dispatch", conn, [this] {
+ return do_out_dispatch(*shard_states);
});
- return;
- case io_state_t::drop:
- out_dispatching = false;
- return;
- default:
- ceph_assert(false);
}
}
seastar::future<>
-IOHandler::read_message(utime_t throttle_stamp, std::size_t msg_size)
+IOHandler::read_message(
+ shard_states_t &ctx,
+ utime_t throttle_stamp,
+ std::size_t msg_size)
{
- return frame_assembler->read_frame_payload(
- ).then([this, throttle_stamp, msg_size](auto payload) {
- if (unlikely(io_state != io_state_t::open)) {
+ return frame_assembler->read_frame_payload<false>(
+ ).then([this, throttle_stamp, msg_size, &ctx](auto payload) {
+ if (unlikely(ctx.get_io_state() != io_state_t::open)) {
logger().debug("{} triggered {} during read_message()",
- conn, io_state);
+ conn, ctx.get_io_state());
abort_protocol();
}
// client side queueing because messages can't be renumbered, but the (kernel)
// client will occasionally pull a message out of the sent queue to send
// elsewhere. in that case it doesn't matter if we "got" it or not.
- uint64_t cur_seq = get_in_seq();
+ uint64_t cur_seq = in_seq;
if (message->get_seq() <= cur_seq) {
logger().error("{} got old message {} <= {} {}, discarding",
conn, message->get_seq(), cur_seq, *message);
// TODO: change MessageRef with seastar::shared_ptr
auto msg_ref = MessageRef{message, false};
- assert(io_state == io_state_t::open);
+ assert(ctx.get_io_state() == io_state_t::open);
+ assert(get_io_state() == io_state_t::open);
+ ceph_assert_always(conn_ref);
+
// throttle the reading process by the returned future
return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
+ // user can make changes
});
}
void IOHandler::do_in_dispatch()
{
- ceph_assert_always(!in_exit_dispatching.has_value());
- in_exit_dispatching = seastar::promise<>();
- gate.dispatch_in_background("do_in_dispatch", conn, [this] {
- return seastar::keep_doing([this] {
- return frame_assembler->read_main_preamble(
- ).then([this](auto ret) {
+ shard_states->enter_in_dispatching();
+ shard_states->dispatch_in_background(
+ "do_in_dispatch", conn, [this, &ctx=*shard_states] {
+ return seastar::keep_doing([this, &ctx] {
+ return frame_assembler->read_main_preamble<false>(
+ ).then([this, &ctx](auto ret) {
switch (ret.tag) {
case Tag::MESSAGE: {
size_t msg_size = get_msg_size(*ret.rx_frame_asm);
return seastar::now();
}
// TODO: message throttler
- ceph_assert(false);
+ ceph_abort("TODO");
return seastar::now();
}).then([this, msg_size] {
// throttle_bytes() logic
conn.policy.throttler_bytes->get_current(),
conn.policy.throttler_bytes->get_max());
return conn.policy.throttler_bytes->get(msg_size);
- }).then([this, msg_size] {
+ }).then([this, msg_size, &ctx] {
// TODO: throttle_dispatch_queue() logic
utime_t throttle_stamp{seastar::lowres_system_clock::now()};
- return read_message(throttle_stamp, msg_size);
+ return read_message(ctx, throttle_stamp, msg_size);
});
}
case Tag::ACK:
- return frame_assembler->read_frame_payload(
+ return frame_assembler->read_frame_payload<false>(
).then([this](auto payload) {
// handle_message_ack() logic
auto ack = AckFrame::Decode(payload->back());
ack_out_sent(ack.seq());
});
case Tag::KEEPALIVE2:
- return frame_assembler->read_frame_payload(
+ return frame_assembler->read_frame_payload<false>(
).then([this](auto payload) {
// handle_keepalive2() logic
auto keepalive_frame = KeepAliveFrame::Decode(payload->back());
conn, keepalive_frame.timestamp());
// notify keepalive ack
next_keepalive_ack = keepalive_frame.timestamp();
- notify_out_dispatch();
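+ // after a shard switch this loop may still be draining on the
+ // previous shard, in which case the notification is skipped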
+ if (seastar::this_shard_id() == get_shard_id()) {
+ notify_out_dispatch();
+ }
last_keepalive = seastar::lowres_system_clock::now();
});
case Tag::KEEPALIVE2_ACK:
- return frame_assembler->read_frame_payload(
+ return frame_assembler->read_frame_payload<false>(
).then([this](auto payload) {
// handle_keepalive2_ack() logic
auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload->back());
}
}
});
- }).handle_exception([this](std::exception_ptr eptr) {
+ }).handle_exception([this, &ctx](std::exception_ptr eptr) {
const char *e_what;
try {
std::rethrow_exception(eptr);
e_what = e.what();
}
+ auto io_state = ctx.get_io_state();
if (io_state == io_state_t::open) {
- logger().info("{} do_in_dispatch(): fault at {}, going to delay -- {}",
- conn, io_state, e_what);
- set_io_state(io_state_t::delay);
- handshake_listener->notify_out_fault("do_in_dispatch", eptr);
+ auto cc_seq = crosscore.prepare_submit();
+ logger().info("{} do_in_dispatch(): fault at {}, {}, going to delay -- {}, "
+ "send {} notify_out_fault()",
+ conn, io_state, io_stat_printer{*this}, e_what, cc_seq);
+ do_set_io_state(io_state_t::delay);
+ shard_states->dispatch_in_background(
+ "notify_out_fault(in)", conn, [this, cc_seq, eptr] {
+ auto states = get_states();
+ return seastar::smp::submit_to(
+ conn.get_messenger_shard_id(), [this, cc_seq, eptr, states] {
+ return handshake_listener->notify_out_fault(
+ cc_seq, "do_in_dispatch", eptr, states);
+ });
+ });
} else {
- logger().info("{} do_in_dispatch(): fault at {} -- {}",
- conn, io_state, e_what);
+ if (io_state != io_state_t::switched) {
+ logger().info("{} do_in_dispatch(): fault at {}, {} -- {}",
+ conn, io_state, io_stat_printer{*this}, e_what);
+ } else {
+ logger().info("{} do_in_dispatch(): fault at {} -- {}",
+ conn, io_state, e_what);
+ }
}
- }).finally([this] {
- ceph_assert_always(in_exit_dispatching.has_value());
- in_exit_dispatching->set_value();
- in_exit_dispatching = std::nullopt;
+ }).finally([&ctx] {
+ ctx.exit_in_dispatching();
});
});
}
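+// close_io() is the final cross-core call for a dropped connection: it
+// optionally dispatches the reset event, detaches conn_ref, and closes
+// whichever shard_states still needs closing (the previous shard's if the
+// drop raced with to_new_sid()).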
+seastar::future<>
+IOHandler::close_io(
+ crosscore_t::seq_t cc_seq,
+ bool is_dispatch_reset,
+ bool is_replace)
+{
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+ if (!crosscore.proceed_or_wait(cc_seq)) {
+ logger().debug("{} got {} close_io(), wait at {}",
+ conn, cc_seq, crosscore.get_in_seq());
+ return crosscore.wait(cc_seq
+ ).then([this, cc_seq, is_dispatch_reset, is_replace] {
+ return close_io(cc_seq, is_dispatch_reset, is_replace);
+ });
+ }
+
+ logger().debug("{} got {} close_io(reset={}, replace={})",
+ conn, cc_seq, is_dispatch_reset, is_replace);
+ ceph_assert_always(get_io_state() == io_state_t::drop);
+
+ if (is_dispatch_reset) {
+ dispatch_reset(is_replace);
+ }
+
+ ceph_assert_always(conn_ref);
+ conn_ref.reset();
+
+ // cannot be running in parallel with to_new_sid()
+ if (maybe_dropped_sid.has_value()) {
+ assert(shard_states->assert_closed_and_exit());
+ auto prv_sid = *maybe_dropped_sid;
+ return cleanup_prv_shard(prv_sid);
+ } else {
+ return shard_states->close(
+ ).then([this] {
+ assert(shard_states->assert_closed_and_exit());
+ });
+ }
+}
+
+/*
+ * IOHandler::shard_states_t
+ */
+
+void
+IOHandler::shard_states_t::notify_out_dispatching_stopped(
+ const char *what, SocketConnection &conn)
+{
+ assert(seastar::this_shard_id() == sid);
+ if (unlikely(out_exit_dispatching.has_value())) {
+ out_exit_dispatching->set_value();
+ out_exit_dispatching = std::nullopt;
+ logger().info("{} do_out_dispatch: stop({}) at {}, set out_exit_dispatching",
+ conn, what, io_state);
+ } else {
+ if (unlikely(io_state != io_state_t::open)) {
+ logger().info("{} do_out_dispatch: stop({}) at {}, no out_exit_dispatching",
+ conn, what, io_state);
+ }
+ }
+}
+
+seastar::future<>
+IOHandler::shard_states_t::wait_io_exit_dispatching()
+{
+ assert(seastar::this_shard_id() == sid);
+ assert(io_state != io_state_t::open);
+ assert(!gate.is_closed());
+ return seastar::when_all(
+ [this] {
+ if (out_exit_dispatching) {
+ return out_exit_dispatching->get_future();
+ } else {
+ return seastar::now();
+ }
+ }(),
+ [this] {
+ if (in_exit_dispatching) {
+ return in_exit_dispatching->get_future();
+ } else {
+ return seastar::now();
+ }
+ }()
+ ).discard_result();
+}
+
+IOHandler::shard_states_ref_t
+IOHandler::shard_states_t::create_from_previous(
+ shard_states_t &prv_states,
+ seastar::shard_id new_sid)
+{
+ auto io_state = prv_states.io_state;
+ assert(io_state != io_state_t::open);
+ auto ret = shard_states_t::create(new_sid, io_state);
+ if (io_state == io_state_t::drop) {
+ // the new gate should never be used
+ auto fut = ret->gate.close();
+ ceph_assert_always(fut.available());
+ }
+ prv_states.set_io_state(io_state_t::switched);
+ return ret;
+}
+
} // namespace crimson::net