#include "Errors.h"
#include "SocketMessenger.h"
-#ifdef UNIT_TESTS_BUILT
-#include "Interceptor.h"
-#endif
-
using namespace ceph::msgr::v2;
using crimson::common::local_conf;
-using io_state_t = crimson::net::IOHandler::io_state_t;
-using io_stat_printer = crimson::net::IOHandler::io_stat_printer;
namespace {
namespace crimson::net {
-#ifdef UNIT_TESTS_BUILT
-// should be consistent to intercept_frame() in FrameAssemblerV2.cc
-void intercept(Breakpoint bp,
- bp_type_t type,
- SocketConnection& conn,
- Interceptor *interceptor,
- SocketRef& socket) {
- if (interceptor) {
- auto action = interceptor->intercept(conn, Breakpoint(bp));
- socket->set_trap(type, action, &interceptor->blocker);
- }
-}
-
-#define INTERCEPT_CUSTOM(bp, type) \
-intercept({bp}, type, conn, \
- conn.interceptor, conn.socket)
-#else
-#define INTERCEPT_CUSTOM(bp, type)
-#endif
-
seastar::future<> ProtocolV2::Timer::backoff(double seconds)
{
logger().warn("{} waiting {} seconds ...", conn, seconds);
frame_assembler{FrameAssemblerV2::create(conn)},
auth_meta{seastar::make_lw_shared<AuthConnectionMeta>()},
protocol_timer{conn}
-{}
+{
+ io_states = io_handler.get_states();
+}
ProtocolV2::~ProtocolV2() {}
void ProtocolV2::start_connect(const entity_addr_t& _peer_addr,
const entity_name_t& _peer_name)
{
+ assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
ceph_assert(state == state_t::NONE);
ceph_assert(!gate.is_closed());
conn.peer_addr = _peer_addr;
execute_connecting();
}
-void ProtocolV2::start_accept(SocketRef&& new_socket,
+void ProtocolV2::start_accept(SocketFRef&& new_socket,
const entity_addr_t& _peer_addr)
{
+ assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
ceph_assert(state == state_t::NONE);
// until we know better
conn.target_addr = _peer_addr;
logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr);
messenger.accept_conn(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+
+ auto cc_seq = crosscore.prepare_submit();
+ gate.dispatch_in_background("set_accepted_sid", conn, [this, cc_seq] {
+ return io_handler.set_accepted_sid(
+ cc_seq,
+ frame_assembler->get_socket_shard_id(),
+ seastar::make_foreign(conn.shared_from_this()));
+ });
+
execute_accepting();
}
-void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state, bool reentrant)
+void ProtocolV2::trigger_state_phase1(state_t new_state)
{
- if (!reentrant && new_state == state) {
+ ceph_assert_always(!gate.is_closed());
+ if (new_state == state) {
logger().error("{} is not allowed to re-trigger state {}",
conn, get_state_name(state));
ceph_abort();
}
logger().debug("{} TRIGGER {}, was {}",
conn, get_state_name(new_state), get_state_name(state));
- auto pre_state = state;
- if (pre_state == state_t::READY) {
- assert(!gate.is_closed());
- ceph_assert_always(!exit_io.has_value());
- exit_io = seastar::shared_promise<>();
+
+ if (state == state_t::READY) {
+ // from READY
+ ceph_assert_always(!need_exit_io);
+ ceph_assert_always(!pr_exit_io.has_value());
+ need_exit_io = true;
+ pr_exit_io = seastar::shared_promise<>();
}
+
+ if (new_state == state_t::STANDBY && !conn.policy.server) {
+ need_notify_out = true;
+ } else {
+ need_notify_out = false;
+ }
+
state = new_state;
+}
+
+void ProtocolV2::trigger_state_phase2(
+ state_t new_state, io_state_t new_io_state)
+{
+ ceph_assert_always(new_state == state);
+ ceph_assert_always(!gate.is_closed());
+ ceph_assert_always(!pr_switch_io_shard.has_value());
+
+ FrameAssemblerV2Ref fa;
if (new_state == state_t::READY) {
- // I'm not responsible to shutdown the socket at READY
- is_socket_valid = false;
- io_handler.set_io_state(new_io_state, std::move(frame_assembler));
+ assert(new_io_state == io_state_t::open);
+ assert(io_handler.get_shard_id() ==
+ frame_assembler->get_socket_shard_id());
+ frame_assembler->set_shard_id(io_handler.get_shard_id());
+ fa = std::move(frame_assembler);
} else {
- io_handler.set_io_state(new_io_state, nullptr);
+ assert(new_io_state != io_state_t::open);
}
- /*
- * not atomic below
- */
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} send {} IOHandler::set_io_state(): new_state={}, new_io_state={}, "
+ "fa={}, set_notify_out={}",
+ conn, cc_seq, get_state_name(new_state), new_io_state,
+ fa ? fmt::format("(sid={})", fa->get_shard_id()) : "N/A",
+ need_notify_out);
+ gate.dispatch_in_background(
+ "set_io_state", conn,
+ [this, cc_seq, new_io_state, fa=std::move(fa)]() mutable {
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(),
+ [this, cc_seq, new_io_state,
+ fa=std::move(fa), set_notify_out=need_notify_out]() mutable {
+ return io_handler.set_io_state(
+ cc_seq, new_io_state, std::move(fa), set_notify_out);
+ });
+ });
- if (pre_state == state_t::READY) {
- gate.dispatch_in_background("exit_io", conn, [this] {
- return io_handler.wait_io_exit_dispatching(
- ).then([this](FrameAssemblerV2Ref fa) {
- frame_assembler = std::move(fa);
- exit_io->set_value();
- exit_io = std::nullopt;
+ if (need_exit_io) {
+ // from READY
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} send {} IOHandler::wait_io_exit_dispatching() ...",
+ conn, cc_seq);
+ assert(pr_exit_io.has_value());
+ assert(new_io_state != io_state_t::open);
+ need_exit_io = false;
+ gate.dispatch_in_background("exit_io", conn, [this, cc_seq] {
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(), [this, cc_seq] {
+ return io_handler.wait_io_exit_dispatching(cc_seq);
+ }).then([this, cc_seq](auto ret) {
+ logger().debug("{} finish {} IOHandler::wait_io_exit_dispatching(), {}",
+ conn, cc_seq, ret.io_states);
+ frame_assembler = std::move(ret.frame_assembler);
+ assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+ ceph_assert_always(
+ seastar::this_shard_id() == frame_assembler->get_shard_id());
+ ceph_assert_always(!frame_assembler->is_socket_valid());
+ assert(!need_exit_io);
+ io_states = ret.io_states;
+ pr_exit_io->set_value();
+ pr_exit_io = std::nullopt;
});
});
}
if (likely(has_socket)) {
if (likely(is_socket_valid)) {
ceph_assert_always(state != state_t::READY);
- frame_assembler->shutdown_socket();
+ frame_assembler->shutdown_socket<true>(&gate);
is_socket_valid = false;
} else {
ceph_assert_always(state != state_t::ESTABLISHING);
}
if (conn.policy.server ||
- (conn.policy.standby && !io_handler.is_out_queued_or_sent())) {
+ (conn.policy.standby && !io_states.is_out_queued_or_sent())) {
if (conn.policy.server) {
logger().info("{} protocol {} {} fault as server, going to STANDBY {} -- {}",
conn,
get_state_name(state),
where,
- io_stat_printer{io_handler},
+ io_states,
e_what);
} else {
logger().info("{} protocol {} {} fault with nothing to send, going to STANDBY {} -- {}",
conn,
get_state_name(state),
where,
- io_stat_printer{io_handler},
+ io_states,
e_what);
}
execute_standby();
conn,
get_state_name(state),
where,
- io_stat_printer{io_handler},
+ io_states,
e_what);
execute_wait(false);
} else {
conn,
get_state_name(state),
where,
- io_stat_printer{io_handler},
+ io_states,
e_what);
execute_connecting();
}
client_cookie = generate_client_cookie();
peer_global_seq = 0;
}
- io_handler.reset_session(full);
+
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} send {} IOHandler::reset_session({})",
+ conn, cc_seq, full);
+ io_states.reset_session(full);
+ gate.dispatch_in_background(
+ "reset_session", conn, [this, cc_seq, full] {
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(), [this, cc_seq, full] {
+ return io_handler.reset_session(cc_seq, full);
+ });
+ });
+ // user can make changes
}
seastar::future<std::tuple<entity_type_t, entity_addr_t>>
CRIMSON_MSGR2_SUPPORTED_FEATURES,
CEPH_MSGR2_REQUIRED_FEATURES,
CEPH_BANNER_V2_PREFIX);
- INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE, bp_type_t::WRITE);
- return frame_assembler->write_flush(std::move(bl)).then([this] {
- // 2. read peer banner
- unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
- INTERCEPT_CUSTOM(custom_bp_t::BANNER_READ, bp_type_t::READ);
- return frame_assembler->read_exactly(banner_len); // or read exactly?
- }).then([this] (auto bl) {
- // 3. process peer banner and read banner_payload
- unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
- logger().debug("{} RECV({}) banner: \"{}\"",
- conn, bl.size(),
- std::string((const char*)bl.get(), banner_prefix_len));
-
- if (memcmp(bl.get(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) {
- if (memcmp(bl.get(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) {
- logger().warn("{} peer is using V1 protocol", conn);
- } else {
- logger().warn("{} peer sent bad banner", conn);
- }
- abort_in_fault();
+#ifdef UNIT_TESTS_BUILT
+ return frame_assembler->intercept_frame(custom_bp_t::BANNER_WRITE, true
+ ).then([this, bl=std::move(bl)]() mutable {
+ return frame_assembler->write_flush(std::move(bl));
+ }
+#else
+ return frame_assembler->write_flush(std::move(bl)
+#endif
+ ).then([this] {
+ // 2. read peer banner
+ unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
+#ifdef UNIT_TESTS_BUILT
+ return frame_assembler->intercept_frame(custom_bp_t::BANNER_READ, false
+ ).then([this, banner_len] {
+ return frame_assembler->read_exactly(banner_len);
+ });
+#else
+ return frame_assembler->read_exactly(banner_len);
+#endif
+ }).then([this](auto bptr) {
+ // 3. process peer banner and read banner_payload
+ unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
+ logger().debug("{} RECV({}) banner: \"{}\"",
+ conn, bptr.length(),
+ std::string(bptr.c_str(), banner_prefix_len));
+
+ if (memcmp(bptr.c_str(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) {
+ if (memcmp(bptr.c_str(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) {
+ logger().warn("{} peer is using V1 protocol", conn);
+ } else {
+ logger().warn("{} peer sent bad banner", conn);
}
- bl.trim_front(banner_prefix_len);
+ abort_in_fault();
+ }
- uint16_t payload_len;
- bufferlist buf;
- buf.append(buffer::create(std::move(bl)));
- auto ti = buf.cbegin();
- try {
- decode(payload_len, ti);
- } catch (const buffer::error &e) {
- logger().warn("{} decode banner payload len failed", conn);
- abort_in_fault();
- }
- logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
- INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ, bp_type_t::READ);
+ bptr.set_offset(bptr.offset() + banner_prefix_len);
+ bptr.set_length(bptr.length() - banner_prefix_len);
+ assert(bptr.length() == sizeof(ceph_le16));
+
+ uint16_t payload_len;
+ bufferlist buf;
+ buf.append(std::move(bptr));
+ auto ti = buf.cbegin();
+ try {
+ decode(payload_len, ti);
+ } catch (const buffer::error &e) {
+ logger().warn("{} decode banner payload len failed", conn);
+ abort_in_fault();
+ }
+ logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
+#ifdef UNIT_TESTS_BUILT
+ return frame_assembler->intercept_frame(
+ custom_bp_t::BANNER_PAYLOAD_READ, false
+ ).then([this, payload_len] {
return frame_assembler->read(payload_len);
- }).then([this, is_connect] (bufferlist bl) {
- // 4. process peer banner_payload and send HelloFrame
- auto p = bl.cbegin();
- uint64_t _peer_supported_features;
- uint64_t _peer_required_features;
- try {
- decode(_peer_supported_features, p);
- decode(_peer_required_features, p);
- } catch (const buffer::error &e) {
- logger().warn("{} decode banner payload failed", conn);
- abort_in_fault();
- }
- logger().debug("{} RECV({}) banner features: supported={} required={}",
- conn, bl.length(),
- _peer_supported_features, _peer_required_features);
-
- // Check feature bit compatibility
- uint64_t supported_features = CRIMSON_MSGR2_SUPPORTED_FEATURES;
- uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
- if ((required_features & _peer_supported_features) != required_features) {
- logger().error("{} peer does not support all required features"
- " required={} peer_supported={}",
- conn, required_features, _peer_supported_features);
- ABORT_IN_CLOSE(is_connect);
- }
- if ((supported_features & _peer_required_features) != _peer_required_features) {
- logger().error("{} we do not support all peer required features"
- " peer_required={} supported={}",
- conn, _peer_required_features, supported_features);
- ABORT_IN_CLOSE(is_connect);
- }
- peer_supported_features = _peer_supported_features;
- bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
- frame_assembler->set_is_rev1(is_rev1);
-
- auto hello = HelloFrame::Encode(messenger.get_mytype(),
- conn.target_addr);
- logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}",
- conn, ceph_entity_type_name(messenger.get_mytype()),
- conn.target_addr);
- return frame_assembler->write_flush_frame(hello);
- }).then([this] {
- //5. read peer HelloFrame
- return frame_assembler->read_main_preamble();
- }).then([this](auto ret) {
- expect_tag(Tag::HELLO, ret.tag, conn, "read_hello_frame");
- return frame_assembler->read_frame_payload();
- }).then([this](auto payload) {
- // 6. process peer HelloFrame
- auto hello = HelloFrame::Decode(payload->back());
- logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}",
- conn, ceph_entity_type_name(hello.entity_type()),
- hello.peer_addr());
- return seastar::make_ready_future<std::tuple<entity_type_t, entity_addr_t>>(
- std::make_tuple(hello.entity_type(), hello.peer_addr()));
});
+#else
+ return frame_assembler->read(payload_len);
+#endif
+ }).then([this, is_connect] (bufferlist bl) {
+ // 4. process peer banner_payload and send HelloFrame
+ auto p = bl.cbegin();
+ uint64_t _peer_supported_features;
+ uint64_t _peer_required_features;
+ try {
+ decode(_peer_supported_features, p);
+ decode(_peer_required_features, p);
+ } catch (const buffer::error &e) {
+ logger().warn("{} decode banner payload failed", conn);
+ abort_in_fault();
+ }
+ logger().debug("{} RECV({}) banner features: supported={} required={}",
+ conn, bl.length(),
+ _peer_supported_features, _peer_required_features);
+
+ // Check feature bit compatibility
+ uint64_t supported_features = CRIMSON_MSGR2_SUPPORTED_FEATURES;
+ uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
+ if ((required_features & _peer_supported_features) != required_features) {
+ logger().error("{} peer does not support all required features"
+ " required={} peer_supported={}",
+ conn, required_features, _peer_supported_features);
+ ABORT_IN_CLOSE(is_connect);
+ }
+ if ((supported_features & _peer_required_features) != _peer_required_features) {
+ logger().error("{} we do not support all peer required features"
+ " peer_required={} supported={}",
+ conn, _peer_required_features, supported_features);
+ ABORT_IN_CLOSE(is_connect);
+ }
+ peer_supported_features = _peer_supported_features;
+ bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
+ frame_assembler->set_is_rev1(is_rev1);
+
+ auto hello = HelloFrame::Encode(messenger.get_mytype(),
+ conn.target_addr);
+ logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}",
+ conn, ceph_entity_type_name(messenger.get_mytype()),
+ conn.target_addr);
+ return frame_assembler->write_flush_frame(hello);
+ }).then([this] {
+ //5. read peer HelloFrame
+ return frame_assembler->read_main_preamble();
+ }).then([this](auto ret) {
+ expect_tag(Tag::HELLO, ret.tag, conn, "read_hello_frame");
+ return frame_assembler->read_frame_payload();
+ }).then([this](auto payload) {
+ // 6. process peer HelloFrame
+ auto hello = HelloFrame::Decode(payload->back());
+ logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}",
+ conn, ceph_entity_type_name(hello.entity_type()),
+ hello.peer_addr());
+ return seastar::make_ready_future<std::tuple<entity_type_t, entity_addr_t>>(
+ std::make_tuple(hello.entity_type(), hello.peer_addr()));
+ });
}
// CONNECTING state
case Tag::SERVER_IDENT:
return frame_assembler->read_frame_payload(
).then([this](auto payload) {
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} at receiving SERVER_IDENT",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+
// handle_server_ident() logic
- io_handler.requeue_out_sent();
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} send {} IOHandler::requeue_out_sent()",
+ conn, cc_seq);
+ io_states.requeue_out_sent();
+ gate.dispatch_in_background(
+ "requeue_out_sent", conn, [this, cc_seq] {
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(), [this, cc_seq] {
+ return io_handler.requeue_out_sent(cc_seq);
+ });
+ });
+
auto server_ident = ServerIdentFrame::Decode(payload->back());
logger().debug("{} GOT ServerIdentFrame:"
" addrs={}, gid={}, gs={},"
server_cookie,
global_seq,
connect_seq,
- io_handler.get_in_seq());
+ io_states.in_seq);
logger().debug("{} WRITE ReconnectFrame: addrs={}, client_cookie={},"
" server_cookie={}, gs={}, cs={}, in_seq={}",
conn, messenger.get_myaddrs(),
client_cookie, server_cookie,
- global_seq, connect_seq, io_handler.get_in_seq());
+ global_seq, connect_seq, io_states.in_seq);
return frame_assembler->write_flush_frame(reconnect).then([this] {
return frame_assembler->read_main_preamble();
}).then([this](auto ret) {
// handle_session_reset() logic
auto reset = ResetFrame::Decode(payload->back());
logger().warn("{} GOT ResetFrame: full={}", conn, reset.full());
+
reset_session(reset.full());
+ // user can make changes
+
return client_connect();
});
case Tag::WAIT:
case Tag::SESSION_RECONNECT_OK:
return frame_assembler->read_frame_payload(
).then([this](auto payload) {
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} at receiving RECONNECT_OK",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+
// handle_reconnect_ok() logic
auto reconnect_ok = ReconnectOkFrame::Decode(payload->back());
- logger().debug("{} GOT ReconnectOkFrame: msg_seq={}",
- conn, reconnect_ok.msg_seq());
- io_handler.requeue_out_sent_up_to(reconnect_ok.msg_seq());
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} GOT ReconnectOkFrame: msg_seq={}, "
+ "send {} IOHandler::requeue_out_sent_up_to()",
+ conn, reconnect_ok.msg_seq(), cc_seq);
+
+ io_states.requeue_out_sent_up_to();
+ auto msg_seq = reconnect_ok.msg_seq();
+ gate.dispatch_in_background(
+ "requeue_out_reconnecting", conn, [this, cc_seq, msg_seq] {
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(), [this, cc_seq, msg_seq] {
+ return io_handler.requeue_out_sent_up_to(cc_seq, msg_seq);
+ });
+ });
+
return seastar::make_ready_future<next_step_t>(next_step_t::ready);
});
default: {
void ProtocolV2::execute_connecting()
{
ceph_assert_always(!is_socket_valid);
- trigger_state(state_t::CONNECTING, io_state_t::delay, false);
+ trigger_state(state_t::CONNECTING, io_state_t::delay);
gated_execute("execute_connecting", conn, [this] {
- global_seq = messenger.get_global_seq();
- assert(client_cookie != 0);
- if (!conn.policy.lossy && server_cookie != 0) {
- ++connect_seq;
- logger().debug("{} UPDATE: gs={}, cs={} for reconnect",
- conn, global_seq, connect_seq);
- } else { // conn.policy.lossy || server_cookie == 0
- assert(connect_seq == 0);
- assert(server_cookie == 0);
- logger().debug("{} UPDATE: gs={} for connect", conn, global_seq);
- }
- return wait_exit_io().then([this] {
+ global_seq = messenger.get_global_seq();
+ assert(client_cookie != 0);
+ if (!conn.policy.lossy && server_cookie != 0) {
+ ++connect_seq;
+ logger().debug("{} UPDATE: gs={}, cs={} for reconnect",
+ conn, global_seq, connect_seq);
+ } else { // conn.policy.lossy || server_cookie == 0
+ assert(connect_seq == 0);
+ assert(server_cookie == 0);
+ logger().debug("{} UPDATE: gs={} for connect", conn, global_seq);
+ }
+ return wait_exit_io().then([this] {
#ifdef UNIT_TESTS_BUILT
- // process custom_bp_t::SOCKET_CONNECTING
- // supports CONTINUE/FAULT/BLOCK
- if (conn.interceptor) {
- auto action = conn.interceptor->intercept(
- conn, {custom_bp_t::SOCKET_CONNECTING});
- switch (action) {
- case bp_action_t::CONTINUE:
- return seastar::now();
- case bp_action_t::FAULT:
- logger().info("[Test] got FAULT");
- abort_in_fault();
- case bp_action_t::BLOCK:
- logger().info("[Test] got BLOCK");
- return conn.interceptor->blocker.block();
- default:
- ceph_abort("unexpected action from trap");
- }
- } else {
- return seastar::now();
- }
- }).then([this] {
-#endif
- ceph_assert_always(frame_assembler);
- if (unlikely(state != state_t::CONNECTING)) {
- logger().debug("{} triggered {} before Socket::connect()",
- conn, get_state_name(state));
- abort_protocol();
- }
- return Socket::connect(conn.peer_addr);
- }).then([this](SocketRef new_socket) {
- logger().debug("{} socket connected", conn);
- if (unlikely(state != state_t::CONNECTING)) {
- logger().debug("{} triggered {} during Socket::connect()",
- conn, get_state_name(state));
- return new_socket->close().then([sock=std::move(new_socket)] {
- abort_protocol();
- });
- }
- if (!has_socket) {
- frame_assembler->set_socket(std::move(new_socket));
- has_socket = true;
- } else {
- gate.dispatch_in_background(
- "replace_socket_connecting",
- conn,
- [this, new_socket=std::move(new_socket)]() mutable {
- return frame_assembler->replace_shutdown_socket(std::move(new_socket));
- }
- );
- }
- is_socket_valid = true;
+ // process custom_bp_t::SOCKET_CONNECTING
+ // supports CONTINUE/FAULT/BLOCK
+ if (!conn.interceptor) {
+ return seastar::now();
+ }
+ return conn.interceptor->intercept(
+ conn, {Breakpoint{custom_bp_t::SOCKET_CONNECTING}}
+ ).then([this](bp_action_t action) {
+ switch (action) {
+ case bp_action_t::CONTINUE:
return seastar::now();
- }).then([this] {
- auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
- frame_assembler->reset_handlers();
- frame_assembler->start_recording();
- return banner_exchange(true);
- }).then([this] (auto&& ret) {
- auto [_peer_type, _my_addr_from_peer] = std::move(ret);
- if (conn.get_peer_type() != _peer_type) {
- logger().warn("{} connection peer type does not match what peer advertises {} != {}",
- conn, ceph_entity_type_name(conn.get_peer_type()),
- ceph_entity_type_name(_peer_type));
- ABORT_IN_CLOSE(true);
- }
- if (unlikely(state != state_t::CONNECTING)) {
- logger().debug("{} triggered {} during banner_exchange(), abort",
- conn, get_state_name(state));
- abort_protocol();
- }
- frame_assembler->learn_socket_ephemeral_port_as_connector(
- _my_addr_from_peer.get_port());
- if (unlikely(_my_addr_from_peer.is_legacy())) {
- logger().warn("{} peer sent a legacy address for me: {}",
- conn, _my_addr_from_peer);
- throw std::system_error(
- make_error_code(crimson::net::error::bad_peer_address));
- }
- _my_addr_from_peer.set_type(entity_addr_t::TYPE_MSGR2);
- messenger.learned_addr(_my_addr_from_peer, conn);
- return client_auth();
- }).then([this] {
- if (server_cookie == 0) {
- ceph_assert(connect_seq == 0);
- return client_connect();
- } else {
- ceph_assert(connect_seq > 0);
- return client_reconnect();
+ case bp_action_t::FAULT:
+ logger().info("[Test] got FAULT");
+ abort_in_fault();
+ case bp_action_t::BLOCK:
+ logger().info("[Test] got BLOCK");
+ return conn.interceptor->blocker.block();
+ default:
+ ceph_abort("unexpected action from trap");
+ return seastar::now();
+ }
+ });;
+ }).then([this] {
+#endif
+ ceph_assert_always(frame_assembler);
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} before Socket::connect()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ return Socket::connect(conn.peer_addr);
+ }).then([this](SocketRef _new_socket) {
+ logger().debug("{} socket connected", conn);
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} during Socket::connect()",
+ conn, get_state_name(state));
+ return _new_socket->close().then([sock=std::move(_new_socket)] {
+ abort_protocol();
+ });
+ }
+ SocketFRef new_socket = seastar::make_foreign(std::move(_new_socket));
+ if (!has_socket) {
+ frame_assembler->set_socket(std::move(new_socket));
+ has_socket = true;
+ } else {
+ gate.dispatch_in_background(
+ "replace_socket_connecting",
+ conn,
+ [this, new_socket=std::move(new_socket)]() mutable {
+ return frame_assembler->replace_shutdown_socket(std::move(new_socket));
}
- }).then([this] (next_step_t next) {
+ );
+ }
+ is_socket_valid = true;
+ return seastar::now();
+ }).then([this] {
+ auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
+ frame_assembler->reset_handlers();
+ frame_assembler->start_recording();
+ return banner_exchange(true);
+ }).then([this] (auto&& ret) {
+ auto [_peer_type, _my_addr_from_peer] = std::move(ret);
+ if (conn.get_peer_type() != _peer_type) {
+ logger().warn("{} connection peer type does not match what peer advertises {} != {}",
+ conn, ceph_entity_type_name(conn.get_peer_type()),
+ ceph_entity_type_name(_peer_type));
+ ABORT_IN_CLOSE(true);
+ }
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} during banner_exchange(), abort",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ frame_assembler->learn_socket_ephemeral_port_as_connector(
+ _my_addr_from_peer.get_port());
+ if (unlikely(_my_addr_from_peer.is_legacy())) {
+ logger().warn("{} peer sent a legacy address for me: {}",
+ conn, _my_addr_from_peer);
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ _my_addr_from_peer.set_type(entity_addr_t::TYPE_MSGR2);
+ messenger.learned_addr(_my_addr_from_peer, conn);
+ return client_auth();
+ }).then([this] {
+ if (server_cookie == 0) {
+ ceph_assert(connect_seq == 0);
+ return client_connect();
+ } else {
+ ceph_assert(connect_seq > 0);
+ return client_reconnect();
+ }
+ }).then([this] (next_step_t next) {
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} at the end of execute_connecting()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ switch (next) {
+ case next_step_t::ready: {
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} before dispatch_connect(), abort",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+
+ auto cc_seq = crosscore.prepare_submit();
+ // there are 2 hops with dispatch_connect()
+ crosscore.prepare_submit();
+ logger().info("{} connected: gs={}, pgs={}, cs={}, "
+ "client_cookie={}, server_cookie={}, {}, new_sid={}, "
+ "send {} IOHandler::dispatch_connect()",
+ conn, global_seq, peer_global_seq, connect_seq,
+ client_cookie, server_cookie, io_states,
+ frame_assembler->get_socket_shard_id(), cc_seq);
+
+ // set io_handler to a new shard
+ auto new_io_shard = frame_assembler->get_socket_shard_id();
+ ConnectionFRef conn_fref = seastar::make_foreign(
+ conn.shared_from_this());
+ ceph_assert_always(!pr_switch_io_shard.has_value());
+ pr_switch_io_shard = seastar::shared_promise<>();
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(),
+ [this, cc_seq, new_io_shard,
+ conn_fref=std::move(conn_fref)]() mutable {
+ return io_handler.dispatch_connect(
+ cc_seq, new_io_shard, std::move(conn_fref));
+ }).then([this, new_io_shard] {
+ ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
+ pr_switch_io_shard->set_value();
+ pr_switch_io_shard = std::nullopt;
+ // user can make changes
+
if (unlikely(state != state_t::CONNECTING)) {
- logger().debug("{} triggered {} at the end of execute_connecting()",
+ logger().debug("{} triggered {} after dispatch_connect(), abort",
conn, get_state_name(state));
abort_protocol();
}
- switch (next) {
- case next_step_t::ready: {
- logger().info("{} connected: gs={}, pgs={}, cs={}, "
- "client_cookie={}, server_cookie={}, {}",
- conn, global_seq, peer_global_seq, connect_seq,
- client_cookie, server_cookie,
- io_stat_printer{io_handler});
- io_handler.dispatch_connect();
- if (unlikely(state != state_t::CONNECTING)) {
- logger().debug("{} triggered {} after ms_handle_connect(), abort",
- conn, get_state_name(state));
- abort_protocol();
- }
- execute_ready();
- break;
- }
- case next_step_t::wait: {
- logger().info("{} execute_connecting(): going to WAIT(max-backoff)", conn);
- ceph_assert_always(is_socket_valid);
- frame_assembler->shutdown_socket();
- is_socket_valid = false;
- execute_wait(true);
- break;
- }
- default: {
- ceph_abort("impossible next step");
- }
- }
- }).handle_exception([this](std::exception_ptr eptr) {
- fault(state_t::CONNECTING, "execute_connecting", eptr);
+ execute_ready();
});
+ }
+ case next_step_t::wait: {
+ logger().info("{} execute_connecting(): going to WAIT(max-backoff)", conn);
+ ceph_assert_always(is_socket_valid);
+ frame_assembler->shutdown_socket<true>(&gate);
+ is_socket_valid = false;
+ execute_wait(true);
+ return seastar::now();
+ }
+ default: {
+ ceph_abort("impossible next step");
+ }
+ }
+ }).handle_exception([this](std::exception_ptr eptr) {
+ fault(state_t::CONNECTING, "execute_connecting", eptr);
});
+ });
}
// ACCEPTING state
has_socket = false;
#ifdef UNIT_TESTS_BUILT
if (conn.interceptor) {
- conn.interceptor->register_conn_replaced(conn);
+ conn.interceptor->register_conn_replaced(
+ conn.get_local_shared_foreign_from_this());
}
#endif
// close this connection because all the necessary information is delivered
void ProtocolV2::execute_accepting()
{
assert(is_socket_valid);
- trigger_state(state_t::ACCEPTING, io_state_t::none, false);
+ trigger_state(state_t::ACCEPTING, io_state_t::none);
gate.dispatch_in_background("execute_accepting", conn, [this] {
- return seastar::futurize_invoke([this] {
+ return seastar::futurize_invoke([this] {
#ifdef UNIT_TESTS_BUILT
- if (conn.interceptor) {
- auto action = conn.interceptor->intercept(
- conn, {custom_bp_t::SOCKET_ACCEPTED});
- switch (action) {
- case bp_action_t::CONTINUE:
- break;
- case bp_action_t::FAULT:
- logger().info("[Test] got FAULT");
- abort_in_fault();
- default:
- ceph_abort("unexpected action from trap");
- }
- }
-#endif
- auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
- frame_assembler->reset_handlers();
- frame_assembler->start_recording();
- return banner_exchange(false);
- }).then([this] (auto&& ret) {
- auto [_peer_type, _my_addr_from_peer] = std::move(ret);
- ceph_assert(conn.get_peer_type() == 0);
- conn.set_peer_type(_peer_type);
-
- conn.policy = messenger.get_policy(_peer_type);
- logger().info("{} UPDATE: peer_type={},"
- " policy(lossy={} server={} standby={} resetcheck={})",
- conn, ceph_entity_type_name(_peer_type),
- conn.policy.lossy, conn.policy.server,
- conn.policy.standby, conn.policy.resetcheck);
- if (!messenger.get_myaddr().is_blank_ip() &&
- (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() ||
- messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce())) {
- logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}",
- conn, _my_addr_from_peer, messenger.get_myaddr());
- throw std::system_error(
- make_error_code(crimson::net::error::bad_peer_address));
- }
- messenger.learned_addr(_my_addr_from_peer, conn);
- return server_auth();
- }).then([this] {
- return frame_assembler->read_main_preamble();
- }).then([this](auto ret) {
- switch (ret.tag) {
- case Tag::CLIENT_IDENT:
- return server_connect();
- case Tag::SESSION_RECONNECT:
- return server_reconnect();
- default: {
- unexpected_tag(ret.tag, conn, "post_server_auth");
- return seastar::make_ready_future<next_step_t>(next_step_t::none);
- }
- }
- }).then([this] (next_step_t next) {
- switch (next) {
- case next_step_t::ready:
- assert(state != state_t::ACCEPTING);
- break;
- case next_step_t::wait:
- if (unlikely(state != state_t::ACCEPTING)) {
- logger().debug("{} triggered {} at the end of execute_accepting()",
- conn, get_state_name(state));
- abort_protocol();
- }
- logger().info("{} execute_accepting(): going to SERVER_WAIT", conn);
- execute_server_wait();
- break;
- default:
- ceph_abort("impossible next step");
- }
- }).handle_exception([this](std::exception_ptr eptr) {
- const char *e_what;
- try {
- std::rethrow_exception(eptr);
- } catch (std::exception &e) {
- e_what = e.what();
- }
- logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
- conn, get_state_name(state), e_what);
- do_close(false);
+ if (conn.interceptor) {
+ // only notify socket accepted
+ gate.dispatch_in_background(
+ "test_intercept_socket_accepted", conn, [this] {
+ return conn.interceptor->intercept(
+ conn, {Breakpoint{custom_bp_t::SOCKET_ACCEPTED}}
+ ).then([](bp_action_t action) {
+ ceph_assert(action == bp_action_t::CONTINUE);
+ });
});
+ }
+#endif
+ auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
+ frame_assembler->reset_handlers();
+ frame_assembler->start_recording();
+ return banner_exchange(false);
+ }).then([this] (auto&& ret) {
+ auto [_peer_type, _my_addr_from_peer] = std::move(ret);
+ ceph_assert(conn.get_peer_type() == 0);
+ conn.set_peer_type(_peer_type);
+
+ conn.policy = messenger.get_policy(_peer_type);
+ logger().info("{} UPDATE: peer_type={},"
+ " policy(lossy={} server={} standby={} resetcheck={})",
+ conn, ceph_entity_type_name(_peer_type),
+ conn.policy.lossy, conn.policy.server,
+ conn.policy.standby, conn.policy.resetcheck);
+ if (!messenger.get_myaddr().is_blank_ip() &&
+ (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() ||
+ messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce())) {
+ logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}",
+ conn, _my_addr_from_peer, messenger.get_myaddr());
+ throw std::system_error(
+ make_error_code(crimson::net::error::bad_peer_address));
+ }
+ messenger.learned_addr(_my_addr_from_peer, conn);
+ return server_auth();
+ }).then([this] {
+ return frame_assembler->read_main_preamble();
+ }).then([this](auto ret) {
+ switch (ret.tag) {
+ case Tag::CLIENT_IDENT:
+ return server_connect();
+ case Tag::SESSION_RECONNECT:
+ return server_reconnect();
+ default: {
+ unexpected_tag(ret.tag, conn, "post_server_auth");
+ return seastar::make_ready_future<next_step_t>(next_step_t::none);
+ }
+ }
+ }).then([this] (next_step_t next) {
+ switch (next) {
+ case next_step_t::ready:
+ assert(state != state_t::ACCEPTING);
+ break;
+ case next_step_t::wait:
+ if (unlikely(state != state_t::ACCEPTING)) {
+ logger().debug("{} triggered {} at the end of execute_accepting()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ logger().info("{} execute_accepting(): going to SERVER_WAIT", conn);
+ execute_server_wait();
+ break;
+ default:
+ ceph_abort("impossible next step");
+ }
+ }).handle_exception([this](std::exception_ptr eptr) {
+ const char *e_what;
+ try {
+ std::rethrow_exception(eptr);
+ } catch (std::exception &e) {
+ e_what = e.what();
+ }
+ logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
+ conn, get_state_name(state), e_what);
+ do_close(false);
});
+ });
}
// CONNECTING or ACCEPTING state
};
ceph_assert_always(is_socket_valid);
- trigger_state(state_t::ESTABLISHING, io_state_t::delay, false);
+ trigger_state(state_t::ESTABLISHING, io_state_t::delay);
+ bool is_replace;
if (existing_conn) {
- static_cast<ProtocolV2*>(existing_conn->protocol.get())->do_close(
- true /* is_dispatch_reset */, std::move(accept_me));
+ logger().info("{} start establishing: gs={}, pgs={}, cs={}, "
+ "client_cookie={}, server_cookie={}, {}, new_sid={}, "
+ "close existing {}",
+ conn, global_seq, peer_global_seq, connect_seq,
+ client_cookie, server_cookie,
+ io_states, frame_assembler->get_socket_shard_id(),
+ *existing_conn);
+ is_replace = true;
+ ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
+ existing_conn->protocol.get());
+ existing_proto->do_close(
+ true, // is_dispatch_reset
+ std::move(accept_me));
if (unlikely(state != state_t::ESTABLISHING)) {
logger().warn("{} triggered {} during execute_establishing(), "
"the accept event will not be delivered!",
abort_protocol();
}
} else {
+ logger().info("{} start establishing: gs={}, pgs={}, cs={}, "
+ "client_cookie={}, server_cookie={}, {}, new_sid={}, "
+ "no existing",
+ conn, global_seq, peer_global_seq, connect_seq,
+ client_cookie, server_cookie, io_states,
+ frame_assembler->get_socket_shard_id());
+ is_replace = false;
accept_me();
}
- io_handler.dispatch_accept();
- if (unlikely(state != state_t::ESTABLISHING)) {
- logger().debug("{} triggered {} after ms_handle_accept() during execute_establishing()",
- conn, get_state_name(state));
- abort_protocol();
- }
+ gated_execute("execute_establishing", conn, [this, is_replace] {
+ ceph_assert_always(state == state_t::ESTABLISHING);
+
+ // set io_handler to a new shard
+ auto cc_seq = crosscore.prepare_submit();
+ // there are 2 hops with dispatch_accept()
+ crosscore.prepare_submit();
+ auto new_io_shard = frame_assembler->get_socket_shard_id();
+ logger().debug("{} send {} IOHandler::dispatch_accept({})",
+ conn, cc_seq, new_io_shard);
+ ConnectionFRef conn_fref = seastar::make_foreign(
+ conn.shared_from_this());
+ ceph_assert_always(!pr_switch_io_shard.has_value());
+ pr_switch_io_shard = seastar::shared_promise<>();
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(),
+ [this, cc_seq, new_io_shard, is_replace,
+ conn_fref=std::move(conn_fref)]() mutable {
+ return io_handler.dispatch_accept(
+ cc_seq, new_io_shard, std::move(conn_fref), is_replace);
+ }).then([this, new_io_shard] {
+ ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
+ pr_switch_io_shard->set_value();
+ pr_switch_io_shard = std::nullopt;
+ // user can make changes
+
+ if (unlikely(state != state_t::ESTABLISHING)) {
+ logger().debug("{} triggered {} after dispatch_accept() during execute_establishing()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
- gated_execute("execute_establishing", conn, [this] {
- return seastar::futurize_invoke([this] {
return send_server_ident();
}).then([this] {
if (unlikely(state != state_t::ESTABLISHING)) {
conn, get_state_name(state));
abort_protocol();
}
- logger().info("{} established: gs={}, pgs={}, cs={}, "
- "client_cookie={}, server_cookie={}, {}",
- conn, global_seq, peer_global_seq, connect_seq,
- client_cookie, server_cookie,
- io_stat_printer{io_handler});
+ logger().info("{} established, going to ready", conn);
execute_ready();
}).handle_exception([this](std::exception_ptr eptr) {
fault(state_t::ESTABLISHING, "execute_establishing", eptr);
seastar::future<>
ProtocolV2::send_server_ident()
{
+ ceph_assert_always(state == state_t::ESTABLISHING ||
+ state == state_t::REPLACING);
// send_server_ident() logic
// refered to async-conn v2: not assign gs to global_seq
global_seq = messenger.get_global_seq();
- logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq);
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} UPDATE: gs={} for server ident, "
+ "send {} IOHandler::reset_peer_state()",
+ conn, global_seq, cc_seq);
// this is required for the case when this connection is being replaced
- io_handler.requeue_out_sent_up_to(0);
- io_handler.reset_session(false);
+ io_states.reset_peer_state();
+ gate.dispatch_in_background(
+ "reset_peer_state", conn, [this, cc_seq] {
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(), [this, cc_seq] {
+ return io_handler.reset_peer_state(cc_seq);
+ });
+ });
if (!conn.policy.lossy) {
server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
uint64_t new_connect_seq,
uint64_t new_msg_seq)
{
+ ceph_assert_always(state >= state_t::ESTABLISHING);
+ ceph_assert_always(state <= state_t::WAIT);
ceph_assert_always(has_socket || state == state_t::CONNECTING);
- ceph_assert_always(!mover.socket->is_shutdown());
- trigger_state(state_t::REPLACING, io_state_t::delay, false);
+ // mover.socket shouldn't be shutdown
+
+ logger().info("{} start replacing ({}): pgs was {}, cs was {}, "
+ "client_cookie was {}, {}, new_sid={}",
+ conn, reconnect ? "reconnected" : "connected",
+ peer_global_seq, connect_seq, client_cookie,
+ io_states, mover.socket->get_shard_id());
if (is_socket_valid) {
- frame_assembler->shutdown_socket();
+ frame_assembler->shutdown_socket<true>(&gate);
is_socket_valid = false;
}
+ trigger_state_phase1(state_t::REPLACING);
gate.dispatch_in_background(
"trigger_replacing",
conn,
new_peer_global_seq,
new_connect_seq, new_msg_seq] () mutable {
ceph_assert_always(state == state_t::REPLACING);
- io_handler.dispatch_accept();
- // state may become CLOSING, close mover.socket and abort later
- return wait_exit_io(
+ auto new_io_shard = mover.socket->get_shard_id();
+ // state may become CLOSING below, but we cannot abort the chain until
+ // mover.socket is correctly handled (closed or replaced).
+
+ // this is preemptive
+ return wait_switch_io_shard(
).then([this] {
+ if (unlikely(state != state_t::REPLACING)) {
+ ceph_assert_always(state == state_t::CLOSING);
+ return seastar::now();
+ }
+
+ trigger_state_phase2(state_t::REPLACING, io_state_t::delay);
+ return wait_exit_io();
+ }).then([this] {
+ if (unlikely(state != state_t::REPLACING)) {
+ ceph_assert_always(state == state_t::CLOSING);
+ return seastar::now();
+ }
+
ceph_assert_always(frame_assembler);
protocol_timer.cancel();
auto done = std::move(execution_done);
execution_done = seastar::now();
return done;
+ }).then([this, new_io_shard] {
+ if (unlikely(state != state_t::REPLACING)) {
+ ceph_assert_always(state == state_t::CLOSING);
+ return seastar::now();
+ }
+
+ // set io_handler to a new shard
+ // we should prevent parallel switching core attemps
+ auto cc_seq = crosscore.prepare_submit();
+ // there are 2 hops with dispatch_accept()
+ crosscore.prepare_submit();
+ logger().debug("{} send {} IOHandler::dispatch_accept({})",
+ conn, cc_seq, new_io_shard);
+ ConnectionFRef conn_fref = seastar::make_foreign(
+ conn.shared_from_this());
+ ceph_assert_always(!pr_switch_io_shard.has_value());
+ pr_switch_io_shard = seastar::shared_promise<>();
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(),
+ [this, cc_seq, new_io_shard,
+ conn_fref=std::move(conn_fref)]() mutable {
+ return io_handler.dispatch_accept(
+ cc_seq, new_io_shard, std::move(conn_fref), false);
+ }).then([this, new_io_shard] {
+ ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
+ pr_switch_io_shard->set_value();
+ pr_switch_io_shard = std::nullopt;
+ // user can make changes
+ });
}).then([this,
reconnect,
do_reset,
new_connect_seq, new_msg_seq] () mutable {
if (state == state_t::REPLACING && do_reset) {
reset_session(true);
+ // user can make changes
}
if (unlikely(state != state_t::REPLACING)) {
+ logger().debug("{} triggered {} in the middle of trigger_replacing(), abort",
+ conn, get_state_name(state));
+ ceph_assert_always(state == state_t::CLOSING);
return mover.socket->close(
).then([sock = std::move(mover.socket)] {
abort_protocol();
if (reconnect) {
connect_seq = new_connect_seq;
// send_reconnect_ok() logic
- io_handler.requeue_out_sent_up_to(new_msg_seq);
- auto reconnect_ok = ReconnectOkFrame::Encode(io_handler.get_in_seq());
- logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, io_handler.get_in_seq());
+
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} send {} IOHandler::requeue_out_sent_up_to({})",
+ conn, cc_seq, new_msg_seq);
+ io_states.requeue_out_sent_up_to();
+ gate.dispatch_in_background(
+ "requeue_out_replacing", conn, [this, cc_seq, new_msg_seq] {
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(), [this, cc_seq, new_msg_seq] {
+ return io_handler.requeue_out_sent_up_to(cc_seq, new_msg_seq);
+ });
+ });
+
+ auto reconnect_ok = ReconnectOkFrame::Encode(io_states.in_seq);
+ logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, io_states.in_seq);
return frame_assembler->write_flush_frame(reconnect_ok);
} else {
client_cookie = new_client_cookie;
}
}).then([this, reconnect] {
if (unlikely(state != state_t::REPLACING)) {
- logger().debug("{} triggered {} at the end of trigger_replacing()",
+ logger().debug("{} triggered {} at the end of trigger_replacing(), abort",
conn, get_state_name(state));
+ ceph_assert_always(state == state_t::CLOSING);
abort_protocol();
}
- logger().info("{} replaced ({}): gs={}, pgs={}, cs={}, "
+ logger().info("{} replaced ({}), going to ready: "
+ "gs={}, pgs={}, cs={}, "
"client_cookie={}, server_cookie={}, {}",
conn, reconnect ? "reconnected" : "connected",
global_seq, peer_global_seq, connect_seq,
- client_cookie, server_cookie,
- io_stat_printer{io_handler});
+ client_cookie, server_cookie, io_states);
execute_ready();
}).handle_exception([this](std::exception_ptr eptr) {
fault(state_t::REPLACING, "trigger_replacing", eptr);
// READY state
-void ProtocolV2::notify_out_fault(const char *where, std::exception_ptr eptr)
+seastar::future<> ProtocolV2::notify_out_fault(
+ crosscore_t::seq_t cc_seq,
+ const char *where,
+ std::exception_ptr eptr,
+ io_handler_state _io_states)
{
+ assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+ if (!crosscore.proceed_or_wait(cc_seq)) {
+ logger().debug("{} got {} notify_out_fault(), wait at {}",
+ conn, cc_seq, crosscore.get_in_seq());
+ return crosscore.wait(cc_seq
+ ).then([this, cc_seq, where, eptr, _io_states] {
+ return notify_out_fault(cc_seq, where, eptr, _io_states);
+ });
+ }
+
+ io_states = _io_states;
+ logger().debug("{} got {} notify_out_fault(): io_states={}",
+ conn, cc_seq, io_states);
fault(state_t::READY, where, eptr);
+ return seastar::now();
}
void ProtocolV2::execute_ready()
assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
protocol_timer.cancel();
ceph_assert_always(is_socket_valid);
- trigger_state(state_t::READY, io_state_t::open, false);
+ // I'm not responsible to shutdown the socket at READY
+ is_socket_valid = false;
+ trigger_state(state_t::READY, io_state_t::open);
+#ifdef UNIT_TESTS_BUILT
+ if (conn.interceptor) {
+ // FIXME: doesn't support cross-core
+ conn.interceptor->register_conn_ready(
+ conn.get_local_shared_foreign_from_this());
+ }
+#endif
}
// STANDBY state
void ProtocolV2::execute_standby()
{
ceph_assert_always(!is_socket_valid);
- trigger_state(state_t::STANDBY, io_state_t::delay, false);
+ trigger_state(state_t::STANDBY, io_state_t::delay);
}
-void ProtocolV2::notify_out()
+seastar::future<> ProtocolV2::notify_out(
+ crosscore_t::seq_t cc_seq)
{
+ assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+ if (!crosscore.proceed_or_wait(cc_seq)) {
+ logger().debug("{} got {} notify_out(), wait at {}",
+ conn, cc_seq, crosscore.get_in_seq());
+ return crosscore.wait(cc_seq
+ ).then([this, cc_seq] {
+ return notify_out(cc_seq);
+ });
+ }
+
+ logger().debug("{} got {} notify_out(): at {}",
+ conn, cc_seq, get_state_name(state));
+ io_states.is_out_queued = true;
if (unlikely(state == state_t::STANDBY && !conn.policy.server)) {
logger().info("{} notify_out(): at {}, going to CONNECTING",
conn, get_state_name(state));
execute_connecting();
}
+ return seastar::now();
}
// WAIT state
void ProtocolV2::execute_wait(bool max_backoff)
{
ceph_assert_always(!is_socket_valid);
- trigger_state(state_t::WAIT, io_state_t::delay, false);
+ trigger_state(state_t::WAIT, io_state_t::delay);
gated_execute("execute_wait", conn, [this, max_backoff] {
double backoff = protocol_timer.last_dur();
if (max_backoff) {
void ProtocolV2::execute_server_wait()
{
ceph_assert_always(is_socket_valid);
- trigger_state(state_t::SERVER_WAIT, io_state_t::none, false);
+ trigger_state(state_t::SERVER_WAIT, io_state_t::none);
gated_execute("execute_server_wait", conn, [this] {
return frame_assembler->read_exactly(1
- ).then([this](auto bl) {
+ ).then([this](auto bptr) {
logger().warn("{} SERVER_WAIT got read, abort", conn);
abort_in_fault();
}).handle_exception([this](std::exception_ptr eptr) {
// CLOSING state
-void ProtocolV2::notify_mark_down()
+seastar::future<> ProtocolV2::notify_mark_down(
+ crosscore_t::seq_t cc_seq)
{
+ assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+ if (!crosscore.proceed_or_wait(cc_seq)) {
+ logger().debug("{} got {} notify_mark_down(), wait at {}",
+ conn, cc_seq, crosscore.get_in_seq());
+ return crosscore.wait(cc_seq
+ ).then([this, cc_seq] {
+ return notify_mark_down(cc_seq);
+ });
+ }
+
+ logger().debug("{} got {} notify_mark_down()",
+ conn, cc_seq);
do_close(false);
+ return seastar::now();
}
seastar::future<> ProtocolV2::close_clean_yielded()
// the container when seastar::parallel_for_each() is still iterating in it.
// that'd lead to a segfault.
return seastar::yield(
- ).then([this, conn_ref = conn.shared_from_this()] {
+ ).then([this] {
do_close(false);
- // it can happen if close_clean() is called inside Dispatcher::ms_handle_reset()
- // which will otherwise result in deadlock
- assert(closed_clean_fut.valid());
- return closed_clean_fut.get_future();
- });
+ return pr_closed_clean.get_shared_future();
+
+ // connection may be unreferenced from the messenger,
+ // so need to hold the additional reference.
+ }).finally([conn_ref = conn.shared_from_this()] {});;
}
void ProtocolV2::do_close(
bool is_dispatch_reset,
std::optional<std::function<void()>> f_accept_new)
{
- if (closed) {
+ if (state == state_t::CLOSING) {
// already closing
- assert(state == state_t::CLOSING);
return;
}
* atomic operations
*/
- closed = true;
+ ceph_assert_always(!gate.is_closed());
- // trigger close
+ // messenger registrations, must before user events
messenger.closing_conn(
seastar::static_pointer_cast<SocketConnection>(
conn.shared_from_this()));
// cannot happen
ceph_assert(false);
}
- protocol_timer.cancel();
- trigger_state(state_t::CLOSING, io_state_t::drop, false);
-
if (f_accept_new) {
+ // the replacing connection must be registerred after the replaced
+ // connection is unreigsterred.
(*f_accept_new)();
}
+
+ protocol_timer.cancel();
if (is_socket_valid) {
- frame_assembler->shutdown_socket();
+ frame_assembler->shutdown_socket<true>(&gate);
is_socket_valid = false;
}
- assert(!gate.is_closed());
- auto handshake_closed = gate.close();
- auto io_closed = io_handler.close_io(
- is_dispatch_reset, is_replace);
-
- // asynchronous operations
- assert(!closed_clean_fut.valid());
- closed_clean_fut = seastar::when_all(
- std::move(handshake_closed), std::move(io_closed)
- ).discard_result().then([this] {
- ceph_assert_always(!exit_io.has_value());
- if (has_socket) {
- ceph_assert_always(frame_assembler);
- return frame_assembler->close_shutdown_socket();
- } else {
- return seastar::now();
- }
- }).then([this] {
- logger().debug("{} closed!", conn);
- messenger.closed_conn(
- seastar::static_pointer_cast<SocketConnection>(
- conn.shared_from_this()));
+
+ trigger_state_phase1(state_t::CLOSING);
+ gate.dispatch_in_background(
+ "close_io", conn, [this, is_dispatch_reset, is_replace] {
+ // this is preemptive
+ return wait_switch_io_shard(
+ ).then([this, is_dispatch_reset, is_replace] {
+ trigger_state_phase2(state_t::CLOSING, io_state_t::drop);
+ auto cc_seq = crosscore.prepare_submit();
+ logger().debug("{} send {} IOHandler::close_io(reset={}, replace={})",
+ conn, cc_seq, is_dispatch_reset, is_replace);
+
+ std::ignore = gate.close(
+ ).then([this] {
+ ceph_assert_always(!need_exit_io);
+ ceph_assert_always(!pr_exit_io.has_value());
+ if (has_socket) {
+ ceph_assert_always(frame_assembler);
+ return frame_assembler->close_shutdown_socket();
+ } else {
+ return seastar::now();
+ }
+ }).then([this] {
+ logger().debug("{} closed!", conn);
+ messenger.closed_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+ pr_closed_clean.set_value();
#ifdef UNIT_TESTS_BUILT
- closed_clean = true;
- if (conn.interceptor) {
- conn.interceptor->register_conn_closed(conn);
- }
+ closed_clean = true;
+ if (conn.interceptor) {
+ conn.interceptor->register_conn_closed(
+ conn.get_local_shared_foreign_from_this());
+ }
#endif
- }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) {
- logger().error("{} closing: closed_clean_fut got unexpected exception {}",
- conn, eptr);
- ceph_abort();
+ // connection is unreferenced from the messenger,
+ // so need to hold the additional reference.
+ }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) {
+ logger().error("{} closing got unexpected exception {}",
+ conn, eptr);
+ ceph_abort();
+ });
+
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(),
+ [this, cc_seq, is_dispatch_reset, is_replace] {
+ return io_handler.close_io(cc_seq, is_dispatch_reset, is_replace);
+ });
+ // user can make changes
+ });
});
}