1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
25 #include "common/ceph_mutex.h"
26 #include "common/ceph_argparse.h"
27 #include "global/global_init.h"
28 #include "msg/Dispatcher.h"
29 #include "msg/msg_types.h"
30 #include "msg/Message.h"
31 #include "msg/Messenger.h"
32 #include "msg/Connection.h"
33 #include "messages/MPing.h"
34 #include "messages/MCommand.h"
36 #include <boost/random/mersenne_twister.hpp>
37 #include <boost/random/uniform_int.hpp>
38 #include <boost/random/binomial_distribution.hpp>
39 #include <gtest/gtest.h>
// Alias for the Boost random engine mt11213b (a Mersenne Twister variant from
// boost/random/mersenne_twister.hpp). Its users are not visible in this part
// of the file.
41 typedef boost::mt11213b gen_type
;
43 #include "common/dout.h"
44 #include "include/ceph_assert.h"
46 #include "auth/DummyAuth.h"
// Logging configuration consumed by the dout/lderr macros from common/dout.h:
// messages from this file are attributed to the ceph_subsys_ms subsystem.
48 #define dout_subsys ceph_subsys_ms
// Every log line emitted from this file starts with " ceph_test_msgr ".
50 #define dout_prefix *_dout << " ceph_test_msgr "
53 #define CHECK_AND_WAIT_TRUE(expr) do { \
62 class MessengerTest
// Value-parameterized gtest fixture. The string parameter (GetParam()) is
// passed to Messenger::create() as the messenger type. The fixture owns one
// "server" and one "client" Messenger plus a DummyAuthClientServer that both
// messengers use for authentication.
: public ::testing::TestWithParam
<const char*> {
64 DummyAuthClientServer dummy_auth
;
65 Messenger
*server_msgr
;
66 Messenger
*client_msgr
;
// Messengers start out null; they are created per test in SetUp().
68 MessengerTest() : dummy_auth(g_ceph_context
),
69 server_msgr(NULL
), client_msgr(NULL
) {
70 dummy_auth
.auth_registry
.refresh_config();
// Creates both messengers (server named OSD(0), client named CLIENT(-1)),
// installs the default policies (stateless_server for the server,
// lossy_client for the client), registers dummy_auth as both auth client and
// auth server on each messenger, and disables the authorizer requirement on
// the server.
72 void SetUp() override
{
73 lderr(g_ceph_context
) << __func__
<< " start set up " << GetParam() << dendl
;
74 server_msgr
= Messenger::create(g_ceph_context
, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
75 client_msgr
= Messenger::create(g_ceph_context
, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid(), 0);
76 server_msgr
->set_default_policy(Messenger::Policy::stateless_server(0));
77 client_msgr
->set_default_policy(Messenger::Policy::lossy_client(0));
78 server_msgr
->set_auth_client(&dummy_auth
);
79 server_msgr
->set_auth_server(&dummy_auth
);
80 client_msgr
->set_auth_client(&dummy_auth
);
81 client_msgr
->set_auth_server(&dummy_auth
);
82 server_msgr
->set_require_authorizer(false);
// Verifies that neither messenger still has queued dispatch work when a test
// ends. Any cleanup after these assertions is not visible here.
84 void TearDown() override
{
85 ASSERT_EQ(server_msgr
->get_dispatch_queue_len(), 0);
86 ASSERT_EQ(client_msgr
->get_dispatch_queue_len(), 0);
// Dispatcher used by the tests. It attaches a ref-counted Session to each
// connection (stored as the connection's "priv" object), logs every callback
// through lderr, and exposes `lock`/`cond` plus status flags that the tests
// wait on.
94 class FakeDispatcher
: public Dispatcher
{
// Per-connection state: a counter read by the tests through get_count() and a
// reference back to the owning connection (`con`).
// NOTE(review): the code that increments `count` is not visible here --
// presumably it counts dispatched messages; confirm.
96 struct Session
: public RefCountedObject
{
97 atomic
<uint64_t> count
;
100 explicit Session(ConnectionRef c
): RefCountedObject(g_ceph_context
), count(0), con(c
) {
102 uint64_t get_count() { return count
; }
// `lock` is taken by the handlers below and by the tests before they wait on
// `cond`.
105 ceph::mutex lock
= ceph::make_mutex("FakeDispatcher::lock");
106 ceph::condition_variable cond
;
109 bool got_remote_reset
;
// Peer addresses of the most recently accepted connection (see
// ms_handle_fast_accept).
112 entity_addrvec_t last_accept
;
// Optional out-slot: when non-null, ms_handle_fast_accept stores the accepted
// connection through it.
113 ConnectionRef
*last_accept_con_ptr
= nullptr;
// `s` selects server mode (stored in is_server); all status flags start false.
115 explicit FakeDispatcher(bool s
): Dispatcher(g_ceph_context
),
116 is_server(s
), got_new(false), got_remote_reset(false),
117 got_connect(false), loopback(false) {
119 bool ms_can_fast_dispatch_any() const override
{ return true; }
// Decides per message type whether fast dispatch is allowed; the switch cases
// are not visible here.
120 bool ms_can_fast_dispatch(const Message
*m
) const override
{
121 switch (m
->get_type()) {
// Outgoing connection established: under `lock`, create a Session for the
// connection and attach it as the connection's priv object.
129 void ms_handle_fast_connect(Connection
*con
) override
{
130 std::scoped_lock l
{lock
};
131 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
132 auto s
= con
->get_priv();
134 auto session
= new Session(con
);
// NOTE(review): the `false` argument presumably means "adopt without adding a
// reference" (intrusive_ptr style) -- confirm against RefCountedPtr.
135 con
->set_priv(RefCountedPtr
{session
, false});
136 lderr(g_ceph_context
) << __func__
<< " con: " << con
137 << " count: " << session
->count
<< dendl
;
// Incoming connection accepted: remember the peer addresses, publish the
// connection through last_accept_con_ptr when one is registered, and attach a
// Session if the connection has none yet.
142 void ms_handle_fast_accept(Connection
*con
) override
{
143 last_accept
= con
->get_peer_addrs();
144 if (last_accept_con_ptr
) {
145 *last_accept_con_ptr
= con
;
147 if (!con
->get_priv()) {
148 con
->set_priv(RefCountedPtr
{new Session(con
), false});
// Regular (queued) dispatch path: fetch the connection's Session, log its
// count and take `lock`.
// NOTE(review): the condition guarding the Session creation below is not
// visible here; presumably it only runs when no Session is attached yet.
151 bool ms_dispatch(Message
*m
) override
{
152 auto priv
= m
->get_connection()->get_priv();
153 auto s
= static_cast<Session
*>(priv
.get());
155 s
= new Session(m
->get_connection());
156 priv
.reset(s
, false);
157 m
->get_connection()->set_priv(priv
);
160 lderr(g_ceph_context
) << __func__
<< " conn: " << m
->get_connection() << " session " << s
<< " count: " << s
->count
<< dendl
;
164 std::lock_guard l
{lock
};
// Connection reset: under `lock`, drop the Session's reference to the
// connection and detach the Session from the connection.
170 bool ms_handle_reset(Connection
*con
) override
{
171 std::lock_guard l
{lock
};
172 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
173 auto priv
= con
->get_priv();
174 if (auto s
= static_cast<Session
*>(priv
.get()); s
) {
175 s
->con
.reset(); // break con <-> session ref cycle
176 con
->set_priv(nullptr); // break ref <-> session cycle, if any
// Remote reset: same cleanup as ms_handle_reset, and additionally records the
// event in got_remote_reset.
180 void ms_handle_remote_reset(Connection
*con
) override
{
181 std::lock_guard l
{lock
};
182 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
183 auto priv
= con
->get_priv();
184 if (auto s
= static_cast<Session
*>(priv
.get()); s
) {
185 s
->con
.reset(); // break con <-> session ref cycle
186 con
->set_priv(nullptr); // break ref <-> session cycle, if any
188 got_remote_reset
= true;
// Body not visible here.
191 bool ms_handle_refused(Connection
*con
) override
{
// Fast dispatch path: same Session lookup and logging as ms_dispatch, followed
// by sanity checks on the message source (OSD in one branch, client in the
// loopback branch) before taking `lock`.
// NOTE(review): the conditions around the Session creation and the first
// assertion are not visible here.
194 void ms_fast_dispatch(Message
*m
) override
{
195 auto priv
= m
->get_connection()->get_priv();
196 auto s
= static_cast<Session
*>(priv
.get());
198 s
= new Session(m
->get_connection());
199 priv
.reset(s
, false);
200 m
->get_connection()->set_priv(priv
);
203 lderr(g_ceph_context
) << __func__
<< " conn: " << m
->get_connection() << " session " << s
<< " count: " << s
->count
<< dendl
;
206 ceph_assert(m
->get_source().is_osd());
209 } else if (loopback
) {
210 ceph_assert(m
->get_source().is_client());
213 std::lock_guard l
{lock
};
// Body not visible here.
218 int ms_handle_authentication(Connection
*con
) override
{
// Answers `m` by sending a freshly allocated MPing on the connection the
// message arrived on.
222 void reply_message(Message
*m
) {
223 MPing
*rm
= new MPing();
224 m
->get_connection()->send_message(rm
);
// Shorthand so the tests can write Session instead of FakeDispatcher::Session.
228 typedef FakeDispatcher::Session Session
;
// Interceptor used by the tests to observe and steer connection handshakes.
// Each intercept() call records the (connection, step) pair. Steps registered
// as breakpoints block the calling thread until the test supplies a decision
// through proceed().
230 struct TestInterceptor
: public Interceptor
{
232 bool step_waiting
= false;
// Most recent step reported by each connection.
234 std::map
<Connection
*, uint32_t> current_step
;
// Every step reported by each connection, in order of arrival.
235 std::map
<Connection
*, std::list
<uint32_t>> step_history
;
// Pending decision per step; empty until proceed() supplies one.
236 std::map
<uint32_t, std::optional
<ACTION
>> decisions
;
// Steps at which intercept() should pause.
237 std::set
<uint32_t> breakpoints
;
// Walks the recorded history of `conn`.
// NOTE(review): the loop body is not visible here; presumably it counts the
// entries equal to `step`. Note that step_history[conn] default-inserts an
// empty list for an unknown connection.
239 uint32_t count_step(Connection
*conn
, uint32_t step
) {
241 for (auto s
: step_history
[conn
]) {
// Registers `step` as a breakpoint.
249 void breakpoint(uint32_t step
) {
250 breakpoints
.insert(step
);
// Removes the breakpoint on `step`, if any.
253 void remove_bp(uint32_t step
) {
254 breakpoints
.erase(step
);
// Under `lock`, looks for a connection whose current step equals `step`:
// first `conn` itself (looked up in current_step), then every recorded
// connection. Returns a Connection pointer.
// NOTE(review): the waiting logic and return statements are not visible here;
// presumably this blocks until some connection reaches `step` and returns it.
257 Connection
*wait(uint32_t step
, Connection
*conn
=nullptr) {
258 std::unique_lock
<std::mutex
> l(lock
);
261 auto it
= current_step
.find(conn
);
262 if (it
!= current_step
.end()) {
263 if (it
->second
== step
) {
268 for (auto it
: current_step
) {
269 if (it
.second
== step
) {
281 step_waiting
= false;
// Returns the decision for `step`: immediately when one is already stored,
// otherwise after waiting on cond_var (with the caller's lock `l`) until
// `waiting` is cleared.
285 ACTION
wait_for_decision(uint32_t step
, std::unique_lock
<std::mutex
> &l
) {
286 if (decisions
[step
]) {
287 return *(decisions
[step
]);
290 cond_var
.wait(l
, [this] { return !waiting
; });
291 return *(decisions
[step
]);
// Called by the test: stores `decision` for `step` under `lock` and wakes one
// waiter on cond_var.
294 void proceed(uint32_t step
, ACTION decision
) {
295 std::unique_lock
<std::mutex
> l(lock
);
296 decisions
[step
] = decision
;
299 cond_var
.notify_one();
// Interceptor hook. Records the step for `conn` and notifies cond_var, then
// computes the action to return: CONTINUE by default, the value from
// wait_for_decision() when `step` is a breakpoint, or a stored decision for
// `step` when one exists. The stored decision is cleared before returning.
// NOTE(review): the scopes separating the two lock acquisitions, and the
// branch structure around the decisions[step] check, are not visible here.
303 ACTION
intercept(Connection
*conn
, uint32_t step
) override
{
304 lderr(g_ceph_context
) << __func__
<< " conn(" << conn
305 << ") intercept called on step=" << step
<< dendl
;
308 std::unique_lock
<std::mutex
> l(lock
);
309 step_history
[conn
].push_back(step
);
310 current_step
[conn
] = step
;
312 cond_var
.notify_one();
316 std::unique_lock
<std::mutex
> l(lock
);
317 ACTION decision
= ACTION::CONTINUE
;
318 if (breakpoints
.find(step
) != breakpoints
.end()) {
319 lderr(g_ceph_context
) << __func__
<< " conn(" << conn
320 << ") pausing on step=" << step
<< dendl
;
321 decision
= wait_for_decision(step
, l
);
323 if (decisions
[step
]) {
324 decision
= *(decisions
[step
]);
327 lderr(g_ceph_context
) << __func__
<< " conn(" << conn
328 << ") resuming step=" << step
<< " with decision="
329 << decision
<< dendl
;
330 decisions
[step
].reset();
337 * Scenario: A connects to B, and B connects to A at the same time.
// Both messengers bind a fixed v2 port, use the lossless_peer_reuse policy for
// each other's entity type, and open a connection to the other side at the same
// time. Both connections are held at step 11 and then released together. The
// test then checks that each side received exactly one message and that both
// connections are connected with the expected peer type.
339 TEST_P(MessengerTest
, ConnectionRaceTest
) {
340 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(false);
// The interceptors are owned by this test and deleted at the end of it.
342 TestInterceptor
*cli_interceptor
= new TestInterceptor();
343 TestInterceptor
*srv_interceptor
= new TestInterceptor();
345 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, Messenger::Policy::lossless_peer_reuse(0));
346 server_msgr
->interceptor
= srv_interceptor
;
348 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, Messenger::Policy::lossless_peer_reuse(0));
349 client_msgr
->interceptor
= cli_interceptor
;
// Server listens on port 3300, client on port 3301 (both on 127.0.0.1).
351 entity_addr_t bind_addr
;
352 bind_addr
.parse("v2:127.0.0.1:3300");
353 server_msgr
->bind(bind_addr
);
354 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
355 server_msgr
->start();
357 bind_addr
.parse("v2:127.0.0.1:3301");
358 client_msgr
->bind(bind_addr
);
359 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
360 client_msgr
->start();
362 // pause before sending client_ident message
363 cli_interceptor
->breakpoint(11);
364 // pause before sending client_ident message
365 srv_interceptor
->breakpoint(11);
// Open client -> server and server -> client connections, one ping on each.
367 ConnectionRef c2s
= client_msgr
->connect_to(server_msgr
->get_mytype(),
368 server_msgr
->get_myaddrs());
369 MPing
*m1
= new MPing();
370 ASSERT_EQ(c2s
->send_message(m1
), 0);
372 ConnectionRef s2c
= server_msgr
->connect_to(client_msgr
->get_mytype(),
373 client_msgr
->get_myaddrs());
374 MPing
*m2
= new MPing();
375 ASSERT_EQ(s2c
->send_message(m2
), 0);
377 cli_interceptor
->wait(11, c2s
.get());
378 srv_interceptor
->wait(11, s2c
.get());
380 // at this point both connections (A->B, B->A) are paused just before sending
381 // the client_ident message.
383 cli_interceptor
->remove_bp(11);
384 srv_interceptor
->remove_bp(11);
386 cli_interceptor
->proceed(11, Interceptor::ACTION::CONTINUE
);
387 srv_interceptor
->proceed(11, Interceptor::ACTION::CONTINUE
);
// Block until each dispatcher's got_new flag is set, then clear it.
390 std::unique_lock l
{cli_dispatcher
.lock
};
391 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
392 cli_dispatcher
.got_new
= false;
396 std::unique_lock l
{srv_dispatcher
.lock
};
397 srv_dispatcher
.cond
.wait(l
, [&] { return srv_dispatcher
.got_new
; });
398 srv_dispatcher
.got_new
= false;
401 ASSERT_TRUE(s2c
->is_connected());
402 ASSERT_EQ(1u, static_cast<Session
*>(s2c
->get_priv().get())->get_count());
403 ASSERT_TRUE(s2c
->peer_is_client());
405 ASSERT_TRUE(c2s
->is_connected());
406 ASSERT_EQ(1u, static_cast<Session
*>(c2s
->get_priv().get())->get_count());
407 ASSERT_TRUE(c2s
->peer_is_osd());
409 client_msgr
->shutdown();
411 server_msgr
->shutdown();
414 delete cli_interceptor
;
415 delete srv_interceptor
;
419 * Scenario: A connects to B, and B connects to A at the same time.
420 * The first (A -> B) connection gets to message flow handshake, the
421 * second (B -> A) connection is stuck waiting for a banner from A.
422 * After A sends client_ident to B, the first connection wins and B
423 * calls reuse_connection() to replace the second connection's socket
424 * while the second connection is still in BANNER_CONNECTING.
// Same two-way connection race as ConnectionRaceTest, but the breakpoints are
// sequenced so that one connection is still at the banner stage while the
// other one completes. The interceptors are held in std::unique_ptr, so they
// are released automatically even if an assertion returns early.
426 TEST_P(MessengerTest
, ConnectionRaceReuseBannerTest
) {
427 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(false);
429 auto cli_interceptor
= std::make_unique
<TestInterceptor
>();
430 auto srv_interceptor
= std::make_unique
<TestInterceptor
>();
432 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
,
433 Messenger::Policy::lossless_peer_reuse(0));
434 server_msgr
->interceptor
= srv_interceptor
.get();
436 client_msgr
->set_policy(entity_name_t::TYPE_OSD
,
437 Messenger::Policy::lossless_peer_reuse(0));
438 client_msgr
->interceptor
= cli_interceptor
.get();
// Server listens on port 3300, client on port 3301 (both on 127.0.0.1).
440 entity_addr_t bind_addr
;
441 bind_addr
.parse("v2:127.0.0.1:3300");
442 server_msgr
->bind(bind_addr
);
443 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
444 server_msgr
->start();
446 bind_addr
.parse("v2:127.0.0.1:3301");
447 client_msgr
->bind(bind_addr
);
448 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
449 client_msgr
->start();
451 // pause before sending client_ident message
452 srv_interceptor
->breakpoint(11);
454 ConnectionRef s2c
= server_msgr
->connect_to(client_msgr
->get_mytype(),
455 client_msgr
->get_myaddrs());
456 MPing
*m1
= new MPing();
457 ASSERT_EQ(s2c
->send_message(m1
), 0);
459 srv_interceptor
->wait(11);
460 srv_interceptor
->remove_bp(11);
462 // pause before sending banner
463 cli_interceptor
->breakpoint(3);
465 ConnectionRef c2s
= client_msgr
->connect_to(server_msgr
->get_mytype(),
466 server_msgr
->get_myaddrs());
467 MPing
*m2
= new MPing();
468 ASSERT_EQ(c2s
->send_message(m2
), 0);
470 cli_interceptor
->wait(3);
471 cli_interceptor
->remove_bp(3);
473 // second connection is in BANNER_CONNECTING, ensure it stays so
474 // and send client_ident
475 srv_interceptor
->breakpoint(4);
476 srv_interceptor
->proceed(11, Interceptor::ACTION::CONTINUE
);
478 // handle client_ident -- triggers reuse_connection() with exproto
479 // in BANNER_CONNECTING
480 cli_interceptor
->breakpoint(15);
481 cli_interceptor
->proceed(3, Interceptor::ACTION::CONTINUE
);
483 cli_interceptor
->wait(15);
484 cli_interceptor
->remove_bp(15);
486 // first connection is in READY
// wait(4) returns the server-side connection that reached step 4; it is
// checked below to be disconnected.
487 Connection
*s2c_accepter
= srv_interceptor
->wait(4);
488 srv_interceptor
->remove_bp(4);
490 srv_interceptor
->proceed(4, Interceptor::ACTION::CONTINUE
);
491 cli_interceptor
->proceed(15, Interceptor::ACTION::CONTINUE
);
// Block until each dispatcher's got_new flag is set, then clear it.
494 std::unique_lock l
{cli_dispatcher
.lock
};
495 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
496 cli_dispatcher
.got_new
= false;
500 std::unique_lock l
{srv_dispatcher
.lock
};
501 srv_dispatcher
.cond
.wait(l
, [&] { return srv_dispatcher
.got_new
; });
502 srv_dispatcher
.got_new
= false;
// EXPECT_* (non-fatal) is used from here on, so the shutdown calls below still
// run when a check fails.
505 EXPECT_TRUE(s2c
->is_connected());
506 EXPECT_EQ(1u, static_cast<Session
*>(s2c
->get_priv().get())->get_count());
507 EXPECT_TRUE(s2c
->peer_is_client());
509 EXPECT_TRUE(c2s
->is_connected());
510 EXPECT_EQ(1u, static_cast<Session
*>(c2s
->get_priv().get())->get_count());
511 EXPECT_TRUE(c2s
->peer_is_osd());
513 // closed in reuse_connection() -- EPIPE when writing banner/hello
514 EXPECT_FALSE(s2c_accepter
->is_connected());
516 // established exactly once, never faulted and reconnected
517 EXPECT_EQ(cli_interceptor
->count_step(c2s
.get(), 1), 1u);
518 EXPECT_EQ(cli_interceptor
->count_step(c2s
.get(), 13), 0u);
519 EXPECT_EQ(cli_interceptor
->count_step(c2s
.get(), 15), 1u);
521 client_msgr
->shutdown();
523 server_msgr
->shutdown();
530 * - A sends client_ident to B
531 * - B fails before sending server_ident to A
// The client (lossy_client policy) connects to the server (stateful_server
// policy). The server side is paused at step 12, a message is queued on the
// accepted connection, and the step is then resumed with ACTION::FAIL. The
// test expects both sides to end up connected with exactly one message
// received each.
534 TEST_P(MessengerTest
, MissingServerIdenTest
) {
535 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(false);
// The interceptors are owned by this test and deleted at the end of it.
537 TestInterceptor
*cli_interceptor
= new TestInterceptor();
538 TestInterceptor
*srv_interceptor
= new TestInterceptor();
540 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, Messenger::Policy::stateful_server(0));
541 server_msgr
->interceptor
= srv_interceptor
;
543 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, Messenger::Policy::lossy_client(0));
544 client_msgr
->interceptor
= cli_interceptor
;
// Server listens on port 3300, client on port 3301 (both on 127.0.0.1).
546 entity_addr_t bind_addr
;
547 bind_addr
.parse("v2:127.0.0.1:3300");
548 server_msgr
->bind(bind_addr
);
549 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
550 server_msgr
->start();
552 bind_addr
.parse("v2:127.0.0.1:3301");
553 client_msgr
->bind(bind_addr
);
554 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
555 client_msgr
->start();
557 // pause the server side at step 12 -- presumably just before it sends
557 // server_ident, which is what the FAIL injected below is meant to drop
557 // (TODO confirm; this comment previously said client_ident)
558 srv_interceptor
->breakpoint(12);
560 ConnectionRef c2s
= client_msgr
->connect_to(server_msgr
->get_mytype(),
561 server_msgr
->get_myaddrs());
562 MPing
*m1
= new MPing();
563 ASSERT_EQ(c2s
->send_message(m1
), 0);
// wait(12) returns the server-side (accepted) end of the connection.
565 Connection
*c2s_accepter
= srv_interceptor
->wait(12);
566 srv_interceptor
->remove_bp(12);
568 // We inject a message from this side of the connection to force it to be
569 // in standby when we inject the failure below
570 MPing
*m2
= new MPing();
571 ASSERT_EQ(c2s_accepter
->send_message(m2
), 0);
573 srv_interceptor
->proceed(12, Interceptor::ACTION::FAIL
);
// Block until each dispatcher's got_new flag is set, then clear it.
576 std::unique_lock l
{srv_dispatcher
.lock
};
577 srv_dispatcher
.cond
.wait(l
, [&] { return srv_dispatcher
.got_new
; });
578 srv_dispatcher
.got_new
= false;
582 std::unique_lock l
{cli_dispatcher
.lock
};
583 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
584 cli_dispatcher
.got_new
= false;
587 ASSERT_TRUE(c2s
->is_connected());
588 ASSERT_EQ(1u, static_cast<Session
*>(c2s
->get_priv().get())->get_count());
589 ASSERT_TRUE(c2s
->peer_is_osd());
591 ASSERT_TRUE(c2s_accepter
->is_connected());
592 ASSERT_EQ(1u, static_cast<Session
*>(c2s_accepter
->get_priv().get())->get_count());
593 ASSERT_TRUE(c2s_accepter
->peer_is_client());
595 client_msgr
->shutdown();
597 server_msgr
->shutdown();
600 delete cli_interceptor
;
601 delete srv_interceptor
;
607 * - A sends client_ident to B
608 * - B fails before sending server_ident to A
609 * - A goes to standby
610 * - B reconnects to A
// Variant of MissingServerIdenTest in which both sides use the lossless_peer
// policy and the client sends no message of its own. After the server's step
// 12 is failed, the test expects the client to have received one message and
// the server-side session to have received none.
612 TEST_P(MessengerTest
, MissingServerIdenTest2
) {
613 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(false);
// The interceptors are owned by this test and deleted at the end of it.
615 TestInterceptor
*cli_interceptor
= new TestInterceptor();
616 TestInterceptor
*srv_interceptor
= new TestInterceptor();
618 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, Messenger::Policy::lossless_peer(0));
619 server_msgr
->interceptor
= srv_interceptor
;
621 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, Messenger::Policy::lossless_peer(0));
622 client_msgr
->interceptor
= cli_interceptor
;
// Server listens on port 3300, client on port 3301 (both on 127.0.0.1).
624 entity_addr_t bind_addr
;
625 bind_addr
.parse("v2:127.0.0.1:3300");
626 server_msgr
->bind(bind_addr
);
627 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
628 server_msgr
->start();
630 bind_addr
.parse("v2:127.0.0.1:3301");
631 client_msgr
->bind(bind_addr
);
632 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
633 client_msgr
->start();
635 // pause the server side at step 12 -- presumably just before it sends
635 // server_ident, which is what the FAIL injected below is meant to drop
635 // (TODO confirm; this comment previously said client_ident)
636 srv_interceptor
->breakpoint(12);
638 ConnectionRef c2s
= client_msgr
->connect_to(server_msgr
->get_mytype(),
639 server_msgr
->get_myaddrs());
// wait(12) returns the server-side (accepted) end of the connection.
641 Connection
*c2s_accepter
= srv_interceptor
->wait(12);
642 srv_interceptor
->remove_bp(12);
644 // We inject a message from this side of the connection to force it to be
645 // in standby when we inject the failure below
646 MPing
*m2
= new MPing();
647 ASSERT_EQ(c2s_accepter
->send_message(m2
), 0);
649 srv_interceptor
->proceed(12, Interceptor::ACTION::FAIL
);
// Block until the client dispatcher's got_new flag is set, then clear it.
652 std::unique_lock l
{cli_dispatcher
.lock
};
653 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
654 cli_dispatcher
.got_new
= false;
657 ASSERT_TRUE(c2s
->is_connected());
658 ASSERT_EQ(1u, static_cast<Session
*>(c2s
->get_priv().get())->get_count());
659 ASSERT_TRUE(c2s
->peer_is_osd());
// The client never sent anything, so the server-side session count stays 0.
661 ASSERT_TRUE(c2s_accepter
->is_connected());
662 ASSERT_EQ(0u, static_cast<Session
*>(c2s_accepter
->get_priv().get())->get_count());
663 ASSERT_TRUE(c2s_accepter
->peer_is_client());
665 client_msgr
->shutdown();
667 server_msgr
->shutdown();
670 delete cli_interceptor
;
671 delete srv_interceptor
;
677 * - A and B exchange messages
679 * - B goes into standby
// The client (lossless_peer policy) exchanges a ping with the server
// (stateful_server policy). A failure is then injected on the client at step
// 16 and another message is sent. The server side is paused at step 15 so that
// the per-connection step counts (13 = reconnect, 14 = reconnect_ok, per the
// comments below) can be checked on both ends before the test finishes.
682 TEST_P(MessengerTest
, ReconnectTest
) {
683 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
// The interceptors are owned by this test and deleted at the end of it.
685 TestInterceptor
*cli_interceptor
= new TestInterceptor();
686 TestInterceptor
*srv_interceptor
= new TestInterceptor();
688 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, Messenger::Policy::stateful_server(0));
689 server_msgr
->interceptor
= srv_interceptor
;
691 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, Messenger::Policy::lossless_peer(0));
692 client_msgr
->interceptor
= cli_interceptor
;
// Server listens on port 3300, client on port 3301 (both on 127.0.0.1).
694 entity_addr_t bind_addr
;
695 bind_addr
.parse("v2:127.0.0.1:3300");
696 server_msgr
->bind(bind_addr
);
697 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
698 server_msgr
->start();
700 bind_addr
.parse("v2:127.0.0.1:3301");
701 client_msgr
->bind(bind_addr
);
702 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
703 client_msgr
->start();
705 ConnectionRef c2s
= client_msgr
->connect_to(server_msgr
->get_mytype(),
706 server_msgr
->get_myaddrs());
708 MPing
*m1
= new MPing();
709 ASSERT_EQ(c2s
->send_message(m1
), 0);
// Block until the client dispatcher's got_new flag is set, then clear it.
712 std::unique_lock l
{cli_dispatcher
.lock
};
713 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
714 cli_dispatcher
.got_new
= false;
717 ASSERT_TRUE(c2s
->is_connected());
718 ASSERT_EQ(1u, static_cast<Session
*>(c2s
->get_priv().get())->get_count());
719 ASSERT_TRUE(c2s
->peer_is_osd());
// Pause the client at step 16 while it handles the next message, so that a
// failure can be injected there.
721 cli_interceptor
->breakpoint(16);
723 MPing
*m2
= new MPing();
724 ASSERT_EQ(c2s
->send_message(m2
), 0);
726 cli_interceptor
->wait(16, c2s
.get());
727 cli_interceptor
->remove_bp(16);
729 // at this point client and server are connected together
731 srv_interceptor
->breakpoint(15);
734 cli_interceptor
->proceed(16, Interceptor::ACTION::FAIL
);
736 MPing
*m3
= new MPing();
737 ASSERT_EQ(c2s
->send_message(m3
), 0);
739 Connection
*c2s_accepter
= srv_interceptor
->wait(15);
740 // the srv end of the connection is now paused at ready
741 // this means that the reconnect was successful
742 srv_interceptor
->remove_bp(15);
744 ASSERT_TRUE(c2s_accepter
->peer_is_client());
745 // c2s_accepter sent 0 reconnect messages
746 ASSERT_EQ(srv_interceptor
->count_step(c2s_accepter
, 13), 0u);
747 // c2s_accepter sent 1 reconnect_ok messages
748 ASSERT_EQ(srv_interceptor
->count_step(c2s_accepter
, 14), 1u);
749 // c2s sent 1 reconnect messages
750 ASSERT_EQ(cli_interceptor
->count_step(c2s
.get(), 13), 1u);
751 // c2s sent 0 reconnect_ok messages
752 ASSERT_EQ(cli_interceptor
->count_step(c2s
.get(), 14), 0u);
754 srv_interceptor
->proceed(15, Interceptor::ACTION::CONTINUE
);
// Block until the client dispatcher's got_new flag is set again.
757 std::unique_lock l
{cli_dispatcher
.lock
};
758 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
759 cli_dispatcher
.got_new
= false;
762 client_msgr
->shutdown();
764 server_msgr
->shutdown();
767 delete cli_interceptor
;
768 delete srv_interceptor
;
774 * - A and B exchange messages
776 * - A reconnects // B reconnects
// Both sides use the lossless_peer policy. After an initial ping exchange a
// failure is injected on the client at step 16 while both interceptors hold a
// breakpoint on step 13, so that client and server attempt to reconnect at the
// same time. The test then checks the per-connection step counts on both ends.
778 TEST_P(MessengerTest
, ReconnectRaceTest
) {
779 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
// The interceptors are owned by this test and deleted at the end of it.
781 TestInterceptor
*cli_interceptor
= new TestInterceptor();
782 TestInterceptor
*srv_interceptor
= new TestInterceptor();
784 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, Messenger::Policy::lossless_peer(0));
785 server_msgr
->interceptor
= srv_interceptor
;
787 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, Messenger::Policy::lossless_peer(0));
788 client_msgr
->interceptor
= cli_interceptor
;
// Server listens on port 3300, client on port 3301 (both on 127.0.0.1).
790 entity_addr_t bind_addr
;
791 bind_addr
.parse("v2:127.0.0.1:3300");
792 server_msgr
->bind(bind_addr
);
793 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
794 server_msgr
->start();
796 bind_addr
.parse("v2:127.0.0.1:3301");
797 client_msgr
->bind(bind_addr
);
798 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
799 client_msgr
->start();
801 ConnectionRef c2s
= client_msgr
->connect_to(server_msgr
->get_mytype(),
802 server_msgr
->get_myaddrs());
804 MPing
*m1
= new MPing();
805 ASSERT_EQ(c2s
->send_message(m1
), 0);
// Block until the client dispatcher's got_new flag is set, then clear it.
808 std::unique_lock l
{cli_dispatcher
.lock
};
809 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
810 cli_dispatcher
.got_new
= false;
813 ASSERT_TRUE(c2s
->is_connected());
814 ASSERT_EQ(1u, static_cast<Session
*>(c2s
->get_priv().get())->get_count());
815 ASSERT_TRUE(c2s
->peer_is_osd());
// Pause the client at step 16 while it handles the next message, so that a
// failure can be injected there.
817 cli_interceptor
->breakpoint(16);
819 MPing
*m2
= new MPing();
820 ASSERT_EQ(c2s
->send_message(m2
), 0);
822 cli_interceptor
->wait(16, c2s
.get());
823 cli_interceptor
->remove_bp(16);
825 // at this point client and server are connected together
827 // force both client and server to race on reconnect
828 cli_interceptor
->breakpoint(13);
829 srv_interceptor
->breakpoint(13);
832 // this will cause both client and server to reconnect at the same time
833 cli_interceptor
->proceed(16, Interceptor::ACTION::FAIL
);
835 MPing
*m3
= new MPing();
836 ASSERT_EQ(c2s
->send_message(m3
), 0);
838 cli_interceptor
->wait(13, c2s
.get());
839 srv_interceptor
->wait(13);
841 cli_interceptor
->remove_bp(13);
842 srv_interceptor
->remove_bp(13);
// Arm the step-15 breakpoint before releasing step 13 so the server cannot run
// past it unobserved.
845 srv_interceptor
->breakpoint(15);
847 cli_interceptor
->proceed(13, Interceptor::ACTION::CONTINUE
);
848 srv_interceptor
->proceed(13, Interceptor::ACTION::CONTINUE
);
850 Connection
*c2s_accepter
= srv_interceptor
->wait(15);
852 // the server has reconnected and is "ready"
853 srv_interceptor
->remove_bp(15);
855 ASSERT_TRUE(c2s_accepter
->peer_is_client());
856 ASSERT_TRUE(c2s
->peer_is_osd());
858 // the server should win the reconnect race
860 // c2s_accepter sent 1 or 2 reconnect messages
861 ASSERT_LT(srv_interceptor
->count_step(c2s_accepter
, 13), 3u);
862 ASSERT_GT(srv_interceptor
->count_step(c2s_accepter
, 13), 0u);
863 // c2s_accepter sent 0 reconnect_ok messages
864 ASSERT_EQ(srv_interceptor
->count_step(c2s_accepter
, 14), 0u);
865 // c2s sent 1 reconnect message
866 ASSERT_EQ(cli_interceptor
->count_step(c2s
.get(), 13), 1u);
867 // c2s sent 1 reconnect_ok message
868 ASSERT_EQ(cli_interceptor
->count_step(c2s
.get(), 14), 1u);
870 if (srv_interceptor
->count_step(c2s_accepter
, 13) == 2) {
871 // if the server sent the reconnect message twice then
872 // the client must have sent a session retry message to the server
873 ASSERT_EQ(cli_interceptor
->count_step(c2s
.get(), 18), 1u);
875 ASSERT_EQ(cli_interceptor
->count_step(c2s
.get(), 18), 0u);
878 srv_interceptor
->proceed(15, Interceptor::ACTION::CONTINUE
);
// Block until the client dispatcher's got_new flag is set again.
881 std::unique_lock l
{cli_dispatcher
.lock
};
882 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
883 cli_dispatcher
.got_new
= false;
886 client_msgr
->shutdown();
888 server_msgr
->shutdown();
891 delete cli_interceptor
;
892 delete srv_interceptor
;
// Basic end-to-end check over a single v2 address, in five numbered phases:
// round trip, rebinding the server to a new port, marking the connection down,
// sending to a shut-down server, and the loopback connection.
895 TEST_P(MessengerTest
, SimpleTest
) {
896 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
// No port is given in the bind address.
897 entity_addr_t bind_addr
;
898 bind_addr
.parse("v2:127.0.0.1");
899 server_msgr
->bind(bind_addr
);
900 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
901 server_msgr
->start();
903 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
904 client_msgr
->start();
906 // 1. simple round trip
907 MPing
*m
= new MPing();
908 ConnectionRef conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
909 server_msgr
->get_myaddrs());
// Send, then block until the client dispatcher's got_new flag is set.
911 ASSERT_EQ(conn
->send_message(m
), 0);
912 std::unique_lock l
{cli_dispatcher
.lock
};
913 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
914 cli_dispatcher
.got_new
= false;
916 ASSERT_TRUE(conn
->is_connected());
917 ASSERT_EQ(1u, static_cast<Session
*>(conn
->get_priv().get())->get_count());
918 ASSERT_TRUE(conn
->peer_is_osd());
920 // 2. test rebind port
// Ask rebind() to avoid the current port(s) and the nine ports after each,
// then check that the new addresses use none of them.
921 set
<int> avoid_ports
;
922 for (int i
= 0; i
< 10 ; i
++) {
923 for (auto a
: server_msgr
->get_myaddrs().v
) {
924 avoid_ports
.insert(a
.get_port() + i
);
927 server_msgr
->rebind(avoid_ports
);
928 for (auto a
: server_msgr
->get_myaddrs().v
) {
929 ASSERT_TRUE(avoid_ports
.count(a
.get_port()) == 0);
// Reconnect to the server's new address.
932 conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
933 server_msgr
->get_myaddrs());
936 ASSERT_EQ(conn
->send_message(m
), 0);
937 std::unique_lock l
{cli_dispatcher
.lock
};
938 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
939 cli_dispatcher
.got_new
= false;
941 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
943 // 3. test markdown connection
945 ASSERT_FALSE(conn
->is_connected());
947 // 4. test failed connection
948 server_msgr
->shutdown();
// The send result is deliberately not asserted here; the test only waits for
// the connection to report that it is not connected.
952 conn
->send_message(m
);
953 CHECK_AND_WAIT_TRUE(!conn
->is_connected());
954 ASSERT_FALSE(conn
->is_connected());
956 // 5. loopback connection
957 srv_dispatcher
.loopback
= true;
958 conn
= client_msgr
->get_loopback_connection();
961 ASSERT_EQ(conn
->send_message(m
), 0);
962 std::unique_lock l
{cli_dispatcher
.lock
};
963 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
964 cli_dispatcher
.got_new
= false;
966 srv_dispatcher
.loopback
= false;
967 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
968 client_msgr
->shutdown();
970 server_msgr
->shutdown();
// Same five phases as SimpleTest, but the server is bound with bindv() to an
// address vector holding both a legacy (v1) and a msgr2 (v2) address.
974 TEST_P(MessengerTest
, SimpleMsgr2Test
) {
975 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
976 entity_addr_t legacy_addr
;
977 legacy_addr
.parse("v1:127.0.0.1");
978 entity_addr_t msgr2_addr
;
979 msgr2_addr
.parse("v2:127.0.0.1");
// The legacy address is pushed first, the msgr2 address second.
980 entity_addrvec_t bind_addrs
;
981 bind_addrs
.v
.push_back(legacy_addr
);
982 bind_addrs
.v
.push_back(msgr2_addr
);
983 server_msgr
->bindv(bind_addrs
);
984 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
985 server_msgr
->start();
987 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
988 client_msgr
->start();
990 // 1. simple round trip
991 MPing
*m
= new MPing();
992 ConnectionRef conn
= client_msgr
->connect_to(
993 server_msgr
->get_mytype(),
994 server_msgr
->get_myaddrs());
// Send, then block until the client dispatcher's got_new flag is set.
996 ASSERT_EQ(conn
->send_message(m
), 0);
997 std::unique_lock l
{cli_dispatcher
.lock
};
998 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
999 cli_dispatcher
.got_new
= false;
1001 ASSERT_TRUE(conn
->is_connected());
1002 ASSERT_EQ(1u, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1003 ASSERT_TRUE(conn
->peer_is_osd());
1005 // 2. test rebind port
// Ask rebind() to avoid the current ports and the nine ports after each, then
// check that the new addresses use none of them.
1006 set
<int> avoid_ports
;
1007 for (int i
= 0; i
< 10 ; i
++) {
1008 for (auto a
: server_msgr
->get_myaddrs().v
) {
1009 avoid_ports
.insert(a
.get_port() + i
);
1012 server_msgr
->rebind(avoid_ports
);
1013 for (auto a
: server_msgr
->get_myaddrs().v
) {
1014 ASSERT_TRUE(avoid_ports
.count(a
.get_port()) == 0);
// Reconnect to the server's new addresses.
1017 conn
= client_msgr
->connect_to(
1018 server_msgr
->get_mytype(),
1019 server_msgr
->get_myaddrs());
1022 ASSERT_EQ(conn
->send_message(m
), 0);
1023 std::unique_lock l
{cli_dispatcher
.lock
};
1024 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1025 cli_dispatcher
.got_new
= false;
1027 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1029 // 3. test markdown connection
1031 ASSERT_FALSE(conn
->is_connected());
1033 // 4. test failed connection
1034 server_msgr
->shutdown();
1035 server_msgr
->wait();
// The send result is deliberately not asserted here; the test only waits for
// the connection to report that it is not connected.
1038 conn
->send_message(m
);
1039 CHECK_AND_WAIT_TRUE(!conn
->is_connected());
1040 ASSERT_FALSE(conn
->is_connected());
1042 // 5. loopback connection
1043 srv_dispatcher
.loopback
= true;
1044 conn
= client_msgr
->get_loopback_connection();
1047 ASSERT_EQ(conn
->send_message(m
), 0);
1048 std::unique_lock l
{cli_dispatcher
.lock
};
1049 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1050 cli_dispatcher
.got_new
= false;
1052 srv_dispatcher
.loopback
= false;
1053 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1054 client_msgr
->shutdown();
1055 client_msgr
->wait();
1056 server_msgr
->shutdown();
1057 server_msgr
->wait();
// Checks feature negotiation: the server requires a feature set that includes
// bit 13. A client that supports everything except bit 13 must fail to
// connect; a client whose supported set contains all required bits (plus bit
// 14) must connect and complete one round trip.
1060 TEST_P(MessengerTest
, FeatureTest
) {
1061 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
1062 entity_addr_t bind_addr
;
1063 bind_addr
.parse("v2:127.0.0.1");
// Only feature_supported is initialised here; the other two masks are assigned
// below before they are read.
1064 uint64_t all_feature_supported
, feature_required
, feature_supported
= 0;
// feature_supported      = bits 0..9 | MSG_ADDR2 | SERVER_NAUTILUS
// feature_required       = feature_supported | bit 13
// all_feature_supported  = feature_required  | bit 14
1065 for (int i
= 0; i
< 10; i
++)
1066 feature_supported
|= 1ULL << i
;
1067 feature_supported
|= CEPH_FEATUREMASK_MSG_ADDR2
;
1068 feature_supported
|= CEPH_FEATUREMASK_SERVER_NAUTILUS
;
1069 feature_required
= feature_supported
| 1ULL << 13;
1070 all_feature_supported
= feature_required
| 1ULL << 14;
// Server: require feature_required from client-type peers.
1072 Messenger::Policy p
= server_msgr
->get_policy(entity_name_t::TYPE_CLIENT
);
1073 p
.features_required
= feature_required
;
1074 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, p
);
1075 server_msgr
->bind(bind_addr
);
1076 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
1077 server_msgr
->start();
1079 // 1. client supports fewer features than the server requires
1080 p
= client_msgr
->get_policy(entity_name_t::TYPE_OSD
);
1081 p
.features_supported
= feature_supported
;
1082 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
1083 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
1084 client_msgr
->start();
1086 MPing
*m
= new MPing();
1087 ConnectionRef conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1088 server_msgr
->get_myaddrs());
1089 conn
->send_message(m
);
1090 CHECK_AND_WAIT_TRUE(!conn
->is_connected());
1091 // building the connection should fail
1092 ASSERT_FALSE(conn
->is_connected());
// Stop the client so it can be restarted with a different policy.
1094 client_msgr
->shutdown();
1095 client_msgr
->wait();
1097 // 2. supported features satisfy the required set
1098 p
= client_msgr
->get_policy(entity_name_t::TYPE_OSD
);
1099 p
.features_supported
= all_feature_supported
;
1100 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
1101 client_msgr
->start();
1103 conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1104 server_msgr
->get_myaddrs());
// Send, then block until the client dispatcher's got_new flag is set.
1107 ASSERT_EQ(conn
->send_message(m
), 0);
1108 std::unique_lock l
{cli_dispatcher
.lock
};
1109 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1110 cli_dispatcher
.got_new
= false;
1112 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1114 server_msgr
->shutdown();
1115 client_msgr
->shutdown();
1116 server_msgr
->wait();
1117 client_msgr
->wait();
1120 TEST_P(MessengerTest
, TimeoutTest
) {
1121 g_ceph_context
->_conf
.set_val("ms_connection_idle_timeout", "1");
1122 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
1123 entity_addr_t bind_addr
;
1124 bind_addr
.parse("v2:127.0.0.1");
1125 server_msgr
->bind(bind_addr
);
1126 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
1127 server_msgr
->start();
1129 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
1130 client_msgr
->start();
1132 // 1. build the connection
1133 MPing
*m
= new MPing();
1134 ConnectionRef conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1135 server_msgr
->get_myaddrs());
1137 ASSERT_EQ(conn
->send_message(m
), 0);
1138 std::unique_lock l
{cli_dispatcher
.lock
};
1139 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1140 cli_dispatcher
.got_new
= false;
1142 ASSERT_TRUE(conn
->is_connected());
1143 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1144 ASSERT_TRUE(conn
->peer_is_osd());
1148 ASSERT_FALSE(conn
->is_connected());
1150 server_msgr
->shutdown();
1151 server_msgr
->wait();
1153 client_msgr
->shutdown();
1154 client_msgr
->wait();
1155 g_ceph_context
->_conf
.set_val("ms_connection_idle_timeout", "900");
1158 TEST_P(MessengerTest
, StatefulTest
) {
1160 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
1161 entity_addr_t bind_addr
;
1162 bind_addr
.parse("v2:127.0.0.1");
1163 Messenger::Policy p
= Messenger::Policy::stateful_server(0);
1164 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, p
);
1165 p
= Messenger::Policy::lossless_client(0);
1166 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
1168 server_msgr
->bind(bind_addr
);
1169 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
1170 server_msgr
->start();
1171 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
1172 client_msgr
->start();
1174 // 1. test for server standby
1175 ConnectionRef conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1176 server_msgr
->get_myaddrs());
1179 ASSERT_EQ(conn
->send_message(m
), 0);
1180 std::unique_lock l
{cli_dispatcher
.lock
};
1181 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1182 cli_dispatcher
.got_new
= false;
1184 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1186 ASSERT_FALSE(conn
->is_connected());
1187 ConnectionRef server_conn
= server_msgr
->connect_to(
1188 client_msgr
->get_mytype(), srv_dispatcher
.last_accept
);
1190 ASSERT_EQ(1U, static_cast<Session
*>(server_conn
->get_priv().get())->get_count());
1192 srv_dispatcher
.got_new
= false;
1193 conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1194 server_msgr
->get_myaddrs());
1197 ASSERT_EQ(conn
->send_message(m
), 0);
1198 std::unique_lock l
{cli_dispatcher
.lock
};
1199 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1200 cli_dispatcher
.got_new
= false;
1202 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1203 server_conn
= server_msgr
->connect_to(client_msgr
->get_mytype(),
1204 srv_dispatcher
.last_accept
);
1206 std::unique_lock l
{srv_dispatcher
.lock
};
1207 srv_dispatcher
.cond
.wait(l
, [&] { return srv_dispatcher
.got_remote_reset
; });
1210 // 2. test for client reconnect
1211 ASSERT_FALSE(cli_dispatcher
.got_remote_reset
);
1212 cli_dispatcher
.got_connect
= false;
1213 cli_dispatcher
.got_new
= false;
1214 cli_dispatcher
.got_remote_reset
= false;
1215 server_conn
->mark_down();
1216 ASSERT_FALSE(server_conn
->is_connected());
1217 // ensure client detect server socket closed
1219 std::unique_lock l
{cli_dispatcher
.lock
};
1220 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_remote_reset
; });
1221 cli_dispatcher
.got_remote_reset
= false;
1224 std::unique_lock l
{cli_dispatcher
.lock
};
1225 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_connect
; });
1226 cli_dispatcher
.got_connect
= false;
1228 CHECK_AND_WAIT_TRUE(conn
->is_connected());
1229 ASSERT_TRUE(conn
->is_connected());
1233 ASSERT_EQ(conn
->send_message(m
), 0);
1234 ASSERT_TRUE(conn
->is_connected());
1235 std::unique_lock l
{cli_dispatcher
.lock
};
1236 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1237 cli_dispatcher
.got_new
= false;
1239 // resetcheck happen
1240 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1241 server_conn
= server_msgr
->connect_to(client_msgr
->get_mytype(),
1242 srv_dispatcher
.last_accept
);
1243 ASSERT_EQ(1U, static_cast<Session
*>(server_conn
->get_priv().get())->get_count());
1244 cli_dispatcher
.got_remote_reset
= false;
1246 server_msgr
->shutdown();
1247 client_msgr
->shutdown();
1248 server_msgr
->wait();
1249 client_msgr
->wait();
1252 TEST_P(MessengerTest
, StatelessTest
) {
1254 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
1255 entity_addr_t bind_addr
;
1256 bind_addr
.parse("v2:127.0.0.1");
1257 Messenger::Policy p
= Messenger::Policy::stateless_server(0);
1258 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, p
);
1259 p
= Messenger::Policy::lossy_client(0);
1260 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
1262 server_msgr
->bind(bind_addr
);
1263 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
1264 server_msgr
->start();
1265 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
1266 client_msgr
->start();
1268 // 1. test for server lose state
1269 ConnectionRef conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1270 server_msgr
->get_myaddrs());
1273 ASSERT_EQ(conn
->send_message(m
), 0);
1274 std::unique_lock l
{cli_dispatcher
.lock
};
1275 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1276 cli_dispatcher
.got_new
= false;
1278 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1280 ASSERT_FALSE(conn
->is_connected());
1282 srv_dispatcher
.got_new
= false;
1283 ConnectionRef server_conn
;
1284 srv_dispatcher
.last_accept_con_ptr
= &server_conn
;
1285 conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1286 server_msgr
->get_myaddrs());
1289 ASSERT_EQ(conn
->send_message(m
), 0);
1290 std::unique_lock l
{cli_dispatcher
.lock
};
1291 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1292 cli_dispatcher
.got_new
= false;
1294 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1295 ASSERT_TRUE(server_conn
);
1297 // server lose state
1299 std::unique_lock l
{srv_dispatcher
.lock
};
1300 srv_dispatcher
.cond
.wait(l
, [&] { return srv_dispatcher
.got_new
; });
1302 ASSERT_EQ(1U, static_cast<Session
*>(server_conn
->get_priv().get())->get_count());
1304 // 2. test for client lossy
1305 server_conn
->mark_down();
1306 ASSERT_FALSE(server_conn
->is_connected());
1307 conn
->send_keepalive();
1308 CHECK_AND_WAIT_TRUE(!conn
->is_connected());
1309 ASSERT_FALSE(conn
->is_connected());
1310 conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1311 server_msgr
->get_myaddrs());
1314 ASSERT_EQ(conn
->send_message(m
), 0);
1315 std::unique_lock l
{cli_dispatcher
.lock
};
1316 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1317 cli_dispatcher
.got_new
= false;
1319 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1321 server_msgr
->shutdown();
1322 client_msgr
->shutdown();
1323 server_msgr
->wait();
1324 client_msgr
->wait();
1327 TEST_P(MessengerTest
, AnonTest
) {
1329 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
1330 entity_addr_t bind_addr
;
1331 bind_addr
.parse("v2:127.0.0.1");
1332 Messenger::Policy p
= Messenger::Policy::stateless_server(0);
1333 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, p
);
1334 p
= Messenger::Policy::lossy_client(0);
1335 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
1337 server_msgr
->bind(bind_addr
);
1338 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
1339 server_msgr
->start();
1340 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
1341 client_msgr
->start();
1343 ConnectionRef server_con_a
, server_con_b
;
1346 srv_dispatcher
.last_accept_con_ptr
= &server_con_a
;
1347 ConnectionRef con_a
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1348 server_msgr
->get_myaddrs(),
1352 ASSERT_EQ(con_a
->send_message(m
), 0);
1353 std::unique_lock l
{cli_dispatcher
.lock
};
1354 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1355 cli_dispatcher
.got_new
= false;
1357 ASSERT_EQ(1U, static_cast<Session
*>(con_a
->get_priv().get())->get_count());
1360 srv_dispatcher
.last_accept_con_ptr
= &server_con_b
;
1361 ConnectionRef con_b
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1362 server_msgr
->get_myaddrs(),
1366 ASSERT_EQ(con_b
->send_message(m
), 0);
1367 std::unique_lock l
{cli_dispatcher
.lock
};
1368 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1369 cli_dispatcher
.got_new
= false;
1371 ASSERT_EQ(1U, static_cast<Session
*>(con_b
->get_priv().get())->get_count());
1373 // these should be distinct
1374 ASSERT_NE(con_a
, con_b
);
1375 ASSERT_NE(server_con_a
, server_con_b
);
1377 // and both connected
1380 ASSERT_EQ(con_a
->send_message(m
), 0);
1381 std::unique_lock l
{cli_dispatcher
.lock
};
1382 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1383 cli_dispatcher
.got_new
= false;
1387 ASSERT_EQ(con_b
->send_message(m
), 0);
1388 std::unique_lock l
{cli_dispatcher
.lock
};
1389 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1390 cli_dispatcher
.got_new
= false;
1395 ASSERT_FALSE(con_a
->is_connected());
1397 ASSERT_FALSE(con_b
->is_connected());
1399 server_msgr
->shutdown();
1400 client_msgr
->shutdown();
1401 server_msgr
->wait();
1402 client_msgr
->wait();
1405 TEST_P(MessengerTest
, ClientStandbyTest
) {
1407 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
1408 entity_addr_t bind_addr
;
1409 bind_addr
.parse("v2:127.0.0.1");
1410 Messenger::Policy p
= Messenger::Policy::stateful_server(0);
1411 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, p
);
1412 p
= Messenger::Policy::lossless_peer(0);
1413 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
1415 server_msgr
->bind(bind_addr
);
1416 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
1417 server_msgr
->start();
1418 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
1419 client_msgr
->start();
1421 // 1. test for client standby, resetcheck
1422 ConnectionRef conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1423 server_msgr
->get_myaddrs());
1426 ASSERT_EQ(conn
->send_message(m
), 0);
1427 std::unique_lock l
{cli_dispatcher
.lock
};
1428 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1429 cli_dispatcher
.got_new
= false;
1431 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1432 ConnectionRef server_conn
= server_msgr
->connect_to(
1433 client_msgr
->get_mytype(),
1434 srv_dispatcher
.last_accept
);
1435 ASSERT_FALSE(cli_dispatcher
.got_remote_reset
);
1436 cli_dispatcher
.got_connect
= false;
1437 server_conn
->mark_down();
1438 ASSERT_FALSE(server_conn
->is_connected());
1439 // client should be standby
1441 // client should be standby, so we use original connection
1443 // Try send message to verify got remote reset callback
1445 ASSERT_EQ(conn
->send_message(m
), 0);
1447 std::unique_lock l
{cli_dispatcher
.lock
};
1448 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_remote_reset
; });
1449 cli_dispatcher
.got_remote_reset
= false;
1450 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_connect
; });
1451 cli_dispatcher
.got_connect
= false;
1453 CHECK_AND_WAIT_TRUE(conn
->is_connected());
1454 ASSERT_TRUE(conn
->is_connected());
1456 ASSERT_EQ(conn
->send_message(m
), 0);
1457 std::unique_lock l
{cli_dispatcher
.lock
};
1458 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1459 cli_dispatcher
.got_new
= false;
1461 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1462 server_conn
= server_msgr
->connect_to(client_msgr
->get_mytype(),
1463 srv_dispatcher
.last_accept
);
1464 ASSERT_EQ(1U, static_cast<Session
*>(server_conn
->get_priv().get())->get_count());
1466 server_msgr
->shutdown();
1467 client_msgr
->shutdown();
1468 server_msgr
->wait();
1469 client_msgr
->wait();
1472 TEST_P(MessengerTest
, AuthTest
) {
1473 g_ceph_context
->_conf
.set_val("auth_cluster_required", "cephx");
1474 g_ceph_context
->_conf
.set_val("auth_service_required", "cephx");
1475 g_ceph_context
->_conf
.set_val("auth_client_required", "cephx");
1476 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
1477 entity_addr_t bind_addr
;
1478 bind_addr
.parse("v2:127.0.0.1");
1479 server_msgr
->bind(bind_addr
);
1480 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
1481 server_msgr
->start();
1483 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
1484 client_msgr
->start();
1486 // 1. simple auth round trip
1487 MPing
*m
= new MPing();
1488 ConnectionRef conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1489 server_msgr
->get_myaddrs());
1491 ASSERT_EQ(conn
->send_message(m
), 0);
1492 std::unique_lock l
{cli_dispatcher
.lock
};
1493 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1494 cli_dispatcher
.got_new
= false;
1496 ASSERT_TRUE(conn
->is_connected());
1497 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1500 g_ceph_context
->_conf
.set_val("auth_cluster_required", "none");
1501 g_ceph_context
->_conf
.set_val("auth_service_required", "none");
1502 g_ceph_context
->_conf
.set_val("auth_client_required", "none");
1504 ASSERT_FALSE(conn
->is_connected());
1505 conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1506 server_msgr
->get_myaddrs());
1508 MPing
*m
= new MPing();
1509 ASSERT_EQ(conn
->send_message(m
), 0);
1510 std::unique_lock l
{cli_dispatcher
.lock
};
1511 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1512 cli_dispatcher
.got_new
= false;
1514 ASSERT_TRUE(conn
->is_connected());
1515 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1516 server_msgr
->shutdown();
1517 client_msgr
->shutdown();
1518 server_msgr
->wait();
1519 client_msgr
->wait();
1522 TEST_P(MessengerTest
, MessageTest
) {
1523 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
1524 entity_addr_t bind_addr
;
1525 bind_addr
.parse("v2:127.0.0.1");
1526 Messenger::Policy p
= Messenger::Policy::stateful_server(0);
1527 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, p
);
1528 p
= Messenger::Policy::lossless_peer(0);
1529 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
1531 server_msgr
->bind(bind_addr
);
1532 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
1533 server_msgr
->start();
1534 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
1535 client_msgr
->start();
1538 // 1. A very large "front"(as well as "payload")
1539 // Because a external message need to invade Messenger::decode_message,
1540 // here we only use existing message class(MCommand)
1541 ConnectionRef conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1542 server_msgr
->get_myaddrs());
1545 uuid
.generate_random();
1546 vector
<string
> cmds
;
1547 string
s("abcdefghijklmnopqrstuvwxyz");
1548 for (int i
= 0; i
< 1024*30; i
++)
1550 MCommand
*m
= new MCommand(uuid
);
1552 conn
->send_message(m
);
1553 std::unique_lock l
{cli_dispatcher
.lock
};
1554 cli_dispatcher
.cond
.wait_for(l
, 500s
, [&] { return cli_dispatcher
.got_new
; });
1555 ASSERT_TRUE(cli_dispatcher
.got_new
);
1556 cli_dispatcher
.got_new
= false;
1559 // 2. A very large "data"
1562 string
s("abcdefghijklmnopqrstuvwxyz");
1563 for (int i
= 0; i
< 1024*30; i
++)
1565 MPing
*m
= new MPing();
1567 conn
->send_message(m
);
1570 std::unique_lock l
{cli_dispatcher
.lock
};
1571 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1572 ASSERT_TRUE(cli_dispatcher
.got_new
);
1573 cli_dispatcher
.got_new
= false;
1575 server_msgr
->shutdown();
1576 client_msgr
->shutdown();
1577 server_msgr
->wait();
1578 client_msgr
->wait();
1582 class SyntheticWorkload
;
1585 enum Who
: uint8_t {
1593 Payload(Who who
, uint64_t seq
, const bufferlist
& data
)
1594 : who(who
), seq(seq
), data(data
)
1596 Payload() = default;
1597 DENC(Payload
, v
, p
) {
1598 DENC_START(1, 1, p
);
1605 WRITE_CLASS_DENC(Payload
)
1607 ostream
& operator<<(ostream
& out
, const Payload
&pl
)
1609 return out
<< "reply=" << pl
.who
<< " i = " << pl
.seq
;
1612 class SyntheticDispatcher
: public Dispatcher
{
1614 ceph::mutex lock
= ceph::make_mutex("SyntheticDispatcher::lock");
1615 ceph::condition_variable cond
;
1618 bool got_remote_reset
;
1620 map
<ConnectionRef
, list
<uint64_t> > conn_sent
;
1621 map
<uint64_t, bufferlist
> sent
;
1622 atomic
<uint64_t> index
;
1623 SyntheticWorkload
*workload
;
1625 SyntheticDispatcher(bool s
, SyntheticWorkload
*wl
):
1626 Dispatcher(g_ceph_context
), is_server(s
), got_new(false),
1627 got_remote_reset(false), got_connect(false), index(0), workload(wl
) {
1629 bool ms_can_fast_dispatch_any() const override
{ return true; }
1630 bool ms_can_fast_dispatch(const Message
*m
) const override
{
1631 switch (m
->get_type()) {
1640 void ms_handle_fast_connect(Connection
*con
) override
{
1641 std::lock_guard l
{lock
};
1642 list
<uint64_t> c
= conn_sent
[con
];
1643 for (list
<uint64_t>::iterator it
= c
.begin();
1644 it
!= c
.end(); ++it
)
1646 conn_sent
.erase(con
);
1650 void ms_handle_fast_accept(Connection
*con
) override
{
1651 std::lock_guard l
{lock
};
1652 list
<uint64_t> c
= conn_sent
[con
];
1653 for (list
<uint64_t>::iterator it
= c
.begin();
1654 it
!= c
.end(); ++it
)
1656 conn_sent
.erase(con
);
1659 bool ms_dispatch(Message
*m
) override
{
1662 bool ms_handle_reset(Connection
*con
) override
;
1663 void ms_handle_remote_reset(Connection
*con
) override
{
1664 std::lock_guard l
{lock
};
1665 list
<uint64_t> c
= conn_sent
[con
];
1666 for (list
<uint64_t>::iterator it
= c
.begin();
1667 it
!= c
.end(); ++it
)
1669 conn_sent
.erase(con
);
1670 got_remote_reset
= true;
1672 bool ms_handle_refused(Connection
*con
) override
{
1675 void ms_fast_dispatch(Message
*m
) override
{
1676 // MSG_COMMAND is used to disorganize regular message flow
1677 if (m
->get_type() == MSG_COMMAND
) {
1683 auto p
= m
->get_data().cbegin();
1685 if (pl
.who
== Payload::PING
) {
1686 lderr(g_ceph_context
) << __func__
<< " conn=" << m
->get_connection() << pl
<< dendl
;
1687 reply_message(m
, pl
);
1689 std::lock_guard l
{lock
};
1693 std::lock_guard l
{lock
};
1694 if (sent
.count(pl
.seq
)) {
1695 lderr(g_ceph_context
) << __func__
<< " conn=" << m
->get_connection() << pl
<< dendl
;
1696 ASSERT_EQ(conn_sent
[m
->get_connection()].front(), pl
.seq
);
1697 ASSERT_TRUE(pl
.data
.contents_equal(sent
[pl
.seq
]));
1698 conn_sent
[m
->get_connection()].pop_front();
1707 int ms_handle_authentication(Connection
*con
) override
{
1711 void reply_message(const Message
*m
, Payload
& pl
) {
1712 pl
.who
= Payload::PONG
;
1715 MPing
*rm
= new MPing();
1717 m
->get_connection()->send_message(rm
);
1718 lderr(g_ceph_context
) << __func__
<< " conn=" << m
->get_connection() << " reply m=" << m
<< " i=" << pl
.seq
<< dendl
;
1721 void send_message_wrap(ConnectionRef con
, const bufferlist
& data
) {
1722 Message
*m
= new MPing();
1723 Payload pl
{Payload::PING
, index
++, data
};
1727 if (!con
->get_messenger()->get_default_policy().lossy
) {
1728 std::lock_guard l
{lock
};
1729 sent
[pl
.seq
] = pl
.data
;
1730 conn_sent
[con
].push_back(pl
.seq
);
1732 lderr(g_ceph_context
) << __func__
<< " conn=" << con
.get() << " send m=" << m
<< " i=" << pl
.seq
<< dendl
;
1733 ASSERT_EQ(0, con
->send_message(m
));
1736 uint64_t get_pending() {
1737 std::lock_guard l
{lock
};
1741 void clear_pending(ConnectionRef con
) {
1742 std::lock_guard l
{lock
};
1744 for (list
<uint64_t>::iterator it
= conn_sent
[con
].begin();
1745 it
!= conn_sent
[con
].end(); ++it
)
1747 conn_sent
.erase(con
);
1751 for (auto && p
: conn_sent
) {
1752 if (!p
.second
.empty()) {
1753 lderr(g_ceph_context
) << __func__
<< " " << p
.first
<< " wait " << p
.second
.size() << dendl
;
1760 class SyntheticWorkload
{
1761 ceph::mutex lock
= ceph::make_mutex("SyntheticWorkload::lock");
1762 ceph::condition_variable cond
;
1763 set
<Messenger
*> available_servers
;
1764 set
<Messenger
*> available_clients
;
1765 Messenger::Policy client_policy
;
1766 map
<ConnectionRef
, pair
<Messenger
*, Messenger
*> > available_connections
;
1767 SyntheticDispatcher dispatcher
;
1769 vector
<bufferlist
> rand_data
;
1770 DummyAuthClientServer dummy_auth
;
1773 static const unsigned max_in_flight
= 64;
1774 static const unsigned max_connections
= 128;
1775 static const unsigned max_message_len
= 1024 * 1024 * 4;
1777 SyntheticWorkload(int servers
, int clients
, string type
, int random_num
,
1778 Messenger::Policy srv_policy
, Messenger::Policy cli_policy
)
1779 : client_policy(cli_policy
),
1780 dispatcher(false, this),
1782 dummy_auth(g_ceph_context
) {
1783 dummy_auth
.auth_registry
.refresh_config();
1785 int base_port
= 16800;
1786 entity_addr_t bind_addr
;
1788 for (int i
= 0; i
< servers
; ++i
) {
1789 msgr
= Messenger::create(g_ceph_context
, type
, entity_name_t::OSD(0),
1790 "server", getpid()+i
, 0);
1791 snprintf(addr
, sizeof(addr
), "v2:127.0.0.1:%d",
1793 bind_addr
.parse(addr
);
1794 msgr
->bind(bind_addr
);
1795 msgr
->add_dispatcher_head(&dispatcher
);
1796 msgr
->set_auth_client(&dummy_auth
);
1797 msgr
->set_auth_server(&dummy_auth
);
1800 msgr
->set_default_policy(srv_policy
);
1801 available_servers
.insert(msgr
);
1805 for (int i
= 0; i
< clients
; ++i
) {
1806 msgr
= Messenger::create(g_ceph_context
, type
, entity_name_t::CLIENT(-1),
1807 "client", getpid()+i
+servers
, 0);
1808 if (cli_policy
.standby
) {
1809 snprintf(addr
, sizeof(addr
), "v2:127.0.0.1:%d",
1810 base_port
+i
+servers
);
1811 bind_addr
.parse(addr
);
1812 msgr
->bind(bind_addr
);
1814 msgr
->add_dispatcher_head(&dispatcher
);
1815 msgr
->set_auth_client(&dummy_auth
);
1816 msgr
->set_auth_server(&dummy_auth
);
1819 msgr
->set_default_policy(cli_policy
);
1820 available_clients
.insert(msgr
);
1824 for (int i
= 0; i
< random_num
; i
++) {
1826 boost::uniform_int
<> u(32, max_message_len
);
1827 uint64_t value_len
= u(rng
);
1828 bufferptr
bp(value_len
);
1830 for (uint64_t j
= 0; j
< value_len
-sizeof(i
); ) {
1831 memcpy(bp
.c_str()+j
, &i
, sizeof(i
));
1836 rand_data
.push_back(bl
);
1840 ConnectionRef
_get_random_connection() {
1841 while (dispatcher
.get_pending() > max_in_flight
) {
1846 ceph_assert(ceph_mutex_is_locked(lock
));
1847 boost::uniform_int
<> choose(0, available_connections
.size() - 1);
1848 int index
= choose(rng
);
1849 map
<ConnectionRef
, pair
<Messenger
*, Messenger
*> >::iterator i
= available_connections
.begin();
1850 for (; index
> 0; --index
, ++i
) ;
1854 bool can_create_connection() {
1855 return available_connections
.size() < max_connections
;
1858 void generate_connection() {
1859 std::lock_guard l
{lock
};
1860 if (!can_create_connection())
1863 Messenger
*server
, *client
;
1865 boost::uniform_int
<> choose(0, available_servers
.size() - 1);
1866 int index
= choose(rng
);
1867 set
<Messenger
*>::iterator i
= available_servers
.begin();
1868 for (; index
> 0; --index
, ++i
) ;
1872 boost::uniform_int
<> choose(0, available_clients
.size() - 1);
1873 int index
= choose(rng
);
1874 set
<Messenger
*>::iterator i
= available_clients
.begin();
1875 for (; index
> 0; --index
, ++i
) ;
1879 pair
<Messenger
*, Messenger
*> p
;
1881 boost::uniform_int
<> choose(0, available_servers
.size() - 1);
1882 if (server
->get_default_policy().server
) {
1883 p
= make_pair(client
, server
);
1884 ConnectionRef conn
= client
->connect_to(server
->get_mytype(),
1885 server
->get_myaddrs());
1886 available_connections
[conn
] = p
;
1888 ConnectionRef conn
= client
->connect_to(server
->get_mytype(),
1889 server
->get_myaddrs());
1890 p
= make_pair(client
, server
);
1891 available_connections
[conn
] = p
;
1896 void send_message() {
1897 std::lock_guard l
{lock
};
1898 ConnectionRef conn
= _get_random_connection();
1899 boost::uniform_int
<> true_false(0, 99);
1900 int val
= true_false(rng
);
1903 uuid
.generate_random();
1904 MCommand
*m
= new MCommand(uuid
);
1905 vector
<string
> cmds
;
1906 cmds
.push_back("command");
1908 m
->set_priority(200);
1909 conn
->send_message(m
);
1911 boost::uniform_int
<> u(0, rand_data
.size()-1);
1912 dispatcher
.send_message_wrap(conn
, rand_data
[u(rng
)]);
1916 void drop_connection() {
1917 std::lock_guard l
{lock
};
1918 if (available_connections
.size() < 10)
1920 ConnectionRef conn
= _get_random_connection();
1921 dispatcher
.clear_pending(conn
);
1923 if (!client_policy
.server
&&
1924 !client_policy
.lossy
&&
1925 client_policy
.standby
) {
1926 // it's a lossless policy, so we need to mark down each side
1927 pair
<Messenger
*, Messenger
*> &p
= available_connections
[conn
];
1928 if (!p
.first
->get_default_policy().server
&& !p
.second
->get_default_policy().server
) {
1929 ASSERT_EQ(conn
->get_messenger(), p
.first
);
1930 ConnectionRef peer
= p
.second
->connect_to(p
.first
->get_mytype(),
1931 p
.first
->get_myaddrs());
1933 dispatcher
.clear_pending(peer
);
1934 available_connections
.erase(peer
);
1937 ASSERT_EQ(available_connections
.erase(conn
), 1U);
1940 void print_internal_state(bool detail
=false) {
1941 std::lock_guard l
{lock
};
1942 lderr(g_ceph_context
) << "available_connections: " << available_connections
.size()
1943 << " inflight messages: " << dispatcher
.get_pending() << dendl
;
1944 if (detail
&& !available_connections
.empty()) {
1949 void wait_for_done() {
1950 int64_t tick_us
= 1000 * 100; // 100ms
1951 int64_t timeout_us
= 5 * 60 * 1000 * 1000; // 5 mins
1953 while (dispatcher
.get_pending()) {
1955 timeout_us
-= tick_us
;
1957 print_internal_state(true);
1959 ceph_abort_msg(" loop time exceed 5 mins, it looks we stuck into some problems!");
1961 for (set
<Messenger
*>::iterator it
= available_servers
.begin();
1962 it
!= available_servers
.end(); ++it
) {
1965 ASSERT_EQ((*it
)->get_dispatch_queue_len(), 0);
1968 available_servers
.clear();
1970 for (set
<Messenger
*>::iterator it
= available_clients
.begin();
1971 it
!= available_clients
.end(); ++it
) {
1974 ASSERT_EQ((*it
)->get_dispatch_queue_len(), 0);
1977 available_clients
.clear();
1980 void handle_reset(Connection
*con
) {
1981 std::lock_guard l
{lock
};
1982 available_connections
.erase(con
);
1983 dispatcher
.clear_pending(con
);
1987 bool SyntheticDispatcher::ms_handle_reset(Connection
*con
) {
1988 workload
->handle_reset(con
);
1992 TEST_P(MessengerTest
, SyntheticStressTest
) {
1993 SyntheticWorkload
test_msg(8, 32, GetParam(), 100,
1994 Messenger::Policy::stateful_server(0),
1995 Messenger::Policy::lossless_client(0));
1996 for (int i
= 0; i
< 100; ++i
) {
1997 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
1998 test_msg
.generate_connection();
2000 gen_type
rng(time(NULL
));
2001 for (int i
= 0; i
< 5000; ++i
) {
2003 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
2004 test_msg
.print_internal_state();
2006 boost::uniform_int
<> true_false(0, 99);
2007 int val
= true_false(rng
);
2009 test_msg
.generate_connection();
2010 } else if (val
> 80) {
2011 test_msg
.drop_connection();
2012 } else if (val
> 10) {
2013 test_msg
.send_message();
2015 usleep(rand() % 1000 + 500);
2018 test_msg
.wait_for_done();
2021 TEST_P(MessengerTest
, SyntheticStressTest1
) {
2022 SyntheticWorkload
test_msg(16, 32, GetParam(), 100,
2023 Messenger::Policy::lossless_peer_reuse(0),
2024 Messenger::Policy::lossless_peer_reuse(0));
2025 for (int i
= 0; i
< 10; ++i
) {
2026 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
2027 test_msg
.generate_connection();
2029 gen_type
rng(time(NULL
));
2030 for (int i
= 0; i
< 10000; ++i
) {
2032 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
2033 test_msg
.print_internal_state();
2035 boost::uniform_int
<> true_false(0, 99);
2036 int val
= true_false(rng
);
2038 test_msg
.generate_connection();
2039 } else if (val
> 60) {
2040 test_msg
.drop_connection();
2041 } else if (val
> 10) {
2042 test_msg
.send_message();
2044 usleep(rand() % 1000 + 500);
2047 test_msg
.wait_for_done();
2051 TEST_P(MessengerTest
, SyntheticInjectTest
) {
2052 uint64_t dispatch_throttle_bytes
= g_ceph_context
->_conf
->ms_dispatch_throttle_bytes
;
2053 g_ceph_context
->_conf
.set_val("ms_inject_socket_failures", "30");
2054 g_ceph_context
->_conf
.set_val("ms_inject_internal_delays", "0.1");
2055 g_ceph_context
->_conf
.set_val("ms_dispatch_throttle_bytes", "16777216");
2056 SyntheticWorkload
test_msg(8, 32, GetParam(), 100,
2057 Messenger::Policy::stateful_server(0),
2058 Messenger::Policy::lossless_client(0));
2059 for (int i
= 0; i
< 100; ++i
) {
2060 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
2061 test_msg
.generate_connection();
2063 gen_type
rng(time(NULL
));
2064 for (int i
= 0; i
< 1000; ++i
) {
2066 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
2067 test_msg
.print_internal_state();
2069 boost::uniform_int
<> true_false(0, 99);
2070 int val
= true_false(rng
);
2072 test_msg
.generate_connection();
2073 } else if (val
> 80) {
2074 test_msg
.drop_connection();
2075 } else if (val
> 10) {
2076 test_msg
.send_message();
2078 usleep(rand() % 500 + 100);
2081 test_msg
.wait_for_done();
2082 g_ceph_context
->_conf
.set_val("ms_inject_socket_failures", "0");
2083 g_ceph_context
->_conf
.set_val("ms_inject_internal_delays", "0");
2084 g_ceph_context
->_conf
.set_val(
2085 "ms_dispatch_throttle_bytes", std::to_string(dispatch_throttle_bytes
));
// Randomized workload test run with fault injection enabled.
// Uses lossless_peer_reuse policies for both SyntheticWorkload policy
// arguments, calls generate_connection() in a 100-iteration seeding loop,
// then runs 1000 randomly chosen operations and waits for the workload to
// finish before switching the injection options back to "0".
2088 TEST_P(MessengerTest
, SyntheticInjectTest2
) {
// Enable socket-failure and internal-delay injection on the global config.
2089 g_ceph_context
->_conf
.set_val("ms_inject_socket_failures", "30");
2090 g_ceph_context
->_conf
.set_val("ms_inject_internal_delays", "0.1");
2091 SyntheticWorkload
test_msg(8, 16, GetParam(), 100,
2092 Messenger::Policy::lossless_peer_reuse(0),
2093 Messenger::Policy::lossless_peer_reuse(0));
// Seeding loop: one generate_connection() per iteration, progress is
// logged every 10th iteration.
2094 for (int i
= 0; i
< 100; ++i
) {
2095 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
2096 test_msg
.generate_connection();
// The generator is seeded from the current time, so the sequence of
// operations differs from run to run.
2098 gen_type
rng(time(NULL
));
2099 for (int i
= 0; i
< 1000; ++i
) {
2101 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
2102 test_msg
.print_internal_state();
// Draw val uniformly from [0, 99] and use it to pick the operation:
// generate a connection, drop one (val > 80), send a message (val > 10),
// or sleep for 100-599 microseconds.
2104 boost::uniform_int
<> true_false(0, 99);
2105 int val
= true_false(rng
);
2107 test_msg
.generate_connection();
2108 } else if (val
> 80) {
2109 test_msg
.drop_connection();
2110 } else if (val
> 10) {
2111 test_msg
.send_message();
2113 usleep(rand() % 500 + 100);
// Wait for the workload to finish, then turn both injection options off.
2116 test_msg
.wait_for_done();
2117 g_ceph_context
->_conf
.set_val("ms_inject_socket_failures", "0");
2118 g_ceph_context
->_conf
.set_val("ms_inject_internal_delays", "0");
// Randomized workload test run with fault injection enabled, using the
// stateless_server / lossy_client policy pair.
// ms_inject_socket_failures is set to "600" here (the neighbouring inject
// tests use "30"). The structure is otherwise the same: 100 seeding
// iterations, 1000 random operations, wait, then reset the options to "0".
2121 TEST_P(MessengerTest
, SyntheticInjectTest3
) {
// Enable socket-failure and internal-delay injection on the global config.
2122 g_ceph_context
->_conf
.set_val("ms_inject_socket_failures", "600");
2123 g_ceph_context
->_conf
.set_val("ms_inject_internal_delays", "0.1");
2124 SyntheticWorkload
test_msg(8, 16, GetParam(), 100,
2125 Messenger::Policy::stateless_server(0),
2126 Messenger::Policy::lossy_client(0));
// Seeding loop: one generate_connection() per iteration, progress is
// logged every 10th iteration.
2127 for (int i
= 0; i
< 100; ++i
) {
2128 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
2129 test_msg
.generate_connection();
// The generator is seeded from the current time, so the sequence of
// operations differs from run to run.
2131 gen_type
rng(time(NULL
));
2132 for (int i
= 0; i
< 1000; ++i
) {
2134 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
2135 test_msg
.print_internal_state();
// Draw val uniformly from [0, 99] and use it to pick the operation:
// generate a connection, drop one (val > 80), send a message (val > 10),
// or sleep for 100-599 microseconds.
2137 boost::uniform_int
<> true_false(0, 99);
2138 int val
= true_false(rng
);
2140 test_msg
.generate_connection();
2141 } else if (val
> 80) {
2142 test_msg
.drop_connection();
2143 } else if (val
> 10) {
2144 test_msg
.send_message();
2146 usleep(rand() % 500 + 100);
// Wait for the workload to finish, then turn both injection options off.
2149 test_msg
.wait_for_done();
2150 g_ceph_context
->_conf
.set_val("ms_inject_socket_failures", "0");
2151 g_ceph_context
->_conf
.set_val("ms_inject_internal_delays", "0");
// Randomized workload test that, on top of socket-failure and
// internal-delay injection, also sets the ms_inject_delay_* options
// (probability "1", type "client osd", max "5").
// Uses lossless_peer policies for both SyntheticWorkload policy arguments.
// In this test the drop_connection() call is commented out, so the
// "val > 80" branch performs no operation.
2155 TEST_P(MessengerTest
, SyntheticInjectTest4
) {
// Injection settings applied to the global config for this test.
2156 g_ceph_context
->_conf
.set_val("ms_inject_socket_failures", "30");
2157 g_ceph_context
->_conf
.set_val("ms_inject_internal_delays", "0.1");
2158 g_ceph_context
->_conf
.set_val("ms_inject_delay_probability", "1");
2159 g_ceph_context
->_conf
.set_val("ms_inject_delay_type", "client osd");
2160 g_ceph_context
->_conf
.set_val("ms_inject_delay_max", "5");
2161 SyntheticWorkload
test_msg(16, 32, GetParam(), 100,
2162 Messenger::Policy::lossless_peer(0),
2163 Messenger::Policy::lossless_peer(0));
// Seeding loop: one generate_connection() per iteration, progress is
// logged every 10th iteration.
2164 for (int i
= 0; i
< 100; ++i
) {
2165 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
2166 test_msg
.generate_connection();
// The generator is seeded from the current time, so the sequence of
// operations differs from run to run.
2168 gen_type
rng(time(NULL
));
2169 for (int i
= 0; i
< 1000; ++i
) {
2171 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
2172 test_msg
.print_internal_state();
// Draw val uniformly from [0, 99] and use it to pick the operation.
2174 boost::uniform_int
<> true_false(0, 99);
2175 int val
= true_false(rng
);
2177 test_msg
.generate_connection();
2178 } else if (val
> 80) {
// Intentionally empty: dropping connections is disabled in this test.
2179 // test_msg.drop_connection();
2180 } else if (val
> 10) {
2181 test_msg
.send_message();
// Sleep for 100-599 microseconds.
2183 usleep(rand() % 500 + 100);
// Wait for the workload to finish, then reset every option changed above
// ("0" for the numeric ones, empty string for ms_inject_delay_type).
2186 test_msg
.wait_for_done();
2187 g_ceph_context
->_conf
.set_val("ms_inject_socket_failures", "0");
2188 g_ceph_context
->_conf
.set_val("ms_inject_internal_delays", "0");
2189 g_ceph_context
->_conf
.set_val("ms_inject_delay_probability", "0");
2190 g_ceph_context
->_conf
.set_val("ms_inject_delay_type", "");
2191 g_ceph_context
->_conf
.set_val("ms_inject_delay_max", "0");
// Dispatcher used by MarkdownTest for both its client and server side.
// It records every connection a message was dispatched on in `conns`
// and exposes the atomic counter `count`, which MarkdownTest reads to
// detect progress.
2195 class MarkdownDispatcher
: public Dispatcher
{
// ms_dispatch() takes `lock` before touching `conns`; the other handlers
// below take it as well.
2196 ceph::mutex lock
= ceph::make_mutex("MarkdownDispatcher::lock");
// Connections seen by ms_dispatch().
2197 set
<ConnectionRef
> conns
;
// Read from the test thread, hence atomic.
2200 std::atomic
<uint64_t> count
= { 0 };
2201 explicit MarkdownDispatcher(bool s
): Dispatcher(g_ceph_context
),
// Never claims blanket fast dispatch.
2204 bool ms_can_fast_dispatch_any() const override
{ return false; }
// Decides per message, based on the message type.
2205 bool ms_can_fast_dispatch(const Message
*m
) const override
{
2206 switch (m
->get_type()) {
// Logs the connection, then takes the dispatcher lock.
2214 void ms_handle_fast_connect(Connection
*con
) override
{
2215 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
2216 std::lock_guard l
{lock
};
2219 void ms_handle_fast_accept(Connection
*con
) override
{
2220 std::lock_guard l
{lock
};
// Regular dispatch path: logs the message's connection, takes the lock
// and records the connection in `conns`.
2223 bool ms_dispatch(Message
*m
) override
{
2224 lderr(g_ceph_context
) << __func__
<< " conn: " << m
->get_connection() << dendl
;
2225 std::lock_guard l
{lock
};
2227 conns
.insert(m
->get_connection());
// Special case while fewer than two connections are known and last_mark
// is not set.
2228 if (conns
.size() < 2 && !last_mark
) {
// Random pause of 0-499 microseconds.
2234 usleep(rand() % 500);
// Visit every tracked connection other than the one this message
// arrived on.
// NOTE(review): presumably those connections are marked down here, going
// by the class name - confirm against the loop body.
2235 for (set
<ConnectionRef
>::iterator it
= conns
.begin(); it
!= conns
.end(); ++it
) {
2236 if ((*it
) != m
->get_connection().get()) {
// Logs the reset connection, takes the lock and pauses for a random
// 0-499 microseconds.
2247 bool ms_handle_reset(Connection
*con
) override
{
2248 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
2249 std::lock_guard l
{lock
};
2251 usleep(rand() % 500);
// Takes the lock and logs the remotely reset connection.
2254 void ms_handle_remote_reset(Connection
*con
) override
{
2255 std::lock_guard l
{lock
};
2257 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
2259 bool ms_handle_refused(Connection
*con
) override
{
2262 void ms_fast_dispatch(Message
*m
) override
{
2265 int ms_handle_authentication(Connection
*con
) override
{
2271 // Markdown with external lock
// One client messenger talks to two server messengers: the fixture's
// server_msgr and a second one, server_msgr2, created here. Both servers
// share srv_dispatcher; the client uses cli_dispatcher. The test sends
// pings over both connections and watches srv_dispatcher.count to check
// that dispatching keeps making progress.
// NOTE(review): the ASSERT_* macros are fatal and return from the test
// body, so a failure skips the shutdown()/wait()/delete sequence at the
// end.
2272 TEST_P(MessengerTest
, MarkdownTest
) {
// Second server messenger; it is created with the same entity name
// (OSD(0)) and label ("server") that the fixture uses for server_msgr.
2273 Messenger
*server_msgr2
= Messenger::create(g_ceph_context
, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
2274 MarkdownDispatcher
cli_dispatcher(false), srv_dispatcher(true);
// This local object shadows the fixture member of the same name; all
// set_auth_* calls below use the local one.
2275 DummyAuthClientServer
dummy_auth(g_ceph_context
);
2276 dummy_auth
.auth_registry
.refresh_config();
// First server: fixed loopback address, port 16800.
// NOTE(review): any result of bind() is not checked here or below.
2277 entity_addr_t bind_addr
;
2278 bind_addr
.parse("v2:127.0.0.1:16800");
2279 server_msgr
->bind(bind_addr
);
2280 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
2281 server_msgr
->set_auth_client(&dummy_auth
);
2282 server_msgr
->set_auth_server(&dummy_auth
);
2283 server_msgr
->start();
// Second server: same setup on port 16801, sharing srv_dispatcher.
2284 bind_addr
.parse("v2:127.0.0.1:16801");
2285 server_msgr2
->bind(bind_addr
);
2286 server_msgr2
->add_dispatcher_head(&srv_dispatcher
);
2287 server_msgr2
->set_auth_client(&dummy_auth
);
2288 server_msgr2
->set_auth_server(&dummy_auth
);
2289 server_msgr2
->start();
// Client messenger is not bound to an address; it only connects out.
2291 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
2292 client_msgr
->set_auth_client(&dummy_auth
);
2293 client_msgr
->set_auth_server(&dummy_auth
);
2294 client_msgr
->start();
2299 uint64_t equal_count
= 0;
// Obtain one connection to each server and send a ping over each.
2301 ConnectionRef conn1
= client_msgr
->connect_to(server_msgr
->get_mytype(),
2302 server_msgr
->get_myaddrs());
2303 ConnectionRef conn2
= client_msgr
->connect_to(server_msgr2
->get_mytype(),
2304 server_msgr2
->get_myaddrs());
2305 MPing
*m
= new MPing();
2306 ASSERT_EQ(conn1
->send_message(m
), 0);
2308 ASSERT_EQ(conn2
->send_message(m
), 0);
// Wait for the server-side counter to move past last + 1.
2309 CHECK_AND_WAIT_TRUE(srv_dispatcher
.count
> last
+ 1);
// If the counter did not move at all, log the stalled value.
2310 if (srv_dispatcher
.count
== last
) {
2311 lderr(g_ceph_context
) << __func__
<< " last is " << last
<< dendl
;
// Remember the counter value for the next comparison.
2318 last
= srv_dispatcher
.count
;
// Fail if `equal` is set and equal_count has exceeded 3.
2321 ASSERT_FALSE(equal
&& equal_count
> 3);
// Teardown: shut down all three messengers, wait for each of them, then
// free the messenger that was created in this test.
2323 server_msgr
->shutdown();
2324 client_msgr
->shutdown();
2325 server_msgr2
->shutdown();
2326 server_msgr
->wait();
2327 client_msgr
->wait();
2328 server_msgr2
->wait();
2329 delete server_msgr2
;
// GoogleTest macro that instantiates a value-parameterized test suite.
2332 INSTANTIATE_TEST_SUITE_P(
// Test binary entry point: initializes the global Ceph context as a
// client-type utility with no default config file, applies the config
// overrides the tests rely on, then hands control to GoogleTest.
2340 int main(int argc
, char **argv
) {
// Copy the command line into the vector form that global_init() takes.
2341 vector
<const char*> args
;
2342 argv_to_vec(argc
, (const char **)argv
, args
);
2344 auto cct
= global_init(NULL
, args
, CEPH_ENTITY_TYPE_CLIENT
,
2345 CODE_ENVIRONMENT_UTILITY
,
2346 CINIT_FLAG_NO_DEFAULT_CONFIG_FILE
);
// Set all three auth_*_required options to "none" and point the keyring
// at /dev/null.
2347 g_ceph_context
->_conf
.set_val("auth_cluster_required", "none");
2348 g_ceph_context
->_conf
.set_val("auth_service_required", "none");
2349 g_ceph_context
->_conf
.set_val("auth_client_required", "none");
2350 g_ceph_context
->_conf
.set_val("keyring", "/dev/null");
2351 g_ceph_context
->_conf
.set_val("enable_experimental_unrecoverable_data_corrupting_features", "ms-type-async");
// Messenger options: both ms_die_on_* flags are enabled and
// ms_max_backoff is set to "1".
2352 g_ceph_context
->_conf
.set_val("ms_die_on_bad_msg", "true");
2353 g_ceph_context
->_conf
.set_val("ms_die_on_old_message", "true");
2354 g_ceph_context
->_conf
.set_val("ms_max_backoff", "1");
// Finish common initialization after the overrides above have been set.
2355 common_init_finish(g_ceph_context
);
// The process exit status is the value returned by RUN_ALL_TESTS().
2357 ::testing::InitGoogleTest(&argc
, argv
);
2358 return RUN_ALL_TESTS();
2363 * compile-command: "cd ../.. ; make -j4 ceph_test_msgr && valgrind --tool=memcheck ./ceph_test_msgr"