#include "crimson/net/Dispatcher.h"
#include "crimson/net/Messenger.h"
#include "crimson/net/Interceptor.h"
+#include "crimson/net/SocketConnection.h"
#include <map>
#include <random>
#include <seastar/core/sleep.hh>
#include <seastar/core/with_timeout.hh>
-#include "test_cmds.h"
+#include "test_messenger.h"
using namespace std::chrono_literals;
namespace bpo = boost::program_options;
namespace {
seastar::logger& logger() {
- return crimson::get_logger(ceph_subsys_ms);
+ return crimson::get_logger(ceph_subsys_test);
}
static std::random_device rd;
const entity_addr_t& addr) {
msgr = crimson::net::Messenger::create(name, lname, nonce);
msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
- msgr->set_require_authorizer(false);
msgr->set_auth_client(&dummy_auth);
msgr->set_auth_server(&dummy_auth);
return msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
[this, conn, &count_ping, &count_keepalive] {
return seastar::repeat([this, conn, &count_ping, &count_keepalive] {
if (keepalive_dist(rng)) {
- return conn->keepalive()
+ return conn->send_keepalive()
.then([&count_keepalive] {
count_keepalive += 1;
return seastar::make_ready_future<seastar::stop_iteration>(
using crimson::net::Interceptor;
using crimson::net::Messenger;
using crimson::net::MessengerRef;
+using crimson::net::SocketConnection;
using crimson::net::SocketPolicy;
using crimson::net::tag_bp_t;
-using ceph::net::test::cmd_t;
-using ceph::net::test::policy_t;
+using namespace ceph::net::test;
struct counter_t { unsigned counter = 0; };
}
}
+} // anonymous namespace
+
+#if FMT_VERSION >= 90000
+template<>
+struct fmt::formatter<conn_state_t> : fmt::ostream_formatter {};
+#endif
+
+namespace {
+
struct ConnResult {
ConnectionRef conn;
unsigned index;
unsigned cnt_reset_dispatched = 0;
unsigned cnt_remote_reset_dispatched = 0;
- ConnResult(Connection& conn, unsigned index)
- : conn(conn.shared_from_this()), index(index) {}
+ ConnResult(ConnectionRef conn, unsigned index)
+ : conn(conn), index(index) {}
template <typename T>
void _assert_eq(const char* expr_actual, T actual,
}
private:
- void register_conn(Connection& conn) override {
+ void register_conn(SocketConnection& _conn) override {
+ auto conn = _conn.get_local_shared_foreign_from_this();
+ auto result = find_result(conn);
+ if (result != nullptr) {
+ logger().error("The connection [{}] {} already exists when register {}",
+ result->index, *result->conn, _conn);
+ ceph_abort();
+ }
unsigned index = results.size();
results.emplace_back(conn, index);
- conns[conn.shared_from_this()] = index;
+ conns[conn] = index;
notify();
- logger().info("[{}] {} new connection registered", index, conn);
+ logger().info("[{}] {} new connection registered", index, _conn);
}
- void register_conn_closed(Connection& conn) override {
- auto result = find_result(conn.shared_from_this());
+ void register_conn_closed(SocketConnection& conn) override {
+ auto result = find_result(conn.get_local_shared_foreign_from_this());
if (result == nullptr) {
logger().error("Untracked closed connection: {}", conn);
ceph_abort();
logger().info("[{}] {} closed({})", result->index, conn, result->state);
}
- void register_conn_ready(Connection& conn) override {
- auto result = find_result(conn.shared_from_this());
+ void register_conn_ready(SocketConnection& conn) override {
+ auto result = find_result(conn.get_local_shared_foreign_from_this());
if (result == nullptr) {
logger().error("Untracked ready connection: {}", conn);
ceph_abort();
logger().info("[{}] {} ready", result->index, conn);
}
- void register_conn_replaced(Connection& conn) override {
- auto result = find_result(conn.shared_from_this());
+ void register_conn_replaced(SocketConnection& conn) override {
+ auto result = find_result(conn.get_local_shared_foreign_from_this());
if (result == nullptr) {
logger().error("Untracked replaced connection: {}", conn);
ceph_abort();
logger().info("[{}] {} {}", result->index, conn, result->state);
}
- bp_action_t intercept(Connection& conn, Breakpoint bp) override {
+ bp_action_t intercept(SocketConnection& conn, Breakpoint bp) override {
++breakpoints_counter[bp].counter;
- auto result = find_result(conn.shared_from_this());
+ auto result = find_result(conn.get_local_shared_foreign_from_this());
if (result == nullptr) {
logger().error("Untracked intercepted connection: {}, at breakpoint {}({})",
conn, bp, breakpoints_counter[bp].counter);
}
private:
- seastar::future<> init(entity_addr_t addr, SocketPolicy policy) {
+ seastar::future<> init(entity_addr_t test_addr, SocketPolicy policy) {
test_msgr->set_default_policy(policy);
test_msgr->set_auth_client(&dummy_auth);
test_msgr->set_auth_server(&dummy_auth);
- test_msgr->interceptor = &interceptor;
- return test_msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
+ test_msgr->set_interceptor(&interceptor);
+ return test_msgr->bind(entity_addrvec_t{test_addr}).safe_then([this] {
return test_msgr->start({this});
- }, Messenger::bind_ertr::all_same_way([addr] (const std::error_code& e) {
+ }, Messenger::bind_ertr::all_same_way([test_addr] (const std::error_code& e) {
logger().error("FailoverSuite: "
- "there is another instance running at {}", addr);
+ "there is another instance running at {}", test_addr);
ceph_abort();
}));
}
throw std::runtime_error(fmt::format(
"The connected connection [{}] {} doesn't"
" match the tracked connection [{}] {}",
- result.index, *result.conn, tracked_index, tracked_conn));
+ result.index, *result.conn, tracked_index, *tracked_conn));
}
if (pending_send == 0 && pending_peer_receive == 0 && pending_receive == 0) {
result.state = conn_state_t::established;
entity_addr_t test_peer_addr,
const TestInterceptor& interceptor) {
auto suite = std::make_unique<FailoverSuite>(
- Messenger::create(entity_name_t::OSD(2), "Test", 2),
+ Messenger::create(entity_name_t::OSD(TEST_OSD), "Test", TEST_NONCE),
test_peer_addr, interceptor);
return suite->init(test_addr, test_policy
).then([suite = std::move(suite)] () mutable {
seastar::future<> keepalive_peer() {
logger().info("[Test] keepalive_peer()");
ceph_assert(tracked_conn);
- return tracked_conn->keepalive();
+ return tracked_conn->send_keepalive();
}
seastar::future<> try_send_peer() {
}
seastar::future<> markdown() {
- logger().info("[Test] markdown()");
+ logger().info("[Test] markdown() in 100ms ...");
ceph_assert(tracked_conn);
- tracked_conn->mark_down();
- return seastar::now();
+ // sleep to propagate potential remaining acks
+ return seastar::sleep(100ms
+ ).then([this] {
+ tracked_conn->mark_down();
+ });
}
seastar::future<> wait_blocked() {
}
static seastar::future<seastar::lw_shared_ptr<FailoverTest>>
- create(entity_addr_t cmd_peer_addr, entity_addr_t test_addr) {
- test_addr.set_nonce(2);
- cmd_peer_addr.set_nonce(3);
- entity_addr_t test_peer_addr = cmd_peer_addr;
- test_peer_addr.set_port(cmd_peer_addr.get_port() + 1);
- test_peer_addr.set_nonce(4);
+ create(entity_addr_t test_addr,
+ entity_addr_t cmd_peer_addr,
+ entity_addr_t test_peer_addr) {
auto test = seastar::make_lw_shared<FailoverTest>(
- Messenger::create(entity_name_t::OSD(1), "CmdCli", 1),
+ Messenger::create(entity_name_t::OSD(CMD_CLI_OSD), "CmdCli", CMD_CLI_NONCE),
test_addr, test_peer_addr);
return test->init(cmd_peer_addr).then([test] {
logger().info("CmdCli ready");
}
seastar::future<> markdown_peer() {
- logger().info("[Test] markdown_peer()");
- return prepare_cmd(cmd_t::suite_markdown).then([] {
+ logger().info("[Test] markdown_peer() in 150ms ...");
+ // sleep to propagate potential remaining acks
+ return seastar::sleep(50ms
+ ).then([this] {
+ return prepare_cmd(cmd_t::suite_markdown);
+ }).then([] {
// sleep awhile for peer markdown propagated
return seastar::sleep(100ms);
});
}
private:
- seastar::future<> init(entity_addr_t addr, SocketPolicy policy) {
+ seastar::future<> init(entity_addr_t test_peer_addr, SocketPolicy policy) {
peer_msgr->set_default_policy(policy);
peer_msgr->set_auth_client(&dummy_auth);
peer_msgr->set_auth_server(&dummy_auth);
- return peer_msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
+ return peer_msgr->bind(entity_addrvec_t{test_peer_addr}).safe_then([this] {
return peer_msgr->start({this});
- }, Messenger::bind_ertr::all_same_way([addr] (const std::error_code& e) {
+ }, Messenger::bind_ertr::all_same_way([test_peer_addr] (const std::error_code& e) {
logger().error("FailoverSuitePeer: "
- "there is another instance running at {}", addr);
+ "there is another instance running at {}", test_peer_addr);
ceph_abort();
}));
}
return peer_msgr->shutdown();
}
- seastar::future<> connect_peer(entity_addr_t addr) {
- logger().info("[TestPeer] connect_peer({})", addr);
- auto new_tracked_conn = peer_msgr->connect(addr, entity_name_t::TYPE_OSD);
+ seastar::future<> connect_peer(entity_addr_t test_addr_decoded) {
+ logger().info("[TestPeer] connect_peer({})", test_addr_decoded);
+ auto new_tracked_conn = peer_msgr->connect(test_addr_decoded, entity_name_t::TYPE_OSD);
if (tracked_conn) {
if (tracked_conn->is_closed()) {
ceph_assert(tracked_conn != new_tracked_conn);
seastar::future<> keepalive_peer() {
logger().info("[TestPeer] keepalive_peer()");
ceph_assert(tracked_conn);
- return tracked_conn->keepalive();
+ return tracked_conn->send_keepalive();
}
seastar::future<> markdown() {
}
static seastar::future<std::unique_ptr<FailoverSuitePeer>>
- create(entity_addr_t addr, const SocketPolicy& policy, cb_t op_callback) {
+ create(entity_addr_t test_peer_addr, const SocketPolicy& policy, cb_t op_callback) {
auto suite = std::make_unique<FailoverSuitePeer>(
- Messenger::create(entity_name_t::OSD(4), "TestPeer", 4), op_callback);
- return suite->init(addr, policy
+ Messenger::create(
+ entity_name_t::OSD(TEST_PEER_OSD),
+ "TestPeer",
+ TEST_PEER_NONCE),
+ op_callback
+ );
+ return suite->init(test_peer_addr, policy
).then([suite = std::move(suite)] () mutable {
return std::move(suite);
});
break;
}
default:
- logger().error("{} got unexpected msg from cmd client: {}", *c, m);
+ logger().error("{} got unexpected msg from cmd client: {}", *c, *m);
ceph_abort();
}
return {seastar::now()};
case cmd_t::suite_start: {
ceph_assert(!test_suite);
auto policy = to_socket_policy(static_cast<policy_t>(m_cmd->cmd[1][0]));
- return FailoverSuitePeer::create(test_peer_addr, policy,
- [this] { return notify_recv_op(); }
+ return FailoverSuitePeer::create(
+ test_peer_addr, policy, [this] { return notify_recv_op(); }
).then([this] (auto suite) {
test_suite.swap(suite);
});
});
case cmd_t::suite_connect_me: {
ceph_assert(test_suite);
- entity_addr_t test_addr = entity_addr_t();
- test_addr.parse(m_cmd->cmd[1].c_str(), nullptr);
- return test_suite->connect_peer(test_addr);
+ entity_addr_t test_addr_decoded = entity_addr_t();
+ test_addr_decoded.parse(m_cmd->cmd[1].c_str(), nullptr);
+ return test_suite->connect_peer(test_addr_decoded);
}
case cmd_t::suite_send_me:
ceph_assert(test_suite);
ceph_assert(test_suite);
return test_suite->markdown();
default:
- logger().error("TestPeer got unexpected command {} from Test", m_cmd);
+ logger().error("TestPeer got unexpected command {} from Test",
+ fmt::ptr(m_cmd.get()));
ceph_abort();
return seastar::now();
}
}
static seastar::future<std::unique_ptr<FailoverTestPeer>>
- create(entity_addr_t cmd_peer_addr) {
- // suite bind to cmd_peer_addr, with port + 1
- entity_addr_t test_peer_addr = cmd_peer_addr;
- test_peer_addr.set_port(cmd_peer_addr.get_port() + 1);
+ create(entity_addr_t cmd_peer_addr, entity_addr_t test_peer_addr) {
auto test_peer = std::make_unique<FailoverTestPeer>(
- Messenger::create(entity_name_t::OSD(3), "CmdSrv", 3), test_peer_addr);
+ Messenger::create(entity_name_t::OSD(CMD_SRV_OSD), "CmdSrv", CMD_SRV_NONCE),
+ test_peer_addr);
return test_peer->init(cmd_peer_addr
).then([test_peer = std::move(test_peer)] () mutable {
logger().info("CmdSrv ready");
seastar::future<>
test_v2_lossy_early_connect_fault(FailoverTest& test) {
return seastar::do_with(std::vector<Breakpoint>{
+ {custom_bp_t::SOCKET_CONNECTING},
{custom_bp_t::BANNER_WRITE},
{custom_bp_t::BANNER_READ},
{custom_bp_t::BANNER_PAYLOAD_READ},
- {custom_bp_t::SOCKET_CONNECTING},
{Tag::HELLO, bp_type_t::WRITE},
{Tag::HELLO, bp_type_t::READ},
{Tag::AUTH_REQUEST, bp_type_t::WRITE},
}
seastar::future<bool>
-peer_wins(FailoverTest& test) {
+check_peer_wins(FailoverTest& test) {
return seastar::do_with(bool(), [&test] (auto& ret) {
- return test.run_suite("peer_wins",
+ return test.run_suite("check_peer_wins",
TestInterceptor(),
policy_t::lossy_client,
policy_t::stateless_server,
}).then([&ret] (ConnResults& results) {
results[0].assert_state_at(conn_state_t::established);
ret = results[0].conn->peer_wins();
- logger().info("peer_wins: {}", ret);
+ logger().info("check_peer_wins: {}", ret);
});
}).then([&ret] {
return ret;
}
seastar::future<>
-test_v2_racing_reconnect_win(FailoverTest& test) {
+test_v2_racing_reconnect_acceptor_lose(FailoverTest& test) {
return seastar::do_with(std::vector<std::pair<unsigned, Breakpoint>>{
+ {1, {Tag::SESSION_RECONNECT, bp_type_t::READ}},
{2, {custom_bp_t::BANNER_WRITE}},
{2, {custom_bp_t::BANNER_READ}},
{2, {custom_bp_t::BANNER_PAYLOAD_READ}},
{2, {Tag::AUTH_DONE, bp_type_t::WRITE}},
{2, {Tag::AUTH_SIGNATURE, bp_type_t::WRITE}},
{2, {Tag::AUTH_SIGNATURE, bp_type_t::READ}},
- {1, {Tag::SESSION_RECONNECT, bp_type_t::READ}},
}, [&test] (auto& failure_cases) {
return seastar::do_for_each(failure_cases, [&test] (auto bp) {
TestInterceptor interceptor;
+ // fault acceptor
interceptor.make_fault({Tag::MESSAGE, bp_type_t::READ});
+ // block acceptor
interceptor.make_block(bp.second, bp.first);
return test.run_suite(
- fmt::format("test_v2_racing_reconnect_win -- {}({})",
+ fmt::format("test_v2_racing_reconnect_acceptor_lose -- {}({})",
bp.second, bp.first),
interceptor,
policy_t::lossless_peer,
}
seastar::future<>
-test_v2_racing_reconnect_lose(FailoverTest& test) {
+test_v2_racing_reconnect_acceptor_win(FailoverTest& test) {
return seastar::do_with(std::vector<std::pair<unsigned, Breakpoint>>{
+ {1, {Tag::SESSION_RECONNECT, bp_type_t::WRITE}},
+ {2, {custom_bp_t::SOCKET_CONNECTING}},
{2, {custom_bp_t::BANNER_WRITE}},
{2, {custom_bp_t::BANNER_READ}},
{2, {custom_bp_t::BANNER_PAYLOAD_READ}},
{2, {Tag::AUTH_DONE, bp_type_t::READ}},
{2, {Tag::AUTH_SIGNATURE, bp_type_t::WRITE}},
{2, {Tag::AUTH_SIGNATURE, bp_type_t::READ}},
- {1, {Tag::SESSION_RECONNECT, bp_type_t::WRITE}},
}, [&test] (auto& failure_cases) {
return seastar::do_for_each(failure_cases, [&test] (auto bp) {
TestInterceptor interceptor;
+ // fault connector
interceptor.make_fault({Tag::MESSAGE, bp_type_t::WRITE});
+ // block connector
interceptor.make_block(bp.second, bp.first);
return test.run_suite(
- fmt::format("test_v2_racing_reconnect_lose -- {}({})",
+ fmt::format("test_v2_racing_reconnect_acceptor_win -- {}({})",
bp.second, bp.first),
interceptor,
policy_t::lossless_peer,
}
seastar::future<>
-test_v2_racing_connect_win(FailoverTest& test) {
+test_v2_racing_connect_acceptor_lose(FailoverTest& test) {
return seastar::do_with(std::vector<Breakpoint>{
{custom_bp_t::BANNER_WRITE},
{custom_bp_t::BANNER_READ},
}, [&test] (auto& failure_cases) {
return seastar::do_for_each(failure_cases, [&test] (auto bp) {
TestInterceptor interceptor;
+ // block acceptor
interceptor.make_block(bp);
return test.run_suite(
- fmt::format("test_v2_racing_connect_win -- {}", bp),
+ fmt::format("test_v2_racing_connect_acceptor_lose -- {}", bp),
interceptor,
policy_t::lossless_peer,
policy_t::lossless_peer,
}
seastar::future<>
-test_v2_racing_connect_lose(FailoverTest& test) {
+test_v2_racing_connect_acceptor_win(FailoverTest& test) {
return seastar::do_with(std::vector<Breakpoint>{
+ {custom_bp_t::SOCKET_CONNECTING},
{custom_bp_t::BANNER_WRITE},
{custom_bp_t::BANNER_READ},
{custom_bp_t::BANNER_PAYLOAD_READ},
}, [&test] (auto& failure_cases) {
return seastar::do_for_each(failure_cases, [&test] (auto bp) {
TestInterceptor interceptor;
+ // block connector
interceptor.make_block(bp);
return test.run_suite(
- fmt::format("test_v2_racing_connect_lose -- {}", bp),
+ fmt::format("test_v2_racing_connect_acceptor_win -- {}", bp),
interceptor,
policy_t::lossless_peer,
policy_t::lossless_peer,
logger().info("-- 2 --");
logger().info("[Test] acceptor markdown...");
return test.markdown_peer();
- }).then([] {
- return seastar::sleep(100ms);
}).then([&suite] {
ceph_assert(suite.is_standby());
logger().info("-- 3 --");
results[1].assert_connect(0, 0, 0, 0);
results[1].assert_accept(1, 1, 0, 0);
results[1].assert_reset(0, 0);
- }).then([] {
+ }).then([&suite] {
logger().info("-- 2 --");
logger().info("[Test] acceptor markdown...");
- return seastar::sleep(100ms);
- }).then([&suite] {
return suite.markdown();
}).then([&suite] {
return suite.wait_results(2);
logger().info("-- 2 --");
logger().info("[Test] acceptor markdown...");
return test.markdown_peer();
- }).then([] {
- return seastar::sleep(100ms);
}).then([&suite] {
ceph_assert(suite.is_standby());
logger().info("-- 3 --");
results[1].assert_connect(0, 0, 0, 0);
results[1].assert_accept(1, 1, 0, 0);
results[1].assert_reset(0, 0);
- }).then([] {
+ }).then([&suite] {
logger().info("-- 2 --");
logger().info("[Test] acceptor markdown...");
- return seastar::sleep(100ms);
- }).then([&suite] {
return suite.markdown();
}).then([&suite] {
return suite.wait_results(2);
seastar::future<>
test_v2_protocol(entity_addr_t test_addr,
+ entity_addr_t cmd_peer_addr,
entity_addr_t test_peer_addr,
- bool test_peer_islocal) {
- ceph_assert(test_addr.is_msgr2());
- ceph_assert(test_peer_addr.is_msgr2());
+ bool test_peer_islocal,
+ bool peer_wins) {
+ ceph_assert_always(test_addr.is_msgr2());
+ ceph_assert_always(cmd_peer_addr.is_msgr2());
+ ceph_assert_always(test_peer_addr.is_msgr2());
if (test_peer_islocal) {
// initiate crimson test peer locally
- logger().info("test_v2_protocol: start local TestPeer at {}...", test_peer_addr);
- return FailoverTestPeer::create(test_peer_addr
- ).then([test_addr, test_peer_addr] (auto peer) {
- return test_v2_protocol(test_addr, test_peer_addr, false
+ logger().info("test_v2_protocol: start local TestPeer at {}...", cmd_peer_addr);
+ return FailoverTestPeer::create(cmd_peer_addr, test_peer_addr
+ ).then([test_addr, cmd_peer_addr, test_peer_addr, peer_wins](auto peer) {
+ return test_v2_protocol(
+ test_addr,
+ cmd_peer_addr,
+ test_peer_addr,
+ false,
+ peer_wins
).then([peer = std::move(peer)] () mutable {
return peer->wait().then([peer = std::move(peer)] {});
});
});
}
- return FailoverTest::create(test_peer_addr, test_addr).then([] (auto test) {
+ return FailoverTest::create(test_addr, cmd_peer_addr, test_peer_addr
+ ).then([peer_wins](auto test) {
return seastar::futurize_invoke([test] {
return test_v2_lossy_early_connect_fault(*test);
}).then([test] {
}).then([test] {
return test_v2_peer_connected_fault_reaccept(*test);
}).then([test] {
- return peer_wins(*test);
- }).then([test] (bool peer_wins) {
- if (peer_wins) {
+ return check_peer_wins(*test);
+ }).then([test, peer_wins](bool ret_peer_wins) {
+ ceph_assert(peer_wins == ret_peer_wins);
+ if (ret_peer_wins) {
return seastar::futurize_invoke([test] {
- return test_v2_racing_connect_lose(*test);
+ return test_v2_racing_connect_acceptor_win(*test);
}).then([test] {
- return test_v2_racing_reconnect_lose(*test);
+ return test_v2_racing_reconnect_acceptor_win(*test);
});
} else {
return seastar::futurize_invoke([test] {
- return test_v2_racing_connect_win(*test);
+ return test_v2_racing_connect_acceptor_lose(*test);
}).then([test] {
- return test_v2_racing_reconnect_win(*test);
+ return test_v2_racing_reconnect_acceptor_lose(*test);
});
}
}).then([test] {
verbose = config["verbose"].as<bool>();
auto rounds = config["rounds"].as<unsigned>();
auto keepalive_ratio = config["keepalive-ratio"].as<double>();
- entity_addr_t v2_test_addr;
- ceph_assert(v2_test_addr.parse(
- config["v2-test-addr"].as<std::string>().c_str(), nullptr));
- entity_addr_t v2_testpeer_addr;
- ceph_assert(v2_testpeer_addr.parse(
- config["v2-testpeer-addr"].as<std::string>().c_str(), nullptr));
- auto v2_testpeer_islocal = config["v2-testpeer-islocal"].as<bool>();
- return test_echo(rounds, keepalive_ratio)
- .then([] {
+ auto testpeer_islocal = config["testpeer-islocal"].as<bool>();
+
+ entity_addr_t test_addr;
+ ceph_assert(test_addr.parse(
+ config["test-addr"].as<std::string>().c_str(), nullptr));
+ test_addr.set_nonce(TEST_NONCE);
+
+ entity_addr_t cmd_peer_addr;
+ ceph_assert(cmd_peer_addr.parse(
+ config["testpeer-addr"].as<std::string>().c_str(), nullptr));
+ cmd_peer_addr.set_nonce(CMD_SRV_NONCE);
+
+ entity_addr_t test_peer_addr = get_test_peer_addr(cmd_peer_addr);
+ bool peer_wins = (test_addr > test_peer_addr);
+
+ logger().info("test configuration: verbose={}, rounds={}, keepalive_ratio={}, "
+ "test_addr={}, cmd_peer_addr={}, test_peer_addr={}, "
+ "testpeer_islocal={}, peer_wins={}",
+ verbose, rounds, keepalive_ratio,
+ test_addr, cmd_peer_addr, test_peer_addr,
+ testpeer_islocal, peer_wins);
+ return test_echo(rounds, keepalive_ratio
+ ).then([] {
return test_concurrent_dispatch();
}).then([] {
return test_preemptive_shutdown();
- }).then([v2_test_addr, v2_testpeer_addr, v2_testpeer_islocal] {
- return test_v2_protocol(v2_test_addr, v2_testpeer_addr, v2_testpeer_islocal);
+ }).then([test_addr, cmd_peer_addr, test_peer_addr, testpeer_islocal, peer_wins] {
+ return test_v2_protocol(
+ test_addr,
+ cmd_peer_addr,
+ test_peer_addr,
+ testpeer_islocal,
+ peer_wins);
}).then([] {
logger().info("All tests succeeded");
// Seastar has bugs to have events undispatched during shutdown,
"number of pingpong rounds")
("keepalive-ratio", bpo::value<double>()->default_value(0.1),
"ratio of keepalive in ping messages")
- ("v2-test-addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9012"),
+ ("test-addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9014"),
"address of v2 failover tests")
- ("v2-testpeer-addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9013"),
+ ("testpeer-addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9012"),
"addresses of v2 failover testpeer"
- " (CmdSrv address and TestPeer address with port+=1)")
- ("v2-testpeer-islocal", bpo::value<bool>()->default_value(true),
+ " (This is CmdSrv address, and TestPeer address is at port+=1)")
+ ("testpeer-islocal", bpo::value<bool>()->default_value(true),
"create a local crimson testpeer, or connect to a remote testpeer");
return app.run(argc, argv, [&app] {
- // This test normally succeeds within 60 seconds, so kill it after 120
+ // This test normally succeeds within 60 seconds, so kill it after 300
// seconds in case it is blocked forever due to unaddressed bugs.
- return seastar::with_timeout(seastar::lowres_clock::now() + 120s, do_test(app))
+ return seastar::with_timeout(seastar::lowres_clock::now() + 300s, do_test(app))
.handle_exception_type([](seastar::timed_out_error&) {
- logger().error("test_messenger timeout after 120s, abort! "
+ logger().error("test_messenger timeout after 300s, abort! "
"Consider to extend the period if the test is still running.");
// use the retcode of timeout(1)
return 124;