]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/test/crimson/test_socket.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / test / crimson / test_socket.cc
index 5fd596a09282100141cd2013d403e6aa32ad728d..2b61196ead8dfd6ef364d97ea399af88f147c55a 100644 (file)
@@ -24,167 +24,190 @@ using namespace std::chrono_literals;
 using seastar::engine;
 using seastar::future;
 using crimson::net::error;
-using crimson::net::FixedCPUServerSocket;
 using crimson::net::listen_ertr;
+using crimson::net::ShardedServerSocket;
 using crimson::net::Socket;
 using crimson::net::SocketRef;
 using crimson::net::stop_t;
 
 using SocketFRef = seastar::foreign_ptr<SocketRef>;
 
-static seastar::logger logger{"crimsontest"};
-static entity_addr_t get_server_addr() {
-  static int port = 9020;
-  ++port;
-  ceph_assert(port < 9030 && "socket and messenger test ports should not overlap");
+seastar::logger &logger() {
+  return crimson::get_logger(ceph_subsys_test);
+}
+
+entity_addr_t get_server_addr() {
   entity_addr_t saddr;
   saddr.parse("127.0.0.1", nullptr);
-  saddr.set_port(port);
+  saddr.set_port(9020);
   return saddr;
 }
 
 future<SocketRef> socket_connect(const entity_addr_t& saddr) {
-  logger.debug("socket_connect() to {} ...", saddr);
-  return Socket::connect(saddr).then([] (auto socket) {
-    logger.debug("socket_connect() connected");
+  logger().debug("socket_connect() to {} ...", saddr);
+  return Socket::connect(saddr).then([](auto socket) {
+    logger().debug("socket_connect() connected");
     return socket;
   });
 }
 
 future<> test_refused() {
-  logger.info("test_refused()...");
+  logger().info("test_refused()...");
   auto saddr = get_server_addr();
   return socket_connect(saddr).discard_result().then([saddr] {
-    logger.error("test_refused(): connection to {} is not refused", saddr);
+    logger().error("test_refused(): connection to {} is not refused", saddr);
     ceph_abort();
-  }).handle_exception_type([] (const std::system_error& e) {
+  }).handle_exception_type([](const std::system_error& e) {
     if (e.code() != std::errc::connection_refused) {
-      logger.error("test_refused() got unexpeted error {}", e);
+      logger().error("test_refused() got unexpeted error {}", e);
       ceph_abort();
     } else {
-      logger.info("test_refused() ok\n");
+      logger().info("test_refused() ok\n");
     }
-  }).handle_exception([] (auto eptr) {
-    logger.error("test_refused() got unexpeted exception {}", eptr);
+  }).handle_exception([](auto eptr) {
+    logger().error("test_refused() got unexpeted exception {}", eptr);
     ceph_abort();
   });
 }
 
-future<> test_bind_same() {
-  logger.info("test_bind_same()...");
-  return FixedCPUServerSocket::create().then([] (auto pss1) {
+future<> test_bind_same(bool is_fixed_cpu) {
+  logger().info("test_bind_same()...");
+  return ShardedServerSocket::create(is_fixed_cpu
+  ).then([is_fixed_cpu](auto pss1) {
     auto saddr = get_server_addr();
-    return pss1->listen(saddr).safe_then([saddr] {
+    return pss1->listen(saddr).safe_then([saddr, is_fixed_cpu] {
       // try to bind the same address
-      return FixedCPUServerSocket::create().then([saddr] (auto pss2) {
+      return ShardedServerSocket::create(is_fixed_cpu
+      ).then([saddr](auto pss2) {
         return pss2->listen(saddr).safe_then([] {
-          logger.error("test_bind_same() should raise address_in_use");
+          logger().error("test_bind_same() should raise address_in_use");
           ceph_abort();
         }, listen_ertr::all_same_way(
-            [] (const std::error_code& e) {
+            [](const std::error_code& e) {
           if (e == std::errc::address_in_use) {
             // successful!
-            logger.info("test_bind_same() ok\n");
+            logger().info("test_bind_same() ok\n");
           } else {
-            logger.error("test_bind_same() got unexpected error {}", e);
+            logger().error("test_bind_same() got unexpected error {}", e);
             ceph_abort();
           }
           // Note: need to return a explicit ready future, or there will be a
           // runtime error: member access within null pointer of type 'struct promise_base'
           return seastar::now();
         })).then([pss2] {
-          return pss2->destroy();
+          return pss2->shutdown_destroy();
         });
       });
     }, listen_ertr::all_same_way(
-        [saddr] (const std::error_code& e) {
-      logger.error("test_bind_same(): there is another instance running at {}",
-                   saddr);
+        [saddr](const std::error_code& e) {
+      logger().error("test_bind_same(): there is another instance running at {}",
+                     saddr);
       ceph_abort();
     })).then([pss1] {
-      return pss1->destroy();
-    }).handle_exception([] (auto eptr) {
-      logger.error("test_bind_same() got unexpeted exception {}", eptr);
+      return pss1->shutdown_destroy();
+    }).handle_exception([](auto eptr) {
+      logger().error("test_bind_same() got unexpeted exception {}", eptr);
       ceph_abort();
     });
   });
 }
 
-future<> test_accept() {
-  logger.info("test_accept()");
-  return FixedCPUServerSocket::create().then([] (auto pss) {
+future<> test_accept(bool is_fixed_cpu) {
+  logger().info("test_accept()");
+  return ShardedServerSocket::create(is_fixed_cpu
+  ).then([](auto pss) {
     auto saddr = get_server_addr();
-    return pss->listen(saddr).safe_then([pss] {
-      return pss->accept([] (auto socket, auto paddr) {
+    return pss->listen(saddr
+    ).safe_then([pss] {
+      return pss->accept([](auto socket, auto paddr) {
+        logger().info("test_accept(): accepted at shard {}", seastar::this_shard_id());
         // simple accept
-        return seastar::sleep(100ms).then([socket = std::move(socket)] () mutable {
-          return socket->close().finally([cleanup = std::move(socket)] {});
+        return seastar::sleep(100ms
+        ).then([socket = std::move(socket)]() mutable {
+          return socket->close(
+          ).finally([cleanup = std::move(socket)] {});
         });
       });
     }, listen_ertr::all_same_way(
-        [saddr] (const std::error_code& e) {
-      logger.error("test_accept(): there is another instance running at {}",
-                   saddr);
+        [saddr](const std::error_code& e) {
+      logger().error("test_accept(): there is another instance running at {}",
+                     saddr);
       ceph_abort();
     })).then([saddr] {
       return seastar::when_all(
-        socket_connect(saddr).then([] (auto socket) {
+        socket_connect(saddr).then([](auto socket) {
           return socket->close().finally([cleanup = std::move(socket)] {}); }),
-        socket_connect(saddr).then([] (auto socket) {
+        socket_connect(saddr).then([](auto socket) {
           return socket->close().finally([cleanup = std::move(socket)] {}); }),
-        socket_connect(saddr).then([] (auto socket) {
+        socket_connect(saddr).then([](auto socket) {
           return socket->close().finally([cleanup = std::move(socket)] {}); })
       ).discard_result();
     }).then([] {
       // should be enough to be connected locally
       return seastar::sleep(50ms);
     }).then([] {
-      logger.info("test_accept() ok\n");
+      logger().info("test_accept() ok\n");
     }).then([pss] {
-      return pss->destroy();
-    }).handle_exception([] (auto eptr) {
-      logger.error("test_accept() got unexpeted exception {}", eptr);
+      return pss->shutdown_destroy();
+    }).handle_exception([](auto eptr) {
+      logger().error("test_accept() got unexpeted exception {}", eptr);
       ceph_abort();
     });
   });
 }
 
 class SocketFactory {
+  static constexpr seastar::shard_id CLIENT_CPU = 0u;
   SocketRef client_socket;
-  SocketFRef server_socket;
-  FixedCPUServerSocket *pss = nullptr;
   seastar::promise<> server_connected;
 
+  static constexpr seastar::shard_id SERVER_CPU = 1u;
+  ShardedServerSocket *pss = nullptr;
+
+  seastar::shard_id server_socket_CPU;
+  SocketFRef server_socket;
+
  public:
-  // cb_client() on CPU#0, cb_server() on CPU#1
   template <typename FuncC, typename FuncS>
-  static future<> dispatch_sockets(FuncC&& cb_client, FuncS&& cb_server) {
-    assert(seastar::this_shard_id() == 0u);
+  static future<> dispatch_sockets(
+      bool is_fixed_cpu,
+      FuncC&& cb_client,
+      FuncS&& cb_server) {
+    ceph_assert_always(seastar::this_shard_id() == CLIENT_CPU);
     auto owner = std::make_unique<SocketFactory>();
     auto psf = owner.get();
     auto saddr = get_server_addr();
-    return seastar::smp::submit_to(1u, [psf, saddr] {
-      return FixedCPUServerSocket::create().then([psf, saddr] (auto pss) {
+    return seastar::smp::submit_to(SERVER_CPU, [psf, saddr, is_fixed_cpu] {
+      return ShardedServerSocket::create(is_fixed_cpu
+      ).then([psf, saddr](auto pss) {
         psf->pss = pss;
         return pss->listen(saddr
-        ).safe_then([]{}, listen_ertr::all_same_way(
-            [saddr] (const std::error_code& e) {
-          logger.error("dispatch_sockets(): there is another instance running at {}",
-                       saddr);
+        ).safe_then([] {
+        }, listen_ertr::all_same_way([saddr](const std::error_code& e) {
+          logger().error("dispatch_sockets(): there is another instance running at {}",
+                         saddr);
           ceph_abort();
         }));
       });
     }).then([psf, saddr] {
       return seastar::when_all_succeed(
-        seastar::smp::submit_to(0u, [psf, saddr] {
-          return socket_connect(saddr).then([psf] (auto socket) {
+        seastar::smp::submit_to(CLIENT_CPU, [psf, saddr] {
+          return socket_connect(saddr).then([psf](auto socket) {
+            ceph_assert_always(seastar::this_shard_id() == CLIENT_CPU);
             psf->client_socket = std::move(socket);
           });
         }),
-        seastar::smp::submit_to(1u, [psf] {
-          return psf->pss->accept([psf] (auto socket, auto paddr) {
-            psf->server_socket = seastar::make_foreign(std::move(socket));
-            return seastar::smp::submit_to(0u, [psf] {
+        seastar::smp::submit_to(SERVER_CPU, [psf] {
+          return psf->pss->accept([psf](auto _socket, auto paddr) {
+            logger().info("dispatch_sockets(): accepted at shard {}",
+                          seastar::this_shard_id());
+            psf->server_socket_CPU = seastar::this_shard_id();
+            if (psf->pss->is_fixed_shard_dispatching()) {
+              ceph_assert_always(SERVER_CPU == seastar::this_shard_id());
+            }
+            SocketFRef socket = seastar::make_foreign(std::move(_socket));
+            psf->server_socket = std::move(socket);
+            return seastar::smp::submit_to(CLIENT_CPU, [psf] {
               psf->server_connected.set_value();
             });
           });
@@ -196,35 +219,35 @@ class SocketFactory {
       return psf->server_connected.get_future();
     }).then([psf] {
       if (psf->pss) {
-        return seastar::smp::submit_to(1u, [psf] {
-          return psf->pss->destroy();
+        return seastar::smp::submit_to(SERVER_CPU, [psf] {
+          return psf->pss->shutdown_destroy();
         });
       }
       return seastar::now();
     }).then([psf,
              cb_client = std::move(cb_client),
-             cb_server = std::move(cb_server)] () mutable {
-      logger.debug("dispatch_sockets(): client/server socket are ready");
+             cb_server = std::move(cb_server)]() mutable {
+      logger().debug("dispatch_sockets(): client/server socket are ready");
       return seastar::when_all_succeed(
-        seastar::smp::submit_to(0u, [socket = psf->client_socket.get(),
-                                     cb_client = std::move(cb_client)] {
+        seastar::smp::submit_to(CLIENT_CPU,
+            [socket = psf->client_socket.get(), cb_client = std::move(cb_client)] {
           return cb_client(socket).then([socket] {
-            logger.debug("closing client socket...");
+            logger().debug("closing client socket...");
             return socket->close();
-          }).handle_exception([] (auto eptr) {
-            logger.error("dispatch_sockets():"
-                " cb_client() got unexpeted exception {}", eptr);
+          }).handle_exception([](auto eptr) {
+            logger().error("dispatch_sockets():"
+                           " cb_client() got unexpeted exception {}", eptr);
             ceph_abort();
           });
         }),
-        seastar::smp::submit_to(1u, [socket = psf->server_socket.get(),
-                                     cb_server = std::move(cb_server)] {
+        seastar::smp::submit_to(psf->server_socket_CPU,
+            [socket = psf->server_socket.get(), cb_server = std::move(cb_server)] {
           return cb_server(socket).then([socket] {
-            logger.debug("closing server socket...");
+            logger().debug("closing server socket...");
             return socket->close();
-          }).handle_exception([] (auto eptr) {
-            logger.error("dispatch_sockets():"
-                " cb_server() got unexpeted exception {}", eptr);
+          }).handle_exception([](auto eptr) {
+            logger().error("dispatch_sockets():"
+                           " cb_server() got unexpeted exception {}", eptr);
             ceph_abort();
           });
         })
@@ -255,23 +278,25 @@ class Connection {
   }
 
   future<> dispatch_write(unsigned round = 0, bool force_shut = false) {
-    logger.debug("dispatch_write(round={}, force_shut={})...", round, force_shut);
+    logger().debug("dispatch_write(round={}, force_shut={})...", round, force_shut);
     return seastar::repeat([this, round, force_shut] {
       if (round != 0 && round <= write_count) {
         return seastar::futurize_invoke([this, force_shut] {
           if (force_shut) {
-            logger.debug("dispatch_write() done, force shutdown output");
+            logger().debug("dispatch_write() done, force shutdown output");
             socket->force_shutdown_out();
           } else {
-            logger.debug("dispatch_write() done");
+            logger().debug("dispatch_write() done");
           }
         }).then([] {
           return seastar::make_ready_future<stop_t>(stop_t::yes);
         });
       } else {
         data[0] = write_count;
-        return socket->write(seastar::net::packet(
-            reinterpret_cast<const char*>(&data), sizeof(data))
+        bufferlist bl;
+        bl.append(buffer::copy(
+          reinterpret_cast<const char*>(&data), sizeof(data)));
+        return socket->write(bl
         ).then([this] {
           return socket->flush();
         }).then([this] {
@@ -286,30 +311,30 @@ class Connection {
     return dispatch_write(
     ).then([] {
       ceph_abort();
-    }).handle_exception_type([this] (const std::system_error& e) {
+    }).handle_exception_type([this](const std::system_error& e) {
       if (e.code() != std::errc::broken_pipe &&
           e.code() != std::errc::connection_reset) {
-        logger.error("dispatch_write_unbounded(): "
-                     "unexpected error {}", e);
+        logger().error("dispatch_write_unbounded(): "
+                       "unexpected error {}", e);
         throw;
       }
       // successful
-      logger.debug("dispatch_write_unbounded(): "
-                   "expected error {}", e);
+      logger().debug("dispatch_write_unbounded(): "
+                     "expected error {}", e);
       shutdown();
     });
   }
 
   future<> dispatch_read(unsigned round = 0, bool force_shut = false) {
-    logger.debug("dispatch_read(round={}, force_shut={})...", round, force_shut);
+    logger().debug("dispatch_read(round={}, force_shut={})...", round, force_shut);
     return seastar::repeat([this, round, force_shut] {
       if (round != 0 && round <= read_count) {
         return seastar::futurize_invoke([this, force_shut] {
           if (force_shut) {
-            logger.debug("dispatch_read() done, force shutdown input");
+            logger().debug("dispatch_read() done, force shutdown input");
             socket->force_shutdown_in();
           } else {
-            logger.debug("dispatch_read() done");
+            logger().debug("dispatch_read() done");
           }
         }).then([] {
           return seastar::make_ready_future<stop_t>(stop_t::yes);
@@ -319,7 +344,7 @@ class Connection {
           // we want to test both Socket::read() and Socket::read_exactly()
           if (read_count % 2) {
             return socket->read(DATA_SIZE * sizeof(uint64_t)
-            ).then([this] (ceph::bufferlist bl) {
+            ).then([this](ceph::bufferlist bl) {
               uint64_t read_data[DATA_SIZE];
               auto p = bl.cbegin();
               ::ceph::decode_raw(read_data, p);
@@ -327,8 +352,9 @@ class Connection {
             });
           } else {
             return socket->read_exactly(DATA_SIZE * sizeof(uint64_t)
-            ).then([this] (auto buf) {
-              auto read_data = reinterpret_cast<const uint64_t*>(buf.get());
+            ).then([this](auto bptr) {
+              uint64_t read_data[DATA_SIZE];
+              std::memcpy(read_data, bptr.c_str(), DATA_SIZE * sizeof(uint64_t));
               verify_data_read(read_data);
             });
           }
@@ -344,16 +370,16 @@ class Connection {
     return dispatch_read(
     ).then([] {
       ceph_abort();
-    }).handle_exception_type([this] (const std::system_error& e) {
+    }).handle_exception_type([this](const std::system_error& e) {
       if (e.code() != error::read_eof
        && e.code() != std::errc::connection_reset) {
-        logger.error("dispatch_read_unbounded(): "
-                     "unexpected error {}", e);
+        logger().error("dispatch_read_unbounded(): "
+                       "unexpected error {}", e);
         throw;
       }
       // successful
-      logger.debug("dispatch_read_unbounded(): "
-                   "expected error {}", e);
+      logger().debug("dispatch_read_unbounded(): "
+                     "expected error {}", e);
       shutdown();
     });
   }
@@ -365,10 +391,10 @@ class Connection {
  public:
   static future<> dispatch_rw_bounded(Socket* socket, unsigned round,
                                       bool force_shut = false) {
-    logger.debug("dispatch_rw_bounded(round={}, force_shut={})...",
-                 round, force_shut);
+    logger().debug("dispatch_rw_bounded(round={}, force_shut={})...",
+                   round, force_shut);
     return seastar::do_with(Connection{socket},
-                            [round, force_shut] (auto& conn) {
+                            [round, force_shut](auto& conn) {
       ceph_assert(round != 0);
       return seastar::when_all_succeed(
         conn.dispatch_write(round, force_shut),
@@ -380,15 +406,15 @@ class Connection {
   }
 
   static future<> dispatch_rw_unbounded(Socket* socket, bool preemptive_shut = false) {
-    logger.debug("dispatch_rw_unbounded(preemptive_shut={})...", preemptive_shut);
-    return seastar::do_with(Connection{socket}, [preemptive_shut] (auto& conn) {
+    logger().debug("dispatch_rw_unbounded(preemptive_shut={})...", preemptive_shut);
+    return seastar::do_with(Connection{socket}, [preemptive_shut](auto& conn) {
       return seastar::when_all_succeed(
         conn.dispatch_write_unbounded(),
         conn.dispatch_read_unbounded(),
         seastar::futurize_invoke([&conn, preemptive_shut] {
           if (preemptive_shut) {
             return seastar::sleep(100ms).then([&conn] {
-              logger.debug("dispatch_rw_unbounded() shutdown socket preemptively(100ms)");
+              logger().debug("dispatch_rw_unbounded() shutdown socket preemptively(100ms)");
               conn.shutdown();
             });
           } else {
@@ -402,68 +428,87 @@ class Connection {
   }
 };
 
-future<> test_read_write() {
-  logger.info("test_read_write()...");
+future<> test_read_write(bool is_fixed_cpu) {
+  logger().info("test_read_write()...");
   return SocketFactory::dispatch_sockets(
-    [] (auto cs) { return Connection::dispatch_rw_bounded(cs, 128); },
-    [] (auto ss) { return Connection::dispatch_rw_bounded(ss, 128); }
+    is_fixed_cpu,
+    [](auto cs) { return Connection::dispatch_rw_bounded(cs, 128); },
+    [](auto ss) { return Connection::dispatch_rw_bounded(ss, 128); }
   ).then([] {
-    logger.info("test_read_write() ok\n");
-  }).handle_exception([] (auto eptr) {
-    logger.error("test_read_write() got unexpeted exception {}", eptr);
+    logger().info("test_read_write() ok\n");
+  }).handle_exception([](auto eptr) {
+    logger().error("test_read_write() got unexpeted exception {}", eptr);
     ceph_abort();
   });
 }
 
-future<> test_unexpected_down() {
-  logger.info("test_unexpected_down()...");
+future<> test_unexpected_down(bool is_fixed_cpu) {
+  logger().info("test_unexpected_down()...");
   return SocketFactory::dispatch_sockets(
-    [] (auto cs) { 
+    is_fixed_cpu,
+    [](auto cs) {
       return Connection::dispatch_rw_bounded(cs, 128, true
-        ).handle_exception_type([] (const std::system_error& e) {
-        logger.debug("test_unexpected_down(): client get error {}", e);
+        ).handle_exception_type([](const std::system_error& e) {
+        logger().debug("test_unexpected_down(): client get error {}", e);
         ceph_assert(e.code() == error::read_eof);
       });
     },
-    [] (auto ss) { return Connection::dispatch_rw_unbounded(ss); }
+    [](auto ss) { return Connection::dispatch_rw_unbounded(ss); }
   ).then([] {
-    logger.info("test_unexpected_down() ok\n");
-  }).handle_exception([] (auto eptr) {
-    logger.error("test_unexpected_down() got unexpeted exception {}", eptr);
+    logger().info("test_unexpected_down() ok\n");
+  }).handle_exception([](auto eptr) {
+    logger().error("test_unexpected_down() got unexpeted exception {}", eptr);
     ceph_abort();
   });
 }
 
-future<> test_shutdown_propagated() {
-  logger.info("test_shutdown_propagated()...");
+future<> test_shutdown_propagated(bool is_fixed_cpu) {
+  logger().info("test_shutdown_propagated()...");
   return SocketFactory::dispatch_sockets(
-    [] (auto cs) {
-      logger.debug("test_shutdown_propagated() shutdown client socket");
+    is_fixed_cpu,
+    [](auto cs) {
+      logger().debug("test_shutdown_propagated() shutdown client socket");
       cs->shutdown();
       return seastar::now();
     },
-    [] (auto ss) { return Connection::dispatch_rw_unbounded(ss); }
+    [](auto ss) { return Connection::dispatch_rw_unbounded(ss); }
   ).then([] {
-    logger.info("test_shutdown_propagated() ok\n");
-  }).handle_exception([] (auto eptr) {
-    logger.error("test_shutdown_propagated() got unexpeted exception {}", eptr);
+    logger().info("test_shutdown_propagated() ok\n");
+  }).handle_exception([](auto eptr) {
+    logger().error("test_shutdown_propagated() got unexpeted exception {}", eptr);
     ceph_abort();
   });
 }
 
-future<> test_preemptive_down() {
-  logger.info("test_preemptive_down()...");
+future<> test_preemptive_down(bool is_fixed_cpu) {
+  logger().info("test_preemptive_down()...");
   return SocketFactory::dispatch_sockets(
-    [] (auto cs) { return Connection::dispatch_rw_unbounded(cs, true); },
-    [] (auto ss) { return Connection::dispatch_rw_unbounded(ss); }
+    is_fixed_cpu,
+    [](auto cs) { return Connection::dispatch_rw_unbounded(cs, true); },
+    [](auto ss) { return Connection::dispatch_rw_unbounded(ss); }
   ).then([] {
-    logger.info("test_preemptive_down() ok\n");
-  }).handle_exception([] (auto eptr) {
-    logger.error("test_preemptive_down() got unexpeted exception {}", eptr);
+    logger().info("test_preemptive_down() ok\n");
+  }).handle_exception([](auto eptr) {
+    logger().error("test_preemptive_down() got unexpeted exception {}", eptr);
     ceph_abort();
   });
 }
 
+future<> do_test_with_type(bool is_fixed_cpu) {
+  return test_bind_same(is_fixed_cpu
+  ).then([is_fixed_cpu] {
+    return test_accept(is_fixed_cpu);
+  }).then([is_fixed_cpu] {
+    return test_read_write(is_fixed_cpu);
+  }).then([is_fixed_cpu] {
+    return test_unexpected_down(is_fixed_cpu);
+  }).then([is_fixed_cpu] {
+    return test_shutdown_propagated(is_fixed_cpu);
+  }).then([is_fixed_cpu] {
+    return test_preemptive_down(is_fixed_cpu);
+  });
+}
+
 }
 
 seastar::future<int> do_test(seastar::app_template& app)
@@ -475,37 +520,31 @@ seastar::future<int> do_test(seastar::app_template& app)
                                               CEPH_ENTITY_TYPE_CLIENT,
                                               &cluster,
                                               &conf_file_list);
-  return crimson::common::sharded_conf().start(init_params.name, cluster)
-  .then([conf_file_list] {
+  return crimson::common::sharded_conf().start(
+    init_params.name, cluster
+  ).then([] {
+    return local_conf().start();
+  }).then([conf_file_list] {
     return local_conf().parse_config_files(conf_file_list);
   }).then([] {
-      return local_conf().set_val("ms_inject_internal_delays", "0")
-    .then([] {
-      return test_refused();
-    }).then([] {
-      return test_bind_same();
-    }).then([] {
-      return test_accept();
-    }).then([] {
-      return test_read_write();
-    }).then([] {
-      return test_unexpected_down();
-    }).then([] {
-      return test_shutdown_propagated();
-    }).then([] {
-      return test_preemptive_down();
-    }).then([] {
-      logger.info("All tests succeeded");
-      // Seastar has bugs to have events undispatched during shutdown,
-      // which will result in memory leak and thus fail LeakSanitizer.
-      return seastar::sleep(100ms);
-    });
+    return local_conf().set_val("ms_inject_internal_delays", "0");
+  }).then([] {
+    return test_refused();
+  }).then([] {
+    return do_test_with_type(true);
+  }).then([] {
+    return do_test_with_type(false);
+  }).then([] {
+    logger().info("All tests succeeded");
+    // Seastar has bugs to have events undispatched during shutdown,
+    // which will result in memory leak and thus fail LeakSanitizer.
+    return seastar::sleep(100ms);
   }).then([] {
     return crimson::common::sharded_conf().stop();
   }).then([] {
     return 0;
-  }).handle_exception([] (auto eptr) {
-    logger.error("Test failed: got exception {}", eptr);
+  }).handle_exception([](auto eptr) {
+    logger().error("Test failed: got exception {}", eptr);
     return 1;
   });
 }