]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/crimson/test_messenger.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / test / crimson / test_messenger.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "common/ceph_argparse.h"
5 #include "common/ceph_time.h"
6 #include "messages/MPing.h"
7 #include "messages/MCommand.h"
8 #include "messages/MCommandReply.h"
9 #include "messages/MOSDOp.h"
10 #include "messages/MOSDOpReply.h"
11 #include "crimson/auth/DummyAuth.h"
12 #include "crimson/common/log.h"
13 #include "crimson/net/Connection.h"
14 #include "crimson/net/Dispatcher.h"
15 #include "crimson/net/Messenger.h"
16 #include "crimson/net/Interceptor.h"
17 #include "crimson/net/SocketConnection.h"
18
19 #include <map>
20 #include <random>
21 #include <boost/program_options.hpp>
22 #include <fmt/format.h>
23 #include <fmt/ostream.h>
24 #include <seastar/core/app-template.hh>
25 #include <seastar/core/do_with.hh>
26 #include <seastar/core/future-util.hh>
27 #include <seastar/core/reactor.hh>
28 #include <seastar/core/sleep.hh>
29 #include <seastar/core/with_timeout.hh>
30
31 #include "test_messenger.h"
32
33 using namespace std::chrono_literals;
34 namespace bpo = boost::program_options;
35 using crimson::common::local_conf;
36
37 namespace {
38
39 seastar::logger& logger() {
40 return crimson::get_logger(ceph_subsys_test);
41 }
42
// Randomness shared by the tests in this file: `rd` only provides the seed,
// `rng` is the engine the tests actually draw from (e.g. to decide between
// sending a keepalive and sending a ping).
static std::random_device rd;
static std::default_random_engine rng{rd()};

// When true, the echo test logs every message it dispatches.
static bool verbose{false};
46
47 static entity_addr_t get_server_addr() {
48 static int port = 9030;
49 ++port;
50 entity_addr_t saddr;
51 saddr.parse("127.0.0.1", nullptr);
52 saddr.set_port(port);
53 return saddr;
54 }
55
56 static seastar::future<> test_echo(unsigned rounds,
57 double keepalive_ratio)
58 {
59 struct test_state {
60 struct Server final
61 : public crimson::net::Dispatcher {
62 crimson::net::MessengerRef msgr;
63 crimson::auth::DummyAuthClientServer dummy_auth;
64
65 std::optional<seastar::future<>> ms_dispatch(
66 crimson::net::ConnectionRef c, MessageRef m) override {
67 if (verbose) {
68 logger().info("server got {}", *m);
69 }
70 // reply with a pong
71 std::ignore = c->send(crimson::make_message<MPing>());
72 return {seastar::now()};
73 }
74
75 seastar::future<> init(const entity_name_t& name,
76 const std::string& lname,
77 const uint64_t nonce,
78 const entity_addr_t& addr) {
79 msgr = crimson::net::Messenger::create(name, lname, nonce);
80 msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
81 msgr->set_auth_client(&dummy_auth);
82 msgr->set_auth_server(&dummy_auth);
83 return msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
84 return msgr->start({this});
85 }, crimson::net::Messenger::bind_ertr::all_same_way(
86 [addr] (const std::error_code& e) {
87 logger().error("test_echo(): "
88 "there is another instance running at {}", addr);
89 ceph_abort();
90 }));
91 }
92 seastar::future<> shutdown() {
93 ceph_assert(msgr);
94 msgr->stop();
95 return msgr->shutdown();
96 }
97 };
98
99 struct Client final
100 : public crimson::net::Dispatcher {
101 struct PingSession : public seastar::enable_shared_from_this<PingSession> {
102 unsigned count = 0u;
103 mono_time connected_time;
104 mono_time finish_time;
105 };
106 using PingSessionRef = seastar::shared_ptr<PingSession>;
107
108 unsigned rounds;
109 std::bernoulli_distribution keepalive_dist;
110 crimson::net::MessengerRef msgr;
111 std::map<crimson::net::ConnectionRef, seastar::promise<>> pending_conns;
112 std::map<crimson::net::ConnectionRef, PingSessionRef> sessions;
113 crimson::auth::DummyAuthClientServer dummy_auth;
114
115 Client(unsigned rounds, double keepalive_ratio)
116 : rounds(rounds),
117 keepalive_dist(std::bernoulli_distribution{keepalive_ratio}) {}
118
119 PingSessionRef find_session(crimson::net::ConnectionRef c) {
120 auto found = sessions.find(c);
121 if (found == sessions.end()) {
122 ceph_assert(false);
123 }
124 return found->second;
125 }
126
127 void ms_handle_connect(crimson::net::ConnectionRef conn) override {
128 auto session = seastar::make_shared<PingSession>();
129 auto [i, added] = sessions.emplace(conn, session);
130 std::ignore = i;
131 ceph_assert(added);
132 session->connected_time = mono_clock::now();
133 }
134 std::optional<seastar::future<>> ms_dispatch(
135 crimson::net::ConnectionRef c, MessageRef m) override {
136 auto session = find_session(c);
137 ++(session->count);
138 if (verbose) {
139 logger().info("client ms_dispatch {}", session->count);
140 }
141
142 if (session->count == rounds) {
143 logger().info("{}: finished receiving {} pongs", *c, session->count);
144 session->finish_time = mono_clock::now();
145 auto found = pending_conns.find(c);
146 ceph_assert(found != pending_conns.end());
147 found->second.set_value();
148 }
149 return {seastar::now()};
150 }
151
152 seastar::future<> init(const entity_name_t& name,
153 const std::string& lname,
154 const uint64_t nonce) {
155 msgr = crimson::net::Messenger::create(name, lname, nonce);
156 msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
157 msgr->set_auth_client(&dummy_auth);
158 msgr->set_auth_server(&dummy_auth);
159 return msgr->start({this});
160 }
161
162 seastar::future<> shutdown() {
163 ceph_assert(msgr);
164 msgr->stop();
165 return msgr->shutdown();
166 }
167
168 seastar::future<> dispatch_pingpong(const entity_addr_t& peer_addr) {
169 mono_time start_time = mono_clock::now();
170 auto conn = msgr->connect(peer_addr, entity_name_t::TYPE_OSD);
171 return seastar::futurize_invoke([this, conn] {
172 return do_dispatch_pingpong(conn);
173 }).then([this, conn, start_time] {
174 auto session = find_session(conn);
175 std::chrono::duration<double> dur_handshake = session->connected_time - start_time;
176 std::chrono::duration<double> dur_pingpong = session->finish_time - session->connected_time;
177 logger().info("{}: handshake {}, pingpong {}",
178 *conn, dur_handshake.count(), dur_pingpong.count());
179 });
180 }
181
182 private:
183 seastar::future<> do_dispatch_pingpong(crimson::net::ConnectionRef conn) {
184 auto [i, added] = pending_conns.emplace(conn, seastar::promise<>());
185 std::ignore = i;
186 ceph_assert(added);
187 return seastar::do_with(0u, 0u,
188 [this, conn](auto &count_ping, auto &count_keepalive) {
189 return seastar::do_until(
190 [this, conn, &count_ping, &count_keepalive] {
191 bool stop = (count_ping == rounds);
192 if (stop) {
193 logger().info("{}: finished sending {} pings with {} keepalives",
194 *conn, count_ping, count_keepalive);
195 }
196 return stop;
197 },
198 [this, conn, &count_ping, &count_keepalive] {
199 return seastar::repeat([this, conn, &count_ping, &count_keepalive] {
200 if (keepalive_dist(rng)) {
201 return conn->send_keepalive()
202 .then([&count_keepalive] {
203 count_keepalive += 1;
204 return seastar::make_ready_future<seastar::stop_iteration>(
205 seastar::stop_iteration::no);
206 });
207 } else {
208 return conn->send(crimson::make_message<MPing>())
209 .then([&count_ping] {
210 count_ping += 1;
211 return seastar::make_ready_future<seastar::stop_iteration>(
212 seastar::stop_iteration::yes);
213 });
214 }
215 });
216 }).then([this, conn] {
217 auto found = pending_conns.find(conn);
218 return found->second.get_future();
219 }
220 );
221 });
222 }
223 };
224 };
225
226 logger().info("test_echo(rounds={}, keepalive_ratio={}):",
227 rounds, keepalive_ratio);
228 auto server1 = seastar::make_shared<test_state::Server>();
229 auto server2 = seastar::make_shared<test_state::Server>();
230 auto client1 = seastar::make_shared<test_state::Client>(rounds, keepalive_ratio);
231 auto client2 = seastar::make_shared<test_state::Client>(rounds, keepalive_ratio);
232 // start servers and clients
233 auto addr1 = get_server_addr();
234 auto addr2 = get_server_addr();
235 addr1.set_type(entity_addr_t::TYPE_MSGR2);
236 addr2.set_type(entity_addr_t::TYPE_MSGR2);
237 return seastar::when_all_succeed(
238 server1->init(entity_name_t::OSD(0), "server1", 1, addr1),
239 server2->init(entity_name_t::OSD(1), "server2", 2, addr2),
240 client1->init(entity_name_t::OSD(2), "client1", 3),
241 client2->init(entity_name_t::OSD(3), "client2", 4)
242 // dispatch pingpoing
243 ).then_unpack([client1, client2, server1, server2] {
244 return seastar::when_all_succeed(
245 // test connecting in parallel, accepting in parallel
246 client1->dispatch_pingpong(server2->msgr->get_myaddr()),
247 client2->dispatch_pingpong(server1->msgr->get_myaddr()));
248 // shutdown
249 }).then_unpack([] {
250 return seastar::now();
251 }).then([client1] {
252 logger().info("client1 shutdown...");
253 return client1->shutdown();
254 }).then([client2] {
255 logger().info("client2 shutdown...");
256 return client2->shutdown();
257 }).then([server1] {
258 logger().info("server1 shutdown...");
259 return server1->shutdown();
260 }).then([server2] {
261 logger().info("server2 shutdown...");
262 return server2->shutdown();
263 }).then([] {
264 logger().info("test_echo() done!\n");
265 }).handle_exception([server1, server2, client1, client2] (auto eptr) {
266 logger().error("test_echo() failed: got exception {}", eptr);
267 throw;
268 });
269 }
270
271 static seastar::future<> test_concurrent_dispatch()
272 {
273 struct test_state {
274 struct Server final
275 : public crimson::net::Dispatcher {
276 crimson::net::MessengerRef msgr;
277 int count = 0;
278 seastar::promise<> on_second; // satisfied on second dispatch
279 seastar::promise<> on_done; // satisfied when first dispatch unblocks
280 crimson::auth::DummyAuthClientServer dummy_auth;
281
282 std::optional<seastar::future<>> ms_dispatch(
283 crimson::net::ConnectionRef, MessageRef m) override {
284 switch (++count) {
285 case 1:
286 // block on the first request until we reenter with the second
287 std::ignore = on_second.get_future().then([this] { on_done.set_value(); });
288 break;
289 case 2:
290 on_second.set_value();
291 break;
292 default:
293 throw std::runtime_error("unexpected count");
294 }
295 return {seastar::now()};
296 }
297
298 seastar::future<> wait() { return on_done.get_future(); }
299
300 seastar::future<> init(const entity_name_t& name,
301 const std::string& lname,
302 const uint64_t nonce,
303 const entity_addr_t& addr) {
304 msgr = crimson::net::Messenger::create(name, lname, nonce);
305 msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
306 msgr->set_auth_client(&dummy_auth);
307 msgr->set_auth_server(&dummy_auth);
308 return msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
309 return msgr->start({this});
310 }, crimson::net::Messenger::bind_ertr::all_same_way(
311 [addr] (const std::error_code& e) {
312 logger().error("test_concurrent_dispatch(): "
313 "there is another instance running at {}", addr);
314 ceph_abort();
315 }));
316 }
317 };
318
319 struct Client final
320 : public crimson::net::Dispatcher {
321 crimson::net::MessengerRef msgr;
322 crimson::auth::DummyAuthClientServer dummy_auth;
323
324 std::optional<seastar::future<>> ms_dispatch(
325 crimson::net::ConnectionRef, MessageRef m) override {
326 return {seastar::now()};
327 }
328
329 seastar::future<> init(const entity_name_t& name,
330 const std::string& lname,
331 const uint64_t nonce) {
332 msgr = crimson::net::Messenger::create(name, lname, nonce);
333 msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
334 msgr->set_auth_client(&dummy_auth);
335 msgr->set_auth_server(&dummy_auth);
336 return msgr->start({this});
337 }
338 };
339 };
340
341 logger().info("test_concurrent_dispatch():");
342 auto server = seastar::make_shared<test_state::Server>();
343 auto client = seastar::make_shared<test_state::Client>();
344 auto addr = get_server_addr();
345 addr.set_type(entity_addr_t::TYPE_MSGR2);
346 addr.set_family(AF_INET);
347 return seastar::when_all_succeed(
348 server->init(entity_name_t::OSD(4), "server3", 5, addr),
349 client->init(entity_name_t::OSD(5), "client3", 6)
350 ).then_unpack([server, client] {
351 auto conn = client->msgr->connect(server->msgr->get_myaddr(),
352 entity_name_t::TYPE_OSD);
353 // send two messages
354 return conn->send(crimson::make_message<MPing>()).then([conn] {
355 return conn->send(crimson::make_message<MPing>());
356 });
357 }).then([server] {
358 return server->wait();
359 }).then([client] {
360 logger().info("client shutdown...");
361 client->msgr->stop();
362 return client->msgr->shutdown();
363 }).then([server] {
364 logger().info("server shutdown...");
365 server->msgr->stop();
366 return server->msgr->shutdown();
367 }).then([] {
368 logger().info("test_concurrent_dispatch() done!\n");
369 }).handle_exception([server, client] (auto eptr) {
370 logger().error("test_concurrent_dispatch() failed: got exception {}", eptr);
371 throw;
372 });
373 }
374
375 seastar::future<> test_preemptive_shutdown() {
376 struct test_state {
377 class Server final
378 : public crimson::net::Dispatcher {
379 crimson::net::MessengerRef msgr;
380 crimson::auth::DummyAuthClientServer dummy_auth;
381
382 std::optional<seastar::future<>> ms_dispatch(
383 crimson::net::ConnectionRef c, MessageRef m) override {
384 std::ignore = c->send(crimson::make_message<MPing>());
385 return {seastar::now()};
386 }
387
388 public:
389 seastar::future<> init(const entity_name_t& name,
390 const std::string& lname,
391 const uint64_t nonce,
392 const entity_addr_t& addr) {
393 msgr = crimson::net::Messenger::create(name, lname, nonce);
394 msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
395 msgr->set_auth_client(&dummy_auth);
396 msgr->set_auth_server(&dummy_auth);
397 return msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
398 return msgr->start({this});
399 }, crimson::net::Messenger::bind_ertr::all_same_way(
400 [addr] (const std::error_code& e) {
401 logger().error("test_preemptive_shutdown(): "
402 "there is another instance running at {}", addr);
403 ceph_abort();
404 }));
405 }
406 entity_addr_t get_addr() const {
407 return msgr->get_myaddr();
408 }
409 seastar::future<> shutdown() {
410 msgr->stop();
411 return msgr->shutdown();
412 }
413 };
414
415 class Client final
416 : public crimson::net::Dispatcher {
417 crimson::net::MessengerRef msgr;
418 crimson::auth::DummyAuthClientServer dummy_auth;
419
420 bool stop_send = false;
421 seastar::promise<> stopped_send_promise;
422
423 std::optional<seastar::future<>> ms_dispatch(
424 crimson::net::ConnectionRef, MessageRef m) override {
425 return {seastar::now()};
426 }
427
428 public:
429 seastar::future<> init(const entity_name_t& name,
430 const std::string& lname,
431 const uint64_t nonce) {
432 msgr = crimson::net::Messenger::create(name, lname, nonce);
433 msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
434 msgr->set_auth_client(&dummy_auth);
435 msgr->set_auth_server(&dummy_auth);
436 return msgr->start({this});
437 }
438 void send_pings(const entity_addr_t& addr) {
439 auto conn = msgr->connect(addr, entity_name_t::TYPE_OSD);
440 // forwarded to stopped_send_promise
441 (void) seastar::do_until(
442 [this] { return stop_send; },
443 [conn] {
444 return conn->send(crimson::make_message<MPing>()).then([] {
445 return seastar::sleep(0ms);
446 });
447 }
448 ).then_wrapped([this, conn] (auto fut) {
449 fut.forward_to(std::move(stopped_send_promise));
450 });
451 }
452 seastar::future<> shutdown() {
453 msgr->stop();
454 return msgr->shutdown().then([this] {
455 stop_send = true;
456 return stopped_send_promise.get_future();
457 });
458 }
459 };
460 };
461
462 logger().info("test_preemptive_shutdown():");
463 auto server = seastar::make_shared<test_state::Server>();
464 auto client = seastar::make_shared<test_state::Client>();
465 auto addr = get_server_addr();
466 addr.set_type(entity_addr_t::TYPE_MSGR2);
467 addr.set_family(AF_INET);
468 return seastar::when_all_succeed(
469 server->init(entity_name_t::OSD(6), "server4", 7, addr),
470 client->init(entity_name_t::OSD(7), "client4", 8)
471 ).then_unpack([server, client] {
472 client->send_pings(server->get_addr());
473 return seastar::sleep(100ms);
474 }).then([client] {
475 logger().info("client shutdown...");
476 return client->shutdown();
477 }).then([server] {
478 logger().info("server shutdown...");
479 return server->shutdown();
480 }).then([] {
481 logger().info("test_preemptive_shutdown() done!\n");
482 }).handle_exception([server, client] (auto eptr) {
483 logger().error("test_preemptive_shutdown() failed: got exception {}", eptr);
484 throw;
485 });
486 }
487
488 using ceph::msgr::v2::Tag;
489 using crimson::net::bp_action_t;
490 using crimson::net::bp_type_t;
491 using crimson::net::Breakpoint;
492 using crimson::net::Connection;
493 using crimson::net::ConnectionRef;
494 using crimson::net::custom_bp_t;
495 using crimson::net::Dispatcher;
496 using crimson::net::Interceptor;
497 using crimson::net::Messenger;
498 using crimson::net::MessengerRef;
499 using crimson::net::SocketConnection;
500 using crimson::net::SocketPolicy;
501 using crimson::net::tag_bp_t;
502 using namespace ceph::net::test;
503
// Zero-initialised hit counter. Used as a map value so that a first-time
// lookup via operator[] starts counting from 0.
struct counter_t {
  unsigned counter = 0;
};
505
// State recorded for a tracked connection in its ConnResult.
enum class conn_state_t {
  // initial state of every ConnResult
  unknown = 0,
  // set by FailoverSuite::wait_ready() for a connected connection once
  // nothing is pending any more
  established,
  // set when the connection is reported closed, unless it was replaced
  closed,
  // set when the connection is reported replaced
  replaced,
};
512
513 std::ostream& operator<<(std::ostream& out, const conn_state_t& state) {
514 switch(state) {
515 case conn_state_t::unknown:
516 return out << "unknown";
517 case conn_state_t::established:
518 return out << "established";
519 case conn_state_t::closed:
520 return out << "closed";
521 case conn_state_t::replaced:
522 return out << "replaced";
523 default:
524 ceph_abort();
525 }
526 }
527
528 } // anonymous namespace
529
#if FMT_VERSION >= 90000
// Lets fmt (and therefore the logger) print conn_state_t through the
// operator<< defined for it; only needed with fmt 9 or newer.
template <>
struct fmt::formatter<conn_state_t>
  : fmt::ostream_formatter {};
#endif
534
535 namespace {
536
537 struct ConnResult {
538 ConnectionRef conn;
539 unsigned index;
540 conn_state_t state = conn_state_t::unknown;
541
542 unsigned connect_attempts = 0;
543 unsigned client_connect_attempts = 0;
544 unsigned client_reconnect_attempts = 0;
545 unsigned cnt_connect_dispatched = 0;
546
547 unsigned accept_attempts = 0;
548 unsigned server_connect_attempts = 0;
549 unsigned server_reconnect_attempts = 0;
550 unsigned cnt_accept_dispatched = 0;
551
552 unsigned cnt_reset_dispatched = 0;
553 unsigned cnt_remote_reset_dispatched = 0;
554
555 ConnResult(ConnectionRef conn, unsigned index)
556 : conn(conn), index(index) {}
557
558 template <typename T>
559 void _assert_eq(const char* expr_actual, T actual,
560 const char* expr_expected, T expected) const {
561 if (actual != expected) {
562 throw std::runtime_error(fmt::format(
563 "[{}] {} '{}' is actually {}, not the expected '{}' {}",
564 index, *conn, expr_actual, actual, expr_expected, expected));
565 }
566 }
567
568 #define ASSERT_EQUAL(actual, expected) \
569 _assert_eq(#actual, actual, #expected, expected)
570
571 void assert_state_at(conn_state_t expected) const {
572 ASSERT_EQUAL(state, expected);
573 }
574
575 void assert_connect(unsigned attempts,
576 unsigned connects,
577 unsigned reconnects,
578 unsigned dispatched) const {
579 ASSERT_EQUAL(connect_attempts, attempts);
580 ASSERT_EQUAL(client_connect_attempts, connects);
581 ASSERT_EQUAL(client_reconnect_attempts, reconnects);
582 ASSERT_EQUAL(cnt_connect_dispatched, dispatched);
583 }
584
585 void assert_connect(unsigned attempts,
586 unsigned dispatched) const {
587 ASSERT_EQUAL(connect_attempts, attempts);
588 ASSERT_EQUAL(cnt_connect_dispatched, dispatched);
589 }
590
591 void assert_accept(unsigned attempts,
592 unsigned accepts,
593 unsigned reaccepts,
594 unsigned dispatched) const {
595 ASSERT_EQUAL(accept_attempts, attempts);
596 ASSERT_EQUAL(server_connect_attempts, accepts);
597 ASSERT_EQUAL(server_reconnect_attempts, reaccepts);
598 ASSERT_EQUAL(cnt_accept_dispatched, dispatched);
599 }
600
601 void assert_accept(unsigned attempts,
602 unsigned dispatched) const {
603 ASSERT_EQUAL(accept_attempts, attempts);
604 ASSERT_EQUAL(cnt_accept_dispatched, dispatched);
605 }
606
607 void assert_reset(unsigned local, unsigned remote) const {
608 ASSERT_EQUAL(cnt_reset_dispatched, local);
609 ASSERT_EQUAL(cnt_remote_reset_dispatched, remote);
610 }
611
612 void dump() const {
613 logger().info("\nResult({}):\n"
614 " conn: [{}] {}:\n"
615 " state: {}\n"
616 " connect_attempts: {}\n"
617 " client_connect_attempts: {}\n"
618 " client_reconnect_attempts: {}\n"
619 " cnt_connect_dispatched: {}\n"
620 " accept_attempts: {}\n"
621 " server_connect_attempts: {}\n"
622 " server_reconnect_attempts: {}\n"
623 " cnt_accept_dispatched: {}\n"
624 " cnt_reset_dispatched: {}\n"
625 " cnt_remote_reset_dispatched: {}\n",
626 static_cast<const void*>(this),
627 index, *conn,
628 state,
629 connect_attempts,
630 client_connect_attempts,
631 client_reconnect_attempts,
632 cnt_connect_dispatched,
633 accept_attempts,
634 server_connect_attempts,
635 server_reconnect_attempts,
636 cnt_accept_dispatched,
637 cnt_reset_dispatched,
638 cnt_remote_reset_dispatched);
639 }
640 };
641 using ConnResults = std::vector<ConnResult>;
642
643 struct TestInterceptor : public Interceptor {
644 std::map<Breakpoint, std::map<unsigned, bp_action_t>> breakpoints;
645 std::map<Breakpoint, counter_t> breakpoints_counter;
646 std::map<ConnectionRef, unsigned> conns;
647 ConnResults results;
648 std::optional<seastar::abort_source> signal;
649
650 TestInterceptor() = default;
651 // only used for copy breakpoint configurations
652 TestInterceptor(const TestInterceptor& other) {
653 assert(other.breakpoints_counter.empty());
654 assert(other.conns.empty());
655 assert(other.results.empty());
656 breakpoints = other.breakpoints;
657 assert(!other.signal);
658 }
659
660 void make_fault(Breakpoint bp, unsigned round = 1) {
661 assert(round >= 1);
662 breakpoints[bp][round] = bp_action_t::FAULT;
663 }
664
665 void make_block(Breakpoint bp, unsigned round = 1) {
666 assert(round >= 1);
667 breakpoints[bp][round] = bp_action_t::BLOCK;
668 }
669
670 void make_stall(Breakpoint bp, unsigned round = 1) {
671 assert(round >= 1);
672 breakpoints[bp][round] = bp_action_t::STALL;
673 }
674
675 ConnResult* find_result(ConnectionRef conn) {
676 auto it = conns.find(conn);
677 if (it == conns.end()) {
678 return nullptr;
679 } else {
680 return &results[it->second];
681 }
682 }
683
684 seastar::future<> wait() {
685 assert(!signal);
686 signal = seastar::abort_source();
687 return seastar::sleep_abortable(10s, *signal).then([] {
688 throw std::runtime_error("Timeout (10s) in TestInterceptor::wait()");
689 }).handle_exception_type([] (const seastar::sleep_aborted& e) {
690 // wait done!
691 });
692 }
693
694 void notify() {
695 if (signal) {
696 signal->request_abort();
697 signal = std::nullopt;
698 }
699 }
700
701 private:
702 void register_conn(SocketConnection& _conn) override {
703 auto conn = _conn.get_local_shared_foreign_from_this();
704 auto result = find_result(conn);
705 if (result != nullptr) {
706 logger().error("The connection [{}] {} already exists when register {}",
707 result->index, *result->conn, _conn);
708 ceph_abort();
709 }
710 unsigned index = results.size();
711 results.emplace_back(conn, index);
712 conns[conn] = index;
713 notify();
714 logger().info("[{}] {} new connection registered", index, _conn);
715 }
716
717 void register_conn_closed(SocketConnection& conn) override {
718 auto result = find_result(conn.get_local_shared_foreign_from_this());
719 if (result == nullptr) {
720 logger().error("Untracked closed connection: {}", conn);
721 ceph_abort();
722 }
723
724 if (result->state != conn_state_t::replaced) {
725 result->state = conn_state_t::closed;
726 }
727 notify();
728 logger().info("[{}] {} closed({})", result->index, conn, result->state);
729 }
730
731 void register_conn_ready(SocketConnection& conn) override {
732 auto result = find_result(conn.get_local_shared_foreign_from_this());
733 if (result == nullptr) {
734 logger().error("Untracked ready connection: {}", conn);
735 ceph_abort();
736 }
737
738 ceph_assert(conn.is_connected());
739 notify();
740 logger().info("[{}] {} ready", result->index, conn);
741 }
742
743 void register_conn_replaced(SocketConnection& conn) override {
744 auto result = find_result(conn.get_local_shared_foreign_from_this());
745 if (result == nullptr) {
746 logger().error("Untracked replaced connection: {}", conn);
747 ceph_abort();
748 }
749
750 result->state = conn_state_t::replaced;
751 logger().info("[{}] {} {}", result->index, conn, result->state);
752 }
753
754 bp_action_t intercept(SocketConnection& conn, Breakpoint bp) override {
755 ++breakpoints_counter[bp].counter;
756
757 auto result = find_result(conn.get_local_shared_foreign_from_this());
758 if (result == nullptr) {
759 logger().error("Untracked intercepted connection: {}, at breakpoint {}({})",
760 conn, bp, breakpoints_counter[bp].counter);
761 ceph_abort();
762 }
763
764 if (bp == custom_bp_t::SOCKET_CONNECTING) {
765 ++result->connect_attempts;
766 logger().info("[Test] connect_attempts={}", result->connect_attempts);
767 } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::WRITE}) {
768 ++result->client_connect_attempts;
769 logger().info("[Test] client_connect_attempts={}", result->client_connect_attempts);
770 } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::WRITE}) {
771 ++result->client_reconnect_attempts;
772 logger().info("[Test] client_reconnect_attempts={}", result->client_reconnect_attempts);
773 } else if (bp == custom_bp_t::SOCKET_ACCEPTED) {
774 ++result->accept_attempts;
775 logger().info("[Test] accept_attempts={}", result->accept_attempts);
776 } else if (bp == tag_bp_t{Tag::CLIENT_IDENT, bp_type_t::READ}) {
777 ++result->server_connect_attempts;
778 logger().info("[Test] server_connect_attemps={}", result->server_connect_attempts);
779 } else if (bp == tag_bp_t{Tag::SESSION_RECONNECT, bp_type_t::READ}) {
780 ++result->server_reconnect_attempts;
781 logger().info("[Test] server_reconnect_attempts={}", result->server_reconnect_attempts);
782 }
783
784 auto it_bp = breakpoints.find(bp);
785 if (it_bp != breakpoints.end()) {
786 auto it_cnt = it_bp->second.find(breakpoints_counter[bp].counter);
787 if (it_cnt != it_bp->second.end()) {
788 logger().info("[{}] {} intercepted {}({}) => {}",
789 result->index, conn, bp,
790 breakpoints_counter[bp].counter, it_cnt->second);
791 return it_cnt->second;
792 }
793 }
794 logger().info("[{}] {} intercepted {}({})",
795 result->index, conn, bp, breakpoints_counter[bp].counter);
796 return bp_action_t::CONTINUE;
797 }
798 };
799
800 SocketPolicy to_socket_policy(policy_t policy) {
801 switch (policy) {
802 case policy_t::stateful_server:
803 return SocketPolicy::stateful_server(0);
804 case policy_t::stateless_server:
805 return SocketPolicy::stateless_server(0);
806 case policy_t::lossless_peer:
807 return SocketPolicy::lossless_peer(0);
808 case policy_t::lossless_peer_reuse:
809 return SocketPolicy::lossless_peer_reuse(0);
810 case policy_t::lossy_client:
811 return SocketPolicy::lossy_client(0);
812 case policy_t::lossless_client:
813 return SocketPolicy::lossless_client(0);
814 default:
815 logger().error("unexpected policy type");
816 ceph_abort();
817 }
818 }
819
820 class FailoverSuite : public Dispatcher {
821 crimson::auth::DummyAuthClientServer dummy_auth;
822 MessengerRef test_msgr;
823 const entity_addr_t test_peer_addr;
824 TestInterceptor interceptor;
825
826 unsigned tracked_index = 0;
827 ConnectionRef tracked_conn;
828 unsigned pending_send = 0;
829 unsigned pending_peer_receive = 0;
830 unsigned pending_receive = 0;
831
832 std::optional<seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
833 auto result = interceptor.find_result(c);
834 if (result == nullptr) {
835 logger().error("Untracked ms dispatched connection: {}", *c);
836 ceph_abort();
837 }
838
839 if (tracked_conn != c) {
840 logger().error("[{}] {} got op, but doesn't match tracked_conn [{}] {}",
841 result->index, *c, tracked_index, *tracked_conn);
842 ceph_abort();
843 }
844 ceph_assert(result->index == tracked_index);
845
846 ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
847 ceph_assert(pending_receive > 0);
848 --pending_receive;
849 if (pending_receive == 0) {
850 interceptor.notify();
851 }
852 logger().info("[Test] got op, left {} ops -- [{}] {}",
853 pending_receive, result->index, *c);
854 return {seastar::now()};
855 }
856
857 void ms_handle_accept(ConnectionRef conn) override {
858 auto result = interceptor.find_result(conn);
859 if (result == nullptr) {
860 logger().error("Untracked accepted connection: {}", *conn);
861 ceph_abort();
862 }
863
864 if (tracked_conn &&
865 !tracked_conn->is_closed() &&
866 tracked_conn != conn) {
867 logger().error("[{}] {} got accepted, but there's already traced_conn [{}] {}",
868 result->index, *conn, tracked_index, *tracked_conn);
869 ceph_abort();
870 }
871
872 tracked_index = result->index;
873 tracked_conn = conn;
874 ++result->cnt_accept_dispatched;
875 logger().info("[Test] got accept (cnt_accept_dispatched={}), track [{}] {}",
876 result->cnt_accept_dispatched, result->index, *conn);
877 std::ignore = flush_pending_send();
878 }
879
880 void ms_handle_connect(ConnectionRef conn) override {
881 auto result = interceptor.find_result(conn);
882 if (result == nullptr) {
883 logger().error("Untracked connected connection: {}", *conn);
884 ceph_abort();
885 }
886
887 if (tracked_conn != conn) {
888 logger().error("[{}] {} got connected, but doesn't match tracked_conn [{}] {}",
889 result->index, *conn, tracked_index, *tracked_conn);
890 ceph_abort();
891 }
892 ceph_assert(result->index == tracked_index);
893
894 ++result->cnt_connect_dispatched;
895 logger().info("[Test] got connected (cnt_connect_dispatched={}) -- [{}] {}",
896 result->cnt_connect_dispatched, result->index, *conn);
897 }
898
899 void ms_handle_reset(ConnectionRef conn, bool is_replace) override {
900 auto result = interceptor.find_result(conn);
901 if (result == nullptr) {
902 logger().error("Untracked reset connection: {}", *conn);
903 ceph_abort();
904 }
905
906 if (tracked_conn != conn) {
907 logger().error("[{}] {} got reset, but doesn't match tracked_conn [{}] {}",
908 result->index, *conn, tracked_index, *tracked_conn);
909 ceph_abort();
910 }
911 ceph_assert(result->index == tracked_index);
912
913 tracked_index = 0;
914 tracked_conn = nullptr;
915 ++result->cnt_reset_dispatched;
916 logger().info("[Test] got reset (cnt_reset_dispatched={}), untrack [{}] {}",
917 result->cnt_reset_dispatched, result->index, *conn);
918 }
919
920 void ms_handle_remote_reset(ConnectionRef conn) override {
921 auto result = interceptor.find_result(conn);
922 if (result == nullptr) {
923 logger().error("Untracked remotely reset connection: {}", *conn);
924 ceph_abort();
925 }
926
927 if (tracked_conn != conn) {
928 logger().error("[{}] {} got remotely reset, but doesn't match tracked_conn [{}] {}",
929 result->index, *conn, tracked_index, *tracked_conn);
930 ceph_abort();
931 }
932 ceph_assert(result->index == tracked_index);
933
934 ++result->cnt_remote_reset_dispatched;
935 logger().info("[Test] got remote reset (cnt_remote_reset_dispatched={}) -- [{}] {}",
936 result->cnt_remote_reset_dispatched, result->index, *conn);
937 }
938
939 private:
940 seastar::future<> init(entity_addr_t test_addr, SocketPolicy policy) {
941 test_msgr->set_default_policy(policy);
942 test_msgr->set_auth_client(&dummy_auth);
943 test_msgr->set_auth_server(&dummy_auth);
944 test_msgr->set_interceptor(&interceptor);
945 return test_msgr->bind(entity_addrvec_t{test_addr}).safe_then([this] {
946 return test_msgr->start({this});
947 }, Messenger::bind_ertr::all_same_way([test_addr] (const std::error_code& e) {
948 logger().error("FailoverSuite: "
949 "there is another instance running at {}", test_addr);
950 ceph_abort();
951 }));
952 }
953
954 seastar::future<> send_op(bool expect_reply=true) {
955 ceph_assert(tracked_conn);
956 if (expect_reply) {
957 ++pending_peer_receive;
958 }
959 pg_t pgid;
960 object_locator_t oloc;
961 hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
962 pgid.pool(), oloc.nspace);
963 spg_t spgid(pgid);
964 return tracked_conn->send(crimson::make_message<MOSDOp>(0, 0, hobj, spgid, 0, 0, 0));
965 }
966
967 seastar::future<> flush_pending_send() {
968 if (pending_send != 0) {
969 logger().info("[Test] flush sending {} ops", pending_send);
970 }
971 ceph_assert(tracked_conn);
972 return seastar::do_until(
973 [this] { return pending_send == 0; },
974 [this] {
975 --pending_send;
976 return send_op();
977 });
978 }
979
980 seastar::future<> wait_ready(unsigned num_ready_conns,
981 unsigned num_replaced,
982 bool wait_received) {
983 unsigned pending_conns = 0;
984 unsigned pending_establish = 0;
985 unsigned replaced_conns = 0;
986 for (auto& result : interceptor.results) {
987 if (result.conn->is_closed_clean()) {
988 if (result.state == conn_state_t::replaced) {
989 ++replaced_conns;
990 }
991 } else if (result.conn->is_connected()) {
992 if (tracked_conn != result.conn || tracked_index != result.index) {
993 throw std::runtime_error(fmt::format(
994 "The connected connection [{}] {} doesn't"
995 " match the tracked connection [{}] {}",
996 result.index, *result.conn, tracked_index, *tracked_conn));
997 }
998 if (pending_send == 0 && pending_peer_receive == 0 && pending_receive == 0) {
999 result.state = conn_state_t::established;
1000 } else {
1001 ++pending_establish;
1002 }
1003 } else {
1004 ++pending_conns;
1005 }
1006 }
1007
1008 bool do_wait = false;
1009 if (num_ready_conns > 0) {
1010 if (interceptor.results.size() > num_ready_conns) {
1011 throw std::runtime_error(fmt::format(
1012 "{} connections, more than expected: {}",
1013 interceptor.results.size(), num_ready_conns));
1014 } else if (interceptor.results.size() < num_ready_conns || pending_conns > 0) {
1015 logger().info("[Test] wait_ready(): wait for connections,"
1016 " currently {} out of {}, pending {} ready ...",
1017 interceptor.results.size(), num_ready_conns, pending_conns);
1018 do_wait = true;
1019 }
1020 }
1021 if (wait_received &&
1022 (pending_send || pending_peer_receive || pending_receive)) {
1023 if (pending_conns || pending_establish) {
1024 logger().info("[Test] wait_ready(): wait for pending_send={},"
1025 " pending_peer_receive={}, pending_receive={},"
1026 " pending {}/{} ready/establish connections ...",
1027 pending_send, pending_peer_receive, pending_receive,
1028 pending_conns, pending_establish);
1029 do_wait = true;
1030 }
1031 }
1032 if (num_replaced > 0) {
1033 if (replaced_conns > num_replaced) {
1034 throw std::runtime_error(fmt::format(
1035 "{} replaced connections, more than expected: {}",
1036 replaced_conns, num_replaced));
1037 }
1038 if (replaced_conns < num_replaced) {
1039 logger().info("[Test] wait_ready(): wait for {} replaced connections,"
1040 " currently {} ...",
1041 num_replaced, replaced_conns);
1042 do_wait = true;
1043 }
1044 }
1045
1046 if (do_wait) {
1047 return interceptor.wait(
1048 ).then([this, num_ready_conns, num_replaced, wait_received] {
1049 return wait_ready(num_ready_conns, num_replaced, wait_received);
1050 });
1051 } else {
1052 logger().info("[Test] wait_ready(): wait done!");
1053 return seastar::now();
1054 }
1055 }
1056
1057 // called by FailoverTest
1058 public:
1059 FailoverSuite(MessengerRef test_msgr,
1060 entity_addr_t test_peer_addr,
1061 const TestInterceptor& interceptor)
1062 : test_msgr(test_msgr),
1063 test_peer_addr(test_peer_addr),
1064 interceptor(interceptor) { }
1065
1066 entity_addr_t get_addr() const {
1067 return test_msgr->get_myaddr();
1068 }
1069
1070 seastar::future<> shutdown() {
1071 test_msgr->stop();
1072 return test_msgr->shutdown();
1073 }
1074
1075 void needs_receive() {
1076 ++pending_receive;
1077 }
1078
1079 void notify_peer_reply() {
1080 ceph_assert(pending_peer_receive > 0);
1081 --pending_peer_receive;
1082 logger().info("[Test] TestPeer said got op, left {} ops",
1083 pending_peer_receive);
1084 if (pending_peer_receive == 0) {
1085 interceptor.notify();
1086 }
1087 }
1088
1089 void post_check() const {
1090 // make sure all breakpoints were hit
1091 for (auto& kv : interceptor.breakpoints) {
1092 auto it = interceptor.breakpoints_counter.find(kv.first);
1093 if (it == interceptor.breakpoints_counter.end()) {
1094 throw std::runtime_error(fmt::format("{} was missed", kv.first));
1095 }
1096 auto expected = kv.second.rbegin()->first;
1097 if (expected > it->second.counter) {
1098 throw std::runtime_error(fmt::format(
1099 "{} only triggered {} times, not the expected {}",
1100 kv.first, it->second.counter, expected));
1101 }
1102 }
1103 }
1104
1105 void dump_results() const {
1106 for (auto& result : interceptor.results) {
1107 result.dump();
1108 }
1109 }
1110
1111 static seastar::future<std::unique_ptr<FailoverSuite>>
1112 create(entity_addr_t test_addr,
1113 SocketPolicy test_policy,
1114 entity_addr_t test_peer_addr,
1115 const TestInterceptor& interceptor) {
1116 auto suite = std::make_unique<FailoverSuite>(
1117 Messenger::create(entity_name_t::OSD(TEST_OSD), "Test", TEST_NONCE),
1118 test_peer_addr, interceptor);
1119 return suite->init(test_addr, test_policy
1120 ).then([suite = std::move(suite)] () mutable {
1121 return std::move(suite);
1122 });
1123 }
1124
1125 // called by tests
1126 public:
1127 seastar::future<> connect_peer() {
1128 logger().info("[Test] connect_peer({})", test_peer_addr);
1129 auto conn = test_msgr->connect(test_peer_addr, entity_name_t::TYPE_OSD);
1130 auto result = interceptor.find_result(conn);
1131 ceph_assert(result != nullptr);
1132
1133 if (tracked_conn) {
1134 if (tracked_conn->is_closed()) {
1135 ceph_assert(tracked_conn != conn);
1136 logger().info("[Test] this is a new session replacing an closed one");
1137 } else {
1138 ceph_assert(tracked_index == result->index);
1139 ceph_assert(tracked_conn == conn);
1140 logger().info("[Test] this is not a new session");
1141 }
1142 } else {
1143 logger().info("[Test] this is a new session");
1144 }
1145 tracked_index = result->index;
1146 tracked_conn = conn;
1147
1148 return flush_pending_send();
1149 }
1150
1151 seastar::future<> send_peer() {
1152 if (tracked_conn) {
1153 logger().info("[Test] send_peer()");
1154 ceph_assert(!pending_send);
1155 return send_op();
1156 } else {
1157 ++pending_send;
1158 logger().info("[Test] send_peer() (pending {})", pending_send);
1159 return seastar::now();
1160 }
1161 }
1162
1163 seastar::future<> keepalive_peer() {
1164 logger().info("[Test] keepalive_peer()");
1165 ceph_assert(tracked_conn);
1166 return tracked_conn->send_keepalive();
1167 }
1168
1169 seastar::future<> try_send_peer() {
1170 logger().info("[Test] try_send_peer()");
1171 ceph_assert(tracked_conn);
1172 return send_op(false);
1173 }
1174
1175 seastar::future<> markdown() {
1176 logger().info("[Test] markdown() in 100ms ...");
1177 ceph_assert(tracked_conn);
1178 // sleep to propagate potential remaining acks
1179 return seastar::sleep(100ms
1180 ).then([this] {
1181 tracked_conn->mark_down();
1182 });
1183 }
1184
1185 seastar::future<> wait_blocked() {
1186 logger().info("[Test] wait_blocked() ...");
1187 return interceptor.blocker.wait_blocked();
1188 }
1189
1190 void unblock() {
1191 logger().info("[Test] unblock()");
1192 return interceptor.blocker.unblock();
1193 }
1194
1195 seastar::future<> wait_replaced(unsigned count) {
1196 logger().info("[Test] wait_replaced({}) ...", count);
1197 return wait_ready(0, count, false);
1198 }
1199
1200 seastar::future<> wait_established() {
1201 logger().info("[Test] wait_established() ...");
1202 return wait_ready(0, 0, true);
1203 }
1204
1205 seastar::future<std::reference_wrapper<ConnResults>>
1206 wait_results(unsigned count) {
1207 logger().info("[Test] wait_result({}) ...", count);
1208 return wait_ready(count, 0, true).then([this] {
1209 return std::reference_wrapper<ConnResults>(interceptor.results);
1210 });
1211 }
1212
1213 bool is_standby() {
1214 ceph_assert(tracked_conn);
1215 return !(tracked_conn->is_connected() || tracked_conn->is_closed());
1216 }
1217 };
1218
1219 class FailoverTest : public Dispatcher {
1220 crimson::auth::DummyAuthClientServer dummy_auth;
1221 MessengerRef cmd_msgr;
1222 ConnectionRef cmd_conn;
1223 const entity_addr_t test_addr;
1224 const entity_addr_t test_peer_addr;
1225
1226 std::optional<seastar::promise<>> recv_pong;
1227 std::optional<seastar::promise<>> recv_cmdreply;
1228
1229 std::unique_ptr<FailoverSuite> test_suite;
1230
1231 std::optional<seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
1232 switch (m->get_type()) {
1233 case CEPH_MSG_PING:
1234 ceph_assert(recv_pong);
1235 recv_pong->set_value();
1236 recv_pong = std::nullopt;
1237 break;
1238 case MSG_COMMAND_REPLY:
1239 ceph_assert(recv_cmdreply);
1240 recv_cmdreply->set_value();
1241 recv_cmdreply = std::nullopt;
1242 break;
1243 case MSG_COMMAND: {
1244 auto m_cmd = boost::static_pointer_cast<MCommand>(m);
1245 ceph_assert(static_cast<cmd_t>(m_cmd->cmd[0][0]) == cmd_t::suite_recv_op);
1246 ceph_assert(test_suite);
1247 test_suite->notify_peer_reply();
1248 break;
1249 }
1250 default:
1251 logger().error("{} got unexpected msg from cmd server: {}", *c, *m);
1252 ceph_abort();
1253 }
1254 return {seastar::now()};
1255 }
1256
1257 private:
1258 seastar::future<> prepare_cmd(
1259 cmd_t cmd,
1260 std::function<void(MCommand&)>
1261 f_prepare = [] (auto& m) { return; }) {
1262 assert(!recv_cmdreply);
1263 recv_cmdreply = seastar::promise<>();
1264 auto fut = recv_cmdreply->get_future();
1265 auto m = crimson::make_message<MCommand>();
1266 m->cmd.emplace_back(1, static_cast<char>(cmd));
1267 f_prepare(*m);
1268 return cmd_conn->send(std::move(m)).then([fut = std::move(fut)] () mutable {
1269 return std::move(fut);
1270 });
1271 }
1272
1273 seastar::future<> start_peer(policy_t peer_policy) {
1274 return prepare_cmd(cmd_t::suite_start,
1275 [peer_policy] (auto& m) {
1276 m.cmd.emplace_back(1, static_cast<char>(peer_policy));
1277 });
1278 }
1279
1280 seastar::future<> stop_peer() {
1281 return prepare_cmd(cmd_t::suite_stop);
1282 }
1283
1284 seastar::future<> pingpong() {
1285 assert(!recv_pong);
1286 recv_pong = seastar::promise<>();
1287 auto fut = recv_pong->get_future();
1288 return cmd_conn->send(crimson::make_message<MPing>()
1289 ).then([fut = std::move(fut)] () mutable {
1290 return std::move(fut);
1291 });
1292 }
1293
1294 seastar::future<> init(entity_addr_t cmd_peer_addr) {
1295 cmd_msgr->set_default_policy(SocketPolicy::lossy_client(0));
1296 cmd_msgr->set_auth_client(&dummy_auth);
1297 cmd_msgr->set_auth_server(&dummy_auth);
1298 return cmd_msgr->start({this}).then([this, cmd_peer_addr] {
1299 logger().info("CmdCli connect to CmdSrv({}) ...", cmd_peer_addr);
1300 cmd_conn = cmd_msgr->connect(cmd_peer_addr, entity_name_t::TYPE_OSD);
1301 return pingpong();
1302 });
1303 }
1304
1305 public:
1306 FailoverTest(MessengerRef cmd_msgr,
1307 entity_addr_t test_addr,
1308 entity_addr_t test_peer_addr)
1309 : cmd_msgr(cmd_msgr),
1310 test_addr(test_addr),
1311 test_peer_addr(test_peer_addr) { }
1312
1313 seastar::future<> shutdown() {
1314 logger().info("CmdCli shutdown...");
1315 assert(!recv_cmdreply);
1316 auto m = crimson::make_message<MCommand>();
1317 m->cmd.emplace_back(1, static_cast<char>(cmd_t::shutdown));
1318 return cmd_conn->send(std::move(m)).then([] {
1319 return seastar::sleep(200ms);
1320 }).then([this] {
1321 cmd_msgr->stop();
1322 return cmd_msgr->shutdown();
1323 });
1324 }
1325
1326 static seastar::future<seastar::lw_shared_ptr<FailoverTest>>
1327 create(entity_addr_t test_addr,
1328 entity_addr_t cmd_peer_addr,
1329 entity_addr_t test_peer_addr) {
1330 auto test = seastar::make_lw_shared<FailoverTest>(
1331 Messenger::create(entity_name_t::OSD(CMD_CLI_OSD), "CmdCli", CMD_CLI_NONCE),
1332 test_addr, test_peer_addr);
1333 return test->init(cmd_peer_addr).then([test] {
1334 logger().info("CmdCli ready");
1335 return test;
1336 });
1337 }
1338
1339 // called by tests
1340 public:
1341 seastar::future<> run_suite(
1342 std::string name,
1343 const TestInterceptor& interceptor,
1344 policy_t test_policy,
1345 policy_t peer_policy,
1346 std::function<seastar::future<>(FailoverSuite&)>&& f) {
1347 logger().info("\n\n[{}]", name);
1348 ceph_assert(!test_suite);
1349 SocketPolicy test_policy_ = to_socket_policy(test_policy);
1350 return FailoverSuite::create(
1351 test_addr, test_policy_, test_peer_addr, interceptor
1352 ).then([this, peer_policy, f = std::move(f)] (auto suite) mutable {
1353 ceph_assert(suite->get_addr() == test_addr);
1354 test_suite.swap(suite);
1355 return start_peer(peer_policy).then([this, f = std::move(f)] {
1356 return f(*test_suite);
1357 }).then([this] {
1358 test_suite->post_check();
1359 logger().info("\n[SUCCESS]");
1360 }).handle_exception([this] (auto eptr) {
1361 logger().info("\n[FAIL: {}]", eptr);
1362 test_suite->dump_results();
1363 throw;
1364 }).then([this] {
1365 return stop_peer();
1366 }).then([this] {
1367 return test_suite->shutdown().then([this] {
1368 test_suite.reset();
1369 });
1370 });
1371 });
1372 }
1373
1374 seastar::future<> peer_connect_me() {
1375 logger().info("[Test] peer_connect_me({})", test_addr);
1376 return prepare_cmd(cmd_t::suite_connect_me,
1377 [this] (auto& m) {
1378 m.cmd.emplace_back(fmt::format("{}", test_addr));
1379 });
1380 }
1381
1382 seastar::future<> peer_send_me() {
1383 logger().info("[Test] peer_send_me()");
1384 ceph_assert(test_suite);
1385 test_suite->needs_receive();
1386 return prepare_cmd(cmd_t::suite_send_me);
1387 }
1388
1389 seastar::future<> try_peer_send_me() {
1390 logger().info("[Test] try_peer_send_me()");
1391 ceph_assert(test_suite);
1392 return prepare_cmd(cmd_t::suite_send_me);
1393 }
1394
1395 seastar::future<> send_bidirectional() {
1396 ceph_assert(test_suite);
1397 return test_suite->send_peer().then([this] {
1398 return peer_send_me();
1399 });
1400 }
1401
1402 seastar::future<> peer_keepalive_me() {
1403 logger().info("[Test] peer_keepalive_me()");
1404 ceph_assert(test_suite);
1405 return prepare_cmd(cmd_t::suite_keepalive_me);
1406 }
1407
1408 seastar::future<> markdown_peer() {
1409 logger().info("[Test] markdown_peer() in 150ms ...");
1410 // sleep to propagate potential remaining acks
1411 return seastar::sleep(50ms
1412 ).then([this] {
1413 return prepare_cmd(cmd_t::suite_markdown);
1414 }).then([] {
1415 // sleep awhile for peer markdown propagated
1416 return seastar::sleep(100ms);
1417 });
1418 }
1419 };
1420
1421 class FailoverSuitePeer : public Dispatcher {
1422 using cb_t = std::function<seastar::future<>()>;
1423 crimson::auth::DummyAuthClientServer dummy_auth;
1424 MessengerRef peer_msgr;
1425 cb_t op_callback;
1426
1427 ConnectionRef tracked_conn;
1428 unsigned pending_send = 0;
1429
1430 std::optional<seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
1431 logger().info("[TestPeer] got op from Test");
1432 ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
1433 ceph_assert(tracked_conn == c);
1434 std::ignore = op_callback();
1435 return {seastar::now()};
1436 }
1437
1438 void ms_handle_accept(ConnectionRef conn) override {
1439 logger().info("[TestPeer] got accept from Test");
1440 ceph_assert(!tracked_conn ||
1441 tracked_conn->is_closed() ||
1442 tracked_conn == conn);
1443 tracked_conn = conn;
1444 std::ignore = flush_pending_send();
1445 }
1446
1447 void ms_handle_reset(ConnectionRef conn, bool is_replace) override {
1448 logger().info("[TestPeer] got reset from Test");
1449 ceph_assert(tracked_conn == conn);
1450 tracked_conn = nullptr;
1451 }
1452
1453 private:
1454 seastar::future<> init(entity_addr_t test_peer_addr, SocketPolicy policy) {
1455 peer_msgr->set_default_policy(policy);
1456 peer_msgr->set_auth_client(&dummy_auth);
1457 peer_msgr->set_auth_server(&dummy_auth);
1458 return peer_msgr->bind(entity_addrvec_t{test_peer_addr}).safe_then([this] {
1459 return peer_msgr->start({this});
1460 }, Messenger::bind_ertr::all_same_way([test_peer_addr] (const std::error_code& e) {
1461 logger().error("FailoverSuitePeer: "
1462 "there is another instance running at {}", test_peer_addr);
1463 ceph_abort();
1464 }));
1465 }
1466
1467 seastar::future<> send_op() {
1468 ceph_assert(tracked_conn);
1469 pg_t pgid;
1470 object_locator_t oloc;
1471 hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
1472 pgid.pool(), oloc.nspace);
1473 spg_t spgid(pgid);
1474 return tracked_conn->send(crimson::make_message<MOSDOp>(0, 0, hobj, spgid, 0, 0, 0));
1475 }
1476
1477 seastar::future<> flush_pending_send() {
1478 if (pending_send != 0) {
1479 logger().info("[TestPeer] flush sending {} ops", pending_send);
1480 }
1481 ceph_assert(tracked_conn);
1482 return seastar::do_until(
1483 [this] { return pending_send == 0; },
1484 [this] {
1485 --pending_send;
1486 return send_op();
1487 });
1488 }
1489
1490 public:
1491 FailoverSuitePeer(MessengerRef peer_msgr, cb_t op_callback)
1492 : peer_msgr(peer_msgr), op_callback(op_callback) { }
1493
1494 seastar::future<> shutdown() {
1495 peer_msgr->stop();
1496 return peer_msgr->shutdown();
1497 }
1498
1499 seastar::future<> connect_peer(entity_addr_t test_addr_decoded) {
1500 logger().info("[TestPeer] connect_peer({})", test_addr_decoded);
1501 auto new_tracked_conn = peer_msgr->connect(test_addr_decoded, entity_name_t::TYPE_OSD);
1502 if (tracked_conn) {
1503 if (tracked_conn->is_closed()) {
1504 ceph_assert(tracked_conn != new_tracked_conn);
1505 logger().info("[TestPeer] this is a new session"
1506 " replacing an closed one");
1507 } else {
1508 ceph_assert(tracked_conn == new_tracked_conn);
1509 logger().info("[TestPeer] this is not a new session");
1510 }
1511 } else {
1512 logger().info("[TestPeer] this is a new session");
1513 }
1514 tracked_conn = new_tracked_conn;
1515 return flush_pending_send();
1516 }
1517
1518 seastar::future<> send_peer() {
1519 if (tracked_conn) {
1520 logger().info("[TestPeer] send_peer()");
1521 return send_op();
1522 } else {
1523 ++pending_send;
1524 logger().info("[TestPeer] send_peer() (pending {})", pending_send);
1525 return seastar::now();
1526 }
1527 }
1528
1529 seastar::future<> keepalive_peer() {
1530 logger().info("[TestPeer] keepalive_peer()");
1531 ceph_assert(tracked_conn);
1532 return tracked_conn->send_keepalive();
1533 }
1534
1535 seastar::future<> markdown() {
1536 logger().info("[TestPeer] markdown()");
1537 ceph_assert(tracked_conn);
1538 tracked_conn->mark_down();
1539 return seastar::now();
1540 }
1541
1542 static seastar::future<std::unique_ptr<FailoverSuitePeer>>
1543 create(entity_addr_t test_peer_addr, const SocketPolicy& policy, cb_t op_callback) {
1544 auto suite = std::make_unique<FailoverSuitePeer>(
1545 Messenger::create(
1546 entity_name_t::OSD(TEST_PEER_OSD),
1547 "TestPeer",
1548 TEST_PEER_NONCE),
1549 op_callback
1550 );
1551 return suite->init(test_peer_addr, policy
1552 ).then([suite = std::move(suite)] () mutable {
1553 return std::move(suite);
1554 });
1555 }
1556 };
1557
1558 class FailoverTestPeer : public Dispatcher {
1559 crimson::auth::DummyAuthClientServer dummy_auth;
1560 MessengerRef cmd_msgr;
1561 ConnectionRef cmd_conn;
1562 const entity_addr_t test_peer_addr;
1563 std::unique_ptr<FailoverSuitePeer> test_suite;
1564
1565 std::optional<seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
1566 ceph_assert(cmd_conn == c);
1567 switch (m->get_type()) {
1568 case CEPH_MSG_PING:
1569 std::ignore = c->send(crimson::make_message<MPing>());
1570 break;
1571 case MSG_COMMAND: {
1572 auto m_cmd = boost::static_pointer_cast<MCommand>(m);
1573 auto cmd = static_cast<cmd_t>(m_cmd->cmd[0][0]);
1574 if (cmd == cmd_t::shutdown) {
1575 logger().info("CmdSrv shutdown...");
1576 // forwarded to FailoverTestPeer::wait()
1577 cmd_msgr->stop();
1578 std::ignore = cmd_msgr->shutdown();
1579 } else {
1580 std::ignore = handle_cmd(cmd, m_cmd).then([c] {
1581 return c->send(crimson::make_message<MCommandReply>());
1582 });
1583 }
1584 break;
1585 }
1586 default:
1587 logger().error("{} got unexpected msg from cmd client: {}", *c, *m);
1588 ceph_abort();
1589 }
1590 return {seastar::now()};
1591 }
1592
1593 void ms_handle_accept(ConnectionRef conn) override {
1594 cmd_conn = conn;
1595 }
1596
1597 private:
1598 seastar::future<> notify_recv_op() {
1599 ceph_assert(cmd_conn);
1600 auto m = crimson::make_message<MCommand>();
1601 m->cmd.emplace_back(1, static_cast<char>(cmd_t::suite_recv_op));
1602 return cmd_conn->send(std::move(m));
1603 }
1604
1605 seastar::future<> handle_cmd(cmd_t cmd, MRef<MCommand> m_cmd) {
1606 switch (cmd) {
1607 case cmd_t::suite_start: {
1608 ceph_assert(!test_suite);
1609 auto policy = to_socket_policy(static_cast<policy_t>(m_cmd->cmd[1][0]));
1610 return FailoverSuitePeer::create(
1611 test_peer_addr, policy, [this] { return notify_recv_op(); }
1612 ).then([this] (auto suite) {
1613 test_suite.swap(suite);
1614 });
1615 }
1616 case cmd_t::suite_stop:
1617 ceph_assert(test_suite);
1618 return test_suite->shutdown().then([this] {
1619 test_suite.reset();
1620 });
1621 case cmd_t::suite_connect_me: {
1622 ceph_assert(test_suite);
1623 entity_addr_t test_addr_decoded = entity_addr_t();
1624 test_addr_decoded.parse(m_cmd->cmd[1].c_str(), nullptr);
1625 return test_suite->connect_peer(test_addr_decoded);
1626 }
1627 case cmd_t::suite_send_me:
1628 ceph_assert(test_suite);
1629 return test_suite->send_peer();
1630 case cmd_t::suite_keepalive_me:
1631 ceph_assert(test_suite);
1632 return test_suite->keepalive_peer();
1633 case cmd_t::suite_markdown:
1634 ceph_assert(test_suite);
1635 return test_suite->markdown();
1636 default:
1637 logger().error("TestPeer got unexpected command {} from Test",
1638 fmt::ptr(m_cmd.get()));
1639 ceph_abort();
1640 return seastar::now();
1641 }
1642 }
1643
1644 seastar::future<> init(entity_addr_t cmd_peer_addr) {
1645 cmd_msgr->set_default_policy(SocketPolicy::stateless_server(0));
1646 cmd_msgr->set_auth_client(&dummy_auth);
1647 cmd_msgr->set_auth_server(&dummy_auth);
1648 return cmd_msgr->bind(entity_addrvec_t{cmd_peer_addr}).safe_then([this] {
1649 return cmd_msgr->start({this});
1650 }, Messenger::bind_ertr::all_same_way([cmd_peer_addr] (const std::error_code& e) {
1651 logger().error("FailoverTestPeer: "
1652 "there is another instance running at {}", cmd_peer_addr);
1653 ceph_abort();
1654 }));
1655 }
1656
1657 public:
1658 FailoverTestPeer(MessengerRef cmd_msgr,
1659 entity_addr_t test_peer_addr)
1660 : cmd_msgr(cmd_msgr),
1661 test_peer_addr(test_peer_addr) { }
1662
1663 seastar::future<> wait() {
1664 return cmd_msgr->wait();
1665 }
1666
1667 static seastar::future<std::unique_ptr<FailoverTestPeer>>
1668 create(entity_addr_t cmd_peer_addr, entity_addr_t test_peer_addr) {
1669 auto test_peer = std::make_unique<FailoverTestPeer>(
1670 Messenger::create(entity_name_t::OSD(CMD_SRV_OSD), "CmdSrv", CMD_SRV_NONCE),
1671 test_peer_addr);
1672 return test_peer->init(cmd_peer_addr
1673 ).then([test_peer = std::move(test_peer)] () mutable {
1674 logger().info("CmdSrv ready");
1675 return std::move(test_peer);
1676 });
1677 }
1678 };
1679
1680 seastar::future<>
1681 test_v2_lossy_early_connect_fault(FailoverTest& test) {
1682 return seastar::do_with(std::vector<Breakpoint>{
1683 {custom_bp_t::SOCKET_CONNECTING},
1684 {custom_bp_t::BANNER_WRITE},
1685 {custom_bp_t::BANNER_READ},
1686 {custom_bp_t::BANNER_PAYLOAD_READ},
1687 {Tag::HELLO, bp_type_t::WRITE},
1688 {Tag::HELLO, bp_type_t::READ},
1689 {Tag::AUTH_REQUEST, bp_type_t::WRITE},
1690 {Tag::AUTH_DONE, bp_type_t::READ},
1691 {Tag::AUTH_SIGNATURE, bp_type_t::WRITE},
1692 {Tag::AUTH_SIGNATURE, bp_type_t::READ},
1693 }, [&test] (auto& failure_cases) {
1694 return seastar::do_for_each(failure_cases, [&test] (auto bp) {
1695 TestInterceptor interceptor;
1696 interceptor.make_fault(bp);
1697 return test.run_suite(
1698 fmt::format("test_v2_lossy_early_connect_fault -- {}", bp),
1699 interceptor,
1700 policy_t::lossy_client,
1701 policy_t::stateless_server,
1702 [] (FailoverSuite& suite) {
1703 return seastar::futurize_invoke([&suite] {
1704 return suite.send_peer();
1705 }).then([&suite] {
1706 return suite.connect_peer();
1707 }).then([&suite] {
1708 return suite.wait_results(1);
1709 }).then([] (ConnResults& results) {
1710 results[0].assert_state_at(conn_state_t::established);
1711 results[0].assert_connect(2, 1, 0, 1);
1712 results[0].assert_accept(0, 0, 0, 0);
1713 results[0].assert_reset(0, 0);
1714 });
1715 });
1716 });
1717 });
1718 }
1719
1720 seastar::future<>
1721 test_v2_lossy_connect_fault(FailoverTest& test) {
1722 return seastar::do_with(std::vector<Breakpoint>{
1723 {Tag::CLIENT_IDENT, bp_type_t::WRITE},
1724 {Tag::SERVER_IDENT, bp_type_t::READ},
1725 }, [&test] (auto& failure_cases) {
1726 return seastar::do_for_each(failure_cases, [&test] (auto bp) {
1727 TestInterceptor interceptor;
1728 interceptor.make_fault(bp);
1729 return test.run_suite(
1730 fmt::format("test_v2_lossy_connect_fault -- {}", bp),
1731 interceptor,
1732 policy_t::lossy_client,
1733 policy_t::stateless_server,
1734 [] (FailoverSuite& suite) {
1735 return seastar::futurize_invoke([&suite] {
1736 return suite.send_peer();
1737 }).then([&suite] {
1738 return suite.connect_peer();
1739 }).then([&suite] {
1740 return suite.wait_results(1);
1741 }).then([] (ConnResults& results) {
1742 results[0].assert_state_at(conn_state_t::established);
1743 results[0].assert_connect(2, 2, 0, 1);
1744 results[0].assert_accept(0, 0, 0, 0);
1745 results[0].assert_reset(0, 0);
1746 });
1747 });
1748 });
1749 });
1750 }
1751
1752 seastar::future<>
1753 test_v2_lossy_connected_fault(FailoverTest& test) {
1754 return seastar::do_with(std::vector<Breakpoint>{
1755 {Tag::MESSAGE, bp_type_t::WRITE},
1756 {Tag::MESSAGE, bp_type_t::READ},
1757 }, [&test] (auto& failure_cases) {
1758 return seastar::do_for_each(failure_cases, [&test] (auto bp) {
1759 TestInterceptor interceptor;
1760 interceptor.make_fault(bp);
1761 return test.run_suite(
1762 fmt::format("test_v2_lossy_connected_fault -- {}", bp),
1763 interceptor,
1764 policy_t::lossy_client,
1765 policy_t::stateless_server,
1766 [&test] (FailoverSuite& suite) {
1767 return seastar::futurize_invoke([&test] {
1768 return test.send_bidirectional();
1769 }).then([&suite] {
1770 return suite.connect_peer();
1771 }).then([&suite] {
1772 return suite.wait_results(1);
1773 }).then([] (ConnResults& results) {
1774 results[0].assert_state_at(conn_state_t::closed);
1775 results[0].assert_connect(1, 1, 0, 1);
1776 results[0].assert_accept(0, 0, 0, 0);
1777 results[0].assert_reset(1, 0);
1778 });
1779 });
1780 });
1781 });
1782 }
1783
1784 seastar::future<>
1785 test_v2_lossy_early_accept_fault(FailoverTest& test) {
1786 return seastar::do_with(std::vector<Breakpoint>{
1787 {custom_bp_t::BANNER_WRITE},
1788 {custom_bp_t::BANNER_READ},
1789 {custom_bp_t::BANNER_PAYLOAD_READ},
1790 {Tag::HELLO, bp_type_t::WRITE},
1791 {Tag::HELLO, bp_type_t::READ},
1792 {Tag::AUTH_REQUEST, bp_type_t::READ},
1793 {Tag::AUTH_DONE, bp_type_t::WRITE},
1794 {Tag::AUTH_SIGNATURE, bp_type_t::WRITE},
1795 {Tag::AUTH_SIGNATURE, bp_type_t::READ},
1796 }, [&test] (auto& failure_cases) {
1797 return seastar::do_for_each(failure_cases, [&test] (auto bp) {
1798 TestInterceptor interceptor;
1799 interceptor.make_fault(bp);
1800 return test.run_suite(
1801 fmt::format("test_v2_lossy_early_accept_fault -- {}", bp),
1802 interceptor,
1803 policy_t::stateless_server,
1804 policy_t::lossy_client,
1805 [&test] (FailoverSuite& suite) {
1806 return seastar::futurize_invoke([&test] {
1807 return test.peer_send_me();
1808 }).then([&test] {
1809 return test.peer_connect_me();
1810 }).then([&suite] {
1811 return suite.wait_results(2);
1812 }).then([] (ConnResults& results) {
1813 results[0].assert_state_at(conn_state_t::closed);
1814 results[0].assert_connect(0, 0, 0, 0);
1815 results[0].assert_accept(1, 0, 0, 0);
1816 results[0].assert_reset(0, 0);
1817 results[1].assert_state_at(conn_state_t::established);
1818 results[1].assert_connect(0, 0, 0, 0);
1819 results[1].assert_accept(1, 1, 0, 1);
1820 results[1].assert_reset(0, 0);
1821 });
1822 });
1823 });
1824 });
1825 }
1826
1827 seastar::future<>
1828 test_v2_lossy_accept_fault(FailoverTest& test) {
1829 auto bp = Breakpoint{Tag::CLIENT_IDENT, bp_type_t::READ};
1830 TestInterceptor interceptor;
1831 interceptor.make_fault(bp);
1832 return test.run_suite(
1833 fmt::format("test_v2_lossy_accept_fault -- {}", bp),
1834 interceptor,
1835 policy_t::stateless_server,
1836 policy_t::lossy_client,
1837 [&test] (FailoverSuite& suite) {
1838 return seastar::futurize_invoke([&test] {
1839 return test.peer_send_me();
1840 }).then([&test] {
1841 return test.peer_connect_me();
1842 }).then([&suite] {
1843 return suite.wait_results(2);
1844 }).then([] (ConnResults& results) {
1845 results[0].assert_state_at(conn_state_t::closed);
1846 results[0].assert_connect(0, 0, 0, 0);
1847 results[0].assert_accept(1, 1, 0, 0);
1848 results[0].assert_reset(0, 0);
1849 results[1].assert_state_at(conn_state_t::established);
1850 results[1].assert_connect(0, 0, 0, 0);
1851 results[1].assert_accept(1, 1, 0, 1);
1852 results[1].assert_reset(0, 0);
1853 });
1854 });
1855 }
1856
1857 seastar::future<>
1858 test_v2_lossy_establishing_fault(FailoverTest& test) {
1859 auto bp = Breakpoint{Tag::SERVER_IDENT, bp_type_t::WRITE};
1860 TestInterceptor interceptor;
1861 interceptor.make_fault(bp);
1862 return test.run_suite(
1863 fmt::format("test_v2_lossy_establishing_fault -- {}", bp),
1864 interceptor,
1865 policy_t::stateless_server,
1866 policy_t::lossy_client,
1867 [&test] (FailoverSuite& suite) {
1868 return seastar::futurize_invoke([&test] {
1869 return test.peer_send_me();
1870 }).then([&test] {
1871 return test.peer_connect_me();
1872 }).then([&suite] {
1873 return suite.wait_results(2);
1874 }).then([] (ConnResults& results) {
1875 results[0].assert_state_at(conn_state_t::closed);
1876 results[0].assert_connect(0, 0, 0, 0);
1877 results[0].assert_accept(1, 1, 0, 1);
1878 results[0].assert_reset(1, 0);
1879 results[1].assert_state_at(conn_state_t::established);
1880 results[1].assert_connect(0, 0, 0, 0);
1881 results[1].assert_accept(1, 1, 0, 1);
1882 results[1].assert_reset(0, 0);
1883 });
1884 });
1885 }
1886
1887 seastar::future<>
1888 test_v2_lossy_accepted_fault(FailoverTest& test) {
1889 return seastar::do_with(std::vector<Breakpoint>{
1890 {Tag::MESSAGE, bp_type_t::WRITE},
1891 {Tag::MESSAGE, bp_type_t::READ},
1892 }, [&test] (auto& failure_cases) {
1893 return seastar::do_for_each(failure_cases, [&test] (auto bp) {
1894 TestInterceptor interceptor;
1895 interceptor.make_fault(bp);
1896 return test.run_suite(
1897 fmt::format("test_v2_lossy_accepted_fault -- {}", bp),
1898 interceptor,
1899 policy_t::stateless_server,
1900 policy_t::lossy_client,
1901 [&test] (FailoverSuite& suite) {
1902 return seastar::futurize_invoke([&test] {
1903 return test.send_bidirectional();
1904 }).then([&test] {
1905 return test.peer_connect_me();
1906 }).then([&suite] {
1907 return suite.wait_results(1);
1908 }).then([] (ConnResults& results) {
1909 results[0].assert_state_at(conn_state_t::closed);
1910 results[0].assert_connect(0, 0, 0, 0);
1911 results[0].assert_accept(1, 1, 0, 1);
1912 results[0].assert_reset(1, 0);
1913 });
1914 });
1915 });
1916 });
1917 }
1918
1919 seastar::future<>
1920 test_v2_lossless_connect_fault(FailoverTest& test) {
1921 return seastar::do_with(std::vector<Breakpoint>{
1922 {Tag::CLIENT_IDENT, bp_type_t::WRITE},
1923 {Tag::SERVER_IDENT, bp_type_t::READ},
1924 }, [&test] (auto& failure_cases) {
1925 return seastar::do_for_each(failure_cases, [&test] (auto bp) {
1926 TestInterceptor interceptor;
1927 interceptor.make_fault(bp);
1928 return test.run_suite(
1929 fmt::format("test_v2_lossless_connect_fault -- {}", bp),
1930 interceptor,
1931 policy_t::lossless_client,
1932 policy_t::stateful_server,
1933 [&test] (FailoverSuite& suite) {
1934 return seastar::futurize_invoke([&test] {
1935 return test.send_bidirectional();
1936 }).then([&suite] {
1937 return suite.connect_peer();
1938 }).then([&suite] {
1939 return suite.wait_results(1);
1940 }).then([] (ConnResults& results) {
1941 results[0].assert_state_at(conn_state_t::established);
1942 results[0].assert_connect(2, 2, 0, 1);
1943 results[0].assert_accept(0, 0, 0, 0);
1944 results[0].assert_reset(0, 0);
1945 });
1946 });
1947 });
1948 });
1949 }
1950
1951 seastar::future<>
1952 test_v2_lossless_connected_fault(FailoverTest& test) {
1953 return seastar::do_with(std::vector<Breakpoint>{
1954 {Tag::MESSAGE, bp_type_t::WRITE},
1955 {Tag::MESSAGE, bp_type_t::READ},
1956 }, [&test] (auto& failure_cases) {
1957 return seastar::do_for_each(failure_cases, [&test] (auto bp) {
1958 TestInterceptor interceptor;
1959 interceptor.make_fault(bp);
1960 return test.run_suite(
1961 fmt::format("test_v2_lossless_connected_fault -- {}", bp),
1962 interceptor,
1963 policy_t::lossless_client,
1964 policy_t::stateful_server,
1965 [&test] (FailoverSuite& suite) {
1966 return seastar::futurize_invoke([&test] {
1967 return test.send_bidirectional();
1968 }).then([&suite] {
1969 return suite.connect_peer();
1970 }).then([&suite] {
1971 return suite.wait_results(1);
1972 }).then([] (ConnResults& results) {
1973 results[0].assert_state_at(conn_state_t::established);
1974 results[0].assert_connect(2, 1, 1, 2);
1975 results[0].assert_accept(0, 0, 0, 0);
1976 results[0].assert_reset(0, 0);
1977 });
1978 });
1979 });
1980 });
1981 }
1982
1983 seastar::future<>
1984 test_v2_lossless_connected_fault2(FailoverTest& test) {
1985 return seastar::do_with(std::vector<Breakpoint>{
1986 {Tag::ACK, bp_type_t::READ},
1987 {Tag::ACK, bp_type_t::WRITE},
1988 {Tag::KEEPALIVE2, bp_type_t::READ},
1989 {Tag::KEEPALIVE2, bp_type_t::WRITE},
1990 {Tag::KEEPALIVE2_ACK, bp_type_t::READ},
1991 {Tag::KEEPALIVE2_ACK, bp_type_t::WRITE},
1992 }, [&test] (auto& failure_cases) {
1993 return seastar::do_for_each(failure_cases, [&test] (auto bp) {
1994 TestInterceptor interceptor;
1995 interceptor.make_fault(bp);
1996 return test.run_suite(
1997 fmt::format("test_v2_lossless_connected_fault2 -- {}", bp),
1998 interceptor,
1999 policy_t::lossless_client,
2000 policy_t::stateful_server,
2001 [&test] (FailoverSuite& suite) {
2002 return seastar::futurize_invoke([&suite] {
2003 return suite.connect_peer();
2004 }).then([&suite] {
2005 return suite.wait_established();
2006 }).then([&suite] {
2007 return suite.send_peer();
2008 }).then([&suite] {
2009 return suite.keepalive_peer();
2010 }).then([&suite] {
2011 return suite.wait_established();
2012 }).then([&test] {
2013 return test.peer_send_me();
2014 }).then([&test] {
2015 return test.peer_keepalive_me();
2016 }).then([&suite] {
2017 return suite.wait_established();
2018 }).then([&suite] {
2019 return suite.send_peer();
2020 }).then([&suite] {
2021 return suite.wait_established();
2022 }).then([&test] {
2023 return test.peer_send_me();
2024 }).then([&suite] {
2025 return suite.wait_established();
2026 }).then([&suite] {
2027 return suite.wait_results(1);
2028 }).then([] (ConnResults& results) {
2029 results[0].assert_state_at(conn_state_t::established);
2030 results[0].assert_connect(2, 1, 1, 2);
2031 results[0].assert_accept(0, 0, 0, 0);
2032 results[0].assert_reset(0, 0);
2033 });
2034 });
2035 });
2036 });
2037 }
2038
2039 seastar::future<>
2040 test_v2_lossless_reconnect_fault(FailoverTest& test) {
2041 return seastar::do_with(std::vector<std::pair<Breakpoint, Breakpoint>>{
2042 {{Tag::MESSAGE, bp_type_t::WRITE},
2043 {Tag::SESSION_RECONNECT, bp_type_t::WRITE}},
2044 {{Tag::MESSAGE, bp_type_t::WRITE},
2045 {Tag::SESSION_RECONNECT_OK, bp_type_t::READ}},
2046 }, [&test] (auto& failure_cases) {
2047 return seastar::do_for_each(failure_cases, [&test] (auto bp_pair) {
2048 TestInterceptor interceptor;
2049 interceptor.make_fault(bp_pair.first);
2050 interceptor.make_fault(bp_pair.second);
2051 return test.run_suite(
2052 fmt::format("test_v2_lossless_reconnect_fault -- {}, {}",
2053 bp_pair.first, bp_pair.second),
2054 interceptor,
2055 policy_t::lossless_client,
2056 policy_t::stateful_server,
2057 [&test] (FailoverSuite& suite) {
2058 return seastar::futurize_invoke([&test] {
2059 return test.send_bidirectional();
2060 }).then([&suite] {
2061 return suite.connect_peer();
2062 }).then([&suite] {
2063 return suite.wait_results(1);
2064 }).then([] (ConnResults& results) {
2065 results[0].assert_state_at(conn_state_t::established);
2066 results[0].assert_connect(3, 1, 2, 2);
2067 results[0].assert_accept(0, 0, 0, 0);
2068 results[0].assert_reset(0, 0);
2069 });
2070 });
2071 });
2072 });
2073 }
2074
2075 seastar::future<>
2076 test_v2_lossless_accept_fault(FailoverTest& test) {
2077 auto bp = Breakpoint{Tag::CLIENT_IDENT, bp_type_t::READ};
2078 TestInterceptor interceptor;
2079 interceptor.make_fault(bp);
2080 return test.run_suite(
2081 fmt::format("test_v2_lossless_accept_fault -- {}", bp),
2082 interceptor,
2083 policy_t::stateful_server,
2084 policy_t::lossless_client,
2085 [&test] (FailoverSuite& suite) {
2086 return seastar::futurize_invoke([&test] {
2087 return test.send_bidirectional();
2088 }).then([&test] {
2089 return test.peer_connect_me();
2090 }).then([&suite] {
2091 return suite.wait_results(2);
2092 }).then([] (ConnResults& results) {
2093 results[0].assert_state_at(conn_state_t::closed);
2094 results[0].assert_connect(0, 0, 0, 0);
2095 results[0].assert_accept(1, 1, 0, 0);
2096 results[0].assert_reset(0, 0);
2097 results[1].assert_state_at(conn_state_t::established);
2098 results[1].assert_connect(0, 0, 0, 0);
2099 results[1].assert_accept(1, 1, 0, 1);
2100 results[1].assert_reset(0, 0);
2101 });
2102 });
2103 }
2104
2105 seastar::future<>
2106 test_v2_lossless_establishing_fault(FailoverTest& test) {
2107 auto bp = Breakpoint{Tag::SERVER_IDENT, bp_type_t::WRITE};
2108 TestInterceptor interceptor;
2109 interceptor.make_fault(bp);
2110 return test.run_suite(
2111 fmt::format("test_v2_lossless_establishing_fault -- {}", bp),
2112 interceptor,
2113 policy_t::stateful_server,
2114 policy_t::lossless_client,
2115 [&test] (FailoverSuite& suite) {
2116 return seastar::futurize_invoke([&test] {
2117 return test.send_bidirectional();
2118 }).then([&test] {
2119 return test.peer_connect_me();
2120 }).then([&suite] {
2121 return suite.wait_results(2);
2122 }).then([] (ConnResults& results) {
2123 results[0].assert_state_at(conn_state_t::established);
2124 results[0].assert_connect(0, 0, 0, 0);
2125 results[0].assert_accept(1, 1, 0, 2);
2126 results[0].assert_reset(0, 0);
2127 results[1].assert_state_at(conn_state_t::replaced);
2128 results[1].assert_connect(0, 0, 0, 0);
2129 results[1].assert_accept(1, 1, 0, 0);
2130 results[1].assert_reset(0, 0);
2131 });
2132 });
2133 }
2134
2135 seastar::future<>
2136 test_v2_lossless_accepted_fault(FailoverTest& test) {
2137 return seastar::do_with(std::vector<Breakpoint>{
2138 {Tag::MESSAGE, bp_type_t::WRITE},
2139 {Tag::MESSAGE, bp_type_t::READ},
2140 }, [&test] (auto& failure_cases) {
2141 return seastar::do_for_each(failure_cases, [&test] (auto bp) {
2142 TestInterceptor interceptor;
2143 interceptor.make_fault(bp);
2144 return test.run_suite(
2145 fmt::format("test_v2_lossless_accepted_fault -- {}", bp),
2146 interceptor,
2147 policy_t::stateful_server,
2148 policy_t::lossless_client,
2149 [&test] (FailoverSuite& suite) {
2150 return seastar::futurize_invoke([&test] {
2151 return test.send_bidirectional();
2152 }).then([&test] {
2153 return test.peer_connect_me();
2154 }).then([&suite] {
2155 return suite.wait_results(2);
2156 }).then([] (ConnResults& results) {
2157 results[0].assert_state_at(conn_state_t::established);
2158 results[0].assert_connect(0, 0, 0, 0);
2159 results[0].assert_accept(1, 1, 0, 2);
2160 results[0].assert_reset(0, 0);
2161 results[1].assert_state_at(conn_state_t::replaced);
2162 results[1].assert_connect(0, 0, 0, 0);
2163 results[1].assert_accept(1, 0);
2164 results[1].assert_reset(0, 0);
2165 });
2166 });
2167 });
2168 });
2169 }
2170
2171 seastar::future<>
2172 test_v2_lossless_reaccept_fault(FailoverTest& test) {
2173 return seastar::do_with(std::vector<std::pair<Breakpoint, Breakpoint>>{
2174 {{Tag::MESSAGE, bp_type_t::READ},
2175 {Tag::SESSION_RECONNECT, bp_type_t::READ}},
2176 {{Tag::MESSAGE, bp_type_t::READ},
2177 {Tag::SESSION_RECONNECT_OK, bp_type_t::WRITE}},
2178 }, [&test] (auto& failure_cases) {
2179 return seastar::do_for_each(failure_cases, [&test] (auto bp_pair) {
2180 TestInterceptor interceptor;
2181 interceptor.make_fault(bp_pair.first);
2182 interceptor.make_fault(bp_pair.second);
2183 return test.run_suite(
2184 fmt::format("test_v2_lossless_reaccept_fault -- {}, {}",
2185 bp_pair.first, bp_pair.second),
2186 interceptor,
2187 policy_t::stateful_server,
2188 policy_t::lossless_client,
2189 [&test, bp = bp_pair.second] (FailoverSuite& suite) {
2190 return seastar::futurize_invoke([&test] {
2191 return test.send_bidirectional();
2192 }).then([&test] {
2193 return test.peer_connect_me();
2194 }).then([&suite] {
2195 return suite.wait_results(3);
2196 }).then([bp] (ConnResults& results) {
2197 results[0].assert_state_at(conn_state_t::established);
2198 results[0].assert_connect(0, 0, 0, 0);
2199 if (bp == Breakpoint{Tag::SESSION_RECONNECT, bp_type_t::READ}) {
2200 results[0].assert_accept(1, 1, 0, 2);
2201 } else {
2202 results[0].assert_accept(1, 1, 0, 3);
2203 }
2204 results[0].assert_reset(0, 0);
2205 if (bp == Breakpoint{Tag::SESSION_RECONNECT, bp_type_t::READ}) {
2206 results[1].assert_state_at(conn_state_t::closed);
2207 } else {
2208 results[1].assert_state_at(conn_state_t::replaced);
2209 }
2210 results[1].assert_connect(0, 0, 0, 0);
2211 results[1].assert_accept(1, 0, 1, 0);
2212 results[1].assert_reset(0, 0);
2213 results[2].assert_state_at(conn_state_t::replaced);
2214 results[2].assert_connect(0, 0, 0, 0);
2215 results[2].assert_accept(1, 0, 1, 0);
2216 results[2].assert_reset(0, 0);
2217 });
2218 });
2219 });
2220 });
2221 }
2222
2223 seastar::future<>
2224 test_v2_peer_connect_fault(FailoverTest& test) {
2225 return seastar::do_with(std::vector<Breakpoint>{
2226 {Tag::CLIENT_IDENT, bp_type_t::WRITE},
2227 {Tag::SERVER_IDENT, bp_type_t::READ},
2228 }, [&test] (auto& failure_cases) {
2229 return seastar::do_for_each(failure_cases, [&test] (auto bp) {
2230 TestInterceptor interceptor;
2231 interceptor.make_fault(bp);
2232 return test.run_suite(
2233 fmt::format("test_v2_peer_connect_fault -- {}", bp),
2234 interceptor,
2235 policy_t::lossless_peer,
2236 policy_t::lossless_peer,
2237 [] (FailoverSuite& suite) {
2238 return seastar::futurize_invoke([&suite] {
2239 return suite.send_peer();
2240 }).then([&suite] {
2241 return suite.connect_peer();
2242 }).then([&suite] {
2243 return suite.wait_results(1);
2244 }).then([] (ConnResults& results) {
2245 results[0].assert_state_at(conn_state_t::established);
2246 results[0].assert_connect(2, 2, 0, 1);
2247 results[0].assert_accept(0, 0, 0, 0);
2248 results[0].assert_reset(0, 0);
2249 });
2250 });
2251 });
2252 });
2253 }
2254
2255 seastar::future<>
2256 test_v2_peer_accept_fault(FailoverTest& test) {
2257 auto bp = Breakpoint{Tag::CLIENT_IDENT, bp_type_t::READ};
2258 TestInterceptor interceptor;
2259 interceptor.make_fault(bp);
2260 return test.run_suite(
2261 fmt::format("test_v2_peer_accept_fault -- {}", bp),
2262 interceptor,
2263 policy_t::lossless_peer,
2264 policy_t::lossless_peer,
2265 [&test] (FailoverSuite& suite) {
2266 return seastar::futurize_invoke([&test] {
2267 return test.peer_send_me();
2268 }).then([&test] {
2269 return test.peer_connect_me();
2270 }).then([&suite] {
2271 return suite.wait_results(2);
2272 }).then([] (ConnResults& results) {
2273 results[0].assert_state_at(conn_state_t::closed);
2274 results[0].assert_connect(0, 0, 0, 0);
2275 results[0].assert_accept(1, 1, 0, 0);
2276 results[0].assert_reset(0, 0);
2277 results[1].assert_state_at(conn_state_t::established);
2278 results[1].assert_connect(0, 0, 0, 0);
2279 results[1].assert_accept(1, 1, 0, 1);
2280 results[1].assert_reset(0, 0);
2281 });
2282 });
2283 }
2284
2285 seastar::future<>
2286 test_v2_peer_establishing_fault(FailoverTest& test) {
2287 auto bp = Breakpoint{Tag::SERVER_IDENT, bp_type_t::WRITE};
2288 TestInterceptor interceptor;
2289 interceptor.make_fault(bp);
2290 return test.run_suite(
2291 fmt::format("test_v2_peer_establishing_fault -- {}", bp),
2292 interceptor,
2293 policy_t::lossless_peer,
2294 policy_t::lossless_peer,
2295 [&test] (FailoverSuite& suite) {
2296 return seastar::futurize_invoke([&test] {
2297 return test.peer_send_me();
2298 }).then([&test] {
2299 return test.peer_connect_me();
2300 }).then([&suite] {
2301 return suite.wait_results(2);
2302 }).then([] (ConnResults& results) {
2303 results[0].assert_state_at(conn_state_t::established);
2304 results[0].assert_connect(0, 0, 0, 0);
2305 results[0].assert_accept(1, 1, 0, 2);
2306 results[0].assert_reset(0, 0);
2307 results[1].assert_state_at(conn_state_t::replaced);
2308 results[1].assert_connect(0, 0, 0, 0);
2309 results[1].assert_accept(1, 1, 0, 0);
2310 results[1].assert_reset(0, 0);
2311 });
2312 });
2313 }
2314
2315 seastar::future<>
2316 test_v2_peer_connected_fault_reconnect(FailoverTest& test) {
2317 auto bp = Breakpoint{Tag::MESSAGE, bp_type_t::WRITE};
2318 TestInterceptor interceptor;
2319 interceptor.make_fault(bp);
2320 return test.run_suite(
2321 fmt::format("test_v2_peer_connected_fault_reconnect -- {}", bp),
2322 interceptor,
2323 policy_t::lossless_peer,
2324 policy_t::lossless_peer,
2325 [] (FailoverSuite& suite) {
2326 return seastar::futurize_invoke([&suite] {
2327 return suite.send_peer();
2328 }).then([&suite] {
2329 return suite.connect_peer();
2330 }).then([&suite] {
2331 return suite.wait_results(1);
2332 }).then([] (ConnResults& results) {
2333 results[0].assert_state_at(conn_state_t::established);
2334 results[0].assert_connect(2, 1, 1, 2);
2335 results[0].assert_accept(0, 0, 0, 0);
2336 results[0].assert_reset(0, 0);
2337 });
2338 });
2339 }
2340
2341 seastar::future<>
2342 test_v2_peer_connected_fault_reaccept(FailoverTest& test) {
2343 auto bp = Breakpoint{Tag::MESSAGE, bp_type_t::READ};
2344 TestInterceptor interceptor;
2345 interceptor.make_fault(bp);
2346 return test.run_suite(
2347 fmt::format("test_v2_peer_connected_fault_reaccept -- {}", bp),
2348 interceptor,
2349 policy_t::lossless_peer,
2350 policy_t::lossless_peer,
2351 [&test] (FailoverSuite& suite) {
2352 return seastar::futurize_invoke([&test] {
2353 return test.peer_send_me();
2354 }).then([&suite] {
2355 return suite.connect_peer();
2356 }).then([&suite] {
2357 return suite.wait_results(2);
2358 }).then([] (ConnResults& results) {
2359 results[0].assert_state_at(conn_state_t::established);
2360 results[0].assert_connect(1, 1, 0, 1);
2361 results[0].assert_accept(0, 0, 0, 1);
2362 results[0].assert_reset(0, 0);
2363 results[1].assert_state_at(conn_state_t::replaced);
2364 results[1].assert_connect(0, 0, 0, 0);
2365 results[1].assert_accept(1, 0, 1, 0);
2366 results[1].assert_reset(0, 0);
2367 });
2368 });
2369 }
2370
2371 seastar::future<bool>
2372 check_peer_wins(FailoverTest& test) {
2373 return seastar::do_with(bool(), [&test] (auto& ret) {
2374 return test.run_suite("check_peer_wins",
2375 TestInterceptor(),
2376 policy_t::lossy_client,
2377 policy_t::stateless_server,
2378 [&ret] (FailoverSuite& suite) {
2379 return suite.connect_peer().then([&suite] {
2380 return suite.wait_results(1);
2381 }).then([&ret] (ConnResults& results) {
2382 results[0].assert_state_at(conn_state_t::established);
2383 ret = results[0].conn->peer_wins();
2384 logger().info("check_peer_wins: {}", ret);
2385 });
2386 }).then([&ret] {
2387 return ret;
2388 });
2389 });
2390 }
2391
2392 seastar::future<>
2393 test_v2_racing_reconnect_acceptor_lose(FailoverTest& test) {
2394 return seastar::do_with(std::vector<std::pair<unsigned, Breakpoint>>{
2395 {1, {Tag::SESSION_RECONNECT, bp_type_t::READ}},
2396 {2, {custom_bp_t::BANNER_WRITE}},
2397 {2, {custom_bp_t::BANNER_READ}},
2398 {2, {custom_bp_t::BANNER_PAYLOAD_READ}},
2399 {2, {Tag::HELLO, bp_type_t::WRITE}},
2400 {2, {Tag::HELLO, bp_type_t::READ}},
2401 {2, {Tag::AUTH_REQUEST, bp_type_t::READ}},
2402 {2, {Tag::AUTH_DONE, bp_type_t::WRITE}},
2403 {2, {Tag::AUTH_SIGNATURE, bp_type_t::WRITE}},
2404 {2, {Tag::AUTH_SIGNATURE, bp_type_t::READ}},
2405 }, [&test] (auto& failure_cases) {
2406 return seastar::do_for_each(failure_cases, [&test] (auto bp) {
2407 TestInterceptor interceptor;
2408 // fault acceptor
2409 interceptor.make_fault({Tag::MESSAGE, bp_type_t::READ});
2410 // block acceptor
2411 interceptor.make_block(bp.second, bp.first);
2412 return test.run_suite(
2413 fmt::format("test_v2_racing_reconnect_acceptor_lose -- {}({})",
2414 bp.second, bp.first),
2415 interceptor,
2416 policy_t::lossless_peer,
2417 policy_t::lossless_peer,
2418 [&test] (FailoverSuite& suite) {
2419 return seastar::futurize_invoke([&test] {
2420 return test.peer_send_me();
2421 }).then([&test] {
2422 return test.peer_connect_me();
2423 }).then([&suite] {
2424 return suite.wait_blocked();
2425 }).then([&suite] {
2426 return suite.send_peer();
2427 }).then([&suite] {
2428 return suite.wait_established();
2429 }).then([&suite] {
2430 suite.unblock();
2431 return suite.wait_results(2);
2432 }).then([] (ConnResults& results) {
2433 results[0].assert_state_at(conn_state_t::established);
2434 results[0].assert_connect(1, 0, 1, 1);
2435 results[0].assert_accept(1, 1, 0, 1);
2436 results[0].assert_reset(0, 0);
2437 results[1].assert_state_at(conn_state_t::closed);
2438 results[1].assert_connect(0, 0, 0, 0);
2439 results[1].assert_accept(1, 0);
2440 results[1].assert_reset(0, 0);
2441 });
2442 });
2443 });
2444 });
2445 }
2446
2447 seastar::future<>
2448 test_v2_racing_reconnect_acceptor_win(FailoverTest& test) {
2449 return seastar::do_with(std::vector<std::pair<unsigned, Breakpoint>>{
2450 {1, {Tag::SESSION_RECONNECT, bp_type_t::WRITE}},
2451 {2, {custom_bp_t::SOCKET_CONNECTING}},
2452 {2, {custom_bp_t::BANNER_WRITE}},
2453 {2, {custom_bp_t::BANNER_READ}},
2454 {2, {custom_bp_t::BANNER_PAYLOAD_READ}},
2455 {2, {Tag::HELLO, bp_type_t::WRITE}},
2456 {2, {Tag::HELLO, bp_type_t::READ}},
2457 {2, {Tag::AUTH_REQUEST, bp_type_t::WRITE}},
2458 {2, {Tag::AUTH_DONE, bp_type_t::READ}},
2459 {2, {Tag::AUTH_SIGNATURE, bp_type_t::WRITE}},
2460 {2, {Tag::AUTH_SIGNATURE, bp_type_t::READ}},
2461 }, [&test] (auto& failure_cases) {
2462 return seastar::do_for_each(failure_cases, [&test] (auto bp) {
2463 TestInterceptor interceptor;
2464 // fault connector
2465 interceptor.make_fault({Tag::MESSAGE, bp_type_t::WRITE});
2466 // block connector
2467 interceptor.make_block(bp.second, bp.first);
2468 return test.run_suite(
2469 fmt::format("test_v2_racing_reconnect_acceptor_win -- {}({})",
2470 bp.second, bp.first),
2471 interceptor,
2472 policy_t::lossless_peer,
2473 policy_t::lossless_peer,
2474 [&test] (FailoverSuite& suite) {
2475 return seastar::futurize_invoke([&suite] {
2476 return suite.send_peer();
2477 }).then([&suite] {
2478 return suite.connect_peer();
2479 }).then([&suite] {
2480 return suite.wait_blocked();
2481 }).then([&test] {
2482 return test.peer_send_me();
2483 }).then([&suite] {
2484 return suite.wait_replaced(1);
2485 }).then([&suite] {
2486 suite.unblock();
2487 return suite.wait_results(2);
2488 }).then([] (ConnResults& results) {
2489 results[0].assert_state_at(conn_state_t::established);
2490 results[0].assert_connect(2, 1);
2491 results[0].assert_accept(0, 0, 0, 1);
2492 results[0].assert_reset(0, 0);
2493 results[1].assert_state_at(conn_state_t::replaced);
2494 results[1].assert_connect(0, 0, 0, 0);
2495 results[1].assert_accept(1, 0, 1, 0);
2496 results[1].assert_reset(0, 0);
2497 });
2498 });
2499 });
2500 });
2501 }
2502
2503 seastar::future<>
2504 test_v2_racing_connect_acceptor_lose(FailoverTest& test) {
2505 return seastar::do_with(std::vector<Breakpoint>{
2506 {custom_bp_t::BANNER_WRITE},
2507 {custom_bp_t::BANNER_READ},
2508 {custom_bp_t::BANNER_PAYLOAD_READ},
2509 {Tag::HELLO, bp_type_t::WRITE},
2510 {Tag::HELLO, bp_type_t::READ},
2511 {Tag::AUTH_REQUEST, bp_type_t::READ},
2512 {Tag::AUTH_DONE, bp_type_t::WRITE},
2513 {Tag::AUTH_SIGNATURE, bp_type_t::WRITE},
2514 {Tag::AUTH_SIGNATURE, bp_type_t::READ},
2515 {Tag::CLIENT_IDENT, bp_type_t::READ},
2516 }, [&test] (auto& failure_cases) {
2517 return seastar::do_for_each(failure_cases, [&test] (auto bp) {
2518 TestInterceptor interceptor;
2519 // block acceptor
2520 interceptor.make_block(bp);
2521 return test.run_suite(
2522 fmt::format("test_v2_racing_connect_acceptor_lose -- {}", bp),
2523 interceptor,
2524 policy_t::lossless_peer,
2525 policy_t::lossless_peer,
2526 [&test] (FailoverSuite& suite) {
2527 return seastar::futurize_invoke([&test] {
2528 return test.peer_send_me();
2529 }).then([&test] {
2530 return test.peer_connect_me();
2531 }).then([&suite] {
2532 return suite.wait_blocked();
2533 }).then([&suite] {
2534 return suite.send_peer();
2535 }).then([&suite] {
2536 return suite.connect_peer();
2537 }).then([&suite] {
2538 return suite.wait_established();
2539 }).then([&suite] {
2540 suite.unblock();
2541 return suite.wait_results(2);
2542 }).then([] (ConnResults& results) {
2543 results[0].assert_state_at(conn_state_t::closed);
2544 results[0].assert_connect(0, 0, 0, 0);
2545 results[0].assert_accept(1, 0);
2546 results[0].assert_reset(0, 0);
2547 results[1].assert_state_at(conn_state_t::established);
2548 results[1].assert_connect(1, 1, 0, 1);
2549 results[1].assert_accept(0, 0, 0, 0);
2550 results[1].assert_reset(0, 0);
2551 });
2552 });
2553 });
2554 });
2555 }
2556
2557 seastar::future<>
2558 test_v2_racing_connect_acceptor_win(FailoverTest& test) {
2559 return seastar::do_with(std::vector<Breakpoint>{
2560 {custom_bp_t::SOCKET_CONNECTING},
2561 {custom_bp_t::BANNER_WRITE},
2562 {custom_bp_t::BANNER_READ},
2563 {custom_bp_t::BANNER_PAYLOAD_READ},
2564 {Tag::HELLO, bp_type_t::WRITE},
2565 {Tag::HELLO, bp_type_t::READ},
2566 {Tag::AUTH_REQUEST, bp_type_t::WRITE},
2567 {Tag::AUTH_DONE, bp_type_t::READ},
2568 {Tag::AUTH_SIGNATURE, bp_type_t::WRITE},
2569 {Tag::AUTH_SIGNATURE, bp_type_t::READ},
2570 {Tag::CLIENT_IDENT, bp_type_t::WRITE},
2571 }, [&test] (auto& failure_cases) {
2572 return seastar::do_for_each(failure_cases, [&test] (auto bp) {
2573 TestInterceptor interceptor;
2574 // block connector
2575 interceptor.make_block(bp);
2576 return test.run_suite(
2577 fmt::format("test_v2_racing_connect_acceptor_win -- {}", bp),
2578 interceptor,
2579 policy_t::lossless_peer,
2580 policy_t::lossless_peer,
2581 [&test] (FailoverSuite& suite) {
2582 return seastar::futurize_invoke([&suite] {
2583 return suite.send_peer();
2584 }).then([&suite] {
2585 return suite.connect_peer();
2586 }).then([&suite] {
2587 return suite.wait_blocked();
2588 }).then([&test] {
2589 return test.peer_send_me();
2590 }).then([&test] {
2591 return test.peer_connect_me();
2592 }).then([&suite] {
2593 return suite.wait_replaced(1);
2594 }).then([&suite] {
2595 suite.unblock();
2596 return suite.wait_results(2);
2597 }).then([] (ConnResults& results) {
2598 results[0].assert_state_at(conn_state_t::established);
2599 results[0].assert_connect(1, 0);
2600 results[0].assert_accept(0, 0, 0, 1);
2601 results[0].assert_reset(0, 0);
2602 results[1].assert_state_at(conn_state_t::replaced);
2603 results[1].assert_connect(0, 0, 0, 0);
2604 results[1].assert_accept(1, 1, 0, 0);
2605 results[1].assert_reset(0, 0);
2606 });
2607 });
2608 });
2609 });
2610 }
2611
2612 seastar::future<>
2613 test_v2_racing_connect_reconnect_lose(FailoverTest& test) {
2614 TestInterceptor interceptor;
2615 interceptor.make_fault({Tag::SERVER_IDENT, bp_type_t::READ});
2616 interceptor.make_block({Tag::CLIENT_IDENT, bp_type_t::WRITE}, 2);
2617 return test.run_suite("test_v2_racing_connect_reconnect_lose",
2618 interceptor,
2619 policy_t::lossless_peer,
2620 policy_t::lossless_peer,
2621 [&test] (FailoverSuite& suite) {
2622 return seastar::futurize_invoke([&suite] {
2623 return suite.send_peer();
2624 }).then([&suite] {
2625 return suite.connect_peer();
2626 }).then([&suite] {
2627 return suite.wait_blocked();
2628 }).then([&test] {
2629 return test.peer_send_me();
2630 }).then([&suite] {
2631 return suite.wait_replaced(1);
2632 }).then([&suite] {
2633 suite.unblock();
2634 return suite.wait_results(2);
2635 }).then([] (ConnResults& results) {
2636 results[0].assert_state_at(conn_state_t::established);
2637 results[0].assert_connect(2, 2, 0, 0);
2638 results[0].assert_accept(0, 0, 0, 1);
2639 results[0].assert_reset(0, 0);
2640 results[1].assert_state_at(conn_state_t::replaced);
2641 results[1].assert_connect(0, 0, 0, 0);
2642 results[1].assert_accept(1, 1, 1, 0);
2643 results[1].assert_reset(0, 0);
2644 });
2645 });
2646 }
2647
2648 seastar::future<>
2649 test_v2_racing_connect_reconnect_win(FailoverTest& test) {
2650 TestInterceptor interceptor;
2651 interceptor.make_fault({Tag::SERVER_IDENT, bp_type_t::READ});
2652 interceptor.make_block({Tag::SESSION_RECONNECT, bp_type_t::READ});
2653 return test.run_suite("test_v2_racing_connect_reconnect_win",
2654 interceptor,
2655 policy_t::lossless_peer,
2656 policy_t::lossless_peer,
2657 [&test] (FailoverSuite& suite) {
2658 return seastar::futurize_invoke([&test] {
2659 return test.peer_send_me();
2660 }).then([&suite] {
2661 return suite.connect_peer();
2662 }).then([&suite] {
2663 return suite.wait_blocked();
2664 }).then([&suite] {
2665 return suite.send_peer();
2666 }).then([&suite] {
2667 return suite.wait_established();
2668 }).then([&suite] {
2669 suite.unblock();
2670 return suite.wait_results(2);
2671 }).then([] (ConnResults& results) {
2672 results[0].assert_state_at(conn_state_t::established);
2673 results[0].assert_connect(2, 2, 0, 1);
2674 results[0].assert_accept(0, 0, 0, 0);
2675 results[0].assert_reset(0, 0);
2676 results[1].assert_state_at(conn_state_t::closed);
2677 results[1].assert_connect(0, 0, 0, 0);
2678 results[1].assert_accept(1, 0, 1, 0);
2679 results[1].assert_reset(0, 0);
2680 });
2681 });
2682 }
2683
2684 seastar::future<>
2685 test_v2_stale_connect(FailoverTest& test) {
2686 auto bp = Breakpoint{Tag::SERVER_IDENT, bp_type_t::READ};
2687 TestInterceptor interceptor;
2688 interceptor.make_stall(bp);
2689 return test.run_suite(
2690 fmt::format("test_v2_stale_connect -- {}", bp),
2691 interceptor,
2692 policy_t::lossless_peer,
2693 policy_t::lossless_peer,
2694 [&test] (FailoverSuite& suite) {
2695 return seastar::futurize_invoke([&suite] {
2696 return suite.connect_peer();
2697 }).then([&suite] {
2698 return suite.wait_blocked();
2699 }).then([&test] {
2700 return test.peer_send_me();
2701 }).then([&suite] {
2702 return suite.wait_replaced(1);
2703 }).then([&suite] {
2704 suite.unblock();
2705 return suite.wait_results(2);
2706 }).then([] (ConnResults& results) {
2707 results[0].assert_state_at(conn_state_t::established);
2708 results[0].assert_connect(1, 1, 0, 0);
2709 results[0].assert_accept(0, 0, 0, 1);
2710 results[0].assert_reset(0, 0);
2711 results[1].assert_state_at(conn_state_t::replaced);
2712 results[1].assert_connect(0, 0, 0, 0);
2713 results[1].assert_accept(1, 1, 1, 0);
2714 results[1].assert_reset(0, 0);
2715 });
2716 });
2717 }
2718
2719 seastar::future<>
2720 test_v2_stale_reconnect(FailoverTest& test) {
2721 auto bp = Breakpoint{Tag::SESSION_RECONNECT_OK, bp_type_t::READ};
2722 TestInterceptor interceptor;
2723 interceptor.make_fault({Tag::MESSAGE, bp_type_t::WRITE});
2724 interceptor.make_stall(bp);
2725 return test.run_suite(
2726 fmt::format("test_v2_stale_reconnect -- {}", bp),
2727 interceptor,
2728 policy_t::lossless_peer,
2729 policy_t::lossless_peer,
2730 [&test] (FailoverSuite& suite) {
2731 return seastar::futurize_invoke([&suite] {
2732 return suite.send_peer();
2733 }).then([&suite] {
2734 return suite.connect_peer();
2735 }).then([&suite] {
2736 return suite.wait_blocked();
2737 }).then([&test] {
2738 return test.peer_send_me();
2739 }).then([&suite] {
2740 return suite.wait_replaced(1);
2741 }).then([&suite] {
2742 suite.unblock();
2743 return suite.wait_results(2);
2744 }).then([] (ConnResults& results) {
2745 results[0].assert_state_at(conn_state_t::established);
2746 results[0].assert_connect(2, 1, 1, 1);
2747 results[0].assert_accept(0, 0, 0, 1);
2748 results[0].assert_reset(0, 0);
2749 results[1].assert_state_at(conn_state_t::replaced);
2750 results[1].assert_connect(0, 0, 0, 0);
2751 results[1].assert_accept(1, 0, 1, 0);
2752 results[1].assert_reset(0, 0);
2753 });
2754 });
2755 }
2756
2757 seastar::future<>
2758 test_v2_stale_accept(FailoverTest& test) {
2759 auto bp = Breakpoint{Tag::CLIENT_IDENT, bp_type_t::READ};
2760 TestInterceptor interceptor;
2761 interceptor.make_stall(bp);
2762 return test.run_suite(
2763 fmt::format("test_v2_stale_accept -- {}", bp),
2764 interceptor,
2765 policy_t::lossless_peer,
2766 policy_t::lossless_peer,
2767 [&test] (FailoverSuite& suite) {
2768 return seastar::futurize_invoke([&test] {
2769 return test.peer_connect_me();
2770 }).then([&suite] {
2771 return suite.wait_blocked();
2772 }).then([&test] {
2773 return test.peer_send_me();
2774 }).then([&suite] {
2775 return suite.wait_established();
2776 }).then([&suite] {
2777 suite.unblock();
2778 return suite.wait_results(2);
2779 }).then([] (ConnResults& results) {
2780 results[0].assert_state_at(conn_state_t::closed);
2781 results[0].assert_connect(0, 0, 0, 0);
2782 results[0].assert_accept(1, 1, 0, 0);
2783 results[0].assert_reset(0, 0);
2784 results[1].assert_state_at(conn_state_t::established);
2785 results[1].assert_connect(0, 0, 0, 0);
2786 results[1].assert_accept(1, 1, 0, 1);
2787 results[1].assert_reset(0, 0);
2788 });
2789 });
2790 }
2791
2792 seastar::future<>
2793 test_v2_stale_establishing(FailoverTest& test) {
2794 auto bp = Breakpoint{Tag::SERVER_IDENT, bp_type_t::WRITE};
2795 TestInterceptor interceptor;
2796 interceptor.make_stall(bp);
2797 return test.run_suite(
2798 fmt::format("test_v2_stale_establishing -- {}", bp),
2799 interceptor,
2800 policy_t::lossless_peer,
2801 policy_t::lossless_peer,
2802 [&test] (FailoverSuite& suite) {
2803 return seastar::futurize_invoke([&test] {
2804 return test.peer_connect_me();
2805 }).then([&suite] {
2806 return suite.wait_blocked();
2807 }).then([&test] {
2808 return test.peer_send_me();
2809 }).then([&suite] {
2810 return suite.wait_replaced(1);
2811 }).then([&suite] {
2812 suite.unblock();
2813 return suite.wait_results(2);
2814 }).then([] (ConnResults& results) {
2815 results[0].assert_state_at(conn_state_t::established);
2816 results[0].assert_connect(0, 0, 0, 0);
2817 results[0].assert_accept(1, 1, 0, 2);
2818 results[0].assert_reset(0, 0);
2819 results[1].assert_state_at(conn_state_t::replaced);
2820 results[1].assert_connect(0, 0, 0, 0);
2821 results[1].assert_accept(1, 0);
2822 results[1].assert_reset(0, 0);
2823 });
2824 });
2825 }
2826
2827 seastar::future<>
2828 test_v2_stale_reaccept(FailoverTest& test) {
2829 auto bp = Breakpoint{Tag::SESSION_RECONNECT_OK, bp_type_t::WRITE};
2830 TestInterceptor interceptor;
2831 interceptor.make_fault({Tag::MESSAGE, bp_type_t::READ});
2832 interceptor.make_stall(bp);
2833 return test.run_suite(
2834 fmt::format("test_v2_stale_reaccept -- {}", bp),
2835 interceptor,
2836 policy_t::lossless_peer,
2837 policy_t::lossless_peer,
2838 [&test] (FailoverSuite& suite) {
2839 return seastar::futurize_invoke([&test] {
2840 return test.peer_send_me();
2841 }).then([&test] {
2842 return test.peer_connect_me();
2843 }).then([&suite] {
2844 return suite.wait_blocked();
2845 }).then([] {
2846 logger().info("[Test] block the broken REPLACING for 210ms...");
2847 return seastar::sleep(210ms);
2848 }).then([&suite] {
2849 suite.unblock();
2850 return suite.wait_results(3);
2851 }).then([] (ConnResults& results) {
2852 results[0].assert_state_at(conn_state_t::established);
2853 results[0].assert_connect(0, 0, 0, 0);
2854 results[0].assert_accept(1, 1, 0, 3);
2855 results[0].assert_reset(0, 0);
2856 results[1].assert_state_at(conn_state_t::replaced);
2857 results[1].assert_connect(0, 0, 0, 0);
2858 results[1].assert_accept(1, 0, 1, 0);
2859 results[1].assert_reset(0, 0);
2860 results[2].assert_state_at(conn_state_t::replaced);
2861 results[2].assert_connect(0, 0, 0, 0);
2862 results[2].assert_accept(1, 0);
2863 results[2].assert_reset(0, 0);
2864 ceph_assert(results[2].server_reconnect_attempts >= 1);
2865 });
2866 });
2867 }
2868
2869 seastar::future<>
2870 test_v2_lossy_client(FailoverTest& test) {
2871 return test.run_suite(
2872 "test_v2_lossy_client",
2873 TestInterceptor(),
2874 policy_t::lossy_client,
2875 policy_t::stateless_server,
2876 [&test] (FailoverSuite& suite) {
2877 return seastar::futurize_invoke([&suite] {
2878 logger().info("-- 0 --");
2879 logger().info("[Test] setup connection...");
2880 return suite.connect_peer();
2881 }).then([&test] {
2882 return test.send_bidirectional();
2883 }).then([&suite] {
2884 return suite.wait_results(1);
2885 }).then([] (ConnResults& results) {
2886 results[0].assert_state_at(conn_state_t::established);
2887 results[0].assert_connect(1, 1, 0, 1);
2888 results[0].assert_accept(0, 0, 0, 0);
2889 results[0].assert_reset(0, 0);
2890 }).then([&suite] {
2891 logger().info("-- 1 --");
2892 logger().info("[Test] client markdown...");
2893 return suite.markdown();
2894 }).then([&suite] {
2895 return suite.connect_peer();
2896 }).then([&suite] {
2897 return suite.send_peer();
2898 }).then([&suite] {
2899 return suite.wait_results(2);
2900 }).then([] (ConnResults& results) {
2901 results[0].assert_state_at(conn_state_t::closed);
2902 results[0].assert_connect(1, 1, 0, 1);
2903 results[0].assert_accept(0, 0, 0, 0);
2904 results[0].assert_reset(0, 0);
2905 results[1].assert_state_at(conn_state_t::established);
2906 results[1].assert_connect(1, 1, 0, 1);
2907 results[1].assert_accept(0, 0, 0, 0);
2908 results[1].assert_reset(0, 0);
2909 }).then([&test] {
2910 logger().info("-- 2 --");
2911 logger().info("[Test] server markdown...");
2912 return test.markdown_peer();
2913 }).then([&suite] {
2914 return suite.wait_results(2);
2915 }).then([] (ConnResults& results) {
2916 results[0].assert_state_at(conn_state_t::closed);
2917 results[0].assert_connect(1, 1, 0, 1);
2918 results[0].assert_accept(0, 0, 0, 0);
2919 results[0].assert_reset(0, 0);
2920 results[1].assert_state_at(conn_state_t::closed);
2921 results[1].assert_connect(1, 1, 0, 1);
2922 results[1].assert_accept(0, 0, 0, 0);
2923 results[1].assert_reset(1, 0);
2924 }).then([&suite] {
2925 logger().info("-- 3 --");
2926 logger().info("[Test] client reconnect...");
2927 return suite.connect_peer();
2928 }).then([&suite] {
2929 return suite.send_peer();
2930 }).then([&suite] {
2931 return suite.wait_results(3);
2932 }).then([] (ConnResults& results) {
2933 results[0].assert_state_at(conn_state_t::closed);
2934 results[0].assert_connect(1, 1, 0, 1);
2935 results[0].assert_accept(0, 0, 0, 0);
2936 results[0].assert_reset(0, 0);
2937 results[1].assert_state_at(conn_state_t::closed);
2938 results[1].assert_connect(1, 1, 0, 1);
2939 results[1].assert_accept(0, 0, 0, 0);
2940 results[1].assert_reset(1, 0);
2941 results[2].assert_state_at(conn_state_t::established);
2942 results[2].assert_connect(1, 1, 0, 1);
2943 results[2].assert_accept(0, 0, 0, 0);
2944 results[2].assert_reset(0, 0);
2945 });
2946 });
2947 }
2948
2949 seastar::future<>
2950 test_v2_stateless_server(FailoverTest& test) {
2951 return test.run_suite(
2952 "test_v2_stateless_server",
2953 TestInterceptor(),
2954 policy_t::stateless_server,
2955 policy_t::lossy_client,
2956 [&test] (FailoverSuite& suite) {
2957 return seastar::futurize_invoke([&test] {
2958 logger().info("-- 0 --");
2959 logger().info("[Test] setup connection...");
2960 return test.peer_connect_me();
2961 }).then([&test] {
2962 return test.send_bidirectional();
2963 }).then([&suite] {
2964 return suite.wait_results(1);
2965 }).then([] (ConnResults& results) {
2966 results[0].assert_state_at(conn_state_t::established);
2967 results[0].assert_connect(0, 0, 0, 0);
2968 results[0].assert_accept(1, 1, 0, 1);
2969 results[0].assert_reset(0, 0);
2970 }).then([&test] {
2971 logger().info("-- 1 --");
2972 logger().info("[Test] client markdown...");
2973 return test.markdown_peer();
2974 }).then([&test] {
2975 return test.peer_connect_me();
2976 }).then([&test] {
2977 return test.peer_send_me();
2978 }).then([&suite] {
2979 return suite.wait_results(2);
2980 }).then([] (ConnResults& results) {
2981 results[0].assert_state_at(conn_state_t::closed);
2982 results[0].assert_connect(0, 0, 0, 0);
2983 results[0].assert_accept(1, 1, 0, 1);
2984 results[0].assert_reset(1, 0);
2985 results[1].assert_state_at(conn_state_t::established);
2986 results[1].assert_connect(0, 0, 0, 0);
2987 results[1].assert_accept(1, 1, 0, 1);
2988 results[1].assert_reset(0, 0);
2989 }).then([&suite] {
2990 logger().info("-- 2 --");
2991 logger().info("[Test] server markdown...");
2992 return suite.markdown();
2993 }).then([&suite] {
2994 return suite.wait_results(2);
2995 }).then([] (ConnResults& results) {
2996 results[0].assert_state_at(conn_state_t::closed);
2997 results[0].assert_connect(0, 0, 0, 0);
2998 results[0].assert_accept(1, 1, 0, 1);
2999 results[0].assert_reset(1, 0);
3000 results[1].assert_state_at(conn_state_t::closed);
3001 results[1].assert_connect(0, 0, 0, 0);
3002 results[1].assert_accept(1, 1, 0, 1);
3003 results[1].assert_reset(0, 0);
3004 }).then([&test] {
3005 logger().info("-- 3 --");
3006 logger().info("[Test] client reconnect...");
3007 return test.peer_connect_me();
3008 }).then([&test] {
3009 return test.peer_send_me();
3010 }).then([&suite] {
3011 return suite.wait_results(3);
3012 }).then([] (ConnResults& results) {
3013 results[0].assert_state_at(conn_state_t::closed);
3014 results[0].assert_connect(0, 0, 0, 0);
3015 results[0].assert_accept(1, 1, 0, 1);
3016 results[0].assert_reset(1, 0);
3017 results[1].assert_state_at(conn_state_t::closed);
3018 results[1].assert_connect(0, 0, 0, 0);
3019 results[1].assert_accept(1, 1, 0, 1);
3020 results[1].assert_reset(0, 0);
3021 results[2].assert_state_at(conn_state_t::established);
3022 results[2].assert_connect(0, 0, 0, 0);
3023 results[2].assert_accept(1, 1, 0, 1);
3024 results[2].assert_reset(0, 0);
3025 });
3026 });
3027 }
3028
3029 seastar::future<>
3030 test_v2_lossless_client(FailoverTest& test) {
3031 return test.run_suite(
3032 "test_v2_lossless_client",
3033 TestInterceptor(),
3034 policy_t::lossless_client,
3035 policy_t::stateful_server,
3036 [&test] (FailoverSuite& suite) {
3037 return seastar::futurize_invoke([&suite] {
3038 logger().info("-- 0 --");
3039 logger().info("[Test] setup connection...");
3040 return suite.connect_peer();
3041 }).then([&test] {
3042 return test.send_bidirectional();
3043 }).then([&suite] {
3044 return suite.wait_results(1);
3045 }).then([] (ConnResults& results) {
3046 results[0].assert_state_at(conn_state_t::established);
3047 results[0].assert_connect(1, 1, 0, 1);
3048 results[0].assert_accept(0, 0, 0, 0);
3049 results[0].assert_reset(0, 0);
3050 }).then([&suite] {
3051 logger().info("-- 1 --");
3052 logger().info("[Test] client markdown...");
3053 return suite.markdown();
3054 }).then([&suite] {
3055 return suite.connect_peer();
3056 }).then([&suite] {
3057 return suite.send_peer();
3058 }).then([&suite] {
3059 return suite.wait_results(2);
3060 }).then([] (ConnResults& results) {
3061 results[0].assert_state_at(conn_state_t::closed);
3062 results[0].assert_connect(1, 1, 0, 1);
3063 results[0].assert_accept(0, 0, 0, 0);
3064 results[0].assert_reset(0, 0);
3065 results[1].assert_state_at(conn_state_t::established);
3066 results[1].assert_connect(1, 1, 0, 1);
3067 results[1].assert_accept(0, 0, 0, 0);
3068 results[1].assert_reset(0, 0);
3069 }).then([&test] {
3070 logger().info("-- 2 --");
3071 logger().info("[Test] server markdown...");
3072 return test.markdown_peer();
3073 }).then([&suite] {
3074 return suite.wait_results(2);
3075 }).then([] (ConnResults& results) {
3076 results[0].assert_state_at(conn_state_t::closed);
3077 results[0].assert_connect(1, 1, 0, 1);
3078 results[0].assert_accept(0, 0, 0, 0);
3079 results[0].assert_reset(0, 0);
3080 results[1].assert_state_at(conn_state_t::established);
3081 results[1].assert_connect(2, 2, 1, 2);
3082 results[1].assert_accept(0, 0, 0, 0);
3083 results[1].assert_reset(0, 1);
3084 }).then([&suite] {
3085 logger().info("-- 3 --");
3086 logger().info("[Test] client reconnect...");
3087 return suite.connect_peer();
3088 }).then([&suite] {
3089 return suite.send_peer();
3090 }).then([&suite] {
3091 return suite.wait_results(2);
3092 }).then([] (ConnResults& results) {
3093 results[0].assert_state_at(conn_state_t::closed);
3094 results[0].assert_connect(1, 1, 0, 1);
3095 results[0].assert_accept(0, 0, 0, 0);
3096 results[0].assert_reset(0, 0);
3097 results[1].assert_state_at(conn_state_t::established);
3098 results[1].assert_connect(2, 2, 1, 2);
3099 results[1].assert_accept(0, 0, 0, 0);
3100 results[1].assert_reset(0, 1);
3101 });
3102 });
3103 }
3104
3105 seastar::future<>
3106 test_v2_stateful_server(FailoverTest& test) {
3107 return test.run_suite(
3108 "test_v2_stateful_server",
3109 TestInterceptor(),
3110 policy_t::stateful_server,
3111 policy_t::lossless_client,
3112 [&test] (FailoverSuite& suite) {
3113 return seastar::futurize_invoke([&test] {
3114 logger().info("-- 0 --");
3115 logger().info("[Test] setup connection...");
3116 return test.peer_connect_me();
3117 }).then([&test] {
3118 return test.send_bidirectional();
3119 }).then([&suite] {
3120 return suite.wait_results(1);
3121 }).then([] (ConnResults& results) {
3122 results[0].assert_state_at(conn_state_t::established);
3123 results[0].assert_connect(0, 0, 0, 0);
3124 results[0].assert_accept(1, 1, 0, 1);
3125 results[0].assert_reset(0, 0);
3126 }).then([&test] {
3127 logger().info("-- 1 --");
3128 logger().info("[Test] client markdown...");
3129 return test.markdown_peer();
3130 }).then([&test] {
3131 return test.peer_connect_me();
3132 }).then([&test] {
3133 return test.peer_send_me();
3134 }).then([&suite] {
3135 return suite.wait_results(2);
3136 }).then([] (ConnResults& results) {
3137 results[0].assert_state_at(conn_state_t::established);
3138 results[0].assert_connect(0, 0, 0, 0);
3139 results[0].assert_accept(1, 1, 0, 2);
3140 results[0].assert_reset(0, 1);
3141 results[1].assert_state_at(conn_state_t::replaced);
3142 results[1].assert_connect(0, 0, 0, 0);
3143 results[1].assert_accept(1, 1, 0, 0);
3144 results[1].assert_reset(0, 0);
3145 }).then([&suite] {
3146 logger().info("-- 2 --");
3147 logger().info("[Test] server markdown...");
3148 return suite.markdown();
3149 }).then([&suite] {
3150 return suite.wait_results(3);
3151 }).then([] (ConnResults& results) {
3152 results[0].assert_state_at(conn_state_t::closed);
3153 results[0].assert_connect(0, 0, 0, 0);
3154 results[0].assert_accept(1, 1, 0, 2);
3155 results[0].assert_reset(0, 1);
3156 results[1].assert_state_at(conn_state_t::replaced);
3157 results[1].assert_connect(0, 0, 0, 0);
3158 results[1].assert_accept(1, 1, 0, 0);
3159 results[1].assert_reset(0, 0);
3160 results[2].assert_state_at(conn_state_t::established);
3161 results[2].assert_connect(0, 0, 0, 0);
3162 results[2].assert_accept(1, 1, 1, 1);
3163 results[2].assert_reset(0, 0);
3164 }).then([&test] {
3165 logger().info("-- 3 --");
3166 logger().info("[Test] client reconnect...");
3167 return test.peer_connect_me();
3168 }).then([&test] {
3169 return test.peer_send_me();
3170 }).then([&suite] {
3171 return suite.wait_results(3);
3172 }).then([] (ConnResults& results) {
3173 results[0].assert_state_at(conn_state_t::closed);
3174 results[0].assert_connect(0, 0, 0, 0);
3175 results[0].assert_accept(1, 1, 0, 2);
3176 results[0].assert_reset(0, 1);
3177 results[1].assert_state_at(conn_state_t::replaced);
3178 results[1].assert_connect(0, 0, 0, 0);
3179 results[1].assert_accept(1, 1, 0, 0);
3180 results[1].assert_reset(0, 0);
3181 results[2].assert_state_at(conn_state_t::established);
3182 results[2].assert_connect(0, 0, 0, 0);
3183 results[2].assert_accept(1, 1, 1, 1);
3184 results[2].assert_reset(0, 0);
3185 });
3186 });
3187 }
3188
3189 seastar::future<>
3190 test_v2_peer_reuse_connector(FailoverTest& test) {
3191 return test.run_suite(
3192 "test_v2_peer_reuse_connector",
3193 TestInterceptor(),
3194 policy_t::lossless_peer_reuse,
3195 policy_t::lossless_peer_reuse,
3196 [&test] (FailoverSuite& suite) {
3197 return seastar::futurize_invoke([&suite] {
3198 logger().info("-- 0 --");
3199 logger().info("[Test] setup connection...");
3200 return suite.connect_peer();
3201 }).then([&test] {
3202 return test.send_bidirectional();
3203 }).then([&suite] {
3204 return suite.wait_results(1);
3205 }).then([] (ConnResults& results) {
3206 results[0].assert_state_at(conn_state_t::established);
3207 results[0].assert_connect(1, 1, 0, 1);
3208 results[0].assert_accept(0, 0, 0, 0);
3209 results[0].assert_reset(0, 0);
3210 }).then([&suite] {
3211 logger().info("-- 1 --");
3212 logger().info("[Test] connector markdown...");
3213 return suite.markdown();
3214 }).then([&suite] {
3215 return suite.connect_peer();
3216 }).then([&suite] {
3217 return suite.send_peer();
3218 }).then([&suite] {
3219 return suite.wait_results(2);
3220 }).then([] (ConnResults& results) {
3221 results[0].assert_state_at(conn_state_t::closed);
3222 results[0].assert_connect(1, 1, 0, 1);
3223 results[0].assert_accept(0, 0, 0, 0);
3224 results[0].assert_reset(0, 0);
3225 results[1].assert_state_at(conn_state_t::established);
3226 results[1].assert_connect(1, 1, 0, 1);
3227 results[1].assert_accept(0, 0, 0, 0);
3228 results[1].assert_reset(0, 0);
3229 }).then([&test] {
3230 logger().info("-- 2 --");
3231 logger().info("[Test] acceptor markdown...");
3232 return test.markdown_peer();
3233 }).then([&suite] {
3234 ceph_assert(suite.is_standby());
3235 logger().info("-- 3 --");
3236 logger().info("[Test] connector reconnect...");
3237 return suite.connect_peer();
3238 }).then([&suite] {
3239 return suite.try_send_peer();
3240 }).then([&suite] {
3241 return suite.wait_results(2);
3242 }).then([] (ConnResults& results) {
3243 results[0].assert_state_at(conn_state_t::closed);
3244 results[0].assert_connect(1, 1, 0, 1);
3245 results[0].assert_accept(0, 0, 0, 0);
3246 results[0].assert_reset(0, 0);
3247 results[1].assert_state_at(conn_state_t::established);
3248 results[1].assert_connect(2, 2, 1, 2);
3249 results[1].assert_accept(0, 0, 0, 0);
3250 results[1].assert_reset(0, 1);
3251 });
3252 });
3253 }
3254
3255 seastar::future<>
3256 test_v2_peer_reuse_acceptor(FailoverTest& test) {
3257 return test.run_suite(
3258 "test_v2_peer_reuse_acceptor",
3259 TestInterceptor(),
3260 policy_t::lossless_peer_reuse,
3261 policy_t::lossless_peer_reuse,
3262 [&test] (FailoverSuite& suite) {
3263 return seastar::futurize_invoke([&test] {
3264 logger().info("-- 0 --");
3265 logger().info("[Test] setup connection...");
3266 return test.peer_connect_me();
3267 }).then([&test] {
3268 return test.send_bidirectional();
3269 }).then([&suite] {
3270 return suite.wait_results(1);
3271 }).then([] (ConnResults& results) {
3272 results[0].assert_state_at(conn_state_t::established);
3273 results[0].assert_connect(0, 0, 0, 0);
3274 results[0].assert_accept(1, 1, 0, 1);
3275 results[0].assert_reset(0, 0);
3276 }).then([&test] {
3277 logger().info("-- 1 --");
3278 logger().info("[Test] connector markdown...");
3279 return test.markdown_peer();
3280 }).then([&test] {
3281 return test.peer_connect_me();
3282 }).then([&test] {
3283 return test.peer_send_me();
3284 }).then([&suite] {
3285 return suite.wait_results(2);
3286 }).then([] (ConnResults& results) {
3287 results[0].assert_state_at(conn_state_t::established);
3288 results[0].assert_connect(0, 0, 0, 0);
3289 results[0].assert_accept(1, 1, 0, 2);
3290 results[0].assert_reset(0, 1);
3291 results[1].assert_state_at(conn_state_t::replaced);
3292 results[1].assert_connect(0, 0, 0, 0);
3293 results[1].assert_accept(1, 1, 0, 0);
3294 results[1].assert_reset(0, 0);
3295 }).then([&suite] {
3296 logger().info("-- 2 --");
3297 logger().info("[Test] acceptor markdown...");
3298 return suite.markdown();
3299 }).then([&suite] {
3300 return suite.wait_results(2);
3301 }).then([] (ConnResults& results) {
3302 results[0].assert_state_at(conn_state_t::closed);
3303 results[0].assert_connect(0, 0, 0, 0);
3304 results[0].assert_accept(1, 1, 0, 2);
3305 results[0].assert_reset(0, 1);
3306 results[1].assert_state_at(conn_state_t::replaced);
3307 results[1].assert_connect(0, 0, 0, 0);
3308 results[1].assert_accept(1, 1, 0, 0);
3309 results[1].assert_reset(0, 0);
3310 }).then([&test] {
3311 logger().info("-- 3 --");
3312 logger().info("[Test] connector reconnect...");
3313 return test.peer_connect_me();
3314 }).then([&test] {
3315 return test.try_peer_send_me();
3316 }).then([&suite] {
3317 return suite.wait_results(3);
3318 }).then([] (ConnResults& results) {
3319 results[0].assert_state_at(conn_state_t::closed);
3320 results[0].assert_connect(0, 0, 0, 0);
3321 results[0].assert_accept(1, 1, 0, 2);
3322 results[0].assert_reset(0, 1);
3323 results[1].assert_state_at(conn_state_t::replaced);
3324 results[1].assert_connect(0, 0, 0, 0);
3325 results[1].assert_accept(1, 1, 0, 0);
3326 results[1].assert_reset(0, 0);
3327 results[2].assert_state_at(conn_state_t::established);
3328 results[2].assert_connect(0, 0, 0, 0);
3329 results[2].assert_accept(1, 1, 1, 1);
3330 results[2].assert_reset(0, 0);
3331 });
3332 });
3333 }
3334
3335 seastar::future<>
3336 test_v2_lossless_peer_connector(FailoverTest& test) {
3337 return test.run_suite(
3338 "test_v2_lossless_peer_connector",
3339 TestInterceptor(),
3340 policy_t::lossless_peer,
3341 policy_t::lossless_peer,
3342 [&test] (FailoverSuite& suite) {
3343 return seastar::futurize_invoke([&suite] {
3344 logger().info("-- 0 --");
3345 logger().info("[Test] setup connection...");
3346 return suite.connect_peer();
3347 }).then([&test] {
3348 return test.send_bidirectional();
3349 }).then([&suite] {
3350 return suite.wait_results(1);
3351 }).then([] (ConnResults& results) {
3352 results[0].assert_state_at(conn_state_t::established);
3353 results[0].assert_connect(1, 1, 0, 1);
3354 results[0].assert_accept(0, 0, 0, 0);
3355 results[0].assert_reset(0, 0);
3356 }).then([&suite] {
3357 logger().info("-- 1 --");
3358 logger().info("[Test] connector markdown...");
3359 return suite.markdown();
3360 }).then([&suite] {
3361 return suite.connect_peer();
3362 }).then([&suite] {
3363 return suite.send_peer();
3364 }).then([&suite] {
3365 return suite.wait_results(2);
3366 }).then([] (ConnResults& results) {
3367 results[0].assert_state_at(conn_state_t::closed);
3368 results[0].assert_connect(1, 1, 0, 1);
3369 results[0].assert_accept(0, 0, 0, 0);
3370 results[0].assert_reset(0, 0);
3371 results[1].assert_state_at(conn_state_t::established);
3372 results[1].assert_connect(1, 1, 0, 1);
3373 results[1].assert_accept(0, 0, 0, 0);
3374 results[1].assert_reset(0, 0);
3375 }).then([&test] {
3376 logger().info("-- 2 --");
3377 logger().info("[Test] acceptor markdown...");
3378 return test.markdown_peer();
3379 }).then([&suite] {
3380 ceph_assert(suite.is_standby());
3381 logger().info("-- 3 --");
3382 logger().info("[Test] connector reconnect...");
3383 return suite.connect_peer();
3384 }).then([&suite] {
3385 return suite.try_send_peer();
3386 }).then([&suite] {
3387 return suite.wait_results(2);
3388 }).then([] (ConnResults& results) {
3389 results[0].assert_state_at(conn_state_t::closed);
3390 results[0].assert_connect(1, 1, 0, 1);
3391 results[0].assert_accept(0, 0, 0, 0);
3392 results[0].assert_reset(0, 0);
3393 results[1].assert_state_at(conn_state_t::established);
3394 results[1].assert_connect(2, 2, 1, 2);
3395 results[1].assert_accept(0, 0, 0, 0);
3396 results[1].assert_reset(0, 1);
3397 });
3398 });
3399 }
3400
3401 seastar::future<>
3402 test_v2_lossless_peer_acceptor(FailoverTest& test) {
3403 return test.run_suite(
3404 "test_v2_lossless_peer_acceptor",
3405 TestInterceptor(),
3406 policy_t::lossless_peer,
3407 policy_t::lossless_peer,
3408 [&test] (FailoverSuite& suite) {
3409 return seastar::futurize_invoke([&test] {
3410 logger().info("-- 0 --");
3411 logger().info("[Test] setup connection...");
3412 return test.peer_connect_me();
3413 }).then([&test] {
3414 return test.send_bidirectional();
3415 }).then([&suite] {
3416 return suite.wait_results(1);
3417 }).then([] (ConnResults& results) {
3418 results[0].assert_state_at(conn_state_t::established);
3419 results[0].assert_connect(0, 0, 0, 0);
3420 results[0].assert_accept(1, 1, 0, 1);
3421 results[0].assert_reset(0, 0);
3422 }).then([&test] {
3423 logger().info("-- 1 --");
3424 logger().info("[Test] connector markdown...");
3425 return test.markdown_peer();
3426 }).then([&test] {
3427 return test.peer_connect_me();
3428 }).then([&test] {
3429 return test.peer_send_me();
3430 }).then([&suite] {
3431 return suite.wait_results(2);
3432 }).then([] (ConnResults& results) {
3433 results[0].assert_state_at(conn_state_t::established);
3434 results[0].assert_connect(0, 0, 0, 0);
3435 results[0].assert_accept(1, 1, 0, 2);
3436 results[0].assert_reset(0, 0);
3437 results[1].assert_state_at(conn_state_t::replaced);
3438 results[1].assert_connect(0, 0, 0, 0);
3439 results[1].assert_accept(1, 1, 0, 0);
3440 results[1].assert_reset(0, 0);
3441 }).then([&suite] {
3442 logger().info("-- 2 --");
3443 logger().info("[Test] acceptor markdown...");
3444 return suite.markdown();
3445 }).then([&suite] {
3446 return suite.wait_results(2);
3447 }).then([] (ConnResults& results) {
3448 results[0].assert_state_at(conn_state_t::closed);
3449 results[0].assert_connect(0, 0, 0, 0);
3450 results[0].assert_accept(1, 1, 0, 2);
3451 results[0].assert_reset(0, 0);
3452 results[1].assert_state_at(conn_state_t::replaced);
3453 results[1].assert_connect(0, 0, 0, 0);
3454 results[1].assert_accept(1, 1, 0, 0);
3455 results[1].assert_reset(0, 0);
3456 }).then([&test] {
3457 logger().info("-- 3 --");
3458 logger().info("[Test] connector reconnect...");
3459 return test.peer_connect_me();
3460 }).then([&test] {
3461 return test.try_peer_send_me();
3462 }).then([&suite] {
3463 return suite.wait_results(3);
3464 }).then([] (ConnResults& results) {
3465 results[0].assert_state_at(conn_state_t::closed);
3466 results[0].assert_connect(0, 0, 0, 0);
3467 results[0].assert_accept(1, 1, 0, 2);
3468 results[0].assert_reset(0, 0);
3469 results[1].assert_state_at(conn_state_t::replaced);
3470 results[1].assert_connect(0, 0, 0, 0);
3471 results[1].assert_accept(1, 1, 0, 0);
3472 results[1].assert_reset(0, 0);
3473 results[2].assert_state_at(conn_state_t::established);
3474 results[2].assert_connect(0, 0, 0, 0);
3475 results[2].assert_accept(1, 1, 1, 1);
3476 results[2].assert_reset(0, 0);
3477 });
3478 });
3479 }
3480
3481 seastar::future<>
3482 test_v2_protocol(entity_addr_t test_addr,
3483 entity_addr_t cmd_peer_addr,
3484 entity_addr_t test_peer_addr,
3485 bool test_peer_islocal,
3486 bool peer_wins) {
3487 ceph_assert_always(test_addr.is_msgr2());
3488 ceph_assert_always(cmd_peer_addr.is_msgr2());
3489 ceph_assert_always(test_peer_addr.is_msgr2());
3490
3491 if (test_peer_islocal) {
3492 // initiate crimson test peer locally
3493 logger().info("test_v2_protocol: start local TestPeer at {}...", cmd_peer_addr);
3494 return FailoverTestPeer::create(cmd_peer_addr, test_peer_addr
3495 ).then([test_addr, cmd_peer_addr, test_peer_addr, peer_wins](auto peer) {
3496 return test_v2_protocol(
3497 test_addr,
3498 cmd_peer_addr,
3499 test_peer_addr,
3500 false,
3501 peer_wins
3502 ).then([peer = std::move(peer)] () mutable {
3503 return peer->wait().then([peer = std::move(peer)] {});
3504 });
3505 }).handle_exception([] (auto eptr) {
3506 logger().error("FailoverTestPeer failed: got exception {}", eptr);
3507 throw;
3508 });
3509 }
3510
3511 return FailoverTest::create(test_addr, cmd_peer_addr, test_peer_addr
3512 ).then([peer_wins](auto test) {
3513 return seastar::futurize_invoke([test] {
3514 return test_v2_lossy_early_connect_fault(*test);
3515 }).then([test] {
3516 return test_v2_lossy_connect_fault(*test);
3517 }).then([test] {
3518 return test_v2_lossy_connected_fault(*test);
3519 }).then([test] {
3520 return test_v2_lossy_early_accept_fault(*test);
3521 }).then([test] {
3522 return test_v2_lossy_accept_fault(*test);
3523 }).then([test] {
3524 return test_v2_lossy_establishing_fault(*test);
3525 }).then([test] {
3526 return test_v2_lossy_accepted_fault(*test);
3527 }).then([test] {
3528 return test_v2_lossless_connect_fault(*test);
3529 }).then([test] {
3530 return test_v2_lossless_connected_fault(*test);
3531 }).then([test] {
3532 return test_v2_lossless_connected_fault2(*test);
3533 }).then([test] {
3534 return test_v2_lossless_reconnect_fault(*test);
3535 }).then([test] {
3536 return test_v2_lossless_accept_fault(*test);
3537 }).then([test] {
3538 return test_v2_lossless_establishing_fault(*test);
3539 }).then([test] {
3540 return test_v2_lossless_accepted_fault(*test);
3541 }).then([test] {
3542 return test_v2_lossless_reaccept_fault(*test);
3543 }).then([test] {
3544 return test_v2_peer_connect_fault(*test);
3545 }).then([test] {
3546 return test_v2_peer_accept_fault(*test);
3547 }).then([test] {
3548 return test_v2_peer_establishing_fault(*test);
3549 }).then([test] {
3550 return test_v2_peer_connected_fault_reconnect(*test);
3551 }).then([test] {
3552 return test_v2_peer_connected_fault_reaccept(*test);
3553 }).then([test] {
3554 return check_peer_wins(*test);
3555 }).then([test, peer_wins](bool ret_peer_wins) {
3556 ceph_assert(peer_wins == ret_peer_wins);
3557 if (ret_peer_wins) {
3558 return seastar::futurize_invoke([test] {
3559 return test_v2_racing_connect_acceptor_win(*test);
3560 }).then([test] {
3561 return test_v2_racing_reconnect_acceptor_win(*test);
3562 });
3563 } else {
3564 return seastar::futurize_invoke([test] {
3565 return test_v2_racing_connect_acceptor_lose(*test);
3566 }).then([test] {
3567 return test_v2_racing_reconnect_acceptor_lose(*test);
3568 });
3569 }
3570 }).then([test] {
3571 return test_v2_racing_connect_reconnect_win(*test);
3572 }).then([test] {
3573 return test_v2_racing_connect_reconnect_lose(*test);
3574 }).then([test] {
3575 return test_v2_stale_connect(*test);
3576 }).then([test] {
3577 return test_v2_stale_reconnect(*test);
3578 }).then([test] {
3579 return test_v2_stale_accept(*test);
3580 }).then([test] {
3581 return test_v2_stale_establishing(*test);
3582 }).then([test] {
3583 return test_v2_stale_reaccept(*test);
3584 }).then([test] {
3585 return test_v2_lossy_client(*test);
3586 }).then([test] {
3587 return test_v2_stateless_server(*test);
3588 }).then([test] {
3589 return test_v2_lossless_client(*test);
3590 }).then([test] {
3591 return test_v2_stateful_server(*test);
3592 }).then([test] {
3593 return test_v2_peer_reuse_connector(*test);
3594 }).then([test] {
3595 return test_v2_peer_reuse_acceptor(*test);
3596 }).then([test] {
3597 return test_v2_lossless_peer_connector(*test);
3598 }).then([test] {
3599 return test_v2_lossless_peer_acceptor(*test);
3600 }).then([test] {
3601 return test->shutdown().then([test] {});
3602 });
3603 }).handle_exception([] (auto eptr) {
3604 logger().error("FailoverTest failed: got exception {}", eptr);
3605 throw;
3606 });
3607 }
3608
3609 }
3610
3611 seastar::future<int> do_test(seastar::app_template& app)
3612 {
3613 std::vector<const char*> args;
3614 std::string cluster;
3615 std::string conf_file_list;
3616 auto init_params = ceph_argparse_early_args(args,
3617 CEPH_ENTITY_TYPE_CLIENT,
3618 &cluster,
3619 &conf_file_list);
3620 return crimson::common::sharded_conf().start(init_params.name, cluster)
3621 .then([conf_file_list] {
3622 return local_conf().parse_config_files(conf_file_list);
3623 }).then([&app] {
3624 auto&& config = app.configuration();
3625 verbose = config["verbose"].as<bool>();
3626 auto rounds = config["rounds"].as<unsigned>();
3627 auto keepalive_ratio = config["keepalive-ratio"].as<double>();
3628 auto testpeer_islocal = config["testpeer-islocal"].as<bool>();
3629
3630 entity_addr_t test_addr;
3631 ceph_assert(test_addr.parse(
3632 config["test-addr"].as<std::string>().c_str(), nullptr));
3633 test_addr.set_nonce(TEST_NONCE);
3634
3635 entity_addr_t cmd_peer_addr;
3636 ceph_assert(cmd_peer_addr.parse(
3637 config["testpeer-addr"].as<std::string>().c_str(), nullptr));
3638 cmd_peer_addr.set_nonce(CMD_SRV_NONCE);
3639
3640 entity_addr_t test_peer_addr = get_test_peer_addr(cmd_peer_addr);
3641 bool peer_wins = (test_addr > test_peer_addr);
3642
3643 logger().info("test configuration: verbose={}, rounds={}, keepalive_ratio={}, "
3644 "test_addr={}, cmd_peer_addr={}, test_peer_addr={}, "
3645 "testpeer_islocal={}, peer_wins={}",
3646 verbose, rounds, keepalive_ratio,
3647 test_addr, cmd_peer_addr, test_peer_addr,
3648 testpeer_islocal, peer_wins);
3649 return test_echo(rounds, keepalive_ratio
3650 ).then([] {
3651 return test_concurrent_dispatch();
3652 }).then([] {
3653 return test_preemptive_shutdown();
3654 }).then([test_addr, cmd_peer_addr, test_peer_addr, testpeer_islocal, peer_wins] {
3655 return test_v2_protocol(
3656 test_addr,
3657 cmd_peer_addr,
3658 test_peer_addr,
3659 testpeer_islocal,
3660 peer_wins);
3661 }).then([] {
3662 logger().info("All tests succeeded");
3663 // Seastar has bugs to have events undispatched during shutdown,
3664 // which will result in memory leak and thus fail LeakSanitizer.
3665 return seastar::sleep(100ms);
3666 });
3667 }).then([] {
3668 return crimson::common::sharded_conf().stop();
3669 }).then([] {
3670 return 0;
3671 }).handle_exception([] (auto eptr) {
3672 logger().error("Test failed: got exception {}", eptr);
3673 return 1;
3674 });
3675 }
3676
3677 int main(int argc, char** argv)
3678 {
3679 seastar::app_template app;
3680 app.add_options()
3681 ("verbose,v", bpo::value<bool>()->default_value(false),
3682 "chatty if true")
3683 ("rounds", bpo::value<unsigned>()->default_value(512),
3684 "number of pingpong rounds")
3685 ("keepalive-ratio", bpo::value<double>()->default_value(0.1),
3686 "ratio of keepalive in ping messages")
3687 ("test-addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9014"),
3688 "address of v2 failover tests")
3689 ("testpeer-addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9012"),
3690 "addresses of v2 failover testpeer"
3691 " (This is CmdSrv address, and TestPeer address is at port+=1)")
3692 ("testpeer-islocal", bpo::value<bool>()->default_value(true),
3693 "create a local crimson testpeer, or connect to a remote testpeer");
3694 return app.run(argc, argv, [&app] {
3695 // This test normally succeeds within 60 seconds, so kill it after 300
3696 // seconds in case it is blocked forever due to unaddressed bugs.
3697 return seastar::with_timeout(seastar::lowres_clock::now() + 300s, do_test(app))
3698 .handle_exception_type([](seastar::timed_out_error&) {
3699 logger().error("test_messenger timeout after 300s, abort! "
3700 "Consider to extend the period if the test is still running.");
3701 // use the retcode of timeout(1)
3702 return 124;
3703 });
3704 });
3705 }