X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=ceph%2Fsrc%2Fcrimson%2Fnet%2FSocket.cc;h=95b1e225034eb567b1bdf1a1107437d6ad84d2df;hb=aee94f6923ba628a85d855d0c5316d0da78bfa2a;hp=6434a407f22a56c6b2c3f75a8d12eedcf181e7ad;hpb=27f45121cc74e31203777ad565f78d8aad9b92a2;p=ceph.git

diff --git a/ceph/src/crimson/net/Socket.cc b/ceph/src/crimson/net/Socket.cc
index 6434a407f..95b1e2250 100644
--- a/ceph/src/crimson/net/Socket.cc
+++ b/ceph/src/crimson/net/Socket.cc
@@ -5,6 +5,7 @@
 #include <seastar/core/sleep.hh>
 #include <seastar/core/when_all.hh>
+#include <seastar/net/packet.hh>
 
 #include "crimson/common/log.h"
 #include "Errors.h"
 
@@ -19,6 +20,9 @@ seastar::logger& logger() {
   return crimson::get_logger(ceph_subsys_ms);
 }
 
+using tmp_buf = seastar::temporary_buffer<char>;
+using packet = seastar::net::packet;
+
 // an input_stream consumer that reads buffer segments into a bufferlist up to
 // the given number of remaining bytes
 struct bufferlist_consumer {
@@ -28,7 +32,6 @@ struct bufferlist_consumer {
   bufferlist_consumer(bufferlist& bl, size_t& remaining)
     : bl(bl), remaining(remaining) {}
 
-  using tmp_buf = seastar::temporary_buffer<char>;
   using consumption_result_type = typename seastar::input_stream<char>::consumption_result_type;
 
   // consume some or all of a buffer segment
@@ -59,10 +62,64 @@ struct bufferlist_consumer {
   };
 };
 
+seastar::future<> inject_delay()
+{
+  if (float delay_period = local_conf()->ms_inject_internal_delays;
+      delay_period) {
+    logger().debug("Socket::inject_delay: sleep for {}", delay_period);
+    return seastar::sleep(
+      std::chrono::milliseconds((int)(delay_period * 1000.0)));
+  }
+  return seastar::now();
+}
+
+void inject_failure()
+{
+  if (local_conf()->ms_inject_socket_failures) {
+    uint64_t rand =
+      ceph::util::generate_random_number(1, RAND_MAX);
+    if (rand % local_conf()->ms_inject_socket_failures == 0) {
+      logger().warn("Socket::inject_failure: injecting socket failure");
+      throw std::system_error(make_error_code(
+        error::negotiation_failure));
+    }
+  }
+}
+
 } // anonymous namespace
 
-seastar::future<bufferlist> Socket::read(size_t bytes)
+Socket::Socket(
+  seastar::connected_socket &&_socket,
+  side_t _side,
+  uint16_t e_port,
+  construct_tag)
+  : sid{seastar::this_shard_id()},
+    socket(std::move(_socket)),
+    in(socket.input()),
+    // the default buffer size 8192 is too small that may impact our write
+    // performance. see seastar::net::connected_socket::output()
+    out(socket.output(65536)),
+    socket_is_shutdown(false),
+    side(_side),
+    ephemeral_port(e_port) {
+  if (local_conf()->ms_tcp_nodelay) {
+    socket.set_nodelay(true);
+  }
+}
+
+Socket::~Socket()
+{
+  assert(seastar::this_shard_id() == sid);
+#ifndef NDEBUG
+  assert(closed);
+#endif
+}
+
+seastar::future<bufferlist>
+Socket::read(size_t bytes)
+{
+  assert(seastar::this_shard_id() == sid);
 #ifdef UNIT_TESTS_BUILT
   return try_trap_pre(next_trap_read).then([bytes, this] {
 #endif
@@ -81,44 +138,103 @@ seastar::future<bufferlist> Socket::read(size_t bytes)
     });
   });
 #ifdef UNIT_TESTS_BUILT
-  }).then([this] (auto buf) {
+  }).then([this](auto buf) {
     return try_trap_post(next_trap_read
-    ).then([buf = std::move(buf)] () mutable {
+    ).then([buf = std::move(buf)]() mutable {
       return std::move(buf);
     });
   });
 #endif
 }
 
-seastar::future<seastar::temporary_buffer<char>>
+seastar::future<bufferptr>
 Socket::read_exactly(size_t bytes) {
+  assert(seastar::this_shard_id() == sid);
 #ifdef UNIT_TESTS_BUILT
   return try_trap_pre(next_trap_read).then([bytes, this] {
 #endif
   if (bytes == 0) {
-    return seastar::make_ready_future<seastar::temporary_buffer<char>>();
+    return seastar::make_ready_future<bufferptr>();
   }
   return in.read_exactly(bytes).then([bytes](auto buf) {
-    if (buf.size() < bytes) {
+    bufferptr ptr(buffer::create(buf.share()));
+    if (ptr.length() < bytes) {
       throw std::system_error(make_error_code(error::read_eof));
     }
     inject_failure();
    return inject_delay(
-    ).then([buf = std::move(buf)] () mutable {
-      return seastar::make_ready_future(std::move(buf));
+    ).then([ptr = std::move(ptr)]() mutable {
+      return seastar::make_ready_future<bufferptr>(std::move(ptr));
     });
   });
 #ifdef UNIT_TESTS_BUILT
-  }).then([this] (auto buf) {
+  }).then([this](auto ptr) {
     return try_trap_post(next_trap_read
-    ).then([buf = std::move(buf)] () mutable {
-      return std::move(buf);
+    ).then([ptr = std::move(ptr)]() mutable {
+      return std::move(ptr);
     });
   });
 #endif
 }
 
-void Socket::shutdown() {
+seastar::future<>
+Socket::write(bufferlist buf)
+{
+  assert(seastar::this_shard_id() == sid);
+#ifdef UNIT_TESTS_BUILT
+  return try_trap_pre(next_trap_write
+  ).then([buf = std::move(buf), this]() mutable {
+#endif
+  inject_failure();
+  return inject_delay(
+  ).then([buf = std::move(buf), this]() mutable {
+    packet p(std::move(buf));
+    return out.write(std::move(p));
+  });
+#ifdef UNIT_TESTS_BUILT
+  }).then([this] {
+    return try_trap_post(next_trap_write);
+  });
+#endif
+}
+
+seastar::future<>
+Socket::flush()
+{
+  assert(seastar::this_shard_id() == sid);
+  inject_failure();
+  return inject_delay().then([this] {
+    return out.flush();
+  });
+}
+
+seastar::future<>
+Socket::write_flush(bufferlist buf)
+{
+  assert(seastar::this_shard_id() == sid);
+#ifdef UNIT_TESTS_BUILT
+  return try_trap_pre(next_trap_write
+  ).then([buf = std::move(buf), this]() mutable {
+#endif
+  inject_failure();
+  return inject_delay(
+  ).then([buf = std::move(buf), this]() mutable {
+    packet p(std::move(buf));
+    return out.write(std::move(p)
+    ).then([this] {
+      return out.flush();
+    });
+  });
+#ifdef UNIT_TESTS_BUILT
+  }).then([this] {
+    return try_trap_post(next_trap_write);
+  });
+#endif
+}
+
+void Socket::shutdown()
+{
+  assert(seastar::this_shard_id() == sid);
   socket_is_shutdown = true;
   socket.shutdown_input();
   socket.shutdown_output();
@@ -127,19 +243,22 @@ void Socket::shutdown() {
 static inline seastar::future<>
 close_and_handle_errors(seastar::output_stream<char>& out)
 {
-  return out.close().handle_exception_type([] (const std::system_error& e) {
+  return out.close().handle_exception_type([](const std::system_error& e) {
     if (e.code() != std::errc::broken_pipe &&
         e.code() != std::errc::connection_reset) {
-      logger().error("Socket::close(): unexpected error {}", e);
+      logger().error("Socket::close(): unexpected error {}", e.what());
       ceph_abort();
     }
     // can happen when out is already shutdown, ignore
   });
 }
 
-seastar::future<> Socket::close() {
+seastar::future<>
+Socket::close()
+{
+  assert(seastar::this_shard_id() == sid);
 #ifndef NDEBUG
-  ceph_assert(!closed);
+  ceph_assert_always(!closed);
   closed = true;
 #endif
   return seastar::when_all_succeed(
@@ -148,39 +267,55 @@ seastar::future<> Socket::close() {
     close_and_handle_errors(out)
   ).then_unpack([] {
     return seastar::make_ready_future<>();
-  }).handle_exception([] (auto eptr) {
-    logger().error("Socket::close(): unexpected exception {}", eptr);
+  }).handle_exception([](auto eptr) {
+    const char *e_what;
+    try {
+      std::rethrow_exception(eptr);
+    } catch (std::exception &e) {
+      e_what = e.what();
+    }
+    logger().error("Socket::close(): unexpected exception {}", e_what);
     ceph_abort();
   });
 }
 
-seastar::future<> Socket::inject_delay () {
-  if (float delay_period = local_conf()->ms_inject_internal_delays;
-      delay_period) {
-    logger().debug("Socket::inject_delay: sleep for {}", delay_period);
-    return seastar::sleep(
-      std::chrono::milliseconds((int)(delay_period * 1000.0)));
-  }
-  return seastar::now();
+seastar::future<SocketRef>
+Socket::connect(const entity_addr_t &peer_addr)
+{
+  inject_failure();
+  return inject_delay(
+  ).then([peer_addr] {
+    return seastar::connect(peer_addr.in4_addr());
+  }).then([peer_addr](seastar::connected_socket socket) {
+    auto ret = std::make_unique<Socket>(
+      std::move(socket), side_t::connector, 0, construct_tag{});
+    logger().debug("Socket::connect(): connected to {}, socket {}",
+                   peer_addr, fmt::ptr(ret));
+    return ret;
+  });
 }
 
-void Socket::inject_failure()
-{
-  if (local_conf()->ms_inject_socket_failures) {
-    uint64_t rand =
-      ceph::util::generate_random_number(1, RAND_MAX);
-    if (rand % local_conf()->ms_inject_socket_failures == 0) {
-      if (true) {
-        logger().warn("Socket::inject_failure: injecting socket failure");
-        throw std::system_error(make_error_code(
-          crimson::net::error::negotiation_failure));
-      }
+#ifdef UNIT_TESTS_BUILT
+void Socket::set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_) {
+  assert(seastar::this_shard_id() == sid);
+  blocker = blocker_;
+  if (type == bp_type_t::READ) {
+    ceph_assert_always(next_trap_read == bp_action_t::CONTINUE);
+    next_trap_read = action;
+  } else { // type == bp_type_t::WRITE
+    if (next_trap_write == bp_action_t::CONTINUE) {
+      next_trap_write = action;
+    } else if (next_trap_write == bp_action_t::FAULT) {
+      // do_sweep_messages() may combine multiple write events into one socket write
+      ceph_assert_always(action == bp_action_t::FAULT || action == bp_action_t::CONTINUE);
+    } else {
+      ceph_abort();
     }
   }
 }
 
-#ifdef UNIT_TESTS_BUILT
-seastar::future<> Socket::try_trap_pre(bp_action_t& trap) {
+seastar::future<>
+Socket::try_trap_pre(bp_action_t& trap) {
   auto action = trap;
   trap = bp_action_t::CONTINUE;
   switch (action) {
@@ -188,7 +323,7 @@ seastar::future<> Socket::try_trap_pre(bp_action_t& trap) {
     break;
   case bp_action_t::FAULT:
     logger().info("[Test] got FAULT");
-    throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
+    throw std::system_error(make_error_code(error::negotiation_failure));
   case bp_action_t::BLOCK:
     logger().info("[Test] got BLOCK");
     return blocker->block();
@@ -201,7 +336,8 @@ seastar::future<> Socket::try_trap_pre(bp_action_t& trap) {
   return 
seastar::make_ready_future<>(); } -seastar::future<> Socket::try_trap_post(bp_action_t& trap) { +seastar::future<> +Socket::try_trap_post(bp_action_t& trap) { auto action = trap; trap = bp_action_t::CONTINUE; switch (action) { @@ -216,94 +352,170 @@ seastar::future<> Socket::try_trap_post(bp_action_t& trap) { } return seastar::make_ready_future<>(); } +#endif -void Socket::set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_) { - blocker = blocker_; - if (type == bp_type_t::READ) { - ceph_assert(next_trap_read == bp_action_t::CONTINUE); - next_trap_read = action; - } else { // type == bp_type_t::WRITE - if (next_trap_write == bp_action_t::CONTINUE) { - next_trap_write = action; - } else if (next_trap_write == bp_action_t::FAULT) { - // do_sweep_messages() may combine multiple write events into one socket write - ceph_assert(action == bp_action_t::FAULT || action == bp_action_t::CONTINUE); - } else { - ceph_abort(); - } - } +ShardedServerSocket::ShardedServerSocket( + seastar::shard_id sid, + bool dispatch_only_on_primary_sid, + construct_tag) + : primary_sid{sid}, dispatch_only_on_primary_sid{dispatch_only_on_primary_sid} +{ +} + +ShardedServerSocket::~ShardedServerSocket() +{ + assert(!listener); + // detect whether user have called destroy() properly + ceph_assert_always(!service); } -#endif -crimson::net::listen_ertr::future<> -FixedCPUServerSocket::listen(entity_addr_t addr) +listen_ertr::future<> +ShardedServerSocket::listen(entity_addr_t addr) { - assert(seastar::this_shard_id() == cpu); - logger().trace("FixedCPUServerSocket::listen({})...", addr); - return container().invoke_on_all([addr] (auto& ss) { - ss.addr = addr; + ceph_assert_always(seastar::this_shard_id() == primary_sid); + logger().debug("ShardedServerSocket({})::listen()...", addr); + return this->container().invoke_on_all([addr](auto& ss) { + ss.listen_addr = addr; seastar::socket_address s_addr(addr.in4_addr()); seastar::listen_options lo; lo.reuse_address = true; - lo.set_fixed_cpu(ss.cpu); + if (ss.dispatch_only_on_primary_sid) { + lo.set_fixed_cpu(ss.primary_sid); + } ss.listener = seastar::listen(s_addr, lo); }).then([] { return listen_ertr::now(); }).handle_exception_type( - [addr] (const std::system_error& e) -> listen_ertr::future<> { + [addr](const std::system_error& e) -> listen_ertr::future<> { if (e.code() == std::errc::address_in_use) { - logger().trace("FixedCPUServerSocket::listen({}): address in use", addr); + logger().debug("ShardedServerSocket({})::listen(): address in use", addr); return crimson::ct_error::address_in_use::make(); } else if (e.code() == std::errc::address_not_available) { - logger().trace("FixedCPUServerSocket::listen({}): address not available", + logger().debug("ShardedServerSocket({})::listen(): address not available", addr); return crimson::ct_error::address_not_available::make(); } - logger().error("FixedCPUServerSocket::listen({}): " - "got unexpeted error {}", addr, e); + logger().error("ShardedServerSocket({})::listen(): " + "got unexpeted error {}", addr, e.what()); ceph_abort(); }); } -seastar::future<> FixedCPUServerSocket::shutdown() +seastar::future<> +ShardedServerSocket::accept(accept_func_t &&_fn_accept) { - assert(seastar::this_shard_id() == cpu); - logger().trace("FixedCPUServerSocket({})::shutdown()...", addr); - return container().invoke_on_all([] (auto& ss) { + ceph_assert_always(seastar::this_shard_id() == primary_sid); + logger().debug("ShardedServerSocket({})::accept()...", listen_addr); + return this->container().invoke_on_all([_fn_accept](auto 
&ss) {
+    assert(ss.listener);
+    ss.fn_accept = _fn_accept;
+    // gate accepting
+    // ShardedServerSocket::shutdown() will drain the continuations in the gate
+    // so ignore the returned future
+    std::ignore = seastar::with_gate(ss.shutdown_gate, [&ss] {
+      return seastar::keep_doing([&ss] {
+        return ss.listener->accept(
+        ).then([&ss](seastar::accept_result accept_result) {
+#ifndef NDEBUG
+          if (ss.dispatch_only_on_primary_sid) {
+            // see seastar::listen_options::set_fixed_cpu()
+            ceph_assert_always(seastar::this_shard_id() == ss.primary_sid);
+          }
+#endif
+          auto [socket, paddr] = std::move(accept_result);
+          entity_addr_t peer_addr;
+          peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
+          peer_addr.set_type(ss.listen_addr.get_type());
+          SocketRef _socket = std::make_unique<Socket>(
+            std::move(socket), Socket::side_t::acceptor,
+            peer_addr.get_port(), Socket::construct_tag{});
+          logger().debug("ShardedServerSocket({})::accept(): accepted peer {}, "
+                         "socket {}, dispatch_only_on_primary_sid = {}",
+                         ss.listen_addr, peer_addr, fmt::ptr(_socket),
+                         ss.dispatch_only_on_primary_sid);
+          std::ignore = seastar::with_gate(
+            ss.shutdown_gate,
+            [socket=std::move(_socket), peer_addr, &ss]() mutable {
+            return ss.fn_accept(std::move(socket), peer_addr
+            ).handle_exception([&ss, peer_addr](auto eptr) {
+              const char *e_what;
+              try {
+                std::rethrow_exception(eptr);
+              } catch (std::exception &e) {
+                e_what = e.what();
+              }
+              logger().error("ShardedServerSocket({})::accept(): "
+                             "fn_accept(s, {}) got unexpected exception {}",
+                             ss.listen_addr, peer_addr, e_what);
+              ceph_abort();
+            });
+          });
+        });
+      }).handle_exception_type([&ss](const std::system_error& e) {
+        if (e.code() == std::errc::connection_aborted ||
+            e.code() == std::errc::invalid_argument) {
+          logger().debug("ShardedServerSocket({})::accept(): stopped ({})",
+                         ss.listen_addr, e.what());
+        } else {
+          throw;
+        }
+      }).handle_exception([&ss](auto eptr) {
+        const char *e_what;
+        try {
+          std::rethrow_exception(eptr);
+        } catch (std::exception &e) {
+          e_what = e.what();
+        }
+        logger().error("ShardedServerSocket({})::accept(): "
+                       "got unexpected exception {}", ss.listen_addr, e_what);
+        ceph_abort();
+      });
+    });
+  });
+}
+
+seastar::future<>
+ShardedServerSocket::shutdown_destroy()
+{
+  assert(seastar::this_shard_id() == primary_sid);
+  logger().debug("ShardedServerSocket({})::shutdown_destroy()...", listen_addr);
+  // shutdown shards
+  return this->container().invoke_on_all([](auto& ss) {
     if (ss.listener) {
       ss.listener->abort_accept();
     }
     return ss.shutdown_gate.close();
   }).then([this] {
-    return reset();
-  });
-}
-
-seastar::future<> FixedCPUServerSocket::destroy()
-{
-  assert(seastar::this_shard_id() == cpu);
-  return shutdown().then([this] {
-    // we should only construct/stop shards on #0
-    return container().invoke_on(0, [] (auto& ss) {
+    // destroy shards
+    return this->container().invoke_on_all([](auto& ss) {
+      assert(ss.shutdown_gate.is_closed());
+      ss.listen_addr = entity_addr_t();
+      ss.listener.reset();
+    });
+  }).then([this] {
+    // stop the sharded service: we should only construct/stop shards on #0
+    return this->container().invoke_on(0, [](auto& ss) {
       assert(ss.service);
       return ss.service->stop().finally([cleanup = std::move(ss.service)] {});
     });
   });
 }
 
-seastar::future<FixedCPUServerSocket*> FixedCPUServerSocket::create()
+seastar::future<ShardedServerSocket*>
+ShardedServerSocket::create(bool dispatch_only_on_this_shard)
 {
-  auto cpu = seastar::this_shard_id();
-  // we should only construct/stop shards on #0
-  return seastar::smp::submit_to(0, [cpu] {
+  auto primary_sid = 
seastar::this_shard_id(); + // start the sharded service: we should only construct/stop shards on #0 + return seastar::smp::submit_to(0, [primary_sid, dispatch_only_on_this_shard] { auto service = std::make_unique(); - return service->start(cpu, construct_tag{} - ).then([service = std::move(service)] () mutable { + return service->start( + primary_sid, dispatch_only_on_this_shard, construct_tag{} + ).then([service = std::move(service)]() mutable { auto p_shard = service.get(); p_shard->local().service = std::move(service); return p_shard; }); - }).then([] (auto p_shard) { + }).then([](auto p_shard) { return &p_shard->local(); }); }