#include <seastar/core/lowres_clock.hh>
#include <fmt/format.h>
-#if FMT_VERSION >= 60000
-#include <fmt/chrono.h>
-#else
-#include <fmt/time.h>
-#endif
#include "include/msgr.h"
#include "include/random.h"
#include "crimson/auth/AuthClient.h"
#include "crimson/auth/AuthServer.h"
+#include "crimson/common/formatter.h"
-#include "Config.h"
-#include "Dispatcher.h"
+#include "chained_dispatchers.h"
#include "Errors.h"
#include "Socket.h"
#include "SocketConnection.h"
#endif
using namespace ceph::msgr::v2;
+using crimson::common::local_conf;
namespace {
-// TODO: apply the same logging policy to Protocol V1
+// TODO: enable CEPH_MSGR2_FEATURE_COMPRESSION (currently commented out of the mask below)
+const uint64_t CRIMSON_MSGR2_SUPPORTED_FEATURES =  // msgr2 feature bits encoded as "supported" into the banner payload
+ (CEPH_MSGR2_FEATURE_REVISION_1 |
+ // CEPH_MSGR2_FEATURE_COMPRESSION |
+ UINT64_C(0));  // trailing zero lets every real feature bit keep its '|'
+
// Log levels in V2 Protocol:
// * error level, something error that cause connection to terminate:
// - fatal errors;
return crimson::get_logger(ceph_subsys_ms);
}
-void abort_in_fault() {
+[[noreturn]] void abort_in_fault() {  // never returns: always throws std::system_error carrying error::negotiation_failure
throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
}
-void abort_protocol() {
+[[noreturn]] void abort_protocol() {  // never returns: always throws std::system_error carrying error::protocol_aborted
throw std::system_error(make_error_code(crimson::net::error::protocol_aborted));
}
-void abort_in_close(crimson::net::ProtocolV2& proto) {
- (void) proto.close();
+[[noreturn]] void abort_in_close(crimson::net::ProtocolV2& proto, bool dispatch_reset) {  // closes proto, then throws via abort_protocol()
+ proto.close(dispatch_reset);  // flag is forwarded as-is; NOTE(review): presumably selects whether a reset event is dispatched -- confirm in close()
abort_protocol();
}
} // namespace anonymous
-template <>
-struct fmt::formatter<seastar::lowres_system_clock::time_point> {
- // ignore the format string
- template <typename ParseContext>
- constexpr auto parse(ParseContext &ctx) { return ctx.begin(); }
-
- template <typename FormatContext>
- auto format(const seastar::lowres_system_clock::time_point& t,
- FormatContext& ctx) {
- std::time_t tt = std::chrono::duration_cast<std::chrono::seconds>(
- t.time_since_epoch()).count();
- auto milliseconds = (t.time_since_epoch() %
- std::chrono::seconds(1)).count();
- return fmt::format_to(ctx.out(), "{:%Y-%m-%d %H:%M:%S} {:03d}",
- fmt::localtime(tt), milliseconds);
- }
-};
-
-namespace std {
-inline ostream& operator<<(
- ostream& out, const seastar::lowres_system_clock::time_point& t)
-{
- return out << fmt::format("{}", t);
-}
-}
-
namespace crimson::net {
#ifdef UNIT_TESTS_BUILT
});
}
-ProtocolV2::ProtocolV2(Dispatcher& dispatcher,
+ProtocolV2::ProtocolV2(ChainedDispatchers& dispatchers,  // dispatchers and conn are passed on to the Protocol base below
SocketConnection& conn,
SocketMessenger& messenger)
- : Protocol(proto_t::v2, dispatcher, conn),
+ : Protocol(proto_t::v2, dispatchers, conn),  // base is constructed with the v2 protocol tag
messenger{messenger},
- protocol_timer{conn},
- tx_frame_asm(&session_stream_handlers, false)
+ protocol_timer{conn}  // tx_frame_asm is no longer initialised explicitly in this list
{}
ProtocolV2::~ProtocolV2() {}
+bool ProtocolV2::is_connected() const {  // true exactly when state is READY, ESTABLISHING or REPLACING
+ return state == state_t::READY ||
+ state == state_t::ESTABLISHING ||
+ state == state_t::REPLACING;  // any other state yields false
+}
+
void ProtocolV2::start_connect(const entity_addr_t& _peer_addr,
- const entity_type_t& _peer_type)
+ const entity_name_t& _peer_name)
{
ceph_assert(state == state_t::NONE);
ceph_assert(!socket);
+ ceph_assert(!gate.is_closed());
conn.peer_addr = _peer_addr;
conn.target_addr = _peer_addr;
- conn.set_peer_type(_peer_type);
- conn.policy = messenger.get_policy(_peer_type);
+ conn.set_peer_name(_peer_name);
+ conn.policy = messenger.get_policy(_peer_name.type());
client_cookie = generate_client_cookie();
- logger().info("{} ProtocolV2::start_connect(): peer_addr={}, peer_type={}, cc={}"
+ logger().info("{} ProtocolV2::start_connect(): peer_addr={}, peer_name={}, cc={}"
" policy(lossy={}, server={}, standby={}, resetcheck={})",
- conn, _peer_addr, ceph_entity_type_name(_peer_type), client_cookie,
+ conn, _peer_addr, _peer_name, client_cookie,
conn.policy.lossy, conn.policy.server,
conn.policy.standby, conn.policy.resetcheck);
messenger.register_conn(
ceph_assert(!socket);
// until we know better
conn.target_addr = _peer_addr;
- conn.set_ephemeral_port(_peer_addr.get_port(),
- SocketConnection::side_t::acceptor);
socket = std::move(sock);
logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr);
messenger.accept_conn(
size_t ProtocolV2::get_current_msg_size() const
{
- ceph_assert(!rx_segments_desc.empty());
+ ceph_assert(rx_frame_asm.get_num_segments() > 0);  // precondition: the rx frame assembler reports at least one segment
size_t sum = 0;
// we don't include SegmentIndex::Msg::HEADER.
- for (__u8 idx = 1; idx < rx_segments_desc.size(); idx++) {
- sum += rx_segments_desc[idx].length;
+ for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) {  // starts at 1 so segment 0 (the header, per the comment above) is skipped
+ sum += rx_frame_asm.get_segment_logical_len(idx);  // add the logical length of each remaining segment
}
return sum;
}
seastar::future<Tag> ProtocolV2::read_main_preamble()
{
- return read_exactly(sizeof(preamble_block_t))
+ rx_preamble.clear();  // start from an empty preamble buffer before reading
+ return read_exactly(rx_frame_asm.get_preamble_onwire_len())  // byte count to read is supplied by the rx frame assembler
.then([this] (auto bl) {
- if (session_stream_handlers.rx) {
- session_stream_handlers.rx->reset_rx_handler();
- /*
- bl = session_stream_handlers.rx->authenticated_decrypt_update(
- std::move(bl), segment_t::DEFAULT_ALIGNMENT);
- */
- }
-
- // I expect ceph_le32 will make the endian conversion for me. Passing
- // everything through ::Decode is unnecessary.
- const auto& main_preamble = \
- *reinterpret_cast<const preamble_block_t*>(bl.get());
- logger().trace("{} RECV({}) main preamble: tag={}, num_segments={}, crc={}",
- conn, bl.size(), (int)main_preamble.tag,
- (int)main_preamble.num_segments, main_preamble.crc);
-
- // verify preamble's CRC before any further processing
- const auto rx_crc = ceph_crc32c(0,
- reinterpret_cast<const unsigned char*>(&main_preamble),
- sizeof(main_preamble) - sizeof(main_preamble.crc));
- if (rx_crc != main_preamble.crc) {
- logger().warn("{} crc mismatch for main preamble rx_crc={} tx_crc={}",
- conn, rx_crc, main_preamble.crc);
- abort_in_fault();
- }
-
- // currently we do support between 1 and MAX_NUM_SEGMENTS segments
- if (main_preamble.num_segments < 1 ||
- main_preamble.num_segments > MAX_NUM_SEGMENTS) {
- logger().warn("{} unsupported num_segments={}",
- conn, main_preamble.num_segments);
- abort_in_fault();
- }
- if (main_preamble.num_segments > MAX_NUM_SEGMENTS) {
- logger().warn("{} num_segments too much: {}",
- conn, main_preamble.num_segments);
- abort_in_fault();
- }
-
- rx_segments_desc.clear();
rx_segments_data.clear();
-
- for (std::uint8_t idx = 0; idx < main_preamble.num_segments; idx++) {
- logger().trace("{} GOT frame segment: len={} align={}",
- conn, main_preamble.segments[idx].length,
- main_preamble.segments[idx].alignment);
- rx_segments_desc.emplace_back(main_preamble.segments[idx]);
+ try {
+ rx_preamble.append(buffer::create(std::move(bl)));  // hand the bytes just read to the preamble bufferlist
+ const Tag tag = rx_frame_asm.disassemble_preamble(rx_preamble);  // assembler parses the preamble and returns the frame tag
+ INTERCEPT_FRAME(tag, bp_type_t::READ);
+ return tag;  // the tag becomes the value of the returned future
+ } catch (FrameError& e) {  // FrameError is logged as a warning and turned into a fault
+ logger().warn("{} read_main_preamble: {}", conn, e.what());
+ abort_in_fault();  // [[noreturn]]: throws negotiation_failure
}
-
- INTERCEPT_FRAME(main_preamble.tag, bp_type_t::READ);
- return static_cast<Tag>(main_preamble.tag);
});
}
seastar::future<> ProtocolV2::read_frame_payload()
{
- ceph_assert(!rx_segments_desc.empty());
ceph_assert(rx_segments_data.empty());
return seastar::do_until(
- [this] { return rx_segments_desc.size() == rx_segments_data.size(); },
+ [this] { return rx_frame_asm.get_num_segments() == rx_segments_data.size(); },
[this] {
- // description of current segment to read
- const auto& cur_rx_desc = rx_segments_desc.at(rx_segments_data.size());
// TODO: create aligned and contiguous buffer from socket
- if (cur_rx_desc.alignment != segment_t::DEFAULT_ALIGNMENT) {
+ const size_t seg_idx = rx_segments_data.size();
+ if (uint16_t alignment = rx_frame_asm.get_segment_align(seg_idx);
+ alignment != segment_t::DEFAULT_ALIGNMENT) {
logger().trace("{} cannot allocate {} aligned buffer at segment desc index {}",
- conn, cur_rx_desc.alignment, rx_segments_data.size());
+ conn, alignment, rx_segments_data.size());
}
+ uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx);
// TODO: create aligned and contiguous buffer from socket
- return read_exactly(cur_rx_desc.length)
- .then([this] (auto tmp_bl) {
+ return read_exactly(onwire_len).then([this] (auto tmp_bl) {
logger().trace("{} RECV({}) frame segment[{}]",
conn, tmp_bl.size(), rx_segments_data.size());
- bufferlist data;
- data.append(buffer::create(std::move(tmp_bl)));
- if (session_stream_handlers.rx) {
- // TODO
- ceph_assert(false);
- }
- rx_segments_data.emplace_back(std::move(data));
+ bufferlist segment;
+ segment.append(buffer::create(std::move(tmp_bl)));
+ rx_segments_data.emplace_back(std::move(segment));
});
}
).then([this] {
- // TODO: get_epilogue_size()
- ceph_assert(!session_stream_handlers.rx);
- return read_exactly(sizeof(epilogue_crc_rev0_block_t));
+ return read_exactly(rx_frame_asm.get_epilogue_onwire_len());
}).then([this] (auto bl) {
logger().trace("{} RECV({}) frame epilogue", conn, bl.size());
-
- __u8 late_flags;
- if (session_stream_handlers.rx) {
- // TODO
- ceph_assert(false);
- } else {
- auto& epilogue = *reinterpret_cast<const epilogue_crc_rev0_block_t*>(bl.get());
- for (std::uint8_t idx = 0; idx < rx_segments_data.size(); idx++) {
- const __u32 expected_crc = epilogue.crc_values[idx];
- const __u32 calculated_crc = rx_segments_data[idx].crc32c(-1);
- if (expected_crc != calculated_crc) {
- logger().warn("{} message integrity check failed at index {}:"
- " expected_crc={} calculated_crc={}",
- conn, (unsigned int)idx, expected_crc, calculated_crc);
- abort_in_fault();
- } else {
- logger().trace("{} message integrity check success at index {}: crc={}",
- conn, (unsigned int)idx, expected_crc);
- }
- }
- late_flags = epilogue.late_flags;
+ bool ok = false;
+ try {
+ bufferlist rx_epilogue;
+ rx_epilogue.append(buffer::create(std::move(bl)));
+ ok = rx_frame_asm.disassemble_segments(rx_preamble, rx_segments_data.data(), rx_epilogue);
+ } catch (FrameError& e) {
+ logger().error("read_frame_payload: {} {}", conn, e.what());
+ abort_in_fault();
+ } catch (ceph::crypto::onwire::MsgAuthError&) {
+ logger().error("read_frame_payload: {} bad auth tag", conn);
+ abort_in_fault();
}
- logger().trace("{} GOT frame epilogue: late_flags={}",
- conn, (unsigned)late_flags);
-
// we do have a mechanism that allows transmitter to start sending message
// and abort after putting entire data field on wire. This will be used by
// the kernel client to avoid unnecessary buffering.
- if (late_flags & FRAME_LATE_FLAG_ABORTED) {
+ if (!ok) {
// TODO
ceph_assert(false);
}
if (conn.policy.lossy) {
logger().info("{} {}: fault at {} on lossy channel, going to CLOSING -- {}",
conn, func_name, get_state_name(state), eptr);
- dispatch_reset();
- (void) close();
+ close(true);
} else if (conn.policy.server ||
(conn.policy.standby &&
(!is_queued() && conn.sent.empty()))) {
}
}
-void ProtocolV2::dispatch_reset()
-{
- (void) seastar::with_gate(pending_dispatch, [this] {
- return dispatcher.ms_handle_reset(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
- }).handle_exception([this] (std::exception_ptr eptr) {
- logger().error("{} ms_handle_reset caught exception: {}", conn, eptr);
- ceph_abort("unexpected exception from ms_handle_reset()");
- });
-}
-
void ProtocolV2::reset_session(bool full)
{
server_cookie = 0;
client_cookie = generate_client_cookie();
peer_global_seq = 0;
reset_write();
- (void) seastar::with_gate(pending_dispatch, [this] {
- return dispatcher.ms_handle_remote_reset(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
- }).handle_exception([this] (std::exception_ptr eptr) {
- logger().error("{} ms_handle_remote_reset caught exception: {}", conn, eptr);
- ceph_abort("unexpected exception from ms_handle_remote_reset()");
- });
+ dispatchers.ms_handle_remote_reset(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
}
}
-seastar::future<entity_type_t, entity_addr_t> ProtocolV2::banner_exchange()
+seastar::future<std::tuple<entity_type_t, entity_addr_t>>
+ProtocolV2::banner_exchange(bool is_connect)
{
// 1. prepare and send banner
bufferlist banner_payload;
- encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES, banner_payload, 0);
+ encode((uint64_t)CRIMSON_MSGR2_SUPPORTED_FEATURES, banner_payload, 0);
encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0);
bufferlist bl;
logger().debug("{} SEND({}) banner: len_payload={}, supported={}, "
"required={}, banner=\"{}\"",
conn, bl.length(), len_payload,
- CEPH_MSGR2_SUPPORTED_FEATURES, CEPH_MSGR2_REQUIRED_FEATURES,
+ CRIMSON_MSGR2_SUPPORTED_FEATURES,
+ CEPH_MSGR2_REQUIRED_FEATURES,
CEPH_BANNER_V2_PREFIX);
INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE, bp_type_t::WRITE);
return write_flush(std::move(bl)).then([this] {
logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ, bp_type_t::READ);
return read(payload_len);
- }).then([this] (bufferlist bl) {
+ }).then([this, is_connect] (bufferlist bl) {
// 4. process peer banner_payload and send HelloFrame
auto p = bl.cbegin();
uint64_t peer_supported_features;
peer_supported_features, peer_required_features);
// Check feature bit compatibility
- uint64_t supported_features = CEPH_MSGR2_SUPPORTED_FEATURES;
+ uint64_t supported_features = CRIMSON_MSGR2_SUPPORTED_FEATURES;
uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
if ((required_features & peer_supported_features) != required_features) {
logger().error("{} peer does not support all required features"
" required={} peer_supported={}",
conn, required_features, peer_supported_features);
- abort_in_close(*this);
+ abort_in_close(*this, is_connect);
}
if ((supported_features & peer_required_features) != peer_required_features) {
logger().error("{} we do not support all peer required features"
" peer_required={} supported={}",
conn, peer_required_features, supported_features);
- abort_in_close(*this);
+ abort_in_close(*this, is_connect);
}
this->peer_required_features = peer_required_features;
if (this->peer_required_features == 0) {
this->connection_features = msgr2_required;
}
+ const bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
+ tx_frame_asm.set_is_rev1(is_rev1);
+ rx_frame_asm.set_is_rev1(is_rev1);
auto hello = HelloFrame::Encode(messenger.get_mytype(),
conn.target_addr);
logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}",
conn, ceph_entity_type_name(hello.entity_type()),
hello.peer_addr());
- return seastar::make_ready_future<entity_type_t, entity_addr_t>(
- hello.entity_type(), hello.peer_addr());
+ return seastar::make_ready_future<std::tuple<entity_type_t, entity_addr_t>>(
+ std::make_tuple(hello.entity_type(), hello.peer_addr()));
});
}
abort_in_fault();
}
auth_meta->con_mode = auth_done.con_mode();
- // TODO
- ceph_assert(!auth_meta->is_mode_secure());
- session_stream_handlers = { nullptr, nullptr };
+ session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
+ nullptr, *auth_meta, tx_frame_asm.get_is_rev1(), false);
return finish_auth();
});
default: {
});
} catch (const crimson::auth::error& e) {
logger().error("{} get_initial_auth_request returned {}", conn, e);
- dispatch_reset();
- abort_in_close(*this);
+ abort_in_close(*this, true);
return seastar::now();
}
}
throw std::system_error(
make_error_code(crimson::net::error::bad_peer_address));
}
+ if (conn.get_peer_id() != entity_name_t::NEW &&
+ conn.get_peer_id() != server_ident.gid()) {
+ logger().error("{} connection peer id ({}) does not match "
+ "what it should be ({}) during connecting, close",
+ conn, server_ident.gid(), conn.get_peer_id());
+ abort_in_close(*this, true);
+ }
conn.set_peer_id(server_ident.gid());
conn.set_features(server_ident.supported_features() &
conn.policy.features_supported);
if (socket) {
socket->shutdown();
}
- execution_done = seastar::with_gate(pending_dispatch, [this] {
- // we don't know my socket_port yet
- conn.set_ephemeral_port(0, SocketConnection::side_t::none);
+ gated_execute("execute_connecting", [this] {
return messenger.get_global_seq().then([this] (auto gs) {
global_seq = gs;
assert(client_cookie != 0);
abort_protocol();
}
if (socket) {
- (void) with_gate(pending_dispatch, [sock = std::move(socket)] () mutable {
+ gate.dispatch_in_background("close_socket_connecting", *this,  // label spelling fixed to match "close_socket_replacing"
+ [sock = std::move(socket)] () mutable {  // the old socket is moved into the task, which calls close() on it
return sock->close().then([sock = std::move(sock)] {});
});
}
auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
session_stream_handlers = { nullptr, nullptr };
enable_recording();
- return banner_exchange();
- }).then([this] (entity_type_t _peer_type,
- entity_addr_t _my_addr_from_peer) {
+ return banner_exchange(true);
+ }).then([this] (auto&& ret) {
+ auto [_peer_type, _my_addr_from_peer] = std::move(ret);
if (conn.get_peer_type() != _peer_type) {
logger().warn("{} connection peer type does not match what peer advertises {} != {}",
conn, ceph_entity_type_name(conn.get_peer_type()),
ceph_entity_type_name(_peer_type));
- dispatch_reset();
- abort_in_close(*this);
+ abort_in_close(*this, true);
}
- conn.set_ephemeral_port(_my_addr_from_peer.get_port(),
- SocketConnection::side_t::connector);
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} during banner_exchange(), abort",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ socket->learn_ephemeral_port_as_connector(_my_addr_from_peer.get_port());
if (unlikely(_my_addr_from_peer.is_legacy())) {
logger().warn("{} peer sent a legacy address for me: {}",
conn, _my_addr_from_peer);
}
switch (next) {
case next_step_t::ready: {
- (void) seastar::with_gate(pending_dispatch, [this] {
- return dispatcher.ms_handle_connect(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
- }).handle_exception([this] (std::exception_ptr eptr) {
- logger().error("{} ms_handle_connect caught exception: {}", conn, eptr);
- ceph_abort("unexpected exception from ms_handle_connect()");
- });
logger().info("{} connected:"
" gs={}, pgs={}, cs={}, client_cookie={},"
" server_cookie={}, in_seq={}, out_seq={}, out_q={}",
conn, global_seq, peer_global_seq, connect_seq,
client_cookie, server_cookie, conn.in_seq,
conn.out_seq, conn.out_q.size());
- execute_ready();
+ execute_ready(true);
break;
}
case next_step_t::wait: {
ceph_con_mode_name(auth_meta->con_mode), reply.length());
return write_frame(auth_done).then([this] {
ceph_assert(auth_meta);
- // TODO
- ceph_assert(!auth_meta->is_mode_secure());
- session_stream_handlers = { nullptr, nullptr };
+ session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
+ nullptr, *auth_meta, tx_frame_asm.get_is_rev1(), true);
return finish_auth();
});
}
});
}
+bool ProtocolV2::validate_peer_name(const entity_name_t& peer_name) const  // does peer_name agree with conn.get_peer_name()?
+{
+ auto my_peer_name = conn.get_peer_name();
+ if (my_peer_name.type() != peer_name.type()) {  // differing entity types never match
+ return false;
+ }
+ if (my_peer_name.num() != entity_name_t::NEW &&  // numbers are compared only when neither side equals entity_name_t::NEW
+ peer_name.num() != entity_name_t::NEW &&
+ my_peer_name.num() != peer_name.num()) {
+ return false;
+ }
+ return true;  // same type, and numbers equal or at least one side is NEW
+}
+
seastar::future<ProtocolV2::next_step_t>
ProtocolV2::send_wait()
{
client_cookie,
conn.get_peer_name(),
connection_features,
+ tx_frame_asm.get_is_rev1(),
+ rx_frame_asm.get_is_rev1(),
conn_seq,
msg_seq);
#ifdef UNIT_TESTS_BUILT
// close this connection because all the necessary information is delivered
// to the exisiting connection, and jump to error handling code to abort the
// current state.
- abort_in_close(*this);
+ abort_in_close(*this, false);
return seastar::make_ready_future<next_step_t>(next_step_t::none);
}
existing_proto->client_cookie,
existing_proto->server_cookie);
+ if (!validate_peer_name(existing_conn->get_peer_name())) {  // fault out when the existing connection's peer name disagrees with ours
+ logger().error("{} server_connect: my peer_name doesn't match"
+ " the existing connection {}, abort", conn, *existing_conn);  // dereference so the connection is logged, not the handle
+ abort_in_fault();  // [[noreturn]]: throws negotiation_failure
+ }
+
if (existing_proto->state == state_t::REPLACING) {
logger().warn("{} server_connect: racing replace happened while"
" replacing existing connection {}, send wait.",
logger().warn("{} server_connect:"
" existing connection {} is a lossy channel. Close existing in favor of"
" this connection", conn, *existing_conn);
- existing_proto->dispatch_reset();
- (void) existing_proto->close();
-
- if (unlikely(state != state_t::ACCEPTING)) {
- logger().debug("{} triggered {} in execute_accepting()",
- conn, get_state_name(state));
- abort_protocol();
- }
- execute_establishing();
+ execute_establishing(existing_conn, true);
return seastar::make_ready_future<next_step_t>(next_step_t::ready);
}
throw std::system_error(
make_error_code(crimson::net::error::bad_peer_address));
}
- // TODO: change peer_addr to entity_addrvec_t
- entity_addr_t paddr = client_ident.addrs().front();
- if ((paddr.is_msgr2() || paddr.is_any()) &&
- paddr.is_same_host(conn.target_addr)) {
- // good
- } else {
- logger().warn("{} peer's address {} is not v2 or not the same host with {}",
- conn, paddr, conn.target_addr);
- throw std::system_error(
- make_error_code(crimson::net::error::bad_peer_address));
- }
- conn.peer_addr = paddr;
+ conn.peer_addr = client_ident.addrs().front();
logger().debug("{} UPDATE: peer_addr={}", conn, conn.peer_addr);
conn.target_addr = conn.peer_addr;
if (!conn.policy.lossy && !conn.policy.server && conn.target_addr.get_port() <= 0) {
make_error_code(crimson::net::error::bad_peer_address));
}
+ if (conn.get_peer_id() != entity_name_t::NEW &&
+ conn.get_peer_id() != client_ident.gid()) {
+ logger().error("{} client_ident peer_id ({}) does not match"
+ " what it should be ({}) during accepting, abort",
+ conn, client_ident.gid(), conn.get_peer_id());
+ abort_in_fault();
+ }
conn.set_peer_id(client_ident.gid());
client_cookie = client_ident.cookie();
conn, *existing_conn,
static_cast<int>(existing_conn->protocol->proto_type));
// should unregister the existing from msgr atomically
- (void) existing_conn->close();
+ // NOTE: this is following async messenger logic, but we may miss the reset event.
+ execute_establishing(existing_conn, false);
+ return seastar::make_ready_future<next_step_t>(next_step_t::ready);
} else {
return handle_existing_connection(existing_conn);
}
+ } else {
+ execute_establishing(nullptr, true);
+ return seastar::make_ready_future<next_step_t>(next_step_t::ready);
}
-
- if (unlikely(state != state_t::ACCEPTING)) {
- logger().debug("{} triggered {} in execute_accepting()",
- conn, get_state_name(state));
- abort_protocol();
- }
- execute_establishing();
- return seastar::make_ready_future<next_step_t>(next_step_t::ready);
});
}
"close existing and reset client.",
conn, *existing_conn,
static_cast<int>(existing_conn->protocol->proto_type));
- (void) existing_conn->close();
+ // NOTE: this is following async messenger logic, but we may miss the reset event.
+ existing_conn->mark_down();
return send_reset(true);
}
existing_proto->client_cookie,
existing_proto->server_cookie);
+ if (!validate_peer_name(existing_conn->get_peer_name())) {  // fault out when the existing connection's peer name disagrees with ours
+ logger().error("{} server_reconnect: my peer_name doesn't match"
+ " the existing connection {}, abort", conn, *existing_conn);  // dereference so the connection is logged, not the handle
+ abort_in_fault();  // [[noreturn]]: throws negotiation_failure
+ }
+
if (existing_proto->state == state_t::REPLACING) {
logger().warn("{} server_reconnect: racing replace happened while "
" replacing existing connection {}, retry global.",
void ProtocolV2::execute_accepting()
{
trigger_state(state_t::ACCEPTING, write_state_t::none, false);
- (void) seastar::with_gate(pending_dispatch, [this] {
- return seastar::futurize_apply([this] {
+ gate.dispatch_in_background("execute_accepting", *this, [this] {
+ return seastar::futurize_invoke([this] {
INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED);
auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
session_stream_handlers = { nullptr, nullptr };
+ session_comp_handlers = { nullptr, nullptr };
enable_recording();
- return banner_exchange();
- }).then([this] (entity_type_t _peer_type,
- entity_addr_t _my_addr_from_peer) {
+ return banner_exchange(false);
+ }).then([this] (auto&& ret) {
+ auto [_peer_type, _my_addr_from_peer] = std::move(ret);
ceph_assert(conn.get_peer_type() == 0);
conn.set_peer_type(_peer_type);
conn, ceph_entity_type_name(_peer_type),
conn.policy.lossy, conn.policy.server,
conn.policy.standby, conn.policy.resetcheck);
- if (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() ||
- messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce()) {
+ if (!messenger.get_myaddr().is_blank_ip() &&
+ (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() ||
+ messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce())) {
logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}",
conn, _my_addr_from_peer, messenger.get_myaddr());
throw std::system_error(
}).handle_exception([this] (std::exception_ptr eptr) {
logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
conn, get_state_name(state), eptr);
- (void) close();
+ close(false);
});
});
}
// ESTABLISHING
-void ProtocolV2::execute_establishing() {
+void ProtocolV2::execute_establishing(
+ SocketConnectionRef existing_conn, bool dispatch_reset) {
+ if (unlikely(state != state_t::ACCEPTING)) {
+ logger().debug("{} triggered {} before execute_establishing()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+
+ auto accept_me = [this] {
+ messenger.register_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+ messenger.unaccept_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+ };
+
trigger_state(state_t::ESTABLISHING, write_state_t::delay, false);
- (void) seastar::with_gate(pending_dispatch, [this] {
- return dispatcher.ms_handle_accept(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
- }).handle_exception([this] (std::exception_ptr eptr) {
- logger().error("{} ms_handle_accept caught exception: {}", conn, eptr);
- ceph_abort("unexpected exception from ms_handle_accept()");
- });
- messenger.register_conn(
- seastar::static_pointer_cast<SocketConnection>(
- conn.shared_from_this()));
- messenger.unaccept_conn(
- seastar::static_pointer_cast<SocketConnection>(
- conn.shared_from_this()));
- execution_done = seastar::with_gate(pending_dispatch, [this] {
- return seastar::futurize_apply([this] {
+ if (existing_conn) {
+ existing_conn->protocol->close(dispatch_reset, std::move(accept_me));
+ if (unlikely(state != state_t::ESTABLISHING)) {
+ logger().warn("{} triggered {} during execute_establishing(), "
+ "the accept event will not be delivered!",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+ } else {
+ accept_me();
+ }
+
+ dispatchers.ms_handle_accept(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+
+ gated_execute("execute_establishing", [this] {
+ return seastar::futurize_invoke([this] {
return send_server_ident();
}).then([this] {
if (unlikely(state != state_t::ESTABLISHING)) {
conn, global_seq, peer_global_seq, connect_seq,
client_cookie, server_cookie, conn.in_seq,
conn.out_seq, conn.out_q.size());
- execute_ready();
+ execute_ready(false);
}).handle_exception([this] (std::exception_ptr eptr) {
if (state != state_t::ESTABLISHING) {
logger().info("{} execute_establishing() protocol aborted at {} -- {}",
uint64_t new_client_cookie,
entity_name_t new_peer_name,
uint64_t new_conn_features,
+ bool tx_is_rev1,
+ bool rx_is_rev1,
uint64_t new_connect_seq,
uint64_t new_msg_seq)
{
if (socket) {
socket->shutdown();
}
- (void) seastar::with_gate(pending_dispatch, [this] {
- return dispatcher.ms_handle_accept(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
- }).handle_exception([this] (std::exception_ptr eptr) {
- logger().error("{} ms_handle_accept caught exception: {}", conn, eptr);
- ceph_abort("unexpected exception from ms_handle_accept()");
- });
- (void) seastar::with_gate(pending_dispatch,
- [this,
- reconnect,
- do_reset,
- new_socket = std::move(new_socket),
- new_auth_meta = std::move(new_auth_meta),
- new_rxtx = std::move(new_rxtx),
- new_client_cookie, new_peer_name,
- new_conn_features, new_peer_global_seq,
- new_connect_seq, new_msg_seq] () mutable {
+ dispatchers.ms_handle_accept(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ gate.dispatch_in_background("trigger_replacing", *this,
+ [this,
+ reconnect,
+ do_reset,
+ new_socket = std::move(new_socket),
+ new_auth_meta = std::move(new_auth_meta),
+ new_rxtx = std::move(new_rxtx),
+ tx_is_rev1, rx_is_rev1,
+ new_client_cookie, new_peer_name,
+ new_conn_features, new_peer_global_seq,
+ new_connect_seq, new_msg_seq] () mutable {
return wait_write_exit().then([this, do_reset] {
if (do_reset) {
reset_session(true);
new_socket = std::move(new_socket),
new_auth_meta = std::move(new_auth_meta),
new_rxtx = std::move(new_rxtx),
+ tx_is_rev1, rx_is_rev1,
new_client_cookie, new_peer_name,
new_conn_features, new_peer_global_seq,
new_connect_seq, new_msg_seq] () mutable {
}
if (socket) {
- (void) with_gate(pending_dispatch, [sock = std::move(socket)] () mutable {
+ gate.dispatch_in_background("close_socket_replacing", *this,
+ [sock = std::move(socket)] () mutable {
return sock->close().then([sock = std::move(sock)] {});
});
}
return write_frame(reconnect_ok);
} else {
client_cookie = new_client_cookie;
- conn.set_peer_name(new_peer_name);
+ assert(conn.get_peer_type() == new_peer_name.type());
+ if (conn.get_peer_id() == entity_name_t::NEW) {
+ conn.set_peer_id(new_peer_name.num());
+ }
connection_features = new_conn_features;
+ tx_frame_asm.set_is_rev1(tx_is_rev1);
+ rx_frame_asm.set_is_rev1(rx_is_rev1);
return send_server_ident();
}
}).then([this, reconnect] {
conn, reconnect ? "reconnected" : "connected",
global_seq, peer_global_seq, connect_seq, client_cookie,
server_cookie, conn.in_seq, conn.out_seq, conn.out_q.size());
- execute_ready();
+ execute_ready(false);
}).handle_exception([this] (std::exception_ptr eptr) {
if (state != state_t::REPLACING) {
logger().info("{} trigger_replacing(): protocol aborted at {} -- {}",
// READY state
ceph::bufferlist ProtocolV2::do_sweep_messages(
- const std::deque<MessageRef>& msgs,
+ const std::deque<MessageURef>& msgs,
size_t num_msgs,
bool require_keepalive,
std::optional<utime_t> _keepalive_ack,
INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, bp_type_t::WRITE);
}
- if (require_ack && !num_msgs) {
+ if (require_ack && num_msgs == 0u) {
auto ack_frame = AckFrame::Encode(conn.in_seq);
bl.append(ack_frame.get_buffer(tx_frame_asm));
INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE);
}
- std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) {
+ std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageURef& msg) {
// TODO: move to common code
// set priority
msg->get_header().src = messenger.get_myname();
ceph_msg_header2 header2{header.seq, header.tid,
header.type, header.priority,
header.version,
- init_le32(0), header.data_off,
- init_le64(conn.in_seq),
+ ceph_le32(0), header.data_off,
+ ceph_le64(conn.in_seq),
footer.flags, header.compat_version,
header.reserved};
current_header.type,
current_header.priority,
current_header.version,
- init_le32(msg_frame.front_len()),
- init_le32(msg_frame.middle_len()),
- init_le32(msg_frame.data_len()),
+ ceph_le32(msg_frame.front_len()),
+ ceph_le32(msg_frame.middle_len()),
+ ceph_le32(msg_frame.data_len()),
current_header.data_off,
conn.get_peer_name(),
current_header.compat_version,
current_header.reserved,
- init_le32(0)};
- ceph_msg_footer footer{init_le32(0), init_le32(0),
- init_le32(0), init_le64(0), current_header.flags};
+ ceph_le32(0)};
+ ceph_msg_footer footer{ceph_le32(0), ceph_le32(0),
+ ceph_le32(0), ceph_le64(0), current_header.flags};
- auto pconn = seastar::static_pointer_cast<SocketConnection>(
+ auto conn_ref = seastar::static_pointer_cast<SocketConnection>(
conn.shared_from_this());
Message *message = decode_message(nullptr, 0, header, footer,
- msg_frame.front(), msg_frame.middle(), msg_frame.data(),
- std::move(pconn));
+ msg_frame.front(), msg_frame.middle(), msg_frame.data(), conn_ref);
if (!message) {
logger().warn("{} decode message failed", conn);
abort_in_fault();
// elsewhere. in that case it doesn't matter if we "got" it or not.
uint64_t cur_seq = conn.in_seq;
if (message->get_seq() <= cur_seq) {
- logger().error("{} got old message {} <= {} {} {}, discarding",
- conn, message->get_seq(), cur_seq, message, *message);
+ logger().error("{} got old message {} <= {} {}, discarding",
+ conn, message->get_seq(), cur_seq, *message);
if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) &&
- conf.ms_die_on_old_message) {
+ local_conf()->ms_die_on_old_message) {
ceph_assert(0 == "old msgs despite reconnect_seq feature");
}
- return;
+ return seastar::now();
} else if (message->get_seq() > cur_seq + 1) {
logger().error("{} missed message? skipped from seq {} to {}",
conn, cur_seq, message->get_seq());
- if (conf.ms_die_on_skipped_message) {
+ if (local_conf()->ms_die_on_skipped_message) {
ceph_assert(0 == "skipped incoming seq");
}
}
// TODO: change MessageRef with seastar::shared_ptr
auto msg_ref = MessageRef{message, false};
- (void) seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] {
- return dispatcher.ms_dispatch(&conn, std::move(msg));
- }).handle_exception([this] (std::exception_ptr eptr) {
- logger().error("{} ms_dispatch caught exception: {}", conn, eptr);
- ceph_abort("unexpected exception from ms_dispatch()");
- });
+ // throttle the reading process by the returned future
+ return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
});
}
-void ProtocolV2::execute_ready()
+void ProtocolV2::execute_ready(bool dispatch_connect)
{
assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
trigger_state(state_t::READY, write_state_t::open, false);
+ if (dispatch_connect) {
+ dispatchers.ms_handle_connect(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ }
#ifdef UNIT_TESTS_BUILT
if (conn.interceptor) {
conn.interceptor->register_conn_ready(conn);
}
#endif
- execution_done = seastar::with_gate(pending_dispatch, [this] {
+ gated_execute("execute_ready", [this] {
protocol_timer.cancel();
return seastar::keep_doing([this] {
return read_main_preamble()
.then([this] (Tag tag) {
switch (tag) {
case Tag::MESSAGE: {
- return seastar::futurize_apply([this] {
+ return seastar::futurize_invoke([this] {
// throttle_message() logic
if (!conn.policy.throttler_messages) {
return seastar::now();
if (socket) {
socket->shutdown();
}
- execution_done = seastar::with_gate(pending_dispatch,
- [this, max_backoff] {
+ gated_execute("execute_wait", [this, max_backoff] {
double backoff = protocol_timer.last_dur();
if (max_backoff) {
- backoff = conf.ms_max_backoff;
+ backoff = local_conf().get_val<double>("ms_max_backoff");
} else if (backoff > 0) {
- backoff = std::min(conf.ms_max_backoff, 2 * backoff);
+ backoff = std::min(local_conf().get_val<double>("ms_max_backoff"), 2 * backoff);
} else {
- backoff = conf.ms_initial_backoff;
+ backoff = local_conf().get_val<double>("ms_initial_backoff");
}
return protocol_timer.backoff(backoff).then([this] {
if (unlikely(state != state_t::WAIT)) {
+// Enters the SERVER_WAIT state (write side set to write_state_t::delay) and
+// then waits for a single byte from the peer. No data is expected in this
+// state: a successful read is logged and turned into a fault through
+// abort_in_fault(), and any exception raised by the read chain is logged
+// and answered with close(false).
void ProtocolV2::execute_server_wait()
{
trigger_state(state_t::SERVER_WAIT, write_state_t::delay, false);
- execution_done = seastar::with_gate(pending_dispatch, [this] {
+ // NOTE(review): gated_execute() takes over from the with_gate() /
+ // execution_done pair removed just above; presumably it still tracks the
+ // task under a gate -- confirm against its definition.
+ gated_execute("execute_server_wait", [this] {
return read_exactly(1).then([this] (auto bl) {
+ // Receiving anything here is unexpected. abort_in_fault() throws, so
+ // control continues in the handle_exception() continuation below.
logger().warn("{} SERVER_WAIT got read, abort", conn);
abort_in_fault();
}).handle_exception([this] (std::exception_ptr eptr) {
logger().info("{} execute_server_wait(): fault at {}, going to CLOSING -- {}",
conn, get_state_name(state), eptr);
- (void) close();
+ // The argument is the flag abort_in_close() forwards as `dispatch_reset`;
+ // passing false presumably suppresses the reset notification -- confirm.
+ close(false);
});
});
}
+// Starts closing the connection: reports it to the messenger through
+// closing_conn(), withdraws it from the messenger's accepting bookkeeping
+// when it is still in ACCEPTING or SERVER_WAIT, cancels the protocol timer,
+// and switches to the CLOSING state with the write side set to
+// write_state_t::drop.
void ProtocolV2::trigger_close()
{
+ // Hand the messenger a SocketConnection reference to this connection.
+ messenger.closing_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+
if (state == state_t::ACCEPTING || state == state_t::SERVER_WAIT) {
messenger.unaccept_conn(
seastar::static_pointer_cast<SocketConnection>(
}
protocol_timer.cancel();
-
trigger_state(state_t::CLOSING, write_state_t::drop, false);
-#ifdef UNIT_TESTS_BUILT
- if (conn.interceptor) {
- conn.interceptor->register_conn_closed(conn);
- }
-#endif
+}
+
+// Passes this connection, as a SocketConnection reference obtained from
+// conn.shared_from_this(), to messenger.closed_conn().
+// NOTE(review): presumably invoked once closing has completed, as the
+// counterpart of the closing_conn() call in trigger_close() -- confirm
+// against the caller.
+void ProtocolV2::on_closed()
+{
+ messenger.closed_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+}
+
+// Writes this protocol's textual representation to `out`, which is simply
+// the stream output of the owning connection `conn`.
+void ProtocolV2::print(std::ostream& out) const
+{
+ out << conn;
}
} // namespace crimson::net