]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/net/Interceptor.h
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / crimson / net / Interceptor.h
index 41ec31f3755768d538ea8c66c0689c518d3d8551..35b74e24369c4d6aa9db657ff052b5c8c6412b59 100644 (file)
@@ -45,16 +45,21 @@ enum class bp_action_t {
 class socket_blocker {
   std::optional<seastar::abort_source> p_blocked;
   std::optional<seastar::abort_source> p_unblocked;
+  const seastar::shard_id primary_sid;
 
  public:
+  socket_blocker() : primary_sid{seastar::this_shard_id()} {}
+
   seastar::future<> wait_blocked() {
+    ceph_assert(seastar::this_shard_id() == primary_sid);
     ceph_assert(!p_blocked);
     if (p_unblocked) {
       return seastar::make_ready_future<>();
     } else {
       p_blocked = seastar::abort_source();
-      return seastar::sleep_abortable(std::chrono::seconds(10),
-                                     *p_blocked).then([] {
+      return seastar::sleep_abortable(
+        std::chrono::seconds(10), *p_blocked
+      ).then([] {
         throw std::runtime_error(
             "Timeout (10s) in socket_blocker::wait_blocked()");
       }).handle_exception_type([] (const seastar::sleep_aborted& e) {
@@ -64,21 +69,25 @@ class socket_blocker {
   }
 
   seastar::future<> block() {
-    if (p_blocked) {
-      p_blocked->request_abort();
-      p_blocked = std::nullopt;
-    }
-    ceph_assert(!p_unblocked);
-    p_unblocked = seastar::abort_source();
-    return seastar::sleep_abortable(std::chrono::seconds(10),
-                                   *p_unblocked).then([] {
-      ceph_abort("Timeout (10s) in socket_blocker::block()");
-    }).handle_exception_type([] (const seastar::sleep_aborted& e) {
-      // wait done!
+    return seastar::smp::submit_to(primary_sid, [this] {
+      if (p_blocked) {
+        p_blocked->request_abort();
+        p_blocked = std::nullopt;
+      }
+      ceph_assert(!p_unblocked);
+      p_unblocked = seastar::abort_source();
+      return seastar::sleep_abortable(
+        std::chrono::seconds(10), *p_unblocked
+      ).then([] {
+        ceph_abort("Timeout (10s) in socket_blocker::block()");
+      }).handle_exception_type([] (const seastar::sleep_aborted& e) {
+        // wait done!
+      });
     });
   }
 
   void unblock() {
+    ceph_assert(seastar::this_shard_id() == primary_sid);
     ceph_assert(!p_blocked);
     ceph_assert(p_unblocked);
     p_unblocked->request_abort();
@@ -116,11 +125,13 @@ struct Breakpoint {
 struct Interceptor {
   socket_blocker blocker;
   virtual ~Interceptor() {}
-  virtual void register_conn(SocketConnection& conn) = 0;
-  virtual void register_conn_ready(SocketConnection& conn) = 0;
-  virtual void register_conn_closed(SocketConnection& conn) = 0;
-  virtual void register_conn_replaced(SocketConnection& conn) = 0;
-  virtual bp_action_t intercept(SocketConnection& conn, Breakpoint bp) = 0;
+  virtual void register_conn(ConnectionRef) = 0;
+  virtual void register_conn_ready(ConnectionRef) = 0;
+  virtual void register_conn_closed(ConnectionRef) = 0;
+  virtual void register_conn_replaced(ConnectionRef) = 0;
+
+  virtual seastar::future<bp_action_t>
+  intercept(Connection&, std::vector<Breakpoint> bp) = 0;
 };
 
 } // namespace crimson::net