#include "Socket.h"
+#include <seastar/core/sleep.hh>
#include <seastar/core/when_all.hh>
#include "crimson/common/log.h"
#include "Errors.h"
+using crimson::common::local_conf;
+
namespace crimson::net {
namespace {
if (remaining >= data.size()) {
// consume the whole buffer
remaining -= data.size();
- bl.append(buffer::create_foreign(std::move(data)));
+ bl.append(buffer::create(std::move(data)));
if (remaining > 0) {
// return none to request more segments
return seastar::make_ready_future<consumption_result_type>(
}
if (remaining > 0) {
// consume the front
- bl.append(buffer::create_foreign(data.share(0, remaining)));
+ bl.append(buffer::create(data.share(0, remaining)));
data.trim_front(remaining);
remaining = 0;
}
if (r.remaining) { // throw on short reads
throw std::system_error(make_error_code(error::read_eof));
}
- return seastar::make_ready_future<bufferlist>(std::move(r.buffer));
+ inject_failure();
+ return inject_delay().then([this] {
+ return seastar::make_ready_future<bufferlist>(std::move(r.buffer));
+ });
});
#ifdef UNIT_TESTS_BUILT
}).then([this] (auto buf) {
if (buf.size() < bytes) {
throw std::system_error(make_error_code(error::read_eof));
}
- return seastar::make_ready_future<tmp_buf>(std::move(buf));
+ inject_failure();
+ return inject_delay(
+ ).then([buf = std::move(buf)] () mutable {
+ return seastar::make_ready_future<tmp_buf>(std::move(buf));
+ });
});
#ifdef UNIT_TESTS_BUILT
}).then([this] (auto buf) {
}
void Socket::shutdown() {
+ socket_is_shutdown = true;
socket.shutdown_input();
socket.shutdown_output();
}
closed = true;
#endif
return seastar::when_all_succeed(
+ inject_delay(),
in.close(),
close_and_handle_errors(out)
).then_unpack([] {
});
}
+seastar::future<> Socket::inject_delay () {
+ if (float delay_period = local_conf()->ms_inject_internal_delays;
+ delay_period) {
+ logger().debug("Socket::inject_delay: sleep for {}", delay_period);
+ return seastar::sleep(
+ std::chrono::milliseconds((int)(delay_period * 1000.0)));
+ }
+ return seastar::now();
+}
+
+void Socket::inject_failure()
+{
+ if (local_conf()->ms_inject_socket_failures) {
+ uint64_t rand =
+ ceph::util::generate_random_number<uint64_t>(1, RAND_MAX);
+ if (rand % local_conf()->ms_inject_socket_failures == 0) {
+ if (true) {
+ logger().warn("Socket::inject_failure: injecting socket failure");
+ throw std::system_error(make_error_code(
+ crimson::net::error::negotiation_failure));
+ }
+ }
+ }
+}
+
#ifdef UNIT_TESTS_BUILT
seastar::future<> Socket::try_trap_pre(bp_action_t& trap) {
auto action = trap;
break;
case bp_action_t::STALL:
logger().info("[Test] got STALL and block");
- shutdown();
+ force_shutdown();
return blocker->block();
default:
ceph_abort("unexpected action from trap");