]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/net/Protocol.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / crimson / net / Protocol.cc
index bf2633c1c221444988580273351ff48fb44b53c8..50b5c45a335f7eda571e5d654be9bd957b1dd65d 100644 (file)
@@ -7,6 +7,7 @@
 
 #include "crimson/common/log.h"
 #include "crimson/net/Errors.h"
+#include "crimson/net/chained_dispatchers.h"
 #include "crimson/net/Socket.h"
 #include "crimson/net/SocketConnection.h"
 #include "msg/Message.h"
@@ -20,56 +21,73 @@ namespace {
 namespace crimson::net {
 
 Protocol::Protocol(proto_t type,
-                   Dispatcher& dispatcher,
+                   ChainedDispatchers& dispatchers,
                    SocketConnection& conn)
   : proto_type(type),
-    dispatcher(dispatcher),
+    dispatchers(dispatchers),
     conn(conn),
     auth_meta{seastar::make_lw_shared<AuthConnectionMeta>()}
 {}
 
 Protocol::~Protocol()
 {
-  ceph_assert(pending_dispatch.is_closed());
+  ceph_assert(gate.is_closed());
   assert(!exit_open);
 }
 
-bool Protocol::is_connected() const
-{
-  return write_state == write_state_t::open;
-}
-
-seastar::future<> Protocol::close()
+void Protocol::close(bool dispatch_reset,
+                     std::optional<std::function<void()>> f_accept_new)
 {
   if (closed) {
     // already closing
-    assert(close_ready.valid());
-    return close_ready.get_future();
+    return;
   }
 
-  // unregister_conn() drops a reference, so hold another until completion
-  auto cleanup = [conn_ref = conn.shared_from_this(), this] {
-      logger().debug("{} closed!", conn);
-    };
+  bool is_replace = f_accept_new ? true : false;
+  logger().info("{} closing: reset {}, replace {}", conn,
+                dispatch_reset ? "yes" : "no",
+                is_replace ? "yes" : "no");
 
+  // atomic operations
+  closed = true;
   trigger_close();
-
-  // close_ready become valid only after state is state_t::closing
-  assert(!close_ready.valid());
-
+  if (f_accept_new) {
+    (*f_accept_new)();
+  }
   if (socket) {
     socket->shutdown();
-    close_ready = pending_dispatch.close().finally([this] {
-      return socket->close();
-    }).finally(std::move(cleanup));
-  } else {
-    close_ready = pending_dispatch.close().finally(std::move(cleanup));
   }
-
-  closed = true;
   set_write_state(write_state_t::drop);
+  assert(!gate.is_closed());
+  auto gate_closed = gate.close();
+
+  if (dispatch_reset) {
+    dispatchers.ms_handle_reset(
+        seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()),
+        is_replace);
+  }
 
-  return close_ready.get_future();
+  // asynchronous operations
+  assert(!close_ready.valid());
+  close_ready = std::move(gate_closed).then([this] {
+    if (socket) {
+      return socket->close();
+    } else {
+      return seastar::now();
+    }
+  }).then([this] {
+    logger().debug("{} closed!", conn);
+    on_closed();
+#ifdef UNIT_TESTS_BUILT
+    is_closed_clean = true;
+    if (conn.interceptor) {
+      conn.interceptor->register_conn_closed(conn);
+    }
+#endif
+  }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) {
+    logger().error("{} closing: close_ready got unexpected exception {}", conn, eptr);
+    ceph_abort();
+  });
 }
 
 seastar::future<> Protocol::send(MessageRef msg)
@@ -289,13 +307,9 @@ void Protocol::write_event()
    case write_state_t::open:
      [[fallthrough]];
    case write_state_t::delay:
-    (void) seastar::with_gate(pending_dispatch, [this] {
-      return do_write_dispatch_sweep(
-      ).handle_exception([this] (std::exception_ptr eptr) {
-        logger().error("{} do_write_dispatch_sweep(): unexpected exception {}",
-                       conn, eptr);
-        ceph_abort();
-      });
+    assert(!gate.is_closed());
+    gate.dispatch_in_background("do_write_dispatch_sweep", *this, [this] {
+      return do_write_dispatch_sweep();
     });
     return;
    case write_state_t::drop: