// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <iostream>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <set>
#include <string>
#include <unistd.h>

#include <boost/random/binomial_distribution.hpp>
#include <boost/random/mersenne_twister.hpp>
#include <boost/random/uniform_int.hpp>
#include <gtest/gtest.h>

#define MSG_POLICY_UNIT_TESTING

#include "common/ceph_argparse.h"
#include "common/ceph_mutex.h"
#include "global/global_init.h"
#include "messages/MCommand.h"
#include "messages/MPing.h"
#include "msg/Connection.h"
#include "msg/Dispatcher.h"
#include "msg/Message.h"
#include "msg/Messenger.h"
#include "msg/msg_types.h"

typedef boost::mt11213b gen_type;

#include "common/dout.h"
#include "include/ceph_assert.h"

#include "auth/DummyAuth.h"

#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix *_dout << " ceph_test_msgr "
// Poll `expr` roughly once per millisecond for up to about one second and stop
// as soon as it evaluates to true. The macro does not fail on timeout; callers
// assert on the same condition afterwards. Written as a single statement
// (no trailing ';') so it is safe inside if/else without braces.
#define CHECK_AND_WAIT_TRUE(expr) do {          \
  int check_and_wait_retries_ = 1000;           \
  while (--check_and_wait_retries_) {           \
    if (expr)                                   \
      break;                                    \
    usleep(1000);                               \
  }                                             \
} while (0)
67 class MessengerTest
: public ::testing::TestWithParam
<const char*> {
69 DummyAuthClientServer dummy_auth
;
70 Messenger
*server_msgr
;
71 Messenger
*client_msgr
;
73 MessengerTest() : dummy_auth(g_ceph_context
),
74 server_msgr(NULL
), client_msgr(NULL
) {
75 dummy_auth
.auth_registry
.refresh_config();
77 void SetUp() override
{
78 lderr(g_ceph_context
) << __func__
<< " start set up " << GetParam() << dendl
;
79 server_msgr
= Messenger::create(g_ceph_context
, string(GetParam()), entity_name_t::OSD(0), "server", getpid());
80 client_msgr
= Messenger::create(g_ceph_context
, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid());
81 server_msgr
->set_default_policy(Messenger::Policy::stateless_server(0));
82 client_msgr
->set_default_policy(Messenger::Policy::lossy_client(0));
83 server_msgr
->set_auth_client(&dummy_auth
);
84 server_msgr
->set_auth_server(&dummy_auth
);
85 client_msgr
->set_auth_client(&dummy_auth
);
86 client_msgr
->set_auth_server(&dummy_auth
);
87 server_msgr
->set_require_authorizer(false);
89 void TearDown() override
{
90 ASSERT_EQ(server_msgr
->get_dispatch_queue_len(), 0);
91 ASSERT_EQ(client_msgr
->get_dispatch_queue_len(), 0);
99 class FakeDispatcher
: public Dispatcher
{
101 struct Session
: public RefCountedObject
{
102 atomic
<uint64_t> count
;
105 explicit Session(ConnectionRef c
): RefCountedObject(g_ceph_context
), count(0), con(c
) {
107 uint64_t get_count() { return count
; }
110 ceph::mutex lock
= ceph::make_mutex("FakeDispatcher::lock");
111 ceph::condition_variable cond
;
114 bool got_remote_reset
;
117 entity_addrvec_t last_accept
;
118 ConnectionRef
*last_accept_con_ptr
= nullptr;
120 explicit FakeDispatcher(bool s
): Dispatcher(g_ceph_context
),
121 is_server(s
), got_new(false), got_remote_reset(false),
122 got_connect(false), loopback(false) {
124 bool ms_can_fast_dispatch_any() const override
{ return true; }
125 bool ms_can_fast_dispatch(const Message
*m
) const override
{
126 switch (m
->get_type()) {
134 void ms_handle_fast_connect(Connection
*con
) override
{
135 std::scoped_lock l
{lock
};
136 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
137 auto s
= con
->get_priv();
139 auto session
= new Session(con
);
140 con
->set_priv(RefCountedPtr
{session
, false});
141 lderr(g_ceph_context
) << __func__
<< " con: " << con
142 << " count: " << session
->count
<< dendl
;
147 void ms_handle_fast_accept(Connection
*con
) override
{
148 last_accept
= con
->get_peer_addrs();
149 if (last_accept_con_ptr
) {
150 *last_accept_con_ptr
= con
;
152 if (!con
->get_priv()) {
153 con
->set_priv(RefCountedPtr
{new Session(con
), false});
156 bool ms_dispatch(Message
*m
) override
{
157 auto priv
= m
->get_connection()->get_priv();
158 auto s
= static_cast<Session
*>(priv
.get());
160 s
= new Session(m
->get_connection());
161 priv
.reset(s
, false);
162 m
->get_connection()->set_priv(priv
);
165 lderr(g_ceph_context
) << __func__
<< " conn: " << m
->get_connection() << " session " << s
<< " count: " << s
->count
<< dendl
;
169 std::lock_guard l
{lock
};
175 bool ms_handle_reset(Connection
*con
) override
{
176 std::lock_guard l
{lock
};
177 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
178 auto priv
= con
->get_priv();
179 if (auto s
= static_cast<Session
*>(priv
.get()); s
) {
180 s
->con
.reset(); // break con <-> session ref cycle
181 con
->set_priv(nullptr); // break ref <-> session cycle, if any
185 void ms_handle_remote_reset(Connection
*con
) override
{
186 std::lock_guard l
{lock
};
187 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
188 auto priv
= con
->get_priv();
189 if (auto s
= static_cast<Session
*>(priv
.get()); s
) {
190 s
->con
.reset(); // break con <-> session ref cycle
191 con
->set_priv(nullptr); // break ref <-> session cycle, if any
193 got_remote_reset
= true;
196 bool ms_handle_refused(Connection
*con
) override
{
199 void ms_fast_dispatch(Message
*m
) override
{
200 auto priv
= m
->get_connection()->get_priv();
201 auto s
= static_cast<Session
*>(priv
.get());
203 s
= new Session(m
->get_connection());
204 priv
.reset(s
, false);
205 m
->get_connection()->set_priv(priv
);
208 lderr(g_ceph_context
) << __func__
<< " conn: " << m
->get_connection() << " session " << s
<< " count: " << s
->count
<< dendl
;
211 ceph_assert(m
->get_source().is_osd());
214 } else if (loopback
) {
215 ceph_assert(m
->get_source().is_client());
218 std::lock_guard l
{lock
};
223 int ms_handle_fast_authentication(Connection
*con
) override
{
227 void reply_message(Message
*m
) {
228 MPing
*rm
= new MPing();
229 m
->get_connection()->send_message(rm
);
233 typedef FakeDispatcher::Session Session
;
235 struct TestInterceptor
: public Interceptor
{
237 bool step_waiting
= false;
239 std::map
<Connection
*, uint32_t> current_step
;
240 std::map
<Connection
*, std::list
<uint32_t>> step_history
;
241 std::map
<uint32_t, std::optional
<ACTION
>> decisions
;
242 std::set
<uint32_t> breakpoints
;
244 uint32_t count_step(Connection
*conn
, uint32_t step
) {
246 for (auto s
: step_history
[conn
]) {
254 void breakpoint(uint32_t step
) {
255 breakpoints
.insert(step
);
258 void remove_bp(uint32_t step
) {
259 breakpoints
.erase(step
);
262 Connection
*wait(uint32_t step
, Connection
*conn
=nullptr) {
263 std::unique_lock
<std::mutex
> l(lock
);
266 auto it
= current_step
.find(conn
);
267 if (it
!= current_step
.end()) {
268 if (it
->second
== step
) {
273 for (auto it
: current_step
) {
274 if (it
.second
== step
) {
286 step_waiting
= false;
290 ACTION
wait_for_decision(uint32_t step
, std::unique_lock
<std::mutex
> &l
) {
291 if (decisions
[step
]) {
292 return *(decisions
[step
]);
295 cond_var
.wait(l
, [this] { return !waiting
; });
296 return *(decisions
[step
]);
299 void proceed(uint32_t step
, ACTION decision
) {
300 std::unique_lock
<std::mutex
> l(lock
);
301 decisions
[step
] = decision
;
304 cond_var
.notify_one();
308 ACTION
intercept(Connection
*conn
, uint32_t step
) override
{
309 lderr(g_ceph_context
) << __func__
<< " conn(" << conn
310 << ") intercept called on step=" << step
<< dendl
;
313 std::unique_lock
<std::mutex
> l(lock
);
314 step_history
[conn
].push_back(step
);
315 current_step
[conn
] = step
;
317 cond_var
.notify_one();
321 std::unique_lock
<std::mutex
> l(lock
);
322 ACTION decision
= ACTION::CONTINUE
;
323 if (breakpoints
.find(step
) != breakpoints
.end()) {
324 lderr(g_ceph_context
) << __func__
<< " conn(" << conn
325 << ") pausing on step=" << step
<< dendl
;
326 decision
= wait_for_decision(step
, l
);
328 if (decisions
[step
]) {
329 decision
= *(decisions
[step
]);
332 lderr(g_ceph_context
) << __func__
<< " conn(" << conn
333 << ") resuming step=" << step
<< " with decision="
334 << decision
<< dendl
;
335 decisions
[step
].reset();
342 * Scenario: A connects to B, and B connects to A at the same time.
344 TEST_P(MessengerTest
, ConnectionRaceTest
) {
345 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(false);
347 TestInterceptor
*cli_interceptor
= new TestInterceptor();
348 TestInterceptor
*srv_interceptor
= new TestInterceptor();
350 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, Messenger::Policy::lossless_peer_reuse(0));
351 server_msgr
->interceptor
= srv_interceptor
;
353 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, Messenger::Policy::lossless_peer_reuse(0));
354 client_msgr
->interceptor
= cli_interceptor
;
356 entity_addr_t bind_addr
;
357 bind_addr
.parse("v2:127.0.0.1:3300");
358 server_msgr
->bind(bind_addr
);
359 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
360 server_msgr
->start();
362 bind_addr
.parse("v2:127.0.0.1:3301");
363 client_msgr
->bind(bind_addr
);
364 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
365 client_msgr
->start();
367 // pause before sending client_ident message
368 cli_interceptor
->breakpoint(Interceptor::STEP::SEND_CLIENT_IDENTITY
);
369 // pause before sending client_ident message
370 srv_interceptor
->breakpoint(Interceptor::STEP::SEND_CLIENT_IDENTITY
);
372 ConnectionRef c2s
= client_msgr
->connect_to(server_msgr
->get_mytype(),
373 server_msgr
->get_myaddrs());
374 MPing
*m1
= new MPing();
375 ASSERT_EQ(c2s
->send_message(m1
), 0);
377 ConnectionRef s2c
= server_msgr
->connect_to(client_msgr
->get_mytype(),
378 client_msgr
->get_myaddrs());
379 MPing
*m2
= new MPing();
380 ASSERT_EQ(s2c
->send_message(m2
), 0);
382 cli_interceptor
->wait(Interceptor::STEP::SEND_CLIENT_IDENTITY
, c2s
.get());
383 srv_interceptor
->wait(Interceptor::STEP::SEND_CLIENT_IDENTITY
, s2c
.get());
385 // at this point both connections (A->B, B->A) are paused just before sending
386 // the client_ident message.
388 cli_interceptor
->remove_bp(Interceptor::STEP::SEND_CLIENT_IDENTITY
);
389 srv_interceptor
->remove_bp(Interceptor::STEP::SEND_CLIENT_IDENTITY
);
391 cli_interceptor
->proceed(Interceptor::STEP::SEND_CLIENT_IDENTITY
, Interceptor::ACTION::CONTINUE
);
392 srv_interceptor
->proceed(Interceptor::STEP::SEND_CLIENT_IDENTITY
, Interceptor::ACTION::CONTINUE
);
395 std::unique_lock l
{cli_dispatcher
.lock
};
396 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
397 cli_dispatcher
.got_new
= false;
401 std::unique_lock l
{srv_dispatcher
.lock
};
402 srv_dispatcher
.cond
.wait(l
, [&] { return srv_dispatcher
.got_new
; });
403 srv_dispatcher
.got_new
= false;
406 ASSERT_TRUE(s2c
->is_connected());
407 ASSERT_EQ(1u, static_cast<Session
*>(s2c
->get_priv().get())->get_count());
408 ASSERT_TRUE(s2c
->peer_is_client());
410 ASSERT_TRUE(c2s
->is_connected());
411 ASSERT_EQ(1u, static_cast<Session
*>(c2s
->get_priv().get())->get_count());
412 ASSERT_TRUE(c2s
->peer_is_osd());
414 client_msgr
->shutdown();
416 server_msgr
->shutdown();
419 delete cli_interceptor
;
420 delete srv_interceptor
;
424 * Scenario: A connects to B, and B connects to A at the same time.
425 * The first (A -> B) connection gets to message flow handshake, the
426 * second (B -> A) connection is stuck waiting for a banner from A.
427 * After A sends client_ident to B, the first connection wins and B
428 * calls reuse_connection() to replace the second connection's socket
429 * while the second connection is still in BANNER_CONNECTING.
431 TEST_P(MessengerTest
, ConnectionRaceReuseBannerTest
) {
432 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(false);
434 auto cli_interceptor
= std::make_unique
<TestInterceptor
>();
435 auto srv_interceptor
= std::make_unique
<TestInterceptor
>();
437 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
,
438 Messenger::Policy::lossless_peer_reuse(0));
439 server_msgr
->interceptor
= srv_interceptor
.get();
441 client_msgr
->set_policy(entity_name_t::TYPE_OSD
,
442 Messenger::Policy::lossless_peer_reuse(0));
443 client_msgr
->interceptor
= cli_interceptor
.get();
445 entity_addr_t bind_addr
;
446 bind_addr
.parse("v2:127.0.0.1:3300");
447 server_msgr
->bind(bind_addr
);
448 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
449 server_msgr
->start();
451 bind_addr
.parse("v2:127.0.0.1:3301");
452 client_msgr
->bind(bind_addr
);
453 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
454 client_msgr
->start();
456 // pause before sending client_ident message
457 srv_interceptor
->breakpoint(Interceptor::STEP::SEND_CLIENT_IDENTITY
);
459 ConnectionRef s2c
= server_msgr
->connect_to(client_msgr
->get_mytype(),
460 client_msgr
->get_myaddrs());
461 MPing
*m1
= new MPing();
462 ASSERT_EQ(s2c
->send_message(m1
), 0);
464 srv_interceptor
->wait(Interceptor::STEP::SEND_CLIENT_IDENTITY
);
465 srv_interceptor
->remove_bp(Interceptor::STEP::SEND_CLIENT_IDENTITY
);
467 // pause before sending banner
468 cli_interceptor
->breakpoint(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING
);
470 ConnectionRef c2s
= client_msgr
->connect_to(server_msgr
->get_mytype(),
471 server_msgr
->get_myaddrs());
472 MPing
*m2
= new MPing();
473 ASSERT_EQ(c2s
->send_message(m2
), 0);
475 cli_interceptor
->wait(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING
);
476 cli_interceptor
->remove_bp(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING
);
478 // second connection is in BANNER_CONNECTING, ensure it stays so
479 // and send client_ident
480 srv_interceptor
->breakpoint(Interceptor::STEP::BANNER_EXCHANGE
);
481 srv_interceptor
->proceed(Interceptor::STEP::SEND_CLIENT_IDENTITY
, Interceptor::ACTION::CONTINUE
);
483 // handle client_ident -- triggers reuse_connection() with exproto
484 // in BANNER_CONNECTING
485 cli_interceptor
->breakpoint(Interceptor::STEP::READY
);
486 cli_interceptor
->proceed(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING
, Interceptor::ACTION::CONTINUE
);
488 cli_interceptor
->wait(Interceptor::STEP::READY
);
489 cli_interceptor
->remove_bp(Interceptor::STEP::READY
);
491 // first connection is in READY
492 Connection
*s2c_accepter
= srv_interceptor
->wait(Interceptor::STEP::BANNER_EXCHANGE
);
493 srv_interceptor
->remove_bp(Interceptor::STEP::BANNER_EXCHANGE
);
495 srv_interceptor
->proceed(Interceptor::STEP::BANNER_EXCHANGE
, Interceptor::ACTION::CONTINUE
);
496 cli_interceptor
->proceed(Interceptor::STEP::READY
, Interceptor::ACTION::CONTINUE
);
499 std::unique_lock l
{cli_dispatcher
.lock
};
500 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
501 cli_dispatcher
.got_new
= false;
505 std::unique_lock l
{srv_dispatcher
.lock
};
506 srv_dispatcher
.cond
.wait(l
, [&] { return srv_dispatcher
.got_new
; });
507 srv_dispatcher
.got_new
= false;
510 EXPECT_TRUE(s2c
->is_connected());
511 EXPECT_EQ(1u, static_cast<Session
*>(s2c
->get_priv().get())->get_count());
512 EXPECT_TRUE(s2c
->peer_is_client());
514 EXPECT_TRUE(c2s
->is_connected());
515 EXPECT_EQ(1u, static_cast<Session
*>(c2s
->get_priv().get())->get_count());
516 EXPECT_TRUE(c2s
->peer_is_osd());
518 // closed in reuse_connection() -- EPIPE when writing banner/hello
519 EXPECT_FALSE(s2c_accepter
->is_connected());
521 // established exactly once, never faulted and reconnected
522 EXPECT_EQ(cli_interceptor
->count_step(c2s
.get(), Interceptor::STEP::START_CLIENT_BANNER_EXCHANGE
), 1u);
523 EXPECT_EQ(cli_interceptor
->count_step(c2s
.get(), Interceptor::STEP::SEND_RECONNECT
), 0u);
524 EXPECT_EQ(cli_interceptor
->count_step(c2s
.get(), Interceptor::STEP::READY
), 1u);
526 client_msgr
->shutdown();
528 server_msgr
->shutdown();
535 * - A sends client_ident to B
536 * - B fails before sending server_ident to A
539 TEST_P(MessengerTest
, MissingServerIdenTest
) {
540 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(false);
542 TestInterceptor
*cli_interceptor
= new TestInterceptor();
543 TestInterceptor
*srv_interceptor
= new TestInterceptor();
545 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, Messenger::Policy::stateful_server(0));
546 server_msgr
->interceptor
= srv_interceptor
;
548 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, Messenger::Policy::lossy_client(0));
549 client_msgr
->interceptor
= cli_interceptor
;
551 entity_addr_t bind_addr
;
552 bind_addr
.parse("v2:127.0.0.1:3300");
553 server_msgr
->bind(bind_addr
);
554 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
555 server_msgr
->start();
557 bind_addr
.parse("v2:127.0.0.1:3301");
558 client_msgr
->bind(bind_addr
);
559 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
560 client_msgr
->start();
562 // pause before sending server_ident message
563 srv_interceptor
->breakpoint(Interceptor::STEP::SEND_SERVER_IDENTITY
);
565 ConnectionRef c2s
= client_msgr
->connect_to(server_msgr
->get_mytype(),
566 server_msgr
->get_myaddrs());
567 MPing
*m1
= new MPing();
568 ASSERT_EQ(c2s
->send_message(m1
), 0);
570 Connection
*c2s_accepter
= srv_interceptor
->wait(Interceptor::STEP::SEND_SERVER_IDENTITY
);
571 srv_interceptor
->remove_bp(Interceptor::STEP::SEND_SERVER_IDENTITY
);
573 // We inject a message from this side of the connection to force it to be
574 // in standby when we inject the failure below
575 MPing
*m2
= new MPing();
576 ASSERT_EQ(c2s_accepter
->send_message(m2
), 0);
578 srv_interceptor
->proceed(Interceptor::STEP::SEND_SERVER_IDENTITY
, Interceptor::ACTION::FAIL
);
581 std::unique_lock l
{srv_dispatcher
.lock
};
582 srv_dispatcher
.cond
.wait(l
, [&] { return srv_dispatcher
.got_new
; });
583 srv_dispatcher
.got_new
= false;
587 std::unique_lock l
{cli_dispatcher
.lock
};
588 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
589 cli_dispatcher
.got_new
= false;
592 ASSERT_TRUE(c2s
->is_connected());
593 ASSERT_EQ(1u, static_cast<Session
*>(c2s
->get_priv().get())->get_count());
594 ASSERT_TRUE(c2s
->peer_is_osd());
596 ASSERT_TRUE(c2s_accepter
->is_connected());
597 ASSERT_EQ(1u, static_cast<Session
*>(c2s_accepter
->get_priv().get())->get_count());
598 ASSERT_TRUE(c2s_accepter
->peer_is_client());
600 client_msgr
->shutdown();
602 server_msgr
->shutdown();
605 delete cli_interceptor
;
606 delete srv_interceptor
;
612 * - A sends client_ident to B
613 * - B fails before sending server_ident to A
614 * - A goes to standby
615 * - B reconnects to A
617 TEST_P(MessengerTest
, MissingServerIdenTest2
) {
618 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(false);
620 TestInterceptor
*cli_interceptor
= new TestInterceptor();
621 TestInterceptor
*srv_interceptor
= new TestInterceptor();
623 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, Messenger::Policy::lossless_peer(0));
624 server_msgr
->interceptor
= srv_interceptor
;
626 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, Messenger::Policy::lossless_peer(0));
627 client_msgr
->interceptor
= cli_interceptor
;
629 entity_addr_t bind_addr
;
630 bind_addr
.parse("v2:127.0.0.1:3300");
631 server_msgr
->bind(bind_addr
);
632 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
633 server_msgr
->start();
635 bind_addr
.parse("v2:127.0.0.1:3301");
636 client_msgr
->bind(bind_addr
);
637 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
638 client_msgr
->start();
640 // pause before sending server_ident message
641 srv_interceptor
->breakpoint(Interceptor::STEP::SEND_SERVER_IDENTITY
);
643 ConnectionRef c2s
= client_msgr
->connect_to(server_msgr
->get_mytype(),
644 server_msgr
->get_myaddrs());
646 Connection
*c2s_accepter
= srv_interceptor
->wait(Interceptor::STEP::SEND_SERVER_IDENTITY
);
647 srv_interceptor
->remove_bp(Interceptor::STEP::SEND_SERVER_IDENTITY
);
649 // We inject a message from this side of the connection to force it to be
650 // in standby when we inject the failure below
651 MPing
*m2
= new MPing();
652 ASSERT_EQ(c2s_accepter
->send_message(m2
), 0);
654 srv_interceptor
->proceed(Interceptor::STEP::SEND_SERVER_IDENTITY
, Interceptor::ACTION::FAIL
);
657 std::unique_lock l
{cli_dispatcher
.lock
};
658 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
659 cli_dispatcher
.got_new
= false;
662 ASSERT_TRUE(c2s
->is_connected());
663 ASSERT_EQ(1u, static_cast<Session
*>(c2s
->get_priv().get())->get_count());
664 ASSERT_TRUE(c2s
->peer_is_osd());
666 ASSERT_TRUE(c2s_accepter
->is_connected());
667 ASSERT_EQ(0u, static_cast<Session
*>(c2s_accepter
->get_priv().get())->get_count());
668 ASSERT_TRUE(c2s_accepter
->peer_is_client());
670 client_msgr
->shutdown();
672 server_msgr
->shutdown();
675 delete cli_interceptor
;
676 delete srv_interceptor
;
682 * - A and B exchange messages
684 * - B goes into standby
687 TEST_P(MessengerTest
, ReconnectTest
) {
688 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
690 TestInterceptor
*cli_interceptor
= new TestInterceptor();
691 TestInterceptor
*srv_interceptor
= new TestInterceptor();
693 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, Messenger::Policy::stateful_server(0));
694 server_msgr
->interceptor
= srv_interceptor
;
696 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, Messenger::Policy::lossless_peer(0));
697 client_msgr
->interceptor
= cli_interceptor
;
699 entity_addr_t bind_addr
;
700 bind_addr
.parse("v2:127.0.0.1:3300");
701 server_msgr
->bind(bind_addr
);
702 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
703 server_msgr
->start();
705 bind_addr
.parse("v2:127.0.0.1:3301");
706 client_msgr
->bind(bind_addr
);
707 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
708 client_msgr
->start();
710 ConnectionRef c2s
= client_msgr
->connect_to(server_msgr
->get_mytype(),
711 server_msgr
->get_myaddrs());
713 MPing
*m1
= new MPing();
714 ASSERT_EQ(c2s
->send_message(m1
), 0);
717 std::unique_lock l
{cli_dispatcher
.lock
};
718 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
719 cli_dispatcher
.got_new
= false;
722 ASSERT_TRUE(c2s
->is_connected());
723 ASSERT_EQ(1u, static_cast<Session
*>(c2s
->get_priv().get())->get_count());
724 ASSERT_TRUE(c2s
->peer_is_osd());
726 cli_interceptor
->breakpoint(Interceptor::STEP::HANDLE_MESSAGE
);
728 MPing
*m2
= new MPing();
729 ASSERT_EQ(c2s
->send_message(m2
), 0);
731 cli_interceptor
->wait(Interceptor::STEP::HANDLE_MESSAGE
, c2s
.get());
732 cli_interceptor
->remove_bp(Interceptor::STEP::HANDLE_MESSAGE
);
734 // at this point client and server are connected together
736 srv_interceptor
->breakpoint(Interceptor::STEP::READY
);
739 cli_interceptor
->proceed(Interceptor::STEP::HANDLE_MESSAGE
, Interceptor::ACTION::FAIL
);
741 MPing
*m3
= new MPing();
742 ASSERT_EQ(c2s
->send_message(m3
), 0);
744 Connection
*c2s_accepter
= srv_interceptor
->wait(Interceptor::STEP::READY
);
745 // the srv end of theconnection is now paused at ready
746 // this means that the reconnect was successful
747 srv_interceptor
->remove_bp(Interceptor::STEP::READY
);
749 ASSERT_TRUE(c2s_accepter
->peer_is_client());
750 // c2s_accepter sent 0 reconnect messages
751 ASSERT_EQ(srv_interceptor
->count_step(c2s_accepter
, Interceptor::STEP::SEND_RECONNECT
), 0u);
752 // c2s_accepter sent 1 reconnect_ok messages
753 ASSERT_EQ(srv_interceptor
->count_step(c2s_accepter
, Interceptor::STEP::SEND_RECONNECT_OK
), 1u);
754 // c2s sent 1 reconnect messages
755 ASSERT_EQ(cli_interceptor
->count_step(c2s
.get(), Interceptor::STEP::SEND_RECONNECT
), 1u);
756 // c2s sent 0 reconnect_ok messages
757 ASSERT_EQ(cli_interceptor
->count_step(c2s
.get(), Interceptor::STEP::SEND_RECONNECT_OK
), 0u);
759 srv_interceptor
->proceed(15, Interceptor::ACTION::CONTINUE
);
762 std::unique_lock l
{cli_dispatcher
.lock
};
763 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
764 cli_dispatcher
.got_new
= false;
767 client_msgr
->shutdown();
769 server_msgr
->shutdown();
772 delete cli_interceptor
;
773 delete srv_interceptor
;
779 * - A and B exchange messages
781 * - A reconnects // B reconnects
783 TEST_P(MessengerTest
, ReconnectRaceTest
) {
784 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
786 TestInterceptor
*cli_interceptor
= new TestInterceptor();
787 TestInterceptor
*srv_interceptor
= new TestInterceptor();
789 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, Messenger::Policy::lossless_peer(0));
790 server_msgr
->interceptor
= srv_interceptor
;
792 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, Messenger::Policy::lossless_peer(0));
793 client_msgr
->interceptor
= cli_interceptor
;
795 entity_addr_t bind_addr
;
796 bind_addr
.parse("v2:127.0.0.1:3300");
797 server_msgr
->bind(bind_addr
);
798 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
799 server_msgr
->start();
801 bind_addr
.parse("v2:127.0.0.1:3301");
802 client_msgr
->bind(bind_addr
);
803 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
804 client_msgr
->start();
806 ConnectionRef c2s
= client_msgr
->connect_to(server_msgr
->get_mytype(),
807 server_msgr
->get_myaddrs());
809 MPing
*m1
= new MPing();
810 ASSERT_EQ(c2s
->send_message(m1
), 0);
813 std::unique_lock l
{cli_dispatcher
.lock
};
814 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
815 cli_dispatcher
.got_new
= false;
818 ASSERT_TRUE(c2s
->is_connected());
819 ASSERT_EQ(1u, static_cast<Session
*>(c2s
->get_priv().get())->get_count());
820 ASSERT_TRUE(c2s
->peer_is_osd());
822 cli_interceptor
->breakpoint(Interceptor::STEP::HANDLE_MESSAGE
);
824 MPing
*m2
= new MPing();
825 ASSERT_EQ(c2s
->send_message(m2
), 0);
827 cli_interceptor
->wait(Interceptor::STEP::HANDLE_MESSAGE
, c2s
.get());
828 cli_interceptor
->remove_bp(Interceptor::STEP::HANDLE_MESSAGE
);
830 // at this point client and server are connected together
832 // force both client and server to race on reconnect
833 cli_interceptor
->breakpoint(Interceptor::STEP::SEND_RECONNECT
);
834 srv_interceptor
->breakpoint(Interceptor::STEP::SEND_RECONNECT
);
837 // this will cause both client and server to reconnect at the same time
838 cli_interceptor
->proceed(Interceptor::STEP::HANDLE_MESSAGE
, Interceptor::ACTION::FAIL
);
840 MPing
*m3
= new MPing();
841 ASSERT_EQ(c2s
->send_message(m3
), 0);
843 cli_interceptor
->wait(Interceptor::STEP::SEND_RECONNECT
, c2s
.get());
844 srv_interceptor
->wait(Interceptor::STEP::SEND_RECONNECT
);
846 cli_interceptor
->remove_bp(Interceptor::STEP::SEND_RECONNECT
);
847 srv_interceptor
->remove_bp(Interceptor::STEP::SEND_RECONNECT
);
850 srv_interceptor
->breakpoint(Interceptor::STEP::READY
);
852 cli_interceptor
->proceed(Interceptor::STEP::SEND_RECONNECT
, Interceptor::ACTION::CONTINUE
);
853 srv_interceptor
->proceed(Interceptor::STEP::SEND_RECONNECT
, Interceptor::ACTION::CONTINUE
);
855 Connection
*c2s_accepter
= srv_interceptor
->wait(Interceptor::STEP::READY
);
857 // the server has reconnected and is "ready"
858 srv_interceptor
->remove_bp(Interceptor::STEP::READY
);
860 ASSERT_TRUE(c2s_accepter
->peer_is_client());
861 ASSERT_TRUE(c2s
->peer_is_osd());
863 // the server should win the reconnect race
865 // c2s_accepter sent 1 or 2 reconnect messages
866 ASSERT_LT(srv_interceptor
->count_step(c2s_accepter
, Interceptor::STEP::SEND_RECONNECT
), 3u);
867 ASSERT_GT(srv_interceptor
->count_step(c2s_accepter
, Interceptor::STEP::SEND_RECONNECT
), 0u);
868 // c2s_accepter sent 0 reconnect_ok messages
869 ASSERT_EQ(srv_interceptor
->count_step(c2s_accepter
, Interceptor::STEP::SEND_RECONNECT_OK
), 0u);
870 // c2s sent 1 reconnect messages
871 ASSERT_EQ(cli_interceptor
->count_step(c2s
.get(), Interceptor::STEP::SEND_RECONNECT
), 1u);
872 // c2s sent 1 reconnect_ok messages
873 ASSERT_EQ(cli_interceptor
->count_step(c2s
.get(), Interceptor::STEP::SEND_RECONNECT_OK
), 1u);
875 if (srv_interceptor
->count_step(c2s_accepter
, Interceptor::STEP::SEND_RECONNECT
) == 2) {
876 // if the server send the reconnect message two times then
877 // the client must have sent a session retry message to the server
878 ASSERT_EQ(cli_interceptor
->count_step(c2s
.get(), Interceptor::STEP::SESSION_RETRY
), 1u);
880 ASSERT_EQ(cli_interceptor
->count_step(c2s
.get(), Interceptor::STEP::SESSION_RETRY
), 0u);
883 srv_interceptor
->proceed(Interceptor::STEP::READY
, Interceptor::ACTION::CONTINUE
);
886 std::unique_lock l
{cli_dispatcher
.lock
};
887 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
888 cli_dispatcher
.got_new
= false;
891 client_msgr
->shutdown();
893 server_msgr
->shutdown();
896 delete cli_interceptor
;
897 delete srv_interceptor
;
900 TEST_P(MessengerTest
, SimpleTest
) {
901 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
902 entity_addr_t bind_addr
;
903 bind_addr
.parse("v2:127.0.0.1");
904 server_msgr
->bind(bind_addr
);
905 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
906 server_msgr
->start();
908 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
909 client_msgr
->start();
911 // 1. simple round trip
912 MPing
*m
= new MPing();
913 ConnectionRef conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
914 server_msgr
->get_myaddrs());
916 ASSERT_EQ(conn
->send_message(m
), 0);
917 std::unique_lock l
{cli_dispatcher
.lock
};
918 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
919 cli_dispatcher
.got_new
= false;
921 ASSERT_TRUE(conn
->is_connected());
922 ASSERT_EQ(1u, static_cast<Session
*>(conn
->get_priv().get())->get_count());
923 ASSERT_TRUE(conn
->peer_is_osd());
925 // 2. test rebind port
926 set
<int> avoid_ports
;
927 for (int i
= 0; i
< 10 ; i
++) {
928 for (auto a
: server_msgr
->get_myaddrs().v
) {
929 avoid_ports
.insert(a
.get_port() + i
);
932 server_msgr
->rebind(avoid_ports
);
933 for (auto a
: server_msgr
->get_myaddrs().v
) {
934 ASSERT_TRUE(avoid_ports
.count(a
.get_port()) == 0);
937 conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
938 server_msgr
->get_myaddrs());
941 ASSERT_EQ(conn
->send_message(m
), 0);
942 std::unique_lock l
{cli_dispatcher
.lock
};
943 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
944 cli_dispatcher
.got_new
= false;
946 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
948 // 3. test markdown connection
950 ASSERT_FALSE(conn
->is_connected());
952 // 4. test failed connection
953 server_msgr
->shutdown();
957 conn
->send_message(m
);
958 CHECK_AND_WAIT_TRUE(!conn
->is_connected());
959 ASSERT_FALSE(conn
->is_connected());
961 // 5. loopback connection
962 srv_dispatcher
.loopback
= true;
963 conn
= client_msgr
->get_loopback_connection();
966 ASSERT_EQ(conn
->send_message(m
), 0);
967 std::unique_lock l
{cli_dispatcher
.lock
};
968 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
969 cli_dispatcher
.got_new
= false;
971 srv_dispatcher
.loopback
= false;
972 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
973 client_msgr
->shutdown();
975 server_msgr
->shutdown();
979 TEST_P(MessengerTest
, SimpleMsgr2Test
) {
980 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
981 entity_addr_t legacy_addr
;
982 legacy_addr
.parse("v1:127.0.0.1");
983 entity_addr_t msgr2_addr
;
984 msgr2_addr
.parse("v2:127.0.0.1");
985 entity_addrvec_t bind_addrs
;
986 bind_addrs
.v
.push_back(legacy_addr
);
987 bind_addrs
.v
.push_back(msgr2_addr
);
988 server_msgr
->bindv(bind_addrs
);
989 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
990 server_msgr
->start();
992 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
993 client_msgr
->start();
995 // 1. simple round trip
996 MPing
*m
= new MPing();
997 ConnectionRef conn
= client_msgr
->connect_to(
998 server_msgr
->get_mytype(),
999 server_msgr
->get_myaddrs());
1001 ASSERT_EQ(conn
->send_message(m
), 0);
1002 std::unique_lock l
{cli_dispatcher
.lock
};
1003 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1004 cli_dispatcher
.got_new
= false;
1006 ASSERT_TRUE(conn
->is_connected());
1007 ASSERT_EQ(1u, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1008 ASSERT_TRUE(conn
->peer_is_osd());
1010 // 2. test rebind port
1011 set
<int> avoid_ports
;
1012 for (int i
= 0; i
< 10 ; i
++) {
1013 for (auto a
: server_msgr
->get_myaddrs().v
) {
1014 avoid_ports
.insert(a
.get_port() + i
);
1017 server_msgr
->rebind(avoid_ports
);
1018 for (auto a
: server_msgr
->get_myaddrs().v
) {
1019 ASSERT_TRUE(avoid_ports
.count(a
.get_port()) == 0);
1022 conn
= client_msgr
->connect_to(
1023 server_msgr
->get_mytype(),
1024 server_msgr
->get_myaddrs());
1027 ASSERT_EQ(conn
->send_message(m
), 0);
1028 std::unique_lock l
{cli_dispatcher
.lock
};
1029 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1030 cli_dispatcher
.got_new
= false;
1032 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1034 // 3. test markdown connection
1036 ASSERT_FALSE(conn
->is_connected());
1038 // 4. test failed connection
1039 server_msgr
->shutdown();
1040 server_msgr
->wait();
1043 conn
->send_message(m
);
1044 CHECK_AND_WAIT_TRUE(!conn
->is_connected());
1045 ASSERT_FALSE(conn
->is_connected());
1047 // 5. loopback connection
1048 srv_dispatcher
.loopback
= true;
1049 conn
= client_msgr
->get_loopback_connection();
1052 ASSERT_EQ(conn
->send_message(m
), 0);
1053 std::unique_lock l
{cli_dispatcher
.lock
};
1054 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1055 cli_dispatcher
.got_new
= false;
1057 srv_dispatcher
.loopback
= false;
1058 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1059 client_msgr
->shutdown();
1060 client_msgr
->wait();
1061 server_msgr
->shutdown();
1062 server_msgr
->wait();
1065 TEST_P(MessengerTest
, FeatureTest
) {
1066 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
1067 entity_addr_t bind_addr
;
1068 bind_addr
.parse("v2:127.0.0.1");
1069 uint64_t all_feature_supported
, feature_required
, feature_supported
= 0;
1070 for (int i
= 0; i
< 10; i
++)
1071 feature_supported
|= 1ULL << i
;
1072 feature_supported
|= CEPH_FEATUREMASK_MSG_ADDR2
;
1073 feature_supported
|= CEPH_FEATUREMASK_SERVER_NAUTILUS
;
1074 feature_required
= feature_supported
| 1ULL << 13;
1075 all_feature_supported
= feature_required
| 1ULL << 14;
1077 Messenger::Policy p
= server_msgr
->get_policy(entity_name_t::TYPE_CLIENT
);
1078 p
.features_required
= feature_required
;
1079 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, p
);
1080 server_msgr
->bind(bind_addr
);
1081 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
1082 server_msgr
->start();
1084 // 1. Suppose if only support less than required
1085 p
= client_msgr
->get_policy(entity_name_t::TYPE_OSD
);
1086 p
.features_supported
= feature_supported
;
1087 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
1088 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
1089 client_msgr
->start();
1091 MPing
*m
= new MPing();
1092 ConnectionRef conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1093 server_msgr
->get_myaddrs());
1094 conn
->send_message(m
);
1095 CHECK_AND_WAIT_TRUE(!conn
->is_connected());
1096 // should failed build a connection
1097 ASSERT_FALSE(conn
->is_connected());
1099 client_msgr
->shutdown();
1100 client_msgr
->wait();
1102 // 2. supported met required
1103 p
= client_msgr
->get_policy(entity_name_t::TYPE_OSD
);
1104 p
.features_supported
= all_feature_supported
;
1105 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
1106 client_msgr
->start();
1108 conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1109 server_msgr
->get_myaddrs());
1112 ASSERT_EQ(conn
->send_message(m
), 0);
1113 std::unique_lock l
{cli_dispatcher
.lock
};
1114 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1115 cli_dispatcher
.got_new
= false;
1117 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1119 server_msgr
->shutdown();
1120 client_msgr
->shutdown();
1121 server_msgr
->wait();
1122 client_msgr
->wait();
1125 TEST_P(MessengerTest
, TimeoutTest
) {
1126 g_ceph_context
->_conf
.set_val("ms_connection_idle_timeout", "1");
1127 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
1128 entity_addr_t bind_addr
;
1129 bind_addr
.parse("v2:127.0.0.1");
1130 server_msgr
->bind(bind_addr
);
1131 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
1132 server_msgr
->start();
1134 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
1135 client_msgr
->start();
1137 // 1. build the connection
1138 MPing
*m
= new MPing();
1139 ConnectionRef conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1140 server_msgr
->get_myaddrs());
1142 ASSERT_EQ(conn
->send_message(m
), 0);
1143 std::unique_lock l
{cli_dispatcher
.lock
};
1144 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1145 cli_dispatcher
.got_new
= false;
1147 ASSERT_TRUE(conn
->is_connected());
1148 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1149 ASSERT_TRUE(conn
->peer_is_osd());
1153 ASSERT_FALSE(conn
->is_connected());
1155 server_msgr
->shutdown();
1156 server_msgr
->wait();
1158 client_msgr
->shutdown();
1159 client_msgr
->wait();
1160 g_ceph_context
->_conf
.set_val("ms_connection_idle_timeout", "900");
1163 TEST_P(MessengerTest
, StatefulTest
) {
1165 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
1166 entity_addr_t bind_addr
;
1167 bind_addr
.parse("v2:127.0.0.1");
1168 Messenger::Policy p
= Messenger::Policy::stateful_server(0);
1169 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, p
);
1170 p
= Messenger::Policy::lossless_client(0);
1171 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
1173 server_msgr
->bind(bind_addr
);
1174 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
1175 server_msgr
->start();
1176 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
1177 client_msgr
->start();
1179 // 1. test for server standby
1180 ConnectionRef conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1181 server_msgr
->get_myaddrs());
1184 ASSERT_EQ(conn
->send_message(m
), 0);
1185 std::unique_lock l
{cli_dispatcher
.lock
};
1186 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1187 cli_dispatcher
.got_new
= false;
1189 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1191 ASSERT_FALSE(conn
->is_connected());
1192 ConnectionRef server_conn
= server_msgr
->connect_to(
1193 client_msgr
->get_mytype(), srv_dispatcher
.last_accept
);
1195 ASSERT_EQ(1U, static_cast<Session
*>(server_conn
->get_priv().get())->get_count());
1197 srv_dispatcher
.got_new
= false;
1198 conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1199 server_msgr
->get_myaddrs());
1202 ASSERT_EQ(conn
->send_message(m
), 0);
1203 std::unique_lock l
{cli_dispatcher
.lock
};
1204 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1205 cli_dispatcher
.got_new
= false;
1207 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1208 server_conn
= server_msgr
->connect_to(client_msgr
->get_mytype(),
1209 srv_dispatcher
.last_accept
);
1211 std::unique_lock l
{srv_dispatcher
.lock
};
1212 srv_dispatcher
.cond
.wait(l
, [&] { return srv_dispatcher
.got_remote_reset
; });
1215 // 2. test for client reconnect
1216 ASSERT_FALSE(cli_dispatcher
.got_remote_reset
);
1217 cli_dispatcher
.got_connect
= false;
1218 cli_dispatcher
.got_new
= false;
1219 cli_dispatcher
.got_remote_reset
= false;
1220 server_conn
->mark_down();
1221 ASSERT_FALSE(server_conn
->is_connected());
1222 // ensure client detect server socket closed
1224 std::unique_lock l
{cli_dispatcher
.lock
};
1225 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_remote_reset
; });
1226 cli_dispatcher
.got_remote_reset
= false;
1229 std::unique_lock l
{cli_dispatcher
.lock
};
1230 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_connect
; });
1231 cli_dispatcher
.got_connect
= false;
1233 CHECK_AND_WAIT_TRUE(conn
->is_connected());
1234 ASSERT_TRUE(conn
->is_connected());
1238 ASSERT_EQ(conn
->send_message(m
), 0);
1239 ASSERT_TRUE(conn
->is_connected());
1240 std::unique_lock l
{cli_dispatcher
.lock
};
1241 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1242 cli_dispatcher
.got_new
= false;
1244 // resetcheck happen
1245 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1246 server_conn
= server_msgr
->connect_to(client_msgr
->get_mytype(),
1247 srv_dispatcher
.last_accept
);
1248 ASSERT_EQ(1U, static_cast<Session
*>(server_conn
->get_priv().get())->get_count());
1249 cli_dispatcher
.got_remote_reset
= false;
1251 server_msgr
->shutdown();
1252 client_msgr
->shutdown();
1253 server_msgr
->wait();
1254 client_msgr
->wait();
1257 TEST_P(MessengerTest
, StatelessTest
) {
1259 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
1260 entity_addr_t bind_addr
;
1261 bind_addr
.parse("v2:127.0.0.1");
1262 Messenger::Policy p
= Messenger::Policy::stateless_server(0);
1263 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, p
);
1264 p
= Messenger::Policy::lossy_client(0);
1265 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
1267 server_msgr
->bind(bind_addr
);
1268 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
1269 server_msgr
->start();
1270 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
1271 client_msgr
->start();
1273 // 1. test for server lose state
1274 ConnectionRef conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1275 server_msgr
->get_myaddrs());
1278 ASSERT_EQ(conn
->send_message(m
), 0);
1279 std::unique_lock l
{cli_dispatcher
.lock
};
1280 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1281 cli_dispatcher
.got_new
= false;
1283 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1285 ASSERT_FALSE(conn
->is_connected());
1287 srv_dispatcher
.got_new
= false;
1288 ConnectionRef server_conn
;
1289 srv_dispatcher
.last_accept_con_ptr
= &server_conn
;
1290 conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1291 server_msgr
->get_myaddrs());
1294 ASSERT_EQ(conn
->send_message(m
), 0);
1295 std::unique_lock l
{cli_dispatcher
.lock
};
1296 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1297 cli_dispatcher
.got_new
= false;
1299 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1300 ASSERT_TRUE(server_conn
);
1302 // server lose state
1304 std::unique_lock l
{srv_dispatcher
.lock
};
1305 srv_dispatcher
.cond
.wait(l
, [&] { return srv_dispatcher
.got_new
; });
1307 ASSERT_EQ(1U, static_cast<Session
*>(server_conn
->get_priv().get())->get_count());
1309 // 2. test for client lossy
1310 server_conn
->mark_down();
1311 ASSERT_FALSE(server_conn
->is_connected());
1312 conn
->send_keepalive();
1313 CHECK_AND_WAIT_TRUE(!conn
->is_connected());
1314 ASSERT_FALSE(conn
->is_connected());
1315 conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1316 server_msgr
->get_myaddrs());
1319 ASSERT_EQ(conn
->send_message(m
), 0);
1320 std::unique_lock l
{cli_dispatcher
.lock
};
1321 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1322 cli_dispatcher
.got_new
= false;
1324 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1326 server_msgr
->shutdown();
1327 client_msgr
->shutdown();
1328 server_msgr
->wait();
1329 client_msgr
->wait();
1332 TEST_P(MessengerTest
, AnonTest
) {
1334 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
1335 entity_addr_t bind_addr
;
1336 bind_addr
.parse("v2:127.0.0.1");
1337 Messenger::Policy p
= Messenger::Policy::stateless_server(0);
1338 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, p
);
1339 p
= Messenger::Policy::lossy_client(0);
1340 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
1342 server_msgr
->bind(bind_addr
);
1343 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
1344 server_msgr
->start();
1345 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
1346 client_msgr
->start();
1348 ConnectionRef server_con_a
, server_con_b
;
1351 srv_dispatcher
.last_accept_con_ptr
= &server_con_a
;
1352 ConnectionRef con_a
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1353 server_msgr
->get_myaddrs(),
1357 ASSERT_EQ(con_a
->send_message(m
), 0);
1358 std::unique_lock l
{cli_dispatcher
.lock
};
1359 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1360 cli_dispatcher
.got_new
= false;
1362 ASSERT_EQ(1U, static_cast<Session
*>(con_a
->get_priv().get())->get_count());
1365 srv_dispatcher
.last_accept_con_ptr
= &server_con_b
;
1366 ConnectionRef con_b
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1367 server_msgr
->get_myaddrs(),
1371 ASSERT_EQ(con_b
->send_message(m
), 0);
1372 std::unique_lock l
{cli_dispatcher
.lock
};
1373 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1374 cli_dispatcher
.got_new
= false;
1376 ASSERT_EQ(1U, static_cast<Session
*>(con_b
->get_priv().get())->get_count());
1378 // these should be distinct
1379 ASSERT_NE(con_a
, con_b
);
1380 ASSERT_NE(server_con_a
, server_con_b
);
1382 // and both connected
1385 ASSERT_EQ(con_a
->send_message(m
), 0);
1386 std::unique_lock l
{cli_dispatcher
.lock
};
1387 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1388 cli_dispatcher
.got_new
= false;
1392 ASSERT_EQ(con_b
->send_message(m
), 0);
1393 std::unique_lock l
{cli_dispatcher
.lock
};
1394 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1395 cli_dispatcher
.got_new
= false;
1400 ASSERT_FALSE(con_a
->is_connected());
1402 ASSERT_FALSE(con_b
->is_connected());
1404 server_msgr
->shutdown();
1405 client_msgr
->shutdown();
1406 server_msgr
->wait();
1407 client_msgr
->wait();
1410 TEST_P(MessengerTest
, ClientStandbyTest
) {
1412 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
1413 entity_addr_t bind_addr
;
1414 bind_addr
.parse("v2:127.0.0.1");
1415 Messenger::Policy p
= Messenger::Policy::stateful_server(0);
1416 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, p
);
1417 p
= Messenger::Policy::lossless_peer(0);
1418 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
1420 server_msgr
->bind(bind_addr
);
1421 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
1422 server_msgr
->start();
1423 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
1424 client_msgr
->start();
1426 // 1. test for client standby, resetcheck
1427 ConnectionRef conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1428 server_msgr
->get_myaddrs());
1431 ASSERT_EQ(conn
->send_message(m
), 0);
1432 std::unique_lock l
{cli_dispatcher
.lock
};
1433 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1434 cli_dispatcher
.got_new
= false;
1436 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1437 ConnectionRef server_conn
= server_msgr
->connect_to(
1438 client_msgr
->get_mytype(),
1439 srv_dispatcher
.last_accept
);
1440 ASSERT_FALSE(cli_dispatcher
.got_remote_reset
);
1441 cli_dispatcher
.got_connect
= false;
1442 server_conn
->mark_down();
1443 ASSERT_FALSE(server_conn
->is_connected());
1444 // client should be standby
1446 // client should be standby, so we use original connection
1448 // Try send message to verify got remote reset callback
1450 ASSERT_EQ(conn
->send_message(m
), 0);
1452 std::unique_lock l
{cli_dispatcher
.lock
};
1453 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_remote_reset
; });
1454 cli_dispatcher
.got_remote_reset
= false;
1455 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_connect
; });
1456 cli_dispatcher
.got_connect
= false;
1458 CHECK_AND_WAIT_TRUE(conn
->is_connected());
1459 ASSERT_TRUE(conn
->is_connected());
1461 ASSERT_EQ(conn
->send_message(m
), 0);
1462 std::unique_lock l
{cli_dispatcher
.lock
};
1463 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1464 cli_dispatcher
.got_new
= false;
1466 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1467 server_conn
= server_msgr
->connect_to(client_msgr
->get_mytype(),
1468 srv_dispatcher
.last_accept
);
1469 ASSERT_EQ(1U, static_cast<Session
*>(server_conn
->get_priv().get())->get_count());
1471 server_msgr
->shutdown();
1472 client_msgr
->shutdown();
1473 server_msgr
->wait();
1474 client_msgr
->wait();
1477 TEST_P(MessengerTest
, AuthTest
) {
1478 g_ceph_context
->_conf
.set_val("auth_cluster_required", "cephx");
1479 g_ceph_context
->_conf
.set_val("auth_service_required", "cephx");
1480 g_ceph_context
->_conf
.set_val("auth_client_required", "cephx");
1481 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
1482 entity_addr_t bind_addr
;
1483 bind_addr
.parse("v2:127.0.0.1");
1484 server_msgr
->bind(bind_addr
);
1485 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
1486 server_msgr
->start();
1488 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
1489 client_msgr
->start();
1491 // 1. simple auth round trip
1492 MPing
*m
= new MPing();
1493 ConnectionRef conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1494 server_msgr
->get_myaddrs());
1496 ASSERT_EQ(conn
->send_message(m
), 0);
1497 std::unique_lock l
{cli_dispatcher
.lock
};
1498 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1499 cli_dispatcher
.got_new
= false;
1501 ASSERT_TRUE(conn
->is_connected());
1502 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1505 g_ceph_context
->_conf
.set_val("auth_cluster_required", "none");
1506 g_ceph_context
->_conf
.set_val("auth_service_required", "none");
1507 g_ceph_context
->_conf
.set_val("auth_client_required", "none");
1509 ASSERT_FALSE(conn
->is_connected());
1510 conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1511 server_msgr
->get_myaddrs());
1513 MPing
*m
= new MPing();
1514 ASSERT_EQ(conn
->send_message(m
), 0);
1515 std::unique_lock l
{cli_dispatcher
.lock
};
1516 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1517 cli_dispatcher
.got_new
= false;
1519 ASSERT_TRUE(conn
->is_connected());
1520 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1521 server_msgr
->shutdown();
1522 client_msgr
->shutdown();
1523 server_msgr
->wait();
1524 client_msgr
->wait();
1527 TEST_P(MessengerTest
, MessageTest
) {
1528 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
1529 entity_addr_t bind_addr
;
1530 bind_addr
.parse("v2:127.0.0.1");
1531 Messenger::Policy p
= Messenger::Policy::stateful_server(0);
1532 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, p
);
1533 p
= Messenger::Policy::lossless_peer(0);
1534 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
1536 server_msgr
->bind(bind_addr
);
1537 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
1538 server_msgr
->start();
1539 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
1540 client_msgr
->start();
1543 // 1. A very large "front"(as well as "payload")
1544 // Because a external message need to invade Messenger::decode_message,
1545 // here we only use existing message class(MCommand)
1546 ConnectionRef conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1547 server_msgr
->get_myaddrs());
1550 uuid
.generate_random();
1551 vector
<string
> cmds
;
1552 string
s("abcdefghijklmnopqrstuvwxyz");
1553 for (int i
= 0; i
< 1024*30; i
++)
1555 MCommand
*m
= new MCommand(uuid
);
1557 conn
->send_message(m
);
1558 std::unique_lock l
{cli_dispatcher
.lock
};
1559 cli_dispatcher
.cond
.wait_for(l
, 500s
, [&] { return cli_dispatcher
.got_new
; });
1560 ASSERT_TRUE(cli_dispatcher
.got_new
);
1561 cli_dispatcher
.got_new
= false;
1564 // 2. A very large "data"
1567 string
s("abcdefghijklmnopqrstuvwxyz");
1568 for (int i
= 0; i
< 1024*30; i
++)
1570 MPing
*m
= new MPing();
1572 conn
->send_message(m
);
1575 std::unique_lock l
{cli_dispatcher
.lock
};
1576 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1577 ASSERT_TRUE(cli_dispatcher
.got_new
);
1578 cli_dispatcher
.got_new
= false;
1580 server_msgr
->shutdown();
1581 client_msgr
->shutdown();
1582 server_msgr
->wait();
1583 client_msgr
->wait();
1587 class SyntheticWorkload
;
1590 enum Who
: uint8_t {
1598 Payload(Who who
, uint64_t seq
, const bufferlist
& data
)
1599 : who(who
), seq(seq
), data(data
)
1601 Payload() = default;
1602 DENC(Payload
, v
, p
) {
1603 DENC_START(1, 1, p
);
1610 WRITE_CLASS_DENC(Payload
)
1612 ostream
& operator<<(ostream
& out
, const Payload
&pl
)
1614 return out
<< "reply=" << pl
.who
<< " i = " << pl
.seq
;
1617 class SyntheticDispatcher
: public Dispatcher
{
1619 ceph::mutex lock
= ceph::make_mutex("SyntheticDispatcher::lock");
1620 ceph::condition_variable cond
;
1623 bool got_remote_reset
;
1625 map
<ConnectionRef
, list
<uint64_t> > conn_sent
;
1626 map
<uint64_t, bufferlist
> sent
;
1627 atomic
<uint64_t> index
;
1628 SyntheticWorkload
*workload
;
1630 SyntheticDispatcher(bool s
, SyntheticWorkload
*wl
):
1631 Dispatcher(g_ceph_context
), is_server(s
), got_new(false),
1632 got_remote_reset(false), got_connect(false), index(0), workload(wl
) {
1634 bool ms_can_fast_dispatch_any() const override
{ return true; }
1635 bool ms_can_fast_dispatch(const Message
*m
) const override
{
1636 switch (m
->get_type()) {
1645 void ms_handle_fast_connect(Connection
*con
) override
{
1646 std::lock_guard l
{lock
};
1647 list
<uint64_t> c
= conn_sent
[con
];
1648 for (list
<uint64_t>::iterator it
= c
.begin();
1649 it
!= c
.end(); ++it
)
1651 conn_sent
.erase(con
);
1655 void ms_handle_fast_accept(Connection
*con
) override
{
1656 std::lock_guard l
{lock
};
1657 list
<uint64_t> c
= conn_sent
[con
];
1658 for (list
<uint64_t>::iterator it
= c
.begin();
1659 it
!= c
.end(); ++it
)
1661 conn_sent
.erase(con
);
1664 bool ms_dispatch(Message
*m
) override
{
1667 bool ms_handle_reset(Connection
*con
) override
;
1668 void ms_handle_remote_reset(Connection
*con
) override
{
1669 std::lock_guard l
{lock
};
1670 list
<uint64_t> c
= conn_sent
[con
];
1671 for (list
<uint64_t>::iterator it
= c
.begin();
1672 it
!= c
.end(); ++it
)
1674 conn_sent
.erase(con
);
1675 got_remote_reset
= true;
1677 bool ms_handle_refused(Connection
*con
) override
{
1680 void ms_fast_dispatch(Message
*m
) override
{
1681 // MSG_COMMAND is used to disorganize regular message flow
1682 if (m
->get_type() == MSG_COMMAND
) {
1688 auto p
= m
->get_data().cbegin();
1690 if (pl
.who
== Payload::PING
) {
1691 lderr(g_ceph_context
) << __func__
<< " conn=" << m
->get_connection() << pl
<< dendl
;
1692 reply_message(m
, pl
);
1694 std::lock_guard l
{lock
};
1698 std::lock_guard l
{lock
};
1699 if (sent
.count(pl
.seq
)) {
1700 lderr(g_ceph_context
) << __func__
<< " conn=" << m
->get_connection() << pl
<< dendl
;
1701 ASSERT_EQ(conn_sent
[m
->get_connection()].front(), pl
.seq
);
1702 ASSERT_TRUE(pl
.data
.contents_equal(sent
[pl
.seq
]));
1703 conn_sent
[m
->get_connection()].pop_front();
1712 int ms_handle_fast_authentication(Connection
*con
) override
{
1716 void reply_message(const Message
*m
, Payload
& pl
) {
1717 pl
.who
= Payload::PONG
;
1720 MPing
*rm
= new MPing();
1722 m
->get_connection()->send_message(rm
);
1723 lderr(g_ceph_context
) << __func__
<< " conn=" << m
->get_connection() << " reply m=" << m
<< " i=" << pl
.seq
<< dendl
;
1726 void send_message_wrap(ConnectionRef con
, const bufferlist
& data
) {
1727 Message
*m
= new MPing();
1728 Payload pl
{Payload::PING
, index
++, data
};
1732 if (!con
->get_messenger()->get_default_policy().lossy
) {
1733 std::lock_guard l
{lock
};
1734 sent
[pl
.seq
] = pl
.data
;
1735 conn_sent
[con
].push_back(pl
.seq
);
1737 lderr(g_ceph_context
) << __func__
<< " conn=" << con
.get() << " send m=" << m
<< " i=" << pl
.seq
<< dendl
;
1738 ASSERT_EQ(0, con
->send_message(m
));
1741 uint64_t get_pending() {
1742 std::lock_guard l
{lock
};
1746 void clear_pending(ConnectionRef con
) {
1747 std::lock_guard l
{lock
};
1749 for (list
<uint64_t>::iterator it
= conn_sent
[con
].begin();
1750 it
!= conn_sent
[con
].end(); ++it
)
1752 conn_sent
.erase(con
);
1756 for (auto && p
: conn_sent
) {
1757 if (!p
.second
.empty()) {
1758 lderr(g_ceph_context
) << __func__
<< " " << p
.first
<< " wait " << p
.second
.size() << dendl
;
1765 class SyntheticWorkload
{
1766 ceph::mutex lock
= ceph::make_mutex("SyntheticWorkload::lock");
1767 ceph::condition_variable cond
;
1768 set
<Messenger
*> available_servers
;
1769 set
<Messenger
*> available_clients
;
1770 Messenger::Policy client_policy
;
1771 map
<ConnectionRef
, pair
<Messenger
*, Messenger
*> > available_connections
;
1772 SyntheticDispatcher dispatcher
;
1774 vector
<bufferlist
> rand_data
;
1775 DummyAuthClientServer dummy_auth
;
1778 const unsigned max_in_flight
= 0;
1779 const unsigned max_connections
= 0;
1780 static const unsigned max_message_len
= 1024 * 1024 * 4;
1782 SyntheticWorkload(int servers
, int clients
, string type
, int random_num
,
1783 Messenger::Policy srv_policy
, Messenger::Policy cli_policy
,
1784 int _max_in_flight
= 64, int _max_connections
= 128)
1785 : client_policy(cli_policy
),
1786 dispatcher(false, this),
1788 dummy_auth(g_ceph_context
),
1789 max_in_flight(_max_in_flight
),
1790 max_connections(_max_connections
) {
1792 dummy_auth
.auth_registry
.refresh_config();
1794 int base_port
= 16800;
1795 entity_addr_t bind_addr
;
1797 for (int i
= 0; i
< servers
; ++i
) {
1798 msgr
= Messenger::create(g_ceph_context
, type
, entity_name_t::OSD(0),
1799 "server", getpid()+i
);
1800 snprintf(addr
, sizeof(addr
), "v2:127.0.0.1:%d",
1802 bind_addr
.parse(addr
);
1803 msgr
->bind(bind_addr
);
1804 msgr
->add_dispatcher_head(&dispatcher
);
1805 msgr
->set_auth_client(&dummy_auth
);
1806 msgr
->set_auth_server(&dummy_auth
);
1809 msgr
->set_default_policy(srv_policy
);
1810 available_servers
.insert(msgr
);
1814 for (int i
= 0; i
< clients
; ++i
) {
1815 msgr
= Messenger::create(g_ceph_context
, type
, entity_name_t::CLIENT(-1),
1816 "client", getpid()+i
+servers
);
1817 if (cli_policy
.standby
) {
1818 snprintf(addr
, sizeof(addr
), "v2:127.0.0.1:%d",
1819 base_port
+i
+servers
);
1820 bind_addr
.parse(addr
);
1821 msgr
->bind(bind_addr
);
1823 msgr
->add_dispatcher_head(&dispatcher
);
1824 msgr
->set_auth_client(&dummy_auth
);
1825 msgr
->set_auth_server(&dummy_auth
);
1828 msgr
->set_default_policy(cli_policy
);
1829 available_clients
.insert(msgr
);
1833 for (int i
= 0; i
< random_num
; i
++) {
1835 boost::uniform_int
<> u(32, max_message_len
);
1836 uint64_t value_len
= u(rng
);
1837 bufferptr
bp(value_len
);
1839 for (uint64_t j
= 0; j
< value_len
-sizeof(i
); ) {
1840 memcpy(bp
.c_str()+j
, &i
, sizeof(i
));
1845 rand_data
.push_back(bl
);
1849 ConnectionRef
_get_random_connection() {
1850 while (dispatcher
.get_pending() > max_in_flight
) {
1855 ceph_assert(ceph_mutex_is_locked(lock
));
1856 boost::uniform_int
<> choose(0, available_connections
.size() - 1);
1857 int index
= choose(rng
);
1858 map
<ConnectionRef
, pair
<Messenger
*, Messenger
*> >::iterator i
= available_connections
.begin();
1859 for (; index
> 0; --index
, ++i
) ;
1863 bool can_create_connection() {
1864 return available_connections
.size() < max_connections
;
1867 void generate_connection() {
1868 std::lock_guard l
{lock
};
1869 if (!can_create_connection())
1872 Messenger
*server
, *client
;
1874 boost::uniform_int
<> choose(0, available_servers
.size() - 1);
1875 int index
= choose(rng
);
1876 set
<Messenger
*>::iterator i
= available_servers
.begin();
1877 for (; index
> 0; --index
, ++i
) ;
1881 boost::uniform_int
<> choose(0, available_clients
.size() - 1);
1882 int index
= choose(rng
);
1883 set
<Messenger
*>::iterator i
= available_clients
.begin();
1884 for (; index
> 0; --index
, ++i
) ;
1888 pair
<Messenger
*, Messenger
*> p
;
1890 boost::uniform_int
<> choose(0, available_servers
.size() - 1);
1891 if (server
->get_default_policy().server
) {
1892 p
= make_pair(client
, server
);
1893 ConnectionRef conn
= client
->connect_to(server
->get_mytype(),
1894 server
->get_myaddrs());
1895 available_connections
[conn
] = p
;
1897 ConnectionRef conn
= client
->connect_to(server
->get_mytype(),
1898 server
->get_myaddrs());
1899 p
= make_pair(client
, server
);
1900 available_connections
[conn
] = p
;
1905 void send_message() {
1906 std::lock_guard l
{lock
};
1907 ConnectionRef conn
= _get_random_connection();
1908 boost::uniform_int
<> true_false(0, 99);
1909 int val
= true_false(rng
);
1912 uuid
.generate_random();
1913 MCommand
*m
= new MCommand(uuid
);
1914 vector
<string
> cmds
;
1915 cmds
.push_back("command");
1917 m
->set_priority(200);
1918 conn
->send_message(m
);
1920 boost::uniform_int
<> u(0, rand_data
.size()-1);
1921 dispatcher
.send_message_wrap(conn
, rand_data
[u(rng
)]);
1925 void send_large_message(bool inject_network_congestion
=false) {
1926 std::lock_guard l
{lock
};
1927 ConnectionRef conn
= _get_random_connection();
1929 uuid
.generate_random();
1930 MCommand
*m
= new MCommand(uuid
);
1931 vector
<string
> cmds
;
1932 cmds
.push_back("command");
1933 // set the random data to make the large message
1935 string
s("abcdefghijklmnopqrstuvwxyz");
1936 for (int i
= 0; i
< 1024*256; i
++)
1941 m
->set_priority(200);
1942 // setup after connection is ready
1943 if (inject_network_congestion
&& conn
->is_connected()) {
1944 g_ceph_context
->_conf
.set_val("ms_inject_network_congestion", "100");
1946 g_ceph_context
->_conf
.set_val("ms_inject_network_congestion", "0");
1948 conn
->send_message(m
);
1951 void drop_connection() {
1952 std::lock_guard l
{lock
};
1953 if (available_connections
.size() < 10)
1955 ConnectionRef conn
= _get_random_connection();
1956 dispatcher
.clear_pending(conn
);
1958 if (!client_policy
.server
&&
1959 !client_policy
.lossy
&&
1960 client_policy
.standby
) {
1961 // it's a lossless policy, so we need to mark down each side
1962 pair
<Messenger
*, Messenger
*> &p
= available_connections
[conn
];
1963 if (!p
.first
->get_default_policy().server
&& !p
.second
->get_default_policy().server
) {
1964 ASSERT_EQ(conn
->get_messenger(), p
.first
);
1965 ConnectionRef peer
= p
.second
->connect_to(p
.first
->get_mytype(),
1966 p
.first
->get_myaddrs());
1968 dispatcher
.clear_pending(peer
);
1969 available_connections
.erase(peer
);
1972 ASSERT_EQ(available_connections
.erase(conn
), 1U);
1975 void print_internal_state(bool detail
=false) {
1976 std::lock_guard l
{lock
};
1977 lderr(g_ceph_context
) << "available_connections: " << available_connections
.size()
1978 << " inflight messages: " << dispatcher
.get_pending() << dendl
;
1979 if (detail
&& !available_connections
.empty()) {
1984 void wait_for_done() {
1985 int64_t tick_us
= 1000 * 100; // 100ms
1986 int64_t timeout_us
= 5 * 60 * 1000 * 1000; // 5 mins
1988 while (dispatcher
.get_pending()) {
1990 timeout_us
-= tick_us
;
1992 print_internal_state(true);
1994 ceph_abort_msg(" loop time exceed 5 mins, it looks we stuck into some problems!");
1996 for (set
<Messenger
*>::iterator it
= available_servers
.begin();
1997 it
!= available_servers
.end(); ++it
) {
2000 ASSERT_EQ((*it
)->get_dispatch_queue_len(), 0);
2003 available_servers
.clear();
2005 for (set
<Messenger
*>::iterator it
= available_clients
.begin();
2006 it
!= available_clients
.end(); ++it
) {
2009 ASSERT_EQ((*it
)->get_dispatch_queue_len(), 0);
2012 available_clients
.clear();
2015 void handle_reset(Connection
*con
) {
2016 std::lock_guard l
{lock
};
2017 available_connections
.erase(con
);
2018 dispatcher
.clear_pending(con
);
2022 bool SyntheticDispatcher::ms_handle_reset(Connection
*con
) {
2023 workload
->handle_reset(con
);
2027 TEST_P(MessengerTest
, SyntheticStressTest
) {
2028 SyntheticWorkload
test_msg(8, 32, GetParam(), 100,
2029 Messenger::Policy::stateful_server(0),
2030 Messenger::Policy::lossless_client(0));
2031 for (int i
= 0; i
< 100; ++i
) {
2032 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
2033 test_msg
.generate_connection();
2035 gen_type
rng(time(NULL
));
2036 for (int i
= 0; i
< 5000; ++i
) {
2038 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
2039 test_msg
.print_internal_state();
2041 boost::uniform_int
<> true_false(0, 99);
2042 int val
= true_false(rng
);
2044 test_msg
.generate_connection();
2045 } else if (val
> 80) {
2046 test_msg
.drop_connection();
2047 } else if (val
> 10) {
2048 test_msg
.send_message();
2050 usleep(rand() % 1000 + 500);
2053 test_msg
.wait_for_done();
2056 TEST_P(MessengerTest
, SyntheticStressTest1
) {
2057 SyntheticWorkload
test_msg(16, 32, GetParam(), 100,
2058 Messenger::Policy::lossless_peer_reuse(0),
2059 Messenger::Policy::lossless_peer_reuse(0));
2060 for (int i
= 0; i
< 10; ++i
) {
2061 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
2062 test_msg
.generate_connection();
2064 gen_type
rng(time(NULL
));
2065 for (int i
= 0; i
< 10000; ++i
) {
2067 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
2068 test_msg
.print_internal_state();
2070 boost::uniform_int
<> true_false(0, 99);
2071 int val
= true_false(rng
);
2073 test_msg
.generate_connection();
2074 } else if (val
> 60) {
2075 test_msg
.drop_connection();
2076 } else if (val
> 10) {
2077 test_msg
.send_message();
2079 usleep(rand() % 1000 + 500);
2082 test_msg
.wait_for_done();
2086 TEST_P(MessengerTest
, SyntheticInjectTest
) {
2087 uint64_t dispatch_throttle_bytes
= g_ceph_context
->_conf
->ms_dispatch_throttle_bytes
;
2088 g_ceph_context
->_conf
.set_val("ms_inject_socket_failures", "30");
2089 g_ceph_context
->_conf
.set_val("ms_inject_internal_delays", "0.1");
2090 g_ceph_context
->_conf
.set_val("ms_dispatch_throttle_bytes", "16777216");
2091 SyntheticWorkload
test_msg(8, 32, GetParam(), 100,
2092 Messenger::Policy::stateful_server(0),
2093 Messenger::Policy::lossless_client(0));
2094 for (int i
= 0; i
< 100; ++i
) {
2095 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
2096 test_msg
.generate_connection();
2098 gen_type
rng(time(NULL
));
2099 for (int i
= 0; i
< 1000; ++i
) {
2101 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
2102 test_msg
.print_internal_state();
2104 boost::uniform_int
<> true_false(0, 99);
2105 int val
= true_false(rng
);
2107 test_msg
.generate_connection();
2108 } else if (val
> 80) {
2109 test_msg
.drop_connection();
2110 } else if (val
> 10) {
2111 test_msg
.send_message();
2113 usleep(rand() % 500 + 100);
2116 test_msg
.wait_for_done();
2117 g_ceph_context
->_conf
.set_val("ms_inject_socket_failures", "0");
2118 g_ceph_context
->_conf
.set_val("ms_inject_internal_delays", "0");
2119 g_ceph_context
->_conf
.set_val(
2120 "ms_dispatch_throttle_bytes", std::to_string(dispatch_throttle_bytes
));
2123 TEST_P(MessengerTest
, SyntheticInjectTest2
) {
2124 g_ceph_context
->_conf
.set_val("ms_inject_socket_failures", "30");
2125 g_ceph_context
->_conf
.set_val("ms_inject_internal_delays", "0.1");
2126 SyntheticWorkload
test_msg(8, 16, GetParam(), 100,
2127 Messenger::Policy::lossless_peer_reuse(0),
2128 Messenger::Policy::lossless_peer_reuse(0));
2129 for (int i
= 0; i
< 100; ++i
) {
2130 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
2131 test_msg
.generate_connection();
2133 gen_type
rng(time(NULL
));
2134 for (int i
= 0; i
< 1000; ++i
) {
2136 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
2137 test_msg
.print_internal_state();
2139 boost::uniform_int
<> true_false(0, 99);
2140 int val
= true_false(rng
);
2142 test_msg
.generate_connection();
2143 } else if (val
> 80) {
2144 test_msg
.drop_connection();
2145 } else if (val
> 10) {
2146 test_msg
.send_message();
2148 usleep(rand() % 500 + 100);
2151 test_msg
.wait_for_done();
2152 g_ceph_context
->_conf
.set_val("ms_inject_socket_failures", "0");
2153 g_ceph_context
->_conf
.set_val("ms_inject_internal_delays", "0");
2156 TEST_P(MessengerTest
, SyntheticInjectTest3
) {
2157 g_ceph_context
->_conf
.set_val("ms_inject_socket_failures", "600");
2158 g_ceph_context
->_conf
.set_val("ms_inject_internal_delays", "0.1");
2159 SyntheticWorkload
test_msg(8, 16, GetParam(), 100,
2160 Messenger::Policy::stateless_server(0),
2161 Messenger::Policy::lossy_client(0));
2162 for (int i
= 0; i
< 100; ++i
) {
2163 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
2164 test_msg
.generate_connection();
2166 gen_type
rng(time(NULL
));
2167 for (int i
= 0; i
< 1000; ++i
) {
2169 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
2170 test_msg
.print_internal_state();
2172 boost::uniform_int
<> true_false(0, 99);
2173 int val
= true_false(rng
);
2175 test_msg
.generate_connection();
2176 } else if (val
> 80) {
2177 test_msg
.drop_connection();
2178 } else if (val
> 10) {
2179 test_msg
.send_message();
2181 usleep(rand() % 500 + 100);
2184 test_msg
.wait_for_done();
2185 g_ceph_context
->_conf
.set_val("ms_inject_socket_failures", "0");
2186 g_ceph_context
->_conf
.set_val("ms_inject_internal_delays", "0");
2190 TEST_P(MessengerTest
, SyntheticInjectTest4
) {
2191 g_ceph_context
->_conf
.set_val("ms_inject_socket_failures", "30");
2192 g_ceph_context
->_conf
.set_val("ms_inject_internal_delays", "0.1");
2193 g_ceph_context
->_conf
.set_val("ms_inject_delay_probability", "1");
2194 g_ceph_context
->_conf
.set_val("ms_inject_delay_type", "client osd");
2195 g_ceph_context
->_conf
.set_val("ms_inject_delay_max", "5");
2196 SyntheticWorkload
test_msg(16, 32, GetParam(), 100,
2197 Messenger::Policy::lossless_peer(0),
2198 Messenger::Policy::lossless_peer(0));
2199 for (int i
= 0; i
< 100; ++i
) {
2200 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
2201 test_msg
.generate_connection();
2203 gen_type
rng(time(NULL
));
2204 for (int i
= 0; i
< 1000; ++i
) {
2206 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
2207 test_msg
.print_internal_state();
2209 boost::uniform_int
<> true_false(0, 99);
2210 int val
= true_false(rng
);
2212 test_msg
.generate_connection();
2213 } else if (val
> 80) {
2214 // test_msg.drop_connection();
2215 } else if (val
> 10) {
2216 test_msg
.send_message();
2218 usleep(rand() % 500 + 100);
2221 test_msg
.wait_for_done();
2222 g_ceph_context
->_conf
.set_val("ms_inject_socket_failures", "0");
2223 g_ceph_context
->_conf
.set_val("ms_inject_internal_delays", "0");
2224 g_ceph_context
->_conf
.set_val("ms_inject_delay_probability", "0");
2225 g_ceph_context
->_conf
.set_val("ms_inject_delay_type", "");
2226 g_ceph_context
->_conf
.set_val("ms_inject_delay_max", "0");
2229 // This is test for network block, means ::send return EAGAIN
2230 TEST_P(MessengerTest
, SyntheticInjectTest5
) {
2231 SyntheticWorkload
test_msg(1, 8, GetParam(), 100,
2232 Messenger::Policy::stateful_server(0),
2233 Messenger::Policy::lossless_client(0),
2235 bool simulate_network_congestion
= true;
2236 for (int i
= 0; i
< 2; ++i
)
2237 test_msg
.generate_connection();
2238 for (int i
= 0; i
< 5000; ++i
) {
2240 ldout(g_ceph_context
, 0) << "Op " << i
<< ": " << dendl
;
2241 test_msg
.print_internal_state();
2244 // means that we would stuck 1600 * 6M (9.6G) around with 2 connections
2245 test_msg
.send_large_message(simulate_network_congestion
);
2247 simulate_network_congestion
= false;
2248 test_msg
.send_large_message(simulate_network_congestion
);
2251 test_msg
.wait_for_done();
2255 class MarkdownDispatcher
: public Dispatcher
{
2256 ceph::mutex lock
= ceph::make_mutex("MarkdownDispatcher::lock");
2257 set
<ConnectionRef
> conns
;
2260 std::atomic
<uint64_t> count
= { 0 };
2261 explicit MarkdownDispatcher(bool s
): Dispatcher(g_ceph_context
),
2264 bool ms_can_fast_dispatch_any() const override
{ return false; }
2265 bool ms_can_fast_dispatch(const Message
*m
) const override
{
2266 switch (m
->get_type()) {
2274 void ms_handle_fast_connect(Connection
*con
) override
{
2275 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
2276 std::lock_guard l
{lock
};
2279 void ms_handle_fast_accept(Connection
*con
) override
{
2280 std::lock_guard l
{lock
};
2283 bool ms_dispatch(Message
*m
) override
{
2284 lderr(g_ceph_context
) << __func__
<< " conn: " << m
->get_connection() << dendl
;
2285 std::lock_guard l
{lock
};
2287 conns
.insert(m
->get_connection());
2288 if (conns
.size() < 2 && !last_mark
) {
2294 usleep(rand() % 500);
2295 for (set
<ConnectionRef
>::iterator it
= conns
.begin(); it
!= conns
.end(); ++it
) {
2296 if ((*it
) != m
->get_connection().get()) {
2307 bool ms_handle_reset(Connection
*con
) override
{
2308 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
2309 std::lock_guard l
{lock
};
2311 usleep(rand() % 500);
2314 void ms_handle_remote_reset(Connection
*con
) override
{
2315 std::lock_guard l
{lock
};
2317 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
2319 bool ms_handle_refused(Connection
*con
) override
{
2322 void ms_fast_dispatch(Message
*m
) override
{
2325 int ms_handle_fast_authentication(Connection
*con
) override
{
2331 // Markdown with external lock
2332 TEST_P(MessengerTest
, MarkdownTest
) {
2333 Messenger
*server_msgr2
= Messenger::create(g_ceph_context
, string(GetParam()), entity_name_t::OSD(0), "server", getpid());
2334 MarkdownDispatcher
cli_dispatcher(false), srv_dispatcher(true);
2335 DummyAuthClientServer
dummy_auth(g_ceph_context
);
2336 dummy_auth
.auth_registry
.refresh_config();
2337 entity_addr_t bind_addr
;
2338 bind_addr
.parse("v2:127.0.0.1:16800");
2339 server_msgr
->bind(bind_addr
);
2340 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
2341 server_msgr
->set_auth_client(&dummy_auth
);
2342 server_msgr
->set_auth_server(&dummy_auth
);
2343 server_msgr
->start();
2344 bind_addr
.parse("v2:127.0.0.1:16801");
2345 server_msgr2
->bind(bind_addr
);
2346 server_msgr2
->add_dispatcher_head(&srv_dispatcher
);
2347 server_msgr2
->set_auth_client(&dummy_auth
);
2348 server_msgr2
->set_auth_server(&dummy_auth
);
2349 server_msgr2
->start();
2351 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
2352 client_msgr
->set_auth_client(&dummy_auth
);
2353 client_msgr
->set_auth_server(&dummy_auth
);
2354 client_msgr
->start();
2359 uint64_t equal_count
= 0;
2361 ConnectionRef conn1
= client_msgr
->connect_to(server_msgr
->get_mytype(),
2362 server_msgr
->get_myaddrs());
2363 ConnectionRef conn2
= client_msgr
->connect_to(server_msgr2
->get_mytype(),
2364 server_msgr2
->get_myaddrs());
2365 MPing
*m
= new MPing();
2366 ASSERT_EQ(conn1
->send_message(m
), 0);
2368 ASSERT_EQ(conn2
->send_message(m
), 0);
2369 CHECK_AND_WAIT_TRUE(srv_dispatcher
.count
> last
+ 1);
2370 if (srv_dispatcher
.count
== last
) {
2371 lderr(g_ceph_context
) << __func__
<< " last is " << last
<< dendl
;
2378 last
= srv_dispatcher
.count
;
2381 ASSERT_FALSE(equal
&& equal_count
> 3);
2383 server_msgr
->shutdown();
2384 client_msgr
->shutdown();
2385 server_msgr2
->shutdown();
2386 server_msgr
->wait();
2387 client_msgr
->wait();
2388 server_msgr2
->wait();
2389 delete server_msgr2
;
2392 INSTANTIATE_TEST_SUITE_P(
2400 int main(int argc
, char **argv
) {
2401 auto args
= argv_to_vec(argc
, argv
);
2403 auto cct
= global_init(NULL
, args
, CEPH_ENTITY_TYPE_CLIENT
,
2404 CODE_ENVIRONMENT_UTILITY
,
2405 CINIT_FLAG_NO_DEFAULT_CONFIG_FILE
);
2406 g_ceph_context
->_conf
.set_val("auth_cluster_required", "none");
2407 g_ceph_context
->_conf
.set_val("auth_service_required", "none");
2408 g_ceph_context
->_conf
.set_val("auth_client_required", "none");
2409 g_ceph_context
->_conf
.set_val("keyring", "/dev/null");
2410 g_ceph_context
->_conf
.set_val("enable_experimental_unrecoverable_data_corrupting_features", "ms-type-async");
2411 g_ceph_context
->_conf
.set_val("ms_die_on_bad_msg", "true");
2412 g_ceph_context
->_conf
.set_val("ms_die_on_old_message", "true");
2413 g_ceph_context
->_conf
.set_val("ms_max_backoff", "1");
2414 common_init_finish(g_ceph_context
);
2416 ::testing::InitGoogleTest(&argc
, argv
);
2417 return RUN_ALL_TESTS();
2422 * compile-command: "cd ../.. ; make -j4 ceph_test_msgr && valgrind --tool=memcheck ./ceph_test_msgr"