1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
24 #include "common/Mutex.h"
25 #include "common/Cond.h"
26 #include "common/ceph_argparse.h"
27 #include "global/global_init.h"
28 #include "msg/Dispatcher.h"
29 #include "msg/msg_types.h"
30 #include "msg/Message.h"
31 #include "msg/Messenger.h"
32 #include "msg/Connection.h"
33 #include "messages/MPing.h"
34 #include "messages/MCommand.h"
36 #include <boost/random/mersenne_twister.hpp>
37 #include <boost/random/uniform_int.hpp>
38 #include <boost/random/binomial_distribution.hpp>
39 #include <gtest/gtest.h>
41 typedef boost::mt11213b gen_type
;
43 #include "common/dout.h"
44 #include "include/ceph_assert.h"
46 #include "auth/DummyAuth.h"
48 #define dout_subsys ceph_subsys_ms
50 #define dout_prefix *_dout << " ceph_test_msgr "
53 #if GTEST_HAS_PARAM_TEST
55 #define CHECK_AND_WAIT_TRUE(expr) do { \
64 class MessengerTest
: public ::testing::TestWithParam
<const char*> {
66 DummyAuthClientServer dummy_auth
;
67 Messenger
*server_msgr
;
68 Messenger
*client_msgr
;
70 MessengerTest() : dummy_auth(g_ceph_context
),
71 server_msgr(NULL
), client_msgr(NULL
) {
72 dummy_auth
.auth_registry
.refresh_config();
74 void SetUp() override
{
75 lderr(g_ceph_context
) << __func__
<< " start set up " << GetParam() << dendl
;
76 server_msgr
= Messenger::create(g_ceph_context
, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
77 client_msgr
= Messenger::create(g_ceph_context
, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid(), 0);
78 server_msgr
->set_default_policy(Messenger::Policy::stateless_server(0));
79 client_msgr
->set_default_policy(Messenger::Policy::lossy_client(0));
80 server_msgr
->set_auth_client(&dummy_auth
);
81 server_msgr
->set_auth_server(&dummy_auth
);
82 client_msgr
->set_auth_client(&dummy_auth
);
83 client_msgr
->set_auth_server(&dummy_auth
);
85 void TearDown() override
{
86 ASSERT_EQ(server_msgr
->get_dispatch_queue_len(), 0);
87 ASSERT_EQ(client_msgr
->get_dispatch_queue_len(), 0);
95 class FakeDispatcher
: public Dispatcher
{
97 struct Session
: public RefCountedObject
{
98 atomic
<uint64_t> count
;
101 explicit Session(ConnectionRef c
): RefCountedObject(g_ceph_context
), count(0), con(c
) {
103 uint64_t get_count() { return count
; }
110 bool got_remote_reset
;
113 entity_addrvec_t last_accept
;
115 explicit FakeDispatcher(bool s
): Dispatcher(g_ceph_context
), lock("FakeDispatcher::lock"),
116 is_server(s
), got_new(false), got_remote_reset(false),
117 got_connect(false), loopback(false) {
118 // don't need authorizers
119 ms_set_require_authorizer(false);
121 bool ms_can_fast_dispatch_any() const override
{ return true; }
122 bool ms_can_fast_dispatch(const Message
*m
) const override
{
123 switch (m
->get_type()) {
131 void ms_handle_fast_connect(Connection
*con
) override
{
133 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
134 auto s
= con
->get_priv();
136 auto session
= new Session(con
);
137 con
->set_priv(RefCountedPtr
{session
, false});
138 lderr(g_ceph_context
) << __func__
<< " con: " << con
139 << " count: " << session
->count
<< dendl
;
145 void ms_handle_fast_accept(Connection
*con
) override
{
146 last_accept
= con
->get_peer_addrs();
147 if (!con
->get_priv()) {
148 con
->set_priv(RefCountedPtr
{new Session(con
), false});
151 bool ms_dispatch(Message
*m
) override
{
152 auto priv
= m
->get_connection()->get_priv();
153 auto s
= static_cast<Session
*>(priv
.get());
155 s
= new Session(m
->get_connection());
156 priv
.reset(s
, false);
157 m
->get_connection()->set_priv(priv
);
160 lderr(g_ceph_context
) << __func__
<< " conn: " << m
->get_connection() << " session " << s
<< " count: " << s
->count
<< dendl
;
164 Mutex::Locker
l(lock
);
170 bool ms_handle_reset(Connection
*con
) override
{
171 Mutex::Locker
l(lock
);
172 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
173 auto priv
= con
->get_priv();
174 if (auto s
= static_cast<Session
*>(priv
.get()); s
) {
175 s
->con
.reset(); // break con <-> session ref cycle
176 con
->set_priv(nullptr); // break ref <-> session cycle, if any
180 void ms_handle_remote_reset(Connection
*con
) override
{
181 Mutex::Locker
l(lock
);
182 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
183 auto priv
= con
->get_priv();
184 if (auto s
= static_cast<Session
*>(priv
.get()); s
) {
185 s
->con
.reset(); // break con <-> session ref cycle
186 con
->set_priv(nullptr); // break ref <-> session cycle, if any
188 got_remote_reset
= true;
191 bool ms_handle_refused(Connection
*con
) override
{
194 void ms_fast_dispatch(Message
*m
) override
{
195 auto priv
= m
->get_connection()->get_priv();
196 auto s
= static_cast<Session
*>(priv
.get());
198 s
= new Session(m
->get_connection());
199 priv
.reset(s
, false);
200 m
->get_connection()->set_priv(priv
);
203 lderr(g_ceph_context
) << __func__
<< " conn: " << m
->get_connection() << " session " << s
<< " count: " << s
->count
<< dendl
;
206 ceph_assert(m
->get_source().is_osd());
209 } else if (loopback
) {
210 ceph_assert(m
->get_source().is_client());
213 Mutex::Locker
l(lock
);
218 int ms_handle_authentication(Connection
*con
) override
{
222 void reply_message(Message
*m
) {
223 MPing
*rm
= new MPing();
224 m
->get_connection()->send_message(rm
);
228 typedef FakeDispatcher::Session Session
;
230 struct TestInterceptor
: public Interceptor
{
232 bool step_waiting
= false;
234 std::map
<Connection
*, uint32_t> current_step
;
235 std::map
<Connection
*, std::list
<uint32_t>> step_history
;
236 std::map
<uint32_t, std::optional
<ACTION
>> decisions
;
237 std::set
<uint32_t> breakpoints
;
239 uint32_t count_step(Connection
*conn
, uint32_t step
) {
241 for (auto s
: step_history
[conn
]) {
249 void breakpoint(uint32_t step
) {
250 breakpoints
.insert(step
);
253 void remove_bp(uint32_t step
) {
254 breakpoints
.erase(step
);
257 Connection
*wait(uint32_t step
, Connection
*conn
=nullptr) {
258 std::unique_lock
<std::mutex
> l(lock
);
261 auto it
= current_step
.find(conn
);
262 if (it
!= current_step
.end()) {
263 if (it
->second
== step
) {
268 for (auto it
: current_step
) {
269 if (it
.second
== step
) {
281 step_waiting
= false;
285 ACTION
wait_for_decision(uint32_t step
, std::unique_lock
<std::mutex
> &l
) {
286 if (decisions
[step
]) {
287 return *(decisions
[step
]);
293 return *(decisions
[step
]);
296 void proceed(uint32_t step
, ACTION decision
) {
297 std::unique_lock
<std::mutex
> l(lock
);
298 decisions
[step
] = decision
;
301 cond_var
.notify_one();
305 ACTION
intercept(Connection
*conn
, uint32_t step
) override
{
306 lderr(g_ceph_context
) << __func__
<< " conn(" << conn
307 << ") intercept called on step=" << step
<< dendl
;
310 std::unique_lock
<std::mutex
> l(lock
);
311 step_history
[conn
].push_back(step
);
312 current_step
[conn
] = step
;
314 cond_var
.notify_one();
318 std::unique_lock
<std::mutex
> l(lock
);
319 ACTION decision
= ACTION::CONTINUE
;
320 if (breakpoints
.find(step
) != breakpoints
.end()) {
321 lderr(g_ceph_context
) << __func__
<< " conn(" << conn
322 << ") pausing on step=" << step
<< dendl
;
323 decision
= wait_for_decision(step
, l
);
325 if (decisions
[step
]) {
326 decision
= *(decisions
[step
]);
329 lderr(g_ceph_context
) << __func__
<< " conn(" << conn
330 << ") resuming step=" << step
<< " with decision="
331 << decision
<< dendl
;
332 decisions
[step
].reset();
339 * Scenario: A connects to B, and B connects to A at the same time.
341 TEST_P(MessengerTest
, ConnectionRaceTest
) {
342 if (string(GetParam()) == "simple") {
346 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(false);
348 TestInterceptor
*cli_interceptor
= new TestInterceptor();
349 TestInterceptor
*srv_interceptor
= new TestInterceptor();
351 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, Messenger::Policy::lossless_peer_reuse(0));
352 server_msgr
->interceptor
= srv_interceptor
;
354 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, Messenger::Policy::lossless_peer_reuse(0));
355 client_msgr
->interceptor
= cli_interceptor
;
357 entity_addr_t bind_addr
;
358 bind_addr
.parse("v2:127.0.0.1:3300");
359 server_msgr
->bind(bind_addr
);
360 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
361 server_msgr
->start();
363 bind_addr
.parse("v2:127.0.0.1:3301");
364 client_msgr
->bind(bind_addr
);
365 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
366 client_msgr
->start();
368 // pause before sending client_ident message
369 cli_interceptor
->breakpoint(11);
370 // pause before sending client_ident message
371 srv_interceptor
->breakpoint(11);
373 ConnectionRef c2s
= client_msgr
->connect_to(server_msgr
->get_mytype(),
374 server_msgr
->get_myaddrs());
375 MPing
*m1
= new MPing();
376 ASSERT_EQ(c2s
->send_message(m1
), 0);
378 ConnectionRef s2c
= server_msgr
->connect_to(client_msgr
->get_mytype(),
379 client_msgr
->get_myaddrs());
380 MPing
*m2
= new MPing();
381 ASSERT_EQ(s2c
->send_message(m2
), 0);
383 cli_interceptor
->wait(11, c2s
.get());
384 srv_interceptor
->wait(11, s2c
.get());
386 // at this point both connections (A->B, B->A) are paused just before sending
387 // the client_ident message.
389 cli_interceptor
->remove_bp(11);
390 srv_interceptor
->remove_bp(11);
392 cli_interceptor
->proceed(11, Interceptor::ACTION::CONTINUE
);
393 srv_interceptor
->proceed(11, Interceptor::ACTION::CONTINUE
);
396 Mutex::Locker
l(cli_dispatcher
.lock
);
397 while (!cli_dispatcher
.got_new
)
398 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
399 cli_dispatcher
.got_new
= false;
403 Mutex::Locker
l(srv_dispatcher
.lock
);
404 while (!srv_dispatcher
.got_new
)
405 srv_dispatcher
.cond
.Wait(srv_dispatcher
.lock
);
406 srv_dispatcher
.got_new
= false;
409 ASSERT_TRUE(s2c
->is_connected());
410 ASSERT_EQ(1u, static_cast<Session
*>(s2c
->get_priv().get())->get_count());
411 ASSERT_TRUE(s2c
->peer_is_client());
413 ASSERT_TRUE(c2s
->is_connected());
414 ASSERT_EQ(1u, static_cast<Session
*>(c2s
->get_priv().get())->get_count());
415 ASSERT_TRUE(c2s
->peer_is_osd());
417 client_msgr
->shutdown();
419 server_msgr
->shutdown();
422 delete cli_interceptor
;
423 delete srv_interceptor
;
429 * - A sends client_ident to B
430 * - B fails before sending server_ident to A
433 TEST_P(MessengerTest
, MissingServerIdenTest
) {
434 if (string(GetParam()) == "simple") {
438 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(false);
440 TestInterceptor
*cli_interceptor
= new TestInterceptor();
441 TestInterceptor
*srv_interceptor
= new TestInterceptor();
443 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, Messenger::Policy::stateful_server(0));
444 server_msgr
->interceptor
= srv_interceptor
;
446 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, Messenger::Policy::lossy_client(0));
447 client_msgr
->interceptor
= cli_interceptor
;
449 entity_addr_t bind_addr
;
450 bind_addr
.parse("v2:127.0.0.1:3300");
451 server_msgr
->bind(bind_addr
);
452 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
453 server_msgr
->start();
455 bind_addr
.parse("v2:127.0.0.1:3301");
456 client_msgr
->bind(bind_addr
);
457 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
458 client_msgr
->start();
460 // pause before sending client_ident message
461 srv_interceptor
->breakpoint(12);
463 ConnectionRef c2s
= client_msgr
->connect_to(server_msgr
->get_mytype(),
464 server_msgr
->get_myaddrs());
465 MPing
*m1
= new MPing();
466 ASSERT_EQ(c2s
->send_message(m1
), 0);
468 Connection
*c2s_accepter
= srv_interceptor
->wait(12);
469 srv_interceptor
->remove_bp(12);
471 // We inject a message from this side of the connection to force it to be
472 // in standby when we inject the failure below
473 MPing
*m2
= new MPing();
474 ASSERT_EQ(c2s_accepter
->send_message(m2
), 0);
476 srv_interceptor
->proceed(12, Interceptor::ACTION::FAIL
);
479 Mutex::Locker
l(srv_dispatcher
.lock
);
480 while (!srv_dispatcher
.got_new
)
481 srv_dispatcher
.cond
.Wait(srv_dispatcher
.lock
);
482 srv_dispatcher
.got_new
= false;
486 Mutex::Locker
l(cli_dispatcher
.lock
);
487 while (!cli_dispatcher
.got_new
)
488 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
489 cli_dispatcher
.got_new
= false;
492 ASSERT_TRUE(c2s
->is_connected());
493 ASSERT_EQ(1u, static_cast<Session
*>(c2s
->get_priv().get())->get_count());
494 ASSERT_TRUE(c2s
->peer_is_osd());
496 ASSERT_TRUE(c2s_accepter
->is_connected());
497 ASSERT_EQ(1u, static_cast<Session
*>(c2s_accepter
->get_priv().get())->get_count());
498 ASSERT_TRUE(c2s_accepter
->peer_is_client());
500 client_msgr
->shutdown();
502 server_msgr
->shutdown();
505 delete cli_interceptor
;
506 delete srv_interceptor
;
512 * - A sends client_ident to B
513 * - B fails before sending server_ident to A
514 * - A goes to standby
515 * - B reconnects to A
517 TEST_P(MessengerTest
, MissingServerIdenTest2
) {
518 if (string(GetParam()) == "simple") {
522 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(false);
524 TestInterceptor
*cli_interceptor
= new TestInterceptor();
525 TestInterceptor
*srv_interceptor
= new TestInterceptor();
527 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, Messenger::Policy::lossless_peer(0));
528 server_msgr
->interceptor
= srv_interceptor
;
530 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, Messenger::Policy::lossless_peer(0));
531 client_msgr
->interceptor
= cli_interceptor
;
533 entity_addr_t bind_addr
;
534 bind_addr
.parse("v2:127.0.0.1:3300");
535 server_msgr
->bind(bind_addr
);
536 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
537 server_msgr
->start();
539 bind_addr
.parse("v2:127.0.0.1:3301");
540 client_msgr
->bind(bind_addr
);
541 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
542 client_msgr
->start();
544 // pause before sending client_ident message
545 srv_interceptor
->breakpoint(12);
547 ConnectionRef c2s
= client_msgr
->connect_to(server_msgr
->get_mytype(),
548 server_msgr
->get_myaddrs());
550 Connection
*c2s_accepter
= srv_interceptor
->wait(12);
551 srv_interceptor
->remove_bp(12);
553 // We inject a message from this side of the connection to force it to be
554 // in standby when we inject the failure below
555 MPing
*m2
= new MPing();
556 ASSERT_EQ(c2s_accepter
->send_message(m2
), 0);
558 srv_interceptor
->proceed(12, Interceptor::ACTION::FAIL
);
561 Mutex::Locker
l(cli_dispatcher
.lock
);
562 while (!cli_dispatcher
.got_new
)
563 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
564 cli_dispatcher
.got_new
= false;
567 ASSERT_TRUE(c2s
->is_connected());
568 ASSERT_EQ(1u, static_cast<Session
*>(c2s
->get_priv().get())->get_count());
569 ASSERT_TRUE(c2s
->peer_is_osd());
571 ASSERT_TRUE(c2s_accepter
->is_connected());
572 ASSERT_EQ(0u, static_cast<Session
*>(c2s_accepter
->get_priv().get())->get_count());
573 ASSERT_TRUE(c2s_accepter
->peer_is_client());
575 client_msgr
->shutdown();
577 server_msgr
->shutdown();
580 delete cli_interceptor
;
581 delete srv_interceptor
;
587 * - A and B exchange messages
589 * - B goes into standby
592 TEST_P(MessengerTest
, ReconnectTest
) {
593 if (string(GetParam()) == "simple") {
597 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
599 TestInterceptor
*cli_interceptor
= new TestInterceptor();
600 TestInterceptor
*srv_interceptor
= new TestInterceptor();
602 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, Messenger::Policy::stateful_server(0));
603 server_msgr
->interceptor
= srv_interceptor
;
605 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, Messenger::Policy::lossless_peer(0));
606 client_msgr
->interceptor
= cli_interceptor
;
608 entity_addr_t bind_addr
;
609 bind_addr
.parse("v2:127.0.0.1:3300");
610 server_msgr
->bind(bind_addr
);
611 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
612 server_msgr
->start();
614 bind_addr
.parse("v2:127.0.0.1:3301");
615 client_msgr
->bind(bind_addr
);
616 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
617 client_msgr
->start();
619 ConnectionRef c2s
= client_msgr
->connect_to(server_msgr
->get_mytype(),
620 server_msgr
->get_myaddrs());
622 MPing
*m1
= new MPing();
623 ASSERT_EQ(c2s
->send_message(m1
), 0);
626 Mutex::Locker
l(cli_dispatcher
.lock
);
627 while (!cli_dispatcher
.got_new
)
628 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
629 cli_dispatcher
.got_new
= false;
632 ASSERT_TRUE(c2s
->is_connected());
633 ASSERT_EQ(1u, static_cast<Session
*>(c2s
->get_priv().get())->get_count());
634 ASSERT_TRUE(c2s
->peer_is_osd());
636 cli_interceptor
->breakpoint(16);
638 MPing
*m2
= new MPing();
639 ASSERT_EQ(c2s
->send_message(m2
), 0);
641 cli_interceptor
->wait(16, c2s
.get());
642 cli_interceptor
->remove_bp(16);
644 // at this point client and server are connected together
646 srv_interceptor
->breakpoint(15);
649 cli_interceptor
->proceed(16, Interceptor::ACTION::FAIL
);
651 MPing
*m3
= new MPing();
652 ASSERT_EQ(c2s
->send_message(m3
), 0);
654 Connection
*c2s_accepter
= srv_interceptor
->wait(15);
655 // the srv end of theconnection is now paused at ready
656 // this means that the reconnect was successful
657 srv_interceptor
->remove_bp(15);
659 ASSERT_TRUE(c2s_accepter
->peer_is_client());
660 // c2s_accepter sent 0 reconnect messages
661 ASSERT_EQ(srv_interceptor
->count_step(c2s_accepter
, 13), 0u);
662 // c2s_accepter sent 1 reconnect_ok messages
663 ASSERT_EQ(srv_interceptor
->count_step(c2s_accepter
, 14), 1u);
664 // c2s sent 1 reconnect messages
665 ASSERT_EQ(cli_interceptor
->count_step(c2s
.get(), 13), 1u);
666 // c2s sent 0 reconnect_ok messages
667 ASSERT_EQ(cli_interceptor
->count_step(c2s
.get(), 14), 0u);
669 srv_interceptor
->proceed(15, Interceptor::ACTION::CONTINUE
);
672 Mutex::Locker
l(cli_dispatcher
.lock
);
673 while (!cli_dispatcher
.got_new
)
674 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
675 cli_dispatcher
.got_new
= false;
678 client_msgr
->shutdown();
680 server_msgr
->shutdown();
683 delete cli_interceptor
;
684 delete srv_interceptor
;
690 * - A and B exchange messages
692 * - A reconnects // B reconnects
694 TEST_P(MessengerTest
, ReconnectRaceTest
) {
695 if (string(GetParam()) == "simple") {
699 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
701 TestInterceptor
*cli_interceptor
= new TestInterceptor();
702 TestInterceptor
*srv_interceptor
= new TestInterceptor();
704 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, Messenger::Policy::lossless_peer(0));
705 server_msgr
->interceptor
= srv_interceptor
;
707 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, Messenger::Policy::lossless_peer(0));
708 client_msgr
->interceptor
= cli_interceptor
;
710 entity_addr_t bind_addr
;
711 bind_addr
.parse("v2:127.0.0.1:3300");
712 server_msgr
->bind(bind_addr
);
713 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
714 server_msgr
->start();
716 bind_addr
.parse("v2:127.0.0.1:3301");
717 client_msgr
->bind(bind_addr
);
718 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
719 client_msgr
->start();
721 ConnectionRef c2s
= client_msgr
->connect_to(server_msgr
->get_mytype(),
722 server_msgr
->get_myaddrs());
724 MPing
*m1
= new MPing();
725 ASSERT_EQ(c2s
->send_message(m1
), 0);
728 Mutex::Locker
l(cli_dispatcher
.lock
);
729 while (!cli_dispatcher
.got_new
)
730 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
731 cli_dispatcher
.got_new
= false;
734 ASSERT_TRUE(c2s
->is_connected());
735 ASSERT_EQ(1u, static_cast<Session
*>(c2s
->get_priv().get())->get_count());
736 ASSERT_TRUE(c2s
->peer_is_osd());
738 cli_interceptor
->breakpoint(16);
740 MPing
*m2
= new MPing();
741 ASSERT_EQ(c2s
->send_message(m2
), 0);
743 cli_interceptor
->wait(16, c2s
.get());
744 cli_interceptor
->remove_bp(16);
746 // at this point client and server are connected together
748 // force both client and server to race on reconnect
749 cli_interceptor
->breakpoint(13);
750 srv_interceptor
->breakpoint(13);
753 // this will cause both client and server to reconnect at the same time
754 cli_interceptor
->proceed(16, Interceptor::ACTION::FAIL
);
756 MPing
*m3
= new MPing();
757 ASSERT_EQ(c2s
->send_message(m3
), 0);
759 cli_interceptor
->wait(13, c2s
.get());
760 srv_interceptor
->wait(13);
762 cli_interceptor
->remove_bp(13);
763 srv_interceptor
->remove_bp(13);
766 srv_interceptor
->breakpoint(15);
768 cli_interceptor
->proceed(13, Interceptor::ACTION::CONTINUE
);
769 srv_interceptor
->proceed(13, Interceptor::ACTION::CONTINUE
);
771 Connection
*c2s_accepter
= srv_interceptor
->wait(15);
773 // the server has reconnected and is "ready"
774 srv_interceptor
->remove_bp(15);
776 ASSERT_TRUE(c2s_accepter
->peer_is_client());
777 ASSERT_TRUE(c2s
->peer_is_osd());
779 // the server should win the reconnect race
781 // c2s_accepter sent 1 or 2 reconnect messages
782 ASSERT_LT(srv_interceptor
->count_step(c2s_accepter
, 13), 3u);
783 ASSERT_GT(srv_interceptor
->count_step(c2s_accepter
, 13), 0u);
784 // c2s_accepter sent 0 reconnect_ok messages
785 ASSERT_EQ(srv_interceptor
->count_step(c2s_accepter
, 14), 0u);
786 // c2s sent 1 reconnect messages
787 ASSERT_EQ(cli_interceptor
->count_step(c2s
.get(), 13), 1u);
788 // c2s sent 1 reconnect_ok messages
789 ASSERT_EQ(cli_interceptor
->count_step(c2s
.get(), 14), 1u);
791 if (srv_interceptor
->count_step(c2s_accepter
, 13) == 2) {
792 // if the server send the reconnect message two times then
793 // the client must have sent a session retry message to the server
794 ASSERT_EQ(cli_interceptor
->count_step(c2s
.get(), 18), 1u);
796 ASSERT_EQ(cli_interceptor
->count_step(c2s
.get(), 18), 0u);
799 srv_interceptor
->proceed(15, Interceptor::ACTION::CONTINUE
);
802 Mutex::Locker
l(cli_dispatcher
.lock
);
803 while (!cli_dispatcher
.got_new
)
804 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
805 cli_dispatcher
.got_new
= false;
808 client_msgr
->shutdown();
810 server_msgr
->shutdown();
813 delete cli_interceptor
;
814 delete srv_interceptor
;
817 TEST_P(MessengerTest
, SimpleTest
) {
818 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
819 entity_addr_t bind_addr
;
820 if (string(GetParam()) == "simple")
821 bind_addr
.parse("v1:127.0.0.1");
823 bind_addr
.parse("v2:127.0.0.1");
824 server_msgr
->bind(bind_addr
);
825 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
826 server_msgr
->start();
828 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
829 client_msgr
->start();
831 // 1. simple round trip
832 MPing
*m
= new MPing();
833 ConnectionRef conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
834 server_msgr
->get_myaddrs());
836 ASSERT_EQ(conn
->send_message(m
), 0);
837 Mutex::Locker
l(cli_dispatcher
.lock
);
838 while (!cli_dispatcher
.got_new
)
839 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
840 cli_dispatcher
.got_new
= false;
842 ASSERT_TRUE(conn
->is_connected());
843 ASSERT_EQ(1u, static_cast<Session
*>(conn
->get_priv().get())->get_count());
844 ASSERT_TRUE(conn
->peer_is_osd());
846 // 2. test rebind port
847 set
<int> avoid_ports
;
848 for (int i
= 0; i
< 10 ; i
++) {
849 for (auto a
: server_msgr
->get_myaddrs().v
) {
850 avoid_ports
.insert(a
.get_port() + i
);
853 server_msgr
->rebind(avoid_ports
);
854 for (auto a
: server_msgr
->get_myaddrs().v
) {
855 ASSERT_TRUE(avoid_ports
.count(a
.get_port()) == 0);
858 conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
859 server_msgr
->get_myaddrs());
862 ASSERT_EQ(conn
->send_message(m
), 0);
863 Mutex::Locker
l(cli_dispatcher
.lock
);
864 while (!cli_dispatcher
.got_new
)
865 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
866 cli_dispatcher
.got_new
= false;
868 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
870 // 3. test markdown connection
872 ASSERT_FALSE(conn
->is_connected());
874 // 4. test failed connection
875 server_msgr
->shutdown();
879 conn
->send_message(m
);
880 CHECK_AND_WAIT_TRUE(!conn
->is_connected());
881 ASSERT_FALSE(conn
->is_connected());
883 // 5. loopback connection
884 srv_dispatcher
.loopback
= true;
885 conn
= client_msgr
->get_loopback_connection();
888 ASSERT_EQ(conn
->send_message(m
), 0);
889 Mutex::Locker
l(cli_dispatcher
.lock
);
890 while (!cli_dispatcher
.got_new
)
891 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
892 cli_dispatcher
.got_new
= false;
894 srv_dispatcher
.loopback
= false;
895 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
896 client_msgr
->shutdown();
898 server_msgr
->shutdown();
902 TEST_P(MessengerTest
, SimpleMsgr2Test
) {
903 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
904 entity_addr_t legacy_addr
;
905 legacy_addr
.parse("v1:127.0.0.1");
906 entity_addr_t msgr2_addr
;
907 msgr2_addr
.parse("v2:127.0.0.1");
908 entity_addrvec_t bind_addrs
;
909 bind_addrs
.v
.push_back(legacy_addr
);
910 bind_addrs
.v
.push_back(msgr2_addr
);
911 server_msgr
->bindv(bind_addrs
);
912 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
913 server_msgr
->start();
915 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
916 client_msgr
->start();
918 // 1. simple round trip
919 MPing
*m
= new MPing();
920 ConnectionRef conn
= client_msgr
->connect_to(
921 server_msgr
->get_mytype(),
922 server_msgr
->get_myaddrs());
924 ASSERT_EQ(conn
->send_message(m
), 0);
925 Mutex::Locker
l(cli_dispatcher
.lock
);
926 while (!cli_dispatcher
.got_new
)
927 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
928 cli_dispatcher
.got_new
= false;
930 ASSERT_TRUE(conn
->is_connected());
931 ASSERT_EQ(1u, static_cast<Session
*>(conn
->get_priv().get())->get_count());
932 ASSERT_TRUE(conn
->peer_is_osd());
934 // 2. test rebind port
935 set
<int> avoid_ports
;
936 for (int i
= 0; i
< 10 ; i
++) {
937 for (auto a
: server_msgr
->get_myaddrs().v
) {
938 avoid_ports
.insert(a
.get_port() + i
);
941 server_msgr
->rebind(avoid_ports
);
942 for (auto a
: server_msgr
->get_myaddrs().v
) {
943 ASSERT_TRUE(avoid_ports
.count(a
.get_port()) == 0);
946 conn
= client_msgr
->connect_to(
947 server_msgr
->get_mytype(),
948 server_msgr
->get_myaddrs());
951 ASSERT_EQ(conn
->send_message(m
), 0);
952 Mutex::Locker
l(cli_dispatcher
.lock
);
953 while (!cli_dispatcher
.got_new
)
954 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
955 cli_dispatcher
.got_new
= false;
957 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
959 // 3. test markdown connection
961 ASSERT_FALSE(conn
->is_connected());
963 // 4. test failed connection
964 server_msgr
->shutdown();
968 conn
->send_message(m
);
969 CHECK_AND_WAIT_TRUE(!conn
->is_connected());
970 ASSERT_FALSE(conn
->is_connected());
972 // 5. loopback connection
973 srv_dispatcher
.loopback
= true;
974 conn
= client_msgr
->get_loopback_connection();
977 ASSERT_EQ(conn
->send_message(m
), 0);
978 Mutex::Locker
l(cli_dispatcher
.lock
);
979 while (!cli_dispatcher
.got_new
)
980 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
981 cli_dispatcher
.got_new
= false;
983 srv_dispatcher
.loopback
= false;
984 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
985 client_msgr
->shutdown();
987 server_msgr
->shutdown();
991 TEST_P(MessengerTest
, NameAddrTest
) {
992 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
993 entity_addr_t bind_addr
;
994 if (string(GetParam()) == "simple")
995 bind_addr
.parse("v1:127.0.0.1");
997 bind_addr
.parse("v2:127.0.0.1");
998 server_msgr
->bind(bind_addr
);
999 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
1000 server_msgr
->start();
1002 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
1003 client_msgr
->start();
1005 MPing
*m
= new MPing();
1006 ConnectionRef conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1007 server_msgr
->get_myaddrs());
1009 ASSERT_EQ(conn
->send_message(m
), 0);
1010 Mutex::Locker
l(cli_dispatcher
.lock
);
1011 while (!cli_dispatcher
.got_new
)
1012 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
1013 cli_dispatcher
.got_new
= false;
1015 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1016 ASSERT_TRUE(conn
->get_peer_addrs() == server_msgr
->get_myaddrs());
1017 ConnectionRef server_conn
= server_msgr
->connect_to(
1018 client_msgr
->get_mytype(), srv_dispatcher
.last_accept
);
1019 // Verify that server_conn is the one we already accepted from client,
1020 // so it means the session counter in server_conn is also incremented.
1021 ASSERT_EQ(1U, static_cast<Session
*>(server_conn
->get_priv().get())->get_count());
1022 server_msgr
->shutdown();
1023 client_msgr
->shutdown();
1024 server_msgr
->wait();
1025 client_msgr
->wait();
1028 TEST_P(MessengerTest
, FeatureTest
) {
1029 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
1030 entity_addr_t bind_addr
;
1031 if (string(GetParam()) == "simple")
1032 bind_addr
.parse("v1:127.0.0.1");
1034 bind_addr
.parse("v2:127.0.0.1");
1035 uint64_t all_feature_supported
, feature_required
, feature_supported
= 0;
1036 for (int i
= 0; i
< 10; i
++)
1037 feature_supported
|= 1ULL << i
;
1038 feature_supported
|= CEPH_FEATUREMASK_MSG_ADDR2
;
1039 feature_supported
|= CEPH_FEATUREMASK_SERVER_NAUTILUS
;
1040 feature_required
= feature_supported
| 1ULL << 13;
1041 all_feature_supported
= feature_required
| 1ULL << 14;
1043 Messenger::Policy p
= server_msgr
->get_policy(entity_name_t::TYPE_CLIENT
);
1044 p
.features_required
= feature_required
;
1045 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, p
);
1046 server_msgr
->bind(bind_addr
);
1047 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
1048 server_msgr
->start();
1050 // 1. Suppose if only support less than required
1051 p
= client_msgr
->get_policy(entity_name_t::TYPE_OSD
);
1052 p
.features_supported
= feature_supported
;
1053 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
1054 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
1055 client_msgr
->start();
1057 MPing
*m
= new MPing();
1058 ConnectionRef conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1059 server_msgr
->get_myaddrs());
1060 conn
->send_message(m
);
1061 CHECK_AND_WAIT_TRUE(!conn
->is_connected());
1062 // should failed build a connection
1063 ASSERT_FALSE(conn
->is_connected());
1065 client_msgr
->shutdown();
1066 client_msgr
->wait();
1068 // 2. supported met required
1069 p
= client_msgr
->get_policy(entity_name_t::TYPE_OSD
);
1070 p
.features_supported
= all_feature_supported
;
1071 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
1072 client_msgr
->start();
1074 conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1075 server_msgr
->get_myaddrs());
1078 ASSERT_EQ(conn
->send_message(m
), 0);
1079 Mutex::Locker
l(cli_dispatcher
.lock
);
1080 while (!cli_dispatcher
.got_new
)
1081 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
1082 cli_dispatcher
.got_new
= false;
1084 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1086 server_msgr
->shutdown();
1087 client_msgr
->shutdown();
1088 server_msgr
->wait();
1089 client_msgr
->wait();
1092 TEST_P(MessengerTest
, TimeoutTest
) {
1093 g_ceph_context
->_conf
.set_val("ms_tcp_read_timeout", "1");
1094 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
1095 entity_addr_t bind_addr
;
1096 if (string(GetParam()) == "simple")
1097 bind_addr
.parse("v1:127.0.0.1");
1099 bind_addr
.parse("v2:127.0.0.1");
1100 server_msgr
->bind(bind_addr
);
1101 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
1102 server_msgr
->start();
1104 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
1105 client_msgr
->start();
1107 // 1. build the connection
1108 MPing
*m
= new MPing();
1109 ConnectionRef conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1110 server_msgr
->get_myaddrs());
1112 ASSERT_EQ(conn
->send_message(m
), 0);
1113 Mutex::Locker
l(cli_dispatcher
.lock
);
1114 while (!cli_dispatcher
.got_new
)
1115 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
1116 cli_dispatcher
.got_new
= false;
1118 ASSERT_TRUE(conn
->is_connected());
1119 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1120 ASSERT_TRUE(conn
->peer_is_osd());
1124 ASSERT_FALSE(conn
->is_connected());
1126 server_msgr
->shutdown();
1127 server_msgr
->wait();
1129 client_msgr
->shutdown();
1130 client_msgr
->wait();
1131 g_ceph_context
->_conf
.set_val("ms_tcp_read_timeout", "900");
1134 TEST_P(MessengerTest
, StatefulTest
) {
1136 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
1137 entity_addr_t bind_addr
;
1138 if (string(GetParam()) == "simple")
1139 bind_addr
.parse("v1:127.0.0.1");
1141 bind_addr
.parse("v2:127.0.0.1");
1142 Messenger::Policy p
= Messenger::Policy::stateful_server(0);
1143 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, p
);
1144 p
= Messenger::Policy::lossless_client(0);
1145 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
1147 server_msgr
->bind(bind_addr
);
1148 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
1149 server_msgr
->start();
1150 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
1151 client_msgr
->start();
1153 // 1. test for server standby
1154 ConnectionRef conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1155 server_msgr
->get_myaddrs());
1158 ASSERT_EQ(conn
->send_message(m
), 0);
1159 Mutex::Locker
l(cli_dispatcher
.lock
);
1160 while (!cli_dispatcher
.got_new
)
1161 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
1162 cli_dispatcher
.got_new
= false;
1164 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1166 ASSERT_FALSE(conn
->is_connected());
1167 ConnectionRef server_conn
= server_msgr
->connect_to(
1168 client_msgr
->get_mytype(), srv_dispatcher
.last_accept
);
1170 ASSERT_EQ(1U, static_cast<Session
*>(server_conn
->get_priv().get())->get_count());
1172 srv_dispatcher
.got_new
= false;
1173 conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1174 server_msgr
->get_myaddrs());
1177 ASSERT_EQ(conn
->send_message(m
), 0);
1178 Mutex::Locker
l(cli_dispatcher
.lock
);
1179 while (!cli_dispatcher
.got_new
)
1180 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
1181 cli_dispatcher
.got_new
= false;
1183 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1184 server_conn
= server_msgr
->connect_to(client_msgr
->get_mytype(),
1185 srv_dispatcher
.last_accept
);
1187 Mutex::Locker
l(srv_dispatcher
.lock
);
1188 while (!srv_dispatcher
.got_remote_reset
)
1189 srv_dispatcher
.cond
.Wait(srv_dispatcher
.lock
);
1192 // 2. test for client reconnect
1193 ASSERT_FALSE(cli_dispatcher
.got_remote_reset
);
1194 cli_dispatcher
.got_connect
= false;
1195 cli_dispatcher
.got_new
= false;
1196 cli_dispatcher
.got_remote_reset
= false;
1197 server_conn
->mark_down();
1198 ASSERT_FALSE(server_conn
->is_connected());
1199 // ensure client detect server socket closed
1201 Mutex::Locker
l(cli_dispatcher
.lock
);
1202 while (!cli_dispatcher
.got_remote_reset
)
1203 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
1204 cli_dispatcher
.got_remote_reset
= false;
1207 Mutex::Locker
l(cli_dispatcher
.lock
);
1208 while (!cli_dispatcher
.got_connect
)
1209 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
1210 cli_dispatcher
.got_connect
= false;
1212 CHECK_AND_WAIT_TRUE(conn
->is_connected());
1213 ASSERT_TRUE(conn
->is_connected());
1217 ASSERT_EQ(conn
->send_message(m
), 0);
1218 ASSERT_TRUE(conn
->is_connected());
1219 Mutex::Locker
l(cli_dispatcher
.lock
);
1220 while (!cli_dispatcher
.got_new
)
1221 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
1222 cli_dispatcher
.got_new
= false;
1224 // resetcheck happen
1225 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1226 server_conn
= server_msgr
->connect_to(client_msgr
->get_mytype(),
1227 srv_dispatcher
.last_accept
);
1228 ASSERT_EQ(1U, static_cast<Session
*>(server_conn
->get_priv().get())->get_count());
1229 cli_dispatcher
.got_remote_reset
= false;
1231 server_msgr
->shutdown();
1232 client_msgr
->shutdown();
1233 server_msgr
->wait();
1234 client_msgr
->wait();
1237 TEST_P(MessengerTest
, StatelessTest
) {
1239 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
1240 entity_addr_t bind_addr
;
1241 if (string(GetParam()) == "simple")
1242 bind_addr
.parse("v1:127.0.0.1");
1244 bind_addr
.parse("v2:127.0.0.1");
1245 Messenger::Policy p
= Messenger::Policy::stateless_server(0);
1246 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, p
);
1247 p
= Messenger::Policy::lossy_client(0);
1248 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
1250 server_msgr
->bind(bind_addr
);
1251 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
1252 server_msgr
->start();
1253 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
1254 client_msgr
->start();
1256 // 1. test for server lose state
1257 ConnectionRef conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1258 server_msgr
->get_myaddrs());
1261 ASSERT_EQ(conn
->send_message(m
), 0);
1262 Mutex::Locker
l(cli_dispatcher
.lock
);
1263 while (!cli_dispatcher
.got_new
)
1264 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
1265 cli_dispatcher
.got_new
= false;
1267 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1269 ASSERT_FALSE(conn
->is_connected());
1271 srv_dispatcher
.got_new
= false;
1272 conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1273 server_msgr
->get_myaddrs());
1276 ASSERT_EQ(conn
->send_message(m
), 0);
1277 Mutex::Locker
l(cli_dispatcher
.lock
);
1278 while (!cli_dispatcher
.got_new
)
1279 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
1280 cli_dispatcher
.got_new
= false;
1282 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1283 ConnectionRef server_conn
= server_msgr
->connect_to(client_msgr
->get_mytype(),
1284 srv_dispatcher
.last_accept
);
1285 // server lose state
1287 Mutex::Locker
l(srv_dispatcher
.lock
);
1288 while (!srv_dispatcher
.got_new
)
1289 srv_dispatcher
.cond
.Wait(srv_dispatcher
.lock
);
1291 ASSERT_EQ(1U, static_cast<Session
*>(server_conn
->get_priv().get())->get_count());
1293 // 2. test for client lossy
1294 server_conn
->mark_down();
1295 ASSERT_FALSE(server_conn
->is_connected());
1296 conn
->send_keepalive();
1297 CHECK_AND_WAIT_TRUE(!conn
->is_connected());
1298 ASSERT_FALSE(conn
->is_connected());
1299 conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1300 server_msgr
->get_myaddrs());
1303 ASSERT_EQ(conn
->send_message(m
), 0);
1304 Mutex::Locker
l(cli_dispatcher
.lock
);
1305 while (!cli_dispatcher
.got_new
)
1306 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
1307 cli_dispatcher
.got_new
= false;
1309 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1311 server_msgr
->shutdown();
1312 client_msgr
->shutdown();
1313 server_msgr
->wait();
1314 client_msgr
->wait();
1317 TEST_P(MessengerTest
, ClientStandbyTest
) {
1319 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
1320 entity_addr_t bind_addr
;
1321 if (string(GetParam()) == "simple")
1322 bind_addr
.parse("v1:127.0.0.1");
1324 bind_addr
.parse("v2:127.0.0.1");
1325 Messenger::Policy p
= Messenger::Policy::stateful_server(0);
1326 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, p
);
1327 p
= Messenger::Policy::lossless_peer(0);
1328 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
1330 server_msgr
->bind(bind_addr
);
1331 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
1332 server_msgr
->start();
1333 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
1334 client_msgr
->start();
1336 // 1. test for client standby, resetcheck
1337 ConnectionRef conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1338 server_msgr
->get_myaddrs());
1341 ASSERT_EQ(conn
->send_message(m
), 0);
1342 Mutex::Locker
l(cli_dispatcher
.lock
);
1343 while (!cli_dispatcher
.got_new
)
1344 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
1345 cli_dispatcher
.got_new
= false;
1347 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1348 ConnectionRef server_conn
= server_msgr
->connect_to(
1349 client_msgr
->get_mytype(),
1350 srv_dispatcher
.last_accept
);
1351 ASSERT_FALSE(cli_dispatcher
.got_remote_reset
);
1352 cli_dispatcher
.got_connect
= false;
1353 server_conn
->mark_down();
1354 ASSERT_FALSE(server_conn
->is_connected());
1355 // client should be standby
1357 // client should be standby, so we use original connection
1359 // Try send message to verify got remote reset callback
1361 ASSERT_EQ(conn
->send_message(m
), 0);
1363 Mutex::Locker
l(cli_dispatcher
.lock
);
1364 while (!cli_dispatcher
.got_remote_reset
)
1365 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
1366 cli_dispatcher
.got_remote_reset
= false;
1367 while (!cli_dispatcher
.got_connect
)
1368 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
1369 cli_dispatcher
.got_connect
= false;
1371 CHECK_AND_WAIT_TRUE(conn
->is_connected());
1372 ASSERT_TRUE(conn
->is_connected());
1374 ASSERT_EQ(conn
->send_message(m
), 0);
1375 Mutex::Locker
l(cli_dispatcher
.lock
);
1376 while (!cli_dispatcher
.got_new
)
1377 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
1378 cli_dispatcher
.got_new
= false;
1380 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1381 server_conn
= server_msgr
->connect_to(client_msgr
->get_mytype(),
1382 srv_dispatcher
.last_accept
);
1383 ASSERT_EQ(1U, static_cast<Session
*>(server_conn
->get_priv().get())->get_count());
1385 server_msgr
->shutdown();
1386 client_msgr
->shutdown();
1387 server_msgr
->wait();
1388 client_msgr
->wait();
1391 TEST_P(MessengerTest
, AuthTest
) {
1392 g_ceph_context
->_conf
.set_val("auth_cluster_required", "cephx");
1393 g_ceph_context
->_conf
.set_val("auth_service_required", "cephx");
1394 g_ceph_context
->_conf
.set_val("auth_client_required", "cephx");
1395 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
1396 entity_addr_t bind_addr
;
1397 if (string(GetParam()) == "simple")
1398 bind_addr
.parse("v1:127.0.0.1");
1400 bind_addr
.parse("v2:127.0.0.1");
1401 server_msgr
->bind(bind_addr
);
1402 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
1403 server_msgr
->start();
1405 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
1406 client_msgr
->start();
1408 // 1. simple auth round trip
1409 MPing
*m
= new MPing();
1410 ConnectionRef conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1411 server_msgr
->get_myaddrs());
1413 ASSERT_EQ(conn
->send_message(m
), 0);
1414 Mutex::Locker
l(cli_dispatcher
.lock
);
1415 while (!cli_dispatcher
.got_new
)
1416 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
1417 cli_dispatcher
.got_new
= false;
1419 ASSERT_TRUE(conn
->is_connected());
1420 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1423 g_ceph_context
->_conf
.set_val("auth_cluster_required", "none");
1424 g_ceph_context
->_conf
.set_val("auth_service_required", "none");
1425 g_ceph_context
->_conf
.set_val("auth_client_required", "none");
1427 ASSERT_FALSE(conn
->is_connected());
1428 conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1429 server_msgr
->get_myaddrs());
1431 MPing
*m
= new MPing();
1432 ASSERT_EQ(conn
->send_message(m
), 0);
1433 Mutex::Locker
l(cli_dispatcher
.lock
);
1434 while (!cli_dispatcher
.got_new
)
1435 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
1436 cli_dispatcher
.got_new
= false;
1438 ASSERT_TRUE(conn
->is_connected());
1439 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1440 server_msgr
->shutdown();
1441 client_msgr
->shutdown();
1442 server_msgr
->wait();
1443 client_msgr
->wait();
1446 TEST_P(MessengerTest
, MessageTest
) {
1447 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
1448 entity_addr_t bind_addr
;
1449 if (string(GetParam()) == "simple")
1450 bind_addr
.parse("v1:127.0.0.1");
1452 bind_addr
.parse("v2:127.0.0.1");
1453 Messenger::Policy p
= Messenger::Policy::stateful_server(0);
1454 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, p
);
1455 p
= Messenger::Policy::lossless_peer(0);
1456 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
1458 server_msgr
->bind(bind_addr
);
1459 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
1460 server_msgr
->start();
1461 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
1462 client_msgr
->start();
1465 // 1. A very large "front"(as well as "payload")
1466 // Because a external message need to invade Messenger::decode_message,
1467 // here we only use existing message class(MCommand)
1468 ConnectionRef conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1469 server_msgr
->get_myaddrs());
1472 uuid
.generate_random();
1473 vector
<string
> cmds
;
1474 string
s("abcdefghijklmnopqrstuvwxyz");
1475 for (int i
= 0; i
< 1024*30; i
++)
1477 MCommand
*m
= new MCommand(uuid
);
1479 conn
->send_message(m
);
1482 Mutex::Locker
l(cli_dispatcher
.lock
);
1483 while (!cli_dispatcher
.got_new
)
1484 cli_dispatcher
.cond
.WaitInterval(cli_dispatcher
.lock
, t
);
1485 ASSERT_TRUE(cli_dispatcher
.got_new
);
1486 cli_dispatcher
.got_new
= false;
1489 // 2. A very large "data"
1492 string
s("abcdefghijklmnopqrstuvwxyz");
1493 for (int i
= 0; i
< 1024*30; i
++)
1495 MPing
*m
= new MPing();
1497 conn
->send_message(m
);
1500 Mutex::Locker
l(cli_dispatcher
.lock
);
1501 while (!cli_dispatcher
.got_new
)
1502 cli_dispatcher
.cond
.WaitInterval(cli_dispatcher
.lock
, t
);
1503 ASSERT_TRUE(cli_dispatcher
.got_new
);
1504 cli_dispatcher
.got_new
= false;
1506 server_msgr
->shutdown();
1507 client_msgr
->shutdown();
1508 server_msgr
->wait();
1509 client_msgr
->wait();
1513 class SyntheticWorkload
;
1516 enum Who
: uint8_t {
1524 Payload(Who who
, uint64_t seq
, const bufferlist
& data
)
1525 : who(who
), seq(seq
), data(data
)
1527 Payload() = default;
1528 DENC(Payload
, v
, p
) {
1529 DENC_START(1, 1, p
);
1536 WRITE_CLASS_DENC(Payload
)
1538 ostream
& operator<<(ostream
& out
, const Payload
&pl
)
1540 return out
<< "reply=" << pl
.who
<< " i = " << pl
.seq
;
1543 class SyntheticDispatcher
: public Dispatcher
{
1549 bool got_remote_reset
;
1551 map
<ConnectionRef
, list
<uint64_t> > conn_sent
;
1552 map
<uint64_t, bufferlist
> sent
;
1553 atomic
<uint64_t> index
;
1554 SyntheticWorkload
*workload
;
1556 SyntheticDispatcher(bool s
, SyntheticWorkload
*wl
):
1557 Dispatcher(g_ceph_context
), lock("SyntheticDispatcher::lock"), is_server(s
), got_new(false),
1558 got_remote_reset(false), got_connect(false), index(0), workload(wl
) {
1559 // don't need authorizers
1560 ms_set_require_authorizer(false);
1562 bool ms_can_fast_dispatch_any() const override
{ return true; }
1563 bool ms_can_fast_dispatch(const Message
*m
) const override
{
1564 switch (m
->get_type()) {
1573 void ms_handle_fast_connect(Connection
*con
) override
{
1574 Mutex::Locker
l(lock
);
1575 list
<uint64_t> c
= conn_sent
[con
];
1576 for (list
<uint64_t>::iterator it
= c
.begin();
1577 it
!= c
.end(); ++it
)
1579 conn_sent
.erase(con
);
1583 void ms_handle_fast_accept(Connection
*con
) override
{
1584 Mutex::Locker
l(lock
);
1585 list
<uint64_t> c
= conn_sent
[con
];
1586 for (list
<uint64_t>::iterator it
= c
.begin();
1587 it
!= c
.end(); ++it
)
1589 conn_sent
.erase(con
);
1592 bool ms_dispatch(Message
*m
) override
{
1595 bool ms_handle_reset(Connection
*con
) override
;
1596 void ms_handle_remote_reset(Connection
*con
) override
{
1597 Mutex::Locker
l(lock
);
1598 list
<uint64_t> c
= conn_sent
[con
];
1599 for (list
<uint64_t>::iterator it
= c
.begin();
1600 it
!= c
.end(); ++it
)
1602 conn_sent
.erase(con
);
1603 got_remote_reset
= true;
1605 bool ms_handle_refused(Connection
*con
) override
{
1608 void ms_fast_dispatch(Message
*m
) override
{
1609 // MSG_COMMAND is used to disorganize regular message flow
1610 if (m
->get_type() == MSG_COMMAND
) {
1616 auto p
= m
->get_data().cbegin();
1618 if (pl
.who
== Payload::PING
) {
1619 lderr(g_ceph_context
) << __func__
<< " conn=" << m
->get_connection() << pl
<< dendl
;
1620 reply_message(m
, pl
);
1622 Mutex::Locker
l(lock
);
1626 Mutex::Locker
l(lock
);
1627 if (sent
.count(pl
.seq
)) {
1628 lderr(g_ceph_context
) << __func__
<< " conn=" << m
->get_connection() << pl
<< dendl
;
1629 ASSERT_EQ(conn_sent
[m
->get_connection()].front(), pl
.seq
);
1630 ASSERT_TRUE(pl
.data
.contents_equal(sent
[pl
.seq
]));
1631 conn_sent
[m
->get_connection()].pop_front();
1640 int ms_handle_authentication(Connection
*con
) override
{
1644 void reply_message(const Message
*m
, Payload
& pl
) {
1645 pl
.who
= Payload::PONG
;
1648 MPing
*rm
= new MPing();
1650 m
->get_connection()->send_message(rm
);
1651 lderr(g_ceph_context
) << __func__
<< " conn=" << m
->get_connection() << " reply m=" << m
<< " i=" << pl
.seq
<< dendl
;
1654 void send_message_wrap(ConnectionRef con
, const bufferlist
& data
) {
1655 Message
*m
= new MPing();
1656 Payload pl
{Payload::PING
, index
++, data
};
1660 if (!con
->get_messenger()->get_default_policy().lossy
) {
1661 Mutex::Locker
l(lock
);
1662 sent
[pl
.seq
] = pl
.data
;
1663 conn_sent
[con
].push_back(pl
.seq
);
1665 lderr(g_ceph_context
) << __func__
<< " conn=" << con
.get() << " send m=" << m
<< " i=" << pl
.seq
<< dendl
;
1666 ASSERT_EQ(0, con
->send_message(m
));
1669 uint64_t get_pending() {
1670 Mutex::Locker
l(lock
);
1674 void clear_pending(ConnectionRef con
) {
1675 Mutex::Locker
l(lock
);
1677 for (list
<uint64_t>::iterator it
= conn_sent
[con
].begin();
1678 it
!= conn_sent
[con
].end(); ++it
)
1680 conn_sent
.erase(con
);
1684 for (auto && p
: conn_sent
) {
1685 if (!p
.second
.empty()) {
1686 lderr(g_ceph_context
) << __func__
<< " " << p
.first
<< " wait " << p
.second
.size() << dendl
;
1693 class SyntheticWorkload
{
1696 set
<Messenger
*> available_servers
;
1697 set
<Messenger
*> available_clients
;
1698 Messenger::Policy client_policy
;
1699 map
<ConnectionRef
, pair
<Messenger
*, Messenger
*> > available_connections
;
1700 SyntheticDispatcher dispatcher
;
1702 vector
<bufferlist
> rand_data
;
1703 DummyAuthClientServer dummy_auth
;
1706 static const unsigned max_in_flight
= 64;
1707 static const unsigned max_connections
= 128;
1708 static const unsigned max_message_len
= 1024 * 1024 * 4;
1710 SyntheticWorkload(int servers
, int clients
, string type
, int random_num
,
1711 Messenger::Policy srv_policy
, Messenger::Policy cli_policy
)
1712 : lock("SyntheticWorkload::lock"),
1713 client_policy(cli_policy
),
1714 dispatcher(false, this),
1716 dummy_auth(g_ceph_context
) {
1717 dummy_auth
.auth_registry
.refresh_config();
1719 int base_port
= 16800;
1720 entity_addr_t bind_addr
;
1722 for (int i
= 0; i
< servers
; ++i
) {
1723 msgr
= Messenger::create(g_ceph_context
, type
, entity_name_t::OSD(0),
1724 "server", getpid()+i
, 0);
1725 snprintf(addr
, sizeof(addr
), "%s127.0.0.1:%d",
1726 (type
== "simple") ? "v1:":"v2:",
1728 bind_addr
.parse(addr
);
1729 msgr
->bind(bind_addr
);
1730 msgr
->add_dispatcher_head(&dispatcher
);
1731 msgr
->set_auth_client(&dummy_auth
);
1732 msgr
->set_auth_server(&dummy_auth
);
1735 msgr
->set_default_policy(srv_policy
);
1736 available_servers
.insert(msgr
);
1740 for (int i
= 0; i
< clients
; ++i
) {
1741 msgr
= Messenger::create(g_ceph_context
, type
, entity_name_t::CLIENT(-1),
1742 "client", getpid()+i
+servers
, 0);
1743 if (cli_policy
.standby
) {
1744 snprintf(addr
, sizeof(addr
), "%s127.0.0.1:%d",
1745 (type
== "simple") ? "v1:":"v2:",
1746 base_port
+i
+servers
);
1747 bind_addr
.parse(addr
);
1748 msgr
->bind(bind_addr
);
1750 msgr
->add_dispatcher_head(&dispatcher
);
1751 msgr
->set_auth_client(&dummy_auth
);
1752 msgr
->set_auth_server(&dummy_auth
);
1755 msgr
->set_default_policy(cli_policy
);
1756 available_clients
.insert(msgr
);
1760 for (int i
= 0; i
< random_num
; i
++) {
1762 boost::uniform_int
<> u(32, max_message_len
);
1763 uint64_t value_len
= u(rng
);
1764 bufferptr
bp(value_len
);
1766 for (uint64_t j
= 0; j
< value_len
-sizeof(i
); ) {
1767 memcpy(bp
.c_str()+j
, &i
, sizeof(i
));
1772 rand_data
.push_back(bl
);
1776 ConnectionRef
_get_random_connection() {
1777 while (dispatcher
.get_pending() > max_in_flight
) {
1782 ceph_assert(lock
.is_locked());
1783 boost::uniform_int
<> choose(0, available_connections
.size() - 1);
1784 int index
= choose(rng
);
1785 map
<ConnectionRef
, pair
<Messenger
*, Messenger
*> >::iterator i
= available_connections
.begin();
1786 for (; index
> 0; --index
, ++i
) ;
1790 bool can_create_connection() {
1791 return available_connections
.size() < max_connections
;
1794 void generate_connection() {
1795 Mutex::Locker
l(lock
);
1796 if (!can_create_connection())
1799 Messenger
*server
, *client
;
1801 boost::uniform_int
<> choose(0, available_servers
.size() - 1);
1802 int index
= choose(rng
);
1803 set
<Messenger
*>::iterator i
= available_servers
.begin();
1804 for (; index
> 0; --index
, ++i
) ;
1808 boost::uniform_int
<> choose(0, available_clients
.size() - 1);
1809 int index
= choose(rng
);
1810 set
<Messenger
*>::iterator i
= available_clients
.begin();
1811 for (; index
> 0; --index
, ++i
) ;
1815 pair
<Messenger
*, Messenger
*> p
;
1817 boost::uniform_int
<> choose(0, available_servers
.size() - 1);
1818 if (server
->get_default_policy().server
) {
1819 p
= make_pair(client
, server
);
1820 ConnectionRef conn
= client
->connect_to(server
->get_mytype(),
1821 server
->get_myaddrs());
1822 available_connections
[conn
] = p
;
1824 ConnectionRef conn
= client
->connect_to(server
->get_mytype(),
1825 server
->get_myaddrs());
1826 p
= make_pair(client
, server
);
1827 available_connections
[conn
] = p
;
1832 void send_message() {
1833 Mutex::Locker
l(lock
);
1834 ConnectionRef conn
= _get_random_connection();
1835 boost::uniform_int
<> true_false(0, 99);
1836 int val
= true_false(rng
);
1839 uuid
.generate_random();
1840 MCommand
*m
= new MCommand(uuid
);
1841 vector
<string
> cmds
;
1842 cmds
.push_back("command");
1844 m
->set_priority(200);
1845 conn
->send_message(m
);
1847 boost::uniform_int
<> u(0, rand_data
.size()-1);
1848 dispatcher
.send_message_wrap(conn
, rand_data
[u(rng
)]);
1852 void drop_connection() {
1853 Mutex::Locker
l(lock
);
1854 if (available_connections
.size() < 10)
1856 ConnectionRef conn
= _get_random_connection();
1857 dispatcher
.clear_pending(conn
);
1859 if (!client_policy
.server
&&
1860 !client_policy
.lossy
&&
1861 client_policy
.standby
) {
1862 // it's a lossless policy, so we need to mark down each side
1863 pair
<Messenger
*, Messenger
*> &p
= available_connections
[conn
];
1864 if (!p
.first
->get_default_policy().server
&& !p
.second
->get_default_policy().server
) {
1865 ASSERT_EQ(conn
->get_messenger(), p
.first
);
1866 ConnectionRef peer
= p
.second
->connect_to(p
.first
->get_mytype(),
1867 p
.first
->get_myaddrs());
1869 dispatcher
.clear_pending(peer
);
1870 available_connections
.erase(peer
);
1873 ASSERT_EQ(available_connections
.erase(conn
), 1U);
1876 void print_internal_state(bool detail
=false) {
1877 Mutex::Locker
l(lock
);
1878 lderr(g_ceph_context
) << "available_connections: " << available_connections
.size()
1879 << " inflight messages: " << dispatcher
.get_pending() << dendl
;
1880 if (detail
&& !available_connections
.empty()) {
1885 void wait_for_done() {
1886 int64_t tick_us
= 1000 * 100; // 100ms
1887 int64_t timeout_us
= 5 * 60 * 1000 * 1000; // 5 mins
1889 while (dispatcher
.get_pending()) {
1891 timeout_us
-= tick_us
;
1893 print_internal_state(true);
1895 ceph_abort_msg(" loop time exceed 5 mins, it looks we stuck into some problems!");
1897 for (set
<Messenger
*>::iterator it
= available_servers
.begin();
1898 it
!= available_servers
.end(); ++it
) {
1901 ASSERT_EQ((*it
)->get_dispatch_queue_len(), 0);
1904 available_servers
.clear();
1906 for (set
<Messenger
*>::iterator it
= available_clients
.begin();
1907 it
!= available_clients
.end(); ++it
) {
1910 ASSERT_EQ((*it
)->get_dispatch_queue_len(), 0);
1913 available_clients
.clear();
1916 void handle_reset(Connection
*con
) {
1917 Mutex::Locker
l(lock
);
1918 available_connections
.erase(con
);
1919 dispatcher
.clear_pending(con
);
1923 bool SyntheticDispatcher::ms_handle_reset(Connection
*con
) {
1924 workload
->handle_reset(con
);
1928 TEST_P(MessengerTest
, SyntheticStressTest
) {
1929 SyntheticWorkload
test_msg(8, 32, GetParam(), 100,
1930 Messenger::Policy::stateful_server(0),
1931 Messenger::Policy::lossless_client(0));
1932 for (int i
= 0; i
< 100; ++i
) {
1933 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
1934 test_msg
.generate_connection();
1936 gen_type
rng(time(NULL
));
1937 for (int i
= 0; i
< 5000; ++i
) {
1939 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
1940 test_msg
.print_internal_state();
1942 boost::uniform_int
<> true_false(0, 99);
1943 int val
= true_false(rng
);
1945 test_msg
.generate_connection();
1946 } else if (val
> 80) {
1947 test_msg
.drop_connection();
1948 } else if (val
> 10) {
1949 test_msg
.send_message();
1951 usleep(rand() % 1000 + 500);
1954 test_msg
.wait_for_done();
1957 TEST_P(MessengerTest
, SyntheticStressTest1
) {
1958 SyntheticWorkload
test_msg(16, 32, GetParam(), 100,
1959 Messenger::Policy::lossless_peer_reuse(0),
1960 Messenger::Policy::lossless_peer_reuse(0));
1961 for (int i
= 0; i
< 10; ++i
) {
1962 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
1963 test_msg
.generate_connection();
1965 gen_type
rng(time(NULL
));
1966 for (int i
= 0; i
< 10000; ++i
) {
1968 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
1969 test_msg
.print_internal_state();
1971 boost::uniform_int
<> true_false(0, 99);
1972 int val
= true_false(rng
);
1974 test_msg
.generate_connection();
1975 } else if (val
> 60) {
1976 test_msg
.drop_connection();
1977 } else if (val
> 10) {
1978 test_msg
.send_message();
1980 usleep(rand() % 1000 + 500);
1983 test_msg
.wait_for_done();
1987 TEST_P(MessengerTest
, SyntheticInjectTest
) {
1988 uint64_t dispatch_throttle_bytes
= g_ceph_context
->_conf
->ms_dispatch_throttle_bytes
;
1989 g_ceph_context
->_conf
.set_val("ms_inject_socket_failures", "30");
1990 g_ceph_context
->_conf
.set_val("ms_inject_internal_delays", "0.1");
1991 g_ceph_context
->_conf
.set_val("ms_dispatch_throttle_bytes", "16777216");
1992 SyntheticWorkload
test_msg(8, 32, GetParam(), 100,
1993 Messenger::Policy::stateful_server(0),
1994 Messenger::Policy::lossless_client(0));
1995 for (int i
= 0; i
< 100; ++i
) {
1996 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
1997 test_msg
.generate_connection();
1999 gen_type
rng(time(NULL
));
2000 for (int i
= 0; i
< 1000; ++i
) {
2002 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
2003 test_msg
.print_internal_state();
2005 boost::uniform_int
<> true_false(0, 99);
2006 int val
= true_false(rng
);
2008 test_msg
.generate_connection();
2009 } else if (val
> 80) {
2010 test_msg
.drop_connection();
2011 } else if (val
> 10) {
2012 test_msg
.send_message();
2014 usleep(rand() % 500 + 100);
2017 test_msg
.wait_for_done();
2018 g_ceph_context
->_conf
.set_val("ms_inject_socket_failures", "0");
2019 g_ceph_context
->_conf
.set_val("ms_inject_internal_delays", "0");
2020 g_ceph_context
->_conf
.set_val(
2021 "ms_dispatch_throttle_bytes", std::to_string(dispatch_throttle_bytes
));
2024 TEST_P(MessengerTest
, SyntheticInjectTest2
) {
2025 g_ceph_context
->_conf
.set_val("ms_inject_socket_failures", "30");
2026 g_ceph_context
->_conf
.set_val("ms_inject_internal_delays", "0.1");
2027 SyntheticWorkload
test_msg(8, 16, GetParam(), 100,
2028 Messenger::Policy::lossless_peer_reuse(0),
2029 Messenger::Policy::lossless_peer_reuse(0));
2030 for (int i
= 0; i
< 100; ++i
) {
2031 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
2032 test_msg
.generate_connection();
2034 gen_type
rng(time(NULL
));
2035 for (int i
= 0; i
< 1000; ++i
) {
2037 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
2038 test_msg
.print_internal_state();
2040 boost::uniform_int
<> true_false(0, 99);
2041 int val
= true_false(rng
);
2043 test_msg
.generate_connection();
2044 } else if (val
> 80) {
2045 test_msg
.drop_connection();
2046 } else if (val
> 10) {
2047 test_msg
.send_message();
2049 usleep(rand() % 500 + 100);
2052 test_msg
.wait_for_done();
2053 g_ceph_context
->_conf
.set_val("ms_inject_socket_failures", "0");
2054 g_ceph_context
->_conf
.set_val("ms_inject_internal_delays", "0");
2057 TEST_P(MessengerTest
, SyntheticInjectTest3
) {
2058 g_ceph_context
->_conf
.set_val("ms_inject_socket_failures", "600");
2059 g_ceph_context
->_conf
.set_val("ms_inject_internal_delays", "0.1");
2060 SyntheticWorkload
test_msg(8, 16, GetParam(), 100,
2061 Messenger::Policy::stateless_server(0),
2062 Messenger::Policy::lossy_client(0));
2063 for (int i
= 0; i
< 100; ++i
) {
2064 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
2065 test_msg
.generate_connection();
2067 gen_type
rng(time(NULL
));
2068 for (int i
= 0; i
< 1000; ++i
) {
2070 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
2071 test_msg
.print_internal_state();
2073 boost::uniform_int
<> true_false(0, 99);
2074 int val
= true_false(rng
);
2076 test_msg
.generate_connection();
2077 } else if (val
> 80) {
2078 test_msg
.drop_connection();
2079 } else if (val
> 10) {
2080 test_msg
.send_message();
2082 usleep(rand() % 500 + 100);
2085 test_msg
.wait_for_done();
2086 g_ceph_context
->_conf
.set_val("ms_inject_socket_failures", "0");
2087 g_ceph_context
->_conf
.set_val("ms_inject_internal_delays", "0");
2091 TEST_P(MessengerTest
, SyntheticInjectTest4
) {
2092 g_ceph_context
->_conf
.set_val("ms_inject_socket_failures", "30");
2093 g_ceph_context
->_conf
.set_val("ms_inject_internal_delays", "0.1");
2094 g_ceph_context
->_conf
.set_val("ms_inject_delay_probability", "1");
2095 g_ceph_context
->_conf
.set_val("ms_inject_delay_type", "client osd");
2096 g_ceph_context
->_conf
.set_val("ms_inject_delay_max", "5");
2097 SyntheticWorkload
test_msg(16, 32, GetParam(), 100,
2098 Messenger::Policy::lossless_peer(0),
2099 Messenger::Policy::lossless_peer(0));
2100 for (int i
= 0; i
< 100; ++i
) {
2101 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
2102 test_msg
.generate_connection();
2104 gen_type
rng(time(NULL
));
2105 for (int i
= 0; i
< 1000; ++i
) {
2107 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
2108 test_msg
.print_internal_state();
2110 boost::uniform_int
<> true_false(0, 99);
2111 int val
= true_false(rng
);
2113 test_msg
.generate_connection();
2114 } else if (val
> 80) {
2115 // test_msg.drop_connection();
2116 } else if (val
> 10) {
2117 test_msg
.send_message();
2119 usleep(rand() % 500 + 100);
2122 test_msg
.wait_for_done();
2123 g_ceph_context
->_conf
.set_val("ms_inject_socket_failures", "0");
2124 g_ceph_context
->_conf
.set_val("ms_inject_internal_delays", "0");
2125 g_ceph_context
->_conf
.set_val("ms_inject_delay_probability", "0");
2126 g_ceph_context
->_conf
.set_val("ms_inject_delay_type", "");
2127 g_ceph_context
->_conf
.set_val("ms_inject_delay_max", "0");
2131 class MarkdownDispatcher
: public Dispatcher
{
2133 set
<ConnectionRef
> conns
;
2136 std::atomic
<uint64_t> count
= { 0 };
2137 explicit MarkdownDispatcher(bool s
): Dispatcher(g_ceph_context
), lock("MarkdownDispatcher::lock"),
2139 // don't need authorizers
2140 ms_set_require_authorizer(false);
2142 bool ms_can_fast_dispatch_any() const override
{ return false; }
2143 bool ms_can_fast_dispatch(const Message
*m
) const override
{
2144 switch (m
->get_type()) {
2152 void ms_handle_fast_connect(Connection
*con
) override
{
2153 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
2154 Mutex::Locker
l(lock
);
2157 void ms_handle_fast_accept(Connection
*con
) override
{
2158 Mutex::Locker
l(lock
);
2161 bool ms_dispatch(Message
*m
) override
{
2162 lderr(g_ceph_context
) << __func__
<< " conn: " << m
->get_connection() << dendl
;
2163 Mutex::Locker
l(lock
);
2165 conns
.insert(m
->get_connection());
2166 if (conns
.size() < 2 && !last_mark
) {
2172 usleep(rand() % 500);
2173 for (set
<ConnectionRef
>::iterator it
= conns
.begin(); it
!= conns
.end(); ++it
) {
2174 if ((*it
) != m
->get_connection().get()) {
2185 bool ms_handle_reset(Connection
*con
) override
{
2186 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
2187 Mutex::Locker
l(lock
);
2189 usleep(rand() % 500);
2192 void ms_handle_remote_reset(Connection
*con
) override
{
2193 Mutex::Locker
l(lock
);
2195 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
2197 bool ms_handle_refused(Connection
*con
) override
{
2200 void ms_fast_dispatch(Message
*m
) override
{
2203 int ms_handle_authentication(Connection
*con
) override
{
2209 // Markdown with external lock
2210 TEST_P(MessengerTest
, MarkdownTest
) {
2211 Messenger
*server_msgr2
= Messenger::create(g_ceph_context
, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
2212 MarkdownDispatcher
cli_dispatcher(false), srv_dispatcher(true);
2213 DummyAuthClientServer
dummy_auth(g_ceph_context
);
2214 dummy_auth
.auth_registry
.refresh_config();
2215 entity_addr_t bind_addr
;
2216 if (string(GetParam()) == "simple")
2217 bind_addr
.parse("v1:127.0.0.1:16800");
2219 bind_addr
.parse("v2:127.0.0.1:16800");
2220 server_msgr
->bind(bind_addr
);
2221 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
2222 server_msgr
->set_auth_client(&dummy_auth
);
2223 server_msgr
->set_auth_server(&dummy_auth
);
2224 server_msgr
->start();
2225 if (string(GetParam()) == "simple")
2226 bind_addr
.parse("v1:127.0.0.1:16801");
2228 bind_addr
.parse("v2:127.0.0.1:16801");
2229 server_msgr2
->bind(bind_addr
);
2230 server_msgr2
->add_dispatcher_head(&srv_dispatcher
);
2231 server_msgr2
->set_auth_client(&dummy_auth
);
2232 server_msgr2
->set_auth_server(&dummy_auth
);
2233 server_msgr2
->start();
2235 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
2236 client_msgr
->set_auth_client(&dummy_auth
);
2237 client_msgr
->set_auth_server(&dummy_auth
);
2238 client_msgr
->start();
2243 uint64_t equal_count
= 0;
2245 ConnectionRef conn1
= client_msgr
->connect_to(server_msgr
->get_mytype(),
2246 server_msgr
->get_myaddrs());
2247 ConnectionRef conn2
= client_msgr
->connect_to(server_msgr2
->get_mytype(),
2248 server_msgr2
->get_myaddrs());
2249 MPing
*m
= new MPing();
2250 ASSERT_EQ(conn1
->send_message(m
), 0);
2252 ASSERT_EQ(conn2
->send_message(m
), 0);
2253 CHECK_AND_WAIT_TRUE(srv_dispatcher
.count
> last
+ 1);
2254 if (srv_dispatcher
.count
== last
) {
2255 lderr(g_ceph_context
) << __func__
<< " last is " << last
<< dendl
;
2262 last
= srv_dispatcher
.count
;
2265 ASSERT_FALSE(equal
&& equal_count
> 3);
2267 server_msgr
->shutdown();
2268 client_msgr
->shutdown();
2269 server_msgr2
->shutdown();
2270 server_msgr
->wait();
2271 client_msgr
->wait();
2272 server_msgr2
->wait();
2273 delete server_msgr2
;
2276 INSTANTIATE_TEST_CASE_P(
2287 // Google Test may not support value-parameterized tests with some
2288 // compilers. If we use conditional compilation to compile out all
2289 // code referring to the gtest_main library, MSVC linker will not link
2290 // that library at all and consequently complain about missing entry
2291 // point defined in that library (fatal error LNK1561: entry point
2292 // must be defined). This dummy test keeps gtest_main linked in.
2293 TEST(DummyTest
, ValueParameterizedTestsAreNotSupportedOnThisPlatform
) {}
2298 int main(int argc
, char **argv
) {
2299 vector
<const char*> args
;
2300 argv_to_vec(argc
, (const char **)argv
, args
);
2302 auto cct
= global_init(NULL
, args
, CEPH_ENTITY_TYPE_CLIENT
,
2303 CODE_ENVIRONMENT_UTILITY
,
2304 CINIT_FLAG_NO_DEFAULT_CONFIG_FILE
);
2305 g_ceph_context
->_conf
.set_val("auth_cluster_required", "none");
2306 g_ceph_context
->_conf
.set_val("auth_service_required", "none");
2307 g_ceph_context
->_conf
.set_val("auth_client_required", "none");
2308 g_ceph_context
->_conf
.set_val("keyring", "/dev/null");
2309 g_ceph_context
->_conf
.set_val("enable_experimental_unrecoverable_data_corrupting_features", "ms-type-async");
2310 g_ceph_context
->_conf
.set_val("ms_die_on_bad_msg", "true");
2311 g_ceph_context
->_conf
.set_val("ms_die_on_old_message", "true");
2312 g_ceph_context
->_conf
.set_val("ms_max_backoff", "1");
2313 common_init_finish(g_ceph_context
);
2315 ::testing::InitGoogleTest(&argc
, argv
);
2316 return RUN_ALL_TESTS();
2321 * compile-command: "cd ../.. ; make -j4 ceph_test_msgr && valgrind --tool=memcheck ./ceph_test_msgr"