#include "Errors.h"
#include "SocketConnection.h"
-#ifdef UNIT_TESTS_BUILT
-#include "Interceptor.h"
-#endif
-
using ceph::msgr::v2::FrameAssembler;
using ceph::msgr::v2::FrameError;
using ceph::msgr::v2::preamble_block_t;
namespace crimson::net {
FrameAssemblerV2::FrameAssemblerV2(SocketConnection &_conn)
- : conn{_conn}
-{}
+ : conn{_conn}, sid{seastar::this_shard_id()}
+{
+ assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+}
+
+FrameAssemblerV2::~FrameAssemblerV2()
+{
+ assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+ assert(seastar::this_shard_id() == sid);
+ if (has_socket()) {
+ std::ignore = move_socket();
+ }
+}
#ifdef UNIT_TESTS_BUILT
// should be consistent to intercept() in ProtocolV2.cc
-void FrameAssemblerV2::intercept_frame(Tag tag, bool is_write)
+seastar::future<> FrameAssemblerV2::intercept_frames(
+ std::vector<Breakpoint> bps,
+ bp_type_t type)
{
+ assert(seastar::this_shard_id() == sid);
assert(has_socket());
- if (conn.interceptor) {
- auto type = is_write ? bp_type_t::WRITE : bp_type_t::READ;
- auto action = conn.interceptor->intercept(
- conn, Breakpoint{tag, type});
- socket->set_trap(type, action, &conn.interceptor->blocker);
+ if (!conn.interceptor) {
+ return seastar::now();
}
+ return conn.interceptor->intercept(conn, bps
+ ).then([this, type](bp_action_t action) {
+ return seastar::smp::submit_to(
+ socket->get_shard_id(),
+ [this, type, action] {
+ socket->set_trap(type, action, &conn.interceptor->blocker);
+ });
+ });
}
#endif
void FrameAssemblerV2::set_is_rev1(bool _is_rev1)
{
+ assert(seastar::this_shard_id() == sid);
is_rev1 = _is_rev1;
tx_frame_asm.set_is_rev1(_is_rev1);
rx_frame_asm.set_is_rev1(_is_rev1);
const AuthConnectionMeta &auth_meta,
bool crossed)
{
+ assert(seastar::this_shard_id() == sid);
session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
nullptr, auth_meta, is_rev1, crossed);
}
void FrameAssemblerV2::reset_handlers()
{
+ assert(seastar::this_shard_id() == sid);
session_stream_handlers = { nullptr, nullptr };
session_comp_handlers = { nullptr, nullptr };
}
FrameAssemblerV2::mover_t
FrameAssemblerV2::to_replace()
{
+ assert(seastar::this_shard_id() == sid);
assert(is_socket_valid());
- socket = nullptr;
+
+ clear();
+
return mover_t{
- std::move(conn.socket),
+ move_socket(),
std::move(session_stream_handlers),
std::move(session_comp_handlers)};
}
seastar::future<> FrameAssemblerV2::replace_by(FrameAssemblerV2::mover_t &&mover)
{
- record_io = false;
- rxbuf.clear();
- txbuf.clear();
+ assert(seastar::this_shard_id() == sid);
+
+ clear();
+
session_stream_handlers = std::move(mover.session_stream_handlers);
session_comp_handlers = std::move(mover.session_comp_handlers);
if (has_socket()) {
void FrameAssemblerV2::start_recording()
{
+ assert(seastar::this_shard_id() == sid);
record_io = true;
rxbuf.clear();
txbuf.clear();
FrameAssemblerV2::record_bufs_t
FrameAssemblerV2::stop_recording()
{
+ assert(seastar::this_shard_id() == sid);
ceph_assert_always(record_io == true);
record_io = false;
return record_bufs_t{std::move(rxbuf), std::move(txbuf)};
bool FrameAssemblerV2::has_socket() const
{
assert((socket && conn.socket) || (!socket && !conn.socket));
- return socket != nullptr;
+ return bool(socket);
}
bool FrameAssemblerV2::is_socket_valid() const
{
- return has_socket() && !socket->is_shutdown();
+ assert(seastar::this_shard_id() == sid);
+#ifndef NDEBUG
+ if (has_socket() && socket->get_shard_id() == sid) {
+ assert(socket->is_shutdown() == is_socket_shutdown);
+ }
+#endif
+ return has_socket() && !is_socket_shutdown;
+}
+
+seastar::shard_id
+FrameAssemblerV2::get_socket_shard_id() const
+{
+ assert(seastar::this_shard_id() == sid);
+ assert(is_socket_valid());
+ return socket->get_shard_id();
+}
+
+SocketFRef FrameAssemblerV2::move_socket()
+{
+ assert(has_socket());
+ conn.set_socket(nullptr);
+ return std::move(socket);
}
-void FrameAssemblerV2::set_socket(SocketRef &&new_socket)
+void FrameAssemblerV2::set_socket(SocketFRef &&new_socket)
{
+ assert(seastar::this_shard_id() == sid);
assert(!has_socket());
- socket = new_socket.get();
- conn.socket = std::move(new_socket);
+ assert(new_socket);
+ socket = std::move(new_socket);
+ conn.set_socket(socket.get());
+ is_socket_shutdown = false;
assert(is_socket_valid());
}
void FrameAssemblerV2::learn_socket_ephemeral_port_as_connector(uint16_t port)
{
+ assert(seastar::this_shard_id() == sid);
assert(has_socket());
+ // Note: may not invoke on the socket core
socket->learn_ephemeral_port_as_connector(port);
}
-void FrameAssemblerV2::shutdown_socket()
+template <bool may_cross_core>
+void FrameAssemblerV2::shutdown_socket(crimson::common::Gated *gate)
{
+ assert(seastar::this_shard_id() == sid);
assert(is_socket_valid());
- socket->shutdown();
+ is_socket_shutdown = true;
+ if constexpr (may_cross_core) {
+ assert(conn.get_messenger_shard_id() == sid);
+ assert(gate);
+ gate->dispatch_in_background("shutdown_socket", conn, [this] {
+ return seastar::smp::submit_to(
+ socket->get_shard_id(), [this] {
+ socket->shutdown();
+ });
+ });
+ } else {
+ assert(socket->get_shard_id() == sid);
+ assert(!gate);
+ socket->shutdown();
+ }
}
+template void FrameAssemblerV2::shutdown_socket<true>(crimson::common::Gated *);
+template void FrameAssemblerV2::shutdown_socket<false>(crimson::common::Gated *);
-seastar::future<> FrameAssemblerV2::replace_shutdown_socket(SocketRef &&new_socket)
+seastar::future<> FrameAssemblerV2::replace_shutdown_socket(SocketFRef &&new_socket)
{
+ assert(seastar::this_shard_id() == sid);
assert(has_socket());
- assert(socket->is_shutdown());
- socket = nullptr;
- auto old_socket = std::move(conn.socket);
+ assert(!is_socket_valid());
+ auto old_socket = move_socket();
+ auto old_socket_shard_id = old_socket->get_shard_id();
set_socket(std::move(new_socket));
- return old_socket->close(
- ).then([sock = std::move(old_socket)] {});
+ return seastar::smp::submit_to(
+ old_socket_shard_id,
+ [old_socket = std::move(old_socket)]() mutable {
+ return old_socket->close(
+ ).then([sock = std::move(old_socket)] {});
+ });
}
seastar::future<> FrameAssemblerV2::close_shutdown_socket()
{
+ assert(seastar::this_shard_id() == sid);
assert(has_socket());
- assert(socket->is_shutdown());
- return socket->close();
+ assert(!is_socket_valid());
+ return seastar::smp::submit_to(
+ socket->get_shard_id(), [this] {
+ return socket->close();
+ });
}
-seastar::future<Socket::tmp_buf>
+template <bool may_cross_core>
+seastar::future<ceph::bufferptr>
FrameAssemblerV2::read_exactly(std::size_t bytes)
{
+ assert(seastar::this_shard_id() == sid);
assert(has_socket());
- if (unlikely(record_io)) {
- return socket->read_exactly(bytes
- ).then([this](auto bl) {
- rxbuf.append(buffer::create(bl.share()));
- return bl;
+ if constexpr (may_cross_core) {
+ assert(conn.get_messenger_shard_id() == sid);
+ return seastar::smp::submit_to(
+ socket->get_shard_id(), [this, bytes] {
+ return socket->read_exactly(bytes);
+ }).then([this](auto bptr) {
+ if (record_io) {
+ rxbuf.append(bptr);
+ }
+ return bptr;
});
} else {
+ assert(socket->get_shard_id() == sid);
return socket->read_exactly(bytes);
- };
+ }
}
+template seastar::future<ceph::bufferptr> FrameAssemblerV2::read_exactly<true>(std::size_t);
+template seastar::future<ceph::bufferptr> FrameAssemblerV2::read_exactly<false>(std::size_t);
+template <bool may_cross_core>
seastar::future<ceph::bufferlist>
FrameAssemblerV2::read(std::size_t bytes)
{
+ assert(seastar::this_shard_id() == sid);
assert(has_socket());
- if (unlikely(record_io)) {
- return socket->read(bytes
- ).then([this](auto buf) {
- rxbuf.append(buf);
+ if constexpr (may_cross_core) {
+ assert(conn.get_messenger_shard_id() == sid);
+ return seastar::smp::submit_to(
+ socket->get_shard_id(), [this, bytes] {
+ return socket->read(bytes);
+ }).then([this](auto buf) {
+ if (record_io) {
+ rxbuf.append(buf);
+ }
return buf;
});
} else {
+ assert(socket->get_shard_id() == sid);
return socket->read(bytes);
}
}
+template seastar::future<ceph::bufferlist> FrameAssemblerV2::read<true>(std::size_t);
+template seastar::future<ceph::bufferlist> FrameAssemblerV2::read<false>(std::size_t);
+template <bool may_cross_core>
seastar::future<>
-FrameAssemblerV2::write(ceph::bufferlist &&buf)
+FrameAssemblerV2::write(ceph::bufferlist buf)
{
+ assert(seastar::this_shard_id() == sid);
assert(has_socket());
- if (unlikely(record_io)) {
- txbuf.append(buf);
+ if constexpr (may_cross_core) {
+ assert(conn.get_messenger_shard_id() == sid);
+ if (record_io) {
+ txbuf.append(buf);
+ }
+ return seastar::smp::submit_to(
+ socket->get_shard_id(), [this, buf = std::move(buf)]() mutable {
+ return socket->write(std::move(buf));
+ });
+ } else {
+ assert(socket->get_shard_id() == sid);
+ return socket->write(std::move(buf));
}
- return socket->write(std::move(buf));
}
+template seastar::future<> FrameAssemblerV2::write<true>(ceph::bufferlist);
+template seastar::future<> FrameAssemblerV2::write<false>(ceph::bufferlist);
+template <bool may_cross_core>
seastar::future<>
FrameAssemblerV2::flush()
{
+ assert(seastar::this_shard_id() == sid);
assert(has_socket());
- return socket->flush();
+ if constexpr (may_cross_core) {
+ assert(conn.get_messenger_shard_id() == sid);
+ return seastar::smp::submit_to(
+ socket->get_shard_id(), [this] {
+ return socket->flush();
+ });
+ } else {
+ assert(socket->get_shard_id() == sid);
+ return socket->flush();
+ }
}
+template seastar::future<> FrameAssemblerV2::flush<true>();
+template seastar::future<> FrameAssemblerV2::flush<false>();
+template <bool may_cross_core>
seastar::future<>
-FrameAssemblerV2::write_flush(ceph::bufferlist &&buf)
+FrameAssemblerV2::write_flush(ceph::bufferlist buf)
{
+ assert(seastar::this_shard_id() == sid);
assert(has_socket());
- if (unlikely(record_io)) {
- txbuf.append(buf);
+ if constexpr (may_cross_core) {
+ assert(conn.get_messenger_shard_id() == sid);
+ if (unlikely(record_io)) {
+ txbuf.append(buf);
+ }
+ return seastar::smp::submit_to(
+ socket->get_shard_id(), [this, buf = std::move(buf)]() mutable {
+ return socket->write_flush(std::move(buf));
+ });
+ } else {
+ assert(socket->get_shard_id() == sid);
+ return socket->write_flush(std::move(buf));
}
- return socket->write_flush(std::move(buf));
}
+template seastar::future<> FrameAssemblerV2::write_flush<true>(ceph::bufferlist);
+template seastar::future<> FrameAssemblerV2::write_flush<false>(ceph::bufferlist);
+template <bool may_cross_core>
seastar::future<FrameAssemblerV2::read_main_t>
FrameAssemblerV2::read_main_preamble()
{
+ assert(seastar::this_shard_id() == sid);
rx_preamble.clear();
- return read_exactly(rx_frame_asm.get_preamble_onwire_len()
- ).then([this](auto bl) {
+ return read_exactly<may_cross_core>(
+ rx_frame_asm.get_preamble_onwire_len()
+ ).then([this](auto bptr) {
+ rx_preamble.append(std::move(bptr));
+ Tag tag;
try {
- rx_preamble.append(buffer::create(std::move(bl)));
- const Tag tag = rx_frame_asm.disassemble_preamble(rx_preamble);
-#ifdef UNIT_TESTS_BUILT
- intercept_frame(tag, false);
-#endif
- return read_main_t{tag, &rx_frame_asm};
+ tag = rx_frame_asm.disassemble_preamble(rx_preamble);
} catch (FrameError& e) {
logger().warn("{} read_main_preamble: {}", conn, e.what());
throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
}
+#ifdef UNIT_TESTS_BUILT
+ return intercept_frame(tag, false
+ ).then([this, tag] {
+ return read_main_t{tag, &rx_frame_asm};
+ });
+#else
+ return read_main_t{tag, &rx_frame_asm};
+#endif
});
}
+template seastar::future<FrameAssemblerV2::read_main_t> FrameAssemblerV2::read_main_preamble<true>();
+template seastar::future<FrameAssemblerV2::read_main_t> FrameAssemblerV2::read_main_preamble<false>();
+template <bool may_cross_core>
seastar::future<FrameAssemblerV2::read_payload_t*>
FrameAssemblerV2::read_frame_payload()
{
+ assert(seastar::this_shard_id() == sid);
rx_segments_data.clear();
return seastar::do_until(
[this] {
}
uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx);
// TODO: create aligned and contiguous buffer from socket
- return read_exactly(onwire_len
- ).then([this](auto tmp_bl) {
+ return read_exactly<may_cross_core>(onwire_len
+ ).then([this](auto bptr) {
logger().trace("{} RECV({}) frame segment[{}]",
- conn, tmp_bl.size(), rx_segments_data.size());
+ conn, bptr.length(), rx_segments_data.size());
bufferlist segment;
- segment.append(buffer::create(std::move(tmp_bl)));
+ segment.append(std::move(bptr));
rx_segments_data.emplace_back(std::move(segment));
});
}
).then([this] {
- return read_exactly(rx_frame_asm.get_epilogue_onwire_len());
- }).then([this](auto bl) {
- logger().trace("{} RECV({}) frame epilogue", conn, bl.size());
+ return read_exactly<may_cross_core>(rx_frame_asm.get_epilogue_onwire_len());
+ }).then([this](auto bptr) {
+ logger().trace("{} RECV({}) frame epilogue", conn, bptr.length());
bool ok = false;
try {
bufferlist rx_epilogue;
- rx_epilogue.append(buffer::create(std::move(bl)));
+ rx_epilogue.append(std::move(bptr));
ok = rx_frame_asm.disassemble_segments(rx_preamble, rx_segments_data.data(), rx_epilogue);
} catch (FrameError& e) {
logger().error("read_frame_payload: {} {}", conn, e.what());
return &rx_segments_data;
});
}
+template seastar::future<FrameAssemblerV2::read_payload_t*> FrameAssemblerV2::read_frame_payload<true>();
+template seastar::future<FrameAssemblerV2::read_payload_t*> FrameAssemblerV2::read_frame_payload<false>();
void FrameAssemblerV2::log_main_preamble(const ceph::bufferlist &bl)
{
return std::make_unique<FrameAssemblerV2>(conn);
}
+void FrameAssemblerV2::clear()
+{
+ record_io = false;
+ rxbuf.clear();
+ txbuf.clear();
+ rx_preamble.clear();
+ rx_segments_data.clear();
+}
+
} // namespace crimson::net