1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
24 #include "common/ceph_mutex.h"
25 #include "common/ceph_argparse.h"
26 #include "global/global_init.h"
27 #include "msg/Dispatcher.h"
28 #include "msg/msg_types.h"
29 #include "msg/Message.h"
30 #include "msg/Messenger.h"
31 #include "msg/Connection.h"
32 #include "messages/MPing.h"
33 #include "messages/MCommand.h"
35 #include <boost/random/mersenne_twister.hpp>
36 #include <boost/random/uniform_int.hpp>
37 #include <boost/random/binomial_distribution.hpp>
38 #include <gtest/gtest.h>
40 typedef boost::mt11213b gen_type
;
42 #include "common/dout.h"
43 #include "include/ceph_assert.h"
45 #include "auth/DummyAuth.h"
47 #define dout_subsys ceph_subsys_ms
49 #define dout_prefix *_dout << " ceph_test_msgr "
52 #define CHECK_AND_WAIT_TRUE(expr) do { \
61 class MessengerTest
: public ::testing::TestWithParam
<const char*> {
63 DummyAuthClientServer dummy_auth
;
64 Messenger
*server_msgr
;
65 Messenger
*client_msgr
;
67 MessengerTest() : dummy_auth(g_ceph_context
),
68 server_msgr(NULL
), client_msgr(NULL
) {
69 dummy_auth
.auth_registry
.refresh_config();
71 void SetUp() override
{
72 lderr(g_ceph_context
) << __func__
<< " start set up " << GetParam() << dendl
;
73 server_msgr
= Messenger::create(g_ceph_context
, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
74 client_msgr
= Messenger::create(g_ceph_context
, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid(), 0);
75 server_msgr
->set_default_policy(Messenger::Policy::stateless_server(0));
76 client_msgr
->set_default_policy(Messenger::Policy::lossy_client(0));
77 server_msgr
->set_auth_client(&dummy_auth
);
78 server_msgr
->set_auth_server(&dummy_auth
);
79 client_msgr
->set_auth_client(&dummy_auth
);
80 client_msgr
->set_auth_server(&dummy_auth
);
81 server_msgr
->set_require_authorizer(false);
83 void TearDown() override
{
84 ASSERT_EQ(server_msgr
->get_dispatch_queue_len(), 0);
85 ASSERT_EQ(client_msgr
->get_dispatch_queue_len(), 0);
93 class FakeDispatcher
: public Dispatcher
{
95 struct Session
: public RefCountedObject
{
96 atomic
<uint64_t> count
;
99 explicit Session(ConnectionRef c
): RefCountedObject(g_ceph_context
), count(0), con(c
) {
101 uint64_t get_count() { return count
; }
104 ceph::mutex lock
= ceph::make_mutex("FakeDispatcher::lock");
105 ceph::condition_variable cond
;
108 bool got_remote_reset
;
111 entity_addrvec_t last_accept
;
112 ConnectionRef
*last_accept_con_ptr
= nullptr;
114 explicit FakeDispatcher(bool s
): Dispatcher(g_ceph_context
),
115 is_server(s
), got_new(false), got_remote_reset(false),
116 got_connect(false), loopback(false) {
118 bool ms_can_fast_dispatch_any() const override
{ return true; }
119 bool ms_can_fast_dispatch(const Message
*m
) const override
{
120 switch (m
->get_type()) {
128 void ms_handle_fast_connect(Connection
*con
) override
{
129 std::scoped_lock l
{lock
};
130 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
131 auto s
= con
->get_priv();
133 auto session
= new Session(con
);
134 con
->set_priv(RefCountedPtr
{session
, false});
135 lderr(g_ceph_context
) << __func__
<< " con: " << con
136 << " count: " << session
->count
<< dendl
;
141 void ms_handle_fast_accept(Connection
*con
) override
{
142 last_accept
= con
->get_peer_addrs();
143 if (last_accept_con_ptr
) {
144 *last_accept_con_ptr
= con
;
146 if (!con
->get_priv()) {
147 con
->set_priv(RefCountedPtr
{new Session(con
), false});
150 bool ms_dispatch(Message
*m
) override
{
151 auto priv
= m
->get_connection()->get_priv();
152 auto s
= static_cast<Session
*>(priv
.get());
154 s
= new Session(m
->get_connection());
155 priv
.reset(s
, false);
156 m
->get_connection()->set_priv(priv
);
159 lderr(g_ceph_context
) << __func__
<< " conn: " << m
->get_connection() << " session " << s
<< " count: " << s
->count
<< dendl
;
163 std::lock_guard l
{lock
};
169 bool ms_handle_reset(Connection
*con
) override
{
170 std::lock_guard l
{lock
};
171 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
172 auto priv
= con
->get_priv();
173 if (auto s
= static_cast<Session
*>(priv
.get()); s
) {
174 s
->con
.reset(); // break con <-> session ref cycle
175 con
->set_priv(nullptr); // break ref <-> session cycle, if any
179 void ms_handle_remote_reset(Connection
*con
) override
{
180 std::lock_guard l
{lock
};
181 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
182 auto priv
= con
->get_priv();
183 if (auto s
= static_cast<Session
*>(priv
.get()); s
) {
184 s
->con
.reset(); // break con <-> session ref cycle
185 con
->set_priv(nullptr); // break ref <-> session cycle, if any
187 got_remote_reset
= true;
190 bool ms_handle_refused(Connection
*con
) override
{
193 void ms_fast_dispatch(Message
*m
) override
{
194 auto priv
= m
->get_connection()->get_priv();
195 auto s
= static_cast<Session
*>(priv
.get());
197 s
= new Session(m
->get_connection());
198 priv
.reset(s
, false);
199 m
->get_connection()->set_priv(priv
);
202 lderr(g_ceph_context
) << __func__
<< " conn: " << m
->get_connection() << " session " << s
<< " count: " << s
->count
<< dendl
;
205 ceph_assert(m
->get_source().is_osd());
208 } else if (loopback
) {
209 ceph_assert(m
->get_source().is_client());
212 std::lock_guard l
{lock
};
217 int ms_handle_authentication(Connection
*con
) override
{
221 void reply_message(Message
*m
) {
222 MPing
*rm
= new MPing();
223 m
->get_connection()->send_message(rm
);
227 typedef FakeDispatcher::Session Session
;
229 struct TestInterceptor
: public Interceptor
{
231 bool step_waiting
= false;
233 std::map
<Connection
*, uint32_t> current_step
;
234 std::map
<Connection
*, std::list
<uint32_t>> step_history
;
235 std::map
<uint32_t, std::optional
<ACTION
>> decisions
;
236 std::set
<uint32_t> breakpoints
;
238 uint32_t count_step(Connection
*conn
, uint32_t step
) {
240 for (auto s
: step_history
[conn
]) {
248 void breakpoint(uint32_t step
) {
249 breakpoints
.insert(step
);
252 void remove_bp(uint32_t step
) {
253 breakpoints
.erase(step
);
256 Connection
*wait(uint32_t step
, Connection
*conn
=nullptr) {
257 std::unique_lock
<std::mutex
> l(lock
);
260 auto it
= current_step
.find(conn
);
261 if (it
!= current_step
.end()) {
262 if (it
->second
== step
) {
267 for (auto it
: current_step
) {
268 if (it
.second
== step
) {
280 step_waiting
= false;
284 ACTION
wait_for_decision(uint32_t step
, std::unique_lock
<std::mutex
> &l
) {
285 if (decisions
[step
]) {
286 return *(decisions
[step
]);
289 cond_var
.wait(l
, [this] { return !waiting
; });
290 return *(decisions
[step
]);
293 void proceed(uint32_t step
, ACTION decision
) {
294 std::unique_lock
<std::mutex
> l(lock
);
295 decisions
[step
] = decision
;
298 cond_var
.notify_one();
302 ACTION
intercept(Connection
*conn
, uint32_t step
) override
{
303 lderr(g_ceph_context
) << __func__
<< " conn(" << conn
304 << ") intercept called on step=" << step
<< dendl
;
307 std::unique_lock
<std::mutex
> l(lock
);
308 step_history
[conn
].push_back(step
);
309 current_step
[conn
] = step
;
311 cond_var
.notify_one();
315 std::unique_lock
<std::mutex
> l(lock
);
316 ACTION decision
= ACTION::CONTINUE
;
317 if (breakpoints
.find(step
) != breakpoints
.end()) {
318 lderr(g_ceph_context
) << __func__
<< " conn(" << conn
319 << ") pausing on step=" << step
<< dendl
;
320 decision
= wait_for_decision(step
, l
);
322 if (decisions
[step
]) {
323 decision
= *(decisions
[step
]);
326 lderr(g_ceph_context
) << __func__
<< " conn(" << conn
327 << ") resuming step=" << step
<< " with decision="
328 << decision
<< dendl
;
329 decisions
[step
].reset();
336 * Scenario: A connects to B, and B connects to A at the same time.
338 TEST_P(MessengerTest
, ConnectionRaceTest
) {
339 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(false);
341 TestInterceptor
*cli_interceptor
= new TestInterceptor();
342 TestInterceptor
*srv_interceptor
= new TestInterceptor();
344 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, Messenger::Policy::lossless_peer_reuse(0));
345 server_msgr
->interceptor
= srv_interceptor
;
347 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, Messenger::Policy::lossless_peer_reuse(0));
348 client_msgr
->interceptor
= cli_interceptor
;
350 entity_addr_t bind_addr
;
351 bind_addr
.parse("v2:127.0.0.1:3300");
352 server_msgr
->bind(bind_addr
);
353 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
354 server_msgr
->start();
356 bind_addr
.parse("v2:127.0.0.1:3301");
357 client_msgr
->bind(bind_addr
);
358 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
359 client_msgr
->start();
361 // pause before sending client_ident message
362 cli_interceptor
->breakpoint(11);
363 // pause before sending client_ident message
364 srv_interceptor
->breakpoint(11);
366 ConnectionRef c2s
= client_msgr
->connect_to(server_msgr
->get_mytype(),
367 server_msgr
->get_myaddrs());
368 MPing
*m1
= new MPing();
369 ASSERT_EQ(c2s
->send_message(m1
), 0);
371 ConnectionRef s2c
= server_msgr
->connect_to(client_msgr
->get_mytype(),
372 client_msgr
->get_myaddrs());
373 MPing
*m2
= new MPing();
374 ASSERT_EQ(s2c
->send_message(m2
), 0);
376 cli_interceptor
->wait(11, c2s
.get());
377 srv_interceptor
->wait(11, s2c
.get());
379 // at this point both connections (A->B, B->A) are paused just before sending
380 // the client_ident message.
382 cli_interceptor
->remove_bp(11);
383 srv_interceptor
->remove_bp(11);
385 cli_interceptor
->proceed(11, Interceptor::ACTION::CONTINUE
);
386 srv_interceptor
->proceed(11, Interceptor::ACTION::CONTINUE
);
389 std::unique_lock l
{cli_dispatcher
.lock
};
390 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
391 cli_dispatcher
.got_new
= false;
395 std::unique_lock l
{srv_dispatcher
.lock
};
396 srv_dispatcher
.cond
.wait(l
, [&] { return srv_dispatcher
.got_new
; });
397 srv_dispatcher
.got_new
= false;
400 ASSERT_TRUE(s2c
->is_connected());
401 ASSERT_EQ(1u, static_cast<Session
*>(s2c
->get_priv().get())->get_count());
402 ASSERT_TRUE(s2c
->peer_is_client());
404 ASSERT_TRUE(c2s
->is_connected());
405 ASSERT_EQ(1u, static_cast<Session
*>(c2s
->get_priv().get())->get_count());
406 ASSERT_TRUE(c2s
->peer_is_osd());
408 client_msgr
->shutdown();
410 server_msgr
->shutdown();
413 delete cli_interceptor
;
414 delete srv_interceptor
;
420 * - A sends client_ident to B
421 * - B fails before sending server_ident to A
424 TEST_P(MessengerTest
, MissingServerIdenTest
) {
425 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(false);
427 TestInterceptor
*cli_interceptor
= new TestInterceptor();
428 TestInterceptor
*srv_interceptor
= new TestInterceptor();
430 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, Messenger::Policy::stateful_server(0));
431 server_msgr
->interceptor
= srv_interceptor
;
433 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, Messenger::Policy::lossy_client(0));
434 client_msgr
->interceptor
= cli_interceptor
;
436 entity_addr_t bind_addr
;
437 bind_addr
.parse("v2:127.0.0.1:3300");
438 server_msgr
->bind(bind_addr
);
439 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
440 server_msgr
->start();
442 bind_addr
.parse("v2:127.0.0.1:3301");
443 client_msgr
->bind(bind_addr
);
444 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
445 client_msgr
->start();
447 // pause before sending client_ident message
448 srv_interceptor
->breakpoint(12);
450 ConnectionRef c2s
= client_msgr
->connect_to(server_msgr
->get_mytype(),
451 server_msgr
->get_myaddrs());
452 MPing
*m1
= new MPing();
453 ASSERT_EQ(c2s
->send_message(m1
), 0);
455 Connection
*c2s_accepter
= srv_interceptor
->wait(12);
456 srv_interceptor
->remove_bp(12);
458 // We inject a message from this side of the connection to force it to be
459 // in standby when we inject the failure below
460 MPing
*m2
= new MPing();
461 ASSERT_EQ(c2s_accepter
->send_message(m2
), 0);
463 srv_interceptor
->proceed(12, Interceptor::ACTION::FAIL
);
466 std::unique_lock l
{srv_dispatcher
.lock
};
467 srv_dispatcher
.cond
.wait(l
, [&] { return srv_dispatcher
.got_new
; });
468 srv_dispatcher
.got_new
= false;
472 std::unique_lock l
{cli_dispatcher
.lock
};
473 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
474 cli_dispatcher
.got_new
= false;
477 ASSERT_TRUE(c2s
->is_connected());
478 ASSERT_EQ(1u, static_cast<Session
*>(c2s
->get_priv().get())->get_count());
479 ASSERT_TRUE(c2s
->peer_is_osd());
481 ASSERT_TRUE(c2s_accepter
->is_connected());
482 ASSERT_EQ(1u, static_cast<Session
*>(c2s_accepter
->get_priv().get())->get_count());
483 ASSERT_TRUE(c2s_accepter
->peer_is_client());
485 client_msgr
->shutdown();
487 server_msgr
->shutdown();
490 delete cli_interceptor
;
491 delete srv_interceptor
;
497 * - A sends client_ident to B
498 * - B fails before sending server_ident to A
499 * - A goes to standby
500 * - B reconnects to A
502 TEST_P(MessengerTest
, MissingServerIdenTest2
) {
503 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(false);
505 TestInterceptor
*cli_interceptor
= new TestInterceptor();
506 TestInterceptor
*srv_interceptor
= new TestInterceptor();
508 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, Messenger::Policy::lossless_peer(0));
509 server_msgr
->interceptor
= srv_interceptor
;
511 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, Messenger::Policy::lossless_peer(0));
512 client_msgr
->interceptor
= cli_interceptor
;
514 entity_addr_t bind_addr
;
515 bind_addr
.parse("v2:127.0.0.1:3300");
516 server_msgr
->bind(bind_addr
);
517 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
518 server_msgr
->start();
520 bind_addr
.parse("v2:127.0.0.1:3301");
521 client_msgr
->bind(bind_addr
);
522 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
523 client_msgr
->start();
525 // pause before sending client_ident message
526 srv_interceptor
->breakpoint(12);
528 ConnectionRef c2s
= client_msgr
->connect_to(server_msgr
->get_mytype(),
529 server_msgr
->get_myaddrs());
531 Connection
*c2s_accepter
= srv_interceptor
->wait(12);
532 srv_interceptor
->remove_bp(12);
534 // We inject a message from this side of the connection to force it to be
535 // in standby when we inject the failure below
536 MPing
*m2
= new MPing();
537 ASSERT_EQ(c2s_accepter
->send_message(m2
), 0);
539 srv_interceptor
->proceed(12, Interceptor::ACTION::FAIL
);
542 std::unique_lock l
{cli_dispatcher
.lock
};
543 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
544 cli_dispatcher
.got_new
= false;
547 ASSERT_TRUE(c2s
->is_connected());
548 ASSERT_EQ(1u, static_cast<Session
*>(c2s
->get_priv().get())->get_count());
549 ASSERT_TRUE(c2s
->peer_is_osd());
551 ASSERT_TRUE(c2s_accepter
->is_connected());
552 ASSERT_EQ(0u, static_cast<Session
*>(c2s_accepter
->get_priv().get())->get_count());
553 ASSERT_TRUE(c2s_accepter
->peer_is_client());
555 client_msgr
->shutdown();
557 server_msgr
->shutdown();
560 delete cli_interceptor
;
561 delete srv_interceptor
;
567 * - A and B exchange messages
569 * - B goes into standby
572 TEST_P(MessengerTest
, ReconnectTest
) {
573 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
575 TestInterceptor
*cli_interceptor
= new TestInterceptor();
576 TestInterceptor
*srv_interceptor
= new TestInterceptor();
578 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, Messenger::Policy::stateful_server(0));
579 server_msgr
->interceptor
= srv_interceptor
;
581 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, Messenger::Policy::lossless_peer(0));
582 client_msgr
->interceptor
= cli_interceptor
;
584 entity_addr_t bind_addr
;
585 bind_addr
.parse("v2:127.0.0.1:3300");
586 server_msgr
->bind(bind_addr
);
587 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
588 server_msgr
->start();
590 bind_addr
.parse("v2:127.0.0.1:3301");
591 client_msgr
->bind(bind_addr
);
592 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
593 client_msgr
->start();
595 ConnectionRef c2s
= client_msgr
->connect_to(server_msgr
->get_mytype(),
596 server_msgr
->get_myaddrs());
598 MPing
*m1
= new MPing();
599 ASSERT_EQ(c2s
->send_message(m1
), 0);
602 std::unique_lock l
{cli_dispatcher
.lock
};
603 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
604 cli_dispatcher
.got_new
= false;
607 ASSERT_TRUE(c2s
->is_connected());
608 ASSERT_EQ(1u, static_cast<Session
*>(c2s
->get_priv().get())->get_count());
609 ASSERT_TRUE(c2s
->peer_is_osd());
611 cli_interceptor
->breakpoint(16);
613 MPing
*m2
= new MPing();
614 ASSERT_EQ(c2s
->send_message(m2
), 0);
616 cli_interceptor
->wait(16, c2s
.get());
617 cli_interceptor
->remove_bp(16);
619 // at this point client and server are connected together
621 srv_interceptor
->breakpoint(15);
624 cli_interceptor
->proceed(16, Interceptor::ACTION::FAIL
);
626 MPing
*m3
= new MPing();
627 ASSERT_EQ(c2s
->send_message(m3
), 0);
629 Connection
*c2s_accepter
= srv_interceptor
->wait(15);
630 // the srv end of theconnection is now paused at ready
631 // this means that the reconnect was successful
632 srv_interceptor
->remove_bp(15);
634 ASSERT_TRUE(c2s_accepter
->peer_is_client());
635 // c2s_accepter sent 0 reconnect messages
636 ASSERT_EQ(srv_interceptor
->count_step(c2s_accepter
, 13), 0u);
637 // c2s_accepter sent 1 reconnect_ok messages
638 ASSERT_EQ(srv_interceptor
->count_step(c2s_accepter
, 14), 1u);
639 // c2s sent 1 reconnect messages
640 ASSERT_EQ(cli_interceptor
->count_step(c2s
.get(), 13), 1u);
641 // c2s sent 0 reconnect_ok messages
642 ASSERT_EQ(cli_interceptor
->count_step(c2s
.get(), 14), 0u);
644 srv_interceptor
->proceed(15, Interceptor::ACTION::CONTINUE
);
647 std::unique_lock l
{cli_dispatcher
.lock
};
648 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
649 cli_dispatcher
.got_new
= false;
652 client_msgr
->shutdown();
654 server_msgr
->shutdown();
657 delete cli_interceptor
;
658 delete srv_interceptor
;
664 * - A and B exchange messages
666 * - A reconnects // B reconnects
668 TEST_P(MessengerTest
, ReconnectRaceTest
) {
669 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
671 TestInterceptor
*cli_interceptor
= new TestInterceptor();
672 TestInterceptor
*srv_interceptor
= new TestInterceptor();
674 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, Messenger::Policy::lossless_peer(0));
675 server_msgr
->interceptor
= srv_interceptor
;
677 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, Messenger::Policy::lossless_peer(0));
678 client_msgr
->interceptor
= cli_interceptor
;
680 entity_addr_t bind_addr
;
681 bind_addr
.parse("v2:127.0.0.1:3300");
682 server_msgr
->bind(bind_addr
);
683 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
684 server_msgr
->start();
686 bind_addr
.parse("v2:127.0.0.1:3301");
687 client_msgr
->bind(bind_addr
);
688 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
689 client_msgr
->start();
691 ConnectionRef c2s
= client_msgr
->connect_to(server_msgr
->get_mytype(),
692 server_msgr
->get_myaddrs());
694 MPing
*m1
= new MPing();
695 ASSERT_EQ(c2s
->send_message(m1
), 0);
698 std::unique_lock l
{cli_dispatcher
.lock
};
699 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
700 cli_dispatcher
.got_new
= false;
703 ASSERT_TRUE(c2s
->is_connected());
704 ASSERT_EQ(1u, static_cast<Session
*>(c2s
->get_priv().get())->get_count());
705 ASSERT_TRUE(c2s
->peer_is_osd());
707 cli_interceptor
->breakpoint(16);
709 MPing
*m2
= new MPing();
710 ASSERT_EQ(c2s
->send_message(m2
), 0);
712 cli_interceptor
->wait(16, c2s
.get());
713 cli_interceptor
->remove_bp(16);
715 // at this point client and server are connected together
717 // force both client and server to race on reconnect
718 cli_interceptor
->breakpoint(13);
719 srv_interceptor
->breakpoint(13);
722 // this will cause both client and server to reconnect at the same time
723 cli_interceptor
->proceed(16, Interceptor::ACTION::FAIL
);
725 MPing
*m3
= new MPing();
726 ASSERT_EQ(c2s
->send_message(m3
), 0);
728 cli_interceptor
->wait(13, c2s
.get());
729 srv_interceptor
->wait(13);
731 cli_interceptor
->remove_bp(13);
732 srv_interceptor
->remove_bp(13);
735 srv_interceptor
->breakpoint(15);
737 cli_interceptor
->proceed(13, Interceptor::ACTION::CONTINUE
);
738 srv_interceptor
->proceed(13, Interceptor::ACTION::CONTINUE
);
740 Connection
*c2s_accepter
= srv_interceptor
->wait(15);
742 // the server has reconnected and is "ready"
743 srv_interceptor
->remove_bp(15);
745 ASSERT_TRUE(c2s_accepter
->peer_is_client());
746 ASSERT_TRUE(c2s
->peer_is_osd());
748 // the server should win the reconnect race
750 // c2s_accepter sent 1 or 2 reconnect messages
751 ASSERT_LT(srv_interceptor
->count_step(c2s_accepter
, 13), 3u);
752 ASSERT_GT(srv_interceptor
->count_step(c2s_accepter
, 13), 0u);
753 // c2s_accepter sent 0 reconnect_ok messages
754 ASSERT_EQ(srv_interceptor
->count_step(c2s_accepter
, 14), 0u);
755 // c2s sent 1 reconnect messages
756 ASSERT_EQ(cli_interceptor
->count_step(c2s
.get(), 13), 1u);
757 // c2s sent 1 reconnect_ok messages
758 ASSERT_EQ(cli_interceptor
->count_step(c2s
.get(), 14), 1u);
760 if (srv_interceptor
->count_step(c2s_accepter
, 13) == 2) {
761 // if the server send the reconnect message two times then
762 // the client must have sent a session retry message to the server
763 ASSERT_EQ(cli_interceptor
->count_step(c2s
.get(), 18), 1u);
765 ASSERT_EQ(cli_interceptor
->count_step(c2s
.get(), 18), 0u);
768 srv_interceptor
->proceed(15, Interceptor::ACTION::CONTINUE
);
771 std::unique_lock l
{cli_dispatcher
.lock
};
772 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
773 cli_dispatcher
.got_new
= false;
776 client_msgr
->shutdown();
778 server_msgr
->shutdown();
781 delete cli_interceptor
;
782 delete srv_interceptor
;
785 TEST_P(MessengerTest
, SimpleTest
) {
786 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
787 entity_addr_t bind_addr
;
788 bind_addr
.parse("v2:127.0.0.1");
789 server_msgr
->bind(bind_addr
);
790 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
791 server_msgr
->start();
793 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
794 client_msgr
->start();
796 // 1. simple round trip
797 MPing
*m
= new MPing();
798 ConnectionRef conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
799 server_msgr
->get_myaddrs());
801 ASSERT_EQ(conn
->send_message(m
), 0);
802 std::unique_lock l
{cli_dispatcher
.lock
};
803 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
804 cli_dispatcher
.got_new
= false;
806 ASSERT_TRUE(conn
->is_connected());
807 ASSERT_EQ(1u, static_cast<Session
*>(conn
->get_priv().get())->get_count());
808 ASSERT_TRUE(conn
->peer_is_osd());
810 // 2. test rebind port
811 set
<int> avoid_ports
;
812 for (int i
= 0; i
< 10 ; i
++) {
813 for (auto a
: server_msgr
->get_myaddrs().v
) {
814 avoid_ports
.insert(a
.get_port() + i
);
817 server_msgr
->rebind(avoid_ports
);
818 for (auto a
: server_msgr
->get_myaddrs().v
) {
819 ASSERT_TRUE(avoid_ports
.count(a
.get_port()) == 0);
822 conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
823 server_msgr
->get_myaddrs());
826 ASSERT_EQ(conn
->send_message(m
), 0);
827 std::unique_lock l
{cli_dispatcher
.lock
};
828 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
829 cli_dispatcher
.got_new
= false;
831 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
833 // 3. test markdown connection
835 ASSERT_FALSE(conn
->is_connected());
837 // 4. test failed connection
838 server_msgr
->shutdown();
842 conn
->send_message(m
);
843 CHECK_AND_WAIT_TRUE(!conn
->is_connected());
844 ASSERT_FALSE(conn
->is_connected());
846 // 5. loopback connection
847 srv_dispatcher
.loopback
= true;
848 conn
= client_msgr
->get_loopback_connection();
851 ASSERT_EQ(conn
->send_message(m
), 0);
852 std::unique_lock l
{cli_dispatcher
.lock
};
853 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
854 cli_dispatcher
.got_new
= false;
856 srv_dispatcher
.loopback
= false;
857 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
858 client_msgr
->shutdown();
860 server_msgr
->shutdown();
864 TEST_P(MessengerTest
, SimpleMsgr2Test
) {
865 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
866 entity_addr_t legacy_addr
;
867 legacy_addr
.parse("v1:127.0.0.1");
868 entity_addr_t msgr2_addr
;
869 msgr2_addr
.parse("v2:127.0.0.1");
870 entity_addrvec_t bind_addrs
;
871 bind_addrs
.v
.push_back(legacy_addr
);
872 bind_addrs
.v
.push_back(msgr2_addr
);
873 server_msgr
->bindv(bind_addrs
);
874 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
875 server_msgr
->start();
877 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
878 client_msgr
->start();
880 // 1. simple round trip
881 MPing
*m
= new MPing();
882 ConnectionRef conn
= client_msgr
->connect_to(
883 server_msgr
->get_mytype(),
884 server_msgr
->get_myaddrs());
886 ASSERT_EQ(conn
->send_message(m
), 0);
887 std::unique_lock l
{cli_dispatcher
.lock
};
888 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
889 cli_dispatcher
.got_new
= false;
891 ASSERT_TRUE(conn
->is_connected());
892 ASSERT_EQ(1u, static_cast<Session
*>(conn
->get_priv().get())->get_count());
893 ASSERT_TRUE(conn
->peer_is_osd());
895 // 2. test rebind port
896 set
<int> avoid_ports
;
897 for (int i
= 0; i
< 10 ; i
++) {
898 for (auto a
: server_msgr
->get_myaddrs().v
) {
899 avoid_ports
.insert(a
.get_port() + i
);
902 server_msgr
->rebind(avoid_ports
);
903 for (auto a
: server_msgr
->get_myaddrs().v
) {
904 ASSERT_TRUE(avoid_ports
.count(a
.get_port()) == 0);
907 conn
= client_msgr
->connect_to(
908 server_msgr
->get_mytype(),
909 server_msgr
->get_myaddrs());
912 ASSERT_EQ(conn
->send_message(m
), 0);
913 std::unique_lock l
{cli_dispatcher
.lock
};
914 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
915 cli_dispatcher
.got_new
= false;
917 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
919 // 3. test markdown connection
921 ASSERT_FALSE(conn
->is_connected());
923 // 4. test failed connection
924 server_msgr
->shutdown();
928 conn
->send_message(m
);
929 CHECK_AND_WAIT_TRUE(!conn
->is_connected());
930 ASSERT_FALSE(conn
->is_connected());
932 // 5. loopback connection
933 srv_dispatcher
.loopback
= true;
934 conn
= client_msgr
->get_loopback_connection();
937 ASSERT_EQ(conn
->send_message(m
), 0);
938 std::unique_lock l
{cli_dispatcher
.lock
};
939 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
940 cli_dispatcher
.got_new
= false;
942 srv_dispatcher
.loopback
= false;
943 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
944 client_msgr
->shutdown();
946 server_msgr
->shutdown();
950 TEST_P(MessengerTest
, FeatureTest
) {
951 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
952 entity_addr_t bind_addr
;
953 bind_addr
.parse("v2:127.0.0.1");
954 uint64_t all_feature_supported
, feature_required
, feature_supported
= 0;
955 for (int i
= 0; i
< 10; i
++)
956 feature_supported
|= 1ULL << i
;
957 feature_supported
|= CEPH_FEATUREMASK_MSG_ADDR2
;
958 feature_supported
|= CEPH_FEATUREMASK_SERVER_NAUTILUS
;
959 feature_required
= feature_supported
| 1ULL << 13;
960 all_feature_supported
= feature_required
| 1ULL << 14;
962 Messenger::Policy p
= server_msgr
->get_policy(entity_name_t::TYPE_CLIENT
);
963 p
.features_required
= feature_required
;
964 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, p
);
965 server_msgr
->bind(bind_addr
);
966 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
967 server_msgr
->start();
969 // 1. Suppose if only support less than required
970 p
= client_msgr
->get_policy(entity_name_t::TYPE_OSD
);
971 p
.features_supported
= feature_supported
;
972 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
973 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
974 client_msgr
->start();
976 MPing
*m
= new MPing();
977 ConnectionRef conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
978 server_msgr
->get_myaddrs());
979 conn
->send_message(m
);
980 CHECK_AND_WAIT_TRUE(!conn
->is_connected());
981 // should failed build a connection
982 ASSERT_FALSE(conn
->is_connected());
984 client_msgr
->shutdown();
987 // 2. supported met required
988 p
= client_msgr
->get_policy(entity_name_t::TYPE_OSD
);
989 p
.features_supported
= all_feature_supported
;
990 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
991 client_msgr
->start();
993 conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
994 server_msgr
->get_myaddrs());
997 ASSERT_EQ(conn
->send_message(m
), 0);
998 std::unique_lock l
{cli_dispatcher
.lock
};
999 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1000 cli_dispatcher
.got_new
= false;
1002 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1004 server_msgr
->shutdown();
1005 client_msgr
->shutdown();
1006 server_msgr
->wait();
1007 client_msgr
->wait();
1010 TEST_P(MessengerTest
, TimeoutTest
) {
1011 g_ceph_context
->_conf
.set_val("ms_connection_idle_timeout", "1");
1012 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
1013 entity_addr_t bind_addr
;
1014 bind_addr
.parse("v2:127.0.0.1");
1015 server_msgr
->bind(bind_addr
);
1016 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
1017 server_msgr
->start();
1019 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
1020 client_msgr
->start();
1022 // 1. build the connection
1023 MPing
*m
= new MPing();
1024 ConnectionRef conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1025 server_msgr
->get_myaddrs());
1027 ASSERT_EQ(conn
->send_message(m
), 0);
1028 std::unique_lock l
{cli_dispatcher
.lock
};
1029 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1030 cli_dispatcher
.got_new
= false;
1032 ASSERT_TRUE(conn
->is_connected());
1033 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1034 ASSERT_TRUE(conn
->peer_is_osd());
1038 ASSERT_FALSE(conn
->is_connected());
1040 server_msgr
->shutdown();
1041 server_msgr
->wait();
1043 client_msgr
->shutdown();
1044 client_msgr
->wait();
1045 g_ceph_context
->_conf
.set_val("ms_connection_idle_timeout", "900");
1048 TEST_P(MessengerTest
, StatefulTest
) {
1050 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
1051 entity_addr_t bind_addr
;
1052 bind_addr
.parse("v2:127.0.0.1");
1053 Messenger::Policy p
= Messenger::Policy::stateful_server(0);
1054 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, p
);
1055 p
= Messenger::Policy::lossless_client(0);
1056 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
1058 server_msgr
->bind(bind_addr
);
1059 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
1060 server_msgr
->start();
1061 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
1062 client_msgr
->start();
1064 // 1. test for server standby
1065 ConnectionRef conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1066 server_msgr
->get_myaddrs());
1069 ASSERT_EQ(conn
->send_message(m
), 0);
1070 std::unique_lock l
{cli_dispatcher
.lock
};
1071 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1072 cli_dispatcher
.got_new
= false;
1074 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1076 ASSERT_FALSE(conn
->is_connected());
1077 ConnectionRef server_conn
= server_msgr
->connect_to(
1078 client_msgr
->get_mytype(), srv_dispatcher
.last_accept
);
1080 ASSERT_EQ(1U, static_cast<Session
*>(server_conn
->get_priv().get())->get_count());
1082 srv_dispatcher
.got_new
= false;
1083 conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1084 server_msgr
->get_myaddrs());
1087 ASSERT_EQ(conn
->send_message(m
), 0);
1088 std::unique_lock l
{cli_dispatcher
.lock
};
1089 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1090 cli_dispatcher
.got_new
= false;
1092 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1093 server_conn
= server_msgr
->connect_to(client_msgr
->get_mytype(),
1094 srv_dispatcher
.last_accept
);
1096 std::unique_lock l
{srv_dispatcher
.lock
};
1097 srv_dispatcher
.cond
.wait(l
, [&] { return srv_dispatcher
.got_remote_reset
; });
1100 // 2. test for client reconnect
1101 ASSERT_FALSE(cli_dispatcher
.got_remote_reset
);
1102 cli_dispatcher
.got_connect
= false;
1103 cli_dispatcher
.got_new
= false;
1104 cli_dispatcher
.got_remote_reset
= false;
1105 server_conn
->mark_down();
1106 ASSERT_FALSE(server_conn
->is_connected());
1107 // ensure client detect server socket closed
1109 std::unique_lock l
{cli_dispatcher
.lock
};
1110 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_remote_reset
; });
1111 cli_dispatcher
.got_remote_reset
= false;
1114 std::unique_lock l
{cli_dispatcher
.lock
};
1115 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_connect
; });
1116 cli_dispatcher
.got_connect
= false;
1118 CHECK_AND_WAIT_TRUE(conn
->is_connected());
1119 ASSERT_TRUE(conn
->is_connected());
1123 ASSERT_EQ(conn
->send_message(m
), 0);
1124 ASSERT_TRUE(conn
->is_connected());
1125 std::unique_lock l
{cli_dispatcher
.lock
};
1126 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1127 cli_dispatcher
.got_new
= false;
1129 // resetcheck happen
1130 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1131 server_conn
= server_msgr
->connect_to(client_msgr
->get_mytype(),
1132 srv_dispatcher
.last_accept
);
1133 ASSERT_EQ(1U, static_cast<Session
*>(server_conn
->get_priv().get())->get_count());
1134 cli_dispatcher
.got_remote_reset
= false;
1136 server_msgr
->shutdown();
1137 client_msgr
->shutdown();
1138 server_msgr
->wait();
1139 client_msgr
->wait();
1142 TEST_P(MessengerTest
, StatelessTest
) {
1144 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
1145 entity_addr_t bind_addr
;
1146 bind_addr
.parse("v2:127.0.0.1");
1147 Messenger::Policy p
= Messenger::Policy::stateless_server(0);
1148 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, p
);
1149 p
= Messenger::Policy::lossy_client(0);
1150 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
1152 server_msgr
->bind(bind_addr
);
1153 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
1154 server_msgr
->start();
1155 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
1156 client_msgr
->start();
1158 // 1. test for server lose state
1159 ConnectionRef conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1160 server_msgr
->get_myaddrs());
1163 ASSERT_EQ(conn
->send_message(m
), 0);
1164 std::unique_lock l
{cli_dispatcher
.lock
};
1165 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1166 cli_dispatcher
.got_new
= false;
1168 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1170 ASSERT_FALSE(conn
->is_connected());
1172 srv_dispatcher
.got_new
= false;
1173 ConnectionRef server_conn
;
1174 srv_dispatcher
.last_accept_con_ptr
= &server_conn
;
1175 conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1176 server_msgr
->get_myaddrs());
1179 ASSERT_EQ(conn
->send_message(m
), 0);
1180 std::unique_lock l
{cli_dispatcher
.lock
};
1181 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1182 cli_dispatcher
.got_new
= false;
1184 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1185 ASSERT_TRUE(server_conn
);
1187 // server lose state
1189 std::unique_lock l
{srv_dispatcher
.lock
};
1190 srv_dispatcher
.cond
.wait(l
, [&] { return srv_dispatcher
.got_new
; });
1192 ASSERT_EQ(1U, static_cast<Session
*>(server_conn
->get_priv().get())->get_count());
1194 // 2. test for client lossy
1195 server_conn
->mark_down();
1196 ASSERT_FALSE(server_conn
->is_connected());
1197 conn
->send_keepalive();
1198 CHECK_AND_WAIT_TRUE(!conn
->is_connected());
1199 ASSERT_FALSE(conn
->is_connected());
1200 conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1201 server_msgr
->get_myaddrs());
1204 ASSERT_EQ(conn
->send_message(m
), 0);
1205 std::unique_lock l
{cli_dispatcher
.lock
};
1206 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1207 cli_dispatcher
.got_new
= false;
1209 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1211 server_msgr
->shutdown();
1212 client_msgr
->shutdown();
1213 server_msgr
->wait();
1214 client_msgr
->wait();
1217 TEST_P(MessengerTest
, AnonTest
) {
1219 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
1220 entity_addr_t bind_addr
;
1221 bind_addr
.parse("v2:127.0.0.1");
1222 Messenger::Policy p
= Messenger::Policy::stateless_server(0);
1223 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, p
);
1224 p
= Messenger::Policy::lossy_client(0);
1225 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
1227 server_msgr
->bind(bind_addr
);
1228 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
1229 server_msgr
->start();
1230 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
1231 client_msgr
->start();
1233 ConnectionRef server_con_a
, server_con_b
;
1236 srv_dispatcher
.last_accept_con_ptr
= &server_con_a
;
1237 ConnectionRef con_a
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1238 server_msgr
->get_myaddrs(),
1242 ASSERT_EQ(con_a
->send_message(m
), 0);
1243 std::unique_lock l
{cli_dispatcher
.lock
};
1244 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1245 cli_dispatcher
.got_new
= false;
1247 ASSERT_EQ(1U, static_cast<Session
*>(con_a
->get_priv().get())->get_count());
1250 srv_dispatcher
.last_accept_con_ptr
= &server_con_b
;
1251 ConnectionRef con_b
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1252 server_msgr
->get_myaddrs(),
1256 ASSERT_EQ(con_b
->send_message(m
), 0);
1257 std::unique_lock l
{cli_dispatcher
.lock
};
1258 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1259 cli_dispatcher
.got_new
= false;
1261 ASSERT_EQ(1U, static_cast<Session
*>(con_b
->get_priv().get())->get_count());
1263 // these should be distinct
1264 ASSERT_NE(con_a
, con_b
);
1265 ASSERT_NE(server_con_a
, server_con_b
);
1267 // and both connected
1270 ASSERT_EQ(con_a
->send_message(m
), 0);
1271 std::unique_lock l
{cli_dispatcher
.lock
};
1272 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1273 cli_dispatcher
.got_new
= false;
1277 ASSERT_EQ(con_b
->send_message(m
), 0);
1278 std::unique_lock l
{cli_dispatcher
.lock
};
1279 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1280 cli_dispatcher
.got_new
= false;
1285 ASSERT_FALSE(con_a
->is_connected());
1287 ASSERT_FALSE(con_b
->is_connected());
1289 server_msgr
->shutdown();
1290 client_msgr
->shutdown();
1291 server_msgr
->wait();
1292 client_msgr
->wait();
1295 TEST_P(MessengerTest
, ClientStandbyTest
) {
1297 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
1298 entity_addr_t bind_addr
;
1299 bind_addr
.parse("v2:127.0.0.1");
1300 Messenger::Policy p
= Messenger::Policy::stateful_server(0);
1301 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, p
);
1302 p
= Messenger::Policy::lossless_peer(0);
1303 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
1305 server_msgr
->bind(bind_addr
);
1306 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
1307 server_msgr
->start();
1308 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
1309 client_msgr
->start();
1311 // 1. test for client standby, resetcheck
1312 ConnectionRef conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1313 server_msgr
->get_myaddrs());
1316 ASSERT_EQ(conn
->send_message(m
), 0);
1317 std::unique_lock l
{cli_dispatcher
.lock
};
1318 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1319 cli_dispatcher
.got_new
= false;
1321 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1322 ConnectionRef server_conn
= server_msgr
->connect_to(
1323 client_msgr
->get_mytype(),
1324 srv_dispatcher
.last_accept
);
1325 ASSERT_FALSE(cli_dispatcher
.got_remote_reset
);
1326 cli_dispatcher
.got_connect
= false;
1327 server_conn
->mark_down();
1328 ASSERT_FALSE(server_conn
->is_connected());
1329 // client should be standby
1331 // client should be standby, so we use original connection
1333 // Try send message to verify got remote reset callback
1335 ASSERT_EQ(conn
->send_message(m
), 0);
1337 std::unique_lock l
{cli_dispatcher
.lock
};
1338 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_remote_reset
; });
1339 cli_dispatcher
.got_remote_reset
= false;
1340 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_connect
; });
1341 cli_dispatcher
.got_connect
= false;
1343 CHECK_AND_WAIT_TRUE(conn
->is_connected());
1344 ASSERT_TRUE(conn
->is_connected());
1346 ASSERT_EQ(conn
->send_message(m
), 0);
1347 std::unique_lock l
{cli_dispatcher
.lock
};
1348 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1349 cli_dispatcher
.got_new
= false;
1351 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1352 server_conn
= server_msgr
->connect_to(client_msgr
->get_mytype(),
1353 srv_dispatcher
.last_accept
);
1354 ASSERT_EQ(1U, static_cast<Session
*>(server_conn
->get_priv().get())->get_count());
1356 server_msgr
->shutdown();
1357 client_msgr
->shutdown();
1358 server_msgr
->wait();
1359 client_msgr
->wait();
1362 TEST_P(MessengerTest
, AuthTest
) {
1363 g_ceph_context
->_conf
.set_val("auth_cluster_required", "cephx");
1364 g_ceph_context
->_conf
.set_val("auth_service_required", "cephx");
1365 g_ceph_context
->_conf
.set_val("auth_client_required", "cephx");
1366 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
1367 entity_addr_t bind_addr
;
1368 bind_addr
.parse("v2:127.0.0.1");
1369 server_msgr
->bind(bind_addr
);
1370 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
1371 server_msgr
->start();
1373 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
1374 client_msgr
->start();
1376 // 1. simple auth round trip
1377 MPing
*m
= new MPing();
1378 ConnectionRef conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1379 server_msgr
->get_myaddrs());
1381 ASSERT_EQ(conn
->send_message(m
), 0);
1382 std::unique_lock l
{cli_dispatcher
.lock
};
1383 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1384 cli_dispatcher
.got_new
= false;
1386 ASSERT_TRUE(conn
->is_connected());
1387 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1390 g_ceph_context
->_conf
.set_val("auth_cluster_required", "none");
1391 g_ceph_context
->_conf
.set_val("auth_service_required", "none");
1392 g_ceph_context
->_conf
.set_val("auth_client_required", "none");
1394 ASSERT_FALSE(conn
->is_connected());
1395 conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1396 server_msgr
->get_myaddrs());
1398 MPing
*m
= new MPing();
1399 ASSERT_EQ(conn
->send_message(m
), 0);
1400 std::unique_lock l
{cli_dispatcher
.lock
};
1401 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1402 cli_dispatcher
.got_new
= false;
1404 ASSERT_TRUE(conn
->is_connected());
1405 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv().get())->get_count());
1406 server_msgr
->shutdown();
1407 client_msgr
->shutdown();
1408 server_msgr
->wait();
1409 client_msgr
->wait();
1412 TEST_P(MessengerTest
, MessageTest
) {
1413 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
1414 entity_addr_t bind_addr
;
1415 bind_addr
.parse("v2:127.0.0.1");
1416 Messenger::Policy p
= Messenger::Policy::stateful_server(0);
1417 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, p
);
1418 p
= Messenger::Policy::lossless_peer(0);
1419 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
1421 server_msgr
->bind(bind_addr
);
1422 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
1423 server_msgr
->start();
1424 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
1425 client_msgr
->start();
1428 // 1. A very large "front"(as well as "payload")
1429 // Because a external message need to invade Messenger::decode_message,
1430 // here we only use existing message class(MCommand)
1431 ConnectionRef conn
= client_msgr
->connect_to(server_msgr
->get_mytype(),
1432 server_msgr
->get_myaddrs());
1435 uuid
.generate_random();
1436 vector
<string
> cmds
;
1437 string
s("abcdefghijklmnopqrstuvwxyz");
1438 for (int i
= 0; i
< 1024*30; i
++)
1440 MCommand
*m
= new MCommand(uuid
);
1442 conn
->send_message(m
);
1443 std::unique_lock l
{cli_dispatcher
.lock
};
1444 cli_dispatcher
.cond
.wait_for(l
, 500s
, [&] { return cli_dispatcher
.got_new
; });
1445 ASSERT_TRUE(cli_dispatcher
.got_new
);
1446 cli_dispatcher
.got_new
= false;
1449 // 2. A very large "data"
1452 string
s("abcdefghijklmnopqrstuvwxyz");
1453 for (int i
= 0; i
< 1024*30; i
++)
1455 MPing
*m
= new MPing();
1457 conn
->send_message(m
);
1460 std::unique_lock l
{cli_dispatcher
.lock
};
1461 cli_dispatcher
.cond
.wait(l
, [&] { return cli_dispatcher
.got_new
; });
1462 ASSERT_TRUE(cli_dispatcher
.got_new
);
1463 cli_dispatcher
.got_new
= false;
1465 server_msgr
->shutdown();
1466 client_msgr
->shutdown();
1467 server_msgr
->wait();
1468 client_msgr
->wait();
1472 class SyntheticWorkload
;
1475 enum Who
: uint8_t {
1483 Payload(Who who
, uint64_t seq
, const bufferlist
& data
)
1484 : who(who
), seq(seq
), data(data
)
1486 Payload() = default;
1487 DENC(Payload
, v
, p
) {
1488 DENC_START(1, 1, p
);
1495 WRITE_CLASS_DENC(Payload
)
1497 ostream
& operator<<(ostream
& out
, const Payload
&pl
)
1499 return out
<< "reply=" << pl
.who
<< " i = " << pl
.seq
;
1502 class SyntheticDispatcher
: public Dispatcher
{
1504 ceph::mutex lock
= ceph::make_mutex("SyntheticDispatcher::lock");
1505 ceph::condition_variable cond
;
1508 bool got_remote_reset
;
1510 map
<ConnectionRef
, list
<uint64_t> > conn_sent
;
1511 map
<uint64_t, bufferlist
> sent
;
1512 atomic
<uint64_t> index
;
1513 SyntheticWorkload
*workload
;
1515 SyntheticDispatcher(bool s
, SyntheticWorkload
*wl
):
1516 Dispatcher(g_ceph_context
), is_server(s
), got_new(false),
1517 got_remote_reset(false), got_connect(false), index(0), workload(wl
) {
1519 bool ms_can_fast_dispatch_any() const override
{ return true; }
1520 bool ms_can_fast_dispatch(const Message
*m
) const override
{
1521 switch (m
->get_type()) {
1530 void ms_handle_fast_connect(Connection
*con
) override
{
1531 std::lock_guard l
{lock
};
1532 list
<uint64_t> c
= conn_sent
[con
];
1533 for (list
<uint64_t>::iterator it
= c
.begin();
1534 it
!= c
.end(); ++it
)
1536 conn_sent
.erase(con
);
1540 void ms_handle_fast_accept(Connection
*con
) override
{
1541 std::lock_guard l
{lock
};
1542 list
<uint64_t> c
= conn_sent
[con
];
1543 for (list
<uint64_t>::iterator it
= c
.begin();
1544 it
!= c
.end(); ++it
)
1546 conn_sent
.erase(con
);
1549 bool ms_dispatch(Message
*m
) override
{
1552 bool ms_handle_reset(Connection
*con
) override
;
1553 void ms_handle_remote_reset(Connection
*con
) override
{
1554 std::lock_guard l
{lock
};
1555 list
<uint64_t> c
= conn_sent
[con
];
1556 for (list
<uint64_t>::iterator it
= c
.begin();
1557 it
!= c
.end(); ++it
)
1559 conn_sent
.erase(con
);
1560 got_remote_reset
= true;
1562 bool ms_handle_refused(Connection
*con
) override
{
1565 void ms_fast_dispatch(Message
*m
) override
{
1566 // MSG_COMMAND is used to disorganize regular message flow
1567 if (m
->get_type() == MSG_COMMAND
) {
1573 auto p
= m
->get_data().cbegin();
1575 if (pl
.who
== Payload::PING
) {
1576 lderr(g_ceph_context
) << __func__
<< " conn=" << m
->get_connection() << pl
<< dendl
;
1577 reply_message(m
, pl
);
1579 std::lock_guard l
{lock
};
1583 std::lock_guard l
{lock
};
1584 if (sent
.count(pl
.seq
)) {
1585 lderr(g_ceph_context
) << __func__
<< " conn=" << m
->get_connection() << pl
<< dendl
;
1586 ASSERT_EQ(conn_sent
[m
->get_connection()].front(), pl
.seq
);
1587 ASSERT_TRUE(pl
.data
.contents_equal(sent
[pl
.seq
]));
1588 conn_sent
[m
->get_connection()].pop_front();
1597 int ms_handle_authentication(Connection
*con
) override
{
1601 void reply_message(const Message
*m
, Payload
& pl
) {
1602 pl
.who
= Payload::PONG
;
1605 MPing
*rm
= new MPing();
1607 m
->get_connection()->send_message(rm
);
1608 lderr(g_ceph_context
) << __func__
<< " conn=" << m
->get_connection() << " reply m=" << m
<< " i=" << pl
.seq
<< dendl
;
1611 void send_message_wrap(ConnectionRef con
, const bufferlist
& data
) {
1612 Message
*m
= new MPing();
1613 Payload pl
{Payload::PING
, index
++, data
};
1617 if (!con
->get_messenger()->get_default_policy().lossy
) {
1618 std::lock_guard l
{lock
};
1619 sent
[pl
.seq
] = pl
.data
;
1620 conn_sent
[con
].push_back(pl
.seq
);
1622 lderr(g_ceph_context
) << __func__
<< " conn=" << con
.get() << " send m=" << m
<< " i=" << pl
.seq
<< dendl
;
1623 ASSERT_EQ(0, con
->send_message(m
));
1626 uint64_t get_pending() {
1627 std::lock_guard l
{lock
};
1631 void clear_pending(ConnectionRef con
) {
1632 std::lock_guard l
{lock
};
1634 for (list
<uint64_t>::iterator it
= conn_sent
[con
].begin();
1635 it
!= conn_sent
[con
].end(); ++it
)
1637 conn_sent
.erase(con
);
1641 for (auto && p
: conn_sent
) {
1642 if (!p
.second
.empty()) {
1643 lderr(g_ceph_context
) << __func__
<< " " << p
.first
<< " wait " << p
.second
.size() << dendl
;
1650 class SyntheticWorkload
{
1651 ceph::mutex lock
= ceph::make_mutex("SyntheticWorkload::lock");
1652 ceph::condition_variable cond
;
1653 set
<Messenger
*> available_servers
;
1654 set
<Messenger
*> available_clients
;
1655 Messenger::Policy client_policy
;
1656 map
<ConnectionRef
, pair
<Messenger
*, Messenger
*> > available_connections
;
1657 SyntheticDispatcher dispatcher
;
1659 vector
<bufferlist
> rand_data
;
1660 DummyAuthClientServer dummy_auth
;
1663 static const unsigned max_in_flight
= 64;
1664 static const unsigned max_connections
= 128;
1665 static const unsigned max_message_len
= 1024 * 1024 * 4;
1667 SyntheticWorkload(int servers
, int clients
, string type
, int random_num
,
1668 Messenger::Policy srv_policy
, Messenger::Policy cli_policy
)
1669 : client_policy(cli_policy
),
1670 dispatcher(false, this),
1672 dummy_auth(g_ceph_context
) {
1673 dummy_auth
.auth_registry
.refresh_config();
1675 int base_port
= 16800;
1676 entity_addr_t bind_addr
;
1678 for (int i
= 0; i
< servers
; ++i
) {
1679 msgr
= Messenger::create(g_ceph_context
, type
, entity_name_t::OSD(0),
1680 "server", getpid()+i
, 0);
1681 snprintf(addr
, sizeof(addr
), "v2:127.0.0.1:%d",
1683 bind_addr
.parse(addr
);
1684 msgr
->bind(bind_addr
);
1685 msgr
->add_dispatcher_head(&dispatcher
);
1686 msgr
->set_auth_client(&dummy_auth
);
1687 msgr
->set_auth_server(&dummy_auth
);
1690 msgr
->set_default_policy(srv_policy
);
1691 available_servers
.insert(msgr
);
1695 for (int i
= 0; i
< clients
; ++i
) {
1696 msgr
= Messenger::create(g_ceph_context
, type
, entity_name_t::CLIENT(-1),
1697 "client", getpid()+i
+servers
, 0);
1698 if (cli_policy
.standby
) {
1699 snprintf(addr
, sizeof(addr
), "v2:127.0.0.1:%d",
1700 base_port
+i
+servers
);
1701 bind_addr
.parse(addr
);
1702 msgr
->bind(bind_addr
);
1704 msgr
->add_dispatcher_head(&dispatcher
);
1705 msgr
->set_auth_client(&dummy_auth
);
1706 msgr
->set_auth_server(&dummy_auth
);
1709 msgr
->set_default_policy(cli_policy
);
1710 available_clients
.insert(msgr
);
1714 for (int i
= 0; i
< random_num
; i
++) {
1716 boost::uniform_int
<> u(32, max_message_len
);
1717 uint64_t value_len
= u(rng
);
1718 bufferptr
bp(value_len
);
1720 for (uint64_t j
= 0; j
< value_len
-sizeof(i
); ) {
1721 memcpy(bp
.c_str()+j
, &i
, sizeof(i
));
1726 rand_data
.push_back(bl
);
1730 ConnectionRef
_get_random_connection() {
1731 while (dispatcher
.get_pending() > max_in_flight
) {
1736 ceph_assert(ceph_mutex_is_locked(lock
));
1737 boost::uniform_int
<> choose(0, available_connections
.size() - 1);
1738 int index
= choose(rng
);
1739 map
<ConnectionRef
, pair
<Messenger
*, Messenger
*> >::iterator i
= available_connections
.begin();
1740 for (; index
> 0; --index
, ++i
) ;
1744 bool can_create_connection() {
1745 return available_connections
.size() < max_connections
;
1748 void generate_connection() {
1749 std::lock_guard l
{lock
};
1750 if (!can_create_connection())
1753 Messenger
*server
, *client
;
1755 boost::uniform_int
<> choose(0, available_servers
.size() - 1);
1756 int index
= choose(rng
);
1757 set
<Messenger
*>::iterator i
= available_servers
.begin();
1758 for (; index
> 0; --index
, ++i
) ;
1762 boost::uniform_int
<> choose(0, available_clients
.size() - 1);
1763 int index
= choose(rng
);
1764 set
<Messenger
*>::iterator i
= available_clients
.begin();
1765 for (; index
> 0; --index
, ++i
) ;
1769 pair
<Messenger
*, Messenger
*> p
;
1771 boost::uniform_int
<> choose(0, available_servers
.size() - 1);
1772 if (server
->get_default_policy().server
) {
1773 p
= make_pair(client
, server
);
1774 ConnectionRef conn
= client
->connect_to(server
->get_mytype(),
1775 server
->get_myaddrs());
1776 available_connections
[conn
] = p
;
1778 ConnectionRef conn
= client
->connect_to(server
->get_mytype(),
1779 server
->get_myaddrs());
1780 p
= make_pair(client
, server
);
1781 available_connections
[conn
] = p
;
1786 void send_message() {
1787 std::lock_guard l
{lock
};
1788 ConnectionRef conn
= _get_random_connection();
1789 boost::uniform_int
<> true_false(0, 99);
1790 int val
= true_false(rng
);
1793 uuid
.generate_random();
1794 MCommand
*m
= new MCommand(uuid
);
1795 vector
<string
> cmds
;
1796 cmds
.push_back("command");
1798 m
->set_priority(200);
1799 conn
->send_message(m
);
1801 boost::uniform_int
<> u(0, rand_data
.size()-1);
1802 dispatcher
.send_message_wrap(conn
, rand_data
[u(rng
)]);
1806 void drop_connection() {
1807 std::lock_guard l
{lock
};
1808 if (available_connections
.size() < 10)
1810 ConnectionRef conn
= _get_random_connection();
1811 dispatcher
.clear_pending(conn
);
1813 if (!client_policy
.server
&&
1814 !client_policy
.lossy
&&
1815 client_policy
.standby
) {
1816 // it's a lossless policy, so we need to mark down each side
1817 pair
<Messenger
*, Messenger
*> &p
= available_connections
[conn
];
1818 if (!p
.first
->get_default_policy().server
&& !p
.second
->get_default_policy().server
) {
1819 ASSERT_EQ(conn
->get_messenger(), p
.first
);
1820 ConnectionRef peer
= p
.second
->connect_to(p
.first
->get_mytype(),
1821 p
.first
->get_myaddrs());
1823 dispatcher
.clear_pending(peer
);
1824 available_connections
.erase(peer
);
1827 ASSERT_EQ(available_connections
.erase(conn
), 1U);
1830 void print_internal_state(bool detail
=false) {
1831 std::lock_guard l
{lock
};
1832 lderr(g_ceph_context
) << "available_connections: " << available_connections
.size()
1833 << " inflight messages: " << dispatcher
.get_pending() << dendl
;
1834 if (detail
&& !available_connections
.empty()) {
1839 void wait_for_done() {
1840 int64_t tick_us
= 1000 * 100; // 100ms
1841 int64_t timeout_us
= 5 * 60 * 1000 * 1000; // 5 mins
1843 while (dispatcher
.get_pending()) {
1845 timeout_us
-= tick_us
;
1847 print_internal_state(true);
1849 ceph_abort_msg(" loop time exceed 5 mins, it looks we stuck into some problems!");
1851 for (set
<Messenger
*>::iterator it
= available_servers
.begin();
1852 it
!= available_servers
.end(); ++it
) {
1855 ASSERT_EQ((*it
)->get_dispatch_queue_len(), 0);
1858 available_servers
.clear();
1860 for (set
<Messenger
*>::iterator it
= available_clients
.begin();
1861 it
!= available_clients
.end(); ++it
) {
1864 ASSERT_EQ((*it
)->get_dispatch_queue_len(), 0);
1867 available_clients
.clear();
1870 void handle_reset(Connection
*con
) {
1871 std::lock_guard l
{lock
};
1872 available_connections
.erase(con
);
1873 dispatcher
.clear_pending(con
);
1877 bool SyntheticDispatcher::ms_handle_reset(Connection
*con
) {
1878 workload
->handle_reset(con
);
1882 TEST_P(MessengerTest
, SyntheticStressTest
) {
1883 SyntheticWorkload
test_msg(8, 32, GetParam(), 100,
1884 Messenger::Policy::stateful_server(0),
1885 Messenger::Policy::lossless_client(0));
1886 for (int i
= 0; i
< 100; ++i
) {
1887 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
1888 test_msg
.generate_connection();
1890 gen_type
rng(time(NULL
));
1891 for (int i
= 0; i
< 5000; ++i
) {
1893 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
1894 test_msg
.print_internal_state();
1896 boost::uniform_int
<> true_false(0, 99);
1897 int val
= true_false(rng
);
1899 test_msg
.generate_connection();
1900 } else if (val
> 80) {
1901 test_msg
.drop_connection();
1902 } else if (val
> 10) {
1903 test_msg
.send_message();
1905 usleep(rand() % 1000 + 500);
1908 test_msg
.wait_for_done();
1911 TEST_P(MessengerTest
, SyntheticStressTest1
) {
1912 SyntheticWorkload
test_msg(16, 32, GetParam(), 100,
1913 Messenger::Policy::lossless_peer_reuse(0),
1914 Messenger::Policy::lossless_peer_reuse(0));
1915 for (int i
= 0; i
< 10; ++i
) {
1916 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
1917 test_msg
.generate_connection();
1919 gen_type
rng(time(NULL
));
1920 for (int i
= 0; i
< 10000; ++i
) {
1922 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
1923 test_msg
.print_internal_state();
1925 boost::uniform_int
<> true_false(0, 99);
1926 int val
= true_false(rng
);
1928 test_msg
.generate_connection();
1929 } else if (val
> 60) {
1930 test_msg
.drop_connection();
1931 } else if (val
> 10) {
1932 test_msg
.send_message();
1934 usleep(rand() % 1000 + 500);
1937 test_msg
.wait_for_done();
1941 TEST_P(MessengerTest
, SyntheticInjectTest
) {
1942 uint64_t dispatch_throttle_bytes
= g_ceph_context
->_conf
->ms_dispatch_throttle_bytes
;
1943 g_ceph_context
->_conf
.set_val("ms_inject_socket_failures", "30");
1944 g_ceph_context
->_conf
.set_val("ms_inject_internal_delays", "0.1");
1945 g_ceph_context
->_conf
.set_val("ms_dispatch_throttle_bytes", "16777216");
1946 SyntheticWorkload
test_msg(8, 32, GetParam(), 100,
1947 Messenger::Policy::stateful_server(0),
1948 Messenger::Policy::lossless_client(0));
1949 for (int i
= 0; i
< 100; ++i
) {
1950 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
1951 test_msg
.generate_connection();
1953 gen_type
rng(time(NULL
));
1954 for (int i
= 0; i
< 1000; ++i
) {
1956 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
1957 test_msg
.print_internal_state();
1959 boost::uniform_int
<> true_false(0, 99);
1960 int val
= true_false(rng
);
1962 test_msg
.generate_connection();
1963 } else if (val
> 80) {
1964 test_msg
.drop_connection();
1965 } else if (val
> 10) {
1966 test_msg
.send_message();
1968 usleep(rand() % 500 + 100);
1971 test_msg
.wait_for_done();
1972 g_ceph_context
->_conf
.set_val("ms_inject_socket_failures", "0");
1973 g_ceph_context
->_conf
.set_val("ms_inject_internal_delays", "0");
1974 g_ceph_context
->_conf
.set_val(
1975 "ms_dispatch_throttle_bytes", std::to_string(dispatch_throttle_bytes
));
1978 TEST_P(MessengerTest
, SyntheticInjectTest2
) {
1979 g_ceph_context
->_conf
.set_val("ms_inject_socket_failures", "30");
1980 g_ceph_context
->_conf
.set_val("ms_inject_internal_delays", "0.1");
1981 SyntheticWorkload
test_msg(8, 16, GetParam(), 100,
1982 Messenger::Policy::lossless_peer_reuse(0),
1983 Messenger::Policy::lossless_peer_reuse(0));
1984 for (int i
= 0; i
< 100; ++i
) {
1985 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
1986 test_msg
.generate_connection();
1988 gen_type
rng(time(NULL
));
1989 for (int i
= 0; i
< 1000; ++i
) {
1991 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
1992 test_msg
.print_internal_state();
1994 boost::uniform_int
<> true_false(0, 99);
1995 int val
= true_false(rng
);
1997 test_msg
.generate_connection();
1998 } else if (val
> 80) {
1999 test_msg
.drop_connection();
2000 } else if (val
> 10) {
2001 test_msg
.send_message();
2003 usleep(rand() % 500 + 100);
2006 test_msg
.wait_for_done();
2007 g_ceph_context
->_conf
.set_val("ms_inject_socket_failures", "0");
2008 g_ceph_context
->_conf
.set_val("ms_inject_internal_delays", "0");
2011 TEST_P(MessengerTest
, SyntheticInjectTest3
) {
2012 g_ceph_context
->_conf
.set_val("ms_inject_socket_failures", "600");
2013 g_ceph_context
->_conf
.set_val("ms_inject_internal_delays", "0.1");
2014 SyntheticWorkload
test_msg(8, 16, GetParam(), 100,
2015 Messenger::Policy::stateless_server(0),
2016 Messenger::Policy::lossy_client(0));
2017 for (int i
= 0; i
< 100; ++i
) {
2018 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
2019 test_msg
.generate_connection();
2021 gen_type
rng(time(NULL
));
2022 for (int i
= 0; i
< 1000; ++i
) {
2024 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
2025 test_msg
.print_internal_state();
2027 boost::uniform_int
<> true_false(0, 99);
2028 int val
= true_false(rng
);
2030 test_msg
.generate_connection();
2031 } else if (val
> 80) {
2032 test_msg
.drop_connection();
2033 } else if (val
> 10) {
2034 test_msg
.send_message();
2036 usleep(rand() % 500 + 100);
2039 test_msg
.wait_for_done();
2040 g_ceph_context
->_conf
.set_val("ms_inject_socket_failures", "0");
2041 g_ceph_context
->_conf
.set_val("ms_inject_internal_delays", "0");
2045 TEST_P(MessengerTest
, SyntheticInjectTest4
) {
2046 g_ceph_context
->_conf
.set_val("ms_inject_socket_failures", "30");
2047 g_ceph_context
->_conf
.set_val("ms_inject_internal_delays", "0.1");
2048 g_ceph_context
->_conf
.set_val("ms_inject_delay_probability", "1");
2049 g_ceph_context
->_conf
.set_val("ms_inject_delay_type", "client osd");
2050 g_ceph_context
->_conf
.set_val("ms_inject_delay_max", "5");
2051 SyntheticWorkload
test_msg(16, 32, GetParam(), 100,
2052 Messenger::Policy::lossless_peer(0),
2053 Messenger::Policy::lossless_peer(0));
2054 for (int i
= 0; i
< 100; ++i
) {
2055 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
2056 test_msg
.generate_connection();
2058 gen_type
rng(time(NULL
));
2059 for (int i
= 0; i
< 1000; ++i
) {
2061 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
2062 test_msg
.print_internal_state();
2064 boost::uniform_int
<> true_false(0, 99);
2065 int val
= true_false(rng
);
2067 test_msg
.generate_connection();
2068 } else if (val
> 80) {
2069 // test_msg.drop_connection();
2070 } else if (val
> 10) {
2071 test_msg
.send_message();
2073 usleep(rand() % 500 + 100);
2076 test_msg
.wait_for_done();
2077 g_ceph_context
->_conf
.set_val("ms_inject_socket_failures", "0");
2078 g_ceph_context
->_conf
.set_val("ms_inject_internal_delays", "0");
2079 g_ceph_context
->_conf
.set_val("ms_inject_delay_probability", "0");
2080 g_ceph_context
->_conf
.set_val("ms_inject_delay_type", "");
2081 g_ceph_context
->_conf
.set_val("ms_inject_delay_max", "0");
2085 class MarkdownDispatcher
: public Dispatcher
{
2086 ceph::mutex lock
= ceph::make_mutex("MarkdownDispatcher::lock");
2087 set
<ConnectionRef
> conns
;
2090 std::atomic
<uint64_t> count
= { 0 };
2091 explicit MarkdownDispatcher(bool s
): Dispatcher(g_ceph_context
),
2094 bool ms_can_fast_dispatch_any() const override
{ return false; }
2095 bool ms_can_fast_dispatch(const Message
*m
) const override
{
2096 switch (m
->get_type()) {
2104 void ms_handle_fast_connect(Connection
*con
) override
{
2105 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
2106 std::lock_guard l
{lock
};
2109 void ms_handle_fast_accept(Connection
*con
) override
{
2110 std::lock_guard l
{lock
};
2113 bool ms_dispatch(Message
*m
) override
{
2114 lderr(g_ceph_context
) << __func__
<< " conn: " << m
->get_connection() << dendl
;
2115 std::lock_guard l
{lock
};
2117 conns
.insert(m
->get_connection());
2118 if (conns
.size() < 2 && !last_mark
) {
2124 usleep(rand() % 500);
2125 for (set
<ConnectionRef
>::iterator it
= conns
.begin(); it
!= conns
.end(); ++it
) {
2126 if ((*it
) != m
->get_connection().get()) {
2137 bool ms_handle_reset(Connection
*con
) override
{
2138 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
2139 std::lock_guard l
{lock
};
2141 usleep(rand() % 500);
2144 void ms_handle_remote_reset(Connection
*con
) override
{
2145 std::lock_guard l
{lock
};
2147 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
2149 bool ms_handle_refused(Connection
*con
) override
{
2152 void ms_fast_dispatch(Message
*m
) override
{
2155 int ms_handle_authentication(Connection
*con
) override
{
2161 // Markdown with external lock
2162 TEST_P(MessengerTest
, MarkdownTest
) {
2163 Messenger
*server_msgr2
= Messenger::create(g_ceph_context
, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
2164 MarkdownDispatcher
cli_dispatcher(false), srv_dispatcher(true);
2165 DummyAuthClientServer
dummy_auth(g_ceph_context
);
2166 dummy_auth
.auth_registry
.refresh_config();
2167 entity_addr_t bind_addr
;
2168 bind_addr
.parse("v2:127.0.0.1:16800");
2169 server_msgr
->bind(bind_addr
);
2170 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
2171 server_msgr
->set_auth_client(&dummy_auth
);
2172 server_msgr
->set_auth_server(&dummy_auth
);
2173 server_msgr
->start();
2174 bind_addr
.parse("v2:127.0.0.1:16801");
2175 server_msgr2
->bind(bind_addr
);
2176 server_msgr2
->add_dispatcher_head(&srv_dispatcher
);
2177 server_msgr2
->set_auth_client(&dummy_auth
);
2178 server_msgr2
->set_auth_server(&dummy_auth
);
2179 server_msgr2
->start();
2181 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
2182 client_msgr
->set_auth_client(&dummy_auth
);
2183 client_msgr
->set_auth_server(&dummy_auth
);
2184 client_msgr
->start();
2189 uint64_t equal_count
= 0;
2191 ConnectionRef conn1
= client_msgr
->connect_to(server_msgr
->get_mytype(),
2192 server_msgr
->get_myaddrs());
2193 ConnectionRef conn2
= client_msgr
->connect_to(server_msgr2
->get_mytype(),
2194 server_msgr2
->get_myaddrs());
2195 MPing
*m
= new MPing();
2196 ASSERT_EQ(conn1
->send_message(m
), 0);
2198 ASSERT_EQ(conn2
->send_message(m
), 0);
2199 CHECK_AND_WAIT_TRUE(srv_dispatcher
.count
> last
+ 1);
2200 if (srv_dispatcher
.count
== last
) {
2201 lderr(g_ceph_context
) << __func__
<< " last is " << last
<< dendl
;
2208 last
= srv_dispatcher
.count
;
2211 ASSERT_FALSE(equal
&& equal_count
> 3);
2213 server_msgr
->shutdown();
2214 client_msgr
->shutdown();
2215 server_msgr2
->shutdown();
2216 server_msgr
->wait();
2217 client_msgr
->wait();
2218 server_msgr2
->wait();
2219 delete server_msgr2
;
2222 INSTANTIATE_TEST_SUITE_P(
2230 int main(int argc
, char **argv
) {
2231 vector
<const char*> args
;
2232 argv_to_vec(argc
, (const char **)argv
, args
);
2234 auto cct
= global_init(NULL
, args
, CEPH_ENTITY_TYPE_CLIENT
,
2235 CODE_ENVIRONMENT_UTILITY
,
2236 CINIT_FLAG_NO_DEFAULT_CONFIG_FILE
);
2237 g_ceph_context
->_conf
.set_val("auth_cluster_required", "none");
2238 g_ceph_context
->_conf
.set_val("auth_service_required", "none");
2239 g_ceph_context
->_conf
.set_val("auth_client_required", "none");
2240 g_ceph_context
->_conf
.set_val("keyring", "/dev/null");
2241 g_ceph_context
->_conf
.set_val("enable_experimental_unrecoverable_data_corrupting_features", "ms-type-async");
2242 g_ceph_context
->_conf
.set_val("ms_die_on_bad_msg", "true");
2243 g_ceph_context
->_conf
.set_val("ms_die_on_old_message", "true");
2244 g_ceph_context
->_conf
.set_val("ms_max_backoff", "1");
2245 common_init_finish(g_ceph_context
);
2247 ::testing::InitGoogleTest(&argc
, argv
);
2248 return RUN_ALL_TESTS();
2253 * compile-command: "cd ../.. ; make -j4 ceph_test_msgr && valgrind --tool=memcheck ./ceph_test_msgr"