]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/net/Socket.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / crimson / net / Socket.cc
index 642bda4921f08439d2f11b9a51a7e1afcaebe4bf..6434a407f22a56c6b2c3f75a8d12eedcf181e7ad 100644 (file)
@@ -3,11 +3,14 @@
 
 #include "Socket.h"
 
+#include <seastar/core/sleep.hh>
 #include <seastar/core/when_all.hh>
 
 #include "crimson/common/log.h"
 #include "Errors.h"
 
+using crimson::common::local_conf;
+
 namespace crimson::net {
 
 namespace {
@@ -33,7 +36,7 @@ struct bufferlist_consumer {
     if (remaining >= data.size()) {
       // consume the whole buffer
       remaining -= data.size();
-      bl.append(buffer::create_foreign(std::move(data)));
+      bl.append(buffer::create(std::move(data)));
       if (remaining > 0) {
         // return none to request more segments
         return seastar::make_ready_future<consumption_result_type>(
@@ -46,7 +49,7 @@ struct bufferlist_consumer {
     }
     if (remaining > 0) {
       // consume the front
-      bl.append(buffer::create_foreign(data.share(0, remaining)));
+      bl.append(buffer::create(data.share(0, remaining)));
       data.trim_front(remaining);
       remaining = 0;
     }
@@ -72,7 +75,10 @@ seastar::future<bufferlist> Socket::read(size_t bytes)
       if (r.remaining) { // throw on short reads
         throw std::system_error(make_error_code(error::read_eof));
       }
-      return seastar::make_ready_future<bufferlist>(std::move(r.buffer));
+      inject_failure();
+      return inject_delay().then([this] {
+        return seastar::make_ready_future<bufferlist>(std::move(r.buffer));
+      });
     });
 #ifdef UNIT_TESTS_BUILT
   }).then([this] (auto buf) {
@@ -96,7 +102,11 @@ Socket::read_exactly(size_t bytes) {
       if (buf.size() < bytes) {
         throw std::system_error(make_error_code(error::read_eof));
       }
-      return seastar::make_ready_future<tmp_buf>(std::move(buf));
+      inject_failure();
+      return inject_delay(
+      ).then([buf = std::move(buf)] () mutable {
+        return seastar::make_ready_future<tmp_buf>(std::move(buf));
+      });
     });
 #ifdef UNIT_TESTS_BUILT
   }).then([this] (auto buf) {
@@ -109,6 +119,7 @@ Socket::read_exactly(size_t bytes) {
 }
 
 void Socket::shutdown() {
+  socket_is_shutdown = true;
   socket.shutdown_input();
   socket.shutdown_output();
 }
@@ -132,6 +143,7 @@ seastar::future<> Socket::close() {
   closed = true;
 #endif
   return seastar::when_all_succeed(
+    inject_delay(),
     in.close(),
     close_and_handle_errors(out)
   ).then_unpack([] {
@@ -142,6 +154,31 @@ seastar::future<> Socket::close() {
   });
 }
 
+seastar::future<> Socket::inject_delay () {
+  if (float delay_period = local_conf()->ms_inject_internal_delays;
+      delay_period) {
+    logger().debug("Socket::inject_delay: sleep for {}", delay_period);
+    return seastar::sleep(
+      std::chrono::milliseconds((int)(delay_period * 1000.0)));
+  }
+  return seastar::now();
+}
+
+void Socket::inject_failure()
+{
+  if (local_conf()->ms_inject_socket_failures) {
+    uint64_t rand =
+      ceph::util::generate_random_number<uint64_t>(1, RAND_MAX);
+    if (rand % local_conf()->ms_inject_socket_failures == 0) {
+      if (true) {
+        logger().warn("Socket::inject_failure: injecting socket failure");
+       throw std::system_error(make_error_code(
+         crimson::net::error::negotiation_failure));
+      }
+    }
+  }
+}
+
 #ifdef UNIT_TESTS_BUILT
 seastar::future<> Socket::try_trap_pre(bp_action_t& trap) {
   auto action = trap;
@@ -172,7 +209,7 @@ seastar::future<> Socket::try_trap_post(bp_action_t& trap) {
     break;
    case bp_action_t::STALL:
     logger().info("[Test] got STALL and block");
-    shutdown();
+    force_shutdown();
     return blocker->block();
    default:
     ceph_abort("unexpected action from trap");