git.proxmox.com Git - ceph.git/blobdiff - ceph/src/test/crimson/test_messenger.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / test / crimson / test_messenger.cc
index bd549d4d6161339940316c9ccd6baf9e2f610d62..6fc9c1d7750c80e4527a5a7cf5525c5566c97bf6 100644 (file)
@@ -14,6 +14,7 @@
 #include "crimson/net/Dispatcher.h"
 #include "crimson/net/Messenger.h"
 #include "crimson/net/Interceptor.h"
+#include "crimson/net/SocketConnection.h"
 
 #include <map>
 #include <random>
@@ -27,7 +28,7 @@
 #include <seastar/core/sleep.hh>
 #include <seastar/core/with_timeout.hh>
 
-#include "test_cmds.h"
+#include "test_messenger.h"
 
 using namespace std::chrono_literals;
 namespace bpo = boost::program_options;
@@ -36,7 +37,7 @@ using crimson::common::local_conf;
 namespace {
 
 seastar::logger& logger() {
-  return crimson::get_logger(ceph_subsys_ms);
+  return crimson::get_logger(ceph_subsys_test);
 }
 
 static std::random_device rd;
@@ -77,7 +78,6 @@ static seastar::future<> test_echo(unsigned rounds,
                              const entity_addr_t& addr) {
         msgr = crimson::net::Messenger::create(name, lname, nonce);
         msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
-        msgr->set_require_authorizer(false);
         msgr->set_auth_client(&dummy_auth);
         msgr->set_auth_server(&dummy_auth);
         return msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
@@ -198,7 +198,7 @@ static seastar::future<> test_echo(unsigned rounds,
             [this, conn, &count_ping, &count_keepalive] {
               return seastar::repeat([this, conn, &count_ping, &count_keepalive] {
                   if (keepalive_dist(rng)) {
-                    return conn->keepalive()
+                    return conn->send_keepalive()
                       .then([&count_keepalive] {
                         count_keepalive += 1;
                         return seastar::make_ready_future<seastar::stop_iteration>(
@@ -496,10 +496,10 @@ using crimson::net::Dispatcher;
 using crimson::net::Interceptor;
 using crimson::net::Messenger;
 using crimson::net::MessengerRef;
+using crimson::net::SocketConnection;
 using crimson::net::SocketPolicy;
 using crimson::net::tag_bp_t;
-using ceph::net::test::cmd_t;
-using ceph::net::test::policy_t;
+using namespace ceph::net::test;
 
 struct counter_t { unsigned counter = 0; };
 
@@ -525,6 +525,15 @@ std::ostream& operator<<(std::ostream& out, const conn_state_t& state) {
   }
 }
 
+} // anonymous namespace
+
+#if FMT_VERSION >= 90000
+template<>
+struct fmt::formatter<conn_state_t> : fmt::ostream_formatter {};
+#endif
+
+namespace {
+
 struct ConnResult {
   ConnectionRef conn;
   unsigned index;
@@ -543,8 +552,8 @@ struct ConnResult {
   unsigned cnt_reset_dispatched = 0;
   unsigned cnt_remote_reset_dispatched = 0;
 
-  ConnResult(Connection& conn, unsigned index)
-    : conn(conn.shared_from_this()), index(index) {}
+  ConnResult(ConnectionRef conn, unsigned index)
+    : conn(conn), index(index) {}
 
   template <typename T>
   void _assert_eq(const char* expr_actual, T actual,
@@ -690,16 +699,23 @@ struct TestInterceptor : public Interceptor {
   }
 
  private:
-  void register_conn(Connection& conn) override {
+  void register_conn(SocketConnection& _conn) override {
+    auto conn = _conn.get_local_shared_foreign_from_this();
+    auto result = find_result(conn);
+    if (result != nullptr) {
+      logger().error("The connection [{}] {} already exists when register {}",
+                     result->index, *result->conn, _conn);
+      ceph_abort();
+    }
     unsigned index = results.size();
     results.emplace_back(conn, index);
-    conns[conn.shared_from_this()] = index;
+    conns[conn] = index;
     notify();
-    logger().info("[{}] {} new connection registered", index, conn);
+    logger().info("[{}] {} new connection registered", index, _conn);
   }
 
-  void register_conn_closed(Connection& conn) override {
-    auto result = find_result(conn.shared_from_this());
+  void register_conn_closed(SocketConnection& conn) override {
+    auto result = find_result(conn.get_local_shared_foreign_from_this());
     if (result == nullptr) {
       logger().error("Untracked closed connection: {}", conn);
       ceph_abort();
@@ -712,8 +728,8 @@ struct TestInterceptor : public Interceptor {
     logger().info("[{}] {} closed({})", result->index, conn, result->state);
   }
 
-  void register_conn_ready(Connection& conn) override {
-    auto result = find_result(conn.shared_from_this());
+  void register_conn_ready(SocketConnection& conn) override {
+    auto result = find_result(conn.get_local_shared_foreign_from_this());
     if (result == nullptr) {
       logger().error("Untracked ready connection: {}", conn);
       ceph_abort();
@@ -724,8 +740,8 @@ struct TestInterceptor : public Interceptor {
     logger().info("[{}] {} ready", result->index, conn);
   }
 
-  void register_conn_replaced(Connection& conn) override {
-    auto result = find_result(conn.shared_from_this());
+  void register_conn_replaced(SocketConnection& conn) override {
+    auto result = find_result(conn.get_local_shared_foreign_from_this());
     if (result == nullptr) {
       logger().error("Untracked replaced connection: {}", conn);
       ceph_abort();
@@ -735,10 +751,10 @@ struct TestInterceptor : public Interceptor {
     logger().info("[{}] {} {}", result->index, conn, result->state);
   }
 
-  bp_action_t intercept(Connection& conn, Breakpoint bp) override {
+  bp_action_t intercept(SocketConnection& conn, Breakpoint bp) override {
     ++breakpoints_counter[bp].counter;
 
-    auto result = find_result(conn.shared_from_this());
+    auto result = find_result(conn.get_local_shared_foreign_from_this());
     if (result == nullptr) {
       logger().error("Untracked intercepted connection: {}, at breakpoint {}({})",
                      conn, bp, breakpoints_counter[bp].counter);
@@ -921,16 +937,16 @@ class FailoverSuite : public Dispatcher {
   }
 
  private:
-  seastar::future<> init(entity_addr_t addr, SocketPolicy policy) {
+  seastar::future<> init(entity_addr_t test_addr, SocketPolicy policy) {
     test_msgr->set_default_policy(policy);
     test_msgr->set_auth_client(&dummy_auth);
     test_msgr->set_auth_server(&dummy_auth);
-    test_msgr->interceptor = &interceptor;
-    return test_msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
+    test_msgr->set_interceptor(&interceptor);
+    return test_msgr->bind(entity_addrvec_t{test_addr}).safe_then([this] {
       return test_msgr->start({this});
-    }, Messenger::bind_ertr::all_same_way([addr] (const std::error_code& e) {
+    }, Messenger::bind_ertr::all_same_way([test_addr] (const std::error_code& e) {
       logger().error("FailoverSuite: "
-                     "there is another instance running at {}", addr);
+                     "there is another instance running at {}", test_addr);
       ceph_abort();
     }));
   }
@@ -977,7 +993,7 @@ class FailoverSuite : public Dispatcher {
           throw std::runtime_error(fmt::format(
                 "The connected connection [{}] {} doesn't"
                 " match the tracked connection [{}] {}",
-                result.index, *result.conn, tracked_index, tracked_conn));
+                result.index, *result.conn, tracked_index, *tracked_conn));
         }
         if (pending_send == 0 && pending_peer_receive == 0 && pending_receive == 0) {
           result.state = conn_state_t::established;
@@ -1098,7 +1114,7 @@ class FailoverSuite : public Dispatcher {
          entity_addr_t test_peer_addr,
          const TestInterceptor& interceptor) {
     auto suite = std::make_unique<FailoverSuite>(
-        Messenger::create(entity_name_t::OSD(2), "Test", 2),
+        Messenger::create(entity_name_t::OSD(TEST_OSD), "Test", TEST_NONCE),
         test_peer_addr, interceptor);
     return suite->init(test_addr, test_policy
     ).then([suite = std::move(suite)] () mutable {
@@ -1147,7 +1163,7 @@ class FailoverSuite : public Dispatcher {
   seastar::future<> keepalive_peer() {
     logger().info("[Test] keepalive_peer()");
     ceph_assert(tracked_conn);
-    return tracked_conn->keepalive();
+    return tracked_conn->send_keepalive();
   }
 
   seastar::future<> try_send_peer() {
@@ -1157,10 +1173,13 @@ class FailoverSuite : public Dispatcher {
   }
 
   seastar::future<> markdown() {
-    logger().info("[Test] markdown()");
+    logger().info("[Test] markdown() in 100ms ...");
     ceph_assert(tracked_conn);
-    tracked_conn->mark_down();
-    return seastar::now();
+    // sleep to propagate potential remaining acks
+    return seastar::sleep(100ms
+    ).then([this] {
+      tracked_conn->mark_down();
+    });
   }
 
   seastar::future<> wait_blocked() {
@@ -1305,14 +1324,11 @@ class FailoverTest : public Dispatcher {
   }
 
   static seastar::future<seastar::lw_shared_ptr<FailoverTest>>
-  create(entity_addr_t cmd_peer_addr, entity_addr_t test_addr) {
-    test_addr.set_nonce(2);
-    cmd_peer_addr.set_nonce(3);
-    entity_addr_t test_peer_addr = cmd_peer_addr;
-    test_peer_addr.set_port(cmd_peer_addr.get_port() + 1);
-    test_peer_addr.set_nonce(4);
+  create(entity_addr_t test_addr,
+         entity_addr_t cmd_peer_addr,
+         entity_addr_t test_peer_addr) {
     auto test = seastar::make_lw_shared<FailoverTest>(
-        Messenger::create(entity_name_t::OSD(1), "CmdCli", 1),
+        Messenger::create(entity_name_t::OSD(CMD_CLI_OSD), "CmdCli", CMD_CLI_NONCE),
         test_addr, test_peer_addr);
     return test->init(cmd_peer_addr).then([test] {
       logger().info("CmdCli ready");
@@ -1390,8 +1406,12 @@ class FailoverTest : public Dispatcher {
   }
 
   seastar::future<> markdown_peer() {
-    logger().info("[Test] markdown_peer()");
-    return prepare_cmd(cmd_t::suite_markdown).then([] {
+    logger().info("[Test] markdown_peer() in 150ms ...");
+    // sleep to propagate potential remaining acks
+    return seastar::sleep(50ms
+    ).then([this] {
+      return prepare_cmd(cmd_t::suite_markdown);
+    }).then([] {
       // sleep awhile for peer markdown propagated
       return seastar::sleep(100ms);
     });
@@ -1431,15 +1451,15 @@ class FailoverSuitePeer : public Dispatcher {
   }
 
  private:
-  seastar::future<> init(entity_addr_t addr, SocketPolicy policy) {
+  seastar::future<> init(entity_addr_t test_peer_addr, SocketPolicy policy) {
     peer_msgr->set_default_policy(policy);
     peer_msgr->set_auth_client(&dummy_auth);
     peer_msgr->set_auth_server(&dummy_auth);
-    return peer_msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
+    return peer_msgr->bind(entity_addrvec_t{test_peer_addr}).safe_then([this] {
       return peer_msgr->start({this});
-    }, Messenger::bind_ertr::all_same_way([addr] (const std::error_code& e) {
+    }, Messenger::bind_ertr::all_same_way([test_peer_addr] (const std::error_code& e) {
       logger().error("FailoverSuitePeer: "
-                     "there is another instance running at {}", addr);
+                     "there is another instance running at {}", test_peer_addr);
       ceph_abort();
     }));
   }
@@ -1476,9 +1496,9 @@ class FailoverSuitePeer : public Dispatcher {
     return peer_msgr->shutdown();
   }
 
-  seastar::future<> connect_peer(entity_addr_t addr) {
-    logger().info("[TestPeer] connect_peer({})", addr);
-    auto new_tracked_conn = peer_msgr->connect(addr, entity_name_t::TYPE_OSD);
+  seastar::future<> connect_peer(entity_addr_t test_addr_decoded) {
+    logger().info("[TestPeer] connect_peer({})", test_addr_decoded);
+    auto new_tracked_conn = peer_msgr->connect(test_addr_decoded, entity_name_t::TYPE_OSD);
     if (tracked_conn) {
       if (tracked_conn->is_closed()) {
         ceph_assert(tracked_conn != new_tracked_conn);
@@ -1509,7 +1529,7 @@ class FailoverSuitePeer : public Dispatcher {
   seastar::future<> keepalive_peer() {
     logger().info("[TestPeer] keepalive_peer()");
     ceph_assert(tracked_conn);
-    return tracked_conn->keepalive();
+    return tracked_conn->send_keepalive();
   }
 
   seastar::future<> markdown() {
@@ -1520,10 +1540,15 @@ class FailoverSuitePeer : public Dispatcher {
   }
 
   static seastar::future<std::unique_ptr<FailoverSuitePeer>>
-  create(entity_addr_t addr, const SocketPolicy& policy, cb_t op_callback) {
+  create(entity_addr_t test_peer_addr, const SocketPolicy& policy, cb_t op_callback) {
     auto suite = std::make_unique<FailoverSuitePeer>(
-        Messenger::create(entity_name_t::OSD(4), "TestPeer", 4), op_callback);
-    return suite->init(addr, policy
+      Messenger::create(
+        entity_name_t::OSD(TEST_PEER_OSD),
+        "TestPeer",
+        TEST_PEER_NONCE),
+      op_callback
+    );
+    return suite->init(test_peer_addr, policy
     ).then([suite = std::move(suite)] () mutable {
       return std::move(suite);
     });
@@ -1559,7 +1584,7 @@ class FailoverTestPeer : public Dispatcher {
       break;
      }
      default:
-      logger().error("{} got unexpected msg from cmd client: {}", *c, m);
+      logger().error("{} got unexpected msg from cmd client: {}", *c, *m);
       ceph_abort();
     }
     return {seastar::now()};
@@ -1582,8 +1607,8 @@ class FailoverTestPeer : public Dispatcher {
      case cmd_t::suite_start: {
       ceph_assert(!test_suite);
       auto policy = to_socket_policy(static_cast<policy_t>(m_cmd->cmd[1][0]));
-      return FailoverSuitePeer::create(test_peer_addr, policy,
-                                       [this] { return notify_recv_op(); }
+      return FailoverSuitePeer::create(
+        test_peer_addr, policy, [this] { return notify_recv_op(); }
       ).then([this] (auto suite) {
         test_suite.swap(suite);
       });
@@ -1595,9 +1620,9 @@ class FailoverTestPeer : public Dispatcher {
       });
      case cmd_t::suite_connect_me: {
       ceph_assert(test_suite);
-      entity_addr_t test_addr = entity_addr_t();
-      test_addr.parse(m_cmd->cmd[1].c_str(), nullptr);
-      return test_suite->connect_peer(test_addr);
+      entity_addr_t test_addr_decoded = entity_addr_t();
+      test_addr_decoded.parse(m_cmd->cmd[1].c_str(), nullptr);
+      return test_suite->connect_peer(test_addr_decoded);
      }
      case cmd_t::suite_send_me:
       ceph_assert(test_suite);
@@ -1609,7 +1634,8 @@ class FailoverTestPeer : public Dispatcher {
       ceph_assert(test_suite);
       return test_suite->markdown();
      default:
-      logger().error("TestPeer got unexpected command {} from Test", m_cmd);
+      logger().error("TestPeer got unexpected command {} from Test",
+                    fmt::ptr(m_cmd.get()));
       ceph_abort();
       return seastar::now();
     }
@@ -1639,12 +1665,10 @@ class FailoverTestPeer : public Dispatcher {
   }
 
   static seastar::future<std::unique_ptr<FailoverTestPeer>>
-  create(entity_addr_t cmd_peer_addr) {
-    // suite bind to cmd_peer_addr, with port + 1
-    entity_addr_t test_peer_addr = cmd_peer_addr;
-    test_peer_addr.set_port(cmd_peer_addr.get_port() + 1);
+  create(entity_addr_t cmd_peer_addr, entity_addr_t test_peer_addr) {
     auto test_peer = std::make_unique<FailoverTestPeer>(
-        Messenger::create(entity_name_t::OSD(3), "CmdSrv", 3), test_peer_addr);
+        Messenger::create(entity_name_t::OSD(CMD_SRV_OSD), "CmdSrv", CMD_SRV_NONCE),
+        test_peer_addr);
     return test_peer->init(cmd_peer_addr
     ).then([test_peer = std::move(test_peer)] () mutable {
       logger().info("CmdSrv ready");
@@ -1656,10 +1680,10 @@ class FailoverTestPeer : public Dispatcher {
 seastar::future<>
 test_v2_lossy_early_connect_fault(FailoverTest& test) {
   return seastar::do_with(std::vector<Breakpoint>{
+      {custom_bp_t::SOCKET_CONNECTING},
       {custom_bp_t::BANNER_WRITE},
       {custom_bp_t::BANNER_READ},
       {custom_bp_t::BANNER_PAYLOAD_READ},
-      {custom_bp_t::SOCKET_CONNECTING},
       {Tag::HELLO, bp_type_t::WRITE},
       {Tag::HELLO, bp_type_t::READ},
       {Tag::AUTH_REQUEST, bp_type_t::WRITE},
@@ -2345,9 +2369,9 @@ test_v2_peer_connected_fault_reaccept(FailoverTest& test) {
 }
 
 seastar::future<bool>
-peer_wins(FailoverTest& test) {
+check_peer_wins(FailoverTest& test) {
   return seastar::do_with(bool(), [&test] (auto& ret) {
-    return test.run_suite("peer_wins",
+    return test.run_suite("check_peer_wins",
                           TestInterceptor(),
                           policy_t::lossy_client,
                           policy_t::stateless_server,
@@ -2357,7 +2381,7 @@ peer_wins(FailoverTest& test) {
       }).then([&ret] (ConnResults& results) {
         results[0].assert_state_at(conn_state_t::established);
         ret = results[0].conn->peer_wins();
-        logger().info("peer_wins: {}", ret);
+        logger().info("check_peer_wins: {}", ret);
       });
     }).then([&ret] {
       return ret;
@@ -2366,8 +2390,9 @@ peer_wins(FailoverTest& test) {
 }
 
 seastar::future<>
-test_v2_racing_reconnect_win(FailoverTest& test) {
+test_v2_racing_reconnect_acceptor_lose(FailoverTest& test) {
   return seastar::do_with(std::vector<std::pair<unsigned, Breakpoint>>{
+      {1, {Tag::SESSION_RECONNECT, bp_type_t::READ}},
       {2, {custom_bp_t::BANNER_WRITE}},
       {2, {custom_bp_t::BANNER_READ}},
       {2, {custom_bp_t::BANNER_PAYLOAD_READ}},
@@ -2377,14 +2402,15 @@ test_v2_racing_reconnect_win(FailoverTest& test) {
       {2, {Tag::AUTH_DONE, bp_type_t::WRITE}},
       {2, {Tag::AUTH_SIGNATURE, bp_type_t::WRITE}},
       {2, {Tag::AUTH_SIGNATURE, bp_type_t::READ}},
-      {1, {Tag::SESSION_RECONNECT, bp_type_t::READ}},
   }, [&test] (auto& failure_cases) {
     return seastar::do_for_each(failure_cases, [&test] (auto bp) {
       TestInterceptor interceptor;
+      // fault acceptor
       interceptor.make_fault({Tag::MESSAGE, bp_type_t::READ});
+      // block acceptor
       interceptor.make_block(bp.second, bp.first);
       return test.run_suite(
-          fmt::format("test_v2_racing_reconnect_win -- {}({})",
+          fmt::format("test_v2_racing_reconnect_acceptor_lose -- {}({})",
                       bp.second, bp.first),
           interceptor,
           policy_t::lossless_peer,
@@ -2419,8 +2445,10 @@ test_v2_racing_reconnect_win(FailoverTest& test) {
 }
 
 seastar::future<>
-test_v2_racing_reconnect_lose(FailoverTest& test) {
+test_v2_racing_reconnect_acceptor_win(FailoverTest& test) {
   return seastar::do_with(std::vector<std::pair<unsigned, Breakpoint>>{
+      {1, {Tag::SESSION_RECONNECT, bp_type_t::WRITE}},
+      {2, {custom_bp_t::SOCKET_CONNECTING}},
       {2, {custom_bp_t::BANNER_WRITE}},
       {2, {custom_bp_t::BANNER_READ}},
       {2, {custom_bp_t::BANNER_PAYLOAD_READ}},
@@ -2430,14 +2458,15 @@ test_v2_racing_reconnect_lose(FailoverTest& test) {
       {2, {Tag::AUTH_DONE, bp_type_t::READ}},
       {2, {Tag::AUTH_SIGNATURE, bp_type_t::WRITE}},
       {2, {Tag::AUTH_SIGNATURE, bp_type_t::READ}},
-      {1, {Tag::SESSION_RECONNECT, bp_type_t::WRITE}},
   }, [&test] (auto& failure_cases) {
     return seastar::do_for_each(failure_cases, [&test] (auto bp) {
       TestInterceptor interceptor;
+      // fault connector
       interceptor.make_fault({Tag::MESSAGE, bp_type_t::WRITE});
+      // block connector
       interceptor.make_block(bp.second, bp.first);
       return test.run_suite(
-          fmt::format("test_v2_racing_reconnect_lose -- {}({})",
+          fmt::format("test_v2_racing_reconnect_acceptor_win -- {}({})",
                       bp.second, bp.first),
           interceptor,
           policy_t::lossless_peer,
@@ -2472,7 +2501,7 @@ test_v2_racing_reconnect_lose(FailoverTest& test) {
 }
 
 seastar::future<>
-test_v2_racing_connect_win(FailoverTest& test) {
+test_v2_racing_connect_acceptor_lose(FailoverTest& test) {
   return seastar::do_with(std::vector<Breakpoint>{
       {custom_bp_t::BANNER_WRITE},
       {custom_bp_t::BANNER_READ},
@@ -2487,9 +2516,10 @@ test_v2_racing_connect_win(FailoverTest& test) {
   }, [&test] (auto& failure_cases) {
     return seastar::do_for_each(failure_cases, [&test] (auto bp) {
       TestInterceptor interceptor;
+      // block acceptor
       interceptor.make_block(bp);
       return test.run_suite(
-          fmt::format("test_v2_racing_connect_win -- {}", bp),
+          fmt::format("test_v2_racing_connect_acceptor_lose -- {}", bp),
           interceptor,
           policy_t::lossless_peer,
           policy_t::lossless_peer,
@@ -2525,8 +2555,9 @@ test_v2_racing_connect_win(FailoverTest& test) {
 }
 
 seastar::future<>
-test_v2_racing_connect_lose(FailoverTest& test) {
+test_v2_racing_connect_acceptor_win(FailoverTest& test) {
   return seastar::do_with(std::vector<Breakpoint>{
+      {custom_bp_t::SOCKET_CONNECTING},
       {custom_bp_t::BANNER_WRITE},
       {custom_bp_t::BANNER_READ},
       {custom_bp_t::BANNER_PAYLOAD_READ},
@@ -2540,9 +2571,10 @@ test_v2_racing_connect_lose(FailoverTest& test) {
   }, [&test] (auto& failure_cases) {
     return seastar::do_for_each(failure_cases, [&test] (auto bp) {
       TestInterceptor interceptor;
+      // block connector
       interceptor.make_block(bp);
       return test.run_suite(
-          fmt::format("test_v2_racing_connect_lose -- {}", bp),
+          fmt::format("test_v2_racing_connect_acceptor_win -- {}", bp),
           interceptor,
           policy_t::lossless_peer,
           policy_t::lossless_peer,
@@ -3198,8 +3230,6 @@ test_v2_peer_reuse_connector(FailoverTest& test) {
       logger().info("-- 2 --");
       logger().info("[Test] acceptor markdown...");
       return test.markdown_peer();
-    }).then([] {
-      return seastar::sleep(100ms);
     }).then([&suite] {
       ceph_assert(suite.is_standby());
       logger().info("-- 3 --");
@@ -3262,11 +3292,9 @@ test_v2_peer_reuse_acceptor(FailoverTest& test) {
       results[1].assert_connect(0, 0, 0, 0);
       results[1].assert_accept(1, 1, 0, 0);
       results[1].assert_reset(0, 0);
-    }).then([] {
+    }).then([&suite] {
       logger().info("-- 2 --");
       logger().info("[Test] acceptor markdown...");
-      return seastar::sleep(100ms);
-    }).then([&suite] {
       return suite.markdown();
     }).then([&suite] {
       return suite.wait_results(2);
@@ -3348,8 +3376,6 @@ test_v2_lossless_peer_connector(FailoverTest& test) {
       logger().info("-- 2 --");
       logger().info("[Test] acceptor markdown...");
       return test.markdown_peer();
-    }).then([] {
-      return seastar::sleep(100ms);
     }).then([&suite] {
       ceph_assert(suite.is_standby());
       logger().info("-- 3 --");
@@ -3412,11 +3438,9 @@ test_v2_lossless_peer_acceptor(FailoverTest& test) {
       results[1].assert_connect(0, 0, 0, 0);
       results[1].assert_accept(1, 1, 0, 0);
       results[1].assert_reset(0, 0);
-    }).then([] {
+    }).then([&suite] {
       logger().info("-- 2 --");
       logger().info("[Test] acceptor markdown...");
-      return seastar::sleep(100ms);
-    }).then([&suite] {
       return suite.markdown();
     }).then([&suite] {
       return suite.wait_results(2);
@@ -3456,17 +3480,25 @@ test_v2_lossless_peer_acceptor(FailoverTest& test) {
 
 seastar::future<>
 test_v2_protocol(entity_addr_t test_addr,
+                 entity_addr_t cmd_peer_addr,
                  entity_addr_t test_peer_addr,
-                 bool test_peer_islocal) {
-  ceph_assert(test_addr.is_msgr2());
-  ceph_assert(test_peer_addr.is_msgr2());
+                 bool test_peer_islocal,
+                 bool peer_wins) {
+  ceph_assert_always(test_addr.is_msgr2());
+  ceph_assert_always(cmd_peer_addr.is_msgr2());
+  ceph_assert_always(test_peer_addr.is_msgr2());
 
   if (test_peer_islocal) {
     // initiate crimson test peer locally
-    logger().info("test_v2_protocol: start local TestPeer at {}...", test_peer_addr);
-    return FailoverTestPeer::create(test_peer_addr
-    ).then([test_addr, test_peer_addr] (auto peer) {
-      return test_v2_protocol(test_addr, test_peer_addr, false
+    logger().info("test_v2_protocol: start local TestPeer at {}...", cmd_peer_addr);
+    return FailoverTestPeer::create(cmd_peer_addr, test_peer_addr
+    ).then([test_addr, cmd_peer_addr, test_peer_addr, peer_wins](auto peer) {
+      return test_v2_protocol(
+        test_addr,
+        cmd_peer_addr,
+        test_peer_addr,
+        false,
+        peer_wins
       ).then([peer = std::move(peer)] () mutable {
         return peer->wait().then([peer = std::move(peer)] {});
       });
@@ -3476,7 +3508,8 @@ test_v2_protocol(entity_addr_t test_addr,
     });
   }
 
-  return FailoverTest::create(test_peer_addr, test_addr).then([] (auto test) {
+  return FailoverTest::create(test_addr, cmd_peer_addr, test_peer_addr
+  ).then([peer_wins](auto test) {
     return seastar::futurize_invoke([test] {
       return test_v2_lossy_early_connect_fault(*test);
     }).then([test] {
@@ -3518,19 +3551,20 @@ test_v2_protocol(entity_addr_t test_addr,
     }).then([test] {
       return test_v2_peer_connected_fault_reaccept(*test);
     }).then([test] {
-      return peer_wins(*test);
-    }).then([test] (bool peer_wins) {
-      if (peer_wins) {
+      return check_peer_wins(*test);
+    }).then([test, peer_wins](bool ret_peer_wins) {
+      ceph_assert(peer_wins == ret_peer_wins);
+      if (ret_peer_wins) {
         return seastar::futurize_invoke([test] {
-          return test_v2_racing_connect_lose(*test);
+          return test_v2_racing_connect_acceptor_win(*test);
         }).then([test] {
-          return test_v2_racing_reconnect_lose(*test);
+          return test_v2_racing_reconnect_acceptor_win(*test);
         });
       } else {
         return seastar::futurize_invoke([test] {
-          return test_v2_racing_connect_win(*test);
+          return test_v2_racing_connect_acceptor_lose(*test);
         }).then([test] {
-          return test_v2_racing_reconnect_win(*test);
+          return test_v2_racing_reconnect_acceptor_lose(*test);
         });
       }
     }).then([test] {
@@ -3591,20 +3625,39 @@ seastar::future<int> do_test(seastar::app_template& app)
     verbose = config["verbose"].as<bool>();
     auto rounds = config["rounds"].as<unsigned>();
     auto keepalive_ratio = config["keepalive-ratio"].as<double>();
-    entity_addr_t v2_test_addr;
-    ceph_assert(v2_test_addr.parse(
-        config["v2-test-addr"].as<std::string>().c_str(), nullptr));
-    entity_addr_t v2_testpeer_addr;
-    ceph_assert(v2_testpeer_addr.parse(
-        config["v2-testpeer-addr"].as<std::string>().c_str(), nullptr));
-    auto v2_testpeer_islocal = config["v2-testpeer-islocal"].as<bool>();
-    return test_echo(rounds, keepalive_ratio)
-    .then([] {
+    auto testpeer_islocal = config["testpeer-islocal"].as<bool>();
+
+    entity_addr_t test_addr;
+    ceph_assert(test_addr.parse(
+        config["test-addr"].as<std::string>().c_str(), nullptr));
+    test_addr.set_nonce(TEST_NONCE);
+
+    entity_addr_t cmd_peer_addr;
+    ceph_assert(cmd_peer_addr.parse(
+        config["testpeer-addr"].as<std::string>().c_str(), nullptr));
+    cmd_peer_addr.set_nonce(CMD_SRV_NONCE);
+
+    entity_addr_t test_peer_addr = get_test_peer_addr(cmd_peer_addr);
+    bool peer_wins = (test_addr > test_peer_addr);
+
+    logger().info("test configuration: verbose={}, rounds={}, keepalive_ratio={}, "
+                  "test_addr={}, cmd_peer_addr={}, test_peer_addr={}, "
+                  "testpeer_islocal={}, peer_wins={}",
+                  verbose, rounds, keepalive_ratio,
+                  test_addr, cmd_peer_addr, test_peer_addr,
+                  testpeer_islocal, peer_wins);
+    return test_echo(rounds, keepalive_ratio
+    ).then([] {
       return test_concurrent_dispatch();
     }).then([] {
       return test_preemptive_shutdown();
-    }).then([v2_test_addr, v2_testpeer_addr, v2_testpeer_islocal] {
-      return test_v2_protocol(v2_test_addr, v2_testpeer_addr, v2_testpeer_islocal);
+    }).then([test_addr, cmd_peer_addr, test_peer_addr, testpeer_islocal, peer_wins] {
+      return test_v2_protocol(
+          test_addr,
+          cmd_peer_addr,
+          test_peer_addr,
+          testpeer_islocal,
+          peer_wins);
     }).then([] {
       logger().info("All tests succeeded");
       // Seastar has bugs to have events undispatched during shutdown,
@@ -3631,19 +3684,19 @@ int main(int argc, char** argv)
      "number of pingpong rounds")
     ("keepalive-ratio", bpo::value<double>()->default_value(0.1),
      "ratio of keepalive in ping messages")
-    ("v2-test-addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9012"),
+    ("test-addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9014"),
      "address of v2 failover tests")
-    ("v2-testpeer-addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9013"),
+    ("testpeer-addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9012"),
      "addresses of v2 failover testpeer"
-     " (CmdSrv address and TestPeer address with port+=1)")
-    ("v2-testpeer-islocal", bpo::value<bool>()->default_value(true),
+     " (This is CmdSrv address, and TestPeer address is at port+=1)")
+    ("testpeer-islocal", bpo::value<bool>()->default_value(true),
      "create a local crimson testpeer, or connect to a remote testpeer");
   return app.run(argc, argv, [&app] {
-    // This test normally succeeds within 60 seconds, so kill it after 120
+    // This test normally succeeds within 60 seconds, so kill it after 300
     // seconds in case it is blocked forever due to unaddressed bugs.
-    return seastar::with_timeout(seastar::lowres_clock::now() + 120s, do_test(app))
+    return seastar::with_timeout(seastar::lowres_clock::now() + 300s, do_test(app))
       .handle_exception_type([](seastar::timed_out_error&) {
-        logger().error("test_messenger timeout after 120s, abort! "
+        logger().error("test_messenger timeout after 300s, abort! "
                        "Consider to extend the period if the test is still running.");
         // use the retcode of timeout(1)
         return 124;