#include <atomic>
#include <iostream>
+#include <memory>
#include <unistd.h>
#include <stdlib.h>
#include <time.h>
delete srv_interceptor;
}
+/**
+ * Scenario: A connects to B, and B connects to A at the same time.
+ * The first (A -> B) connection gets to message flow handshake, the
+ * second (B -> A) connection is stuck waiting for a banner from A.
+ * After A sends client_ident to B, the first connection wins and B
+ * calls reuse_connection() to replace the second connection's socket
+ * while the second connection is still in BANNER_CONNECTING.
+ */
+TEST_P(MessengerTest, ConnectionRaceReuseBannerTest) {
+ FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
+
+ auto cli_interceptor = std::make_unique<TestInterceptor>();
+ auto srv_interceptor = std::make_unique<TestInterceptor>();
+
+ server_msgr->set_policy(entity_name_t::TYPE_CLIENT,
+ Messenger::Policy::lossless_peer_reuse(0));
+ server_msgr->interceptor = srv_interceptor.get();
+
+ client_msgr->set_policy(entity_name_t::TYPE_OSD,
+ Messenger::Policy::lossless_peer_reuse(0));
+ client_msgr->interceptor = cli_interceptor.get();
+
+ entity_addr_t bind_addr;
+ bind_addr.parse("v2:127.0.0.1:3300");
+ server_msgr->bind(bind_addr);
+ server_msgr->add_dispatcher_head(&srv_dispatcher);
+ server_msgr->start();
+
+ bind_addr.parse("v2:127.0.0.1:3301");
+ client_msgr->bind(bind_addr);
+ client_msgr->add_dispatcher_head(&cli_dispatcher);
+ client_msgr->start();
+
+ // pause before sending client_ident message
+ srv_interceptor->breakpoint(11);
+
+ ConnectionRef s2c = server_msgr->connect_to(client_msgr->get_mytype(),
+ client_msgr->get_myaddrs());
+ MPing *m1 = new MPing();
+ ASSERT_EQ(s2c->send_message(m1), 0);
+
+ srv_interceptor->wait(11);
+ srv_interceptor->remove_bp(11);
+
+ // pause before sending banner
+ cli_interceptor->breakpoint(3);
+
+ ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
+ MPing *m2 = new MPing();
+ ASSERT_EQ(c2s->send_message(m2), 0);
+
+ cli_interceptor->wait(3);
+ cli_interceptor->remove_bp(3);
+
+ // second connection is in BANNER_CONNECTING, ensure it stays so
+ // and send client_ident
+ srv_interceptor->breakpoint(4);
+ srv_interceptor->proceed(11, Interceptor::ACTION::CONTINUE);
+
+ // handle client_ident -- triggers reuse_connection() with exproto
+ // in BANNER_CONNECTING
+ cli_interceptor->breakpoint(15);
+ cli_interceptor->proceed(3, Interceptor::ACTION::CONTINUE);
+
+ cli_interceptor->wait(15);
+ cli_interceptor->remove_bp(15);
+
+ // first connection is in READY
+ Connection *s2c_accepter = srv_interceptor->wait(4);
+ srv_interceptor->remove_bp(4);
+
+ srv_interceptor->proceed(4, Interceptor::ACTION::CONTINUE);
+ cli_interceptor->proceed(15, Interceptor::ACTION::CONTINUE);
+
+ {
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+
+ {
+ std::unique_lock l{srv_dispatcher.lock};
+ srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; });
+ srv_dispatcher.got_new = false;
+ }
+
+ EXPECT_TRUE(s2c->is_connected());
+ EXPECT_EQ(1u, static_cast<Session*>(s2c->get_priv().get())->get_count());
+ EXPECT_TRUE(s2c->peer_is_client());
+
+ EXPECT_TRUE(c2s->is_connected());
+ EXPECT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
+ EXPECT_TRUE(c2s->peer_is_osd());
+
+ // closed in reuse_connection() -- EPIPE when writing banner/hello
+ EXPECT_FALSE(s2c_accepter->is_connected());
+
+ // established exactly once, never faulted and reconnected
+ EXPECT_EQ(cli_interceptor->count_step(c2s.get(), 1), 1u);
+ EXPECT_EQ(cli_interceptor->count_step(c2s.get(), 13), 0u);
+ EXPECT_EQ(cli_interceptor->count_step(c2s.get(), 15), 1u);
+
+ client_msgr->shutdown();
+ client_msgr->wait();
+ server_msgr->shutdown();
+ server_msgr->wait();
+}
+
/**
* Scenario:
* - A connects to B