#include "crimson/net/Dispatcher.h"
#include "crimson/net/Messenger.h"
#include "crimson/net/Interceptor.h"
-#include "crimson/net/SocketConnection.h"
#include <map>
#include <random>
#include <seastar/core/app-template.hh>
#include <seastar/core/do_with.hh>
#include <seastar/core/future-util.hh>
+#include <seastar/core/gate.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/with_timeout.hh>
return saddr;
}
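+// Construct a seastar::sharded<T> from shard #0, register its stop() at
+// reactor exit, and return a pointer to the calling shard's local instance.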
+template <typename T, typename... Args>
+seastar::future<T*> create_sharded(Args... args) {
+ // we should only construct/stop shards on #0
+ return seastar::smp::submit_to(0, [=] {
+ auto sharded_obj = seastar::make_lw_shared<seastar::sharded<T>>();
+ return sharded_obj->start(args...
+ ).then([sharded_obj] {
+ seastar::engine().at_exit([sharded_obj] {
+ return sharded_obj->stop().then([sharded_obj] {});
+ });
+ return sharded_obj.get();
+ });
+ }).then([](seastar::sharded<T> *ptr_shard) {
+ return &ptr_shard->local();
+ });
+}
+
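+// One seastar::gate per shard: dispatch_in_background() tracks a future in
+// the local shard's gate, and close() drains the gates on all shards before
+// shutdown.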
+class ShardedGates
+ : public seastar::peering_sharded_service<ShardedGates> {
+public:
+ ShardedGates() = default;
+ ~ShardedGates() {
+ assert(gate.is_closed());
+ }
+
+ template <typename Func>
+ void dispatch_in_background(const char *what, Func &&f) {
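+ // the future is tracked by the local gate until it resolves; any
+ // exception is logged and swallowed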
+ std::ignore = seastar::with_gate(
+ container().local().gate, std::forward<Func>(f)
+ ).handle_exception([what](std::exception_ptr eptr) {
+ try {
+ std::rethrow_exception(eptr);
+ } catch (std::exception &e) {
+ logger().error("ShardedGates::dispatch_in_background: "
+ "{} got exxception {}", what, e.what());
+ }
+ });
+ }
+
+ seastar::future<> close() {
+ return container().invoke_on_all([](auto &local) {
+ return local.gate.close();
+ });
+ }
+
+ static seastar::future<ShardedGates*> create() {
+ return create_sharded<ShardedGates>();
+ }
+
+ // seastar::future<> stop() is intentionally not implemented
+
+private:
+ seastar::gate gate;
+};
+
static seastar::future<> test_echo(unsigned rounds,
double keepalive_ratio)
{
struct test_state {
struct Server final
: public crimson::net::Dispatcher {
+ ShardedGates &gates;
crimson::net::MessengerRef msgr;
crimson::auth::DummyAuthClientServer dummy_auth;
+ Server(ShardedGates &gates) : gates{gates} {}
+
+ void ms_handle_accept(
+ crimson::net::ConnectionRef conn,
+ seastar::shard_id prv_shard,
+ bool is_replace) override {
+ logger().info("server accepted {}", *conn);
+ ceph_assert(prv_shard == seastar::this_shard_id());
+ ceph_assert(!is_replace);
+ }
+
std::optional<seastar::future<>> ms_dispatch(
crimson::net::ConnectionRef c, MessageRef m) override {
if (verbose) {
logger().info("server got {}", *m);
}
// reply with a pong
- std::ignore = c->send(crimson::make_message<MPing>());
+ gates.dispatch_in_background("echo_send_pong", [c] {
+ return c->send(crimson::make_message<MPing>());
+ });
return {seastar::now()};
}
const std::string& lname,
const uint64_t nonce,
const entity_addr_t& addr) {
- msgr = crimson::net::Messenger::create(name, lname, nonce);
+ msgr = crimson::net::Messenger::create(
+ name, lname, nonce, false);
msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
msgr->set_auth_client(&dummy_auth);
msgr->set_auth_server(&dummy_auth);
}
};
- struct Client final
- : public crimson::net::Dispatcher {
+ class Client final
+ : public crimson::net::Dispatcher,
+ public seastar::peering_sharded_service<Client> {
+ public:
+ Client(seastar::shard_id primary_sid,
+ unsigned rounds,
+ double keepalive_ratio,
+ ShardedGates *gates)
+ : primary_sid{primary_sid},
+ keepalive_dist(std::bernoulli_distribution{keepalive_ratio}),
+ rounds(rounds),
+ gates{*gates} {}
+
+ seastar::future<> init(const entity_name_t& name,
+ const std::string& lname,
+ const uint64_t nonce) {
+ assert(seastar::this_shard_id() == primary_sid);
+ msgr = crimson::net::Messenger::create(
+ name, lname, nonce, false);
+ msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
+ msgr->set_auth_client(&dummy_auth);
+ msgr->set_auth_server(&dummy_auth);
+ return msgr->start({this});
+ }
+
+ seastar::future<> shutdown() {
+ assert(seastar::this_shard_id() == primary_sid);
+ ceph_assert(msgr);
+ msgr->stop();
+ return msgr->shutdown();
+ }
+
+ seastar::future<> dispatch_pingpong(const entity_addr_t& peer_addr) {
+ assert(seastar::this_shard_id() == primary_sid);
+ mono_time start_time = mono_clock::now();
+ auto conn = msgr->connect(peer_addr, entity_name_t::TYPE_OSD);
+ return seastar::futurize_invoke([this, conn] {
+ return do_dispatch_pingpong(conn);
+ }).then([] {
+ // 500ms should be enough to establish the connection
+ return seastar::sleep(500ms);
+ }).then([this, conn, start_time] {
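+ // the connection may be bound to another core; read its session
+ // timings on that shard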
+ return container().invoke_on(
+ conn->get_shard_id(),
+ [pconn=&*conn, start_time](auto &local) {
+ assert(pconn->is_connected());
+ auto session = local.find_session(pconn);
+ std::chrono::duration<double> dur_handshake = session->connected_time - start_time;
+ std::chrono::duration<double> dur_pingpong = session->finish_time - session->connected_time;
+ logger().info("{}: handshake {}, pingpong {}",
+ *pconn, dur_handshake.count(), dur_pingpong.count());
+ }).then([conn] {});
+ });
+ }
+
+ static seastar::future<Client*> create(
+ unsigned rounds,
+ double keepalive_ratio,
+ ShardedGates *gates) {
+ return create_sharded<Client>(
+ seastar::this_shard_id(),
+ rounds,
+ keepalive_ratio,
+ gates);
+ }
+
+ private:
struct PingSession : public seastar::enable_shared_from_this<PingSession> {
unsigned count = 0u;
mono_time connected_time;
};
using PingSessionRef = seastar::shared_ptr<PingSession>;
- unsigned rounds;
- std::bernoulli_distribution keepalive_dist;
- crimson::net::MessengerRef msgr;
- std::map<crimson::net::ConnectionRef, seastar::promise<>> pending_conns;
- std::map<crimson::net::ConnectionRef, PingSessionRef> sessions;
- crimson::auth::DummyAuthClientServer dummy_auth;
-
- Client(unsigned rounds, double keepalive_ratio)
- : rounds(rounds),
- keepalive_dist(std::bernoulli_distribution{keepalive_ratio}) {}
-
- PingSessionRef find_session(crimson::net::ConnectionRef c) {
- auto found = sessions.find(c);
- if (found == sessions.end()) {
- ceph_assert(false);
- }
- return found->second;
- }
-
- void ms_handle_connect(crimson::net::ConnectionRef conn) override {
+ void ms_handle_connect(
+ crimson::net::ConnectionRef conn,
+ seastar::shard_id prv_shard) override {
+ auto &local = container().local();
+ assert(prv_shard == seastar::this_shard_id());
auto session = seastar::make_shared<PingSession>();
- auto [i, added] = sessions.emplace(conn, session);
+ auto [i, added] = local.sessions.emplace(&*conn, session);
std::ignore = i;
ceph_assert(added);
session->connected_time = mono_clock::now();
}
+
std::optional<seastar::future<>> ms_dispatch(
crimson::net::ConnectionRef c, MessageRef m) override {
- auto session = find_session(c);
+ auto &local = container().local();
+ auto session = local.find_session(&*c);
++(session->count);
if (verbose) {
logger().info("client ms_dispatch {}", session->count);
}
- if (session->count == rounds) {
+ if (session->count > rounds) {
+ logger().error("{}: got {} pongs, more than expected {}", *c, session->count, rounds);
+ ceph_abort();
+ } else if (session->count == rounds) {
logger().info("{}: finished receiving {} pongs", *c, session->count);
session->finish_time = mono_clock::now();
- auto found = pending_conns.find(c);
- ceph_assert(found != pending_conns.end());
- found->second.set_value();
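+ // the pending_conns promise lives on the primary shard; fulfill it there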
+ gates.dispatch_in_background("echo_notify_done", [c, this] {
+ return container().invoke_on(primary_sid, [pconn=&*c](auto &local) {
+ auto found = local.pending_conns.find(pconn);
+ ceph_assert(found != local.pending_conns.end());
+ found->second.set_value();
+ }).then([c] {});
+ });
}
return {seastar::now()};
}
- seastar::future<> init(const entity_name_t& name,
- const std::string& lname,
- const uint64_t nonce) {
- msgr = crimson::net::Messenger::create(name, lname, nonce);
- msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
- msgr->set_auth_client(&dummy_auth);
- msgr->set_auth_server(&dummy_auth);
- return msgr->start({this});
- }
-
- seastar::future<> shutdown() {
- ceph_assert(msgr);
- msgr->stop();
- return msgr->shutdown();
- }
-
- seastar::future<> dispatch_pingpong(const entity_addr_t& peer_addr) {
- mono_time start_time = mono_clock::now();
- auto conn = msgr->connect(peer_addr, entity_name_t::TYPE_OSD);
- return seastar::futurize_invoke([this, conn] {
- return do_dispatch_pingpong(conn);
- }).then([this, conn, start_time] {
- auto session = find_session(conn);
- std::chrono::duration<double> dur_handshake = session->connected_time - start_time;
- std::chrono::duration<double> dur_pingpong = session->finish_time - session->connected_time;
- logger().info("{}: handshake {}, pingpong {}",
- *conn, dur_handshake.count(), dur_pingpong.count());
- });
+ PingSessionRef find_session(crimson::net::Connection *c) {
+ auto found = sessions.find(c);
+ if (found == sessions.end()) {
+ ceph_assert(false);
+ }
+ return found->second;
}
- private:
seastar::future<> do_dispatch_pingpong(crimson::net::ConnectionRef conn) {
- auto [i, added] = pending_conns.emplace(conn, seastar::promise<>());
+ auto [i, added] = pending_conns.emplace(&*conn, seastar::promise<>());
std::ignore = i;
ceph_assert(added);
return seastar::do_with(0u, 0u,
},
[this, conn, &count_ping, &count_keepalive] {
return seastar::repeat([this, conn, &count_ping, &count_keepalive] {
- if (keepalive_dist(rng)) {
- return conn->send_keepalive()
- .then([&count_keepalive] {
- count_keepalive += 1;
- return seastar::make_ready_future<seastar::stop_iteration>(
- seastar::stop_iteration::no);
- });
- } else {
- return conn->send(crimson::make_message<MPing>())
- .then([&count_ping] {
- count_ping += 1;
- return seastar::make_ready_future<seastar::stop_iteration>(
- seastar::stop_iteration::yes);
- });
- }
- });
+ if (keepalive_dist(rng)) {
+ return conn->send_keepalive(
+ ).then([&count_keepalive] {
+ count_keepalive += 1;
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::no);
+ });
+ } else {
+ return conn->send(crimson::make_message<MPing>()
+ ).then([&count_ping] {
+ count_ping += 1;
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::yes);
+ });
+ }
+ });
}).then([this, conn] {
- auto found = pending_conns.find(conn);
+ auto found = pending_conns.find(&*conn);
+ assert(found != pending_conns.end());
return found->second.get_future();
}
);
});
}
+
+ private:
+ // primary shard only
+ const seastar::shard_id primary_sid;
+ std::bernoulli_distribution keepalive_dist;
+ crimson::net::MessengerRef msgr;
+ std::map<crimson::net::Connection*, seastar::promise<>> pending_conns;
+ crimson::auth::DummyAuthClientServer dummy_auth;
+
+ // per shard
+ const unsigned rounds;
+ std::map<crimson::net::Connection*, PingSessionRef> sessions;
+ ShardedGates &gates;
};
};
logger().info("test_echo(rounds={}, keepalive_ratio={}):",
rounds, keepalive_ratio);
- auto server1 = seastar::make_shared<test_state::Server>();
- auto server2 = seastar::make_shared<test_state::Server>();
- auto client1 = seastar::make_shared<test_state::Client>(rounds, keepalive_ratio);
- auto client2 = seastar::make_shared<test_state::Client>(rounds, keepalive_ratio);
- // start servers and clients
- auto addr1 = get_server_addr();
- auto addr2 = get_server_addr();
- addr1.set_type(entity_addr_t::TYPE_MSGR2);
- addr2.set_type(entity_addr_t::TYPE_MSGR2);
- return seastar::when_all_succeed(
- server1->init(entity_name_t::OSD(0), "server1", 1, addr1),
- server2->init(entity_name_t::OSD(1), "server2", 2, addr2),
- client1->init(entity_name_t::OSD(2), "client1", 3),
- client2->init(entity_name_t::OSD(3), "client2", 4)
- // dispatch pingpoing
- ).then_unpack([client1, client2, server1, server2] {
+ return ShardedGates::create(
+ ).then([rounds, keepalive_ratio](auto *gates) {
return seastar::when_all_succeed(
+ test_state::Client::create(rounds, keepalive_ratio, gates),
+ test_state::Client::create(rounds, keepalive_ratio, gates),
+ seastar::make_ready_future<ShardedGates*>(gates));
+ }).then_unpack([](auto *client1, auto *client2, auto *gates) {
+ auto server1 = seastar::make_shared<test_state::Server>(*gates);
+ auto server2 = seastar::make_shared<test_state::Server>(*gates);
+ // start servers and clients
+ auto addr1 = get_server_addr();
+ auto addr2 = get_server_addr();
+ addr1.set_type(entity_addr_t::TYPE_MSGR2);
+ addr2.set_type(entity_addr_t::TYPE_MSGR2);
+ return seastar::when_all_succeed(
+ server1->init(entity_name_t::OSD(0), "server1", 1, addr1),
+ server2->init(entity_name_t::OSD(1), "server2", 2, addr2),
+ client1->init(entity_name_t::OSD(2), "client1", 3),
+ client2->init(entity_name_t::OSD(3), "client2", 4)
+ // dispatch pingpong
+ ).then_unpack([client1, client2, server1, server2] {
+ return seastar::when_all_succeed(
// test connecting in parallel, accepting in parallel
+ client1->dispatch_pingpong(server1->msgr->get_myaddr()),
client1->dispatch_pingpong(server2->msgr->get_myaddr()),
- client2->dispatch_pingpong(server1->msgr->get_myaddr()));
- // shutdown
- }).then_unpack([] {
- return seastar::now();
- }).then([client1] {
- logger().info("client1 shutdown...");
- return client1->shutdown();
- }).then([client2] {
- logger().info("client2 shutdown...");
- return client2->shutdown();
- }).then([server1] {
- logger().info("server1 shutdown...");
- return server1->shutdown();
- }).then([server2] {
- logger().info("server2 shutdown...");
- return server2->shutdown();
- }).then([] {
- logger().info("test_echo() done!\n");
- }).handle_exception([server1, server2, client1, client2] (auto eptr) {
- logger().error("test_echo() failed: got exception {}", eptr);
- throw;
- });
-}
-
-static seastar::future<> test_concurrent_dispatch()
-{
- struct test_state {
- struct Server final
- : public crimson::net::Dispatcher {
- crimson::net::MessengerRef msgr;
- int count = 0;
- seastar::promise<> on_second; // satisfied on second dispatch
- seastar::promise<> on_done; // satisfied when first dispatch unblocks
- crimson::auth::DummyAuthClientServer dummy_auth;
-
- std::optional<seastar::future<>> ms_dispatch(
- crimson::net::ConnectionRef, MessageRef m) override {
- switch (++count) {
- case 1:
- // block on the first request until we reenter with the second
- std::ignore = on_second.get_future().then([this] { on_done.set_value(); });
- break;
- case 2:
- on_second.set_value();
- break;
- default:
- throw std::runtime_error("unexpected count");
- }
- return {seastar::now()};
- }
-
- seastar::future<> wait() { return on_done.get_future(); }
-
- seastar::future<> init(const entity_name_t& name,
- const std::string& lname,
- const uint64_t nonce,
- const entity_addr_t& addr) {
- msgr = crimson::net::Messenger::create(name, lname, nonce);
- msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
- msgr->set_auth_client(&dummy_auth);
- msgr->set_auth_server(&dummy_auth);
- return msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
- return msgr->start({this});
- }, crimson::net::Messenger::bind_ertr::all_same_way(
- [addr] (const std::error_code& e) {
- logger().error("test_concurrent_dispatch(): "
- "there is another instance running at {}", addr);
- ceph_abort();
- }));
- }
- };
-
- struct Client final
- : public crimson::net::Dispatcher {
- crimson::net::MessengerRef msgr;
- crimson::auth::DummyAuthClientServer dummy_auth;
-
- std::optional<seastar::future<>> ms_dispatch(
- crimson::net::ConnectionRef, MessageRef m) override {
- return {seastar::now()};
- }
-
- seastar::future<> init(const entity_name_t& name,
- const std::string& lname,
- const uint64_t nonce) {
- msgr = crimson::net::Messenger::create(name, lname, nonce);
- msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
- msgr->set_auth_client(&dummy_auth);
- msgr->set_auth_server(&dummy_auth);
- return msgr->start({this});
- }
- };
- };
-
- logger().info("test_concurrent_dispatch():");
- auto server = seastar::make_shared<test_state::Server>();
- auto client = seastar::make_shared<test_state::Client>();
- auto addr = get_server_addr();
- addr.set_type(entity_addr_t::TYPE_MSGR2);
- addr.set_family(AF_INET);
- return seastar::when_all_succeed(
- server->init(entity_name_t::OSD(4), "server3", 5, addr),
- client->init(entity_name_t::OSD(5), "client3", 6)
- ).then_unpack([server, client] {
- auto conn = client->msgr->connect(server->msgr->get_myaddr(),
- entity_name_t::TYPE_OSD);
- // send two messages
- return conn->send(crimson::make_message<MPing>()).then([conn] {
- return conn->send(crimson::make_message<MPing>());
+ client2->dispatch_pingpong(server1->msgr->get_myaddr()),
+ client2->dispatch_pingpong(server2->msgr->get_myaddr()));
+ // shutdown
+ }).then_unpack([client1] {
+ logger().info("client1 shutdown...");
+ return client1->shutdown();
+ }).then([client2] {
+ logger().info("client2 shutdown...");
+ return client2->shutdown();
+ }).then([server1] {
+ logger().info("server1 shutdown...");
+ return server1->shutdown();
+ }).then([server2] {
+ logger().info("server2 shutdown...");
+ return server2->shutdown();
+ }).then([] {
+ logger().info("test_echo() done!\n");
+ }).handle_exception([](auto eptr) {
+ logger().error("test_echo() failed: got exception {}", eptr);
+ throw;
+ }).finally([gates, server1, server2] {
+ return gates->close();
});
- }).then([server] {
- return server->wait();
- }).then([client] {
- logger().info("client shutdown...");
- client->msgr->stop();
- return client->msgr->shutdown();
- }).then([server] {
- logger().info("server shutdown...");
- server->msgr->stop();
- return server->msgr->shutdown();
- }).then([] {
- logger().info("test_concurrent_dispatch() done!\n");
- }).handle_exception([server, client] (auto eptr) {
- logger().error("test_concurrent_dispatch() failed: got exception {}", eptr);
- throw;
});
}
const std::string& lname,
const uint64_t nonce,
const entity_addr_t& addr) {
- msgr = crimson::net::Messenger::create(name, lname, nonce);
+ msgr = crimson::net::Messenger::create(
+ name, lname, nonce, true);
msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
msgr->set_auth_client(&dummy_auth);
msgr->set_auth_server(&dummy_auth);
seastar::future<> init(const entity_name_t& name,
const std::string& lname,
const uint64_t nonce) {
- msgr = crimson::net::Messenger::create(name, lname, nonce);
+ msgr = crimson::net::Messenger::create(
+ name, lname, nonce, true);
msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
msgr->set_auth_client(&dummy_auth);
msgr->set_auth_server(&dummy_auth);
using crimson::net::Interceptor;
using crimson::net::Messenger;
using crimson::net::MessengerRef;
-using crimson::net::SocketConnection;
using crimson::net::SocketPolicy;
using crimson::net::tag_bp_t;
using namespace ceph::net::test;
struct TestInterceptor : public Interceptor {
std::map<Breakpoint, std::map<unsigned, bp_action_t>> breakpoints;
std::map<Breakpoint, counter_t> breakpoints_counter;
- std::map<ConnectionRef, unsigned> conns;
+ std::map<Connection*, unsigned> conns;
ConnResults results;
std::optional<seastar::abort_source> signal;
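+ // all of the state above is accessed on the primary shard only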
+ const seastar::shard_id primary_sid;
+
- TestInterceptor() = default;
+ TestInterceptor() : primary_sid{seastar::this_shard_id()} {}
// only used for copy breakpoint configurations
- TestInterceptor(const TestInterceptor& other) {
+ TestInterceptor(const TestInterceptor& other) : primary_sid{other.primary_sid} {
assert(other.breakpoints_counter.empty());
assert(other.conns.empty());
assert(other.results.empty());
breakpoints = other.breakpoints;
assert(!other.signal);
+ assert(seastar::this_shard_id() == primary_sid);
}
void make_fault(Breakpoint bp, unsigned round = 1) {
breakpoints[bp][round] = bp_action_t::STALL;
}
- ConnResult* find_result(ConnectionRef conn) {
+ ConnResult* find_result(Connection *conn) {
+ assert(seastar::this_shard_id() == primary_sid);
auto it = conns.find(conn);
if (it == conns.end()) {
return nullptr;
}
seastar::future<> wait() {
+ assert(seastar::this_shard_id() == primary_sid);
assert(!signal);
signal = seastar::abort_source();
return seastar::sleep_abortable(10s, *signal).then([] {
}
void notify() {
+ assert(seastar::this_shard_id() == primary_sid);
if (signal) {
signal->request_abort();
signal = std::nullopt;
}
private:
- void register_conn(SocketConnection& _conn) override {
- auto conn = _conn.get_local_shared_foreign_from_this();
- auto result = find_result(conn);
+ void register_conn(ConnectionRef conn) override {
+ auto result = find_result(&*conn);
if (result != nullptr) {
logger().error("The connection [{}] {} already exists when register {}",
- result->index, *result->conn, _conn);
+ result->index, *result->conn, *conn);
ceph_abort();
}
unsigned index = results.size();
results.emplace_back(conn, index);
- conns[conn] = index;
+ conns[&*conn] = index;
notify();
- logger().info("[{}] {} new connection registered", index, _conn);
+ logger().info("[{}] {} new connection registered", index, *conn);
}
- void register_conn_closed(SocketConnection& conn) override {
- auto result = find_result(conn.get_local_shared_foreign_from_this());
+ void register_conn_closed(ConnectionRef conn) override {
+ auto result = find_result(&*conn);
if (result == nullptr) {
- logger().error("Untracked closed connection: {}", conn);
+ logger().error("Untracked closed connection: {}", *conn);
ceph_abort();
}
result->state = conn_state_t::closed;
}
notify();
- logger().info("[{}] {} closed({})", result->index, conn, result->state);
+ logger().info("[{}] {} closed({})", result->index, *conn, result->state);
}
- void register_conn_ready(SocketConnection& conn) override {
- auto result = find_result(conn.get_local_shared_foreign_from_this());
+ void register_conn_ready(ConnectionRef conn) override {
+ auto result = find_result(&*conn);
if (result == nullptr) {
- logger().error("Untracked ready connection: {}", conn);
+ logger().error("Untracked ready connection: {}", *conn);
ceph_abort();
}
- ceph_assert(conn.is_connected());
+ ceph_assert(conn->is_protocol_ready());
notify();
- logger().info("[{}] {} ready", result->index, conn);
+ logger().info("[{}] {} ready", result->index, *conn);
}
- void register_conn_replaced(SocketConnection& conn) override {
- auto result = find_result(conn.get_local_shared_foreign_from_this());
+ void register_conn_replaced(ConnectionRef conn) override {
+ auto result = find_result(&*conn);
if (result == nullptr) {
- logger().error("Untracked replaced connection: {}", conn);
+ logger().error("Untracked replaced connection: {}", *conn);
ceph_abort();
}
result->state = conn_state_t::replaced;
- logger().info("[{}] {} {}", result->index, conn, result->state);
+ logger().info("[{}] {} {}", result->index, *conn, result->state);
}
- bp_action_t intercept(SocketConnection& conn, Breakpoint bp) override {
- ++breakpoints_counter[bp].counter;
+ seastar::future<bp_action_t>
+ intercept(Connection &_conn, std::vector<Breakpoint> bps) override {
+ assert(bps.size() >= 1);
+ Connection *conn = &_conn;
- auto result = find_result(conn.get_local_shared_foreign_from_this());
- if (result == nullptr) {
- logger().error("Untracked intercepted connection: {}, at breakpoint {}({})",
- conn, bp, breakpoints_counter[bp].counter);
- ceph_abort();
- }
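+ // intercept() may fire on any connection shard; hop to the primary
+ // shard where all breakpoint bookkeeping lives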
+ return seastar::smp::submit_to(primary_sid, [conn, bps, this] {
+ std::vector<bp_action_t> actions;
+ for (const Breakpoint &bp : bps) {
+ ++breakpoints_counter[bp].counter;
- if (bp == custom_bp_t::SOCKET_CONNECTING) {
- ++result->connect_attempts;
- logger().info("[Test] connect_attempts={}", result->connect_attempts);
- } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::WRITE}) {
- ++result->client_connect_attempts;
- logger().info("[Test] client_connect_attempts={}", result->client_connect_attempts);
- } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::WRITE}) {
- ++result->client_reconnect_attempts;
- logger().info("[Test] client_reconnect_attempts={}", result->client_reconnect_attempts);
- } else if (bp == custom_bp_t::SOCKET_ACCEPTED) {
- ++result->accept_attempts;
- logger().info("[Test] accept_attempts={}", result->accept_attempts);
- } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::READ}) {
- ++result->server_connect_attempts;
- logger().info("[Test] server_connect_attemps={}", result->server_connect_attempts);
- } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::READ}) {
- ++result->server_reconnect_attempts;
- logger().info("[Test] server_reconnect_attempts={}", result->server_reconnect_attempts);
- }
+ auto result = find_result(&*conn);
+ if (result == nullptr) {
+ logger().error("Untracked intercepted connection: {}, at breakpoint {}({})",
+ *conn, bp, breakpoints_counter[bp].counter);
+ ceph_abort();
+ }
- auto it_bp = breakpoints.find(bp);
- if (it_bp != breakpoints.end()) {
- auto it_cnt = it_bp->second.find(breakpoints_counter[bp].counter);
- if (it_cnt != it_bp->second.end()) {
- logger().info("[{}] {} intercepted {}({}) => {}",
- result->index, conn, bp,
- breakpoints_counter[bp].counter, it_cnt->second);
- return it_cnt->second;
+ if (bp == custom_bp_t::SOCKET_CONNECTING) {
+ ++result->connect_attempts;
+ logger().info("[Test] connect_attempts={}", result->connect_attempts);
+ } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::WRITE}) {
+ ++result->client_connect_attempts;
+ logger().info("[Test] client_connect_attempts={}", result->client_connect_attempts);
+ } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::WRITE}) {
+ ++result->client_reconnect_attempts;
+ logger().info("[Test] client_reconnect_attempts={}", result->client_reconnect_attempts);
+ } else if (bp == custom_bp_t::SOCKET_ACCEPTED) {
+ ++result->accept_attempts;
+ logger().info("[Test] accept_attempts={}", result->accept_attempts);
+ } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::READ}) {
+ ++result->server_connect_attempts;
+ logger().info("[Test] server_connect_attemps={}", result->server_connect_attempts);
+ } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::READ}) {
+ ++result->server_reconnect_attempts;
+ logger().info("[Test] server_reconnect_attempts={}", result->server_reconnect_attempts);
+ }
+
+ auto it_bp = breakpoints.find(bp);
+ if (it_bp != breakpoints.end()) {
+ auto it_cnt = it_bp->second.find(breakpoints_counter[bp].counter);
+ if (it_cnt != it_bp->second.end()) {
+ logger().info("[{}] {} intercepted {}({}) => {}",
+ result->index, *conn, bp,
+ breakpoints_counter[bp].counter, it_cnt->second);
+ actions.emplace_back(it_cnt->second);
+ continue;
+ }
+ }
+ logger().info("[{}] {} intercepted {}({})",
+ result->index, *conn, bp, breakpoints_counter[bp].counter);
+ actions.emplace_back(bp_action_t::CONTINUE);
}
- }
- logger().info("[{}] {} intercepted {}({})",
- result->index, conn, bp, breakpoints_counter[bp].counter);
- return bp_action_t::CONTINUE;
+
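+ // merge the per-breakpoint results: at most one may override CONTINUE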
+ bp_action_t action = bp_action_t::CONTINUE;
+ for (bp_action_t &a : actions) {
+ if (a != bp_action_t::CONTINUE) {
+ if (action == bp_action_t::CONTINUE) {
+ action = a;
+ } else {
+ ceph_abort("got multiple incompatible actions");
+ }
+ }
+ }
+ return seastar::make_ready_future<bp_action_t>(action);
+ });
}
};
TestInterceptor interceptor;
unsigned tracked_index = 0;
- ConnectionRef tracked_conn;
+ Connection *tracked_conn = nullptr;
unsigned pending_send = 0;
unsigned pending_peer_receive = 0;
unsigned pending_receive = 0;
- std::optional<seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
- auto result = interceptor.find_result(c);
- if (result == nullptr) {
- logger().error("Untracked ms dispatched connection: {}", *c);
- ceph_abort();
- }
-
- if (tracked_conn != c) {
- logger().error("[{}] {} got op, but doesn't match tracked_conn [{}] {}",
- result->index, *c, tracked_index, *tracked_conn);
- ceph_abort();
- }
- ceph_assert(result->index == tracked_index);
+ ShardedGates &gates;
+ const seastar::shard_id primary_sid;
+ std::optional<seastar::future<>> ms_dispatch(
+ ConnectionRef conn_ref, MessageRef m) override {
ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
- ceph_assert(pending_receive > 0);
- --pending_receive;
- if (pending_receive == 0) {
- interceptor.notify();
- }
- logger().info("[Test] got op, left {} ops -- [{}] {}",
- pending_receive, result->index, *c);
+ Connection *conn = &*conn_ref;
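+ // capture conn_ref to keep the connection alive across the shard hop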
+ gates.dispatch_in_background("TestSuite_ms_dispatch",
+ [this, conn, conn_ref] {
+ return seastar::smp::submit_to(primary_sid, [this, conn] {
+ auto result = interceptor.find_result(&*conn);
+ if (result == nullptr) {
+ logger().error("Untracked ms dispatched connection: {}", *conn);
+ ceph_abort();
+ }
+
+ if (tracked_conn != &*conn) {
+ logger().warn("[{}] {} got op, but doesn't match tracked_conn [{}] {}",
+ result->index, *conn, tracked_index, *tracked_conn);
+ } else {
+ ceph_assert(result->index == tracked_index);
+ }
+
+ ceph_assert(pending_receive > 0);
+ --pending_receive;
+ if (pending_receive == 0) {
+ interceptor.notify();
+ }
+ logger().info("[Test] got op, left {} ops -- [{}] {}",
+ pending_receive, result->index, *conn);
+ }).then([conn_ref] {});
+ });
return {seastar::now()};
}
- void ms_handle_accept(ConnectionRef conn) override {
- auto result = interceptor.find_result(conn);
- if (result == nullptr) {
- logger().error("Untracked accepted connection: {}", *conn);
- ceph_abort();
- }
+ void ms_handle_accept(
+ ConnectionRef conn_ref,
+ seastar::shard_id prv_shard,
+ bool is_replace) override {
+ Connection *conn = &*conn_ref;
+ gates.dispatch_in_background("TestSuite_ms_dispatch",
+ [this, conn, conn_ref] {
+ return seastar::smp::submit_to(primary_sid, [this, conn] {
+ auto result = interceptor.find_result(&*conn);
+ if (result == nullptr) {
+ logger().error("Untracked accepted connection: {}", *conn);
+ ceph_abort();
+ }
- if (tracked_conn &&
- !tracked_conn->is_closed() &&
- tracked_conn != conn) {
- logger().error("[{}] {} got accepted, but there's already traced_conn [{}] {}",
- result->index, *conn, tracked_index, *tracked_conn);
- ceph_abort();
- }
+ if (tracked_conn &&
+ !tracked_conn->is_protocol_closed() &&
+ tracked_conn != &*conn) {
+ logger().error("[{}] {} got accepted, but there's already a valid traced_conn [{}] {}",
+ result->index, *conn, tracked_index, *tracked_conn);
+ ceph_abort();
+ }
- tracked_index = result->index;
- tracked_conn = conn;
- ++result->cnt_accept_dispatched;
- logger().info("[Test] got accept (cnt_accept_dispatched={}), track [{}] {}",
- result->cnt_accept_dispatched, result->index, *conn);
- std::ignore = flush_pending_send();
+ tracked_index = result->index;
+ tracked_conn = &*conn;
+ ++result->cnt_accept_dispatched;
+ logger().info("[Test] got accept (cnt_accept_dispatched={}), track [{}] {}",
+ result->cnt_accept_dispatched, result->index, *conn);
+ return flush_pending_send();
+ }).then([conn_ref] {});
+ });
}
- void ms_handle_connect(ConnectionRef conn) override {
- auto result = interceptor.find_result(conn);
- if (result == nullptr) {
- logger().error("Untracked connected connection: {}", *conn);
- ceph_abort();
- }
+ void ms_handle_connect(
+ ConnectionRef conn_ref,
+ seastar::shard_id prv_shard) override {
+ Connection *conn = &*conn_ref;
+ gates.dispatch_in_background("TestSuite_ms_dispatch",
+ [this, conn, conn_ref] {
+ return seastar::smp::submit_to(primary_sid, [this, conn] {
+ auto result = interceptor.find_result(&*conn);
+ if (result == nullptr) {
+ logger().error("Untracked connected connection: {}", *conn);
+ ceph_abort();
+ }
- if (tracked_conn != conn) {
- logger().error("[{}] {} got connected, but doesn't match tracked_conn [{}] {}",
- result->index, *conn, tracked_index, *tracked_conn);
- ceph_abort();
- }
- ceph_assert(result->index == tracked_index);
+ if (tracked_conn &&
+ !tracked_conn->is_protocol_closed() &&
+ tracked_conn != &*conn) {
+ logger().error("[{}] {} got connected, but there's already a avlid tracked_conn [{}] {}",
+ result->index, *conn, tracked_index, *tracked_conn);
+ ceph_abort();
+ }
+
+ if (tracked_conn == &*conn) {
+ ceph_assert(result->index == tracked_index);
+ }
- ++result->cnt_connect_dispatched;
- logger().info("[Test] got connected (cnt_connect_dispatched={}) -- [{}] {}",
- result->cnt_connect_dispatched, result->index, *conn);
+ ++result->cnt_connect_dispatched;
+ logger().info("[Test] got connected (cnt_connect_dispatched={}) -- [{}] {}",
+ result->cnt_connect_dispatched, result->index, *conn);
+ }).then([conn_ref] {});
+ });
}
- void ms_handle_reset(ConnectionRef conn, bool is_replace) override {
- auto result = interceptor.find_result(conn);
- if (result == nullptr) {
- logger().error("Untracked reset connection: {}", *conn);
- ceph_abort();
- }
+ void ms_handle_reset(
+ ConnectionRef conn_ref,
+ bool is_replace) override {
+ Connection *conn = &*conn_ref;
+ gates.dispatch_in_background("TestSuite_ms_dispatch",
+ [this, conn, conn_ref] {
+ return seastar::smp::submit_to(primary_sid, [this, conn] {
+ auto result = interceptor.find_result(&*conn);
+ if (result == nullptr) {
+ logger().error("Untracked reset connection: {}", *conn);
+ ceph_abort();
+ }
- if (tracked_conn != conn) {
- logger().error("[{}] {} got reset, but doesn't match tracked_conn [{}] {}",
- result->index, *conn, tracked_index, *tracked_conn);
- ceph_abort();
- }
- ceph_assert(result->index == tracked_index);
+ if (tracked_conn != &*conn) {
+ logger().warn("[{}] {} got reset, but doesn't match tracked_conn [{}] {}",
+ result->index, *conn, tracked_index, *tracked_conn);
+ } else {
+ ceph_assert(result->index == tracked_index);
+ tracked_index = 0;
+ tracked_conn = nullptr;
+ }
- tracked_index = 0;
- tracked_conn = nullptr;
- ++result->cnt_reset_dispatched;
- logger().info("[Test] got reset (cnt_reset_dispatched={}), untrack [{}] {}",
- result->cnt_reset_dispatched, result->index, *conn);
+ ++result->cnt_reset_dispatched;
+ logger().info("[Test] got reset (cnt_reset_dispatched={}), untrack [{}] {}",
+ result->cnt_reset_dispatched, result->index, *conn);
+ }).then([conn_ref] {});
+ });
}
- void ms_handle_remote_reset(ConnectionRef conn) override {
- auto result = interceptor.find_result(conn);
- if (result == nullptr) {
- logger().error("Untracked remotely reset connection: {}", *conn);
- ceph_abort();
- }
+ void ms_handle_remote_reset(
+ ConnectionRef conn_ref) override {
+ Connection *conn = &*conn_ref;
+ gates.dispatch_in_background("TestSuite_ms_dispatch",
+ [this, conn, conn_ref] {
+ return seastar::smp::submit_to(primary_sid, [this, conn] {
+ auto result = interceptor.find_result(&*conn);
+ if (result == nullptr) {
+ logger().error("Untracked remotely reset connection: {}", *conn);
+ ceph_abort();
+ }
- if (tracked_conn != conn) {
- logger().error("[{}] {} got remotely reset, but doesn't match tracked_conn [{}] {}",
- result->index, *conn, tracked_index, *tracked_conn);
- ceph_abort();
- }
- ceph_assert(result->index == tracked_index);
+ if (tracked_conn != &*conn) {
+ logger().warn("[{}] {} got remotely reset, but doesn't match tracked_conn [{}] {}",
+ result->index, *conn, tracked_index, *tracked_conn);
+ } else {
+ ceph_assert(result->index == tracked_index);
+ }
- ++result->cnt_remote_reset_dispatched;
- logger().info("[Test] got remote reset (cnt_remote_reset_dispatched={}) -- [{}] {}",
- result->cnt_remote_reset_dispatched, result->index, *conn);
+ ++result->cnt_remote_reset_dispatched;
+ logger().info("[Test] got remote reset (cnt_remote_reset_dispatched={}) -- [{}] {}",
+ result->cnt_remote_reset_dispatched, result->index, *conn);
+ }).then([conn_ref] {});
+ });
}
private:
seastar::future<> send_op(bool expect_reply=true) {
ceph_assert(tracked_conn);
+ ceph_assert(!tracked_conn->is_protocol_closed());
if (expect_reply) {
++pending_peer_receive;
}
logger().info("[Test] flush sending {} ops", pending_send);
}
ceph_assert(tracked_conn);
+ ceph_assert(!tracked_conn->is_protocol_closed());
return seastar::do_until(
[this] { return pending_send == 0; },
[this] {
seastar::future<> wait_ready(unsigned num_ready_conns,
unsigned num_replaced,
bool wait_received) {
+ assert(seastar::this_shard_id() == primary_sid);
unsigned pending_conns = 0;
unsigned pending_establish = 0;
unsigned replaced_conns = 0;
for (auto& result : interceptor.results) {
- if (result.conn->is_closed_clean()) {
+ if (result.conn->is_protocol_closed_clean()) {
if (result.state == conn_state_t::replaced) {
++replaced_conns;
}
- } else if (result.conn->is_connected()) {
- if (tracked_conn != result.conn || tracked_index != result.index) {
- throw std::runtime_error(fmt::format(
- "The connected connection [{}] {} doesn't"
- " match the tracked connection [{}] {}",
- result.index, *result.conn, tracked_index, *tracked_conn));
- }
+ } else if (result.conn->is_protocol_ready()) {
if (pending_send == 0 && pending_peer_receive == 0 && pending_receive == 0) {
result.state = conn_state_t::established;
} else {
do_wait = true;
}
}
- if (wait_received &&
- (pending_send || pending_peer_receive || pending_receive)) {
- if (pending_conns || pending_establish) {
- logger().info("[Test] wait_ready(): wait for pending_send={},"
- " pending_peer_receive={}, pending_receive={},"
- " pending {}/{} ready/establish connections ...",
- pending_send, pending_peer_receive, pending_receive,
- pending_conns, pending_establish);
- do_wait = true;
+ if (wait_received) {
+ if (pending_send || pending_peer_receive || pending_receive) {
+ if (pending_conns || pending_establish) {
+ logger().info("[Test] wait_ready(): wait for pending_send={},"
+ " pending_peer_receive={}, pending_receive={},"
+ " pending {}/{} ready/establish connections ...",
+ pending_send, pending_peer_receive, pending_receive,
+ pending_conns, pending_establish);
+ do_wait = true;
+ } else {
+ // If there are pending messages, stop waiting if there are
+ // no longer pending connections.
+ }
+ } else {
+ // Stop waiting if there are no pending messages. Pending connections
+ // should not be important.
}
}
if (num_replaced > 0) {
public:
FailoverSuite(MessengerRef test_msgr,
entity_addr_t test_peer_addr,
- const TestInterceptor& interceptor)
+ const TestInterceptor& interceptor,
+ ShardedGates &gates)
: test_msgr(test_msgr),
test_peer_addr(test_peer_addr),
- interceptor(interceptor) { }
+ interceptor(interceptor),
+ gates{gates},
+ primary_sid{seastar::this_shard_id()} { }
entity_addr_t get_addr() const {
return test_msgr->get_myaddr();
create(entity_addr_t test_addr,
SocketPolicy test_policy,
entity_addr_t test_peer_addr,
- const TestInterceptor& interceptor) {
+ const TestInterceptor& interceptor,
+ ShardedGates &gates) {
auto suite = std::make_unique<FailoverSuite>(
- Messenger::create(entity_name_t::OSD(TEST_OSD), "Test", TEST_NONCE),
- test_peer_addr, interceptor);
+ Messenger::create(
+ entity_name_t::OSD(TEST_OSD),
+ "Test",
+ TEST_NONCE,
+ false),
+ test_peer_addr,
+ interceptor,
+ gates);
return suite->init(test_addr, test_policy
).then([suite = std::move(suite)] () mutable {
return std::move(suite);
public:
seastar::future<> connect_peer() {
logger().info("[Test] connect_peer({})", test_peer_addr);
+ assert(seastar::this_shard_id() == primary_sid);
auto conn = test_msgr->connect(test_peer_addr, entity_name_t::TYPE_OSD);
- auto result = interceptor.find_result(conn);
+ auto result = interceptor.find_result(&*conn);
ceph_assert(result != nullptr);
if (tracked_conn) {
- if (tracked_conn->is_closed()) {
- ceph_assert(tracked_conn != conn);
- logger().info("[Test] this is a new session replacing an closed one");
+ if (tracked_conn->is_protocol_closed()) {
+ logger().info("[Test] this is a new session"
+ " replacing an closed one");
+ ceph_assert(tracked_conn != &*conn);
} else {
- ceph_assert(tracked_index == result->index);
- ceph_assert(tracked_conn == conn);
logger().info("[Test] this is not a new session");
+ ceph_assert(tracked_index == result->index);
+ ceph_assert(tracked_conn == &*conn);
}
} else {
logger().info("[Test] this is a new session");
}
tracked_index = result->index;
- tracked_conn = conn;
+ tracked_conn = &*conn;
return flush_pending_send();
}
seastar::future<> send_peer() {
+ assert(seastar::this_shard_id() == primary_sid);
if (tracked_conn) {
logger().info("[Test] send_peer()");
+ ceph_assert(!tracked_conn->is_protocol_closed());
ceph_assert(!pending_send);
return send_op();
} else {
seastar::future<> keepalive_peer() {
logger().info("[Test] keepalive_peer()");
+ assert(seastar::this_shard_id() == primary_sid);
ceph_assert(tracked_conn);
+ ceph_assert(!tracked_conn->is_protocol_closed());
return tracked_conn->send_keepalive();
}
seastar::future<> try_send_peer() {
logger().info("[Test] try_send_peer()");
+ assert(seastar::this_shard_id() == primary_sid);
ceph_assert(tracked_conn);
+ ceph_assert(!tracked_conn->is_protocol_closed());
return send_op(false);
}
seastar::future<> markdown() {
logger().info("[Test] markdown() in 100ms ...");
+ assert(seastar::this_shard_id() == primary_sid);
ceph_assert(tracked_conn);
// sleep to propagate potential remaining acks
- return seastar::sleep(100ms
+ return seastar::sleep(50ms
).then([this] {
- tracked_conn->mark_down();
+ return seastar::smp::submit_to(
+ tracked_conn->get_shard_id(), [tracked_conn=tracked_conn] {
+ assert(tracked_conn->get_shard_id() == seastar::this_shard_id());
+ tracked_conn->mark_down();
+ });
+ }).then([] {
+ // sleep to let the markdown propagate to the primary sid
+ return seastar::sleep(100ms);
});
}
seastar::future<> wait_blocked() {
logger().info("[Test] wait_blocked() ...");
+ assert(seastar::this_shard_id() == primary_sid);
return interceptor.blocker.wait_blocked();
}
void unblock() {
logger().info("[Test] unblock()");
+ assert(seastar::this_shard_id() == primary_sid);
return interceptor.blocker.unblock();
}
}
bool is_standby() {
+ assert(seastar::this_shard_id() == primary_sid);
ceph_assert(tracked_conn);
- return !(tracked_conn->is_connected() || tracked_conn->is_closed());
+ return tracked_conn->is_protocol_standby();
}
};
entity_addr_t cmd_peer_addr,
entity_addr_t test_peer_addr) {
auto test = seastar::make_lw_shared<FailoverTest>(
- Messenger::create(entity_name_t::OSD(CMD_CLI_OSD), "CmdCli", CMD_CLI_NONCE),
+ Messenger::create(
+ entity_name_t::OSD(CMD_CLI_OSD),
+ "CmdCli",
+ CMD_CLI_NONCE,
+ true),
test_addr, test_peer_addr);
return test->init(cmd_peer_addr).then([test] {
logger().info("CmdCli ready");
logger().info("\n\n[{}]", name);
ceph_assert(!test_suite);
SocketPolicy test_policy_ = to_socket_policy(test_policy);
- return FailoverSuite::create(
- test_addr, test_policy_, test_peer_addr, interceptor
- ).then([this, peer_policy, f = std::move(f)] (auto suite) mutable {
- ceph_assert(suite->get_addr() == test_addr);
- test_suite.swap(suite);
- return start_peer(peer_policy).then([this, f = std::move(f)] {
- return f(*test_suite);
- }).then([this] {
- test_suite->post_check();
- logger().info("\n[SUCCESS]");
- }).handle_exception([this] (auto eptr) {
- logger().info("\n[FAIL: {}]", eptr);
- test_suite->dump_results();
- throw;
- }).then([this] {
- return stop_peer();
- }).then([this] {
- return test_suite->shutdown().then([this] {
- test_suite.reset();
+ return ShardedGates::create(
+ ).then([this, test_policy_, peer_policy, interceptor,
+ f=std::move(f)](auto *gates) mutable {
+ return FailoverSuite::create(
+ test_addr, test_policy_, test_peer_addr, interceptor, *gates
+ ).then([this, peer_policy, f = std::move(f)](auto suite) mutable {
+ ceph_assert(suite->get_addr() == test_addr);
+ test_suite.swap(suite);
+ return start_peer(peer_policy
+ ).then([this, f = std::move(f)] {
+ return f(*test_suite);
+ }).then([this] {
+ test_suite->post_check();
+ logger().info("\n[SUCCESS]");
+ }).handle_exception([this](auto eptr) {
+ logger().info("\n[FAIL: {}]", eptr);
+ test_suite->dump_results();
+ throw;
+ }).then([this] {
+ return stop_peer();
+ }).then([this] {
+ return test_suite->shutdown(
+ ).then([this] {
+ test_suite.reset();
+ });
});
+ }).then([gates] {
+ return gates->close();
});
});
}
ConnectionRef tracked_conn;
unsigned pending_send = 0;
- std::optional<seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
+ std::optional<seastar::future<>> ms_dispatch(ConnectionRef conn, MessageRef m) override {
logger().info("[TestPeer] got op from Test");
ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
- ceph_assert(tracked_conn == c);
std::ignore = op_callback();
return {seastar::now()};
}
- void ms_handle_accept(ConnectionRef conn) override {
+ void ms_handle_accept(
+ ConnectionRef conn,
+ seastar::shard_id prv_shard,
+ bool is_replace) override {
+ assert(prv_shard == seastar::this_shard_id());
logger().info("[TestPeer] got accept from Test");
- ceph_assert(!tracked_conn ||
- tracked_conn->is_closed() ||
- tracked_conn == conn);
+
+ if (tracked_conn &&
+ !tracked_conn->is_protocol_closed() &&
+ tracked_conn != conn) {
+ logger().error("[TestPeer] {} got accepted, but there's already a valid traced_conn {}",
+ *conn, *tracked_conn);
+ }
tracked_conn = conn;
std::ignore = flush_pending_send();
}
void ms_handle_reset(ConnectionRef conn, bool is_replace) override {
logger().info("[TestPeer] got reset from Test");
- ceph_assert(tracked_conn == conn);
- tracked_conn = nullptr;
}
private:
seastar::future<> send_op() {
ceph_assert(tracked_conn);
+ if (tracked_conn->is_protocol_closed()) {
+ logger().error("[TestPeer] send op but the connection is closed -- {}",
+ *tracked_conn);
+ }
+
pg_t pgid;
object_locator_t oloc;
hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
public:
FailoverSuitePeer(MessengerRef peer_msgr, cb_t op_callback)
- : peer_msgr(peer_msgr), op_callback(op_callback) { }
+ : peer_msgr(peer_msgr),
+ op_callback(op_callback) { }
seastar::future<> shutdown() {
peer_msgr->stop();
seastar::future<> connect_peer(entity_addr_t test_addr_decoded) {
logger().info("[TestPeer] connect_peer({})", test_addr_decoded);
- auto new_tracked_conn = peer_msgr->connect(test_addr_decoded, entity_name_t::TYPE_OSD);
+ auto conn = peer_msgr->connect(test_addr_decoded, entity_name_t::TYPE_OSD);
+
if (tracked_conn) {
- if (tracked_conn->is_closed()) {
- ceph_assert(tracked_conn != new_tracked_conn);
+ if (tracked_conn->is_protocol_closed()) {
logger().info("[TestPeer] this is a new session"
" replacing an closed one");
+ ceph_assert(tracked_conn != conn);
} else {
- ceph_assert(tracked_conn == new_tracked_conn);
logger().info("[TestPeer] this is not a new session");
+ ceph_assert(tracked_conn == conn);
}
} else {
logger().info("[TestPeer] this is a new session");
}
- tracked_conn = new_tracked_conn;
+ tracked_conn = conn;
+
return flush_pending_send();
}
seastar::future<> send_peer() {
if (tracked_conn) {
logger().info("[TestPeer] send_peer()");
+ ceph_assert(!pending_send);
return send_op();
} else {
++pending_send;
Messenger::create(
entity_name_t::OSD(TEST_PEER_OSD),
"TestPeer",
- TEST_PEER_NONCE),
+ TEST_PEER_NONCE,
+ true),
op_callback
);
return suite->init(test_peer_addr, policy
return {seastar::now()};
}
- void ms_handle_accept(ConnectionRef conn) override {
+ void ms_handle_accept(
+ ConnectionRef conn,
+ seastar::shard_id prv_shard,
+ bool is_replace) override {
+ assert(prv_shard == seastar::this_shard_id());
cmd_conn = conn;
}
static seastar::future<std::unique_ptr<FailoverTestPeer>>
create(entity_addr_t cmd_peer_addr, entity_addr_t test_peer_addr) {
auto test_peer = std::make_unique<FailoverTestPeer>(
- Messenger::create(entity_name_t::OSD(CMD_SRV_OSD), "CmdSrv", CMD_SRV_NONCE),
+ Messenger::create(
+ entity_name_t::OSD(CMD_SRV_OSD),
+ "CmdSrv",
+ CMD_SRV_NONCE,
+ true),
test_peer_addr);
return test_peer->init(cmd_peer_addr
).then([test_peer = std::move(test_peer)] () mutable {
CEPH_ENTITY_TYPE_CLIENT,
&cluster,
&conf_file_list);
- return crimson::common::sharded_conf().start(init_params.name, cluster)
- .then([conf_file_list] {
+ return crimson::common::sharded_conf().start(
+ init_params.name, cluster
+ ).then([] {
+ return local_conf().start();
+ }).then([conf_file_list] {
return local_conf().parse_config_files(conf_file_list);
}).then([&app] {
auto&& config = app.configuration();
logger().info("test configuration: verbose={}, rounds={}, keepalive_ratio={}, "
"test_addr={}, cmd_peer_addr={}, test_peer_addr={}, "
- "testpeer_islocal={}, peer_wins={}",
+ "testpeer_islocal={}, peer_wins={}, smp={}",
verbose, rounds, keepalive_ratio,
test_addr, cmd_peer_addr, test_peer_addr,
- testpeer_islocal, peer_wins);
+ testpeer_islocal, peer_wins,
+ seastar::smp::count);
return test_echo(rounds, keepalive_ratio
).then([] {
- return test_concurrent_dispatch();
- }).then([] {
return test_preemptive_shutdown();
}).then([test_addr, cmd_peer_addr, test_peer_addr, testpeer_islocal, peer_wins] {
return test_v2_protocol(