1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "common/ceph_argparse.h"
5 #include "common/ceph_time.h"
6 #include "messages/MPing.h"
7 #include "messages/MCommand.h"
8 #include "messages/MCommandReply.h"
9 #include "messages/MOSDOp.h"
10 #include "messages/MOSDOpReply.h"
11 #include "crimson/auth/DummyAuth.h"
12 #include "crimson/common/log.h"
13 #include "crimson/net/Connection.h"
14 #include "crimson/net/Dispatcher.h"
15 #include "crimson/net/Messenger.h"
16 #include "crimson/net/Interceptor.h"
17 #include "crimson/net/SocketConnection.h"
21 #include <boost/program_options.hpp>
22 #include <fmt/format.h>
23 #include <fmt/ostream.h>
24 #include <seastar/core/app-template.hh>
25 #include <seastar/core/do_with.hh>
26 #include <seastar/core/future-util.hh>
27 #include <seastar/core/reactor.hh>
28 #include <seastar/core/sleep.hh>
29 #include <seastar/core/with_timeout.hh>
31 #include "test_messenger.h"
33 using namespace std::chrono_literals
;
34 namespace bpo
= boost::program_options
;
35 using crimson::common::local_conf
;
39 seastar::logger
& logger() {
40 return crimson::get_logger(ceph_subsys_test
);
// Shared pseudo-random state for the tests in this file. The engine is seeded
// exactly once from std::random_device at static-initialization time; the
// echo test, for example, draws its keepalive-vs-ping decisions from it.
static std::random_device rd;
static std::default_random_engine rng(rd());
// Verbosity switch; starts off. NOTE(review): where it is read or set is not
// visible in this chunk — presumably from command-line parsing elsewhere.
static bool verbose{false};
47 static entity_addr_t
get_server_addr() {
48 static int port
= 9030;
51 saddr
.parse("127.0.0.1", nullptr);
56 static seastar::future
<> test_echo(unsigned rounds
,
57 double keepalive_ratio
)
61 : public crimson::net::Dispatcher
{
62 crimson::net::MessengerRef msgr
;
63 crimson::auth::DummyAuthClientServer dummy_auth
;
65 std::optional
<seastar::future
<>> ms_dispatch(
66 crimson::net::ConnectionRef c
, MessageRef m
) override
{
68 logger().info("server got {}", *m
);
71 std::ignore
= c
->send(crimson::make_message
<MPing
>());
72 return {seastar::now()};
75 seastar::future
<> init(const entity_name_t
& name
,
76 const std::string
& lname
,
78 const entity_addr_t
& addr
) {
79 msgr
= crimson::net::Messenger::create(name
, lname
, nonce
);
80 msgr
->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
81 msgr
->set_auth_client(&dummy_auth
);
82 msgr
->set_auth_server(&dummy_auth
);
83 return msgr
->bind(entity_addrvec_t
{addr
}).safe_then([this] {
84 return msgr
->start({this});
85 }, crimson::net::Messenger::bind_ertr::all_same_way(
86 [addr
] (const std::error_code
& e
) {
87 logger().error("test_echo(): "
88 "there is another instance running at {}", addr
);
92 seastar::future
<> shutdown() {
95 return msgr
->shutdown();
100 : public crimson::net::Dispatcher
{
101 struct PingSession
: public seastar::enable_shared_from_this
<PingSession
> {
103 mono_time connected_time
;
104 mono_time finish_time
;
106 using PingSessionRef
= seastar::shared_ptr
<PingSession
>;
109 std::bernoulli_distribution keepalive_dist
;
110 crimson::net::MessengerRef msgr
;
111 std::map
<crimson::net::ConnectionRef
, seastar::promise
<>> pending_conns
;
112 std::map
<crimson::net::ConnectionRef
, PingSessionRef
> sessions
;
113 crimson::auth::DummyAuthClientServer dummy_auth
;
115 Client(unsigned rounds
, double keepalive_ratio
)
117 keepalive_dist(std::bernoulli_distribution
{keepalive_ratio
}) {}
119 PingSessionRef
find_session(crimson::net::ConnectionRef c
) {
120 auto found
= sessions
.find(c
);
121 if (found
== sessions
.end()) {
124 return found
->second
;
127 void ms_handle_connect(crimson::net::ConnectionRef conn
) override
{
128 auto session
= seastar::make_shared
<PingSession
>();
129 auto [i
, added
] = sessions
.emplace(conn
, session
);
132 session
->connected_time
= mono_clock::now();
134 std::optional
<seastar::future
<>> ms_dispatch(
135 crimson::net::ConnectionRef c
, MessageRef m
) override
{
136 auto session
= find_session(c
);
139 logger().info("client ms_dispatch {}", session
->count
);
142 if (session
->count
== rounds
) {
143 logger().info("{}: finished receiving {} pongs", *c
, session
->count
);
144 session
->finish_time
= mono_clock::now();
145 auto found
= pending_conns
.find(c
);
146 ceph_assert(found
!= pending_conns
.end());
147 found
->second
.set_value();
149 return {seastar::now()};
152 seastar::future
<> init(const entity_name_t
& name
,
153 const std::string
& lname
,
154 const uint64_t nonce
) {
155 msgr
= crimson::net::Messenger::create(name
, lname
, nonce
);
156 msgr
->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
157 msgr
->set_auth_client(&dummy_auth
);
158 msgr
->set_auth_server(&dummy_auth
);
159 return msgr
->start({this});
162 seastar::future
<> shutdown() {
165 return msgr
->shutdown();
168 seastar::future
<> dispatch_pingpong(const entity_addr_t
& peer_addr
) {
169 mono_time start_time
= mono_clock::now();
170 auto conn
= msgr
->connect(peer_addr
, entity_name_t::TYPE_OSD
);
171 return seastar::futurize_invoke([this, conn
] {
172 return do_dispatch_pingpong(conn
);
173 }).then([this, conn
, start_time
] {
174 auto session
= find_session(conn
);
175 std::chrono::duration
<double> dur_handshake
= session
->connected_time
- start_time
;
176 std::chrono::duration
<double> dur_pingpong
= session
->finish_time
- session
->connected_time
;
177 logger().info("{}: handshake {}, pingpong {}",
178 *conn
, dur_handshake
.count(), dur_pingpong
.count());
183 seastar::future
<> do_dispatch_pingpong(crimson::net::ConnectionRef conn
) {
184 auto [i
, added
] = pending_conns
.emplace(conn
, seastar::promise
<>());
187 return seastar::do_with(0u, 0u,
188 [this, conn
](auto &count_ping
, auto &count_keepalive
) {
189 return seastar::do_until(
190 [this, conn
, &count_ping
, &count_keepalive
] {
191 bool stop
= (count_ping
== rounds
);
193 logger().info("{}: finished sending {} pings with {} keepalives",
194 *conn
, count_ping
, count_keepalive
);
198 [this, conn
, &count_ping
, &count_keepalive
] {
199 return seastar::repeat([this, conn
, &count_ping
, &count_keepalive
] {
200 if (keepalive_dist(rng
)) {
201 return conn
->send_keepalive()
202 .then([&count_keepalive
] {
203 count_keepalive
+= 1;
204 return seastar::make_ready_future
<seastar::stop_iteration
>(
205 seastar::stop_iteration::no
);
208 return conn
->send(crimson::make_message
<MPing
>())
209 .then([&count_ping
] {
211 return seastar::make_ready_future
<seastar::stop_iteration
>(
212 seastar::stop_iteration::yes
);
216 }).then([this, conn
] {
217 auto found
= pending_conns
.find(conn
);
218 return found
->second
.get_future();
226 logger().info("test_echo(rounds={}, keepalive_ratio={}):",
227 rounds
, keepalive_ratio
);
228 auto server1
= seastar::make_shared
<test_state::Server
>();
229 auto server2
= seastar::make_shared
<test_state::Server
>();
230 auto client1
= seastar::make_shared
<test_state::Client
>(rounds
, keepalive_ratio
);
231 auto client2
= seastar::make_shared
<test_state::Client
>(rounds
, keepalive_ratio
);
232 // start servers and clients
233 auto addr1
= get_server_addr();
234 auto addr2
= get_server_addr();
235 addr1
.set_type(entity_addr_t::TYPE_MSGR2
);
236 addr2
.set_type(entity_addr_t::TYPE_MSGR2
);
237 return seastar::when_all_succeed(
238 server1
->init(entity_name_t::OSD(0), "server1", 1, addr1
),
239 server2
->init(entity_name_t::OSD(1), "server2", 2, addr2
),
240 client1
->init(entity_name_t::OSD(2), "client1", 3),
241 client2
->init(entity_name_t::OSD(3), "client2", 4)
242 // dispatch pingpoing
243 ).then_unpack([client1
, client2
, server1
, server2
] {
244 return seastar::when_all_succeed(
245 // test connecting in parallel, accepting in parallel
246 client1
->dispatch_pingpong(server2
->msgr
->get_myaddr()),
247 client2
->dispatch_pingpong(server1
->msgr
->get_myaddr()));
250 return seastar::now();
252 logger().info("client1 shutdown...");
253 return client1
->shutdown();
255 logger().info("client2 shutdown...");
256 return client2
->shutdown();
258 logger().info("server1 shutdown...");
259 return server1
->shutdown();
261 logger().info("server2 shutdown...");
262 return server2
->shutdown();
264 logger().info("test_echo() done!\n");
265 }).handle_exception([server1
, server2
, client1
, client2
] (auto eptr
) {
266 logger().error("test_echo() failed: got exception {}", eptr
);
271 static seastar::future
<> test_concurrent_dispatch()
275 : public crimson::net::Dispatcher
{
276 crimson::net::MessengerRef msgr
;
278 seastar::promise
<> on_second
; // satisfied on second dispatch
279 seastar::promise
<> on_done
; // satisfied when first dispatch unblocks
280 crimson::auth::DummyAuthClientServer dummy_auth
;
282 std::optional
<seastar::future
<>> ms_dispatch(
283 crimson::net::ConnectionRef
, MessageRef m
) override
{
286 // block on the first request until we reenter with the second
287 std::ignore
= on_second
.get_future().then([this] { on_done
.set_value(); });
290 on_second
.set_value();
293 throw std::runtime_error("unexpected count");
295 return {seastar::now()};
298 seastar::future
<> wait() { return on_done
.get_future(); }
300 seastar::future
<> init(const entity_name_t
& name
,
301 const std::string
& lname
,
302 const uint64_t nonce
,
303 const entity_addr_t
& addr
) {
304 msgr
= crimson::net::Messenger::create(name
, lname
, nonce
);
305 msgr
->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
306 msgr
->set_auth_client(&dummy_auth
);
307 msgr
->set_auth_server(&dummy_auth
);
308 return msgr
->bind(entity_addrvec_t
{addr
}).safe_then([this] {
309 return msgr
->start({this});
310 }, crimson::net::Messenger::bind_ertr::all_same_way(
311 [addr
] (const std::error_code
& e
) {
312 logger().error("test_concurrent_dispatch(): "
313 "there is another instance running at {}", addr
);
320 : public crimson::net::Dispatcher
{
321 crimson::net::MessengerRef msgr
;
322 crimson::auth::DummyAuthClientServer dummy_auth
;
324 std::optional
<seastar::future
<>> ms_dispatch(
325 crimson::net::ConnectionRef
, MessageRef m
) override
{
326 return {seastar::now()};
329 seastar::future
<> init(const entity_name_t
& name
,
330 const std::string
& lname
,
331 const uint64_t nonce
) {
332 msgr
= crimson::net::Messenger::create(name
, lname
, nonce
);
333 msgr
->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
334 msgr
->set_auth_client(&dummy_auth
);
335 msgr
->set_auth_server(&dummy_auth
);
336 return msgr
->start({this});
341 logger().info("test_concurrent_dispatch():");
342 auto server
= seastar::make_shared
<test_state::Server
>();
343 auto client
= seastar::make_shared
<test_state::Client
>();
344 auto addr
= get_server_addr();
345 addr
.set_type(entity_addr_t::TYPE_MSGR2
);
346 addr
.set_family(AF_INET
);
347 return seastar::when_all_succeed(
348 server
->init(entity_name_t::OSD(4), "server3", 5, addr
),
349 client
->init(entity_name_t::OSD(5), "client3", 6)
350 ).then_unpack([server
, client
] {
351 auto conn
= client
->msgr
->connect(server
->msgr
->get_myaddr(),
352 entity_name_t::TYPE_OSD
);
354 return conn
->send(crimson::make_message
<MPing
>()).then([conn
] {
355 return conn
->send(crimson::make_message
<MPing
>());
358 return server
->wait();
360 logger().info("client shutdown...");
361 client
->msgr
->stop();
362 return client
->msgr
->shutdown();
364 logger().info("server shutdown...");
365 server
->msgr
->stop();
366 return server
->msgr
->shutdown();
368 logger().info("test_concurrent_dispatch() done!\n");
369 }).handle_exception([server
, client
] (auto eptr
) {
370 logger().error("test_concurrent_dispatch() failed: got exception {}", eptr
);
375 seastar::future
<> test_preemptive_shutdown() {
378 : public crimson::net::Dispatcher
{
379 crimson::net::MessengerRef msgr
;
380 crimson::auth::DummyAuthClientServer dummy_auth
;
382 std::optional
<seastar::future
<>> ms_dispatch(
383 crimson::net::ConnectionRef c
, MessageRef m
) override
{
384 std::ignore
= c
->send(crimson::make_message
<MPing
>());
385 return {seastar::now()};
389 seastar::future
<> init(const entity_name_t
& name
,
390 const std::string
& lname
,
391 const uint64_t nonce
,
392 const entity_addr_t
& addr
) {
393 msgr
= crimson::net::Messenger::create(name
, lname
, nonce
);
394 msgr
->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
395 msgr
->set_auth_client(&dummy_auth
);
396 msgr
->set_auth_server(&dummy_auth
);
397 return msgr
->bind(entity_addrvec_t
{addr
}).safe_then([this] {
398 return msgr
->start({this});
399 }, crimson::net::Messenger::bind_ertr::all_same_way(
400 [addr
] (const std::error_code
& e
) {
401 logger().error("test_preemptive_shutdown(): "
402 "there is another instance running at {}", addr
);
406 entity_addr_t
get_addr() const {
407 return msgr
->get_myaddr();
409 seastar::future
<> shutdown() {
411 return msgr
->shutdown();
416 : public crimson::net::Dispatcher
{
417 crimson::net::MessengerRef msgr
;
418 crimson::auth::DummyAuthClientServer dummy_auth
;
420 bool stop_send
= false;
421 seastar::promise
<> stopped_send_promise
;
423 std::optional
<seastar::future
<>> ms_dispatch(
424 crimson::net::ConnectionRef
, MessageRef m
) override
{
425 return {seastar::now()};
429 seastar::future
<> init(const entity_name_t
& name
,
430 const std::string
& lname
,
431 const uint64_t nonce
) {
432 msgr
= crimson::net::Messenger::create(name
, lname
, nonce
);
433 msgr
->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
434 msgr
->set_auth_client(&dummy_auth
);
435 msgr
->set_auth_server(&dummy_auth
);
436 return msgr
->start({this});
438 void send_pings(const entity_addr_t
& addr
) {
439 auto conn
= msgr
->connect(addr
, entity_name_t::TYPE_OSD
);
440 // forwarded to stopped_send_promise
441 (void) seastar::do_until(
442 [this] { return stop_send
; },
444 return conn
->send(crimson::make_message
<MPing
>()).then([] {
445 return seastar::sleep(0ms
);
448 ).then_wrapped([this, conn
] (auto fut
) {
449 fut
.forward_to(std::move(stopped_send_promise
));
452 seastar::future
<> shutdown() {
454 return msgr
->shutdown().then([this] {
456 return stopped_send_promise
.get_future();
462 logger().info("test_preemptive_shutdown():");
463 auto server
= seastar::make_shared
<test_state::Server
>();
464 auto client
= seastar::make_shared
<test_state::Client
>();
465 auto addr
= get_server_addr();
466 addr
.set_type(entity_addr_t::TYPE_MSGR2
);
467 addr
.set_family(AF_INET
);
468 return seastar::when_all_succeed(
469 server
->init(entity_name_t::OSD(6), "server4", 7, addr
),
470 client
->init(entity_name_t::OSD(7), "client4", 8)
471 ).then_unpack([server
, client
] {
472 client
->send_pings(server
->get_addr());
473 return seastar::sleep(100ms
);
475 logger().info("client shutdown...");
476 return client
->shutdown();
478 logger().info("server shutdown...");
479 return server
->shutdown();
481 logger().info("test_preemptive_shutdown() done!\n");
482 }).handle_exception([server
, client
] (auto eptr
) {
483 logger().error("test_preemptive_shutdown() failed: got exception {}", eptr
);
488 using ceph::msgr::v2::Tag
;
489 using crimson::net::bp_action_t
;
490 using crimson::net::bp_type_t
;
491 using crimson::net::Breakpoint
;
492 using crimson::net::Connection
;
493 using crimson::net::ConnectionRef
;
494 using crimson::net::custom_bp_t
;
495 using crimson::net::Dispatcher
;
496 using crimson::net::Interceptor
;
497 using crimson::net::Messenger
;
498 using crimson::net::MessengerRef
;
499 using crimson::net::SocketConnection
;
500 using crimson::net::SocketPolicy
;
501 using crimson::net::tag_bp_t
;
502 using namespace ceph::net::test
;
504 struct counter_t
{ unsigned counter
= 0; };
506 enum class conn_state_t
{
513 std::ostream
& operator<<(std::ostream
& out
, const conn_state_t
& state
) {
515 case conn_state_t::unknown
:
516 return out
<< "unknown";
517 case conn_state_t::established
:
518 return out
<< "established";
519 case conn_state_t::closed
:
520 return out
<< "closed";
521 case conn_state_t::replaced
:
522 return out
<< "replaced";
528 } // anonymous namespace
530 #if FMT_VERSION >= 90000
532 struct fmt::formatter
<conn_state_t
> : fmt::ostream_formatter
{};
540 conn_state_t state
= conn_state_t::unknown
;
542 unsigned connect_attempts
= 0;
543 unsigned client_connect_attempts
= 0;
544 unsigned client_reconnect_attempts
= 0;
545 unsigned cnt_connect_dispatched
= 0;
547 unsigned accept_attempts
= 0;
548 unsigned server_connect_attempts
= 0;
549 unsigned server_reconnect_attempts
= 0;
550 unsigned cnt_accept_dispatched
= 0;
552 unsigned cnt_reset_dispatched
= 0;
553 unsigned cnt_remote_reset_dispatched
= 0;
555 ConnResult(ConnectionRef conn
, unsigned index
)
556 : conn(conn
), index(index
) {}
558 template <typename T
>
559 void _assert_eq(const char* expr_actual
, T actual
,
560 const char* expr_expected
, T expected
) const {
561 if (actual
!= expected
) {
562 throw std::runtime_error(fmt::format(
563 "[{}] {} '{}' is actually {}, not the expected '{}' {}",
564 index
, *conn
, expr_actual
, actual
, expr_expected
, expected
));
568 #define ASSERT_EQUAL(actual, expected) \
569 _assert_eq(#actual, actual, #expected, expected)
571 void assert_state_at(conn_state_t expected
) const {
572 ASSERT_EQUAL(state
, expected
);
575 void assert_connect(unsigned attempts
,
578 unsigned dispatched
) const {
579 ASSERT_EQUAL(connect_attempts
, attempts
);
580 ASSERT_EQUAL(client_connect_attempts
, connects
);
581 ASSERT_EQUAL(client_reconnect_attempts
, reconnects
);
582 ASSERT_EQUAL(cnt_connect_dispatched
, dispatched
);
585 void assert_connect(unsigned attempts
,
586 unsigned dispatched
) const {
587 ASSERT_EQUAL(connect_attempts
, attempts
);
588 ASSERT_EQUAL(cnt_connect_dispatched
, dispatched
);
591 void assert_accept(unsigned attempts
,
594 unsigned dispatched
) const {
595 ASSERT_EQUAL(accept_attempts
, attempts
);
596 ASSERT_EQUAL(server_connect_attempts
, accepts
);
597 ASSERT_EQUAL(server_reconnect_attempts
, reaccepts
);
598 ASSERT_EQUAL(cnt_accept_dispatched
, dispatched
);
601 void assert_accept(unsigned attempts
,
602 unsigned dispatched
) const {
603 ASSERT_EQUAL(accept_attempts
, attempts
);
604 ASSERT_EQUAL(cnt_accept_dispatched
, dispatched
);
607 void assert_reset(unsigned local
, unsigned remote
) const {
608 ASSERT_EQUAL(cnt_reset_dispatched
, local
);
609 ASSERT_EQUAL(cnt_remote_reset_dispatched
, remote
);
613 logger().info("\nResult({}):\n"
616 " connect_attempts: {}\n"
617 " client_connect_attempts: {}\n"
618 " client_reconnect_attempts: {}\n"
619 " cnt_connect_dispatched: {}\n"
620 " accept_attempts: {}\n"
621 " server_connect_attempts: {}\n"
622 " server_reconnect_attempts: {}\n"
623 " cnt_accept_dispatched: {}\n"
624 " cnt_reset_dispatched: {}\n"
625 " cnt_remote_reset_dispatched: {}\n",
626 static_cast<const void*>(this),
630 client_connect_attempts
,
631 client_reconnect_attempts
,
632 cnt_connect_dispatched
,
634 server_connect_attempts
,
635 server_reconnect_attempts
,
636 cnt_accept_dispatched
,
637 cnt_reset_dispatched
,
638 cnt_remote_reset_dispatched
);
641 using ConnResults
= std::vector
<ConnResult
>;
643 struct TestInterceptor
: public Interceptor
{
644 std::map
<Breakpoint
, std::map
<unsigned, bp_action_t
>> breakpoints
;
645 std::map
<Breakpoint
, counter_t
> breakpoints_counter
;
646 std::map
<ConnectionRef
, unsigned> conns
;
648 std::optional
<seastar::abort_source
> signal
;
650 TestInterceptor() = default;
651 // only used for copy breakpoint configurations
652 TestInterceptor(const TestInterceptor
& other
) {
653 assert(other
.breakpoints_counter
.empty());
654 assert(other
.conns
.empty());
655 assert(other
.results
.empty());
656 breakpoints
= other
.breakpoints
;
657 assert(!other
.signal
);
660 void make_fault(Breakpoint bp
, unsigned round
= 1) {
662 breakpoints
[bp
][round
] = bp_action_t::FAULT
;
665 void make_block(Breakpoint bp
, unsigned round
= 1) {
667 breakpoints
[bp
][round
] = bp_action_t::BLOCK
;
670 void make_stall(Breakpoint bp
, unsigned round
= 1) {
672 breakpoints
[bp
][round
] = bp_action_t::STALL
;
675 ConnResult
* find_result(ConnectionRef conn
) {
676 auto it
= conns
.find(conn
);
677 if (it
== conns
.end()) {
680 return &results
[it
->second
];
684 seastar::future
<> wait() {
686 signal
= seastar::abort_source();
687 return seastar::sleep_abortable(10s
, *signal
).then([] {
688 throw std::runtime_error("Timeout (10s) in TestInterceptor::wait()");
689 }).handle_exception_type([] (const seastar::sleep_aborted
& e
) {
696 signal
->request_abort();
697 signal
= std::nullopt
;
702 void register_conn(SocketConnection
& _conn
) override
{
703 auto conn
= _conn
.get_local_shared_foreign_from_this();
704 auto result
= find_result(conn
);
705 if (result
!= nullptr) {
706 logger().error("The connection [{}] {} already exists when register {}",
707 result
->index
, *result
->conn
, _conn
);
710 unsigned index
= results
.size();
711 results
.emplace_back(conn
, index
);
714 logger().info("[{}] {} new connection registered", index
, _conn
);
717 void register_conn_closed(SocketConnection
& conn
) override
{
718 auto result
= find_result(conn
.get_local_shared_foreign_from_this());
719 if (result
== nullptr) {
720 logger().error("Untracked closed connection: {}", conn
);
724 if (result
->state
!= conn_state_t::replaced
) {
725 result
->state
= conn_state_t::closed
;
728 logger().info("[{}] {} closed({})", result
->index
, conn
, result
->state
);
731 void register_conn_ready(SocketConnection
& conn
) override
{
732 auto result
= find_result(conn
.get_local_shared_foreign_from_this());
733 if (result
== nullptr) {
734 logger().error("Untracked ready connection: {}", conn
);
738 ceph_assert(conn
.is_connected());
740 logger().info("[{}] {} ready", result
->index
, conn
);
743 void register_conn_replaced(SocketConnection
& conn
) override
{
744 auto result
= find_result(conn
.get_local_shared_foreign_from_this());
745 if (result
== nullptr) {
746 logger().error("Untracked replaced connection: {}", conn
);
750 result
->state
= conn_state_t::replaced
;
751 logger().info("[{}] {} {}", result
->index
, conn
, result
->state
);
754 bp_action_t
intercept(SocketConnection
& conn
, Breakpoint bp
) override
{
755 ++breakpoints_counter
[bp
].counter
;
757 auto result
= find_result(conn
.get_local_shared_foreign_from_this());
758 if (result
== nullptr) {
759 logger().error("Untracked intercepted connection: {}, at breakpoint {}({})",
760 conn
, bp
, breakpoints_counter
[bp
].counter
);
764 if (bp
== custom_bp_t::SOCKET_CONNECTING
) {
765 ++result
->connect_attempts
;
766 logger().info("[Test] connect_attempts={}", result
->connect_attempts
);
767 } else if (bp
== tag_bp_t
{Tag::CLIENT_IDENT
, bp_type_t::WRITE
}) {
768 ++result
->client_connect_attempts
;
769 logger().info("[Test] client_connect_attempts={}", result
->client_connect_attempts
);
770 } else if (bp
== tag_bp_t
{Tag::SESSION_RECONNECT
, bp_type_t::WRITE
}) {
771 ++result
->client_reconnect_attempts
;
772 logger().info("[Test] client_reconnect_attempts={}", result
->client_reconnect_attempts
);
773 } else if (bp
== custom_bp_t::SOCKET_ACCEPTED
) {
774 ++result
->accept_attempts
;
775 logger().info("[Test] accept_attempts={}", result
->accept_attempts
);
776 } else if (bp
== tag_bp_t
{Tag::CLIENT_IDENT
, bp_type_t::READ
}) {
777 ++result
->server_connect_attempts
;
778 logger().info("[Test] server_connect_attemps={}", result
->server_connect_attempts
);
779 } else if (bp
== tag_bp_t
{Tag::SESSION_RECONNECT
, bp_type_t::READ
}) {
780 ++result
->server_reconnect_attempts
;
781 logger().info("[Test] server_reconnect_attempts={}", result
->server_reconnect_attempts
);
784 auto it_bp
= breakpoints
.find(bp
);
785 if (it_bp
!= breakpoints
.end()) {
786 auto it_cnt
= it_bp
->second
.find(breakpoints_counter
[bp
].counter
);
787 if (it_cnt
!= it_bp
->second
.end()) {
788 logger().info("[{}] {} intercepted {}({}) => {}",
789 result
->index
, conn
, bp
,
790 breakpoints_counter
[bp
].counter
, it_cnt
->second
);
791 return it_cnt
->second
;
794 logger().info("[{}] {} intercepted {}({})",
795 result
->index
, conn
, bp
, breakpoints_counter
[bp
].counter
);
796 return bp_action_t::CONTINUE
;
800 SocketPolicy
to_socket_policy(policy_t policy
) {
802 case policy_t::stateful_server
:
803 return SocketPolicy::stateful_server(0);
804 case policy_t::stateless_server
:
805 return SocketPolicy::stateless_server(0);
806 case policy_t::lossless_peer
:
807 return SocketPolicy::lossless_peer(0);
808 case policy_t::lossless_peer_reuse
:
809 return SocketPolicy::lossless_peer_reuse(0);
810 case policy_t::lossy_client
:
811 return SocketPolicy::lossy_client(0);
812 case policy_t::lossless_client
:
813 return SocketPolicy::lossless_client(0);
815 logger().error("unexpected policy type");
820 class FailoverSuite
: public Dispatcher
{
821 crimson::auth::DummyAuthClientServer dummy_auth
;
822 MessengerRef test_msgr
;
823 const entity_addr_t test_peer_addr
;
824 TestInterceptor interceptor
;
826 unsigned tracked_index
= 0;
827 ConnectionRef tracked_conn
;
828 unsigned pending_send
= 0;
829 unsigned pending_peer_receive
= 0;
830 unsigned pending_receive
= 0;
832 std::optional
<seastar::future
<>> ms_dispatch(ConnectionRef c
, MessageRef m
) override
{
833 auto result
= interceptor
.find_result(c
);
834 if (result
== nullptr) {
835 logger().error("Untracked ms dispatched connection: {}", *c
);
839 if (tracked_conn
!= c
) {
840 logger().error("[{}] {} got op, but doesn't match tracked_conn [{}] {}",
841 result
->index
, *c
, tracked_index
, *tracked_conn
);
844 ceph_assert(result
->index
== tracked_index
);
846 ceph_assert(m
->get_type() == CEPH_MSG_OSD_OP
);
847 ceph_assert(pending_receive
> 0);
849 if (pending_receive
== 0) {
850 interceptor
.notify();
852 logger().info("[Test] got op, left {} ops -- [{}] {}",
853 pending_receive
, result
->index
, *c
);
854 return {seastar::now()};
857 void ms_handle_accept(ConnectionRef conn
) override
{
858 auto result
= interceptor
.find_result(conn
);
859 if (result
== nullptr) {
860 logger().error("Untracked accepted connection: {}", *conn
);
865 !tracked_conn
->is_closed() &&
866 tracked_conn
!= conn
) {
867 logger().error("[{}] {} got accepted, but there's already traced_conn [{}] {}",
868 result
->index
, *conn
, tracked_index
, *tracked_conn
);
872 tracked_index
= result
->index
;
874 ++result
->cnt_accept_dispatched
;
875 logger().info("[Test] got accept (cnt_accept_dispatched={}), track [{}] {}",
876 result
->cnt_accept_dispatched
, result
->index
, *conn
);
877 std::ignore
= flush_pending_send();
880 void ms_handle_connect(ConnectionRef conn
) override
{
881 auto result
= interceptor
.find_result(conn
);
882 if (result
== nullptr) {
883 logger().error("Untracked connected connection: {}", *conn
);
887 if (tracked_conn
!= conn
) {
888 logger().error("[{}] {} got connected, but doesn't match tracked_conn [{}] {}",
889 result
->index
, *conn
, tracked_index
, *tracked_conn
);
892 ceph_assert(result
->index
== tracked_index
);
894 ++result
->cnt_connect_dispatched
;
895 logger().info("[Test] got connected (cnt_connect_dispatched={}) -- [{}] {}",
896 result
->cnt_connect_dispatched
, result
->index
, *conn
);
899 void ms_handle_reset(ConnectionRef conn
, bool is_replace
) override
{
900 auto result
= interceptor
.find_result(conn
);
901 if (result
== nullptr) {
902 logger().error("Untracked reset connection: {}", *conn
);
906 if (tracked_conn
!= conn
) {
907 logger().error("[{}] {} got reset, but doesn't match tracked_conn [{}] {}",
908 result
->index
, *conn
, tracked_index
, *tracked_conn
);
911 ceph_assert(result
->index
== tracked_index
);
914 tracked_conn
= nullptr;
915 ++result
->cnt_reset_dispatched
;
916 logger().info("[Test] got reset (cnt_reset_dispatched={}), untrack [{}] {}",
917 result
->cnt_reset_dispatched
, result
->index
, *conn
);
920 void ms_handle_remote_reset(ConnectionRef conn
) override
{
921 auto result
= interceptor
.find_result(conn
);
922 if (result
== nullptr) {
923 logger().error("Untracked remotely reset connection: {}", *conn
);
927 if (tracked_conn
!= conn
) {
928 logger().error("[{}] {} got remotely reset, but doesn't match tracked_conn [{}] {}",
929 result
->index
, *conn
, tracked_index
, *tracked_conn
);
932 ceph_assert(result
->index
== tracked_index
);
934 ++result
->cnt_remote_reset_dispatched
;
935 logger().info("[Test] got remote reset (cnt_remote_reset_dispatched={}) -- [{}] {}",
936 result
->cnt_remote_reset_dispatched
, result
->index
, *conn
);
940 seastar::future
<> init(entity_addr_t test_addr
, SocketPolicy policy
) {
941 test_msgr
->set_default_policy(policy
);
942 test_msgr
->set_auth_client(&dummy_auth
);
943 test_msgr
->set_auth_server(&dummy_auth
);
944 test_msgr
->set_interceptor(&interceptor
);
945 return test_msgr
->bind(entity_addrvec_t
{test_addr
}).safe_then([this] {
946 return test_msgr
->start({this});
947 }, Messenger::bind_ertr::all_same_way([test_addr
] (const std::error_code
& e
) {
948 logger().error("FailoverSuite: "
949 "there is another instance running at {}", test_addr
);
954 seastar::future
<> send_op(bool expect_reply
=true) {
955 ceph_assert(tracked_conn
);
957 ++pending_peer_receive
;
960 object_locator_t oloc
;
961 hobject_t
hobj(object_t(), oloc
.key
, CEPH_NOSNAP
, pgid
.ps(),
962 pgid
.pool(), oloc
.nspace
);
964 return tracked_conn
->send(crimson::make_message
<MOSDOp
>(0, 0, hobj
, spgid
, 0, 0, 0));
967 seastar::future
<> flush_pending_send() {
968 if (pending_send
!= 0) {
969 logger().info("[Test] flush sending {} ops", pending_send
);
971 ceph_assert(tracked_conn
);
972 return seastar::do_until(
973 [this] { return pending_send
== 0; },
980 seastar::future
<> wait_ready(unsigned num_ready_conns
,
981 unsigned num_replaced
,
982 bool wait_received
) {
983 unsigned pending_conns
= 0;
984 unsigned pending_establish
= 0;
985 unsigned replaced_conns
= 0;
986 for (auto& result
: interceptor
.results
) {
987 if (result
.conn
->is_closed_clean()) {
988 if (result
.state
== conn_state_t::replaced
) {
991 } else if (result
.conn
->is_connected()) {
992 if (tracked_conn
!= result
.conn
|| tracked_index
!= result
.index
) {
993 throw std::runtime_error(fmt::format(
994 "The connected connection [{}] {} doesn't"
995 " match the tracked connection [{}] {}",
996 result
.index
, *result
.conn
, tracked_index
, *tracked_conn
));
998 if (pending_send
== 0 && pending_peer_receive
== 0 && pending_receive
== 0) {
999 result
.state
= conn_state_t::established
;
1001 ++pending_establish
;
1008 bool do_wait
= false;
1009 if (num_ready_conns
> 0) {
1010 if (interceptor
.results
.size() > num_ready_conns
) {
1011 throw std::runtime_error(fmt::format(
1012 "{} connections, more than expected: {}",
1013 interceptor
.results
.size(), num_ready_conns
));
1014 } else if (interceptor
.results
.size() < num_ready_conns
|| pending_conns
> 0) {
1015 logger().info("[Test] wait_ready(): wait for connections,"
1016 " currently {} out of {}, pending {} ready ...",
1017 interceptor
.results
.size(), num_ready_conns
, pending_conns
);
1021 if (wait_received
&&
1022 (pending_send
|| pending_peer_receive
|| pending_receive
)) {
1023 if (pending_conns
|| pending_establish
) {
1024 logger().info("[Test] wait_ready(): wait for pending_send={},"
1025 " pending_peer_receive={}, pending_receive={},"
1026 " pending {}/{} ready/establish connections ...",
1027 pending_send
, pending_peer_receive
, pending_receive
,
1028 pending_conns
, pending_establish
);
1032 if (num_replaced
> 0) {
1033 if (replaced_conns
> num_replaced
) {
1034 throw std::runtime_error(fmt::format(
1035 "{} replaced connections, more than expected: {}",
1036 replaced_conns
, num_replaced
));
1038 if (replaced_conns
< num_replaced
) {
1039 logger().info("[Test] wait_ready(): wait for {} replaced connections,"
1040 " currently {} ...",
1041 num_replaced
, replaced_conns
);
1047 return interceptor
.wait(
1048 ).then([this, num_ready_conns
, num_replaced
, wait_received
] {
1049 return wait_ready(num_ready_conns
, num_replaced
, wait_received
);
1052 logger().info("[Test] wait_ready(): wait done!");
1053 return seastar::now();
1057 // called by FailoverTest
1059 FailoverSuite(MessengerRef test_msgr
,
1060 entity_addr_t test_peer_addr
,
1061 const TestInterceptor
& interceptor
)
1062 : test_msgr(test_msgr
),
1063 test_peer_addr(test_peer_addr
),
1064 interceptor(interceptor
) { }
1066 entity_addr_t
get_addr() const {
1067 return test_msgr
->get_myaddr();
1070 seastar::future
<> shutdown() {
1072 return test_msgr
->shutdown();
1075 void needs_receive() {
1079 void notify_peer_reply() {
1080 ceph_assert(pending_peer_receive
> 0);
1081 --pending_peer_receive
;
1082 logger().info("[Test] TestPeer said got op, left {} ops",
1083 pending_peer_receive
);
1084 if (pending_peer_receive
== 0) {
1085 interceptor
.notify();
1089 void post_check() const {
1090 // make sure all breakpoints were hit
1091 for (auto& kv
: interceptor
.breakpoints
) {
1092 auto it
= interceptor
.breakpoints_counter
.find(kv
.first
);
1093 if (it
== interceptor
.breakpoints_counter
.end()) {
1094 throw std::runtime_error(fmt::format("{} was missed", kv
.first
));
1096 auto expected
= kv
.second
.rbegin()->first
;
1097 if (expected
> it
->second
.counter
) {
1098 throw std::runtime_error(fmt::format(
1099 "{} only triggered {} times, not the expected {}",
1100 kv
.first
, it
->second
.counter
, expected
));
1105 void dump_results() const {
1106 for (auto& result
: interceptor
.results
) {
1111 static seastar::future
<std::unique_ptr
<FailoverSuite
>>
1112 create(entity_addr_t test_addr
,
1113 SocketPolicy test_policy
,
1114 entity_addr_t test_peer_addr
,
1115 const TestInterceptor
& interceptor
) {
1116 auto suite
= std::make_unique
<FailoverSuite
>(
1117 Messenger::create(entity_name_t::OSD(TEST_OSD
), "Test", TEST_NONCE
),
1118 test_peer_addr
, interceptor
);
1119 return suite
->init(test_addr
, test_policy
1120 ).then([suite
= std::move(suite
)] () mutable {
1121 return std::move(suite
);
1127 seastar::future
<> connect_peer() {
1128 logger().info("[Test] connect_peer({})", test_peer_addr
);
1129 auto conn
= test_msgr
->connect(test_peer_addr
, entity_name_t::TYPE_OSD
);
1130 auto result
= interceptor
.find_result(conn
);
1131 ceph_assert(result
!= nullptr);
1134 if (tracked_conn
->is_closed()) {
1135 ceph_assert(tracked_conn
!= conn
);
1136 logger().info("[Test] this is a new session replacing an closed one");
1138 ceph_assert(tracked_index
== result
->index
);
1139 ceph_assert(tracked_conn
== conn
);
1140 logger().info("[Test] this is not a new session");
1143 logger().info("[Test] this is a new session");
1145 tracked_index
= result
->index
;
1146 tracked_conn
= conn
;
1148 return flush_pending_send();
1151 seastar::future
<> send_peer() {
1153 logger().info("[Test] send_peer()");
1154 ceph_assert(!pending_send
);
1158 logger().info("[Test] send_peer() (pending {})", pending_send
);
1159 return seastar::now();
1163 seastar::future
<> keepalive_peer() {
1164 logger().info("[Test] keepalive_peer()");
1165 ceph_assert(tracked_conn
);
1166 return tracked_conn
->send_keepalive();
1169 seastar::future
<> try_send_peer() {
1170 logger().info("[Test] try_send_peer()");
1171 ceph_assert(tracked_conn
);
1172 return send_op(false);
1175 seastar::future
<> markdown() {
1176 logger().info("[Test] markdown() in 100ms ...");
1177 ceph_assert(tracked_conn
);
1178 // sleep to propagate potential remaining acks
1179 return seastar::sleep(100ms
1181 tracked_conn
->mark_down();
1185 seastar::future
<> wait_blocked() {
1186 logger().info("[Test] wait_blocked() ...");
1187 return interceptor
.blocker
.wait_blocked();
1191 logger().info("[Test] unblock()");
1192 return interceptor
.blocker
.unblock();
1195 seastar::future
<> wait_replaced(unsigned count
) {
1196 logger().info("[Test] wait_replaced({}) ...", count
);
1197 return wait_ready(0, count
, false);
1200 seastar::future
<> wait_established() {
1201 logger().info("[Test] wait_established() ...");
1202 return wait_ready(0, 0, true);
1205 seastar::future
<std::reference_wrapper
<ConnResults
>>
1206 wait_results(unsigned count
) {
1207 logger().info("[Test] wait_result({}) ...", count
);
1208 return wait_ready(count
, 0, true).then([this] {
1209 return std::reference_wrapper
<ConnResults
>(interceptor
.results
);
1214 ceph_assert(tracked_conn
);
1215 return !(tracked_conn
->is_connected() || tracked_conn
->is_closed());
1219 class FailoverTest
: public Dispatcher
{
1220 crimson::auth::DummyAuthClientServer dummy_auth
;
1221 MessengerRef cmd_msgr
;
1222 ConnectionRef cmd_conn
;
1223 const entity_addr_t test_addr
;
1224 const entity_addr_t test_peer_addr
;
1226 std::optional
<seastar::promise
<>> recv_pong
;
1227 std::optional
<seastar::promise
<>> recv_cmdreply
;
1229 std::unique_ptr
<FailoverSuite
> test_suite
;
1231 std::optional
<seastar::future
<>> ms_dispatch(ConnectionRef c
, MessageRef m
) override
{
1232 switch (m
->get_type()) {
1234 ceph_assert(recv_pong
);
1235 recv_pong
->set_value();
1236 recv_pong
= std::nullopt
;
1238 case MSG_COMMAND_REPLY
:
1239 ceph_assert(recv_cmdreply
);
1240 recv_cmdreply
->set_value();
1241 recv_cmdreply
= std::nullopt
;
1244 auto m_cmd
= boost::static_pointer_cast
<MCommand
>(m
);
1245 ceph_assert(static_cast<cmd_t
>(m_cmd
->cmd
[0][0]) == cmd_t::suite_recv_op
);
1246 ceph_assert(test_suite
);
1247 test_suite
->notify_peer_reply();
1251 logger().error("{} got unexpected msg from cmd server: {}", *c
, *m
);
1254 return {seastar::now()};
1258 seastar::future
<> prepare_cmd(
1260 std::function
<void(MCommand
&)>
1261 f_prepare
= [] (auto& m
) { return; }) {
1262 assert(!recv_cmdreply
);
1263 recv_cmdreply
= seastar::promise
<>();
1264 auto fut
= recv_cmdreply
->get_future();
1265 auto m
= crimson::make_message
<MCommand
>();
1266 m
->cmd
.emplace_back(1, static_cast<char>(cmd
));
1268 return cmd_conn
->send(std::move(m
)).then([fut
= std::move(fut
)] () mutable {
1269 return std::move(fut
);
1273 seastar::future
<> start_peer(policy_t peer_policy
) {
1274 return prepare_cmd(cmd_t::suite_start
,
1275 [peer_policy
] (auto& m
) {
1276 m
.cmd
.emplace_back(1, static_cast<char>(peer_policy
));
1280 seastar::future
<> stop_peer() {
1281 return prepare_cmd(cmd_t::suite_stop
);
1284 seastar::future
<> pingpong() {
1286 recv_pong
= seastar::promise
<>();
1287 auto fut
= recv_pong
->get_future();
1288 return cmd_conn
->send(crimson::make_message
<MPing
>()
1289 ).then([fut
= std::move(fut
)] () mutable {
1290 return std::move(fut
);
1294 seastar::future
<> init(entity_addr_t cmd_peer_addr
) {
1295 cmd_msgr
->set_default_policy(SocketPolicy::lossy_client(0));
1296 cmd_msgr
->set_auth_client(&dummy_auth
);
1297 cmd_msgr
->set_auth_server(&dummy_auth
);
1298 return cmd_msgr
->start({this}).then([this, cmd_peer_addr
] {
1299 logger().info("CmdCli connect to CmdSrv({}) ...", cmd_peer_addr
);
1300 cmd_conn
= cmd_msgr
->connect(cmd_peer_addr
, entity_name_t::TYPE_OSD
);
1306 FailoverTest(MessengerRef cmd_msgr
,
1307 entity_addr_t test_addr
,
1308 entity_addr_t test_peer_addr
)
1309 : cmd_msgr(cmd_msgr
),
1310 test_addr(test_addr
),
1311 test_peer_addr(test_peer_addr
) { }
1313 seastar::future
<> shutdown() {
1314 logger().info("CmdCli shutdown...");
1315 assert(!recv_cmdreply
);
1316 auto m
= crimson::make_message
<MCommand
>();
1317 m
->cmd
.emplace_back(1, static_cast<char>(cmd_t::shutdown
));
1318 return cmd_conn
->send(std::move(m
)).then([] {
1319 return seastar::sleep(200ms
);
1322 return cmd_msgr
->shutdown();
1326 static seastar::future
<seastar::lw_shared_ptr
<FailoverTest
>>
1327 create(entity_addr_t test_addr
,
1328 entity_addr_t cmd_peer_addr
,
1329 entity_addr_t test_peer_addr
) {
1330 auto test
= seastar::make_lw_shared
<FailoverTest
>(
1331 Messenger::create(entity_name_t::OSD(CMD_CLI_OSD
), "CmdCli", CMD_CLI_NONCE
),
1332 test_addr
, test_peer_addr
);
1333 return test
->init(cmd_peer_addr
).then([test
] {
1334 logger().info("CmdCli ready");
1341 seastar::future
<> run_suite(
1343 const TestInterceptor
& interceptor
,
1344 policy_t test_policy
,
1345 policy_t peer_policy
,
1346 std::function
<seastar::future
<>(FailoverSuite
&)>&& f
) {
1347 logger().info("\n\n[{}]", name
);
1348 ceph_assert(!test_suite
);
1349 SocketPolicy test_policy_
= to_socket_policy(test_policy
);
1350 return FailoverSuite::create(
1351 test_addr
, test_policy_
, test_peer_addr
, interceptor
1352 ).then([this, peer_policy
, f
= std::move(f
)] (auto suite
) mutable {
1353 ceph_assert(suite
->get_addr() == test_addr
);
1354 test_suite
.swap(suite
);
1355 return start_peer(peer_policy
).then([this, f
= std::move(f
)] {
1356 return f(*test_suite
);
1358 test_suite
->post_check();
1359 logger().info("\n[SUCCESS]");
1360 }).handle_exception([this] (auto eptr
) {
1361 logger().info("\n[FAIL: {}]", eptr
);
1362 test_suite
->dump_results();
1367 return test_suite
->shutdown().then([this] {
1374 seastar::future
<> peer_connect_me() {
1375 logger().info("[Test] peer_connect_me({})", test_addr
);
1376 return prepare_cmd(cmd_t::suite_connect_me
,
1378 m
.cmd
.emplace_back(fmt::format("{}", test_addr
));
1382 seastar::future
<> peer_send_me() {
1383 logger().info("[Test] peer_send_me()");
1384 ceph_assert(test_suite
);
1385 test_suite
->needs_receive();
1386 return prepare_cmd(cmd_t::suite_send_me
);
1389 seastar::future
<> try_peer_send_me() {
1390 logger().info("[Test] try_peer_send_me()");
1391 ceph_assert(test_suite
);
1392 return prepare_cmd(cmd_t::suite_send_me
);
1395 seastar::future
<> send_bidirectional() {
1396 ceph_assert(test_suite
);
1397 return test_suite
->send_peer().then([this] {
1398 return peer_send_me();
1402 seastar::future
<> peer_keepalive_me() {
1403 logger().info("[Test] peer_keepalive_me()");
1404 ceph_assert(test_suite
);
1405 return prepare_cmd(cmd_t::suite_keepalive_me
);
1408 seastar::future
<> markdown_peer() {
1409 logger().info("[Test] markdown_peer() in 150ms ...");
1410 // sleep to propagate potential remaining acks
1411 return seastar::sleep(50ms
1413 return prepare_cmd(cmd_t::suite_markdown
);
1415 // sleep awhile for peer markdown propagated
1416 return seastar::sleep(100ms
);
1421 class FailoverSuitePeer
: public Dispatcher
{
1422 using cb_t
= std::function
<seastar::future
<>()>;
1423 crimson::auth::DummyAuthClientServer dummy_auth
;
1424 MessengerRef peer_msgr
;
1427 ConnectionRef tracked_conn
;
1428 unsigned pending_send
= 0;
1430 std::optional
<seastar::future
<>> ms_dispatch(ConnectionRef c
, MessageRef m
) override
{
1431 logger().info("[TestPeer] got op from Test");
1432 ceph_assert(m
->get_type() == CEPH_MSG_OSD_OP
);
1433 ceph_assert(tracked_conn
== c
);
1434 std::ignore
= op_callback();
1435 return {seastar::now()};
1438 void ms_handle_accept(ConnectionRef conn
) override
{
1439 logger().info("[TestPeer] got accept from Test");
1440 ceph_assert(!tracked_conn
||
1441 tracked_conn
->is_closed() ||
1442 tracked_conn
== conn
);
1443 tracked_conn
= conn
;
1444 std::ignore
= flush_pending_send();
1447 void ms_handle_reset(ConnectionRef conn
, bool is_replace
) override
{
1448 logger().info("[TestPeer] got reset from Test");
1449 ceph_assert(tracked_conn
== conn
);
1450 tracked_conn
= nullptr;
1454 seastar::future
<> init(entity_addr_t test_peer_addr
, SocketPolicy policy
) {
1455 peer_msgr
->set_default_policy(policy
);
1456 peer_msgr
->set_auth_client(&dummy_auth
);
1457 peer_msgr
->set_auth_server(&dummy_auth
);
1458 return peer_msgr
->bind(entity_addrvec_t
{test_peer_addr
}).safe_then([this] {
1459 return peer_msgr
->start({this});
1460 }, Messenger::bind_ertr::all_same_way([test_peer_addr
] (const std::error_code
& e
) {
1461 logger().error("FailoverSuitePeer: "
1462 "there is another instance running at {}", test_peer_addr
);
1467 seastar::future
<> send_op() {
1468 ceph_assert(tracked_conn
);
1470 object_locator_t oloc
;
1471 hobject_t
hobj(object_t(), oloc
.key
, CEPH_NOSNAP
, pgid
.ps(),
1472 pgid
.pool(), oloc
.nspace
);
1474 return tracked_conn
->send(crimson::make_message
<MOSDOp
>(0, 0, hobj
, spgid
, 0, 0, 0));
1477 seastar::future
<> flush_pending_send() {
1478 if (pending_send
!= 0) {
1479 logger().info("[TestPeer] flush sending {} ops", pending_send
);
1481 ceph_assert(tracked_conn
);
1482 return seastar::do_until(
1483 [this] { return pending_send
== 0; },
1491 FailoverSuitePeer(MessengerRef peer_msgr
, cb_t op_callback
)
1492 : peer_msgr(peer_msgr
), op_callback(op_callback
) { }
1494 seastar::future
<> shutdown() {
1496 return peer_msgr
->shutdown();
1499 seastar::future
<> connect_peer(entity_addr_t test_addr_decoded
) {
1500 logger().info("[TestPeer] connect_peer({})", test_addr_decoded
);
1501 auto new_tracked_conn
= peer_msgr
->connect(test_addr_decoded
, entity_name_t::TYPE_OSD
);
1503 if (tracked_conn
->is_closed()) {
1504 ceph_assert(tracked_conn
!= new_tracked_conn
);
1505 logger().info("[TestPeer] this is a new session"
1506 " replacing an closed one");
1508 ceph_assert(tracked_conn
== new_tracked_conn
);
1509 logger().info("[TestPeer] this is not a new session");
1512 logger().info("[TestPeer] this is a new session");
1514 tracked_conn
= new_tracked_conn
;
1515 return flush_pending_send();
1518 seastar::future
<> send_peer() {
1520 logger().info("[TestPeer] send_peer()");
1524 logger().info("[TestPeer] send_peer() (pending {})", pending_send
);
1525 return seastar::now();
1529 seastar::future
<> keepalive_peer() {
1530 logger().info("[TestPeer] keepalive_peer()");
1531 ceph_assert(tracked_conn
);
1532 return tracked_conn
->send_keepalive();
1535 seastar::future
<> markdown() {
1536 logger().info("[TestPeer] markdown()");
1537 ceph_assert(tracked_conn
);
1538 tracked_conn
->mark_down();
1539 return seastar::now();
1542 static seastar::future
<std::unique_ptr
<FailoverSuitePeer
>>
1543 create(entity_addr_t test_peer_addr
, const SocketPolicy
& policy
, cb_t op_callback
) {
1544 auto suite
= std::make_unique
<FailoverSuitePeer
>(
1546 entity_name_t::OSD(TEST_PEER_OSD
),
1551 return suite
->init(test_peer_addr
, policy
1552 ).then([suite
= std::move(suite
)] () mutable {
1553 return std::move(suite
);
1558 class FailoverTestPeer
: public Dispatcher
{
1559 crimson::auth::DummyAuthClientServer dummy_auth
;
1560 MessengerRef cmd_msgr
;
1561 ConnectionRef cmd_conn
;
1562 const entity_addr_t test_peer_addr
;
1563 std::unique_ptr
<FailoverSuitePeer
> test_suite
;
1565 std::optional
<seastar::future
<>> ms_dispatch(ConnectionRef c
, MessageRef m
) override
{
1566 ceph_assert(cmd_conn
== c
);
1567 switch (m
->get_type()) {
1569 std::ignore
= c
->send(crimson::make_message
<MPing
>());
1572 auto m_cmd
= boost::static_pointer_cast
<MCommand
>(m
);
1573 auto cmd
= static_cast<cmd_t
>(m_cmd
->cmd
[0][0]);
1574 if (cmd
== cmd_t::shutdown
) {
1575 logger().info("CmdSrv shutdown...");
1576 // forwarded to FailoverTestPeer::wait()
1578 std::ignore
= cmd_msgr
->shutdown();
1580 std::ignore
= handle_cmd(cmd
, m_cmd
).then([c
] {
1581 return c
->send(crimson::make_message
<MCommandReply
>());
1587 logger().error("{} got unexpected msg from cmd client: {}", *c
, *m
);
1590 return {seastar::now()};
1593 void ms_handle_accept(ConnectionRef conn
) override
{
1598 seastar::future
<> notify_recv_op() {
1599 ceph_assert(cmd_conn
);
1600 auto m
= crimson::make_message
<MCommand
>();
1601 m
->cmd
.emplace_back(1, static_cast<char>(cmd_t::suite_recv_op
));
1602 return cmd_conn
->send(std::move(m
));
1605 seastar::future
<> handle_cmd(cmd_t cmd
, MRef
<MCommand
> m_cmd
) {
1607 case cmd_t::suite_start
: {
1608 ceph_assert(!test_suite
);
1609 auto policy
= to_socket_policy(static_cast<policy_t
>(m_cmd
->cmd
[1][0]));
1610 return FailoverSuitePeer::create(
1611 test_peer_addr
, policy
, [this] { return notify_recv_op(); }
1612 ).then([this] (auto suite
) {
1613 test_suite
.swap(suite
);
1616 case cmd_t::suite_stop
:
1617 ceph_assert(test_suite
);
1618 return test_suite
->shutdown().then([this] {
1621 case cmd_t::suite_connect_me
: {
1622 ceph_assert(test_suite
);
1623 entity_addr_t test_addr_decoded
= entity_addr_t();
1624 test_addr_decoded
.parse(m_cmd
->cmd
[1].c_str(), nullptr);
1625 return test_suite
->connect_peer(test_addr_decoded
);
1627 case cmd_t::suite_send_me
:
1628 ceph_assert(test_suite
);
1629 return test_suite
->send_peer();
1630 case cmd_t::suite_keepalive_me
:
1631 ceph_assert(test_suite
);
1632 return test_suite
->keepalive_peer();
1633 case cmd_t::suite_markdown
:
1634 ceph_assert(test_suite
);
1635 return test_suite
->markdown();
1637 logger().error("TestPeer got unexpected command {} from Test",
1638 fmt::ptr(m_cmd
.get()));
1640 return seastar::now();
1644 seastar::future
<> init(entity_addr_t cmd_peer_addr
) {
1645 cmd_msgr
->set_default_policy(SocketPolicy::stateless_server(0));
1646 cmd_msgr
->set_auth_client(&dummy_auth
);
1647 cmd_msgr
->set_auth_server(&dummy_auth
);
1648 return cmd_msgr
->bind(entity_addrvec_t
{cmd_peer_addr
}).safe_then([this] {
1649 return cmd_msgr
->start({this});
1650 }, Messenger::bind_ertr::all_same_way([cmd_peer_addr
] (const std::error_code
& e
) {
1651 logger().error("FailoverTestPeer: "
1652 "there is another instance running at {}", cmd_peer_addr
);
1658 FailoverTestPeer(MessengerRef cmd_msgr
,
1659 entity_addr_t test_peer_addr
)
1660 : cmd_msgr(cmd_msgr
),
1661 test_peer_addr(test_peer_addr
) { }
1663 seastar::future
<> wait() {
1664 return cmd_msgr
->wait();
1667 static seastar::future
<std::unique_ptr
<FailoverTestPeer
>>
1668 create(entity_addr_t cmd_peer_addr
, entity_addr_t test_peer_addr
) {
1669 auto test_peer
= std::make_unique
<FailoverTestPeer
>(
1670 Messenger::create(entity_name_t::OSD(CMD_SRV_OSD
), "CmdSrv", CMD_SRV_NONCE
),
1672 return test_peer
->init(cmd_peer_addr
1673 ).then([test_peer
= std::move(test_peer
)] () mutable {
1674 logger().info("CmdSrv ready");
1675 return std::move(test_peer
);
1681 test_v2_lossy_early_connect_fault(FailoverTest
& test
) {
1682 return seastar::do_with(std::vector
<Breakpoint
>{
1683 {custom_bp_t::SOCKET_CONNECTING
},
1684 {custom_bp_t::BANNER_WRITE
},
1685 {custom_bp_t::BANNER_READ
},
1686 {custom_bp_t::BANNER_PAYLOAD_READ
},
1687 {Tag::HELLO
, bp_type_t::WRITE
},
1688 {Tag::HELLO
, bp_type_t::READ
},
1689 {Tag::AUTH_REQUEST
, bp_type_t::WRITE
},
1690 {Tag::AUTH_DONE
, bp_type_t::READ
},
1691 {Tag::AUTH_SIGNATURE
, bp_type_t::WRITE
},
1692 {Tag::AUTH_SIGNATURE
, bp_type_t::READ
},
1693 }, [&test
] (auto& failure_cases
) {
1694 return seastar::do_for_each(failure_cases
, [&test
] (auto bp
) {
1695 TestInterceptor interceptor
;
1696 interceptor
.make_fault(bp
);
1697 return test
.run_suite(
1698 fmt::format("test_v2_lossy_early_connect_fault -- {}", bp
),
1700 policy_t::lossy_client
,
1701 policy_t::stateless_server
,
1702 [] (FailoverSuite
& suite
) {
1703 return seastar::futurize_invoke([&suite
] {
1704 return suite
.send_peer();
1706 return suite
.connect_peer();
1708 return suite
.wait_results(1);
1709 }).then([] (ConnResults
& results
) {
1710 results
[0].assert_state_at(conn_state_t::established
);
1711 results
[0].assert_connect(2, 1, 0, 1);
1712 results
[0].assert_accept(0, 0, 0, 0);
1713 results
[0].assert_reset(0, 0);
1721 test_v2_lossy_connect_fault(FailoverTest
& test
) {
1722 return seastar::do_with(std::vector
<Breakpoint
>{
1723 {Tag::CLIENT_IDENT
, bp_type_t::WRITE
},
1724 {Tag::SERVER_IDENT
, bp_type_t::READ
},
1725 }, [&test
] (auto& failure_cases
) {
1726 return seastar::do_for_each(failure_cases
, [&test
] (auto bp
) {
1727 TestInterceptor interceptor
;
1728 interceptor
.make_fault(bp
);
1729 return test
.run_suite(
1730 fmt::format("test_v2_lossy_connect_fault -- {}", bp
),
1732 policy_t::lossy_client
,
1733 policy_t::stateless_server
,
1734 [] (FailoverSuite
& suite
) {
1735 return seastar::futurize_invoke([&suite
] {
1736 return suite
.send_peer();
1738 return suite
.connect_peer();
1740 return suite
.wait_results(1);
1741 }).then([] (ConnResults
& results
) {
1742 results
[0].assert_state_at(conn_state_t::established
);
1743 results
[0].assert_connect(2, 2, 0, 1);
1744 results
[0].assert_accept(0, 0, 0, 0);
1745 results
[0].assert_reset(0, 0);
1753 test_v2_lossy_connected_fault(FailoverTest
& test
) {
1754 return seastar::do_with(std::vector
<Breakpoint
>{
1755 {Tag::MESSAGE
, bp_type_t::WRITE
},
1756 {Tag::MESSAGE
, bp_type_t::READ
},
1757 }, [&test
] (auto& failure_cases
) {
1758 return seastar::do_for_each(failure_cases
, [&test
] (auto bp
) {
1759 TestInterceptor interceptor
;
1760 interceptor
.make_fault(bp
);
1761 return test
.run_suite(
1762 fmt::format("test_v2_lossy_connected_fault -- {}", bp
),
1764 policy_t::lossy_client
,
1765 policy_t::stateless_server
,
1766 [&test
] (FailoverSuite
& suite
) {
1767 return seastar::futurize_invoke([&test
] {
1768 return test
.send_bidirectional();
1770 return suite
.connect_peer();
1772 return suite
.wait_results(1);
1773 }).then([] (ConnResults
& results
) {
1774 results
[0].assert_state_at(conn_state_t::closed
);
1775 results
[0].assert_connect(1, 1, 0, 1);
1776 results
[0].assert_accept(0, 0, 0, 0);
1777 results
[0].assert_reset(1, 0);
1785 test_v2_lossy_early_accept_fault(FailoverTest
& test
) {
1786 return seastar::do_with(std::vector
<Breakpoint
>{
1787 {custom_bp_t::BANNER_WRITE
},
1788 {custom_bp_t::BANNER_READ
},
1789 {custom_bp_t::BANNER_PAYLOAD_READ
},
1790 {Tag::HELLO
, bp_type_t::WRITE
},
1791 {Tag::HELLO
, bp_type_t::READ
},
1792 {Tag::AUTH_REQUEST
, bp_type_t::READ
},
1793 {Tag::AUTH_DONE
, bp_type_t::WRITE
},
1794 {Tag::AUTH_SIGNATURE
, bp_type_t::WRITE
},
1795 {Tag::AUTH_SIGNATURE
, bp_type_t::READ
},
1796 }, [&test
] (auto& failure_cases
) {
1797 return seastar::do_for_each(failure_cases
, [&test
] (auto bp
) {
1798 TestInterceptor interceptor
;
1799 interceptor
.make_fault(bp
);
1800 return test
.run_suite(
1801 fmt::format("test_v2_lossy_early_accept_fault -- {}", bp
),
1803 policy_t::stateless_server
,
1804 policy_t::lossy_client
,
1805 [&test
] (FailoverSuite
& suite
) {
1806 return seastar::futurize_invoke([&test
] {
1807 return test
.peer_send_me();
1809 return test
.peer_connect_me();
1811 return suite
.wait_results(2);
1812 }).then([] (ConnResults
& results
) {
1813 results
[0].assert_state_at(conn_state_t::closed
);
1814 results
[0].assert_connect(0, 0, 0, 0);
1815 results
[0].assert_accept(1, 0, 0, 0);
1816 results
[0].assert_reset(0, 0);
1817 results
[1].assert_state_at(conn_state_t::established
);
1818 results
[1].assert_connect(0, 0, 0, 0);
1819 results
[1].assert_accept(1, 1, 0, 1);
1820 results
[1].assert_reset(0, 0);
1828 test_v2_lossy_accept_fault(FailoverTest
& test
) {
1829 auto bp
= Breakpoint
{Tag::CLIENT_IDENT
, bp_type_t::READ
};
1830 TestInterceptor interceptor
;
1831 interceptor
.make_fault(bp
);
1832 return test
.run_suite(
1833 fmt::format("test_v2_lossy_accept_fault -- {}", bp
),
1835 policy_t::stateless_server
,
1836 policy_t::lossy_client
,
1837 [&test
] (FailoverSuite
& suite
) {
1838 return seastar::futurize_invoke([&test
] {
1839 return test
.peer_send_me();
1841 return test
.peer_connect_me();
1843 return suite
.wait_results(2);
1844 }).then([] (ConnResults
& results
) {
1845 results
[0].assert_state_at(conn_state_t::closed
);
1846 results
[0].assert_connect(0, 0, 0, 0);
1847 results
[0].assert_accept(1, 1, 0, 0);
1848 results
[0].assert_reset(0, 0);
1849 results
[1].assert_state_at(conn_state_t::established
);
1850 results
[1].assert_connect(0, 0, 0, 0);
1851 results
[1].assert_accept(1, 1, 0, 1);
1852 results
[1].assert_reset(0, 0);
1858 test_v2_lossy_establishing_fault(FailoverTest
& test
) {
1859 auto bp
= Breakpoint
{Tag::SERVER_IDENT
, bp_type_t::WRITE
};
1860 TestInterceptor interceptor
;
1861 interceptor
.make_fault(bp
);
1862 return test
.run_suite(
1863 fmt::format("test_v2_lossy_establishing_fault -- {}", bp
),
1865 policy_t::stateless_server
,
1866 policy_t::lossy_client
,
1867 [&test
] (FailoverSuite
& suite
) {
1868 return seastar::futurize_invoke([&test
] {
1869 return test
.peer_send_me();
1871 return test
.peer_connect_me();
1873 return suite
.wait_results(2);
1874 }).then([] (ConnResults
& results
) {
1875 results
[0].assert_state_at(conn_state_t::closed
);
1876 results
[0].assert_connect(0, 0, 0, 0);
1877 results
[0].assert_accept(1, 1, 0, 1);
1878 results
[0].assert_reset(1, 0);
1879 results
[1].assert_state_at(conn_state_t::established
);
1880 results
[1].assert_connect(0, 0, 0, 0);
1881 results
[1].assert_accept(1, 1, 0, 1);
1882 results
[1].assert_reset(0, 0);
1888 test_v2_lossy_accepted_fault(FailoverTest
& test
) {
1889 return seastar::do_with(std::vector
<Breakpoint
>{
1890 {Tag::MESSAGE
, bp_type_t::WRITE
},
1891 {Tag::MESSAGE
, bp_type_t::READ
},
1892 }, [&test
] (auto& failure_cases
) {
1893 return seastar::do_for_each(failure_cases
, [&test
] (auto bp
) {
1894 TestInterceptor interceptor
;
1895 interceptor
.make_fault(bp
);
1896 return test
.run_suite(
1897 fmt::format("test_v2_lossy_accepted_fault -- {}", bp
),
1899 policy_t::stateless_server
,
1900 policy_t::lossy_client
,
1901 [&test
] (FailoverSuite
& suite
) {
1902 return seastar::futurize_invoke([&test
] {
1903 return test
.send_bidirectional();
1905 return test
.peer_connect_me();
1907 return suite
.wait_results(1);
1908 }).then([] (ConnResults
& results
) {
1909 results
[0].assert_state_at(conn_state_t::closed
);
1910 results
[0].assert_connect(0, 0, 0, 0);
1911 results
[0].assert_accept(1, 1, 0, 1);
1912 results
[0].assert_reset(1, 0);
1920 test_v2_lossless_connect_fault(FailoverTest
& test
) {
1921 return seastar::do_with(std::vector
<Breakpoint
>{
1922 {Tag::CLIENT_IDENT
, bp_type_t::WRITE
},
1923 {Tag::SERVER_IDENT
, bp_type_t::READ
},
1924 }, [&test
] (auto& failure_cases
) {
1925 return seastar::do_for_each(failure_cases
, [&test
] (auto bp
) {
1926 TestInterceptor interceptor
;
1927 interceptor
.make_fault(bp
);
1928 return test
.run_suite(
1929 fmt::format("test_v2_lossless_connect_fault -- {}", bp
),
1931 policy_t::lossless_client
,
1932 policy_t::stateful_server
,
1933 [&test
] (FailoverSuite
& suite
) {
1934 return seastar::futurize_invoke([&test
] {
1935 return test
.send_bidirectional();
1937 return suite
.connect_peer();
1939 return suite
.wait_results(1);
1940 }).then([] (ConnResults
& results
) {
1941 results
[0].assert_state_at(conn_state_t::established
);
1942 results
[0].assert_connect(2, 2, 0, 1);
1943 results
[0].assert_accept(0, 0, 0, 0);
1944 results
[0].assert_reset(0, 0);
1952 test_v2_lossless_connected_fault(FailoverTest
& test
) {
1953 return seastar::do_with(std::vector
<Breakpoint
>{
1954 {Tag::MESSAGE
, bp_type_t::WRITE
},
1955 {Tag::MESSAGE
, bp_type_t::READ
},
1956 }, [&test
] (auto& failure_cases
) {
1957 return seastar::do_for_each(failure_cases
, [&test
] (auto bp
) {
1958 TestInterceptor interceptor
;
1959 interceptor
.make_fault(bp
);
1960 return test
.run_suite(
1961 fmt::format("test_v2_lossless_connected_fault -- {}", bp
),
1963 policy_t::lossless_client
,
1964 policy_t::stateful_server
,
1965 [&test
] (FailoverSuite
& suite
) {
1966 return seastar::futurize_invoke([&test
] {
1967 return test
.send_bidirectional();
1969 return suite
.connect_peer();
1971 return suite
.wait_results(1);
1972 }).then([] (ConnResults
& results
) {
1973 results
[0].assert_state_at(conn_state_t::established
);
1974 results
[0].assert_connect(2, 1, 1, 2);
1975 results
[0].assert_accept(0, 0, 0, 0);
1976 results
[0].assert_reset(0, 0);
1984 test_v2_lossless_connected_fault2(FailoverTest
& test
) {
1985 return seastar::do_with(std::vector
<Breakpoint
>{
1986 {Tag::ACK
, bp_type_t::READ
},
1987 {Tag::ACK
, bp_type_t::WRITE
},
1988 {Tag::KEEPALIVE2
, bp_type_t::READ
},
1989 {Tag::KEEPALIVE2
, bp_type_t::WRITE
},
1990 {Tag::KEEPALIVE2_ACK
, bp_type_t::READ
},
1991 {Tag::KEEPALIVE2_ACK
, bp_type_t::WRITE
},
1992 }, [&test
] (auto& failure_cases
) {
1993 return seastar::do_for_each(failure_cases
, [&test
] (auto bp
) {
1994 TestInterceptor interceptor
;
1995 interceptor
.make_fault(bp
);
1996 return test
.run_suite(
1997 fmt::format("test_v2_lossless_connected_fault2 -- {}", bp
),
1999 policy_t::lossless_client
,
2000 policy_t::stateful_server
,
2001 [&test
] (FailoverSuite
& suite
) {
2002 return seastar::futurize_invoke([&suite
] {
2003 return suite
.connect_peer();
2005 return suite
.wait_established();
2007 return suite
.send_peer();
2009 return suite
.keepalive_peer();
2011 return suite
.wait_established();
2013 return test
.peer_send_me();
2015 return test
.peer_keepalive_me();
2017 return suite
.wait_established();
2019 return suite
.send_peer();
2021 return suite
.wait_established();
2023 return test
.peer_send_me();
2025 return suite
.wait_established();
2027 return suite
.wait_results(1);
2028 }).then([] (ConnResults
& results
) {
2029 results
[0].assert_state_at(conn_state_t::established
);
2030 results
[0].assert_connect(2, 1, 1, 2);
2031 results
[0].assert_accept(0, 0, 0, 0);
2032 results
[0].assert_reset(0, 0);
2040 test_v2_lossless_reconnect_fault(FailoverTest
& test
) {
2041 return seastar::do_with(std::vector
<std::pair
<Breakpoint
, Breakpoint
>>{
2042 {{Tag::MESSAGE
, bp_type_t::WRITE
},
2043 {Tag::SESSION_RECONNECT
, bp_type_t::WRITE
}},
2044 {{Tag::MESSAGE
, bp_type_t::WRITE
},
2045 {Tag::SESSION_RECONNECT_OK
, bp_type_t::READ
}},
2046 }, [&test
] (auto& failure_cases
) {
2047 return seastar::do_for_each(failure_cases
, [&test
] (auto bp_pair
) {
2048 TestInterceptor interceptor
;
2049 interceptor
.make_fault(bp_pair
.first
);
2050 interceptor
.make_fault(bp_pair
.second
);
2051 return test
.run_suite(
2052 fmt::format("test_v2_lossless_reconnect_fault -- {}, {}",
2053 bp_pair
.first
, bp_pair
.second
),
2055 policy_t::lossless_client
,
2056 policy_t::stateful_server
,
2057 [&test
] (FailoverSuite
& suite
) {
2058 return seastar::futurize_invoke([&test
] {
2059 return test
.send_bidirectional();
2061 return suite
.connect_peer();
2063 return suite
.wait_results(1);
2064 }).then([] (ConnResults
& results
) {
2065 results
[0].assert_state_at(conn_state_t::established
);
2066 results
[0].assert_connect(3, 1, 2, 2);
2067 results
[0].assert_accept(0, 0, 0, 0);
2068 results
[0].assert_reset(0, 0);
2076 test_v2_lossless_accept_fault(FailoverTest
& test
) {
2077 auto bp
= Breakpoint
{Tag::CLIENT_IDENT
, bp_type_t::READ
};
2078 TestInterceptor interceptor
;
2079 interceptor
.make_fault(bp
);
2080 return test
.run_suite(
2081 fmt::format("test_v2_lossless_accept_fault -- {}", bp
),
2083 policy_t::stateful_server
,
2084 policy_t::lossless_client
,
2085 [&test
] (FailoverSuite
& suite
) {
2086 return seastar::futurize_invoke([&test
] {
2087 return test
.send_bidirectional();
2089 return test
.peer_connect_me();
2091 return suite
.wait_results(2);
2092 }).then([] (ConnResults
& results
) {
2093 results
[0].assert_state_at(conn_state_t::closed
);
2094 results
[0].assert_connect(0, 0, 0, 0);
2095 results
[0].assert_accept(1, 1, 0, 0);
2096 results
[0].assert_reset(0, 0);
2097 results
[1].assert_state_at(conn_state_t::established
);
2098 results
[1].assert_connect(0, 0, 0, 0);
2099 results
[1].assert_accept(1, 1, 0, 1);
2100 results
[1].assert_reset(0, 0);
2106 test_v2_lossless_establishing_fault(FailoverTest
& test
) {
2107 auto bp
= Breakpoint
{Tag::SERVER_IDENT
, bp_type_t::WRITE
};
2108 TestInterceptor interceptor
;
2109 interceptor
.make_fault(bp
);
2110 return test
.run_suite(
2111 fmt::format("test_v2_lossless_establishing_fault -- {}", bp
),
2113 policy_t::stateful_server
,
2114 policy_t::lossless_client
,
2115 [&test
] (FailoverSuite
& suite
) {
2116 return seastar::futurize_invoke([&test
] {
2117 return test
.send_bidirectional();
2119 return test
.peer_connect_me();
2121 return suite
.wait_results(2);
2122 }).then([] (ConnResults
& results
) {
2123 results
[0].assert_state_at(conn_state_t::established
);
2124 results
[0].assert_connect(0, 0, 0, 0);
2125 results
[0].assert_accept(1, 1, 0, 2);
2126 results
[0].assert_reset(0, 0);
2127 results
[1].assert_state_at(conn_state_t::replaced
);
2128 results
[1].assert_connect(0, 0, 0, 0);
2129 results
[1].assert_accept(1, 1, 0, 0);
2130 results
[1].assert_reset(0, 0);
2136 test_v2_lossless_accepted_fault(FailoverTest
& test
) {
2137 return seastar::do_with(std::vector
<Breakpoint
>{
2138 {Tag::MESSAGE
, bp_type_t::WRITE
},
2139 {Tag::MESSAGE
, bp_type_t::READ
},
2140 }, [&test
] (auto& failure_cases
) {
2141 return seastar::do_for_each(failure_cases
, [&test
] (auto bp
) {
2142 TestInterceptor interceptor
;
2143 interceptor
.make_fault(bp
);
2144 return test
.run_suite(
2145 fmt::format("test_v2_lossless_accepted_fault -- {}", bp
),
2147 policy_t::stateful_server
,
2148 policy_t::lossless_client
,
2149 [&test
] (FailoverSuite
& suite
) {
2150 return seastar::futurize_invoke([&test
] {
2151 return test
.send_bidirectional();
2153 return test
.peer_connect_me();
2155 return suite
.wait_results(2);
2156 }).then([] (ConnResults
& results
) {
2157 results
[0].assert_state_at(conn_state_t::established
);
2158 results
[0].assert_connect(0, 0, 0, 0);
2159 results
[0].assert_accept(1, 1, 0, 2);
2160 results
[0].assert_reset(0, 0);
2161 results
[1].assert_state_at(conn_state_t::replaced
);
2162 results
[1].assert_connect(0, 0, 0, 0);
2163 results
[1].assert_accept(1, 0);
2164 results
[1].assert_reset(0, 0);
2172 test_v2_lossless_reaccept_fault(FailoverTest
& test
) {
2173 return seastar::do_with(std::vector
<std::pair
<Breakpoint
, Breakpoint
>>{
2174 {{Tag::MESSAGE
, bp_type_t::READ
},
2175 {Tag::SESSION_RECONNECT
, bp_type_t::READ
}},
2176 {{Tag::MESSAGE
, bp_type_t::READ
},
2177 {Tag::SESSION_RECONNECT_OK
, bp_type_t::WRITE
}},
2178 }, [&test
] (auto& failure_cases
) {
2179 return seastar::do_for_each(failure_cases
, [&test
] (auto bp_pair
) {
2180 TestInterceptor interceptor
;
2181 interceptor
.make_fault(bp_pair
.first
);
2182 interceptor
.make_fault(bp_pair
.second
);
2183 return test
.run_suite(
2184 fmt::format("test_v2_lossless_reaccept_fault -- {}, {}",
2185 bp_pair
.first
, bp_pair
.second
),
2187 policy_t::stateful_server
,
2188 policy_t::lossless_client
,
2189 [&test
, bp
= bp_pair
.second
] (FailoverSuite
& suite
) {
2190 return seastar::futurize_invoke([&test
] {
2191 return test
.send_bidirectional();
2193 return test
.peer_connect_me();
2195 return suite
.wait_results(3);
2196 }).then([bp
] (ConnResults
& results
) {
2197 results
[0].assert_state_at(conn_state_t::established
);
2198 results
[0].assert_connect(0, 0, 0, 0);
2199 if (bp
== Breakpoint
{Tag::SESSION_RECONNECT
, bp_type_t::READ
}) {
2200 results
[0].assert_accept(1, 1, 0, 2);
2202 results
[0].assert_accept(1, 1, 0, 3);
2204 results
[0].assert_reset(0, 0);
2205 if (bp
== Breakpoint
{Tag::SESSION_RECONNECT
, bp_type_t::READ
}) {
2206 results
[1].assert_state_at(conn_state_t::closed
);
2208 results
[1].assert_state_at(conn_state_t::replaced
);
2210 results
[1].assert_connect(0, 0, 0, 0);
2211 results
[1].assert_accept(1, 0, 1, 0);
2212 results
[1].assert_reset(0, 0);
2213 results
[2].assert_state_at(conn_state_t::replaced
);
2214 results
[2].assert_connect(0, 0, 0, 0);
2215 results
[2].assert_accept(1, 0, 1, 0);
2216 results
[2].assert_reset(0, 0);
2224 test_v2_peer_connect_fault(FailoverTest
& test
) {
2225 return seastar::do_with(std::vector
<Breakpoint
>{
2226 {Tag::CLIENT_IDENT
, bp_type_t::WRITE
},
2227 {Tag::SERVER_IDENT
, bp_type_t::READ
},
2228 }, [&test
] (auto& failure_cases
) {
2229 return seastar::do_for_each(failure_cases
, [&test
] (auto bp
) {
2230 TestInterceptor interceptor
;
2231 interceptor
.make_fault(bp
);
2232 return test
.run_suite(
2233 fmt::format("test_v2_peer_connect_fault -- {}", bp
),
2235 policy_t::lossless_peer
,
2236 policy_t::lossless_peer
,
2237 [] (FailoverSuite
& suite
) {
2238 return seastar::futurize_invoke([&suite
] {
2239 return suite
.send_peer();
2241 return suite
.connect_peer();
2243 return suite
.wait_results(1);
2244 }).then([] (ConnResults
& results
) {
2245 results
[0].assert_state_at(conn_state_t::established
);
2246 results
[0].assert_connect(2, 2, 0, 1);
2247 results
[0].assert_accept(0, 0, 0, 0);
2248 results
[0].assert_reset(0, 0);
2256 test_v2_peer_accept_fault(FailoverTest
& test
) {
2257 auto bp
= Breakpoint
{Tag::CLIENT_IDENT
, bp_type_t::READ
};
2258 TestInterceptor interceptor
;
2259 interceptor
.make_fault(bp
);
2260 return test
.run_suite(
2261 fmt::format("test_v2_peer_accept_fault -- {}", bp
),
2263 policy_t::lossless_peer
,
2264 policy_t::lossless_peer
,
2265 [&test
] (FailoverSuite
& suite
) {
2266 return seastar::futurize_invoke([&test
] {
2267 return test
.peer_send_me();
2269 return test
.peer_connect_me();
2271 return suite
.wait_results(2);
2272 }).then([] (ConnResults
& results
) {
2273 results
[0].assert_state_at(conn_state_t::closed
);
2274 results
[0].assert_connect(0, 0, 0, 0);
2275 results
[0].assert_accept(1, 1, 0, 0);
2276 results
[0].assert_reset(0, 0);
2277 results
[1].assert_state_at(conn_state_t::established
);
2278 results
[1].assert_connect(0, 0, 0, 0);
2279 results
[1].assert_accept(1, 1, 0, 1);
2280 results
[1].assert_reset(0, 0);
2286 test_v2_peer_establishing_fault(FailoverTest
& test
) {
2287 auto bp
= Breakpoint
{Tag::SERVER_IDENT
, bp_type_t::WRITE
};
2288 TestInterceptor interceptor
;
2289 interceptor
.make_fault(bp
);
2290 return test
.run_suite(
2291 fmt::format("test_v2_peer_establishing_fault -- {}", bp
),
2293 policy_t::lossless_peer
,
2294 policy_t::lossless_peer
,
2295 [&test
] (FailoverSuite
& suite
) {
2296 return seastar::futurize_invoke([&test
] {
2297 return test
.peer_send_me();
2299 return test
.peer_connect_me();
2301 return suite
.wait_results(2);
2302 }).then([] (ConnResults
& results
) {
2303 results
[0].assert_state_at(conn_state_t::established
);
2304 results
[0].assert_connect(0, 0, 0, 0);
2305 results
[0].assert_accept(1, 1, 0, 2);
2306 results
[0].assert_reset(0, 0);
2307 results
[1].assert_state_at(conn_state_t::replaced
);
2308 results
[1].assert_connect(0, 0, 0, 0);
2309 results
[1].assert_accept(1, 1, 0, 0);
2310 results
[1].assert_reset(0, 0);
2316 test_v2_peer_connected_fault_reconnect(FailoverTest
& test
) {
2317 auto bp
= Breakpoint
{Tag::MESSAGE
, bp_type_t::WRITE
};
2318 TestInterceptor interceptor
;
2319 interceptor
.make_fault(bp
);
2320 return test
.run_suite(
2321 fmt::format("test_v2_peer_connected_fault_reconnect -- {}", bp
),
2323 policy_t::lossless_peer
,
2324 policy_t::lossless_peer
,
2325 [] (FailoverSuite
& suite
) {
2326 return seastar::futurize_invoke([&suite
] {
2327 return suite
.send_peer();
2329 return suite
.connect_peer();
2331 return suite
.wait_results(1);
2332 }).then([] (ConnResults
& results
) {
2333 results
[0].assert_state_at(conn_state_t::established
);
2334 results
[0].assert_connect(2, 1, 1, 2);
2335 results
[0].assert_accept(0, 0, 0, 0);
2336 results
[0].assert_reset(0, 0);
2342 test_v2_peer_connected_fault_reaccept(FailoverTest
& test
) {
2343 auto bp
= Breakpoint
{Tag::MESSAGE
, bp_type_t::READ
};
2344 TestInterceptor interceptor
;
2345 interceptor
.make_fault(bp
);
2346 return test
.run_suite(
2347 fmt::format("test_v2_peer_connected_fault_reaccept -- {}", bp
),
2349 policy_t::lossless_peer
,
2350 policy_t::lossless_peer
,
2351 [&test
] (FailoverSuite
& suite
) {
2352 return seastar::futurize_invoke([&test
] {
2353 return test
.peer_send_me();
2355 return suite
.connect_peer();
2357 return suite
.wait_results(2);
2358 }).then([] (ConnResults
& results
) {
2359 results
[0].assert_state_at(conn_state_t::established
);
2360 results
[0].assert_connect(1, 1, 0, 1);
2361 results
[0].assert_accept(0, 0, 0, 1);
2362 results
[0].assert_reset(0, 0);
2363 results
[1].assert_state_at(conn_state_t::replaced
);
2364 results
[1].assert_connect(0, 0, 0, 0);
2365 results
[1].assert_accept(1, 0, 1, 0);
2366 results
[1].assert_reset(0, 0);
2371 seastar::future
<bool>
2372 check_peer_wins(FailoverTest
& test
) {
2373 return seastar::do_with(bool(), [&test
] (auto& ret
) {
2374 return test
.run_suite("check_peer_wins",
2376 policy_t::lossy_client
,
2377 policy_t::stateless_server
,
2378 [&ret
] (FailoverSuite
& suite
) {
2379 return suite
.connect_peer().then([&suite
] {
2380 return suite
.wait_results(1);
2381 }).then([&ret
] (ConnResults
& results
) {
2382 results
[0].assert_state_at(conn_state_t::established
);
2383 ret
= results
[0].conn
->peer_wins();
2384 logger().info("check_peer_wins: {}", ret
);
2393 test_v2_racing_reconnect_acceptor_lose(FailoverTest
& test
) {
2394 return seastar::do_with(std::vector
<std::pair
<unsigned, Breakpoint
>>{
2395 {1, {Tag::SESSION_RECONNECT
, bp_type_t::READ
}},
2396 {2, {custom_bp_t::BANNER_WRITE
}},
2397 {2, {custom_bp_t::BANNER_READ
}},
2398 {2, {custom_bp_t::BANNER_PAYLOAD_READ
}},
2399 {2, {Tag::HELLO
, bp_type_t::WRITE
}},
2400 {2, {Tag::HELLO
, bp_type_t::READ
}},
2401 {2, {Tag::AUTH_REQUEST
, bp_type_t::READ
}},
2402 {2, {Tag::AUTH_DONE
, bp_type_t::WRITE
}},
2403 {2, {Tag::AUTH_SIGNATURE
, bp_type_t::WRITE
}},
2404 {2, {Tag::AUTH_SIGNATURE
, bp_type_t::READ
}},
2405 }, [&test
] (auto& failure_cases
) {
2406 return seastar::do_for_each(failure_cases
, [&test
] (auto bp
) {
2407 TestInterceptor interceptor
;
2409 interceptor
.make_fault({Tag::MESSAGE
, bp_type_t::READ
});
2411 interceptor
.make_block(bp
.second
, bp
.first
);
2412 return test
.run_suite(
2413 fmt::format("test_v2_racing_reconnect_acceptor_lose -- {}({})",
2414 bp
.second
, bp
.first
),
2416 policy_t::lossless_peer
,
2417 policy_t::lossless_peer
,
2418 [&test
] (FailoverSuite
& suite
) {
2419 return seastar::futurize_invoke([&test
] {
2420 return test
.peer_send_me();
2422 return test
.peer_connect_me();
2424 return suite
.wait_blocked();
2426 return suite
.send_peer();
2428 return suite
.wait_established();
2431 return suite
.wait_results(2);
2432 }).then([] (ConnResults
& results
) {
2433 results
[0].assert_state_at(conn_state_t::established
);
2434 results
[0].assert_connect(1, 0, 1, 1);
2435 results
[0].assert_accept(1, 1, 0, 1);
2436 results
[0].assert_reset(0, 0);
2437 results
[1].assert_state_at(conn_state_t::closed
);
2438 results
[1].assert_connect(0, 0, 0, 0);
2439 results
[1].assert_accept(1, 0);
2440 results
[1].assert_reset(0, 0);
2448 test_v2_racing_reconnect_acceptor_win(FailoverTest
& test
) {
2449 return seastar::do_with(std::vector
<std::pair
<unsigned, Breakpoint
>>{
2450 {1, {Tag::SESSION_RECONNECT
, bp_type_t::WRITE
}},
2451 {2, {custom_bp_t::SOCKET_CONNECTING
}},
2452 {2, {custom_bp_t::BANNER_WRITE
}},
2453 {2, {custom_bp_t::BANNER_READ
}},
2454 {2, {custom_bp_t::BANNER_PAYLOAD_READ
}},
2455 {2, {Tag::HELLO
, bp_type_t::WRITE
}},
2456 {2, {Tag::HELLO
, bp_type_t::READ
}},
2457 {2, {Tag::AUTH_REQUEST
, bp_type_t::WRITE
}},
2458 {2, {Tag::AUTH_DONE
, bp_type_t::READ
}},
2459 {2, {Tag::AUTH_SIGNATURE
, bp_type_t::WRITE
}},
2460 {2, {Tag::AUTH_SIGNATURE
, bp_type_t::READ
}},
2461 }, [&test
] (auto& failure_cases
) {
2462 return seastar::do_for_each(failure_cases
, [&test
] (auto bp
) {
2463 TestInterceptor interceptor
;
2465 interceptor
.make_fault({Tag::MESSAGE
, bp_type_t::WRITE
});
2467 interceptor
.make_block(bp
.second
, bp
.first
);
2468 return test
.run_suite(
2469 fmt::format("test_v2_racing_reconnect_acceptor_win -- {}({})",
2470 bp
.second
, bp
.first
),
2472 policy_t::lossless_peer
,
2473 policy_t::lossless_peer
,
2474 [&test
] (FailoverSuite
& suite
) {
2475 return seastar::futurize_invoke([&suite
] {
2476 return suite
.send_peer();
2478 return suite
.connect_peer();
2480 return suite
.wait_blocked();
2482 return test
.peer_send_me();
2484 return suite
.wait_replaced(1);
2487 return suite
.wait_results(2);
2488 }).then([] (ConnResults
& results
) {
2489 results
[0].assert_state_at(conn_state_t::established
);
2490 results
[0].assert_connect(2, 1);
2491 results
[0].assert_accept(0, 0, 0, 1);
2492 results
[0].assert_reset(0, 0);
2493 results
[1].assert_state_at(conn_state_t::replaced
);
2494 results
[1].assert_connect(0, 0, 0, 0);
2495 results
[1].assert_accept(1, 0, 1, 0);
2496 results
[1].assert_reset(0, 0);
2504 test_v2_racing_connect_acceptor_lose(FailoverTest
& test
) {
2505 return seastar::do_with(std::vector
<Breakpoint
>{
2506 {custom_bp_t::BANNER_WRITE
},
2507 {custom_bp_t::BANNER_READ
},
2508 {custom_bp_t::BANNER_PAYLOAD_READ
},
2509 {Tag::HELLO
, bp_type_t::WRITE
},
2510 {Tag::HELLO
, bp_type_t::READ
},
2511 {Tag::AUTH_REQUEST
, bp_type_t::READ
},
2512 {Tag::AUTH_DONE
, bp_type_t::WRITE
},
2513 {Tag::AUTH_SIGNATURE
, bp_type_t::WRITE
},
2514 {Tag::AUTH_SIGNATURE
, bp_type_t::READ
},
2515 {Tag::CLIENT_IDENT
, bp_type_t::READ
},
2516 }, [&test
] (auto& failure_cases
) {
2517 return seastar::do_for_each(failure_cases
, [&test
] (auto bp
) {
2518 TestInterceptor interceptor
;
2520 interceptor
.make_block(bp
);
2521 return test
.run_suite(
2522 fmt::format("test_v2_racing_connect_acceptor_lose -- {}", bp
),
2524 policy_t::lossless_peer
,
2525 policy_t::lossless_peer
,
2526 [&test
] (FailoverSuite
& suite
) {
2527 return seastar::futurize_invoke([&test
] {
2528 return test
.peer_send_me();
2530 return test
.peer_connect_me();
2532 return suite
.wait_blocked();
2534 return suite
.send_peer();
2536 return suite
.connect_peer();
2538 return suite
.wait_established();
2541 return suite
.wait_results(2);
2542 }).then([] (ConnResults
& results
) {
2543 results
[0].assert_state_at(conn_state_t::closed
);
2544 results
[0].assert_connect(0, 0, 0, 0);
2545 results
[0].assert_accept(1, 0);
2546 results
[0].assert_reset(0, 0);
2547 results
[1].assert_state_at(conn_state_t::established
);
2548 results
[1].assert_connect(1, 1, 0, 1);
2549 results
[1].assert_accept(0, 0, 0, 0);
2550 results
[1].assert_reset(0, 0);
2558 test_v2_racing_connect_acceptor_win(FailoverTest
& test
) {
2559 return seastar::do_with(std::vector
<Breakpoint
>{
2560 {custom_bp_t::SOCKET_CONNECTING
},
2561 {custom_bp_t::BANNER_WRITE
},
2562 {custom_bp_t::BANNER_READ
},
2563 {custom_bp_t::BANNER_PAYLOAD_READ
},
2564 {Tag::HELLO
, bp_type_t::WRITE
},
2565 {Tag::HELLO
, bp_type_t::READ
},
2566 {Tag::AUTH_REQUEST
, bp_type_t::WRITE
},
2567 {Tag::AUTH_DONE
, bp_type_t::READ
},
2568 {Tag::AUTH_SIGNATURE
, bp_type_t::WRITE
},
2569 {Tag::AUTH_SIGNATURE
, bp_type_t::READ
},
2570 {Tag::CLIENT_IDENT
, bp_type_t::WRITE
},
2571 }, [&test
] (auto& failure_cases
) {
2572 return seastar::do_for_each(failure_cases
, [&test
] (auto bp
) {
2573 TestInterceptor interceptor
;
2575 interceptor
.make_block(bp
);
2576 return test
.run_suite(
2577 fmt::format("test_v2_racing_connect_acceptor_win -- {}", bp
),
2579 policy_t::lossless_peer
,
2580 policy_t::lossless_peer
,
2581 [&test
] (FailoverSuite
& suite
) {
2582 return seastar::futurize_invoke([&suite
] {
2583 return suite
.send_peer();
2585 return suite
.connect_peer();
2587 return suite
.wait_blocked();
2589 return test
.peer_send_me();
2591 return test
.peer_connect_me();
2593 return suite
.wait_replaced(1);
2596 return suite
.wait_results(2);
2597 }).then([] (ConnResults
& results
) {
2598 results
[0].assert_state_at(conn_state_t::established
);
2599 results
[0].assert_connect(1, 0);
2600 results
[0].assert_accept(0, 0, 0, 1);
2601 results
[0].assert_reset(0, 0);
2602 results
[1].assert_state_at(conn_state_t::replaced
);
2603 results
[1].assert_connect(0, 0, 0, 0);
2604 results
[1].assert_accept(1, 1, 0, 0);
2605 results
[1].assert_reset(0, 0);
2613 test_v2_racing_connect_reconnect_lose(FailoverTest
& test
) {
2614 TestInterceptor interceptor
;
2615 interceptor
.make_fault({Tag::SERVER_IDENT
, bp_type_t::READ
});
2616 interceptor
.make_block({Tag::CLIENT_IDENT
, bp_type_t::WRITE
}, 2);
2617 return test
.run_suite("test_v2_racing_connect_reconnect_lose",
2619 policy_t::lossless_peer
,
2620 policy_t::lossless_peer
,
2621 [&test
] (FailoverSuite
& suite
) {
2622 return seastar::futurize_invoke([&suite
] {
2623 return suite
.send_peer();
2625 return suite
.connect_peer();
2627 return suite
.wait_blocked();
2629 return test
.peer_send_me();
2631 return suite
.wait_replaced(1);
2634 return suite
.wait_results(2);
2635 }).then([] (ConnResults
& results
) {
2636 results
[0].assert_state_at(conn_state_t::established
);
2637 results
[0].assert_connect(2, 2, 0, 0);
2638 results
[0].assert_accept(0, 0, 0, 1);
2639 results
[0].assert_reset(0, 0);
2640 results
[1].assert_state_at(conn_state_t::replaced
);
2641 results
[1].assert_connect(0, 0, 0, 0);
2642 results
[1].assert_accept(1, 1, 1, 0);
2643 results
[1].assert_reset(0, 0);
2649 test_v2_racing_connect_reconnect_win(FailoverTest
& test
) {
2650 TestInterceptor interceptor
;
2651 interceptor
.make_fault({Tag::SERVER_IDENT
, bp_type_t::READ
});
2652 interceptor
.make_block({Tag::SESSION_RECONNECT
, bp_type_t::READ
});
2653 return test
.run_suite("test_v2_racing_connect_reconnect_win",
2655 policy_t::lossless_peer
,
2656 policy_t::lossless_peer
,
2657 [&test
] (FailoverSuite
& suite
) {
2658 return seastar::futurize_invoke([&test
] {
2659 return test
.peer_send_me();
2661 return suite
.connect_peer();
2663 return suite
.wait_blocked();
2665 return suite
.send_peer();
2667 return suite
.wait_established();
2670 return suite
.wait_results(2);
2671 }).then([] (ConnResults
& results
) {
2672 results
[0].assert_state_at(conn_state_t::established
);
2673 results
[0].assert_connect(2, 2, 0, 1);
2674 results
[0].assert_accept(0, 0, 0, 0);
2675 results
[0].assert_reset(0, 0);
2676 results
[1].assert_state_at(conn_state_t::closed
);
2677 results
[1].assert_connect(0, 0, 0, 0);
2678 results
[1].assert_accept(1, 0, 1, 0);
2679 results
[1].assert_reset(0, 0);
2685 test_v2_stale_connect(FailoverTest
& test
) {
2686 auto bp
= Breakpoint
{Tag::SERVER_IDENT
, bp_type_t::READ
};
2687 TestInterceptor interceptor
;
2688 interceptor
.make_stall(bp
);
2689 return test
.run_suite(
2690 fmt::format("test_v2_stale_connect -- {}", bp
),
2692 policy_t::lossless_peer
,
2693 policy_t::lossless_peer
,
2694 [&test
] (FailoverSuite
& suite
) {
2695 return seastar::futurize_invoke([&suite
] {
2696 return suite
.connect_peer();
2698 return suite
.wait_blocked();
2700 return test
.peer_send_me();
2702 return suite
.wait_replaced(1);
2705 return suite
.wait_results(2);
2706 }).then([] (ConnResults
& results
) {
2707 results
[0].assert_state_at(conn_state_t::established
);
2708 results
[0].assert_connect(1, 1, 0, 0);
2709 results
[0].assert_accept(0, 0, 0, 1);
2710 results
[0].assert_reset(0, 0);
2711 results
[1].assert_state_at(conn_state_t::replaced
);
2712 results
[1].assert_connect(0, 0, 0, 0);
2713 results
[1].assert_accept(1, 1, 1, 0);
2714 results
[1].assert_reset(0, 0);
2720 test_v2_stale_reconnect(FailoverTest
& test
) {
2721 auto bp
= Breakpoint
{Tag::SESSION_RECONNECT_OK
, bp_type_t::READ
};
2722 TestInterceptor interceptor
;
2723 interceptor
.make_fault({Tag::MESSAGE
, bp_type_t::WRITE
});
2724 interceptor
.make_stall(bp
);
2725 return test
.run_suite(
2726 fmt::format("test_v2_stale_reconnect -- {}", bp
),
2728 policy_t::lossless_peer
,
2729 policy_t::lossless_peer
,
2730 [&test
] (FailoverSuite
& suite
) {
2731 return seastar::futurize_invoke([&suite
] {
2732 return suite
.send_peer();
2734 return suite
.connect_peer();
2736 return suite
.wait_blocked();
2738 return test
.peer_send_me();
2740 return suite
.wait_replaced(1);
2743 return suite
.wait_results(2);
2744 }).then([] (ConnResults
& results
) {
2745 results
[0].assert_state_at(conn_state_t::established
);
2746 results
[0].assert_connect(2, 1, 1, 1);
2747 results
[0].assert_accept(0, 0, 0, 1);
2748 results
[0].assert_reset(0, 0);
2749 results
[1].assert_state_at(conn_state_t::replaced
);
2750 results
[1].assert_connect(0, 0, 0, 0);
2751 results
[1].assert_accept(1, 0, 1, 0);
2752 results
[1].assert_reset(0, 0);
2758 test_v2_stale_accept(FailoverTest
& test
) {
2759 auto bp
= Breakpoint
{Tag::CLIENT_IDENT
, bp_type_t::READ
};
2760 TestInterceptor interceptor
;
2761 interceptor
.make_stall(bp
);
2762 return test
.run_suite(
2763 fmt::format("test_v2_stale_accept -- {}", bp
),
2765 policy_t::lossless_peer
,
2766 policy_t::lossless_peer
,
2767 [&test
] (FailoverSuite
& suite
) {
2768 return seastar::futurize_invoke([&test
] {
2769 return test
.peer_connect_me();
2771 return suite
.wait_blocked();
2773 return test
.peer_send_me();
2775 return suite
.wait_established();
2778 return suite
.wait_results(2);
2779 }).then([] (ConnResults
& results
) {
2780 results
[0].assert_state_at(conn_state_t::closed
);
2781 results
[0].assert_connect(0, 0, 0, 0);
2782 results
[0].assert_accept(1, 1, 0, 0);
2783 results
[0].assert_reset(0, 0);
2784 results
[1].assert_state_at(conn_state_t::established
);
2785 results
[1].assert_connect(0, 0, 0, 0);
2786 results
[1].assert_accept(1, 1, 0, 1);
2787 results
[1].assert_reset(0, 0);
2793 test_v2_stale_establishing(FailoverTest
& test
) {
2794 auto bp
= Breakpoint
{Tag::SERVER_IDENT
, bp_type_t::WRITE
};
2795 TestInterceptor interceptor
;
2796 interceptor
.make_stall(bp
);
2797 return test
.run_suite(
2798 fmt::format("test_v2_stale_establishing -- {}", bp
),
2800 policy_t::lossless_peer
,
2801 policy_t::lossless_peer
,
2802 [&test
] (FailoverSuite
& suite
) {
2803 return seastar::futurize_invoke([&test
] {
2804 return test
.peer_connect_me();
2806 return suite
.wait_blocked();
2808 return test
.peer_send_me();
2810 return suite
.wait_replaced(1);
2813 return suite
.wait_results(2);
2814 }).then([] (ConnResults
& results
) {
2815 results
[0].assert_state_at(conn_state_t::established
);
2816 results
[0].assert_connect(0, 0, 0, 0);
2817 results
[0].assert_accept(1, 1, 0, 2);
2818 results
[0].assert_reset(0, 0);
2819 results
[1].assert_state_at(conn_state_t::replaced
);
2820 results
[1].assert_connect(0, 0, 0, 0);
2821 results
[1].assert_accept(1, 0);
2822 results
[1].assert_reset(0, 0);
2828 test_v2_stale_reaccept(FailoverTest
& test
) {
2829 auto bp
= Breakpoint
{Tag::SESSION_RECONNECT_OK
, bp_type_t::WRITE
};
2830 TestInterceptor interceptor
;
2831 interceptor
.make_fault({Tag::MESSAGE
, bp_type_t::READ
});
2832 interceptor
.make_stall(bp
);
2833 return test
.run_suite(
2834 fmt::format("test_v2_stale_reaccept -- {}", bp
),
2836 policy_t::lossless_peer
,
2837 policy_t::lossless_peer
,
2838 [&test
] (FailoverSuite
& suite
) {
2839 return seastar::futurize_invoke([&test
] {
2840 return test
.peer_send_me();
2842 return test
.peer_connect_me();
2844 return suite
.wait_blocked();
2846 logger().info("[Test] block the broken REPLACING for 210ms...");
2847 return seastar::sleep(210ms
);
2850 return suite
.wait_results(3);
2851 }).then([] (ConnResults
& results
) {
2852 results
[0].assert_state_at(conn_state_t::established
);
2853 results
[0].assert_connect(0, 0, 0, 0);
2854 results
[0].assert_accept(1, 1, 0, 3);
2855 results
[0].assert_reset(0, 0);
2856 results
[1].assert_state_at(conn_state_t::replaced
);
2857 results
[1].assert_connect(0, 0, 0, 0);
2858 results
[1].assert_accept(1, 0, 1, 0);
2859 results
[1].assert_reset(0, 0);
2860 results
[2].assert_state_at(conn_state_t::replaced
);
2861 results
[2].assert_connect(0, 0, 0, 0);
2862 results
[2].assert_accept(1, 0);
2863 results
[2].assert_reset(0, 0);
2864 ceph_assert(results
[2].server_reconnect_attempts
>= 1);
2870 test_v2_lossy_client(FailoverTest
& test
) {
2871 return test
.run_suite(
2872 "test_v2_lossy_client",
2874 policy_t::lossy_client
,
2875 policy_t::stateless_server
,
2876 [&test
] (FailoverSuite
& suite
) {
2877 return seastar::futurize_invoke([&suite
] {
2878 logger().info("-- 0 --");
2879 logger().info("[Test] setup connection...");
2880 return suite
.connect_peer();
2882 return test
.send_bidirectional();
2884 return suite
.wait_results(1);
2885 }).then([] (ConnResults
& results
) {
2886 results
[0].assert_state_at(conn_state_t::established
);
2887 results
[0].assert_connect(1, 1, 0, 1);
2888 results
[0].assert_accept(0, 0, 0, 0);
2889 results
[0].assert_reset(0, 0);
2891 logger().info("-- 1 --");
2892 logger().info("[Test] client markdown...");
2893 return suite
.markdown();
2895 return suite
.connect_peer();
2897 return suite
.send_peer();
2899 return suite
.wait_results(2);
2900 }).then([] (ConnResults
& results
) {
2901 results
[0].assert_state_at(conn_state_t::closed
);
2902 results
[0].assert_connect(1, 1, 0, 1);
2903 results
[0].assert_accept(0, 0, 0, 0);
2904 results
[0].assert_reset(0, 0);
2905 results
[1].assert_state_at(conn_state_t::established
);
2906 results
[1].assert_connect(1, 1, 0, 1);
2907 results
[1].assert_accept(0, 0, 0, 0);
2908 results
[1].assert_reset(0, 0);
2910 logger().info("-- 2 --");
2911 logger().info("[Test] server markdown...");
2912 return test
.markdown_peer();
2914 return suite
.wait_results(2);
2915 }).then([] (ConnResults
& results
) {
2916 results
[0].assert_state_at(conn_state_t::closed
);
2917 results
[0].assert_connect(1, 1, 0, 1);
2918 results
[0].assert_accept(0, 0, 0, 0);
2919 results
[0].assert_reset(0, 0);
2920 results
[1].assert_state_at(conn_state_t::closed
);
2921 results
[1].assert_connect(1, 1, 0, 1);
2922 results
[1].assert_accept(0, 0, 0, 0);
2923 results
[1].assert_reset(1, 0);
2925 logger().info("-- 3 --");
2926 logger().info("[Test] client reconnect...");
2927 return suite
.connect_peer();
2929 return suite
.send_peer();
2931 return suite
.wait_results(3);
2932 }).then([] (ConnResults
& results
) {
2933 results
[0].assert_state_at(conn_state_t::closed
);
2934 results
[0].assert_connect(1, 1, 0, 1);
2935 results
[0].assert_accept(0, 0, 0, 0);
2936 results
[0].assert_reset(0, 0);
2937 results
[1].assert_state_at(conn_state_t::closed
);
2938 results
[1].assert_connect(1, 1, 0, 1);
2939 results
[1].assert_accept(0, 0, 0, 0);
2940 results
[1].assert_reset(1, 0);
2941 results
[2].assert_state_at(conn_state_t::established
);
2942 results
[2].assert_connect(1, 1, 0, 1);
2943 results
[2].assert_accept(0, 0, 0, 0);
2944 results
[2].assert_reset(0, 0);
2950 test_v2_stateless_server(FailoverTest
& test
) {
2951 return test
.run_suite(
2952 "test_v2_stateless_server",
2954 policy_t::stateless_server
,
2955 policy_t::lossy_client
,
2956 [&test
] (FailoverSuite
& suite
) {
2957 return seastar::futurize_invoke([&test
] {
2958 logger().info("-- 0 --");
2959 logger().info("[Test] setup connection...");
2960 return test
.peer_connect_me();
2962 return test
.send_bidirectional();
2964 return suite
.wait_results(1);
2965 }).then([] (ConnResults
& results
) {
2966 results
[0].assert_state_at(conn_state_t::established
);
2967 results
[0].assert_connect(0, 0, 0, 0);
2968 results
[0].assert_accept(1, 1, 0, 1);
2969 results
[0].assert_reset(0, 0);
2971 logger().info("-- 1 --");
2972 logger().info("[Test] client markdown...");
2973 return test
.markdown_peer();
2975 return test
.peer_connect_me();
2977 return test
.peer_send_me();
2979 return suite
.wait_results(2);
2980 }).then([] (ConnResults
& results
) {
2981 results
[0].assert_state_at(conn_state_t::closed
);
2982 results
[0].assert_connect(0, 0, 0, 0);
2983 results
[0].assert_accept(1, 1, 0, 1);
2984 results
[0].assert_reset(1, 0);
2985 results
[1].assert_state_at(conn_state_t::established
);
2986 results
[1].assert_connect(0, 0, 0, 0);
2987 results
[1].assert_accept(1, 1, 0, 1);
2988 results
[1].assert_reset(0, 0);
2990 logger().info("-- 2 --");
2991 logger().info("[Test] server markdown...");
2992 return suite
.markdown();
2994 return suite
.wait_results(2);
2995 }).then([] (ConnResults
& results
) {
2996 results
[0].assert_state_at(conn_state_t::closed
);
2997 results
[0].assert_connect(0, 0, 0, 0);
2998 results
[0].assert_accept(1, 1, 0, 1);
2999 results
[0].assert_reset(1, 0);
3000 results
[1].assert_state_at(conn_state_t::closed
);
3001 results
[1].assert_connect(0, 0, 0, 0);
3002 results
[1].assert_accept(1, 1, 0, 1);
3003 results
[1].assert_reset(0, 0);
3005 logger().info("-- 3 --");
3006 logger().info("[Test] client reconnect...");
3007 return test
.peer_connect_me();
3009 return test
.peer_send_me();
3011 return suite
.wait_results(3);
3012 }).then([] (ConnResults
& results
) {
3013 results
[0].assert_state_at(conn_state_t::closed
);
3014 results
[0].assert_connect(0, 0, 0, 0);
3015 results
[0].assert_accept(1, 1, 0, 1);
3016 results
[0].assert_reset(1, 0);
3017 results
[1].assert_state_at(conn_state_t::closed
);
3018 results
[1].assert_connect(0, 0, 0, 0);
3019 results
[1].assert_accept(1, 1, 0, 1);
3020 results
[1].assert_reset(0, 0);
3021 results
[2].assert_state_at(conn_state_t::established
);
3022 results
[2].assert_connect(0, 0, 0, 0);
3023 results
[2].assert_accept(1, 1, 0, 1);
3024 results
[2].assert_reset(0, 0);
3030 test_v2_lossless_client(FailoverTest
& test
) {
3031 return test
.run_suite(
3032 "test_v2_lossless_client",
3034 policy_t::lossless_client
,
3035 policy_t::stateful_server
,
3036 [&test
] (FailoverSuite
& suite
) {
3037 return seastar::futurize_invoke([&suite
] {
3038 logger().info("-- 0 --");
3039 logger().info("[Test] setup connection...");
3040 return suite
.connect_peer();
3042 return test
.send_bidirectional();
3044 return suite
.wait_results(1);
3045 }).then([] (ConnResults
& results
) {
3046 results
[0].assert_state_at(conn_state_t::established
);
3047 results
[0].assert_connect(1, 1, 0, 1);
3048 results
[0].assert_accept(0, 0, 0, 0);
3049 results
[0].assert_reset(0, 0);
3051 logger().info("-- 1 --");
3052 logger().info("[Test] client markdown...");
3053 return suite
.markdown();
3055 return suite
.connect_peer();
3057 return suite
.send_peer();
3059 return suite
.wait_results(2);
3060 }).then([] (ConnResults
& results
) {
3061 results
[0].assert_state_at(conn_state_t::closed
);
3062 results
[0].assert_connect(1, 1, 0, 1);
3063 results
[0].assert_accept(0, 0, 0, 0);
3064 results
[0].assert_reset(0, 0);
3065 results
[1].assert_state_at(conn_state_t::established
);
3066 results
[1].assert_connect(1, 1, 0, 1);
3067 results
[1].assert_accept(0, 0, 0, 0);
3068 results
[1].assert_reset(0, 0);
3070 logger().info("-- 2 --");
3071 logger().info("[Test] server markdown...");
3072 return test
.markdown_peer();
3074 return suite
.wait_results(2);
3075 }).then([] (ConnResults
& results
) {
3076 results
[0].assert_state_at(conn_state_t::closed
);
3077 results
[0].assert_connect(1, 1, 0, 1);
3078 results
[0].assert_accept(0, 0, 0, 0);
3079 results
[0].assert_reset(0, 0);
3080 results
[1].assert_state_at(conn_state_t::established
);
3081 results
[1].assert_connect(2, 2, 1, 2);
3082 results
[1].assert_accept(0, 0, 0, 0);
3083 results
[1].assert_reset(0, 1);
3085 logger().info("-- 3 --");
3086 logger().info("[Test] client reconnect...");
3087 return suite
.connect_peer();
3089 return suite
.send_peer();
3091 return suite
.wait_results(2);
3092 }).then([] (ConnResults
& results
) {
3093 results
[0].assert_state_at(conn_state_t::closed
);
3094 results
[0].assert_connect(1, 1, 0, 1);
3095 results
[0].assert_accept(0, 0, 0, 0);
3096 results
[0].assert_reset(0, 0);
3097 results
[1].assert_state_at(conn_state_t::established
);
3098 results
[1].assert_connect(2, 2, 1, 2);
3099 results
[1].assert_accept(0, 0, 0, 0);
3100 results
[1].assert_reset(0, 1);
3106 test_v2_stateful_server(FailoverTest
& test
) {
3107 return test
.run_suite(
3108 "test_v2_stateful_server",
3110 policy_t::stateful_server
,
3111 policy_t::lossless_client
,
3112 [&test
] (FailoverSuite
& suite
) {
3113 return seastar::futurize_invoke([&test
] {
3114 logger().info("-- 0 --");
3115 logger().info("[Test] setup connection...");
3116 return test
.peer_connect_me();
3118 return test
.send_bidirectional();
3120 return suite
.wait_results(1);
3121 }).then([] (ConnResults
& results
) {
3122 results
[0].assert_state_at(conn_state_t::established
);
3123 results
[0].assert_connect(0, 0, 0, 0);
3124 results
[0].assert_accept(1, 1, 0, 1);
3125 results
[0].assert_reset(0, 0);
3127 logger().info("-- 1 --");
3128 logger().info("[Test] client markdown...");
3129 return test
.markdown_peer();
3131 return test
.peer_connect_me();
3133 return test
.peer_send_me();
3135 return suite
.wait_results(2);
3136 }).then([] (ConnResults
& results
) {
3137 results
[0].assert_state_at(conn_state_t::established
);
3138 results
[0].assert_connect(0, 0, 0, 0);
3139 results
[0].assert_accept(1, 1, 0, 2);
3140 results
[0].assert_reset(0, 1);
3141 results
[1].assert_state_at(conn_state_t::replaced
);
3142 results
[1].assert_connect(0, 0, 0, 0);
3143 results
[1].assert_accept(1, 1, 0, 0);
3144 results
[1].assert_reset(0, 0);
3146 logger().info("-- 2 --");
3147 logger().info("[Test] server markdown...");
3148 return suite
.markdown();
3150 return suite
.wait_results(3);
3151 }).then([] (ConnResults
& results
) {
3152 results
[0].assert_state_at(conn_state_t::closed
);
3153 results
[0].assert_connect(0, 0, 0, 0);
3154 results
[0].assert_accept(1, 1, 0, 2);
3155 results
[0].assert_reset(0, 1);
3156 results
[1].assert_state_at(conn_state_t::replaced
);
3157 results
[1].assert_connect(0, 0, 0, 0);
3158 results
[1].assert_accept(1, 1, 0, 0);
3159 results
[1].assert_reset(0, 0);
3160 results
[2].assert_state_at(conn_state_t::established
);
3161 results
[2].assert_connect(0, 0, 0, 0);
3162 results
[2].assert_accept(1, 1, 1, 1);
3163 results
[2].assert_reset(0, 0);
3165 logger().info("-- 3 --");
3166 logger().info("[Test] client reconnect...");
3167 return test
.peer_connect_me();
3169 return test
.peer_send_me();
3171 return suite
.wait_results(3);
3172 }).then([] (ConnResults
& results
) {
3173 results
[0].assert_state_at(conn_state_t::closed
);
3174 results
[0].assert_connect(0, 0, 0, 0);
3175 results
[0].assert_accept(1, 1, 0, 2);
3176 results
[0].assert_reset(0, 1);
3177 results
[1].assert_state_at(conn_state_t::replaced
);
3178 results
[1].assert_connect(0, 0, 0, 0);
3179 results
[1].assert_accept(1, 1, 0, 0);
3180 results
[1].assert_reset(0, 0);
3181 results
[2].assert_state_at(conn_state_t::established
);
3182 results
[2].assert_connect(0, 0, 0, 0);
3183 results
[2].assert_accept(1, 1, 1, 1);
3184 results
[2].assert_reset(0, 0);
3190 test_v2_peer_reuse_connector(FailoverTest
& test
) {
3191 return test
.run_suite(
3192 "test_v2_peer_reuse_connector",
3194 policy_t::lossless_peer_reuse
,
3195 policy_t::lossless_peer_reuse
,
3196 [&test
] (FailoverSuite
& suite
) {
3197 return seastar::futurize_invoke([&suite
] {
3198 logger().info("-- 0 --");
3199 logger().info("[Test] setup connection...");
3200 return suite
.connect_peer();
3202 return test
.send_bidirectional();
3204 return suite
.wait_results(1);
3205 }).then([] (ConnResults
& results
) {
3206 results
[0].assert_state_at(conn_state_t::established
);
3207 results
[0].assert_connect(1, 1, 0, 1);
3208 results
[0].assert_accept(0, 0, 0, 0);
3209 results
[0].assert_reset(0, 0);
3211 logger().info("-- 1 --");
3212 logger().info("[Test] connector markdown...");
3213 return suite
.markdown();
3215 return suite
.connect_peer();
3217 return suite
.send_peer();
3219 return suite
.wait_results(2);
3220 }).then([] (ConnResults
& results
) {
3221 results
[0].assert_state_at(conn_state_t::closed
);
3222 results
[0].assert_connect(1, 1, 0, 1);
3223 results
[0].assert_accept(0, 0, 0, 0);
3224 results
[0].assert_reset(0, 0);
3225 results
[1].assert_state_at(conn_state_t::established
);
3226 results
[1].assert_connect(1, 1, 0, 1);
3227 results
[1].assert_accept(0, 0, 0, 0);
3228 results
[1].assert_reset(0, 0);
3230 logger().info("-- 2 --");
3231 logger().info("[Test] acceptor markdown...");
3232 return test
.markdown_peer();
3234 ceph_assert(suite
.is_standby());
3235 logger().info("-- 3 --");
3236 logger().info("[Test] connector reconnect...");
3237 return suite
.connect_peer();
3239 return suite
.try_send_peer();
3241 return suite
.wait_results(2);
3242 }).then([] (ConnResults
& results
) {
3243 results
[0].assert_state_at(conn_state_t::closed
);
3244 results
[0].assert_connect(1, 1, 0, 1);
3245 results
[0].assert_accept(0, 0, 0, 0);
3246 results
[0].assert_reset(0, 0);
3247 results
[1].assert_state_at(conn_state_t::established
);
3248 results
[1].assert_connect(2, 2, 1, 2);
3249 results
[1].assert_accept(0, 0, 0, 0);
3250 results
[1].assert_reset(0, 1);
3256 test_v2_peer_reuse_acceptor(FailoverTest
& test
) {
3257 return test
.run_suite(
3258 "test_v2_peer_reuse_acceptor",
3260 policy_t::lossless_peer_reuse
,
3261 policy_t::lossless_peer_reuse
,
3262 [&test
] (FailoverSuite
& suite
) {
3263 return seastar::futurize_invoke([&test
] {
3264 logger().info("-- 0 --");
3265 logger().info("[Test] setup connection...");
3266 return test
.peer_connect_me();
3268 return test
.send_bidirectional();
3270 return suite
.wait_results(1);
3271 }).then([] (ConnResults
& results
) {
3272 results
[0].assert_state_at(conn_state_t::established
);
3273 results
[0].assert_connect(0, 0, 0, 0);
3274 results
[0].assert_accept(1, 1, 0, 1);
3275 results
[0].assert_reset(0, 0);
3277 logger().info("-- 1 --");
3278 logger().info("[Test] connector markdown...");
3279 return test
.markdown_peer();
3281 return test
.peer_connect_me();
3283 return test
.peer_send_me();
3285 return suite
.wait_results(2);
3286 }).then([] (ConnResults
& results
) {
3287 results
[0].assert_state_at(conn_state_t::established
);
3288 results
[0].assert_connect(0, 0, 0, 0);
3289 results
[0].assert_accept(1, 1, 0, 2);
3290 results
[0].assert_reset(0, 1);
3291 results
[1].assert_state_at(conn_state_t::replaced
);
3292 results
[1].assert_connect(0, 0, 0, 0);
3293 results
[1].assert_accept(1, 1, 0, 0);
3294 results
[1].assert_reset(0, 0);
3296 logger().info("-- 2 --");
3297 logger().info("[Test] acceptor markdown...");
3298 return suite
.markdown();
3300 return suite
.wait_results(2);
3301 }).then([] (ConnResults
& results
) {
3302 results
[0].assert_state_at(conn_state_t::closed
);
3303 results
[0].assert_connect(0, 0, 0, 0);
3304 results
[0].assert_accept(1, 1, 0, 2);
3305 results
[0].assert_reset(0, 1);
3306 results
[1].assert_state_at(conn_state_t::replaced
);
3307 results
[1].assert_connect(0, 0, 0, 0);
3308 results
[1].assert_accept(1, 1, 0, 0);
3309 results
[1].assert_reset(0, 0);
3311 logger().info("-- 3 --");
3312 logger().info("[Test] connector reconnect...");
3313 return test
.peer_connect_me();
3315 return test
.try_peer_send_me();
3317 return suite
.wait_results(3);
3318 }).then([] (ConnResults
& results
) {
3319 results
[0].assert_state_at(conn_state_t::closed
);
3320 results
[0].assert_connect(0, 0, 0, 0);
3321 results
[0].assert_accept(1, 1, 0, 2);
3322 results
[0].assert_reset(0, 1);
3323 results
[1].assert_state_at(conn_state_t::replaced
);
3324 results
[1].assert_connect(0, 0, 0, 0);
3325 results
[1].assert_accept(1, 1, 0, 0);
3326 results
[1].assert_reset(0, 0);
3327 results
[2].assert_state_at(conn_state_t::established
);
3328 results
[2].assert_connect(0, 0, 0, 0);
3329 results
[2].assert_accept(1, 1, 1, 1);
3330 results
[2].assert_reset(0, 0);
3336 test_v2_lossless_peer_connector(FailoverTest
& test
) {
3337 return test
.run_suite(
3338 "test_v2_lossless_peer_connector",
3340 policy_t::lossless_peer
,
3341 policy_t::lossless_peer
,
3342 [&test
] (FailoverSuite
& suite
) {
3343 return seastar::futurize_invoke([&suite
] {
3344 logger().info("-- 0 --");
3345 logger().info("[Test] setup connection...");
3346 return suite
.connect_peer();
3348 return test
.send_bidirectional();
3350 return suite
.wait_results(1);
3351 }).then([] (ConnResults
& results
) {
3352 results
[0].assert_state_at(conn_state_t::established
);
3353 results
[0].assert_connect(1, 1, 0, 1);
3354 results
[0].assert_accept(0, 0, 0, 0);
3355 results
[0].assert_reset(0, 0);
3357 logger().info("-- 1 --");
3358 logger().info("[Test] connector markdown...");
3359 return suite
.markdown();
3361 return suite
.connect_peer();
3363 return suite
.send_peer();
3365 return suite
.wait_results(2);
3366 }).then([] (ConnResults
& results
) {
3367 results
[0].assert_state_at(conn_state_t::closed
);
3368 results
[0].assert_connect(1, 1, 0, 1);
3369 results
[0].assert_accept(0, 0, 0, 0);
3370 results
[0].assert_reset(0, 0);
3371 results
[1].assert_state_at(conn_state_t::established
);
3372 results
[1].assert_connect(1, 1, 0, 1);
3373 results
[1].assert_accept(0, 0, 0, 0);
3374 results
[1].assert_reset(0, 0);
3376 logger().info("-- 2 --");
3377 logger().info("[Test] acceptor markdown...");
3378 return test
.markdown_peer();
3380 ceph_assert(suite
.is_standby());
3381 logger().info("-- 3 --");
3382 logger().info("[Test] connector reconnect...");
3383 return suite
.connect_peer();
3385 return suite
.try_send_peer();
3387 return suite
.wait_results(2);
3388 }).then([] (ConnResults
& results
) {
3389 results
[0].assert_state_at(conn_state_t::closed
);
3390 results
[0].assert_connect(1, 1, 0, 1);
3391 results
[0].assert_accept(0, 0, 0, 0);
3392 results
[0].assert_reset(0, 0);
3393 results
[1].assert_state_at(conn_state_t::established
);
3394 results
[1].assert_connect(2, 2, 1, 2);
3395 results
[1].assert_accept(0, 0, 0, 0);
3396 results
[1].assert_reset(0, 1);
3402 test_v2_lossless_peer_acceptor(FailoverTest
& test
) {
3403 return test
.run_suite(
3404 "test_v2_lossless_peer_acceptor",
3406 policy_t::lossless_peer
,
3407 policy_t::lossless_peer
,
3408 [&test
] (FailoverSuite
& suite
) {
3409 return seastar::futurize_invoke([&test
] {
3410 logger().info("-- 0 --");
3411 logger().info("[Test] setup connection...");
3412 return test
.peer_connect_me();
3414 return test
.send_bidirectional();
3416 return suite
.wait_results(1);
3417 }).then([] (ConnResults
& results
) {
3418 results
[0].assert_state_at(conn_state_t::established
);
3419 results
[0].assert_connect(0, 0, 0, 0);
3420 results
[0].assert_accept(1, 1, 0, 1);
3421 results
[0].assert_reset(0, 0);
3423 logger().info("-- 1 --");
3424 logger().info("[Test] connector markdown...");
3425 return test
.markdown_peer();
3427 return test
.peer_connect_me();
3429 return test
.peer_send_me();
3431 return suite
.wait_results(2);
3432 }).then([] (ConnResults
& results
) {
3433 results
[0].assert_state_at(conn_state_t::established
);
3434 results
[0].assert_connect(0, 0, 0, 0);
3435 results
[0].assert_accept(1, 1, 0, 2);
3436 results
[0].assert_reset(0, 0);
3437 results
[1].assert_state_at(conn_state_t::replaced
);
3438 results
[1].assert_connect(0, 0, 0, 0);
3439 results
[1].assert_accept(1, 1, 0, 0);
3440 results
[1].assert_reset(0, 0);
3442 logger().info("-- 2 --");
3443 logger().info("[Test] acceptor markdown...");
3444 return suite
.markdown();
3446 return suite
.wait_results(2);
3447 }).then([] (ConnResults
& results
) {
3448 results
[0].assert_state_at(conn_state_t::closed
);
3449 results
[0].assert_connect(0, 0, 0, 0);
3450 results
[0].assert_accept(1, 1, 0, 2);
3451 results
[0].assert_reset(0, 0);
3452 results
[1].assert_state_at(conn_state_t::replaced
);
3453 results
[1].assert_connect(0, 0, 0, 0);
3454 results
[1].assert_accept(1, 1, 0, 0);
3455 results
[1].assert_reset(0, 0);
3457 logger().info("-- 3 --");
3458 logger().info("[Test] connector reconnect...");
3459 return test
.peer_connect_me();
3461 return test
.try_peer_send_me();
3463 return suite
.wait_results(3);
3464 }).then([] (ConnResults
& results
) {
3465 results
[0].assert_state_at(conn_state_t::closed
);
3466 results
[0].assert_connect(0, 0, 0, 0);
3467 results
[0].assert_accept(1, 1, 0, 2);
3468 results
[0].assert_reset(0, 0);
3469 results
[1].assert_state_at(conn_state_t::replaced
);
3470 results
[1].assert_connect(0, 0, 0, 0);
3471 results
[1].assert_accept(1, 1, 0, 0);
3472 results
[1].assert_reset(0, 0);
3473 results
[2].assert_state_at(conn_state_t::established
);
3474 results
[2].assert_connect(0, 0, 0, 0);
3475 results
[2].assert_accept(1, 1, 1, 1);
3476 results
[2].assert_reset(0, 0);
3482 test_v2_protocol(entity_addr_t test_addr
,
3483 entity_addr_t cmd_peer_addr
,
3484 entity_addr_t test_peer_addr
,
3485 bool test_peer_islocal
,
3487 ceph_assert_always(test_addr
.is_msgr2());
3488 ceph_assert_always(cmd_peer_addr
.is_msgr2());
3489 ceph_assert_always(test_peer_addr
.is_msgr2());
3491 if (test_peer_islocal
) {
3492 // initiate crimson test peer locally
3493 logger().info("test_v2_protocol: start local TestPeer at {}...", cmd_peer_addr
);
3494 return FailoverTestPeer::create(cmd_peer_addr
, test_peer_addr
3495 ).then([test_addr
, cmd_peer_addr
, test_peer_addr
, peer_wins
](auto peer
) {
3496 return test_v2_protocol(
3502 ).then([peer
= std::move(peer
)] () mutable {
3503 return peer
->wait().then([peer
= std::move(peer
)] {});
3505 }).handle_exception([] (auto eptr
) {
3506 logger().error("FailoverTestPeer failed: got exception {}", eptr
);
3511 return FailoverTest::create(test_addr
, cmd_peer_addr
, test_peer_addr
3512 ).then([peer_wins
](auto test
) {
3513 return seastar::futurize_invoke([test
] {
3514 return test_v2_lossy_early_connect_fault(*test
);
3516 return test_v2_lossy_connect_fault(*test
);
3518 return test_v2_lossy_connected_fault(*test
);
3520 return test_v2_lossy_early_accept_fault(*test
);
3522 return test_v2_lossy_accept_fault(*test
);
3524 return test_v2_lossy_establishing_fault(*test
);
3526 return test_v2_lossy_accepted_fault(*test
);
3528 return test_v2_lossless_connect_fault(*test
);
3530 return test_v2_lossless_connected_fault(*test
);
3532 return test_v2_lossless_connected_fault2(*test
);
3534 return test_v2_lossless_reconnect_fault(*test
);
3536 return test_v2_lossless_accept_fault(*test
);
3538 return test_v2_lossless_establishing_fault(*test
);
3540 return test_v2_lossless_accepted_fault(*test
);
3542 return test_v2_lossless_reaccept_fault(*test
);
3544 return test_v2_peer_connect_fault(*test
);
3546 return test_v2_peer_accept_fault(*test
);
3548 return test_v2_peer_establishing_fault(*test
);
3550 return test_v2_peer_connected_fault_reconnect(*test
);
3552 return test_v2_peer_connected_fault_reaccept(*test
);
3554 return check_peer_wins(*test
);
3555 }).then([test
, peer_wins
](bool ret_peer_wins
) {
3556 ceph_assert(peer_wins
== ret_peer_wins
);
3557 if (ret_peer_wins
) {
3558 return seastar::futurize_invoke([test
] {
3559 return test_v2_racing_connect_acceptor_win(*test
);
3561 return test_v2_racing_reconnect_acceptor_win(*test
);
3564 return seastar::futurize_invoke([test
] {
3565 return test_v2_racing_connect_acceptor_lose(*test
);
3567 return test_v2_racing_reconnect_acceptor_lose(*test
);
3571 return test_v2_racing_connect_reconnect_win(*test
);
3573 return test_v2_racing_connect_reconnect_lose(*test
);
3575 return test_v2_stale_connect(*test
);
3577 return test_v2_stale_reconnect(*test
);
3579 return test_v2_stale_accept(*test
);
3581 return test_v2_stale_establishing(*test
);
3583 return test_v2_stale_reaccept(*test
);
3585 return test_v2_lossy_client(*test
);
3587 return test_v2_stateless_server(*test
);
3589 return test_v2_lossless_client(*test
);
3591 return test_v2_stateful_server(*test
);
3593 return test_v2_peer_reuse_connector(*test
);
3595 return test_v2_peer_reuse_acceptor(*test
);
3597 return test_v2_lossless_peer_connector(*test
);
3599 return test_v2_lossless_peer_acceptor(*test
);
3601 return test
->shutdown().then([test
] {});
3603 }).handle_exception([] (auto eptr
) {
3604 logger().error("FailoverTest failed: got exception {}", eptr
);
3611 seastar::future
<int> do_test(seastar::app_template
& app
)
3613 std::vector
<const char*> args
;
3614 std::string cluster
;
3615 std::string conf_file_list
;
3616 auto init_params
= ceph_argparse_early_args(args
,
3617 CEPH_ENTITY_TYPE_CLIENT
,
3620 return crimson::common::sharded_conf().start(init_params
.name
, cluster
)
3621 .then([conf_file_list
] {
3622 return local_conf().parse_config_files(conf_file_list
);
3624 auto&& config
= app
.configuration();
3625 verbose
= config
["verbose"].as
<bool>();
3626 auto rounds
= config
["rounds"].as
<unsigned>();
3627 auto keepalive_ratio
= config
["keepalive-ratio"].as
<double>();
3628 auto testpeer_islocal
= config
["testpeer-islocal"].as
<bool>();
3630 entity_addr_t test_addr
;
3631 ceph_assert(test_addr
.parse(
3632 config
["test-addr"].as
<std::string
>().c_str(), nullptr));
3633 test_addr
.set_nonce(TEST_NONCE
);
3635 entity_addr_t cmd_peer_addr
;
3636 ceph_assert(cmd_peer_addr
.parse(
3637 config
["testpeer-addr"].as
<std::string
>().c_str(), nullptr));
3638 cmd_peer_addr
.set_nonce(CMD_SRV_NONCE
);
3640 entity_addr_t test_peer_addr
= get_test_peer_addr(cmd_peer_addr
);
3641 bool peer_wins
= (test_addr
> test_peer_addr
);
3643 logger().info("test configuration: verbose={}, rounds={}, keepalive_ratio={}, "
3644 "test_addr={}, cmd_peer_addr={}, test_peer_addr={}, "
3645 "testpeer_islocal={}, peer_wins={}",
3646 verbose
, rounds
, keepalive_ratio
,
3647 test_addr
, cmd_peer_addr
, test_peer_addr
,
3648 testpeer_islocal
, peer_wins
);
3649 return test_echo(rounds
, keepalive_ratio
3651 return test_concurrent_dispatch();
3653 return test_preemptive_shutdown();
3654 }).then([test_addr
, cmd_peer_addr
, test_peer_addr
, testpeer_islocal
, peer_wins
] {
3655 return test_v2_protocol(
3662 logger().info("All tests succeeded");
3663 // Seastar has bugs to have events undispatched during shutdown,
3664 // which will result in memory leak and thus fail LeakSanitizer.
3665 return seastar::sleep(100ms
);
3668 return crimson::common::sharded_conf().stop();
3671 }).handle_exception([] (auto eptr
) {
3672 logger().error("Test failed: got exception {}", eptr
);
3677 int main(int argc
, char** argv
)
3679 seastar::app_template app
;
3681 ("verbose,v", bpo::value
<bool>()->default_value(false),
3683 ("rounds", bpo::value
<unsigned>()->default_value(512),
3684 "number of pingpong rounds")
3685 ("keepalive-ratio", bpo::value
<double>()->default_value(0.1),
3686 "ratio of keepalive in ping messages")
3687 ("test-addr", bpo::value
<std::string
>()->default_value("v2:127.0.0.1:9014"),
3688 "address of v2 failover tests")
3689 ("testpeer-addr", bpo::value
<std::string
>()->default_value("v2:127.0.0.1:9012"),
3690 "addresses of v2 failover testpeer"
3691 " (This is CmdSrv address, and TestPeer address is at port+=1)")
3692 ("testpeer-islocal", bpo::value
<bool>()->default_value(true),
3693 "create a local crimson testpeer, or connect to a remote testpeer");
3694 return app
.run(argc
, argv
, [&app
] {
3695 // This test normally succeeds within 60 seconds, so kill it after 300
3696 // seconds in case it is blocked forever due to unaddressed bugs.
3697 return seastar::with_timeout(seastar::lowres_clock::now() + 300s
, do_test(app
))
3698 .handle_exception_type([](seastar::timed_out_error
&) {
3699 logger().error("test_messenger timeout after 300s, abort! "
3700 "Consider to extend the period if the test is still running.");
3701 // use the retcode of timeout(1)