X-Git-Url: https://git.proxmox.com/?p=ceph.git;a=blobdiff_plain;f=ceph%2Fsrc%2Fcrimson%2Fnet%2FSocket.cc;h=6434a407f22a56c6b2c3f75a8d12eedcf181e7ad;hp=642bda4921f08439d2f11b9a51a7e1afcaebe4bf;hb=1e59de90020f1d8d374046ef9cca56ccd4e806e2;hpb=bd41e436e25044e8e83156060a37c23cb661c364 diff --git a/ceph/src/crimson/net/Socket.cc b/ceph/src/crimson/net/Socket.cc index 642bda492..6434a407f 100644 --- a/ceph/src/crimson/net/Socket.cc +++ b/ceph/src/crimson/net/Socket.cc @@ -3,11 +3,14 @@ #include "Socket.h" +#include #include #include "crimson/common/log.h" #include "Errors.h" +using crimson::common::local_conf; + namespace crimson::net { namespace { @@ -33,7 +36,7 @@ struct bufferlist_consumer { if (remaining >= data.size()) { // consume the whole buffer remaining -= data.size(); - bl.append(buffer::create_foreign(std::move(data))); + bl.append(buffer::create(std::move(data))); if (remaining > 0) { // return none to request more segments return seastar::make_ready_future( @@ -46,7 +49,7 @@ struct bufferlist_consumer { } if (remaining > 0) { // consume the front - bl.append(buffer::create_foreign(data.share(0, remaining))); + bl.append(buffer::create(data.share(0, remaining))); data.trim_front(remaining); remaining = 0; } @@ -72,7 +75,10 @@ seastar::future Socket::read(size_t bytes) if (r.remaining) { // throw on short reads throw std::system_error(make_error_code(error::read_eof)); } - return seastar::make_ready_future(std::move(r.buffer)); + inject_failure(); + return inject_delay().then([this] { + return seastar::make_ready_future(std::move(r.buffer)); + }); }); #ifdef UNIT_TESTS_BUILT }).then([this] (auto buf) { @@ -96,7 +102,11 @@ Socket::read_exactly(size_t bytes) { if (buf.size() < bytes) { throw std::system_error(make_error_code(error::read_eof)); } - return seastar::make_ready_future(std::move(buf)); + inject_failure(); + return inject_delay( + ).then([buf = std::move(buf)] () mutable { + return seastar::make_ready_future(std::move(buf)); + }); }); #ifdef UNIT_TESTS_BUILT }).then([this] (auto buf) { @@ -109,6 +119,7 @@ Socket::read_exactly(size_t bytes) { } void Socket::shutdown() { + socket_is_shutdown = true; socket.shutdown_input(); socket.shutdown_output(); } @@ -132,6 +143,7 @@ seastar::future<> Socket::close() { closed = true; #endif return seastar::when_all_succeed( + inject_delay(), in.close(), close_and_handle_errors(out) ).then_unpack([] { @@ -142,6 +154,31 @@ seastar::future<> Socket::close() { }); } +seastar::future<> Socket::inject_delay () { + if (float delay_period = local_conf()->ms_inject_internal_delays; + delay_period) { + logger().debug("Socket::inject_delay: sleep for {}", delay_period); + return seastar::sleep( + std::chrono::milliseconds((int)(delay_period * 1000.0))); + } + return seastar::now(); +} + +void Socket::inject_failure() +{ + if (local_conf()->ms_inject_socket_failures) { + uint64_t rand = + ceph::util::generate_random_number(1, RAND_MAX); + if (rand % local_conf()->ms_inject_socket_failures == 0) { + if (true) { + logger().warn("Socket::inject_failure: injecting socket failure"); + throw std::system_error(make_error_code( + crimson::net::error::negotiation_failure)); + } + } + } +} + #ifdef UNIT_TESTS_BUILT seastar::future<> Socket::try_trap_pre(bp_action_t& trap) { auto action = trap; @@ -172,7 +209,7 @@ seastar::future<> Socket::try_trap_post(bp_action_t& trap) { break; case bp_action_t::STALL: logger().info("[Test] got STALL and block"); - shutdown(); + force_shutdown(); return blocker->block(); default: ceph_abort("unexpected action from trap");