]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/test/crimson/test_messenger.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / test / crimson / test_messenger.cc
index 6fc9c1d7750c80e4527a5a7cf5525c5566c97bf6..a4257224658d18634d19bdc68d0917a21d420f84 100644 (file)
@@ -14,7 +14,6 @@
 #include "crimson/net/Dispatcher.h"
 #include "crimson/net/Messenger.h"
 #include "crimson/net/Interceptor.h"
-#include "crimson/net/SocketConnection.h"
 
 #include <map>
 #include <random>
@@ -24,6 +23,7 @@
 #include <seastar/core/app-template.hh>
 #include <seastar/core/do_with.hh>
 #include <seastar/core/future-util.hh>
+#include <seastar/core/gate.hh>
 #include <seastar/core/reactor.hh>
 #include <seastar/core/sleep.hh>
 #include <seastar/core/with_timeout.hh>
@@ -53,22 +53,91 @@ static entity_addr_t get_server_addr() {
   return saddr;
 }
 
+template <typename T, typename... Args>
+seastar::future<T*> create_sharded(Args... args) {
+  // we should only construct/stop shards on #0
+  return seastar::smp::submit_to(0, [=] {
+    auto sharded_obj = seastar::make_lw_shared<seastar::sharded<T>>();
+    return sharded_obj->start(args...
+    ).then([sharded_obj] {
+      seastar::engine().at_exit([sharded_obj] {
+        return sharded_obj->stop().then([sharded_obj] {});
+      });
+      return sharded_obj.get();
+    });
+  }).then([](seastar::sharded<T> *ptr_shard) {
+    return &ptr_shard->local();
+  });
+}
+
+class ShardedGates
+  : public seastar::peering_sharded_service<ShardedGates> {
+public:
+  ShardedGates() = default;
+  ~ShardedGates() {
+    assert(gate.is_closed());
+  }
+
+  template <typename Func>
+  void dispatch_in_background(const char *what, Func &&f) {
+    std::ignore = seastar::with_gate(
+      container().local().gate, std::forward<Func>(f)
+    ).handle_exception([what](std::exception_ptr eptr) {
+      try {
+        std::rethrow_exception(eptr);
+      } catch (std::exception &e) {
+        logger().error("ShardedGates::dispatch_in_background: "
+                       "{} got exception {}", what, e.what());
+      }
+    });
+  }
+
+  seastar::future<> close() {
+    return container().invoke_on_all([](auto &local) {
+      return local.gate.close();
+    });
+  }
+
+  static seastar::future<ShardedGates*> create() {
+    return create_sharded<ShardedGates>();
+  }
+
+  // seastar::future<> stop() is intentionally not implemented
+
+private:
+  seastar::gate gate;
+};
+
 static seastar::future<> test_echo(unsigned rounds,
                                    double keepalive_ratio)
 {
   struct test_state {
     struct Server final
         : public crimson::net::Dispatcher {
+      ShardedGates &gates;
       crimson::net::MessengerRef msgr;
       crimson::auth::DummyAuthClientServer dummy_auth;
 
+      Server(ShardedGates &gates) : gates{gates} {}
+
+      void ms_handle_accept(
+          crimson::net::ConnectionRef conn,
+          seastar::shard_id prv_shard,
+          bool is_replace) override {
+        logger().info("server accepted {}", *conn);
+        ceph_assert(prv_shard == seastar::this_shard_id());
+        ceph_assert(!is_replace);
+      }
+
       std::optional<seastar::future<>> ms_dispatch(
           crimson::net::ConnectionRef c, MessageRef m) override {
         if (verbose) {
           logger().info("server got {}", *m);
         }
         // reply with a pong
-        std::ignore = c->send(crimson::make_message<MPing>());
+        gates.dispatch_in_background("echo_send_pong", [c] {
+          return c->send(crimson::make_message<MPing>());
+        });
         return {seastar::now()};
       }
 
@@ -76,7 +145,8 @@ static seastar::future<> test_echo(unsigned rounds,
                              const std::string& lname,
                              const uint64_t nonce,
                              const entity_addr_t& addr) {
-        msgr = crimson::net::Messenger::create(name, lname, nonce);
+        msgr = crimson::net::Messenger::create(
+            name, lname, nonce, false);
         msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
         msgr->set_auth_client(&dummy_auth);
         msgr->set_auth_server(&dummy_auth);
@@ -96,8 +166,73 @@ static seastar::future<> test_echo(unsigned rounds,
       }
     };
 
-    struct Client final
-        : public crimson::net::Dispatcher {
+    class Client final
+        : public crimson::net::Dispatcher,
+          public seastar::peering_sharded_service<Client> {
+    public:
+      Client(seastar::shard_id primary_sid,
+             unsigned rounds,
+             double keepalive_ratio,
+             ShardedGates *gates)
+        : primary_sid{primary_sid},
+          keepalive_dist(std::bernoulli_distribution{keepalive_ratio}),
+          rounds(rounds),
+          gates{*gates} {}
+
+      seastar::future<> init(const entity_name_t& name,
+                             const std::string& lname,
+                             const uint64_t nonce) {
+        assert(seastar::this_shard_id() == primary_sid);
+        msgr = crimson::net::Messenger::create(
+            name, lname, nonce, false);
+        msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
+        msgr->set_auth_client(&dummy_auth);
+        msgr->set_auth_server(&dummy_auth);
+        return msgr->start({this});
+      }
+
+      seastar::future<> shutdown() {
+        assert(seastar::this_shard_id() == primary_sid);
+        ceph_assert(msgr);
+        msgr->stop();
+        return msgr->shutdown();
+      }
+
+      seastar::future<> dispatch_pingpong(const entity_addr_t& peer_addr) {
+        assert(seastar::this_shard_id() == primary_sid);
+        mono_time start_time = mono_clock::now();
+        auto conn = msgr->connect(peer_addr, entity_name_t::TYPE_OSD);
+        return seastar::futurize_invoke([this, conn] {
+          return do_dispatch_pingpong(conn);
+        }).then([] {
+          // 500ms should be enough to establish the connection
+          return seastar::sleep(500ms);
+        }).then([this, conn, start_time] {
+          return container().invoke_on(
+              conn->get_shard_id(),
+              [pconn=&*conn, start_time](auto &local) {
+            assert(pconn->is_connected());
+            auto session = local.find_session(pconn);
+            std::chrono::duration<double> dur_handshake = session->connected_time - start_time;
+            std::chrono::duration<double> dur_pingpong = session->finish_time - session->connected_time;
+            logger().info("{}: handshake {}, pingpong {}",
+                          *pconn, dur_handshake.count(), dur_pingpong.count());
+          }).then([conn] {});
+        });
+      }
+
+      static seastar::future<Client*> create(
+          unsigned rounds,
+          double keepalive_ratio,
+          ShardedGates *gates) {
+        return create_sharded<Client>(
+          seastar::this_shard_id(),
+          rounds,
+          keepalive_ratio,
+          gates);
+      }
+
+     private:
       struct PingSession : public seastar::enable_shared_from_this<PingSession> {
         unsigned count = 0u;
         mono_time connected_time;
@@ -105,83 +240,54 @@ static seastar::future<> test_echo(unsigned rounds,
       };
       using PingSessionRef = seastar::shared_ptr<PingSession>;
 
-      unsigned rounds;
-      std::bernoulli_distribution keepalive_dist;
-      crimson::net::MessengerRef msgr;
-      std::map<crimson::net::ConnectionRef, seastar::promise<>> pending_conns;
-      std::map<crimson::net::ConnectionRef, PingSessionRef> sessions;
-      crimson::auth::DummyAuthClientServer dummy_auth;
-
-      Client(unsigned rounds, double keepalive_ratio)
-        : rounds(rounds),
-          keepalive_dist(std::bernoulli_distribution{keepalive_ratio}) {}
-
-      PingSessionRef find_session(crimson::net::ConnectionRef c) {
-        auto found = sessions.find(c);
-        if (found == sessions.end()) {
-          ceph_assert(false);
-        }
-        return found->second;
-      }
-
-      void ms_handle_connect(crimson::net::ConnectionRef conn) override {
+      void ms_handle_connect(
+          crimson::net::ConnectionRef conn,
+          seastar::shard_id prv_shard) override {
+        auto &local = container().local();
+        assert(prv_shard == seastar::this_shard_id());
         auto session = seastar::make_shared<PingSession>();
-        auto [i, added] = sessions.emplace(conn, session);
+        auto [i, added] = local.sessions.emplace(&*conn, session);
         std::ignore = i;
         ceph_assert(added);
         session->connected_time = mono_clock::now();
       }
+
       std::optional<seastar::future<>> ms_dispatch(
           crimson::net::ConnectionRef c, MessageRef m) override {
-        auto session = find_session(c);
+        auto &local = container().local();
+        auto session = local.find_session(&*c);
         ++(session->count);
         if (verbose) {
           logger().info("client ms_dispatch {}", session->count);
         }
 
-        if (session->count == rounds) {
+        if (session->count > rounds) {
+          logger().error("{}: got {} pongs, more than expected {}", *c, session->count, rounds);
+          ceph_abort();
+        } else if (session->count == rounds) {
           logger().info("{}: finished receiving {} pongs", *c, session->count);
           session->finish_time = mono_clock::now();
-          auto found = pending_conns.find(c);
-          ceph_assert(found != pending_conns.end());
-          found->second.set_value();
+          gates.dispatch_in_background("echo_notify_done", [c, this] {
+            return container().invoke_on(primary_sid, [pconn=&*c](auto &local) {
+              auto found = local.pending_conns.find(pconn);
+              ceph_assert(found != local.pending_conns.end());
+              found->second.set_value();
+            }).then([c] {});
+          });
         }
         return {seastar::now()};
       }
 
-      seastar::future<> init(const entity_name_t& name,
-                             const std::string& lname,
-                             const uint64_t nonce) {
-        msgr = crimson::net::Messenger::create(name, lname, nonce);
-        msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
-        msgr->set_auth_client(&dummy_auth);
-        msgr->set_auth_server(&dummy_auth);
-        return msgr->start({this});
-      }
-
-      seastar::future<> shutdown() {
-        ceph_assert(msgr);
-       msgr->stop();
-        return msgr->shutdown();
-      }
-
-      seastar::future<> dispatch_pingpong(const entity_addr_t& peer_addr) {
-        mono_time start_time = mono_clock::now();
-        auto conn = msgr->connect(peer_addr, entity_name_t::TYPE_OSD);
-        return seastar::futurize_invoke([this, conn] {
-          return do_dispatch_pingpong(conn);
-        }).then([this, conn, start_time] {
-          auto session = find_session(conn);
-          std::chrono::duration<double> dur_handshake = session->connected_time - start_time;
-          std::chrono::duration<double> dur_pingpong = session->finish_time - session->connected_time;
-          logger().info("{}: handshake {}, pingpong {}",
-                        *conn, dur_handshake.count(), dur_pingpong.count());
-        });
+      PingSessionRef find_session(crimson::net::Connection *c) {
+        auto found = sessions.find(c);
+        if (found == sessions.end()) {
+          ceph_assert(false);
+        }
+        return found->second;
       }
 
-     private:
       seastar::future<> do_dispatch_pingpong(crimson::net::ConnectionRef conn) {
-        auto [i, added] = pending_conns.emplace(conn, seastar::promise<>());
+        auto [i, added] = pending_conns.emplace(&*conn, seastar::promise<>());
         std::ignore = i;
         ceph_assert(added);
         return seastar::do_with(0u, 0u,
@@ -197,178 +303,96 @@ static seastar::future<> test_echo(unsigned rounds,
             },
             [this, conn, &count_ping, &count_keepalive] {
               return seastar::repeat([this, conn, &count_ping, &count_keepalive] {
-                  if (keepalive_dist(rng)) {
-                    return conn->send_keepalive()
-                      .then([&count_keepalive] {
-                        count_keepalive += 1;
-                        return seastar::make_ready_future<seastar::stop_iteration>(
-                          seastar::stop_iteration::no);
-                      });
-                  } else {
-                    return conn->send(crimson::make_message<MPing>())
-                      .then([&count_ping] {
-                        count_ping += 1;
-                        return seastar::make_ready_future<seastar::stop_iteration>(
-                          seastar::stop_iteration::yes);
-                      });
-                  }
-                });
+                if (keepalive_dist(rng)) {
+                  return conn->send_keepalive(
+                  ).then([&count_keepalive] {
+                    count_keepalive += 1;
+                    return seastar::make_ready_future<seastar::stop_iteration>(
+                      seastar::stop_iteration::no);
+                  });
+                } else {
+                  return conn->send(crimson::make_message<MPing>()
+                  ).then([&count_ping] {
+                    count_ping += 1;
+                    return seastar::make_ready_future<seastar::stop_iteration>(
+                      seastar::stop_iteration::yes);
+                  });
+                }
+              });
             }).then([this, conn] {
-              auto found = pending_conns.find(conn);
+              auto found = pending_conns.find(&*conn);
+              assert(found != pending_conns.end());
               return found->second.get_future();
             }
           );
         });
       }
+
+    private:
+      // primary shard only
+      const seastar::shard_id primary_sid;
+      std::bernoulli_distribution keepalive_dist;
+      crimson::net::MessengerRef msgr;
+      std::map<crimson::net::Connection*, seastar::promise<>> pending_conns;
+      crimson::auth::DummyAuthClientServer dummy_auth;
+
+      // per shard
+      const unsigned rounds;
+      std::map<crimson::net::Connection*, PingSessionRef> sessions;
+      ShardedGates &gates;
     };
   };
 
   logger().info("test_echo(rounds={}, keepalive_ratio={}):",
                 rounds, keepalive_ratio);
-  auto server1 = seastar::make_shared<test_state::Server>();
-  auto server2 = seastar::make_shared<test_state::Server>();
-  auto client1 = seastar::make_shared<test_state::Client>(rounds, keepalive_ratio);
-  auto client2 = seastar::make_shared<test_state::Client>(rounds, keepalive_ratio);
-  // start servers and clients
-  auto addr1 = get_server_addr();
-  auto addr2 = get_server_addr();
-  addr1.set_type(entity_addr_t::TYPE_MSGR2);
-  addr2.set_type(entity_addr_t::TYPE_MSGR2);
-  return seastar::when_all_succeed(
-      server1->init(entity_name_t::OSD(0), "server1", 1, addr1),
-      server2->init(entity_name_t::OSD(1), "server2", 2, addr2),
-      client1->init(entity_name_t::OSD(2), "client1", 3),
-      client2->init(entity_name_t::OSD(3), "client2", 4)
-  // dispatch pingpoing
-  ).then_unpack([client1, client2, server1, server2] {
+  return ShardedGates::create(
+  ).then([rounds, keepalive_ratio](auto *gates) {
     return seastar::when_all_succeed(
+      test_state::Client::create(rounds, keepalive_ratio, gates),
+      test_state::Client::create(rounds, keepalive_ratio, gates),
+      seastar::make_ready_future<ShardedGates*>(gates));
+  }).then_unpack([](auto *client1, auto *client2, auto *gates) {
+    auto server1 = seastar::make_shared<test_state::Server>(*gates);
+    auto server2 = seastar::make_shared<test_state::Server>(*gates);
+    // start servers and clients
+    auto addr1 = get_server_addr();
+    auto addr2 = get_server_addr();
+    addr1.set_type(entity_addr_t::TYPE_MSGR2);
+    addr2.set_type(entity_addr_t::TYPE_MSGR2);
+    return seastar::when_all_succeed(
+        server1->init(entity_name_t::OSD(0), "server1", 1, addr1),
+        server2->init(entity_name_t::OSD(1), "server2", 2, addr2),
+        client1->init(entity_name_t::OSD(2), "client1", 3),
+        client2->init(entity_name_t::OSD(3), "client2", 4)
+    // dispatch pingpong
+    ).then_unpack([client1, client2, server1, server2] {
+      return seastar::when_all_succeed(
         // test connecting in parallel, accepting in parallel
+        client1->dispatch_pingpong(server1->msgr->get_myaddr()),
         client1->dispatch_pingpong(server2->msgr->get_myaddr()),
-        client2->dispatch_pingpong(server1->msgr->get_myaddr()));
-  // shutdown
-  }).then_unpack([] {
-    return seastar::now();
-  }).then([client1] {
-    logger().info("client1 shutdown...");
-    return client1->shutdown();
-  }).then([client2] {
-    logger().info("client2 shutdown...");
-    return client2->shutdown();
-  }).then([server1] {
-    logger().info("server1 shutdown...");
-    return server1->shutdown();
-  }).then([server2] {
-    logger().info("server2 shutdown...");
-    return server2->shutdown();
-  }).then([] {
-    logger().info("test_echo() done!\n");
-  }).handle_exception([server1, server2, client1, client2] (auto eptr) {
-    logger().error("test_echo() failed: got exception {}", eptr);
-    throw;
-  });
-}
-
-static seastar::future<> test_concurrent_dispatch()
-{
-  struct test_state {
-    struct Server final
-      : public crimson::net::Dispatcher {
-      crimson::net::MessengerRef msgr;
-      int count = 0;
-      seastar::promise<> on_second; // satisfied on second dispatch
-      seastar::promise<> on_done; // satisfied when first dispatch unblocks
-      crimson::auth::DummyAuthClientServer dummy_auth;
-
-      std::optional<seastar::future<>> ms_dispatch(
-          crimson::net::ConnectionRef, MessageRef m) override {
-        switch (++count) {
-        case 1:
-          // block on the first request until we reenter with the second
-          std::ignore = on_second.get_future().then([this] { on_done.set_value(); });
-          break;
-        case 2:
-          on_second.set_value();
-          break;
-        default:
-          throw std::runtime_error("unexpected count");
-        }
-        return {seastar::now()};
-      }
-
-      seastar::future<> wait() { return on_done.get_future(); }
-
-      seastar::future<> init(const entity_name_t& name,
-                             const std::string& lname,
-                             const uint64_t nonce,
-                             const entity_addr_t& addr) {
-        msgr = crimson::net::Messenger::create(name, lname, nonce);
-        msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
-        msgr->set_auth_client(&dummy_auth);
-        msgr->set_auth_server(&dummy_auth);
-        return msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
-          return msgr->start({this});
-        }, crimson::net::Messenger::bind_ertr::all_same_way(
-            [addr] (const std::error_code& e) {
-          logger().error("test_concurrent_dispatch(): "
-                         "there is another instance running at {}", addr);
-          ceph_abort();
-        }));
-      }
-    };
-
-    struct Client final
-      : public crimson::net::Dispatcher {
-      crimson::net::MessengerRef msgr;
-      crimson::auth::DummyAuthClientServer dummy_auth;
-
-      std::optional<seastar::future<>> ms_dispatch(
-          crimson::net::ConnectionRef, MessageRef m) override {
-        return {seastar::now()};
-      }
-
-      seastar::future<> init(const entity_name_t& name,
-                             const std::string& lname,
-                             const uint64_t nonce) {
-        msgr = crimson::net::Messenger::create(name, lname, nonce);
-        msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
-        msgr->set_auth_client(&dummy_auth);
-        msgr->set_auth_server(&dummy_auth);
-        return msgr->start({this});
-      }
-    };
-  };
-
-  logger().info("test_concurrent_dispatch():");
-  auto server = seastar::make_shared<test_state::Server>();
-  auto client = seastar::make_shared<test_state::Client>();
-  auto addr = get_server_addr();
-  addr.set_type(entity_addr_t::TYPE_MSGR2);
-  addr.set_family(AF_INET);
-  return seastar::when_all_succeed(
-      server->init(entity_name_t::OSD(4), "server3", 5, addr),
-      client->init(entity_name_t::OSD(5), "client3", 6)
-  ).then_unpack([server, client] {
-    auto conn = client->msgr->connect(server->msgr->get_myaddr(),
-                                      entity_name_t::TYPE_OSD);
-    // send two messages
-    return conn->send(crimson::make_message<MPing>()).then([conn] {
-      return conn->send(crimson::make_message<MPing>());
+        client2->dispatch_pingpong(server1->msgr->get_myaddr()),
+        client2->dispatch_pingpong(server2->msgr->get_myaddr()));
+    // shutdown
+    }).then_unpack([client1] {
+      logger().info("client1 shutdown...");
+      return client1->shutdown();
+    }).then([client2] {
+      logger().info("client2 shutdown...");
+      return client2->shutdown();
+    }).then([server1] {
+      logger().info("server1 shutdown...");
+      return server1->shutdown();
+    }).then([server2] {
+      logger().info("server2 shutdown...");
+      return server2->shutdown();
+    }).then([] {
+      logger().info("test_echo() done!\n");
+    }).handle_exception([](auto eptr) {
+      logger().error("test_echo() failed: got exception {}", eptr);
+      throw;
+    }).finally([gates, server1, server2] {
+      return gates->close();
     });
-  }).then([server] {
-    return server->wait();
-  }).then([client] {
-    logger().info("client shutdown...");
-    client->msgr->stop();
-    return client->msgr->shutdown();
-  }).then([server] {
-    logger().info("server shutdown...");
-    server->msgr->stop();
-    return server->msgr->shutdown();
-  }).then([] {
-    logger().info("test_concurrent_dispatch() done!\n");
-  }).handle_exception([server, client] (auto eptr) {
-    logger().error("test_concurrent_dispatch() failed: got exception {}", eptr);
-    throw;
   });
 }
 
@@ -390,7 +414,8 @@ seastar::future<> test_preemptive_shutdown() {
                              const std::string& lname,
                              const uint64_t nonce,
                              const entity_addr_t& addr) {
-        msgr = crimson::net::Messenger::create(name, lname, nonce);
+        msgr = crimson::net::Messenger::create(
+            name, lname, nonce, true);
         msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
         msgr->set_auth_client(&dummy_auth);
         msgr->set_auth_server(&dummy_auth);
@@ -429,7 +454,8 @@ seastar::future<> test_preemptive_shutdown() {
       seastar::future<> init(const entity_name_t& name,
                              const std::string& lname,
                              const uint64_t nonce) {
-        msgr = crimson::net::Messenger::create(name, lname, nonce);
+        msgr = crimson::net::Messenger::create(
+            name, lname, nonce, true);
         msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
         msgr->set_auth_client(&dummy_auth);
         msgr->set_auth_server(&dummy_auth);
@@ -496,7 +522,6 @@ using crimson::net::Dispatcher;
 using crimson::net::Interceptor;
 using crimson::net::Messenger;
 using crimson::net::MessengerRef;
-using crimson::net::SocketConnection;
 using crimson::net::SocketPolicy;
 using crimson::net::tag_bp_t;
 using namespace ceph::net::test;
@@ -643,18 +668,21 @@ using ConnResults = std::vector<ConnResult>;
 struct TestInterceptor : public Interceptor {
   std::map<Breakpoint, std::map<unsigned, bp_action_t>> breakpoints;
   std::map<Breakpoint, counter_t> breakpoints_counter;
-  std::map<ConnectionRef, unsigned> conns;
+  std::map<Connection*, unsigned> conns;
   ConnResults results;
   std::optional<seastar::abort_source> signal;
+  const seastar::shard_id primary_sid;
+
+  TestInterceptor() : primary_sid{seastar::this_shard_id()} {}
 
-  TestInterceptor() = default;
   // only used for copy breakpoint configurations
-  TestInterceptor(const TestInterceptor& other) {
+  TestInterceptor(const TestInterceptor& other) : primary_sid{other.primary_sid} {
     assert(other.breakpoints_counter.empty());
     assert(other.conns.empty());
     assert(other.results.empty());
     breakpoints = other.breakpoints;
     assert(!other.signal);
+    assert(seastar::this_shard_id() == primary_sid);
   }
 
   void make_fault(Breakpoint bp, unsigned round = 1) {
@@ -672,7 +700,8 @@ struct TestInterceptor : public Interceptor {
     breakpoints[bp][round] = bp_action_t::STALL;
   }
 
-  ConnResult* find_result(ConnectionRef conn) {
+  ConnResult* find_result(Connection *conn) {
+    assert(seastar::this_shard_id() == primary_sid);
     auto it = conns.find(conn);
     if (it == conns.end()) {
       return nullptr;
@@ -682,6 +711,7 @@ struct TestInterceptor : public Interceptor {
   }
 
   seastar::future<> wait() {
+    assert(seastar::this_shard_id() == primary_sid);
     assert(!signal);
     signal = seastar::abort_source();
     return seastar::sleep_abortable(10s, *signal).then([] {
@@ -692,6 +722,7 @@ struct TestInterceptor : public Interceptor {
   }
 
   void notify() {
+    assert(seastar::this_shard_id() == primary_sid);
     if (signal) {
       signal->request_abort();
       signal = std::nullopt;
@@ -699,25 +730,24 @@ struct TestInterceptor : public Interceptor {
   }
 
  private:
-  void register_conn(SocketConnection& _conn) override {
-    auto conn = _conn.get_local_shared_foreign_from_this();
-    auto result = find_result(conn);
+  void register_conn(ConnectionRef conn) override {
+    auto result = find_result(&*conn);
     if (result != nullptr) {
       logger().error("The connection [{}] {} already exists when register {}",
-                     result->index, *result->conn, _conn);
+                     result->index, *result->conn, *conn);
       ceph_abort();
     }
     unsigned index = results.size();
     results.emplace_back(conn, index);
-    conns[conn] = index;
+    conns[&*conn] = index;
     notify();
-    logger().info("[{}] {} new connection registered", index, _conn);
+    logger().info("[{}] {} new connection registered", index, *conn);
   }
 
-  void register_conn_closed(SocketConnection& conn) override {
-    auto result = find_result(conn.get_local_shared_foreign_from_this());
+  void register_conn_closed(ConnectionRef conn) override {
+    auto result = find_result(&*conn);
     if (result == nullptr) {
-      logger().error("Untracked closed connection: {}", conn);
+      logger().error("Untracked closed connection: {}", *conn);
       ceph_abort();
     }
 
@@ -725,75 +755,97 @@ struct TestInterceptor : public Interceptor {
       result->state = conn_state_t::closed;
     }
     notify();
-    logger().info("[{}] {} closed({})", result->index, conn, result->state);
+    logger().info("[{}] {} closed({})", result->index, *conn, result->state);
   }
 
-  void register_conn_ready(SocketConnection& conn) override {
-    auto result = find_result(conn.get_local_shared_foreign_from_this());
+  void register_conn_ready(ConnectionRef conn) override {
+    auto result = find_result(&*conn);
     if (result == nullptr) {
-      logger().error("Untracked ready connection: {}", conn);
+      logger().error("Untracked ready connection: {}", *conn);
       ceph_abort();
     }
 
-    ceph_assert(conn.is_connected());
+    ceph_assert(conn->is_protocol_ready());
     notify();
-    logger().info("[{}] {} ready", result->index, conn);
+    logger().info("[{}] {} ready", result->index, *conn);
   }
 
-  void register_conn_replaced(SocketConnection& conn) override {
-    auto result = find_result(conn.get_local_shared_foreign_from_this());
+  void register_conn_replaced(ConnectionRef conn) override {
+    auto result = find_result(&*conn);
     if (result == nullptr) {
-      logger().error("Untracked replaced connection: {}", conn);
+      logger().error("Untracked replaced connection: {}", *conn);
       ceph_abort();
     }
 
     result->state = conn_state_t::replaced;
-    logger().info("[{}] {} {}", result->index, conn, result->state);
+    logger().info("[{}] {} {}", result->index, *conn, result->state);
   }
 
-  bp_action_t intercept(SocketConnection& conn, Breakpoint bp) override {
-    ++breakpoints_counter[bp].counter;
+  seastar::future<bp_action_t>
+  intercept(Connection &_conn, std::vector<Breakpoint> bps) override {
+    assert(bps.size() >= 1);
+    Connection *conn = &_conn;
 
-    auto result = find_result(conn.get_local_shared_foreign_from_this());
-    if (result == nullptr) {
-      logger().error("Untracked intercepted connection: {}, at breakpoint {}({})",
-                     conn, bp, breakpoints_counter[bp].counter);
-      ceph_abort();
-    }
+    return seastar::smp::submit_to(primary_sid, [conn, bps, this] {
+      std::vector<bp_action_t> actions;
+      for (const Breakpoint &bp : bps) {
+        ++breakpoints_counter[bp].counter;
 
-    if (bp == custom_bp_t::SOCKET_CONNECTING) {
-      ++result->connect_attempts;
-      logger().info("[Test] connect_attempts={}", result->connect_attempts);
-    } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::WRITE}) {
-      ++result->client_connect_attempts;
-      logger().info("[Test] client_connect_attempts={}", result->client_connect_attempts);
-    } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::WRITE}) {
-      ++result->client_reconnect_attempts;
-      logger().info("[Test] client_reconnect_attempts={}", result->client_reconnect_attempts);
-    } else if (bp == custom_bp_t::SOCKET_ACCEPTED) {
-      ++result->accept_attempts;
-      logger().info("[Test] accept_attempts={}", result->accept_attempts);
-    } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::READ}) {
-      ++result->server_connect_attempts;
-      logger().info("[Test] server_connect_attemps={}", result->server_connect_attempts);
-    } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::READ}) {
-      ++result->server_reconnect_attempts;
-      logger().info("[Test] server_reconnect_attempts={}", result->server_reconnect_attempts);
-    }
+        auto result = find_result(&*conn);
+        if (result == nullptr) {
+          logger().error("Untracked intercepted connection: {}, at breakpoint {}({})",
+                         *conn, bp, breakpoints_counter[bp].counter);
+          ceph_abort();
+        }
 
-    auto it_bp = breakpoints.find(bp);
-    if (it_bp != breakpoints.end()) {
-      auto it_cnt = it_bp->second.find(breakpoints_counter[bp].counter);
-      if (it_cnt != it_bp->second.end()) {
-        logger().info("[{}] {} intercepted {}({}) => {}",
-                      result->index, conn, bp,
-                      breakpoints_counter[bp].counter, it_cnt->second);
-        return it_cnt->second;
+        if (bp == custom_bp_t::SOCKET_CONNECTING) {
+          ++result->connect_attempts;
+          logger().info("[Test] connect_attempts={}", result->connect_attempts);
+        } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::WRITE}) {
+          ++result->client_connect_attempts;
+          logger().info("[Test] client_connect_attempts={}", result->client_connect_attempts);
+        } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::WRITE}) {
+          ++result->client_reconnect_attempts;
+          logger().info("[Test] client_reconnect_attempts={}", result->client_reconnect_attempts);
+        } else if (bp == custom_bp_t::SOCKET_ACCEPTED) {
+          ++result->accept_attempts;
+          logger().info("[Test] accept_attempts={}", result->accept_attempts);
+        } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::READ}) {
+          ++result->server_connect_attempts;
+          logger().info("[Test] server_connect_attempts={}", result->server_connect_attempts);
+        } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::READ}) {
+          ++result->server_reconnect_attempts;
+          logger().info("[Test] server_reconnect_attempts={}", result->server_reconnect_attempts);
+        }
+
+        auto it_bp = breakpoints.find(bp);
+        if (it_bp != breakpoints.end()) {
+          auto it_cnt = it_bp->second.find(breakpoints_counter[bp].counter);
+          if (it_cnt != it_bp->second.end()) {
+            logger().info("[{}] {} intercepted {}({}) => {}",
+                          result->index, *conn, bp,
+                          breakpoints_counter[bp].counter, it_cnt->second);
+            actions.emplace_back(it_cnt->second);
+            continue;
+          }
+        }
+        logger().info("[{}] {} intercepted {}({})",
+                      result->index, *conn, bp, breakpoints_counter[bp].counter);
+        actions.emplace_back(bp_action_t::CONTINUE);
       }
-    }
-    logger().info("[{}] {} intercepted {}({})",
-                  result->index, conn, bp, breakpoints_counter[bp].counter);
-    return bp_action_t::CONTINUE;
+
+      bp_action_t action = bp_action_t::CONTINUE;
+      for (bp_action_t &a : actions) {
+        if (a != bp_action_t::CONTINUE) {
+          if (action == bp_action_t::CONTINUE) {
+            action = a;
+          } else {
+            ceph_abort("got multiple incompatible actions");
+          }
+        }
+      }
+      return seastar::make_ready_future<bp_action_t>(action);
+    });
   }
 };
 
@@ -824,116 +876,163 @@ class FailoverSuite : public Dispatcher {
   TestInterceptor interceptor;
 
   unsigned tracked_index = 0;
-  ConnectionRef tracked_conn;
+  Connection *tracked_conn = nullptr;
   unsigned pending_send = 0;
   unsigned pending_peer_receive = 0;
   unsigned pending_receive = 0;
 
-  std::optional<seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
-    auto result = interceptor.find_result(c);
-    if (result == nullptr) {
-      logger().error("Untracked ms dispatched connection: {}", *c);
-      ceph_abort();
-    }
-
-    if (tracked_conn != c) {
-      logger().error("[{}] {} got op, but doesn't match tracked_conn [{}] {}",
-                     result->index, *c, tracked_index, *tracked_conn);
-      ceph_abort();
-    }
-    ceph_assert(result->index == tracked_index);
+  ShardedGates &gates;
+  const seastar::shard_id primary_sid;
 
+  std::optional<seastar::future<>> ms_dispatch(
+      ConnectionRef conn_ref, MessageRef m) override {
     ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
-    ceph_assert(pending_receive > 0);
-    --pending_receive;
-    if (pending_receive == 0) {
-      interceptor.notify();
-    }
-    logger().info("[Test] got op, left {} ops -- [{}] {}",
-                  pending_receive, result->index, *c);
+    Connection *conn = &*conn_ref;
+    gates.dispatch_in_background("TestSuite_ms_dispatch",
+        [this, conn, conn_ref] {
+      return seastar::smp::submit_to(primary_sid, [this, conn] {
+        auto result = interceptor.find_result(&*conn);
+        if (result == nullptr) {
+          logger().error("Untracked ms dispatched connection: {}", *conn);
+          ceph_abort();
+        }
+
+        if (tracked_conn != &*conn) {
+          logger().warn("[{}] {} got op, but doesn't match tracked_conn [{}] {}",
+                        result->index, *conn, tracked_index, *tracked_conn);
+        } else {
+          ceph_assert(result->index == tracked_index);
+        }
+
+        ceph_assert(pending_receive > 0);
+        --pending_receive;
+        if (pending_receive == 0) {
+          interceptor.notify();
+        }
+        logger().info("[Test] got op, left {} ops -- [{}] {}",
+                      pending_receive, result->index, *conn);
+      }).then([conn_ref] {});
+    });
     return {seastar::now()};
   }
 
-  void ms_handle_accept(ConnectionRef conn) override {
-    auto result = interceptor.find_result(conn);
-    if (result == nullptr) {
-      logger().error("Untracked accepted connection: {}", *conn);
-      ceph_abort();
-    }
+  void ms_handle_accept(
+      ConnectionRef conn_ref,
+      seastar::shard_id prv_shard,
+      bool is_replace) override {
+    Connection *conn = &*conn_ref;
+    gates.dispatch_in_background("TestSuite_ms_dispatch",
+        [this, conn, conn_ref] {
+      return seastar::smp::submit_to(primary_sid, [this, conn] {
+        auto result = interceptor.find_result(&*conn);
+        if (result == nullptr) {
+          logger().error("Untracked accepted connection: {}", *conn);
+          ceph_abort();
+        }
 
-    if (tracked_conn &&
-        !tracked_conn->is_closed() &&
-        tracked_conn != conn) {
-      logger().error("[{}] {} got accepted, but there's already traced_conn [{}] {}",
-                     result->index, *conn, tracked_index, *tracked_conn);
-      ceph_abort();
-    }
+        if (tracked_conn &&
+            !tracked_conn->is_protocol_closed() &&
+            tracked_conn != &*conn) {
+          logger().error("[{}] {} got accepted, but there's already a valid traced_conn [{}] {}",
+                         result->index, *conn, tracked_index, *tracked_conn);
+          ceph_abort();
+        }
 
-    tracked_index = result->index;
-    tracked_conn = conn;
-    ++result->cnt_accept_dispatched;
-    logger().info("[Test] got accept (cnt_accept_dispatched={}), track [{}] {}",
-                  result->cnt_accept_dispatched, result->index, *conn);
-    std::ignore = flush_pending_send();
+        tracked_index = result->index;
+        tracked_conn = &*conn;
+        ++result->cnt_accept_dispatched;
+        logger().info("[Test] got accept (cnt_accept_dispatched={}), track [{}] {}",
+                      result->cnt_accept_dispatched, result->index, *conn);
+        return flush_pending_send();
+      }).then([conn_ref] {});
+    });
   }
 
-  void ms_handle_connect(ConnectionRef conn) override {
-    auto result = interceptor.find_result(conn);
-    if (result == nullptr) {
-      logger().error("Untracked connected connection: {}", *conn);
-      ceph_abort();
-    }
+  void ms_handle_connect(
+      ConnectionRef conn_ref,
+      seastar::shard_id prv_shard) override {
+    Connection *conn = &*conn_ref;
+    gates.dispatch_in_background("TestSuite_ms_dispatch",
+        [this, conn, conn_ref] {
+      return seastar::smp::submit_to(primary_sid, [this, conn] {
+        auto result = interceptor.find_result(&*conn);
+        if (result == nullptr) {
+          logger().error("Untracked connected connection: {}", *conn);
+          ceph_abort();
+        }
 
-    if (tracked_conn != conn) {
-      logger().error("[{}] {} got connected, but doesn't match tracked_conn [{}] {}",
-                     result->index, *conn, tracked_index, *tracked_conn);
-      ceph_abort();
-    }
-    ceph_assert(result->index == tracked_index);
+        if (tracked_conn &&
+            !tracked_conn->is_protocol_closed() &&
+            tracked_conn != &*conn) {
+          logger().error("[{}] {} got connected, but there's already a avlid tracked_conn [{}] {}",
+                         result->index, *conn, tracked_index, *tracked_conn);
+          ceph_abort();
+        }
+
+        if (tracked_conn == &*conn) {
+          ceph_assert(result->index == tracked_index);
+        }
 
-    ++result->cnt_connect_dispatched;
-    logger().info("[Test] got connected (cnt_connect_dispatched={}) -- [{}] {}",
-                  result->cnt_connect_dispatched, result->index, *conn);
+        ++result->cnt_connect_dispatched;
+        logger().info("[Test] got connected (cnt_connect_dispatched={}) -- [{}] {}",
+                      result->cnt_connect_dispatched, result->index, *conn);
+      }).then([conn_ref] {});
+    });
   }
 
-  void ms_handle_reset(ConnectionRef conn, bool is_replace) override {
-    auto result = interceptor.find_result(conn);
-    if (result == nullptr) {
-      logger().error("Untracked reset connection: {}", *conn);
-      ceph_abort();
-    }
+  void ms_handle_reset(
+      ConnectionRef conn_ref,
+      bool is_replace) override {
+    Connection *conn = &*conn_ref;
+    gates.dispatch_in_background("TestSuite_ms_dispatch",
+        [this, conn, conn_ref] {
+      return seastar::smp::submit_to(primary_sid, [this, conn] {
+        auto result = interceptor.find_result(&*conn);
+        if (result == nullptr) {
+          logger().error("Untracked reset connection: {}", *conn);
+          ceph_abort();
+        }
 
-    if (tracked_conn != conn) {
-      logger().error("[{}] {} got reset, but doesn't match tracked_conn [{}] {}",
-                     result->index, *conn, tracked_index, *tracked_conn);
-      ceph_abort();
-    }
-    ceph_assert(result->index == tracked_index);
+        if (tracked_conn != &*conn) {
+          logger().warn("[{}] {} got reset, but doesn't match tracked_conn [{}] {}",
+                        result->index, *conn, tracked_index, *tracked_conn);
+        } else {
+          ceph_assert(result->index == tracked_index);
+          tracked_index = 0;
+          tracked_conn = nullptr;
+        }
 
-    tracked_index = 0;
-    tracked_conn = nullptr;
-    ++result->cnt_reset_dispatched;
-    logger().info("[Test] got reset (cnt_reset_dispatched={}), untrack [{}] {}",
-                  result->cnt_reset_dispatched, result->index, *conn);
+        ++result->cnt_reset_dispatched;
+        logger().info("[Test] got reset (cnt_reset_dispatched={}), untrack [{}] {}",
+                      result->cnt_reset_dispatched, result->index, *conn);
+      }).then([conn_ref] {});
+    });
   }
 
-  void ms_handle_remote_reset(ConnectionRef conn) override {
-    auto result = interceptor.find_result(conn);
-    if (result == nullptr) {
-      logger().error("Untracked remotely reset connection: {}", *conn);
-      ceph_abort();
-    }
+  void ms_handle_remote_reset(
+      ConnectionRef conn_ref) override {
+    Connection *conn = &*conn_ref;
+    gates.dispatch_in_background("TestSuite_ms_dispatch",
+        [this, conn, conn_ref] {
+      return seastar::smp::submit_to(primary_sid, [this, conn] {
+        auto result = interceptor.find_result(&*conn);
+        if (result == nullptr) {
+          logger().error("Untracked remotely reset connection: {}", *conn);
+          ceph_abort();
+        }
 
-    if (tracked_conn != conn) {
-      logger().error("[{}] {} got remotely reset, but doesn't match tracked_conn [{}] {}",
-                     result->index, *conn, tracked_index, *tracked_conn);
-      ceph_abort();
-    }
-    ceph_assert(result->index == tracked_index);
+        if (tracked_conn != &*conn) {
+          logger().warn("[{}] {} got remotely reset, but doesn't match tracked_conn [{}] {}",
+                        result->index, *conn, tracked_index, *tracked_conn);
+        } else {
+          ceph_assert(result->index == tracked_index);
+        }
 
-    ++result->cnt_remote_reset_dispatched;
-    logger().info("[Test] got remote reset (cnt_remote_reset_dispatched={}) -- [{}] {}",
-                  result->cnt_remote_reset_dispatched, result->index, *conn);
+        ++result->cnt_remote_reset_dispatched;
+        logger().info("[Test] got remote reset (cnt_remote_reset_dispatched={}) -- [{}] {}",
+                      result->cnt_remote_reset_dispatched, result->index, *conn);
+      }).then([conn_ref] {});
+    });
   }
 
  private:
@@ -953,6 +1052,7 @@ class FailoverSuite : public Dispatcher {
 
   seastar::future<> send_op(bool expect_reply=true) {
     ceph_assert(tracked_conn);
+    ceph_assert(!tracked_conn->is_protocol_closed());
     if (expect_reply) {
       ++pending_peer_receive;
     }
@@ -969,6 +1069,7 @@ class FailoverSuite : public Dispatcher {
       logger().info("[Test] flush sending {} ops", pending_send);
     }
     ceph_assert(tracked_conn);
+    ceph_assert(!tracked_conn->is_protocol_closed());
     return seastar::do_until(
         [this] { return pending_send == 0; },
         [this] {
@@ -980,21 +1081,16 @@ class FailoverSuite : public Dispatcher {
   seastar::future<> wait_ready(unsigned num_ready_conns,
                                unsigned num_replaced,
                                bool wait_received) {
+    assert(seastar::this_shard_id() == primary_sid);
     unsigned pending_conns = 0;
     unsigned pending_establish = 0;
     unsigned replaced_conns = 0;
     for (auto& result : interceptor.results) {
-      if (result.conn->is_closed_clean()) {
+      if (result.conn->is_protocol_closed_clean()) {
         if (result.state == conn_state_t::replaced) {
           ++replaced_conns;
         }
-      } else if (result.conn->is_connected()) {
-        if (tracked_conn != result.conn || tracked_index != result.index) {
-          throw std::runtime_error(fmt::format(
-                "The connected connection [{}] {} doesn't"
-                " match the tracked connection [{}] {}",
-                result.index, *result.conn, tracked_index, *tracked_conn));
-        }
+      } else if (result.conn->is_protocol_ready()) {
         if (pending_send == 0 && pending_peer_receive == 0 && pending_receive == 0) {
           result.state = conn_state_t::established;
         } else {
@@ -1018,15 +1114,22 @@ class FailoverSuite : public Dispatcher {
         do_wait = true;
       }
     }
-    if (wait_received &&
-        (pending_send || pending_peer_receive || pending_receive)) {
-      if (pending_conns || pending_establish) {
-        logger().info("[Test] wait_ready(): wait for pending_send={},"
-                      " pending_peer_receive={}, pending_receive={},"
-                      " pending {}/{} ready/establish connections ...",
-                      pending_send, pending_peer_receive, pending_receive,
-                      pending_conns, pending_establish);
-        do_wait = true;
+    if (wait_received) {
+      if (pending_send || pending_peer_receive || pending_receive) {
+        if (pending_conns || pending_establish) {
+          logger().info("[Test] wait_ready(): wait for pending_send={},"
+                        " pending_peer_receive={}, pending_receive={},"
+                        " pending {}/{} ready/establish connections ...",
+                        pending_send, pending_peer_receive, pending_receive,
+                        pending_conns, pending_establish);
+          do_wait = true;
+        } else {
+          // If there are pending messages, stop waiting if there are
+          // no longer pending connections.
+        }
+      } else {
+         // Stop waiting if there are no pending messages. Pending connections
+         // should not be important.
       }
     }
     if (num_replaced > 0) {
@@ -1058,10 +1161,13 @@ class FailoverSuite : public Dispatcher {
  public:
   FailoverSuite(MessengerRef test_msgr,
                 entity_addr_t test_peer_addr,
-                const TestInterceptor& interceptor)
+                const TestInterceptor& interceptor,
+                ShardedGates &gates)
     : test_msgr(test_msgr),
       test_peer_addr(test_peer_addr),
-      interceptor(interceptor) { }
+      interceptor(interceptor),
+      gates{gates},
+      primary_sid{seastar::this_shard_id()} { }
 
   entity_addr_t get_addr() const {
     return test_msgr->get_myaddr();
@@ -1112,10 +1218,17 @@ class FailoverSuite : public Dispatcher {
   create(entity_addr_t test_addr,
          SocketPolicy test_policy,
          entity_addr_t test_peer_addr,
-         const TestInterceptor& interceptor) {
+         const TestInterceptor& interceptor,
+         ShardedGates &gates) {
     auto suite = std::make_unique<FailoverSuite>(
-        Messenger::create(entity_name_t::OSD(TEST_OSD), "Test", TEST_NONCE),
-        test_peer_addr, interceptor);
+        Messenger::create(
+          entity_name_t::OSD(TEST_OSD),
+          "Test",
+          TEST_NONCE,
+          false),
+        test_peer_addr,
+        interceptor,
+        gates);
     return suite->init(test_addr, test_policy
     ).then([suite = std::move(suite)] () mutable {
       return std::move(suite);
@@ -1126,31 +1239,35 @@ class FailoverSuite : public Dispatcher {
  public:
   seastar::future<> connect_peer() {
     logger().info("[Test] connect_peer({})", test_peer_addr);
+    assert(seastar::this_shard_id() == primary_sid);
     auto conn = test_msgr->connect(test_peer_addr, entity_name_t::TYPE_OSD);
-    auto result = interceptor.find_result(conn);
+    auto result = interceptor.find_result(&*conn);
     ceph_assert(result != nullptr);
 
     if (tracked_conn) {
-      if (tracked_conn->is_closed()) {
-        ceph_assert(tracked_conn != conn);
-        logger().info("[Test] this is a new session replacing an closed one");
+      if (tracked_conn->is_protocol_closed()) {
+        logger().info("[Test] this is a new session"
+                      " replacing an closed one");
+        ceph_assert(tracked_conn != &*conn);
       } else {
-        ceph_assert(tracked_index == result->index);
-        ceph_assert(tracked_conn == conn);
         logger().info("[Test] this is not a new session");
+        ceph_assert(tracked_index == result->index);
+        ceph_assert(tracked_conn == &*conn);
       }
     } else {
       logger().info("[Test] this is a new session");
     }
     tracked_index = result->index;
-    tracked_conn = conn;
+    tracked_conn = &*conn;
 
     return flush_pending_send();
   }
 
   seastar::future<> send_peer() {
+    assert(seastar::this_shard_id() == primary_sid);
     if (tracked_conn) {
       logger().info("[Test] send_peer()");
+      ceph_assert(!tracked_conn->is_protocol_closed());
       ceph_assert(!pending_send);
       return send_op();
     } else {
@@ -1162,33 +1279,47 @@ class FailoverSuite : public Dispatcher {
 
   seastar::future<> keepalive_peer() {
     logger().info("[Test] keepalive_peer()");
+    assert(seastar::this_shard_id() == primary_sid);
     ceph_assert(tracked_conn);
+    ceph_assert(!tracked_conn->is_protocol_closed());
     return tracked_conn->send_keepalive();
   }
 
   seastar::future<> try_send_peer() {
     logger().info("[Test] try_send_peer()");
+    assert(seastar::this_shard_id() == primary_sid);
     ceph_assert(tracked_conn);
+    ceph_assert(!tracked_conn->is_protocol_closed());
     return send_op(false);
   }
 
   seastar::future<> markdown() {
     logger().info("[Test] markdown() in 100ms ...");
+    assert(seastar::this_shard_id() == primary_sid);
     ceph_assert(tracked_conn);
     // sleep to propagate potential remaining acks
-    return seastar::sleep(100ms
+    return seastar::sleep(50ms
     ).then([this] {
-      tracked_conn->mark_down();
+      return seastar::smp::submit_to(
+          tracked_conn->get_shard_id(), [tracked_conn=tracked_conn] {
+        assert(tracked_conn->get_shard_id() == seastar::this_shard_id());
+        tracked_conn->mark_down();
+      });
+    }).then([] {
+      // sleep to wait for the markdown to propagate to the primary sid
+      return seastar::sleep(100ms);
     });
   }
 
   seastar::future<> wait_blocked() {
     logger().info("[Test] wait_blocked() ...");
+    assert(seastar::this_shard_id() == primary_sid);
     return interceptor.blocker.wait_blocked();
   }
 
   void unblock() {
     logger().info("[Test] unblock()");
+    assert(seastar::this_shard_id() == primary_sid);
     return interceptor.blocker.unblock();
   }
 
@@ -1211,8 +1342,9 @@ class FailoverSuite : public Dispatcher {
   }
 
   bool is_standby() {
+    assert(seastar::this_shard_id() == primary_sid);
     ceph_assert(tracked_conn);
-    return !(tracked_conn->is_connected() || tracked_conn->is_closed());
+    return tracked_conn->is_protocol_standby();
   }
 };
 
@@ -1328,7 +1460,11 @@ class FailoverTest : public Dispatcher {
          entity_addr_t cmd_peer_addr,
          entity_addr_t test_peer_addr) {
     auto test = seastar::make_lw_shared<FailoverTest>(
-        Messenger::create(entity_name_t::OSD(CMD_CLI_OSD), "CmdCli", CMD_CLI_NONCE),
+        Messenger::create(
+          entity_name_t::OSD(CMD_CLI_OSD),
+          "CmdCli",
+          CMD_CLI_NONCE,
+          true),
         test_addr, test_peer_addr);
     return test->init(cmd_peer_addr).then([test] {
       logger().info("CmdCli ready");
@@ -1347,26 +1483,34 @@ class FailoverTest : public Dispatcher {
     logger().info("\n\n[{}]", name);
     ceph_assert(!test_suite);
     SocketPolicy test_policy_ = to_socket_policy(test_policy);
-    return FailoverSuite::create(
-        test_addr, test_policy_, test_peer_addr, interceptor
-    ).then([this, peer_policy, f = std::move(f)] (auto suite) mutable {
-      ceph_assert(suite->get_addr() == test_addr);
-      test_suite.swap(suite);
-      return start_peer(peer_policy).then([this, f = std::move(f)] {
-        return f(*test_suite);
-      }).then([this] {
-        test_suite->post_check();
-        logger().info("\n[SUCCESS]");
-      }).handle_exception([this] (auto eptr) {
-        logger().info("\n[FAIL: {}]", eptr);
-        test_suite->dump_results();
-        throw;
-      }).then([this] {
-        return stop_peer();
-      }).then([this] {
-        return test_suite->shutdown().then([this] {
-          test_suite.reset();
+    return ShardedGates::create(
+    ).then([this, test_policy_, peer_policy, interceptor,
+            f=std::move(f)](auto *gates) mutable {
+      return FailoverSuite::create(
+        test_addr, test_policy_, test_peer_addr, interceptor, *gates
+      ).then([this, peer_policy, f = std::move(f)](auto suite) mutable {
+        ceph_assert(suite->get_addr() == test_addr);
+        test_suite.swap(suite);
+        return start_peer(peer_policy
+        ).then([this, f = std::move(f)] {
+          return f(*test_suite);
+        }).then([this] {
+          test_suite->post_check();
+          logger().info("\n[SUCCESS]");
+        }).handle_exception([this](auto eptr) {
+          logger().info("\n[FAIL: {}]", eptr);
+          test_suite->dump_results();
+          throw;
+        }).then([this] {
+          return stop_peer();
+        }).then([this] {
+          return test_suite->shutdown(
+          ).then([this] {
+            test_suite.reset();
+          });
         });
+      }).then([gates] {
+        return gates->close();
       });
     });
   }
@@ -1427,27 +1571,32 @@ class FailoverSuitePeer : public Dispatcher {
   ConnectionRef tracked_conn;
   unsigned pending_send = 0;
 
-  std::optional<seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
+  std::optional<seastar::future<>> ms_dispatch(ConnectionRef conn, MessageRef m) override {
     logger().info("[TestPeer] got op from Test");
     ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
-    ceph_assert(tracked_conn == c);
     std::ignore = op_callback();
     return {seastar::now()};
   }
 
-  void ms_handle_accept(ConnectionRef conn) override {
+  void ms_handle_accept(
+      ConnectionRef conn,
+      seastar::shard_id prv_shard,
+      bool is_replace) override {
+    assert(prv_shard == seastar::this_shard_id());
     logger().info("[TestPeer] got accept from Test");
-    ceph_assert(!tracked_conn ||
-                tracked_conn->is_closed() ||
-                tracked_conn == conn);
+
+    if (tracked_conn &&
+        !tracked_conn->is_protocol_closed() &&
+        tracked_conn != conn) {
+      logger().error("[TestPeer] {} got accepted, but there's already a valid traced_conn {}",
+                     *conn, *tracked_conn);
+    }
     tracked_conn = conn;
     std::ignore = flush_pending_send();
   }
 
   void ms_handle_reset(ConnectionRef conn, bool is_replace) override {
     logger().info("[TestPeer] got reset from Test");
-    ceph_assert(tracked_conn == conn);
-    tracked_conn = nullptr;
   }
 
  private:
@@ -1466,6 +1615,11 @@ class FailoverSuitePeer : public Dispatcher {
 
   seastar::future<> send_op() {
     ceph_assert(tracked_conn);
+    if (tracked_conn->is_protocol_closed()) {
+      logger().error("[TestPeer] send op but the connection is closed -- {}",
+                     *tracked_conn);
+    }
+
     pg_t pgid;
     object_locator_t oloc;
     hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
@@ -1489,7 +1643,8 @@ class FailoverSuitePeer : public Dispatcher {
 
  public:
   FailoverSuitePeer(MessengerRef peer_msgr, cb_t op_callback)
-    : peer_msgr(peer_msgr), op_callback(op_callback) { }
+    : peer_msgr(peer_msgr),
+      op_callback(op_callback) { }
 
   seastar::future<> shutdown() {
     peer_msgr->stop();
@@ -1498,26 +1653,29 @@ class FailoverSuitePeer : public Dispatcher {
 
   seastar::future<> connect_peer(entity_addr_t test_addr_decoded) {
     logger().info("[TestPeer] connect_peer({})", test_addr_decoded);
-    auto new_tracked_conn = peer_msgr->connect(test_addr_decoded, entity_name_t::TYPE_OSD);
+    auto conn = peer_msgr->connect(test_addr_decoded, entity_name_t::TYPE_OSD);
+
     if (tracked_conn) {
-      if (tracked_conn->is_closed()) {
-        ceph_assert(tracked_conn != new_tracked_conn);
+      if (tracked_conn->is_protocol_closed()) {
         logger().info("[TestPeer] this is a new session"
                       " replacing an closed one");
+        ceph_assert(tracked_conn != conn);
       } else {
-        ceph_assert(tracked_conn == new_tracked_conn);
         logger().info("[TestPeer] this is not a new session");
+        ceph_assert(tracked_conn == conn);
       }
     } else {
       logger().info("[TestPeer] this is a new session");
     }
-    tracked_conn = new_tracked_conn;
+    tracked_conn = conn;
+
     return flush_pending_send();
   }
 
   seastar::future<> send_peer() {
     if (tracked_conn) {
       logger().info("[TestPeer] send_peer()");
+      ceph_assert(!pending_send);
       return send_op();
     } else {
       ++pending_send;
@@ -1545,7 +1703,8 @@ class FailoverSuitePeer : public Dispatcher {
       Messenger::create(
         entity_name_t::OSD(TEST_PEER_OSD),
         "TestPeer",
-        TEST_PEER_NONCE),
+        TEST_PEER_NONCE,
+        true),
       op_callback
     );
     return suite->init(test_peer_addr, policy
@@ -1590,7 +1749,11 @@ class FailoverTestPeer : public Dispatcher {
     return {seastar::now()};
   }
 
-  void ms_handle_accept(ConnectionRef conn) override {
+  void ms_handle_accept(
+      ConnectionRef conn,
+      seastar::shard_id prv_shard,
+      bool is_replace) override {
+    assert(prv_shard == seastar::this_shard_id());
     cmd_conn = conn;
   }
 
@@ -1667,7 +1830,11 @@ class FailoverTestPeer : public Dispatcher {
   static seastar::future<std::unique_ptr<FailoverTestPeer>>
   create(entity_addr_t cmd_peer_addr, entity_addr_t test_peer_addr) {
     auto test_peer = std::make_unique<FailoverTestPeer>(
-        Messenger::create(entity_name_t::OSD(CMD_SRV_OSD), "CmdSrv", CMD_SRV_NONCE),
+        Messenger::create(
+          entity_name_t::OSD(CMD_SRV_OSD),
+          "CmdSrv",
+          CMD_SRV_NONCE,
+          true),
         test_peer_addr);
     return test_peer->init(cmd_peer_addr
     ).then([test_peer = std::move(test_peer)] () mutable {
@@ -3617,8 +3784,11 @@ seastar::future<int> do_test(seastar::app_template& app)
                                               CEPH_ENTITY_TYPE_CLIENT,
                                               &cluster,
                                               &conf_file_list);
-  return crimson::common::sharded_conf().start(init_params.name, cluster)
-  .then([conf_file_list] {
+  return crimson::common::sharded_conf().start(
+    init_params.name, cluster
+  ).then([] {
+    return local_conf().start();
+  }).then([conf_file_list] {
     return local_conf().parse_config_files(conf_file_list);
   }).then([&app] {
     auto&& config = app.configuration();
@@ -3642,14 +3812,13 @@ seastar::future<int> do_test(seastar::app_template& app)
 
     logger().info("test configuration: verbose={}, rounds={}, keepalive_ratio={}, "
                   "test_addr={}, cmd_peer_addr={}, test_peer_addr={}, "
-                  "testpeer_islocal={}, peer_wins={}",
+                  "testpeer_islocal={}, peer_wins={}, smp={}",
                   verbose, rounds, keepalive_ratio,
                   test_addr, cmd_peer_addr, test_peer_addr,
-                  testpeer_islocal, peer_wins);
+                  testpeer_islocal, peer_wins,
+                  seastar::smp::count);
     return test_echo(rounds, keepalive_ratio
     ).then([] {
-      return test_concurrent_dispatch();
-    }).then([] {
       return test_preemptive_shutdown();
     }).then([test_addr, cmd_peer_addr, test_peer_addr, testpeer_islocal, peer_wins] {
       return test_v2_protocol(