- client2->dispatch_pingpong(server1->msgr->get_myaddr()));
- // shutdown
- }).then_unpack([] {
- return seastar::now();
- }).then([client1] {
- logger().info("client1 shutdown...");
- return client1->shutdown();
- }).then([client2] {
- logger().info("client2 shutdown...");
- return client2->shutdown();
- }).then([server1] {
- logger().info("server1 shutdown...");
- return server1->shutdown();
- }).then([server2] {
- logger().info("server2 shutdown...");
- return server2->shutdown();
- }).then([] {
- logger().info("test_echo() done!\n");
- }).handle_exception([server1, server2, client1, client2] (auto eptr) {
- logger().error("test_echo() failed: got exception {}", eptr);
- throw;
- });
-}
-
-static seastar::future<> test_concurrent_dispatch()
-{
- struct test_state {
- struct Server final
- : public crimson::net::Dispatcher {
- crimson::net::MessengerRef msgr;
- int count = 0;
- seastar::promise<> on_second; // satisfied on second dispatch
- seastar::promise<> on_done; // satisfied when first dispatch unblocks
- crimson::auth::DummyAuthClientServer dummy_auth;
-
- std::optional<seastar::future<>> ms_dispatch(
- crimson::net::ConnectionRef, MessageRef m) override {
- switch (++count) {
- case 1:
- // block on the first request until we reenter with the second
- std::ignore = on_second.get_future().then([this] { on_done.set_value(); });
- break;
- case 2:
- on_second.set_value();
- break;
- default:
- throw std::runtime_error("unexpected count");
- }
- return {seastar::now()};
- }
-
- seastar::future<> wait() { return on_done.get_future(); }
-
- seastar::future<> init(const entity_name_t& name,
- const std::string& lname,
- const uint64_t nonce,
- const entity_addr_t& addr) {
- msgr = crimson::net::Messenger::create(name, lname, nonce);
- msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
- msgr->set_auth_client(&dummy_auth);
- msgr->set_auth_server(&dummy_auth);
- return msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
- return msgr->start({this});
- }, crimson::net::Messenger::bind_ertr::all_same_way(
- [addr] (const std::error_code& e) {
- logger().error("test_concurrent_dispatch(): "
- "there is another instance running at {}", addr);
- ceph_abort();
- }));
- }
- };
-
- struct Client final
- : public crimson::net::Dispatcher {
- crimson::net::MessengerRef msgr;
- crimson::auth::DummyAuthClientServer dummy_auth;
-
- std::optional<seastar::future<>> ms_dispatch(
- crimson::net::ConnectionRef, MessageRef m) override {
- return {seastar::now()};
- }
-
- seastar::future<> init(const entity_name_t& name,
- const std::string& lname,
- const uint64_t nonce) {
- msgr = crimson::net::Messenger::create(name, lname, nonce);
- msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
- msgr->set_auth_client(&dummy_auth);
- msgr->set_auth_server(&dummy_auth);
- return msgr->start({this});
- }
- };
- };
-
- logger().info("test_concurrent_dispatch():");
- auto server = seastar::make_shared<test_state::Server>();
- auto client = seastar::make_shared<test_state::Client>();
- auto addr = get_server_addr();
- addr.set_type(entity_addr_t::TYPE_MSGR2);
- addr.set_family(AF_INET);
- return seastar::when_all_succeed(
- server->init(entity_name_t::OSD(4), "server3", 5, addr),
- client->init(entity_name_t::OSD(5), "client3", 6)
- ).then_unpack([server, client] {
- auto conn = client->msgr->connect(server->msgr->get_myaddr(),
- entity_name_t::TYPE_OSD);
- // send two messages
- return conn->send(crimson::make_message<MPing>()).then([conn] {
- return conn->send(crimson::make_message<MPing>());
+ client2->dispatch_pingpong(server1->msgr->get_myaddr()),
+ client2->dispatch_pingpong(server2->msgr->get_myaddr()));
+ // shutdown
+ }).then_unpack([client1] {
+ logger().info("client1 shutdown...");
+ return client1->shutdown();
+ }).then([client2] {
+ logger().info("client2 shutdown...");
+ return client2->shutdown();
+ }).then([server1] {
+ logger().info("server1 shutdown...");
+ return server1->shutdown();
+ }).then([server2] {
+ logger().info("server2 shutdown...");
+ return server2->shutdown();
+ }).then([] {
+ logger().info("test_echo() done!\n");
+ }).handle_exception([](auto eptr) {
+ logger().error("test_echo() failed: got exception {}", eptr);
+ throw;
+ }).finally([gates, server1, server2] {
+ return gates->close();