#include "ProtocolV2.h"
-#include <seastar/core/lowres_clock.hh>
#include <fmt/format.h>
+#include <fmt/ranges.h>
#include "include/msgr.h"
#include "include/random.h"
+#include "msg/msg_fmt.h"
#include "crimson/auth/AuthClient.h"
#include "crimson/auth/AuthServer.h"
#include "crimson/common/formatter.h"
+#include "crimson/common/log.h"
-#include "chained_dispatchers.h"
#include "Errors.h"
-#include "Socket.h"
-#include "SocketConnection.h"
#include "SocketMessenger.h"
#ifdef UNIT_TESTS_BUILT
using namespace ceph::msgr::v2;
using crimson::common::local_conf;
+using io_state_t = crimson::net::IOHandler::io_state_t;
+using io_stat_printer = crimson::net::IOHandler::io_stat_printer;
namespace {
throw std::system_error(make_error_code(crimson::net::error::protocol_aborted));
}
-[[noreturn]] void abort_in_close(crimson::net::ProtocolV2& proto, bool dispatch_reset) {
- proto.close(dispatch_reset);
- abort_protocol();
+#define ABORT_IN_CLOSE(is_dispatch_reset) { \
+ do_close(is_dispatch_reset); \
+ abort_protocol(); \
}
inline void expect_tag(const Tag& expected,
namespace crimson::net {
#ifdef UNIT_TESTS_BUILT
-void intercept(Breakpoint bp, bp_type_t type,
- SocketConnection& conn, SocketRef& socket) {
- if (conn.interceptor) {
- auto action = conn.interceptor->intercept(conn, Breakpoint(bp));
- socket->set_trap(type, action, &conn.interceptor->blocker);
+// should be consistent to intercept_frame() in FrameAssemblerV2.cc
+void intercept(Breakpoint bp,
+ bp_type_t type,
+ SocketConnection& conn,
+ Interceptor *interceptor,
+ SocketRef& socket) {
+ if (interceptor) {
+ auto action = interceptor->intercept(conn, Breakpoint(bp));
+ socket->set_trap(type, action, &interceptor->blocker);
}
}
#define INTERCEPT_CUSTOM(bp, type) \
-intercept({bp}, type, conn, socket)
-
-#define INTERCEPT_FRAME(tag, type) \
-intercept({static_cast<Tag>(tag), type}, \
- type, conn, socket)
-
-#define INTERCEPT_N_RW(bp) \
-if (conn.interceptor) { \
- auto action = conn.interceptor->intercept(conn, {bp}); \
- ceph_assert(action != bp_action_t::BLOCK); \
- if (action == bp_action_t::FAULT) { \
- abort_in_fault(); \
- } \
-}
-
+intercept({bp}, type, conn, \
+ conn.interceptor, conn.socket)
#else
#define INTERCEPT_CUSTOM(bp, type)
-#define INTERCEPT_FRAME(tag, type)
-#define INTERCEPT_N_RW(bp)
#endif
seastar::future<> ProtocolV2::Timer::backoff(double seconds)
});
}
-ProtocolV2::ProtocolV2(ChainedDispatchers& dispatchers,
- SocketConnection& conn,
- SocketMessenger& messenger)
- : Protocol(proto_t::v2, dispatchers, conn),
- messenger{messenger},
+ProtocolV2::ProtocolV2(SocketConnection& conn,
+ IOHandler &io_handler)
+ : conn{conn},
+ messenger{conn.messenger},
+ io_handler{io_handler},
+ frame_assembler{FrameAssemblerV2::create(conn)},
+ auth_meta{seastar::make_lw_shared<AuthConnectionMeta>()},
protocol_timer{conn}
{}
ProtocolV2::~ProtocolV2() {}
-bool ProtocolV2::is_connected() const {
- return state == state_t::READY ||
- state == state_t::ESTABLISHING ||
- state == state_t::REPLACING;
-}
-
void ProtocolV2::start_connect(const entity_addr_t& _peer_addr,
const entity_name_t& _peer_name)
{
ceph_assert(state == state_t::NONE);
- ceph_assert(!socket);
ceph_assert(!gate.is_closed());
conn.peer_addr = _peer_addr;
conn.target_addr = _peer_addr;
execute_connecting();
}
-void ProtocolV2::start_accept(SocketRef&& sock,
+void ProtocolV2::start_accept(SocketRef&& new_socket,
const entity_addr_t& _peer_addr)
{
ceph_assert(state == state_t::NONE);
- ceph_assert(!socket);
// until we know better
conn.target_addr = _peer_addr;
- socket = std::move(sock);
+ frame_assembler->set_socket(std::move(new_socket));
+ has_socket = true;
+ is_socket_valid = true;
logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr);
messenger.accept_conn(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
execute_accepting();
}
-// TODO: Frame related implementations, probably to a separate class.
-
-void ProtocolV2::enable_recording()
-{
- rxbuf.clear();
- txbuf.clear();
- record_io = true;
-}
-
-seastar::future<Socket::tmp_buf> ProtocolV2::read_exactly(size_t bytes)
+void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state, bool reentrant)
{
- if (unlikely(record_io)) {
- return socket->read_exactly(bytes)
- .then([this] (auto bl) {
- rxbuf.append(buffer::create(bl.share()));
- return bl;
- });
+ if (!reentrant && new_state == state) {
+ logger().error("{} is not allowed to re-trigger state {}",
+ conn, get_state_name(state));
+ ceph_abort();
+ }
+ if (state == state_t::CLOSING) {
+ logger().error("{} CLOSING is not allowed to trigger state {}",
+ conn, get_state_name(new_state));
+ ceph_abort();
+ }
+ logger().debug("{} TRIGGER {}, was {}",
+ conn, get_state_name(new_state), get_state_name(state));
+ auto pre_state = state;
+ if (pre_state == state_t::READY) {
+ assert(!gate.is_closed());
+ ceph_assert_always(!exit_io.has_value());
+ exit_io = seastar::shared_promise<>();
+ }
+ state = new_state;
+ if (new_state == state_t::READY) {
+ // I'm not responsible to shutdown the socket at READY
+ is_socket_valid = false;
+ io_handler.set_io_state(new_io_state, std::move(frame_assembler));
} else {
- return socket->read_exactly(bytes);
- };
-}
+ io_handler.set_io_state(new_io_state, nullptr);
+ }
-seastar::future<bufferlist> ProtocolV2::read(size_t bytes)
-{
- if (unlikely(record_io)) {
- return socket->read(bytes)
- .then([this] (auto buf) {
- rxbuf.append(buf);
- return buf;
+ /*
+ * not atomic below
+ */
+
+ if (pre_state == state_t::READY) {
+ gate.dispatch_in_background("exit_io", conn, [this] {
+ return io_handler.wait_io_exit_dispatching(
+ ).then([this](FrameAssemblerV2Ref fa) {
+ frame_assembler = std::move(fa);
+ exit_io->set_value();
+ exit_io = std::nullopt;
+ });
});
- } else {
- return socket->read(bytes);
}
}
-seastar::future<> ProtocolV2::write(bufferlist&& buf)
+void ProtocolV2::fault(
+ state_t expected_state,
+ const char *where,
+ std::exception_ptr eptr)
{
- if (unlikely(record_io)) {
- txbuf.append(buf);
+ assert(expected_state == state_t::CONNECTING ||
+ expected_state == state_t::ESTABLISHING ||
+ expected_state == state_t::REPLACING ||
+ expected_state == state_t::READY);
+ const char *e_what;
+ try {
+ std::rethrow_exception(eptr);
+ } catch (std::exception &e) {
+ e_what = e.what();
}
- return socket->write(std::move(buf));
-}
-seastar::future<> ProtocolV2::write_flush(bufferlist&& buf)
-{
- if (unlikely(record_io)) {
- txbuf.append(buf);
+ if (state != expected_state) {
+ logger().info("{} protocol {} {} is aborted at inconsistent {} -- {}",
+ conn,
+ get_state_name(expected_state),
+ where,
+ get_state_name(state),
+ e_what);
+#ifndef NDEBUG
+ if (expected_state == state_t::REPLACING) {
+ assert(state == state_t::CLOSING);
+ } else if (expected_state == state_t::READY) {
+ assert(state == state_t::CLOSING ||
+ state == state_t::REPLACING ||
+ state == state_t::CONNECTING ||
+ state == state_t::STANDBY);
+ } else {
+ assert(state == state_t::CLOSING ||
+ state == state_t::REPLACING);
+ }
+#endif
+ return;
}
- return socket->write_flush(std::move(buf));
-}
-
-size_t ProtocolV2::get_current_msg_size() const
-{
- ceph_assert(rx_frame_asm.get_num_segments() > 0);
- size_t sum = 0;
- // we don't include SegmentIndex::Msg::HEADER.
- for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) {
- sum += rx_frame_asm.get_segment_logical_len(idx);
+ assert(state == expected_state);
+
+ if (state != state_t::CONNECTING && conn.policy.lossy) {
+ // socket will be shutdown in do_close()
+ logger().info("{} protocol {} {} fault on lossy channel, going to CLOSING -- {}",
+ conn, get_state_name(state), where, e_what);
+ do_close(true);
+ return;
}
- return sum;
-}
-
-seastar::future<Tag> ProtocolV2::read_main_preamble()
-{
- rx_preamble.clear();
- return read_exactly(rx_frame_asm.get_preamble_onwire_len())
- .then([this] (auto bl) {
- rx_segments_data.clear();
- try {
- rx_preamble.append(buffer::create(std::move(bl)));
- const Tag tag = rx_frame_asm.disassemble_preamble(rx_preamble);
- INTERCEPT_FRAME(tag, bp_type_t::READ);
- return tag;
- } catch (FrameError& e) {
- logger().warn("{} read_main_preamble: {}", conn, e.what());
- abort_in_fault();
- }
- });
-}
-seastar::future<> ProtocolV2::read_frame_payload()
-{
- ceph_assert(rx_segments_data.empty());
-
- return seastar::do_until(
- [this] { return rx_frame_asm.get_num_segments() == rx_segments_data.size(); },
- [this] {
- // TODO: create aligned and contiguous buffer from socket
- const size_t seg_idx = rx_segments_data.size();
- if (uint16_t alignment = rx_frame_asm.get_segment_align(seg_idx);
- alignment != segment_t::DEFAULT_ALIGNMENT) {
- logger().trace("{} cannot allocate {} aligned buffer at segment desc index {}",
- conn, alignment, rx_segments_data.size());
- }
- uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx);
- // TODO: create aligned and contiguous buffer from socket
- return read_exactly(onwire_len).then([this] (auto tmp_bl) {
- logger().trace("{} RECV({}) frame segment[{}]",
- conn, tmp_bl.size(), rx_segments_data.size());
- bufferlist segment;
- segment.append(buffer::create(std::move(tmp_bl)));
- rx_segments_data.emplace_back(std::move(segment));
- });
- }
- ).then([this] {
- return read_exactly(rx_frame_asm.get_epilogue_onwire_len());
- }).then([this] (auto bl) {
- logger().trace("{} RECV({}) frame epilogue", conn, bl.size());
- bool ok = false;
- try {
- bufferlist rx_epilogue;
- rx_epilogue.append(buffer::create(std::move(bl)));
- ok = rx_frame_asm.disassemble_segments(rx_preamble, rx_segments_data.data(), rx_epilogue);
- } catch (FrameError& e) {
- logger().error("read_frame_payload: {} {}", conn, e.what());
- abort_in_fault();
- } catch (ceph::crypto::onwire::MsgAuthError&) {
- logger().error("read_frame_payload: {} bad auth tag", conn);
- abort_in_fault();
- }
- // we do have a mechanism that allows transmitter to start sending message
- // and abort after putting entire data field on wire. This will be used by
- // the kernel client to avoid unnecessary buffering.
- if (!ok) {
- // TODO
- ceph_assert(false);
+ if (likely(has_socket)) {
+ if (likely(is_socket_valid)) {
+ ceph_assert_always(state != state_t::READY);
+ frame_assembler->shutdown_socket();
+ is_socket_valid = false;
+ } else {
+ ceph_assert_always(state != state_t::ESTABLISHING);
}
- });
-}
-
-template <class F>
-seastar::future<> ProtocolV2::write_frame(F &frame, bool flush)
-{
- auto bl = frame.get_buffer(tx_frame_asm);
- const auto main_preamble = reinterpret_cast<const preamble_block_t*>(bl.front().c_str());
- logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}",
- conn, bl.length(), (int)main_preamble->tag,
- (int)main_preamble->num_segments, main_preamble->crc);
- INTERCEPT_FRAME(main_preamble->tag, bp_type_t::WRITE);
- if (flush) {
- return write_flush(std::move(bl));
- } else {
- return write(std::move(bl));
+ } else { // !has_socket
+ ceph_assert_always(state == state_t::CONNECTING);
+ assert(!is_socket_valid);
}
-}
-void ProtocolV2::trigger_state(state_t _state, write_state_t _write_state, bool reentrant)
-{
- if (!reentrant && _state == state) {
- logger().error("{} is not allowed to re-trigger state {}",
- conn, get_state_name(state));
- ceph_assert(false);
- }
- logger().debug("{} TRIGGER {}, was {}",
- conn, get_state_name(_state), get_state_name(state));
- state = _state;
- set_write_state(_write_state);
-}
-
-void ProtocolV2::fault(bool backoff, const char* func_name, std::exception_ptr eptr)
-{
- if (conn.policy.lossy) {
- logger().info("{} {}: fault at {} on lossy channel, going to CLOSING -- {}",
- conn, func_name, get_state_name(state), eptr);
- close(true);
- } else if (conn.policy.server ||
- (conn.policy.standby &&
- (!is_queued() && conn.sent.empty()))) {
- logger().info("{} {}: fault at {} with nothing to send, going to STANDBY -- {}",
- conn, func_name, get_state_name(state), eptr);
+ if (conn.policy.server ||
+ (conn.policy.standby && !io_handler.is_out_queued_or_sent())) {
+ if (conn.policy.server) {
+ logger().info("{} protocol {} {} fault as server, going to STANDBY {} -- {}",
+ conn,
+ get_state_name(state),
+ where,
+ io_stat_printer{io_handler},
+ e_what);
+ } else {
+ logger().info("{} protocol {} {} fault with nothing to send, going to STANDBY {} -- {}",
+ conn,
+ get_state_name(state),
+ where,
+ io_stat_printer{io_handler},
+ e_what);
+ }
execute_standby();
- } else if (backoff) {
- logger().info("{} {}: fault at {}, going to WAIT -- {}",
- conn, func_name, get_state_name(state), eptr);
+ } else if (state == state_t::CONNECTING ||
+ state == state_t::REPLACING) {
+ logger().info("{} protocol {} {} fault, going to WAIT {} -- {}",
+ conn,
+ get_state_name(state),
+ where,
+ io_stat_printer{io_handler},
+ e_what);
execute_wait(false);
} else {
- logger().info("{} {}: fault at {}, going to CONNECTING -- {}",
- conn, func_name, get_state_name(state), eptr);
+ assert(state == state_t::READY ||
+ state == state_t::ESTABLISHING);
+ logger().info("{} protocol {} {} fault, going to CONNECTING {} -- {}",
+ conn,
+ get_state_name(state),
+ where,
+ io_stat_printer{io_handler},
+ e_what);
execute_connecting();
}
}
{
server_cookie = 0;
connect_seq = 0;
- conn.in_seq = 0;
if (full) {
client_cookie = generate_client_cookie();
peer_global_seq = 0;
- reset_write();
- dispatchers.ms_handle_remote_reset(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
}
+ io_handler.reset_session(full);
}
seastar::future<std::tuple<entity_type_t, entity_addr_t>>
CEPH_MSGR2_REQUIRED_FEATURES,
CEPH_BANNER_V2_PREFIX);
INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE, bp_type_t::WRITE);
- return write_flush(std::move(bl)).then([this] {
+ return frame_assembler->write_flush(std::move(bl)).then([this] {
// 2. read peer banner
unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
INTERCEPT_CUSTOM(custom_bp_t::BANNER_READ, bp_type_t::READ);
- return read_exactly(banner_len); // or read exactly?
+ return frame_assembler->read_exactly(banner_len); // or read exactly?
}).then([this] (auto bl) {
// 3. process peer banner and read banner_payload
unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
}
logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ, bp_type_t::READ);
- return read(payload_len);
+ return frame_assembler->read(payload_len);
}).then([this, is_connect] (bufferlist bl) {
// 4. process peer banner_payload and send HelloFrame
auto p = bl.cbegin();
- uint64_t peer_supported_features;
- uint64_t peer_required_features;
+ uint64_t _peer_supported_features;
+ uint64_t _peer_required_features;
try {
- decode(peer_supported_features, p);
- decode(peer_required_features, p);
+ decode(_peer_supported_features, p);
+ decode(_peer_required_features, p);
} catch (const buffer::error &e) {
logger().warn("{} decode banner payload failed", conn);
abort_in_fault();
}
logger().debug("{} RECV({}) banner features: supported={} required={}",
conn, bl.length(),
- peer_supported_features, peer_required_features);
+ _peer_supported_features, _peer_required_features);
// Check feature bit compatibility
uint64_t supported_features = CRIMSON_MSGR2_SUPPORTED_FEATURES;
uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
- if ((required_features & peer_supported_features) != required_features) {
+ if ((required_features & _peer_supported_features) != required_features) {
logger().error("{} peer does not support all required features"
" required={} peer_supported={}",
- conn, required_features, peer_supported_features);
- abort_in_close(*this, is_connect);
+ conn, required_features, _peer_supported_features);
+ ABORT_IN_CLOSE(is_connect);
}
- if ((supported_features & peer_required_features) != peer_required_features) {
+ if ((supported_features & _peer_required_features) != _peer_required_features) {
logger().error("{} we do not support all peer required features"
" peer_required={} supported={}",
- conn, peer_required_features, supported_features);
- abort_in_close(*this, is_connect);
- }
- this->peer_required_features = peer_required_features;
- if (this->peer_required_features == 0) {
- this->connection_features = msgr2_required;
+ conn, _peer_required_features, supported_features);
+ ABORT_IN_CLOSE(is_connect);
}
- const bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
- tx_frame_asm.set_is_rev1(is_rev1);
- rx_frame_asm.set_is_rev1(is_rev1);
+ peer_supported_features = _peer_supported_features;
+ bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
+ frame_assembler->set_is_rev1(is_rev1);
auto hello = HelloFrame::Encode(messenger.get_mytype(),
conn.target_addr);
logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}",
conn, ceph_entity_type_name(messenger.get_mytype()),
conn.target_addr);
- return write_frame(hello);
+ return frame_assembler->write_flush_frame(hello);
}).then([this] {
//5. read peer HelloFrame
- return read_main_preamble();
- }).then([this] (Tag tag) {
- expect_tag(Tag::HELLO, tag, conn, __func__);
- return read_frame_payload();
- }).then([this] {
+ return frame_assembler->read_main_preamble();
+ }).then([this](auto ret) {
+ expect_tag(Tag::HELLO, ret.tag, conn, "read_hello_frame");
+ return frame_assembler->read_frame_payload();
+ }).then([this](auto payload) {
// 6. process peer HelloFrame
- auto hello = HelloFrame::Decode(rx_segments_data.back());
+ auto hello = HelloFrame::Decode(payload->back());
logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}",
conn, ceph_entity_type_name(hello.entity_type()),
hello.peer_addr());
seastar::future<> ProtocolV2::handle_auth_reply()
{
- return read_main_preamble()
- .then([this] (Tag tag) {
- switch (tag) {
+ return frame_assembler->read_main_preamble(
+ ).then([this](auto ret) {
+ switch (ret.tag) {
case Tag::AUTH_BAD_METHOD:
- return read_frame_payload().then([this] {
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
// handle_auth_bad_method() logic
- auto bad_method = AuthBadMethodFrame::Decode(rx_segments_data.back());
+ auto bad_method = AuthBadMethodFrame::Decode(payload->back());
logger().warn("{} GOT AuthBadMethodFrame: method={} result={}, "
"allowed_methods={}, allowed_modes={}",
conn, bad_method.method(), cpp_strerror(bad_method.result()),
bad_method.allowed_methods(), bad_method.allowed_modes());
ceph_assert(messenger.get_auth_client());
int r = messenger.get_auth_client()->handle_auth_bad_method(
- conn.shared_from_this(), auth_meta,
+ conn, *auth_meta,
bad_method.method(), bad_method.result(),
bad_method.allowed_methods(), bad_method.allowed_modes());
if (r < 0) {
return client_auth(bad_method.allowed_methods());
});
case Tag::AUTH_REPLY_MORE:
- return read_frame_payload().then([this] {
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
// handle_auth_reply_more() logic
- auto auth_more = AuthReplyMoreFrame::Decode(rx_segments_data.back());
+ auto auth_more = AuthReplyMoreFrame::Decode(payload->back());
logger().debug("{} GOT AuthReplyMoreFrame: payload_len={}",
conn, auth_more.auth_payload().length());
ceph_assert(messenger.get_auth_client());
// let execute_connecting() take care of the thrown exception
auto reply = messenger.get_auth_client()->handle_auth_reply_more(
- conn.shared_from_this(), auth_meta, auth_more.auth_payload());
+ conn, *auth_meta, auth_more.auth_payload());
auto more_reply = AuthRequestMoreFrame::Encode(reply);
logger().debug("{} WRITE AuthRequestMoreFrame: payload_len={}",
conn, reply.length());
- return write_frame(more_reply);
+ return frame_assembler->write_flush_frame(more_reply);
}).then([this] {
return handle_auth_reply();
});
case Tag::AUTH_DONE:
- return read_frame_payload().then([this] {
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
// handle_auth_done() logic
- auto auth_done = AuthDoneFrame::Decode(rx_segments_data.back());
+ auto auth_done = AuthDoneFrame::Decode(payload->back());
logger().debug("{} GOT AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
conn, auth_done.global_id(),
ceph_con_mode_name(auth_done.con_mode()),
auth_done.auth_payload().length());
ceph_assert(messenger.get_auth_client());
int r = messenger.get_auth_client()->handle_auth_done(
- conn.shared_from_this(), auth_meta,
+ conn,
+ *auth_meta,
auth_done.global_id(),
auth_done.con_mode(),
auth_done.auth_payload());
abort_in_fault();
}
auth_meta->con_mode = auth_done.con_mode();
- session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
- nullptr, *auth_meta, tx_frame_asm.get_is_rev1(), false);
+ frame_assembler->create_session_stream_handlers(*auth_meta, false);
return finish_auth();
});
default: {
- unexpected_tag(tag, conn, __func__);
+ unexpected_tag(ret.tag, conn, "handle_auth_reply");
return seastar::now();
}
}
try {
auto [auth_method, preferred_modes, bl] =
- messenger.get_auth_client()->get_auth_request(conn.shared_from_this(), auth_meta);
+ messenger.get_auth_client()->get_auth_request(conn, *auth_meta);
auth_meta->auth_method = auth_method;
auto frame = AuthRequestFrame::Encode(auth_method, preferred_modes, bl);
logger().debug("{} WRITE AuthRequestFrame: method={},"
" preferred_modes={}, payload_len={}",
conn, auth_method, preferred_modes, bl.length());
- return write_frame(frame).then([this] {
+ return frame_assembler->write_flush_frame(frame
+ ).then([this] {
return handle_auth_reply();
});
} catch (const crimson::auth::error& e) {
- logger().error("{} get_initial_auth_request returned {}", conn, e);
- abort_in_close(*this, true);
+ logger().error("{} get_initial_auth_request returned {}", conn, e.what());
+ ABORT_IN_CLOSE(true);
return seastar::now();
}
}
seastar::future<ProtocolV2::next_step_t>
ProtocolV2::process_wait()
{
- return read_frame_payload().then([this] {
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
// handle_wait() logic
logger().debug("{} GOT WaitFrame", conn);
- WaitFrame::Decode(rx_segments_data.back());
+ WaitFrame::Decode(payload->back());
return next_step_t::wait;
});
}
conn.policy.features_supported,
conn.policy.features_required | msgr2_required,
flags, client_cookie);
- return write_frame(client_ident).then([this] {
- return read_main_preamble();
- }).then([this] (Tag tag) {
- switch (tag) {
+ return frame_assembler->write_flush_frame(client_ident
+ ).then([this] {
+ return frame_assembler->read_main_preamble();
+ }).then([this](auto ret) {
+ switch (ret.tag) {
case Tag::IDENT_MISSING_FEATURES:
- return read_frame_payload().then([this] {
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
// handle_ident_missing_features() logic
- auto ident_missing = IdentMissingFeaturesFrame::Decode(rx_segments_data.back());
+ auto ident_missing = IdentMissingFeaturesFrame::Decode(payload->back());
logger().warn("{} GOT IdentMissingFeaturesFrame: features={}"
" (client does not support all server features)",
conn, ident_missing.features());
case Tag::WAIT:
return process_wait();
case Tag::SERVER_IDENT:
- return read_frame_payload().then([this] {
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
// handle_server_ident() logic
- requeue_sent();
- auto server_ident = ServerIdentFrame::Decode(rx_segments_data.back());
+ io_handler.requeue_out_sent();
+ auto server_ident = ServerIdentFrame::Decode(payload->back());
logger().debug("{} GOT ServerIdentFrame:"
" addrs={}, gid={}, gs={},"
" features_supported={}, features_required={},"
logger().error("{} connection peer id ({}) does not match "
"what it should be ({}) during connecting, close",
conn, server_ident.gid(), conn.get_peer_id());
- abort_in_close(*this, true);
+ ABORT_IN_CLOSE(true);
}
conn.set_peer_id(server_ident.gid());
conn.set_features(server_ident.supported_features() &
conn.policy.features_supported);
+ logger().debug("{} UPDATE: features={}", conn, conn.get_features());
peer_global_seq = server_ident.global_seq();
bool lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
return seastar::make_ready_future<next_step_t>(next_step_t::ready);
});
default: {
- unexpected_tag(tag, conn, "post_client_connect");
+ unexpected_tag(ret.tag, conn, "post_client_connect");
return seastar::make_ready_future<next_step_t>(next_step_t::none);
}
}
server_cookie,
global_seq,
connect_seq,
- conn.in_seq);
+ io_handler.get_in_seq());
logger().debug("{} WRITE ReconnectFrame: addrs={}, client_cookie={},"
- " server_cookie={}, gs={}, cs={}, msg_seq={}",
+ " server_cookie={}, gs={}, cs={}, in_seq={}",
conn, messenger.get_myaddrs(),
client_cookie, server_cookie,
- global_seq, connect_seq, conn.in_seq);
- return write_frame(reconnect).then([this] {
- return read_main_preamble();
- }).then([this] (Tag tag) {
- switch (tag) {
+ global_seq, connect_seq, io_handler.get_in_seq());
+ return frame_assembler->write_flush_frame(reconnect).then([this] {
+ return frame_assembler->read_main_preamble();
+ }).then([this](auto ret) {
+ switch (ret.tag) {
case Tag::SESSION_RETRY_GLOBAL:
- return read_frame_payload().then([this] {
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
// handle_session_retry_global() logic
- auto retry = RetryGlobalFrame::Decode(rx_segments_data.back());
+ auto retry = RetryGlobalFrame::Decode(payload->back());
logger().warn("{} GOT RetryGlobalFrame: gs={}",
conn, retry.global_seq());
- return messenger.get_global_seq(retry.global_seq()).then([this] (auto gs) {
- global_seq = gs;
- logger().warn("{} UPDATE: gs={} for retry global", conn, global_seq);
- return client_reconnect();
- });
+ global_seq = messenger.get_global_seq(retry.global_seq());
+ logger().warn("{} UPDATE: gs={} for retry global", conn, global_seq);
+ return client_reconnect();
});
case Tag::SESSION_RETRY:
- return read_frame_payload().then([this] {
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
// handle_session_retry() logic
- auto retry = RetryFrame::Decode(rx_segments_data.back());
+ auto retry = RetryFrame::Decode(payload->back());
logger().warn("{} GOT RetryFrame: cs={}",
conn, retry.connect_seq());
connect_seq = retry.connect_seq() + 1;
return client_reconnect();
});
case Tag::SESSION_RESET:
- return read_frame_payload().then([this] {
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} before reset_session()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
// handle_session_reset() logic
- auto reset = ResetFrame::Decode(rx_segments_data.back());
+ auto reset = ResetFrame::Decode(payload->back());
logger().warn("{} GOT ResetFrame: full={}", conn, reset.full());
reset_session(reset.full());
return client_connect();
case Tag::WAIT:
return process_wait();
case Tag::SESSION_RECONNECT_OK:
- return read_frame_payload().then([this] {
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
// handle_reconnect_ok() logic
- auto reconnect_ok = ReconnectOkFrame::Decode(rx_segments_data.back());
+ auto reconnect_ok = ReconnectOkFrame::Decode(payload->back());
logger().debug("{} GOT ReconnectOkFrame: msg_seq={}",
conn, reconnect_ok.msg_seq());
- requeue_up_to(reconnect_ok.msg_seq());
+ io_handler.requeue_out_sent_up_to(reconnect_ok.msg_seq());
return seastar::make_ready_future<next_step_t>(next_step_t::ready);
});
default: {
- unexpected_tag(tag, conn, "post_client_reconnect");
+ unexpected_tag(ret.tag, conn, "post_client_reconnect");
return seastar::make_ready_future<next_step_t>(next_step_t::none);
}
}
void ProtocolV2::execute_connecting()
{
- trigger_state(state_t::CONNECTING, write_state_t::delay, true);
- if (socket) {
- socket->shutdown();
- }
- gated_execute("execute_connecting", [this] {
- return messenger.get_global_seq().then([this] (auto gs) {
- global_seq = gs;
- assert(client_cookie != 0);
- if (!conn.policy.lossy && server_cookie != 0) {
- ++connect_seq;
- logger().debug("{} UPDATE: gs={}, cs={} for reconnect",
- conn, global_seq, connect_seq);
- } else { // conn.policy.lossy || server_cookie == 0
- assert(connect_seq == 0);
- assert(server_cookie == 0);
- logger().debug("{} UPDATE: gs={} for connect", conn, global_seq);
+ ceph_assert_always(!is_socket_valid);
+ trigger_state(state_t::CONNECTING, io_state_t::delay, false);
+ gated_execute("execute_connecting", conn, [this] {
+ global_seq = messenger.get_global_seq();
+ assert(client_cookie != 0);
+ if (!conn.policy.lossy && server_cookie != 0) {
+ ++connect_seq;
+ logger().debug("{} UPDATE: gs={}, cs={} for reconnect",
+ conn, global_seq, connect_seq);
+ } else { // conn.policy.lossy || server_cookie == 0
+ assert(connect_seq == 0);
+ assert(server_cookie == 0);
+ logger().debug("{} UPDATE: gs={} for connect", conn, global_seq);
+ }
+ return wait_exit_io().then([this] {
+#ifdef UNIT_TESTS_BUILT
+ // process custom_bp_t::SOCKET_CONNECTING
+ // supports CONTINUE/FAULT/BLOCK
+ if (conn.interceptor) {
+ auto action = conn.interceptor->intercept(
+ conn, {custom_bp_t::SOCKET_CONNECTING});
+ switch (action) {
+ case bp_action_t::CONTINUE:
+ return seastar::now();
+ case bp_action_t::FAULT:
+ logger().info("[Test] got FAULT");
+ abort_in_fault();
+ case bp_action_t::BLOCK:
+ logger().info("[Test] got BLOCK");
+ return conn.interceptor->blocker.block();
+ default:
+ ceph_abort("unexpected action from trap");
+ }
+ } else {
+ return seastar::now();
}
-
- return wait_write_exit();
}).then([this] {
+#endif
+ ceph_assert_always(frame_assembler);
if (unlikely(state != state_t::CONNECTING)) {
logger().debug("{} triggered {} before Socket::connect()",
conn, get_state_name(state));
abort_protocol();
}
- if (socket) {
- gate.dispatch_in_background("close_sockect_connecting", *this,
- [sock = std::move(socket)] () mutable {
- return sock->close().then([sock = std::move(sock)] {});
- });
- }
- INTERCEPT_N_RW(custom_bp_t::SOCKET_CONNECTING);
return Socket::connect(conn.peer_addr);
- }).then([this](SocketRef sock) {
+ }).then([this](SocketRef new_socket) {
logger().debug("{} socket connected", conn);
if (unlikely(state != state_t::CONNECTING)) {
logger().debug("{} triggered {} during Socket::connect()",
conn, get_state_name(state));
- return sock->close().then([sock = std::move(sock)] {
+ return new_socket->close().then([sock=std::move(new_socket)] {
abort_protocol();
});
}
- socket = std::move(sock);
+ if (!has_socket) {
+ frame_assembler->set_socket(std::move(new_socket));
+ has_socket = true;
+ } else {
+ gate.dispatch_in_background(
+ "replace_socket_connecting",
+ conn,
+ [this, new_socket=std::move(new_socket)]() mutable {
+ return frame_assembler->replace_shutdown_socket(std::move(new_socket));
+ }
+ );
+ }
+ is_socket_valid = true;
return seastar::now();
}).then([this] {
auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
- session_stream_handlers = { nullptr, nullptr };
- enable_recording();
+ frame_assembler->reset_handlers();
+ frame_assembler->start_recording();
return banner_exchange(true);
}).then([this] (auto&& ret) {
auto [_peer_type, _my_addr_from_peer] = std::move(ret);
logger().warn("{} connection peer type does not match what peer advertises {} != {}",
conn, ceph_entity_type_name(conn.get_peer_type()),
ceph_entity_type_name(_peer_type));
- abort_in_close(*this, true);
+ ABORT_IN_CLOSE(true);
}
if (unlikely(state != state_t::CONNECTING)) {
logger().debug("{} triggered {} during banner_exchange(), abort",
conn, get_state_name(state));
abort_protocol();
}
- socket->learn_ephemeral_port_as_connector(_my_addr_from_peer.get_port());
+ frame_assembler->learn_socket_ephemeral_port_as_connector(
+ _my_addr_from_peer.get_port());
if (unlikely(_my_addr_from_peer.is_legacy())) {
logger().warn("{} peer sent a legacy address for me: {}",
conn, _my_addr_from_peer);
make_error_code(crimson::net::error::bad_peer_address));
}
_my_addr_from_peer.set_type(entity_addr_t::TYPE_MSGR2);
- return messenger.learned_addr(_my_addr_from_peer, conn);
- }).then([this] {
+ messenger.learned_addr(_my_addr_from_peer, conn);
return client_auth();
}).then([this] {
if (server_cookie == 0) {
}
switch (next) {
case next_step_t::ready: {
- logger().info("{} connected:"
- " gs={}, pgs={}, cs={}, client_cookie={},"
- " server_cookie={}, in_seq={}, out_seq={}, out_q={}",
+ logger().info("{} connected: gs={}, pgs={}, cs={}, "
+ "client_cookie={}, server_cookie={}, {}",
conn, global_seq, peer_global_seq, connect_seq,
- client_cookie, server_cookie, conn.in_seq,
- conn.out_seq, conn.out_q.size());
- execute_ready(true);
+ client_cookie, server_cookie,
+ io_stat_printer{io_handler});
+ io_handler.dispatch_connect();
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} after ms_handle_connect(), abort",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ execute_ready();
break;
}
case next_step_t::wait: {
- logger().info("{} execute_connecting(): going to WAIT", conn);
+ logger().info("{} execute_connecting(): going to WAIT(max-backoff)", conn);
+ ceph_assert_always(is_socket_valid);
+ frame_assembler->shutdown_socket();
+ is_socket_valid = false;
execute_wait(true);
break;
}
ceph_abort("impossible next step");
}
}
- }).handle_exception([this] (std::exception_ptr eptr) {
- if (state != state_t::CONNECTING) {
- logger().info("{} execute_connecting(): protocol aborted at {} -- {}",
- conn, get_state_name(state), eptr);
- assert(state == state_t::CLOSING ||
- state == state_t::REPLACING);
- return;
- }
-
- if (conn.policy.server ||
- (conn.policy.standby &&
- (!is_queued() && conn.sent.empty()))) {
- logger().info("{} execute_connecting(): fault at {} with nothing to send,"
- " going to STANDBY -- {}",
- conn, get_state_name(state), eptr);
- execute_standby();
- } else {
- logger().info("{} execute_connecting(): fault at {}, going to WAIT -- {}",
- conn, get_state_name(state), eptr);
- execute_wait(false);
- }
+ }).handle_exception([this](std::exception_ptr eptr) {
+ fault(state_t::CONNECTING, "execute_connecting", eptr);
});
});
}
"allowed_methods={}, allowed_modes={})",
conn, auth_meta->auth_method, cpp_strerror(r),
allowed_methods, allowed_modes);
- return write_frame(bad_method).then([this] {
+ return frame_assembler->write_flush_frame(bad_method
+ ).then([this] {
return server_auth();
});
}
ceph_assert(messenger.get_auth_server());
bufferlist reply;
int r = messenger.get_auth_server()->handle_auth_request(
- conn.shared_from_this(), auth_meta,
- more, auth_meta->auth_method, auth_payload,
+ conn,
+ *auth_meta,
+ more,
+ auth_meta->auth_method,
+ auth_payload,
+ &conn.peer_global_id,
&reply);
switch (r) {
// successful
logger().debug("{} WRITE AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
conn, conn.peer_global_id,
ceph_con_mode_name(auth_meta->con_mode), reply.length());
- return write_frame(auth_done).then([this] {
+ return frame_assembler->write_flush_frame(auth_done
+ ).then([this] {
ceph_assert(auth_meta);
- session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
- nullptr, *auth_meta, tx_frame_asm.get_is_rev1(), true);
+ frame_assembler->create_session_stream_handlers(*auth_meta, true);
return finish_auth();
});
}
auto more = AuthReplyMoreFrame::Encode(reply);
logger().debug("{} WRITE AuthReplyMoreFrame: payload_len={}",
conn, reply.length());
- return write_frame(more).then([this] {
- return read_main_preamble();
- }).then([this] (Tag tag) {
- expect_tag(Tag::AUTH_REQUEST_MORE, tag, conn, __func__);
- return read_frame_payload();
- }).then([this] {
- auto auth_more = AuthRequestMoreFrame::Decode(rx_segments_data.back());
+ return frame_assembler->write_flush_frame(more
+ ).then([this] {
+ return frame_assembler->read_main_preamble();
+ }).then([this](auto ret) {
+ expect_tag(Tag::AUTH_REQUEST_MORE, ret.tag, conn, "read_auth_request_more");
+ return frame_assembler->read_frame_payload();
+ }).then([this](auto payload) {
+ auto auth_more = AuthRequestMoreFrame::Decode(payload->back());
logger().debug("{} GOT AuthRequestMoreFrame: payload_len={}",
conn, auth_more.auth_payload().length());
return _handle_auth_request(auth_more.auth_payload(), true);
seastar::future<> ProtocolV2::server_auth()
{
- return read_main_preamble()
- .then([this] (Tag tag) {
- expect_tag(Tag::AUTH_REQUEST, tag, conn, __func__);
- return read_frame_payload();
- }).then([this] {
+ return frame_assembler->read_main_preamble(
+ ).then([this](auto ret) {
+ expect_tag(Tag::AUTH_REQUEST, ret.tag, conn, "read_auth_request");
+ return frame_assembler->read_frame_payload();
+ }).then([this](auto payload) {
// handle_auth_request() logic
- auto request = AuthRequestFrame::Decode(rx_segments_data.back());
+ auto request = AuthRequestFrame::Decode(payload->back());
logger().debug("{} GOT AuthRequestFrame: method={}, preferred_modes={},"
" payload_len={}",
conn, request.method(), request.preferred_modes(),
{
auto wait = WaitFrame::Encode();
logger().debug("{} WRITE WaitFrame", conn);
- return write_frame(wait).then([] {
+ return frame_assembler->write_flush_frame(wait
+ ).then([] {
return next_step_t::wait;
});
}
ProtocolV2* existing_proto, bool do_reset,
bool reconnect, uint64_t conn_seq, uint64_t msg_seq)
{
+ if (unlikely(state != state_t::ACCEPTING)) {
+ logger().debug("{} triggered {} before trigger_replacing()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+
existing_proto->trigger_replacing(reconnect,
do_reset,
- std::move(socket),
+ frame_assembler->to_replace(),
std::move(auth_meta),
- std::move(session_stream_handlers),
peer_global_seq,
client_cookie,
conn.get_peer_name(),
- connection_features,
- tx_frame_asm.get_is_rev1(),
- rx_frame_asm.get_is_rev1(),
+ conn.get_features(),
+ peer_supported_features,
conn_seq,
msg_seq);
+ ceph_assert_always(has_socket && is_socket_valid);
+ is_socket_valid = false;
+ has_socket = false;
#ifdef UNIT_TESTS_BUILT
if (conn.interceptor) {
conn.interceptor->register_conn_replaced(conn);
// close this connection because all the necessary information is delivered
// to the exisiting connection, and jump to error handling code to abort the
// current state.
- abort_in_close(*this, false);
+ ABORT_IN_CLOSE(false);
return seastar::make_ready_future<next_step_t>(next_step_t::none);
}
" found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
conn, global_seq, peer_global_seq, connect_seq,
client_cookie, server_cookie,
- existing_conn, get_state_name(existing_proto->state),
+ fmt::ptr(existing_conn.get()), get_state_name(existing_proto->state),
existing_proto->global_seq,
existing_proto->peer_global_seq,
existing_proto->connect_seq,
if (!validate_peer_name(existing_conn->get_peer_name())) {
logger().error("{} server_connect: my peer_name doesn't match"
- " the existing connection {}, abort", conn, existing_conn);
+ " the existing connection {}, abort", conn, fmt::ptr(existing_conn.get()));
abort_in_fault();
}
logger().warn("{} server_connect:"
" existing connection {} is a lossy channel. Close existing in favor of"
" this connection", conn, *existing_conn);
- execute_establishing(existing_conn, true);
+ if (unlikely(state != state_t::ACCEPTING)) {
+ logger().debug("{} triggered {} before execute_establishing()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ execute_establishing(existing_conn);
return seastar::make_ready_future<next_step_t>(next_step_t::ready);
}
// by replacing the socket
logger().warn("{} server_connect:"
" found new session (cs={})"
- " when existing {} is with stale session (cs={}, ss={}),"
+ " when existing {} {} is with stale session (cs={}, ss={}),"
" peer must have reset",
- conn, client_cookie,
- *existing_conn, existing_proto->client_cookie,
+ conn,
+ client_cookie,
+ get_state_name(existing_proto->state),
+ *existing_conn,
+ existing_proto->client_cookie,
existing_proto->server_cookie);
return reuse_connection(existing_proto, conn.policy.resetcheck);
} else {
// session establishment interrupted between client_ident and server_ident,
// continuing...
- logger().warn("{} server_connect: found client session with existing {}"
+ logger().warn("{} server_connect: found client session with existing {} {}"
" matched (cs={}, ss={}), continuing session establishment",
- conn, *existing_conn, client_cookie, existing_proto->server_cookie);
+ conn,
+ get_state_name(existing_proto->state),
+ *existing_conn,
+ client_cookie,
+ existing_proto->server_cookie);
return reuse_connection(existing_proto);
}
} else {
// each other at the same time.
if (existing_proto->client_cookie != client_cookie) {
if (existing_conn->peer_wins()) {
+ // acceptor (this connection, the peer) wins
logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
- " and win, reusing existing {}",
- conn, client_cookie, existing_proto->client_cookie, *existing_conn);
+ " and win, reusing existing {} {}",
+ conn,
+ client_cookie,
+ existing_proto->client_cookie,
+ get_state_name(existing_proto->state),
+ *existing_conn);
return reuse_connection(existing_proto);
} else {
+ // acceptor (this connection, the peer) loses
logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
" and lose to existing {}, ask client to wait",
conn, client_cookie, existing_proto->client_cookie, *existing_conn);
- return existing_conn->keepalive().then([this] {
+ return existing_conn->send_keepalive().then([this] {
return send_wait();
});
}
} else {
- logger().warn("{} server_connect: found client session with existing {}"
+ logger().warn("{} server_connect: found client session with existing {} {}"
" matched (cs={}, ss={}), continuing session establishment",
- conn, *existing_conn, client_cookie, existing_proto->server_cookie);
+ conn,
+ get_state_name(existing_proto->state),
+ *existing_conn,
+ client_cookie,
+ existing_proto->server_cookie);
return reuse_connection(existing_proto);
}
}
seastar::future<ProtocolV2::next_step_t>
ProtocolV2::server_connect()
{
- return read_frame_payload().then([this] {
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
// handle_client_ident() logic
- auto client_ident = ClientIdentFrame::Decode(rx_segments_data.back());
+ auto client_ident = ClientIdentFrame::Decode(payload->back());
logger().debug("{} GOT ClientIdentFrame: addrs={}, target={},"
" gid={}, gs={}, features_supported={},"
" features_required={}, flags={}, cookie={}",
auto ident_missing_features = IdentMissingFeaturesFrame::Encode(feat_missing);
logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)",
conn, feat_missing);
- return write_frame(ident_missing_features).then([] {
+ return frame_assembler->write_flush_frame(ident_missing_features
+ ).then([] {
return next_step_t::wait;
});
}
- connection_features =
- client_ident.supported_features() & conn.policy.features_supported;
- logger().debug("{} UPDATE: connection_features={}", conn, connection_features);
+ conn.set_features(client_ident.supported_features() &
+ conn.policy.features_supported);
+ logger().debug("{} UPDATE: features={}", conn, conn.get_features());
peer_global_seq = client_ident.global_seq();
+ bool lossy = client_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
+ if (lossy != conn.policy.lossy) {
+ logger().warn("{} my lossy policy {} doesn't match client {}, ignore",
+ conn, conn.policy.lossy, lossy);
+ }
+
// Looks good so far, let's check if there is already an existing connection
// to this peer.
SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr);
if (existing_conn) {
- if (existing_conn->protocol->proto_type != proto_t::v2) {
- logger().warn("{} existing connection {} proto version is {}, close existing",
- conn, *existing_conn,
- static_cast<int>(existing_conn->protocol->proto_type));
- // should unregister the existing from msgr atomically
- // NOTE: this is following async messenger logic, but we may miss the reset event.
- execute_establishing(existing_conn, false);
- return seastar::make_ready_future<next_step_t>(next_step_t::ready);
- } else {
- return handle_existing_connection(existing_conn);
- }
+ return handle_existing_connection(existing_conn);
} else {
- execute_establishing(nullptr, true);
+ if (unlikely(state != state_t::ACCEPTING)) {
+ logger().debug("{} triggered {} before execute_establishing()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ execute_establishing(nullptr);
return seastar::make_ready_future<next_step_t>(next_step_t::ready);
}
});
seastar::future<ProtocolV2::next_step_t>
ProtocolV2::read_reconnect()
{
- return read_main_preamble()
- .then([this] (Tag tag) {
- expect_tag(Tag::SESSION_RECONNECT, tag, conn, "read_reconnect");
+ return frame_assembler->read_main_preamble(
+ ).then([this](auto ret) {
+ expect_tag(Tag::SESSION_RECONNECT, ret.tag, conn, "read_session_reconnect");
return server_reconnect();
});
}
{
auto retry = RetryFrame::Encode(connect_seq);
logger().warn("{} WRITE RetryFrame: cs={}", conn, connect_seq);
- return write_frame(retry).then([this] {
+ return frame_assembler->write_flush_frame(retry
+ ).then([this] {
return read_reconnect();
});
}
{
auto retry = RetryGlobalFrame::Encode(global_seq);
logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn, global_seq);
- return write_frame(retry).then([this] {
+ return frame_assembler->write_flush_frame(retry
+ ).then([this] {
return read_reconnect();
});
}
{
auto reset = ResetFrame::Encode(full);
logger().warn("{} WRITE ResetFrame: full={}", conn, full);
- return write_frame(reset).then([this] {
- return read_main_preamble();
- }).then([this] (Tag tag) {
- expect_tag(Tag::CLIENT_IDENT, tag, conn, "post_send_reset");
+ return frame_assembler->write_flush_frame(reset
+ ).then([this] {
+ return frame_assembler->read_main_preamble();
+ }).then([this](auto ret) {
+ expect_tag(Tag::CLIENT_IDENT, ret.tag, conn, "post_send_reset");
return server_connect();
});
}
seastar::future<ProtocolV2::next_step_t>
ProtocolV2::server_reconnect()
{
- return read_frame_payload().then([this] {
+ return frame_assembler->read_frame_payload(
+ ).then([this](auto payload) {
// handle_reconnect() logic
- auto reconnect = ReconnectFrame::Decode(rx_segments_data.back());
+ auto reconnect = ReconnectFrame::Decode(payload->back());
logger().debug("{} GOT ReconnectFrame: addrs={}, client_cookie={},"
" server_cookie={}, gs={}, cs={}, msg_seq={}",
return send_reset(true);
}
- if (existing_conn->protocol->proto_type != proto_t::v2) {
- logger().warn("{} server_reconnect: existing connection {} proto version is {},"
- "close existing and reset client.",
- conn, *existing_conn,
- static_cast<int>(existing_conn->protocol->proto_type));
- // NOTE: this is following async messenger logic, but we may miss the reset event.
- existing_conn->mark_down();
- return send_reset(true);
- }
-
ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
existing_conn->protocol.get());
ceph_assert(existing_proto);
" found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
conn, global_seq, peer_global_seq, reconnect.connect_seq(),
reconnect.client_cookie(), reconnect.server_cookie(),
- existing_conn,
+ fmt::ptr(existing_conn.get()),
get_state_name(existing_proto->state),
existing_proto->global_seq,
existing_proto->peer_global_seq,
if (!validate_peer_name(existing_conn->get_peer_name())) {
logger().error("{} server_reconnect: my peer_name doesn't match"
- " the existing connection {}, abort", conn, existing_conn);
+ " the existing connection {}, abort", conn, fmt::ptr(existing_conn.get()));
abort_in_fault();
}
} else if (existing_proto->connect_seq == reconnect.connect_seq()) {
// reconnect race: both peers are sending reconnect messages
if (existing_conn->peer_wins()) {
+ // acceptor (this connection, the peer) wins
logger().warn("{} server_reconnect: reconnect race detected (cs={})"
- " and win, reusing existing {}",
- conn, reconnect.connect_seq(), *existing_conn);
+ " and win, reusing existing {} {}",
+ conn,
+ reconnect.connect_seq(),
+ get_state_name(existing_proto->state),
+ *existing_conn);
return reuse_connection(
existing_proto, false,
true, reconnect.connect_seq(), reconnect.msg_seq());
} else {
+ // acceptor (this connection, the peer) loses
logger().warn("{} server_reconnect: reconnect race detected (cs={})"
" and lose to existing {}, ask client to wait",
conn, reconnect.connect_seq(), *existing_conn);
}
} else { // existing_proto->connect_seq < reconnect.connect_seq()
logger().warn("{} server_reconnect: stale exsiting connect_seq exist_cs({}) < peer_cs({}),"
- " reusing existing {}",
- conn, existing_proto->connect_seq,
- reconnect.connect_seq(), *existing_conn);
+ " reusing existing {} {}",
+ conn,
+ existing_proto->connect_seq,
+ reconnect.connect_seq(),
+ get_state_name(existing_proto->state),
+ *existing_conn);
return reuse_connection(
existing_proto, false,
true, reconnect.connect_seq(), reconnect.msg_seq());
void ProtocolV2::execute_accepting()
{
- trigger_state(state_t::ACCEPTING, write_state_t::none, false);
- gate.dispatch_in_background("execute_accepting", *this, [this] {
+ assert(is_socket_valid);
+ trigger_state(state_t::ACCEPTING, io_state_t::none, false);
+ gate.dispatch_in_background("execute_accepting", conn, [this] {
return seastar::futurize_invoke([this] {
- INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED);
+#ifdef UNIT_TESTS_BUILT
+ if (conn.interceptor) {
+ auto action = conn.interceptor->intercept(
+ conn, {custom_bp_t::SOCKET_ACCEPTED});
+ switch (action) {
+ case bp_action_t::CONTINUE:
+ break;
+ case bp_action_t::FAULT:
+ logger().info("[Test] got FAULT");
+ abort_in_fault();
+ default:
+ ceph_abort("unexpected action from trap");
+ }
+ }
+#endif
auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
- session_stream_handlers = { nullptr, nullptr };
- session_comp_handlers = { nullptr, nullptr };
- enable_recording();
+ frame_assembler->reset_handlers();
+ frame_assembler->start_recording();
return banner_exchange(false);
}).then([this] (auto&& ret) {
auto [_peer_type, _my_addr_from_peer] = std::move(ret);
throw std::system_error(
make_error_code(crimson::net::error::bad_peer_address));
}
- return messenger.learned_addr(_my_addr_from_peer, conn);
- }).then([this] {
+ messenger.learned_addr(_my_addr_from_peer, conn);
return server_auth();
}).then([this] {
- return read_main_preamble();
- }).then([this] (Tag tag) {
- switch (tag) {
+ return frame_assembler->read_main_preamble();
+ }).then([this](auto ret) {
+ switch (ret.tag) {
case Tag::CLIENT_IDENT:
return server_connect();
case Tag::SESSION_RECONNECT:
return server_reconnect();
default: {
- unexpected_tag(tag, conn, "post_server_auth");
+ unexpected_tag(ret.tag, conn, "post_server_auth");
return seastar::make_ready_future<next_step_t>(next_step_t::none);
}
}
default:
ceph_abort("impossible next step");
}
- }).handle_exception([this] (std::exception_ptr eptr) {
+ }).handle_exception([this](std::exception_ptr eptr) {
+ const char *e_what;
+ try {
+ std::rethrow_exception(eptr);
+ } catch (std::exception &e) {
+ e_what = e.what();
+ }
logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
- conn, get_state_name(state), eptr);
- close(false);
+ conn, get_state_name(state), e_what);
+ do_close(false);
});
});
}
{
ceph_assert(auth_meta);
+ auto records = frame_assembler->stop_recording();
const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
- auth_meta->session_key.hmac_sha256(nullptr, rxbuf);
+ auth_meta->session_key.hmac_sha256(nullptr, records.rxbuf);
auto sig_frame = AuthSignatureFrame::Encode(sig);
- ceph_assert(record_io);
- record_io = false;
- rxbuf.clear();
logger().debug("{} WRITE AuthSignatureFrame: signature={}", conn, sig);
- return write_frame(sig_frame).then([this] {
- return read_main_preamble();
- }).then([this] (Tag tag) {
- expect_tag(Tag::AUTH_SIGNATURE, tag, conn, "post_finish_auth");
- return read_frame_payload();
- }).then([this] {
+ return frame_assembler->write_flush_frame(sig_frame
+ ).then([this] {
+ return frame_assembler->read_main_preamble();
+ }).then([this](auto ret) {
+ expect_tag(Tag::AUTH_SIGNATURE, ret.tag, conn, "post_finish_auth");
+ return frame_assembler->read_frame_payload();
+ }).then([this, txbuf=std::move(records.txbuf)](auto payload) {
// handle_auth_signature() logic
- auto sig_frame = AuthSignatureFrame::Decode(rx_segments_data.back());
+ auto sig_frame = AuthSignatureFrame::Decode(payload->back());
logger().debug("{} GOT AuthSignatureFrame: signature={}", conn, sig_frame.signature());
const auto actual_tx_sig = auth_meta->session_key.empty() ?
conn, actual_tx_sig, sig_frame.signature());
abort_in_fault();
}
- txbuf.clear();
});
}
// ESTABLISHING
-void ProtocolV2::execute_establishing(
- SocketConnectionRef existing_conn, bool dispatch_reset) {
- if (unlikely(state != state_t::ACCEPTING)) {
- logger().debug("{} triggered {} before execute_establishing()",
- conn, get_state_name(state));
- abort_protocol();
- }
-
+void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
auto accept_me = [this] {
messenger.register_conn(
seastar::static_pointer_cast<SocketConnection>(
conn.shared_from_this()));
};
- trigger_state(state_t::ESTABLISHING, write_state_t::delay, false);
+ ceph_assert_always(is_socket_valid);
+ trigger_state(state_t::ESTABLISHING, io_state_t::delay, false);
if (existing_conn) {
- existing_conn->protocol->close(dispatch_reset, std::move(accept_me));
+ static_cast<ProtocolV2*>(existing_conn->protocol.get())->do_close(
+ true /* is_dispatch_reset */, std::move(accept_me));
if (unlikely(state != state_t::ESTABLISHING)) {
logger().warn("{} triggered {} during execute_establishing(), "
"the accept event will not be delivered!",
accept_me();
}
- dispatchers.ms_handle_accept(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ io_handler.dispatch_accept();
+ if (unlikely(state != state_t::ESTABLISHING)) {
+ logger().debug("{} triggered {} after ms_handle_accept() during execute_establishing()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
- gated_execute("execute_establishing", [this] {
+ gated_execute("execute_establishing", conn, [this] {
return seastar::futurize_invoke([this] {
return send_server_ident();
}).then([this] {
conn, get_state_name(state));
abort_protocol();
}
- logger().info("{} established: gs={}, pgs={}, cs={}, client_cookie={},"
- " server_cookie={}, in_seq={}, out_seq={}, out_q={}",
+ logger().info("{} established: gs={}, pgs={}, cs={}, "
+ "client_cookie={}, server_cookie={}, {}",
conn, global_seq, peer_global_seq, connect_seq,
- client_cookie, server_cookie, conn.in_seq,
- conn.out_seq, conn.out_q.size());
- execute_ready(false);
- }).handle_exception([this] (std::exception_ptr eptr) {
- if (state != state_t::ESTABLISHING) {
- logger().info("{} execute_establishing() protocol aborted at {} -- {}",
- conn, get_state_name(state), eptr);
- assert(state == state_t::CLOSING ||
- state == state_t::REPLACING);
- return;
- }
- fault(false, "execute_establishing()", eptr);
+ client_cookie, server_cookie,
+ io_stat_printer{io_handler});
+ execute_ready();
+ }).handle_exception([this](std::exception_ptr eptr) {
+ fault(state_t::ESTABLISHING, "execute_establishing", eptr);
});
});
}
// send_server_ident() logic
// refered to async-conn v2: not assign gs to global_seq
- return messenger.get_global_seq().then([this] (auto gs) {
- logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq);
+ global_seq = messenger.get_global_seq();
+ logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq);
- // this is required for the case when this connection is being replaced
- requeue_up_to(0);
- conn.in_seq = 0;
+ // this is required for the case when this connection is being replaced
+ io_handler.requeue_out_sent_up_to(0);
+ io_handler.reset_session(false);
- if (!conn.policy.lossy) {
- server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
- }
+ if (!conn.policy.lossy) {
+ server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
+ }
- uint64_t flags = 0;
- if (conn.policy.lossy) {
- flags = flags | CEPH_MSG_CONNECT_LOSSY;
- }
+ uint64_t flags = 0;
+ if (conn.policy.lossy) {
+ flags = flags | CEPH_MSG_CONNECT_LOSSY;
+ }
- auto server_ident = ServerIdentFrame::Encode(
- messenger.get_myaddrs(),
- messenger.get_myname().num(),
- gs,
- conn.policy.features_supported,
- conn.policy.features_required | msgr2_required,
- flags,
- server_cookie);
-
- logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={},"
- " gs={}, features_supported={}, features_required={},"
- " flags={}, cookie={}",
- conn, messenger.get_myaddrs(), messenger.get_myname().num(),
- gs, conn.policy.features_supported,
- conn.policy.features_required | msgr2_required,
- flags, server_cookie);
-
- conn.set_features(connection_features);
-
- return write_frame(server_ident);
- });
+ auto server_ident = ServerIdentFrame::Encode(
+ messenger.get_myaddrs(),
+ messenger.get_myname().num(),
+ global_seq,
+ conn.policy.features_supported,
+ conn.policy.features_required | msgr2_required,
+ flags,
+ server_cookie);
+
+ logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={},"
+ " gs={}, features_supported={}, features_required={},"
+ " flags={}, cookie={}",
+ conn, messenger.get_myaddrs(), messenger.get_myname().num(),
+ global_seq, conn.policy.features_supported,
+ conn.policy.features_required | msgr2_required,
+ flags, server_cookie);
+
+ return frame_assembler->write_flush_frame(server_ident);
}
// REPLACING state
void ProtocolV2::trigger_replacing(bool reconnect,
bool do_reset,
- SocketRef&& new_socket,
+ FrameAssemblerV2::mover_t &&mover,
AuthConnectionMetaRef&& new_auth_meta,
- ceph::crypto::onwire::rxtx_t new_rxtx,
uint64_t new_peer_global_seq,
uint64_t new_client_cookie,
entity_name_t new_peer_name,
uint64_t new_conn_features,
- bool tx_is_rev1,
- bool rx_is_rev1,
+ uint64_t new_peer_supported_features,
uint64_t new_connect_seq,
uint64_t new_msg_seq)
{
- trigger_state(state_t::REPLACING, write_state_t::delay, false);
- if (socket) {
- socket->shutdown();
+ ceph_assert_always(has_socket || state == state_t::CONNECTING);
+ ceph_assert_always(!mover.socket->is_shutdown());
+ trigger_state(state_t::REPLACING, io_state_t::delay, false);
+ if (is_socket_valid) {
+ frame_assembler->shutdown_socket();
+ is_socket_valid = false;
}
- dispatchers.ms_handle_accept(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
- gate.dispatch_in_background("trigger_replacing", *this,
- [this,
- reconnect,
- do_reset,
- new_socket = std::move(new_socket),
- new_auth_meta = std::move(new_auth_meta),
- new_rxtx = std::move(new_rxtx),
- tx_is_rev1, rx_is_rev1,
- new_client_cookie, new_peer_name,
- new_conn_features, new_peer_global_seq,
- new_connect_seq, new_msg_seq] () mutable {
- return wait_write_exit().then([this, do_reset] {
- if (do_reset) {
- reset_session(true);
- }
+ gate.dispatch_in_background(
+ "trigger_replacing",
+ conn,
+ [this,
+ reconnect,
+ do_reset,
+ mover = std::move(mover),
+ new_auth_meta = std::move(new_auth_meta),
+ new_client_cookie, new_peer_name,
+ new_conn_features, new_peer_supported_features,
+ new_peer_global_seq,
+ new_connect_seq, new_msg_seq] () mutable {
+ ceph_assert_always(state == state_t::REPLACING);
+ io_handler.dispatch_accept();
+ // state may become CLOSING, close mover.socket and abort later
+ return wait_exit_io(
+ ).then([this] {
+ ceph_assert_always(frame_assembler);
protocol_timer.cancel();
- return execution_done.get_future();
+ auto done = std::move(execution_done);
+ execution_done = seastar::now();
+ return done;
}).then([this,
reconnect,
- new_socket = std::move(new_socket),
+ do_reset,
+ mover = std::move(mover),
new_auth_meta = std::move(new_auth_meta),
- new_rxtx = std::move(new_rxtx),
- tx_is_rev1, rx_is_rev1,
new_client_cookie, new_peer_name,
- new_conn_features, new_peer_global_seq,
+ new_conn_features, new_peer_supported_features,
+ new_peer_global_seq,
new_connect_seq, new_msg_seq] () mutable {
+ if (state == state_t::REPLACING && do_reset) {
+ reset_session(true);
+ }
+
if (unlikely(state != state_t::REPLACING)) {
- return new_socket->close().then([sock = std::move(new_socket)] {
+ return mover.socket->close(
+ ).then([sock = std::move(mover.socket)] {
abort_protocol();
});
}
- if (socket) {
- gate.dispatch_in_background("close_socket_replacing", *this,
- [sock = std::move(socket)] () mutable {
- return sock->close().then([sock = std::move(sock)] {});
- });
- }
- socket = std::move(new_socket);
auth_meta = std::move(new_auth_meta);
- session_stream_handlers = std::move(new_rxtx);
- record_io = false;
peer_global_seq = new_peer_global_seq;
+ gate.dispatch_in_background(
+ "replace_frame_assembler",
+ conn,
+ [this, mover=std::move(mover)]() mutable {
+ return frame_assembler->replace_by(std::move(mover));
+ }
+ );
+ is_socket_valid = true;
+ has_socket = true;
if (reconnect) {
connect_seq = new_connect_seq;
// send_reconnect_ok() logic
- requeue_up_to(new_msg_seq);
- auto reconnect_ok = ReconnectOkFrame::Encode(conn.in_seq);
- logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, conn.in_seq);
- return write_frame(reconnect_ok);
+ io_handler.requeue_out_sent_up_to(new_msg_seq);
+ auto reconnect_ok = ReconnectOkFrame::Encode(io_handler.get_in_seq());
+ logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, io_handler.get_in_seq());
+ return frame_assembler->write_flush_frame(reconnect_ok);
} else {
client_cookie = new_client_cookie;
assert(conn.get_peer_type() == new_peer_name.type());
if (conn.get_peer_id() == entity_name_t::NEW) {
conn.set_peer_id(new_peer_name.num());
}
- connection_features = new_conn_features;
- tx_frame_asm.set_is_rev1(tx_is_rev1);
- rx_frame_asm.set_is_rev1(rx_is_rev1);
+ conn.set_features(new_conn_features);
+ peer_supported_features = new_peer_supported_features;
+ bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
+ frame_assembler->set_is_rev1(is_rev1);
return send_server_ident();
}
}).then([this, reconnect] {
conn, get_state_name(state));
abort_protocol();
}
- logger().info("{} replaced ({}):"
- " gs={}, pgs={}, cs={}, client_cookie={}, server_cookie={},"
- " in_seq={}, out_seq={}, out_q={}",
+ logger().info("{} replaced ({}): gs={}, pgs={}, cs={}, "
+ "client_cookie={}, server_cookie={}, {}",
conn, reconnect ? "reconnected" : "connected",
- global_seq, peer_global_seq, connect_seq, client_cookie,
- server_cookie, conn.in_seq, conn.out_seq, conn.out_q.size());
- execute_ready(false);
- }).handle_exception([this] (std::exception_ptr eptr) {
- if (state != state_t::REPLACING) {
- logger().info("{} trigger_replacing(): protocol aborted at {} -- {}",
- conn, get_state_name(state), eptr);
- assert(state == state_t::CLOSING);
- return;
- }
- fault(true, "trigger_replacing()", eptr);
+ global_seq, peer_global_seq, connect_seq,
+ client_cookie, server_cookie,
+ io_stat_printer{io_handler});
+ execute_ready();
+ }).handle_exception([this](std::exception_ptr eptr) {
+ fault(state_t::REPLACING, "trigger_replacing", eptr);
});
});
}
// READY state
-ceph::bufferlist ProtocolV2::do_sweep_messages(
- const std::deque<MessageURef>& msgs,
- size_t num_msgs,
- bool require_keepalive,
- std::optional<utime_t> _keepalive_ack,
- bool require_ack)
+void ProtocolV2::notify_out_fault(const char *where, std::exception_ptr eptr)
{
- ceph::bufferlist bl;
-
- if (unlikely(require_keepalive)) {
- auto keepalive_frame = KeepAliveFrame::Encode();
- bl.append(keepalive_frame.get_buffer(tx_frame_asm));
- INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2, bp_type_t::WRITE);
- }
-
- if (unlikely(_keepalive_ack.has_value())) {
- auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*_keepalive_ack);
- bl.append(keepalive_ack_frame.get_buffer(tx_frame_asm));
- INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, bp_type_t::WRITE);
- }
-
- if (require_ack && num_msgs == 0u) {
- auto ack_frame = AckFrame::Encode(conn.in_seq);
- bl.append(ack_frame.get_buffer(tx_frame_asm));
- INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE);
- }
-
- std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageURef& msg) {
- // TODO: move to common code
- // set priority
- msg->get_header().src = messenger.get_myname();
-
- msg->encode(conn.features, 0);
-
- ceph_assert(!msg->get_seq() && "message already has seq");
- msg->set_seq(++conn.out_seq);
-
- ceph_msg_header &header = msg->get_header();
- ceph_msg_footer &footer = msg->get_footer();
-
- ceph_msg_header2 header2{header.seq, header.tid,
- header.type, header.priority,
- header.version,
- ceph_le32(0), header.data_off,
- ceph_le64(conn.in_seq),
- footer.flags, header.compat_version,
- header.reserved};
-
- auto message = MessageFrame::Encode(header2,
- msg->get_payload(), msg->get_middle(), msg->get_data());
- logger().debug("{} --> #{} === {} ({})",
- conn, msg->get_seq(), *msg, msg->get_type());
- bl.append(message.get_buffer(tx_frame_asm));
- INTERCEPT_FRAME(ceph::msgr::v2::Tag::MESSAGE, bp_type_t::WRITE);
- });
-
- return bl;
+ fault(state_t::READY, where, eptr);
}
-seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
-{
- return read_frame_payload()
- .then([this, throttle_stamp] {
- utime_t recv_stamp{seastar::lowres_system_clock::now()};
-
- // we need to get the size before std::moving segments data
- const size_t cur_msg_size = get_current_msg_size();
- auto msg_frame = MessageFrame::Decode(rx_segments_data);
- // XXX: paranoid copy just to avoid oops
- ceph_msg_header2 current_header = msg_frame.header();
-
- logger().trace("{} got {} + {} + {} byte message,"
- " envelope type={} src={} off={} seq={}",
- conn, msg_frame.front_len(), msg_frame.middle_len(),
- msg_frame.data_len(), current_header.type, conn.get_peer_name(),
- current_header.data_off, current_header.seq);
-
- ceph_msg_header header{current_header.seq,
- current_header.tid,
- current_header.type,
- current_header.priority,
- current_header.version,
- ceph_le32(msg_frame.front_len()),
- ceph_le32(msg_frame.middle_len()),
- ceph_le32(msg_frame.data_len()),
- current_header.data_off,
- conn.get_peer_name(),
- current_header.compat_version,
- current_header.reserved,
- ceph_le32(0)};
- ceph_msg_footer footer{ceph_le32(0), ceph_le32(0),
- ceph_le32(0), ceph_le64(0), current_header.flags};
-
- auto conn_ref = seastar::static_pointer_cast<SocketConnection>(
- conn.shared_from_this());
- Message *message = decode_message(nullptr, 0, header, footer,
- msg_frame.front(), msg_frame.middle(), msg_frame.data(), conn_ref);
- if (!message) {
- logger().warn("{} decode message failed", conn);
- abort_in_fault();
- }
-
- // store reservation size in message, so we don't get confused
- // by messages entering the dispatch queue through other paths.
- message->set_dispatch_throttle_size(cur_msg_size);
-
- message->set_throttle_stamp(throttle_stamp);
- message->set_recv_stamp(recv_stamp);
- message->set_recv_complete_stamp(utime_t{seastar::lowres_system_clock::now()});
-
- // check received seq#. if it is old, drop the message.
- // note that incoming messages may skip ahead. this is convenient for the
- // client side queueing because messages can't be renumbered, but the (kernel)
- // client will occasionally pull a message out of the sent queue to send
- // elsewhere. in that case it doesn't matter if we "got" it or not.
- uint64_t cur_seq = conn.in_seq;
- if (message->get_seq() <= cur_seq) {
- logger().error("{} got old message {} <= {} {}, discarding",
- conn, message->get_seq(), cur_seq, *message);
- if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) &&
- local_conf()->ms_die_on_old_message) {
- ceph_assert(0 == "old msgs despite reconnect_seq feature");
- }
- return seastar::now();
- } else if (message->get_seq() > cur_seq + 1) {
- logger().error("{} missed message? skipped from seq {} to {}",
- conn, cur_seq, message->get_seq());
- if (local_conf()->ms_die_on_skipped_message) {
- ceph_assert(0 == "skipped incoming seq");
- }
- }
-
- // note last received message.
- conn.in_seq = message->get_seq();
- logger().debug("{} <== #{} === {} ({})",
- conn, message->get_seq(), *message, message->get_type());
- notify_ack();
- ack_writes(current_header.ack_seq);
-
- // TODO: change MessageRef with seastar::shared_ptr
- auto msg_ref = MessageRef{message, false};
- // throttle the reading process by the returned future
- return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
- });
-}
-
-void ProtocolV2::execute_ready(bool dispatch_connect)
+void ProtocolV2::execute_ready()
{
assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
- trigger_state(state_t::READY, write_state_t::open, false);
- if (dispatch_connect) {
- dispatchers.ms_handle_connect(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
- }
-#ifdef UNIT_TESTS_BUILT
- if (conn.interceptor) {
- conn.interceptor->register_conn_ready(conn);
- }
-#endif
- gated_execute("execute_ready", [this] {
- protocol_timer.cancel();
- return seastar::keep_doing([this] {
- return read_main_preamble()
- .then([this] (Tag tag) {
- switch (tag) {
- case Tag::MESSAGE: {
- return seastar::futurize_invoke([this] {
- // throttle_message() logic
- if (!conn.policy.throttler_messages) {
- return seastar::now();
- }
- // TODO: message throttler
- ceph_assert(false);
- return seastar::now();
- }).then([this] {
- // throttle_bytes() logic
- if (!conn.policy.throttler_bytes) {
- return seastar::now();
- }
- size_t cur_msg_size = get_current_msg_size();
- if (!cur_msg_size) {
- return seastar::now();
- }
- logger().trace("{} wants {} bytes from policy throttler {}/{}",
- conn, cur_msg_size,
- conn.policy.throttler_bytes->get_current(),
- conn.policy.throttler_bytes->get_max());
- return conn.policy.throttler_bytes->get(cur_msg_size);
- }).then([this] {
- // TODO: throttle_dispatch_queue() logic
- utime_t throttle_stamp{seastar::lowres_system_clock::now()};
- return read_message(throttle_stamp);
- });
- }
- case Tag::ACK:
- return read_frame_payload().then([this] {
- // handle_message_ack() logic
- auto ack = AckFrame::Decode(rx_segments_data.back());
- logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq());
- ack_writes(ack.seq());
- });
- case Tag::KEEPALIVE2:
- return read_frame_payload().then([this] {
- // handle_keepalive2() logic
- auto keepalive_frame = KeepAliveFrame::Decode(rx_segments_data.back());
- logger().debug("{} GOT KeepAliveFrame: timestamp={}",
- conn, keepalive_frame.timestamp());
- notify_keepalive_ack(keepalive_frame.timestamp());
- conn.set_last_keepalive(seastar::lowres_system_clock::now());
- });
- case Tag::KEEPALIVE2_ACK:
- return read_frame_payload().then([this] {
- // handle_keepalive2_ack() logic
- auto keepalive_ack_frame = KeepAliveFrameAck::Decode(rx_segments_data.back());
- conn.set_last_keepalive_ack(
- seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()});
- logger().debug("{} GOT KeepAliveFrameAck: timestamp={}",
- conn, conn.last_keepalive_ack);
- });
- default: {
- unexpected_tag(tag, conn, "execute_ready");
- return seastar::now();
- }
- }
- });
- }).handle_exception([this] (std::exception_ptr eptr) {
- if (state != state_t::READY) {
- logger().info("{} execute_ready(): protocol aborted at {} -- {}",
- conn, get_state_name(state), eptr);
- assert(state == state_t::REPLACING ||
- state == state_t::CLOSING);
- return;
- }
- fault(false, "execute_ready()", eptr);
- });
- });
+ protocol_timer.cancel();
+ ceph_assert_always(is_socket_valid);
+ trigger_state(state_t::READY, io_state_t::open, false);
}
// STANDBY state
void ProtocolV2::execute_standby()
{
- trigger_state(state_t::STANDBY, write_state_t::delay, true);
- if (socket) {
- socket->shutdown();
- }
+ ceph_assert_always(!is_socket_valid);
+ trigger_state(state_t::STANDBY, io_state_t::delay, false);
}
-void ProtocolV2::notify_write()
+void ProtocolV2::notify_out()
{
if (unlikely(state == state_t::STANDBY && !conn.policy.server)) {
- logger().info("{} notify_write(): at {}, going to CONNECTING",
+ logger().info("{} notify_out(): at {}, going to CONNECTING",
conn, get_state_name(state));
execute_connecting();
}
void ProtocolV2::execute_wait(bool max_backoff)
{
- trigger_state(state_t::WAIT, write_state_t::delay, true);
- if (socket) {
- socket->shutdown();
- }
- gated_execute("execute_wait", [this, max_backoff] {
+ ceph_assert_always(!is_socket_valid);
+ trigger_state(state_t::WAIT, io_state_t::delay, false);
+ gated_execute("execute_wait", conn, [this, max_backoff] {
double backoff = protocol_timer.last_dur();
if (max_backoff) {
backoff = local_conf().get_val<double>("ms_max_backoff");
}
logger().info("{} execute_wait(): going to CONNECTING", conn);
execute_connecting();
- }).handle_exception([this] (std::exception_ptr eptr) {
+ }).handle_exception([this](std::exception_ptr eptr) {
+ const char *e_what;
+ try {
+ std::rethrow_exception(eptr);
+ } catch (std::exception &e) {
+ e_what = e.what();
+ }
logger().info("{} execute_wait(): protocol aborted at {} -- {}",
- conn, get_state_name(state), eptr);
+ conn, get_state_name(state), e_what);
assert(state == state_t::REPLACING ||
state == state_t::CLOSING);
});
void ProtocolV2::execute_server_wait()
{
- trigger_state(state_t::SERVER_WAIT, write_state_t::delay, false);
- gated_execute("execute_server_wait", [this] {
- return read_exactly(1).then([this] (auto bl) {
+ ceph_assert_always(is_socket_valid);
+ trigger_state(state_t::SERVER_WAIT, io_state_t::none, false);
+ gated_execute("execute_server_wait", conn, [this] {
+ return frame_assembler->read_exactly(1
+ ).then([this](auto bl) {
logger().warn("{} SERVER_WAIT got read, abort", conn);
abort_in_fault();
- }).handle_exception([this] (std::exception_ptr eptr) {
+ }).handle_exception([this](std::exception_ptr eptr) {
+ const char *e_what;
+ try {
+ std::rethrow_exception(eptr);
+ } catch (std::exception &e) {
+ e_what = e.what();
+ }
logger().info("{} execute_server_wait(): fault at {}, going to CLOSING -- {}",
- conn, get_state_name(state), eptr);
- close(false);
+ conn, get_state_name(state), e_what);
+ do_close(false);
});
});
}
// CLOSING state
-void ProtocolV2::trigger_close()
+void ProtocolV2::notify_mark_down()
{
+ do_close(false);
+}
+
+seastar::future<> ProtocolV2::close_clean_yielded()
+{
+ // yield() so that do_close() can be called *after* close_clean_yielded() is
+ // applied to all connections in a container using
+ // seastar::parallel_for_each(). otherwise, we could erase a connection in
+ // the container when seastar::parallel_for_each() is still iterating in it.
+ // that'd lead to a segfault.
+ return seastar::yield(
+ ).then([this, conn_ref = conn.shared_from_this()] {
+ do_close(false);
+ // it can happen if close_clean() is called inside Dispatcher::ms_handle_reset()
+ // which will otherwise result in deadlock
+ assert(closed_clean_fut.valid());
+ return closed_clean_fut.get_future();
+ });
+}
+
+void ProtocolV2::do_close(
+ bool is_dispatch_reset,
+ std::optional<std::function<void()>> f_accept_new)
+{
+ if (closed) {
+ // already closing
+ assert(state == state_t::CLOSING);
+ return;
+ }
+
+ bool is_replace = f_accept_new ? true : false;
+ logger().info("{} closing: reset {}, replace {}", conn,
+ is_dispatch_reset ? "yes" : "no",
+ is_replace ? "yes" : "no");
+
+ /*
+ * atomic operations
+ */
+
+ closed = true;
+
+ // trigger close
messenger.closing_conn(
seastar::static_pointer_cast<SocketConnection>(
conn.shared_from_this()));
-
if (state == state_t::ACCEPTING || state == state_t::SERVER_WAIT) {
messenger.unaccept_conn(
seastar::static_pointer_cast<SocketConnection>(
// cannot happen
ceph_assert(false);
}
-
protocol_timer.cancel();
- trigger_state(state_t::CLOSING, write_state_t::drop, false);
-}
+ trigger_state(state_t::CLOSING, io_state_t::drop, false);
-void ProtocolV2::on_closed()
-{
- messenger.closed_conn(
- seastar::static_pointer_cast<SocketConnection>(
- conn.shared_from_this()));
-}
-
-void ProtocolV2::print(std::ostream& out) const
-{
- out << conn;
+ if (f_accept_new) {
+ (*f_accept_new)();
+ }
+ if (is_socket_valid) {
+ frame_assembler->shutdown_socket();
+ is_socket_valid = false;
+ }
+ assert(!gate.is_closed());
+ auto handshake_closed = gate.close();
+ auto io_closed = io_handler.close_io(
+ is_dispatch_reset, is_replace);
+
+ // asynchronous operations
+ assert(!closed_clean_fut.valid());
+ closed_clean_fut = seastar::when_all(
+ std::move(handshake_closed), std::move(io_closed)
+ ).discard_result().then([this] {
+ ceph_assert_always(!exit_io.has_value());
+ if (has_socket) {
+ ceph_assert_always(frame_assembler);
+ return frame_assembler->close_shutdown_socket();
+ } else {
+ return seastar::now();
+ }
+ }).then([this] {
+ logger().debug("{} closed!", conn);
+ messenger.closed_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+#ifdef UNIT_TESTS_BUILT
+ closed_clean = true;
+ if (conn.interceptor) {
+ conn.interceptor->register_conn_closed(conn);
+ }
+#endif
+ }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) {
+ logger().error("{} closing: closed_clean_fut got unexpected exception {}",
+ conn, eptr);
+ ceph_abort();
+ });
}
} // namespace crimson::net