#include <seastar/core/gate.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/sharded.hh>
-#include <seastar/net/packet.hh>
#include "include/buffer.h"
class Socket;
using SocketRef = std::unique_ptr<Socket>;
+using SocketFRef = seastar::foreign_ptr<SocketRef>;
-class Socket
-{
+class Socket {
struct construct_tag {};
- public:
+public:
// if acceptor side, peer is using a different port (ephemeral_port)
// if connector side, I'm using a different port (ephemeral_port)
enum class side_t {
acceptor,
connector
};
+ Socket(seastar::connected_socket &&, side_t, uint16_t e_port, construct_tag);
- Socket(seastar::connected_socket&& _socket, side_t _side, uint16_t e_port, construct_tag)
- : sid{seastar::this_shard_id()},
- socket(std::move(_socket)),
- in(socket.input()),
- // the default buffer size 8192 is too small that may impact our write
- // performance. see seastar::net::connected_socket::output()
- out(socket.output(65536)),
- socket_is_shutdown(false),
- side(_side),
- ephemeral_port(e_port) {}
-
- ~Socket() {
-#ifndef NDEBUG
- assert(closed);
-#endif
- }
+ ~Socket();
Socket(Socket&& o) = delete;
- static seastar::future<SocketRef>
- connect(const entity_addr_t& peer_addr) {
- inject_failure();
- return inject_delay(
- ).then([peer_addr] {
- return seastar::connect(peer_addr.in4_addr());
- }).then([] (seastar::connected_socket socket) {
- return std::make_unique<Socket>(
- std::move(socket), side_t::connector, 0, construct_tag{});
- });
+ seastar::shard_id get_shard_id() const {
+ return sid;
}
- /// read the requested number of bytes into a bufferlist
- seastar::future<bufferlist> read(size_t bytes);
- using tmp_buf = seastar::temporary_buffer<char>;
- using packet = seastar::net::packet;
- seastar::future<tmp_buf> read_exactly(size_t bytes);
-
- seastar::future<> write(packet&& buf) {
-#ifdef UNIT_TESTS_BUILT
- return try_trap_pre(next_trap_write
- ).then([buf = std::move(buf), this] () mutable {
-#endif
- inject_failure();
- return inject_delay(
- ).then([buf = std::move(buf), this] () mutable {
- return out.write(std::move(buf));
- });
-#ifdef UNIT_TESTS_BUILT
- }).then([this] {
- return try_trap_post(next_trap_write);
- });
-#endif
+ side_t get_side() const {
+ return side;
}
- seastar::future<> flush() {
- inject_failure();
- return inject_delay().then([this] {
- return out.flush();
- });
+
+ uint16_t get_ephemeral_port() const {
+ return ephemeral_port;
}
- seastar::future<> write_flush(packet&& buf) {
-#ifdef UNIT_TESTS_BUILT
- return try_trap_pre(next_trap_write).then([buf = std::move(buf), this] () mutable {
-#endif
- inject_failure();
- return inject_delay(
- ).then([buf = std::move(buf), this] () mutable {
- return out.write(std::move(buf)).then([this] { return out.flush(); });
- });
-#ifdef UNIT_TESTS_BUILT
- }).then([this] {
- return try_trap_post(next_trap_write);
- });
-#endif
+
+ seastar::socket_address get_local_address() const {
+ return socket.local_address();
}
bool is_shutdown() const {
+ assert(seastar::this_shard_id() == sid);
return socket_is_shutdown;
}
+ // learn my ephemeral_port as connector.
+ // unfortunately, there's no way to identify which port I'm using as
+ // connector with current seastar interface.
+ void learn_ephemeral_port_as_connector(uint16_t port) {
+ assert(side == side_t::connector &&
+ (ephemeral_port == 0 || ephemeral_port == port));
+ ephemeral_port = port;
+ }
+
+ /// read the requested number of bytes into a bufferlist
+ seastar::future<bufferlist> read(size_t bytes);
+
+ seastar::future<bufferptr> read_exactly(size_t bytes);
+
+ seastar::future<> write(bufferlist);
+
+ seastar::future<> flush();
+
+ seastar::future<> write_flush(bufferlist);
+
// preemptively disable further reads or writes, can only be shutdown once.
void shutdown();
/// Socket can only be closed once.
seastar::future<> close();
- static seastar::future<> inject_delay();
+ static seastar::future<SocketRef>
+ connect(const entity_addr_t& peer_addr);
- static void inject_failure();
+ /*
+ * test interfaces
+ */
// shutdown for tests
void force_shutdown() {
+ assert(seastar::this_shard_id() == sid);
socket.shutdown_input();
socket.shutdown_output();
}
// shutdown input_stream only, for tests
void force_shutdown_in() {
+ assert(seastar::this_shard_id() == sid);
socket.shutdown_input();
}
// shutdown output_stream only, for tests
void force_shutdown_out() {
+ assert(seastar::this_shard_id() == sid);
socket.shutdown_output();
}
- side_t get_side() const {
- return side;
- }
-
- uint16_t get_ephemeral_port() const {
- return ephemeral_port;
- }
-
- // learn my ephemeral_port as connector.
- // unfortunately, there's no way to identify which port I'm using as
- // connector with current seastar interface.
- void learn_ephemeral_port_as_connector(uint16_t port) {
- assert(side == side_t::connector &&
- (ephemeral_port == 0 || ephemeral_port == port));
- ephemeral_port = port;
- }
-
- seastar::socket_address get_local_address() const {
- return socket.local_address();
- }
-
- private:
+private:
const seastar::shard_id sid;
seastar::connected_socket socket;
seastar::input_stream<char> in;
} r;
#ifdef UNIT_TESTS_BUILT
- public:
+public:
void set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_);
- private:
+private:
+ seastar::future<> try_trap_pre(bp_action_t& trap);
+
+ seastar::future<> try_trap_post(bp_action_t& trap);
+
bp_action_t next_trap_read = bp_action_t::CONTINUE;
bp_action_t next_trap_write = bp_action_t::CONTINUE;
socket_blocker* blocker = nullptr;
- seastar::future<> try_trap_pre(bp_action_t& trap);
- seastar::future<> try_trap_post(bp_action_t& trap);
#endif
- friend class FixedCPUServerSocket;
+ friend class ShardedServerSocket;
};
using listen_ertr = crimson::errorator<
crimson::ct_error::address_not_available // https://techoverflow.net/2021/08/06/how-i-fixed-python-oserror-errno-99-cannot-assign-requested-address/
>;
-class FixedCPUServerSocket
- : public seastar::peering_sharded_service<FixedCPUServerSocket> {
- const seastar::shard_id cpu;
- entity_addr_t addr;
- std::optional<seastar::server_socket> listener;
- seastar::gate shutdown_gate;
+class ShardedServerSocket
+ : public seastar::peering_sharded_service<ShardedServerSocket> {
+ struct construct_tag {};
- using sharded_service_t = seastar::sharded<FixedCPUServerSocket>;
- std::unique_ptr<sharded_service_t> service;
+public:
+ ShardedServerSocket(
+ seastar::shard_id sid,
+ bool dispatch_only_on_primary_sid,
+ construct_tag);
- struct construct_tag {};
+ ~ShardedServerSocket();
- static seastar::logger& logger() {
- return crimson::get_logger(ceph_subsys_ms);
- }
+ ShardedServerSocket(ShardedServerSocket&&) = delete;
+ ShardedServerSocket(const ShardedServerSocket&) = delete;
+ ShardedServerSocket& operator=(ShardedServerSocket&&) = delete;
+ ShardedServerSocket& operator=(const ShardedServerSocket&) = delete;
- seastar::future<> reset() {
- return container().invoke_on_all([] (auto& ss) {
- assert(ss.shutdown_gate.is_closed());
- ss.addr = entity_addr_t();
- ss.listener.reset();
- });
+ bool is_fixed_shard_dispatching() const {
+ return dispatch_only_on_primary_sid;
}
-public:
- FixedCPUServerSocket(seastar::shard_id cpu, construct_tag) : cpu{cpu} {}
- ~FixedCPUServerSocket() {
- assert(!listener);
- // detect whether user have called destroy() properly
- ceph_assert(!service);
- }
+ listen_ertr::future<> listen(entity_addr_t addr);
- FixedCPUServerSocket(FixedCPUServerSocket&&) = delete;
- FixedCPUServerSocket(const FixedCPUServerSocket&) = delete;
- FixedCPUServerSocket& operator=(const FixedCPUServerSocket&) = delete;
+ using accept_func_t =
+ std::function<seastar::future<>(SocketRef, entity_addr_t)>;
+ seastar::future<> accept(accept_func_t &&_fn_accept);
- listen_ertr::future<> listen(entity_addr_t addr);
+ seastar::future<> shutdown_destroy();
- // fn_accept should be a nothrow function of type
- // seastar::future<>(SocketRef, entity_addr_t)
- template <typename Func>
- seastar::future<> accept(Func&& fn_accept) {
- assert(seastar::this_shard_id() == cpu);
- logger().trace("FixedCPUServerSocket({})::accept()...", addr);
- return container().invoke_on_all(
- [fn_accept = std::move(fn_accept)] (auto& ss) mutable {
- assert(ss.listener);
- // gate accepting
- // FixedCPUServerSocket::shutdown() will drain the continuations in the gate
- // so ignore the returned future
- std::ignore = seastar::with_gate(ss.shutdown_gate,
- [&ss, fn_accept = std::move(fn_accept)] () mutable {
- return seastar::keep_doing([&ss, fn_accept = std::move(fn_accept)] () mutable {
- return ss.listener->accept().then(
- [&ss, fn_accept = std::move(fn_accept)]
- (seastar::accept_result accept_result) mutable {
- // assert seastar::listen_options::set_fixed_cpu() works
- assert(seastar::this_shard_id() == ss.cpu);
- auto [socket, paddr] = std::move(accept_result);
- entity_addr_t peer_addr;
- peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
- peer_addr.set_type(ss.addr.get_type());
- SocketRef _socket = std::make_unique<Socket>(
- std::move(socket), Socket::side_t::acceptor,
- peer_addr.get_port(), Socket::construct_tag{});
- std::ignore = seastar::with_gate(ss.shutdown_gate,
- [socket = std::move(_socket), peer_addr,
- &ss, fn_accept = std::move(fn_accept)] () mutable {
- logger().trace("FixedCPUServerSocket({})::accept(): "
- "accepted peer {}", ss.addr, peer_addr);
- return fn_accept(std::move(socket), peer_addr
- ).handle_exception([&ss, peer_addr] (auto eptr) {
- logger().error("FixedCPUServerSocket({})::accept(): "
- "fn_accept(s, {}) got unexpected exception {}",
- ss.addr, peer_addr, eptr);
- ceph_abort();
- });
- });
- });
- }).handle_exception_type([&ss] (const std::system_error& e) {
- if (e.code() == std::errc::connection_aborted ||
- e.code() == std::errc::invalid_argument) {
- logger().trace("FixedCPUServerSocket({})::accept(): stopped ({})",
- ss.addr, e);
- } else {
- throw;
- }
- }).handle_exception([&ss] (auto eptr) {
- logger().error("FixedCPUServerSocket({})::accept(): "
- "got unexpected exception {}", ss.addr, eptr);
- ceph_abort();
- });
- });
- });
- }
+ static seastar::future<ShardedServerSocket*> create(
+ bool dispatch_only_on_this_shard);
- seastar::future<> shutdown();
- seastar::future<> destroy();
- static seastar::future<FixedCPUServerSocket*> create();
+private:
+ const seastar::shard_id primary_sid;
+ /// XXX: Remove once all infrastructure uses multi-core messenger
+ const bool dispatch_only_on_primary_sid;
+ entity_addr_t listen_addr;
+ std::optional<seastar::server_socket> listener;
+ seastar::gate shutdown_gate;
+ accept_func_t fn_accept;
+
+ using sharded_service_t = seastar::sharded<ShardedServerSocket>;
+ std::unique_ptr<sharded_service_t> service;
};
} // namespace crimson::net