1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
22 #include "common/Mutex.h"
23 #include "common/Cond.h"
24 #include "common/ceph_argparse.h"
25 #include "global/global_init.h"
26 #include "msg/Dispatcher.h"
27 #include "msg/msg_types.h"
28 #include "msg/Message.h"
29 #include "msg/Messenger.h"
30 #include "msg/Connection.h"
31 #include "messages/MPing.h"
32 #include "messages/MCommand.h"
34 #include <boost/random/mersenne_twister.hpp>
35 #include <boost/random/uniform_int.hpp>
36 #include <boost/random/binomial_distribution.hpp>
37 #include <gtest/gtest.h>
39 typedef boost::mt11213b gen_type  // Boost Mersenne-twister engine type; the stress test instantiates it as `gen_type rng(time(NULL))`
;
41 #include "common/dout.h"
42 #include "include/assert.h"
44 #define dout_subsys ceph_subsys_ms
46 #define dout_prefix *_dout << " ceph_test_msgr "
49 #if GTEST_HAS_PARAM_TEST
51 #define CHECK_AND_WAIT_TRUE(expr) do { \
60 class MessengerTest
: public ::testing::TestWithParam
<const char*> {
62 Messenger
*server_msgr
;
63 Messenger
*client_msgr
;
65 MessengerTest(): server_msgr(NULL
), client_msgr(NULL
) {}
66 void SetUp() override
{
67 lderr(g_ceph_context
) << __func__
<< " start set up " << GetParam() << dendl
;
68 server_msgr
= Messenger::create(g_ceph_context
, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
69 client_msgr
= Messenger::create(g_ceph_context
, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid(), 0);
70 server_msgr
->set_default_policy(Messenger::Policy::stateless_server(0));
71 client_msgr
->set_default_policy(Messenger::Policy::lossy_client(0));
73 void TearDown() override
{
74 ASSERT_EQ(server_msgr
->get_dispatch_queue_len(), 0);
75 ASSERT_EQ(client_msgr
->get_dispatch_queue_len(), 0);
83 class FakeDispatcher
: public Dispatcher
{
85 struct Session
: public RefCountedObject
{
86 atomic
<uint64_t> count
;
89 explicit Session(ConnectionRef c
): RefCountedObject(g_ceph_context
), count(0), con(c
) {
91 uint64_t get_count() { return count
; }
98 bool got_remote_reset
;
102 explicit FakeDispatcher(bool s
): Dispatcher(g_ceph_context
), lock("FakeDispatcher::lock"),
103 is_server(s
), got_new(false), got_remote_reset(false),
104 got_connect(false), loopback(false) {}
105 bool ms_can_fast_dispatch_any() const override
{ return true; }
106 bool ms_can_fast_dispatch(const Message
*m
) const override
{
107 switch (m
->get_type()) {
115 void ms_handle_fast_connect(Connection
*con
) override
{
117 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
118 Session
*s
= static_cast<Session
*>(con
->get_priv());
120 s
= new Session(con
);
121 con
->set_priv(s
->get());
122 lderr(g_ceph_context
) << __func__
<< " con: " << con
<< " count: " << s
->count
<< dendl
;
129 void ms_handle_fast_accept(Connection
*con
) override
{
130 Session
*s
= static_cast<Session
*>(con
->get_priv());
132 s
= new Session(con
);
133 con
->set_priv(s
->get());
137 bool ms_dispatch(Message
*m
) override
{
138 Session
*s
= static_cast<Session
*>(m
->get_connection()->get_priv());
140 s
= new Session(m
->get_connection());
141 m
->get_connection()->set_priv(s
->get());
145 lderr(g_ceph_context
) << __func__
<< " conn: " << m
->get_connection() << " session " << s
<< " count: " << s
->count
<< dendl
;
149 Mutex::Locker
l(lock
);
155 bool ms_handle_reset(Connection
*con
) override
{
156 Mutex::Locker
l(lock
);
157 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
158 Session
*s
= static_cast<Session
*>(con
->get_priv());
160 s
->con
.reset(NULL
); // break con <-> session ref cycle
161 con
->set_priv(NULL
); // break ref <-> session cycle, if any
166 void ms_handle_remote_reset(Connection
*con
) override
{
167 Mutex::Locker
l(lock
);
168 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
169 Session
*s
= static_cast<Session
*>(con
->get_priv());
171 s
->con
.reset(NULL
); // break con <-> session ref cycle
172 con
->set_priv(NULL
); // break ref <-> session cycle, if any
175 got_remote_reset
= true;
178 bool ms_handle_refused(Connection
*con
) override
{
181 void ms_fast_dispatch(Message
*m
) override
{
182 Session
*s
= static_cast<Session
*>(m
->get_connection()->get_priv());
184 s
= new Session(m
->get_connection());
185 m
->get_connection()->set_priv(s
->get());
189 lderr(g_ceph_context
) << __func__
<< " conn: " << m
->get_connection() << " session " << s
<< " count: " << s
->count
<< dendl
;
192 assert(m
->get_source().is_osd());
195 } else if (loopback
) {
196 assert(m
->get_source().is_client());
199 Mutex::Locker
l(lock
);
204 bool ms_verify_authorizer(Connection
*con
, int peer_type
, int protocol
,
205 bufferlist
& authorizer
, bufferlist
& authorizer_reply
,
206 bool& isvalid
, CryptoKey
& session_key
,
207 std::unique_ptr
<AuthAuthorizerChallenge
> *challenge
) override
{
212 void reply_message(Message
*m
) {
213 MPing
*rm
= new MPing();
214 m
->get_connection()->send_message(rm
);
218 typedef FakeDispatcher::Session Session  // file-scope alias so the tests below can write static_cast<Session*>(conn->get_priv())
;
220 TEST_P(MessengerTest
, SimpleTest
) {
221 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
222 entity_addr_t bind_addr
;
223 bind_addr
.parse("127.0.0.1");
224 server_msgr
->bind(bind_addr
);
225 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
226 server_msgr
->start();
228 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
229 client_msgr
->start();
231 // 1. simple round trip
232 MPing
*m
= new MPing();
233 ConnectionRef conn
= client_msgr
->get_connection(server_msgr
->get_myinst());
235 ASSERT_EQ(conn
->send_message(m
), 0);
236 Mutex::Locker
l(cli_dispatcher
.lock
);
237 while (!cli_dispatcher
.got_new
)
238 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
239 cli_dispatcher
.got_new
= false;
241 ASSERT_TRUE(conn
->is_connected());
242 ASSERT_TRUE((static_cast<Session
*>(conn
->get_priv()))->get_count() == 1);
243 ASSERT_TRUE(conn
->peer_is_osd());
245 // 2. test rebind port
246 set
<int> avoid_ports
;
247 for (int i
= 0; i
< 10 ; i
++)
248 avoid_ports
.insert(server_msgr
->get_myaddr().get_port() + i
);
249 server_msgr
->rebind(avoid_ports
);
250 ASSERT_TRUE(avoid_ports
.count(server_msgr
->get_myaddr().get_port()) == 0);
252 conn
= client_msgr
->get_connection(server_msgr
->get_myinst());
255 ASSERT_EQ(conn
->send_message(m
), 0);
256 Mutex::Locker
l(cli_dispatcher
.lock
);
257 while (!cli_dispatcher
.got_new
)
258 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
259 cli_dispatcher
.got_new
= false;
261 ASSERT_TRUE(static_cast<Session
*>(conn
->get_priv())->get_count() == 1);
263 // 3. test markdown connection
265 ASSERT_FALSE(conn
->is_connected());
267 // 4. test failed connection
268 server_msgr
->shutdown();
272 conn
->send_message(m
);
273 CHECK_AND_WAIT_TRUE(!conn
->is_connected());
274 ASSERT_FALSE(conn
->is_connected());
276 // 5. loopback connection
277 srv_dispatcher
.loopback
= true;
278 conn
= client_msgr
->get_loopback_connection();
281 ASSERT_EQ(conn
->send_message(m
), 0);
282 Mutex::Locker
l(cli_dispatcher
.lock
);
283 while (!cli_dispatcher
.got_new
)
284 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
285 cli_dispatcher
.got_new
= false;
287 srv_dispatcher
.loopback
= false;
288 ASSERT_TRUE(static_cast<Session
*>(conn
->get_priv())->get_count() == 1);
289 client_msgr
->shutdown();
291 server_msgr
->shutdown();
295 TEST_P(MessengerTest
, NameAddrTest
) {
296 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
297 entity_addr_t bind_addr
;
298 bind_addr
.parse("127.0.0.1");
299 server_msgr
->bind(bind_addr
);
300 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
301 server_msgr
->start();
303 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
304 client_msgr
->start();
306 MPing
*m
= new MPing();
307 ConnectionRef conn
= client_msgr
->get_connection(server_msgr
->get_myinst());
309 ASSERT_EQ(conn
->send_message(m
), 0);
310 Mutex::Locker
l(cli_dispatcher
.lock
);
311 while (!cli_dispatcher
.got_new
)
312 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
313 cli_dispatcher
.got_new
= false;
315 ASSERT_TRUE(static_cast<Session
*>(conn
->get_priv())->get_count() == 1);
316 ASSERT_TRUE(conn
->get_peer_addr() == server_msgr
->get_myaddr());
317 ConnectionRef server_conn
= server_msgr
->get_connection(client_msgr
->get_myinst());
318 // Make sure server_conn is the connection we already accepted from the client,
319 // which means client_msgr has the same addr that the server connection has
320 ASSERT_TRUE(static_cast<Session
*>(conn
->get_priv())->get_count() == 1);
321 server_msgr
->shutdown();
322 client_msgr
->shutdown();
327 TEST_P(MessengerTest
, FeatureTest
) {
328 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
329 entity_addr_t bind_addr
;
330 bind_addr
.parse("127.0.0.1");
331 uint64_t all_feature_supported
, feature_required
, feature_supported
= 0;
332 for (int i
= 0; i
< 10; i
++)
333 feature_supported
|= 1ULL << i
;
334 feature_required
= feature_supported
| 1ULL << 13;
335 all_feature_supported
= feature_required
| 1ULL << 14;
337 Messenger::Policy p
= server_msgr
->get_policy(entity_name_t::TYPE_CLIENT
);
338 p
.features_required
= feature_required
;
339 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, p
);
340 server_msgr
->bind(bind_addr
);
341 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
342 server_msgr
->start();
344 // 1. Client supports fewer features than the server requires
345 p
= client_msgr
->get_policy(entity_name_t::TYPE_OSD
);
346 p
.features_supported
= feature_supported
;
347 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
348 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
349 client_msgr
->start();
351 MPing
*m
= new MPing();
352 ConnectionRef conn
= client_msgr
->get_connection(server_msgr
->get_myinst());
353 conn
->send_message(m
);
354 CHECK_AND_WAIT_TRUE(!conn
->is_connected());
355 // should fail to build a connection
356 ASSERT_FALSE(conn
->is_connected());
358 client_msgr
->shutdown();
361 // 2. Supported features meet the required ones
362 p
= client_msgr
->get_policy(entity_name_t::TYPE_OSD
);
363 p
.features_supported
= all_feature_supported
;
364 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
365 client_msgr
->start();
367 conn
= client_msgr
->get_connection(server_msgr
->get_myinst());
370 ASSERT_EQ(conn
->send_message(m
), 0);
371 Mutex::Locker
l(cli_dispatcher
.lock
);
372 while (!cli_dispatcher
.got_new
)
373 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
374 cli_dispatcher
.got_new
= false;
376 ASSERT_TRUE(static_cast<Session
*>(conn
->get_priv())->get_count() == 1);
378 server_msgr
->shutdown();
379 client_msgr
->shutdown();
384 TEST_P(MessengerTest
, TimeoutTest
) {
385 g_ceph_context
->_conf
->set_val("ms_tcp_read_timeout", "1");
386 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
387 entity_addr_t bind_addr
;
388 bind_addr
.parse("127.0.0.1");
389 server_msgr
->bind(bind_addr
);
390 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
391 server_msgr
->start();
393 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
394 client_msgr
->start();
396 // 1. build the connection
397 MPing
*m
= new MPing();
398 ConnectionRef conn
= client_msgr
->get_connection(server_msgr
->get_myinst());
400 ASSERT_EQ(conn
->send_message(m
), 0);
401 Mutex::Locker
l(cli_dispatcher
.lock
);
402 while (!cli_dispatcher
.got_new
)
403 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
404 cli_dispatcher
.got_new
= false;
406 ASSERT_TRUE(conn
->is_connected());
407 ASSERT_TRUE((static_cast<Session
*>(conn
->get_priv()))->get_count() == 1);
408 ASSERT_TRUE(conn
->peer_is_osd());
412 ASSERT_FALSE(conn
->is_connected());
414 server_msgr
->shutdown();
417 client_msgr
->shutdown();
419 g_ceph_context
->_conf
->set_val("ms_tcp_read_timeout", "900");
422 TEST_P(MessengerTest
, StatefulTest
) {
424 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
425 entity_addr_t bind_addr
;
426 bind_addr
.parse("127.0.0.1");
427 Messenger::Policy p
= Messenger::Policy::stateful_server(0);
428 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, p
);
429 p
= Messenger::Policy::lossless_client(0);
430 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
432 server_msgr
->bind(bind_addr
);
433 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
434 server_msgr
->start();
435 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
436 client_msgr
->start();
438 // 1. test for server standby
439 ConnectionRef conn
= client_msgr
->get_connection(server_msgr
->get_myinst());
442 ASSERT_EQ(conn
->send_message(m
), 0);
443 Mutex::Locker
l(cli_dispatcher
.lock
);
444 while (!cli_dispatcher
.got_new
)
445 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
446 cli_dispatcher
.got_new
= false;
448 ASSERT_TRUE(static_cast<Session
*>(conn
->get_priv())->get_count() == 1);
450 ASSERT_FALSE(conn
->is_connected());
451 ConnectionRef server_conn
= server_msgr
->get_connection(client_msgr
->get_myinst());
453 ASSERT_TRUE(static_cast<Session
*>(server_conn
->get_priv())->get_count() == 1);
455 srv_dispatcher
.got_new
= false;
456 conn
= client_msgr
->get_connection(server_msgr
->get_myinst());
459 ASSERT_EQ(conn
->send_message(m
), 0);
460 Mutex::Locker
l(cli_dispatcher
.lock
);
461 while (!cli_dispatcher
.got_new
)
462 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
463 cli_dispatcher
.got_new
= false;
465 ASSERT_TRUE(static_cast<Session
*>(conn
->get_priv())->get_count() == 1);
466 server_conn
= server_msgr
->get_connection(client_msgr
->get_myinst());
468 Mutex::Locker
l(srv_dispatcher
.lock
);
469 while (!srv_dispatcher
.got_remote_reset
)
470 srv_dispatcher
.cond
.Wait(srv_dispatcher
.lock
);
473 // 2. test for client reconnect
474 ASSERT_FALSE(cli_dispatcher
.got_remote_reset
);
475 cli_dispatcher
.got_connect
= false;
476 cli_dispatcher
.got_new
= false;
477 cli_dispatcher
.got_remote_reset
= false;
478 server_conn
->mark_down();
479 ASSERT_FALSE(server_conn
->is_connected());
480 // ensure the client detects that the server socket was closed
482 Mutex::Locker
l(cli_dispatcher
.lock
);
483 while (!cli_dispatcher
.got_remote_reset
)
484 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
485 cli_dispatcher
.got_remote_reset
= false;
488 Mutex::Locker
l(cli_dispatcher
.lock
);
489 while (!cli_dispatcher
.got_connect
)
490 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
491 cli_dispatcher
.got_connect
= false;
493 CHECK_AND_WAIT_TRUE(conn
->is_connected());
494 ASSERT_TRUE(conn
->is_connected());
498 ASSERT_EQ(conn
->send_message(m
), 0);
499 ASSERT_TRUE(conn
->is_connected());
500 Mutex::Locker
l(cli_dispatcher
.lock
);
501 while (!cli_dispatcher
.got_new
)
502 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
503 cli_dispatcher
.got_new
= false;
506 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv())->get_count());
507 server_conn
= server_msgr
->get_connection(client_msgr
->get_myinst());
508 ASSERT_EQ(1U, static_cast<Session
*>(server_conn
->get_priv())->get_count());
509 cli_dispatcher
.got_remote_reset
= false;
511 server_msgr
->shutdown();
512 client_msgr
->shutdown();
517 TEST_P(MessengerTest
, StatelessTest
) {
519 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
520 entity_addr_t bind_addr
;
521 bind_addr
.parse("127.0.0.1");
522 Messenger::Policy p
= Messenger::Policy::stateless_server(0);
523 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, p
);
524 p
= Messenger::Policy::lossy_client(0);
525 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
527 server_msgr
->bind(bind_addr
);
528 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
529 server_msgr
->start();
530 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
531 client_msgr
->start();
533 // 1. test for server lose state
534 ConnectionRef conn
= client_msgr
->get_connection(server_msgr
->get_myinst());
537 ASSERT_EQ(conn
->send_message(m
), 0);
538 Mutex::Locker
l(cli_dispatcher
.lock
);
539 while (!cli_dispatcher
.got_new
)
540 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
541 cli_dispatcher
.got_new
= false;
543 ASSERT_TRUE(static_cast<Session
*>(conn
->get_priv())->get_count() == 1);
545 ASSERT_FALSE(conn
->is_connected());
547 srv_dispatcher
.got_new
= false;
548 conn
= client_msgr
->get_connection(server_msgr
->get_myinst());
551 ASSERT_EQ(conn
->send_message(m
), 0);
552 Mutex::Locker
l(cli_dispatcher
.lock
);
553 while (!cli_dispatcher
.got_new
)
554 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
555 cli_dispatcher
.got_new
= false;
557 ASSERT_TRUE(static_cast<Session
*>(conn
->get_priv())->get_count() == 1);
558 ConnectionRef server_conn
= server_msgr
->get_connection(client_msgr
->get_myinst());
561 Mutex::Locker
l(srv_dispatcher
.lock
);
562 while (!srv_dispatcher
.got_new
)
563 srv_dispatcher
.cond
.Wait(srv_dispatcher
.lock
);
565 ASSERT_EQ(1U, static_cast<Session
*>(server_conn
->get_priv())->get_count());
567 // 2. test for client lossy
568 server_conn
->mark_down();
569 ASSERT_FALSE(server_conn
->is_connected());
570 conn
->send_keepalive();
571 CHECK_AND_WAIT_TRUE(!conn
->is_connected());
572 ASSERT_FALSE(conn
->is_connected());
573 conn
= client_msgr
->get_connection(server_msgr
->get_myinst());
576 ASSERT_EQ(conn
->send_message(m
), 0);
577 Mutex::Locker
l(cli_dispatcher
.lock
);
578 while (!cli_dispatcher
.got_new
)
579 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
580 cli_dispatcher
.got_new
= false;
582 ASSERT_TRUE(static_cast<Session
*>(conn
->get_priv())->get_count() == 1);
584 server_msgr
->shutdown();
585 client_msgr
->shutdown();
590 TEST_P(MessengerTest
, ClientStandbyTest
) {
592 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
593 entity_addr_t bind_addr
;
594 bind_addr
.parse("127.0.0.1");
595 Messenger::Policy p
= Messenger::Policy::stateful_server(0);
596 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, p
);
597 p
= Messenger::Policy::lossless_peer(0);
598 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
600 server_msgr
->bind(bind_addr
);
601 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
602 server_msgr
->start();
603 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
604 client_msgr
->start();
606 // 1. test for client standby, resetcheck
607 ConnectionRef conn
= client_msgr
->get_connection(server_msgr
->get_myinst());
610 ASSERT_EQ(conn
->send_message(m
), 0);
611 Mutex::Locker
l(cli_dispatcher
.lock
);
612 while (!cli_dispatcher
.got_new
)
613 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
614 cli_dispatcher
.got_new
= false;
616 ASSERT_TRUE(static_cast<Session
*>(conn
->get_priv())->get_count() == 1);
617 ConnectionRef server_conn
= server_msgr
->get_connection(client_msgr
->get_myinst());
618 ASSERT_FALSE(cli_dispatcher
.got_remote_reset
);
619 cli_dispatcher
.got_connect
= false;
620 server_conn
->mark_down();
621 ASSERT_FALSE(server_conn
->is_connected());
622 // client should be standby
624 // client should be standby, so we use original connection
626 // Try sending a message to verify that we get the remote reset callback
628 ASSERT_EQ(conn
->send_message(m
), 0);
630 Mutex::Locker
l(cli_dispatcher
.lock
);
631 while (!cli_dispatcher
.got_remote_reset
)
632 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
633 cli_dispatcher
.got_remote_reset
= false;
634 while (!cli_dispatcher
.got_connect
)
635 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
636 cli_dispatcher
.got_connect
= false;
638 CHECK_AND_WAIT_TRUE(conn
->is_connected());
639 ASSERT_TRUE(conn
->is_connected());
641 ASSERT_EQ(conn
->send_message(m
), 0);
642 Mutex::Locker
l(cli_dispatcher
.lock
);
643 while (!cli_dispatcher
.got_new
)
644 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
645 cli_dispatcher
.got_new
= false;
647 ASSERT_TRUE(static_cast<Session
*>(conn
->get_priv())->get_count() == 1);
648 server_conn
= server_msgr
->get_connection(client_msgr
->get_myinst());
649 ASSERT_TRUE(static_cast<Session
*>(server_conn
->get_priv())->get_count() == 1);
651 server_msgr
->shutdown();
652 client_msgr
->shutdown();
657 TEST_P(MessengerTest
, AuthTest
) {
658 g_ceph_context
->_conf
->set_val("auth_cluster_required", "cephx");
659 g_ceph_context
->_conf
->set_val("auth_service_required", "cephx");
660 g_ceph_context
->_conf
->set_val("auth_client_required", "cephx");
661 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
662 entity_addr_t bind_addr
;
663 bind_addr
.parse("127.0.0.1");
664 server_msgr
->bind(bind_addr
);
665 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
666 server_msgr
->start();
668 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
669 client_msgr
->start();
671 // 1. simple auth round trip
672 MPing
*m
= new MPing();
673 ConnectionRef conn
= client_msgr
->get_connection(server_msgr
->get_myinst());
675 ASSERT_EQ(conn
->send_message(m
), 0);
676 Mutex::Locker
l(cli_dispatcher
.lock
);
677 while (!cli_dispatcher
.got_new
)
678 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
679 cli_dispatcher
.got_new
= false;
681 ASSERT_TRUE(conn
->is_connected());
682 ASSERT_TRUE((static_cast<Session
*>(conn
->get_priv()))->get_count() == 1);
685 g_ceph_context
->_conf
->set_val("auth_cluster_required", "none");
686 g_ceph_context
->_conf
->set_val("auth_service_required", "none");
687 g_ceph_context
->_conf
->set_val("auth_client_required", "none");
689 ASSERT_FALSE(conn
->is_connected());
690 conn
= client_msgr
->get_connection(server_msgr
->get_myinst());
692 MPing
*m
= new MPing();
693 ASSERT_EQ(conn
->send_message(m
), 0);
694 Mutex::Locker
l(cli_dispatcher
.lock
);
695 while (!cli_dispatcher
.got_new
)
696 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
697 cli_dispatcher
.got_new
= false;
699 ASSERT_TRUE(conn
->is_connected());
700 ASSERT_TRUE((static_cast<Session
*>(conn
->get_priv()))->get_count() == 1);
702 server_msgr
->shutdown();
703 client_msgr
->shutdown();
708 TEST_P(MessengerTest
, MessageTest
) {
709 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
710 entity_addr_t bind_addr
;
711 bind_addr
.parse("127.0.0.1");
712 Messenger::Policy p
= Messenger::Policy::stateful_server(0);
713 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, p
);
714 p
= Messenger::Policy::lossless_peer(0);
715 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
717 server_msgr
->bind(bind_addr
);
718 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
719 server_msgr
->start();
720 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
721 client_msgr
->start();
724 // 1. A very large "front"(as well as "payload")
725 // Because an external message class would require changes to Messenger::decode_message,
726 // here we only use an existing message class (MCommand)
727 ConnectionRef conn
= client_msgr
->get_connection(server_msgr
->get_myinst());
730 uuid
.generate_random();
732 string
s("abcdefghijklmnopqrstuvwxyz");
733 for (int i
= 0; i
< 1024*30; i
++)
735 MCommand
*m
= new MCommand(uuid
);
737 conn
->send_message(m
);
740 Mutex::Locker
l(cli_dispatcher
.lock
);
741 while (!cli_dispatcher
.got_new
)
742 cli_dispatcher
.cond
.WaitInterval(cli_dispatcher
.lock
, t
);
743 ASSERT_TRUE(cli_dispatcher
.got_new
);
744 cli_dispatcher
.got_new
= false;
747 // 2. A very large "data"
750 string
s("abcdefghijklmnopqrstuvwxyz");
751 for (int i
= 0; i
< 1024*30; i
++)
753 MPing
*m
= new MPing();
755 conn
->send_message(m
);
758 Mutex::Locker
l(cli_dispatcher
.lock
);
759 while (!cli_dispatcher
.got_new
)
760 cli_dispatcher
.cond
.WaitInterval(cli_dispatcher
.lock
, t
);
761 ASSERT_TRUE(cli_dispatcher
.got_new
);
762 cli_dispatcher
.got_new
= false;
764 server_msgr
->shutdown();
765 client_msgr
->shutdown();
771 class SyntheticWorkload  // forward declaration: SyntheticDispatcher holds a SyntheticWorkload* before the class body appears
;
782 Payload(Who who
, uint64_t seq
, const bufferlist
& data
)
783 : who(who
), seq(seq
), data(data
)
786 DENC(Payload
, v
, p
) {
794 WRITE_CLASS_DENC(Payload
)
796 ostream
& operator<<(ostream
& out
, const Payload
&pl
)
798 return out
<< "reply=" << pl
.who
<< " i = " << pl
.seq
;
801 class SyntheticDispatcher
: public Dispatcher
{
807 bool got_remote_reset
;
809 map
<ConnectionRef
, list
<uint64_t> > conn_sent
;
810 map
<uint64_t, bufferlist
> sent
;
811 atomic
<uint64_t> index
;
812 SyntheticWorkload
*workload
;
814 SyntheticDispatcher(bool s
, SyntheticWorkload
*wl
):
815 Dispatcher(g_ceph_context
), lock("SyntheticDispatcher::lock"), is_server(s
), got_new(false),
816 got_remote_reset(false), got_connect(false), index(0), workload(wl
) {}
817 bool ms_can_fast_dispatch_any() const override
{ return true; }
818 bool ms_can_fast_dispatch(const Message
*m
) const override
{
819 switch (m
->get_type()) {
828 void ms_handle_fast_connect(Connection
*con
) override
{
829 Mutex::Locker
l(lock
);
830 list
<uint64_t> c
= conn_sent
[con
];
831 for (list
<uint64_t>::iterator it
= c
.begin();
834 conn_sent
.erase(con
);
838 void ms_handle_fast_accept(Connection
*con
) override
{
839 Mutex::Locker
l(lock
);
840 list
<uint64_t> c
= conn_sent
[con
];
841 for (list
<uint64_t>::iterator it
= c
.begin();
844 conn_sent
.erase(con
);
847 bool ms_dispatch(Message
*m
) override
{
850 bool ms_handle_reset(Connection
*con
) override
;
851 void ms_handle_remote_reset(Connection
*con
) override
{
852 Mutex::Locker
l(lock
);
853 list
<uint64_t> c
= conn_sent
[con
];
854 for (list
<uint64_t>::iterator it
= c
.begin();
857 conn_sent
.erase(con
);
858 got_remote_reset
= true;
860 bool ms_handle_refused(Connection
*con
) override
{
863 void ms_fast_dispatch(Message
*m
) override
{
864 // MSG_COMMAND is used to disorganize regular message flow
865 if (m
->get_type() == MSG_COMMAND
) {
871 auto p
= m
->get_data().begin();
873 if (pl
.who
== Payload::PING
) {
874 lderr(g_ceph_context
) << __func__
<< " conn=" << m
->get_connection() << pl
<< dendl
;
875 reply_message(m
, pl
);
877 Mutex::Locker
l(lock
);
881 Mutex::Locker
l(lock
);
882 if (sent
.count(pl
.seq
)) {
883 lderr(g_ceph_context
) << __func__
<< " conn=" << m
->get_connection() << pl
<< dendl
;
884 ASSERT_EQ(conn_sent
[m
->get_connection()].front(), pl
.seq
);
885 ASSERT_TRUE(pl
.data
.contents_equal(sent
[pl
.seq
]));
886 conn_sent
[m
->get_connection()].pop_front();
895 bool ms_verify_authorizer(Connection
*con
, int peer_type
, int protocol
,
896 bufferlist
& authorizer
, bufferlist
& authorizer_reply
,
897 bool& isvalid
, CryptoKey
& session_key
,
898 std::unique_ptr
<AuthAuthorizerChallenge
> *challenge
) override
{
903 void reply_message(const Message
*m
, Payload
& pl
) {
904 pl
.who
= Payload::PONG
;
907 MPing
*rm
= new MPing();
909 m
->get_connection()->send_message(rm
);
910 lderr(g_ceph_context
) << __func__
<< " conn=" << m
->get_connection() << " reply m=" << m
<< " i=" << pl
.seq
<< dendl
;
913 void send_message_wrap(ConnectionRef con
, const bufferlist
& data
) {
914 Message
*m
= new MPing();
915 Payload pl
{Payload::PING
, index
++, data
};
919 if (!con
->get_messenger()->get_default_policy().lossy
) {
920 Mutex::Locker
l(lock
);
921 sent
[pl
.seq
] = pl
.data
;
922 conn_sent
[con
].push_back(pl
.seq
);
924 lderr(g_ceph_context
) << __func__
<< " conn=" << con
.get() << " send m=" << m
<< " i=" << pl
.seq
<< dendl
;
925 ASSERT_EQ(0, con
->send_message(m
));
928 uint64_t get_pending() {
929 Mutex::Locker
l(lock
);
933 void clear_pending(ConnectionRef con
) {
934 Mutex::Locker
l(lock
);
936 for (list
<uint64_t>::iterator it
= conn_sent
[con
].begin();
937 it
!= conn_sent
[con
].end(); ++it
)
939 conn_sent
.erase(con
);
943 for (auto && p
: conn_sent
) {
944 if (!p
.second
.empty()) {
945 lderr(g_ceph_context
) << __func__
<< " " << p
.first
<< " wait " << p
.second
.size() << dendl
;
952 class SyntheticWorkload
{
955 set
<Messenger
*> available_servers
;
956 set
<Messenger
*> available_clients
;
957 map
<ConnectionRef
, pair
<Messenger
*, Messenger
*> > available_connections
;
958 SyntheticDispatcher dispatcher
;
960 vector
<bufferlist
> rand_data
;
963 static const unsigned max_in_flight
= 64;
964 static const unsigned max_connections
= 128;
965 static const unsigned max_message_len
= 1024 * 1024 * 4;
967 SyntheticWorkload(int servers
, int clients
, string type
, int random_num
,
968 Messenger::Policy srv_policy
, Messenger::Policy cli_policy
):
969 lock("SyntheticWorkload::lock"), dispatcher(false, this), rng(time(NULL
)) {
971 int base_port
= 16800;
972 entity_addr_t bind_addr
;
974 for (int i
= 0; i
< servers
; ++i
) {
975 msgr
= Messenger::create(g_ceph_context
, type
, entity_name_t::OSD(0),
976 "server", getpid()+i
, 0);
977 snprintf(addr
, sizeof(addr
), "127.0.0.1:%d", base_port
+i
);
978 bind_addr
.parse(addr
);
979 msgr
->bind(bind_addr
);
980 msgr
->add_dispatcher_head(&dispatcher
);
983 msgr
->set_default_policy(srv_policy
);
984 available_servers
.insert(msgr
);
988 for (int i
= 0; i
< clients
; ++i
) {
989 msgr
= Messenger::create(g_ceph_context
, type
, entity_name_t::CLIENT(-1),
990 "client", getpid()+i
+servers
, 0);
991 if (cli_policy
.standby
) {
992 snprintf(addr
, sizeof(addr
), "127.0.0.1:%d", base_port
+i
+servers
);
993 bind_addr
.parse(addr
);
994 msgr
->bind(bind_addr
);
996 msgr
->add_dispatcher_head(&dispatcher
);
999 msgr
->set_default_policy(cli_policy
);
1000 available_clients
.insert(msgr
);
1004 for (int i
= 0; i
< random_num
; i
++) {
1006 boost::uniform_int
<> u(32, max_message_len
);
1007 uint64_t value_len
= u(rng
);
1008 bufferptr
bp(value_len
);
1010 for (uint64_t j
= 0; j
< value_len
-sizeof(i
); ) {
1011 memcpy(bp
.c_str()+j
, &i
, sizeof(i
));
1016 rand_data
.push_back(bl
);
1020 ConnectionRef
_get_random_connection() {
1021 while (dispatcher
.get_pending() > max_in_flight
) {
1026 assert(lock
.is_locked());
1027 boost::uniform_int
<> choose(0, available_connections
.size() - 1);
1028 int index
= choose(rng
);
1029 map
<ConnectionRef
, pair
<Messenger
*, Messenger
*> >::iterator i
= available_connections
.begin();
1030 for (; index
> 0; --index
, ++i
) ;
1034 bool can_create_connection() {
1035 return available_connections
.size() < max_connections
;
1038 void generate_connection() {
1039 Mutex::Locker
l(lock
);
1040 if (!can_create_connection())
1043 Messenger
*server
, *client
;
1045 boost::uniform_int
<> choose(0, available_servers
.size() - 1);
1046 int index
= choose(rng
);
1047 set
<Messenger
*>::iterator i
= available_servers
.begin();
1048 for (; index
> 0; --index
, ++i
) ;
1052 boost::uniform_int
<> choose(0, available_clients
.size() - 1);
1053 int index
= choose(rng
);
1054 set
<Messenger
*>::iterator i
= available_clients
.begin();
1055 for (; index
> 0; --index
, ++i
) ;
1059 pair
<Messenger
*, Messenger
*> p
;
1061 boost::uniform_int
<> choose(0, available_servers
.size() - 1);
1062 if (server
->get_default_policy().server
) {
1063 p
= make_pair(client
, server
);
1065 ConnectionRef conn
= client
->get_connection(server
->get_myinst());
1066 if (available_connections
.count(conn
) || choose(rng
) % 2)
1067 p
= make_pair(client
, server
);
1069 p
= make_pair(server
, client
);
1072 ConnectionRef conn
= p
.first
->get_connection(p
.second
->get_myinst());
1073 available_connections
[conn
] = p
;
1076 void send_message() {
1077 Mutex::Locker
l(lock
);
1078 ConnectionRef conn
= _get_random_connection();
1079 boost::uniform_int
<> true_false(0, 99);
1080 int val
= true_false(rng
);
1083 uuid
.generate_random();
1084 MCommand
*m
= new MCommand(uuid
);
1085 vector
<string
> cmds
;
1086 cmds
.push_back("command");
1088 m
->set_priority(200);
1089 conn
->send_message(m
);
1091 boost::uniform_int
<> u(0, rand_data
.size()-1);
1092 dispatcher
.send_message_wrap(conn
, rand_data
[u(rng
)]);
// Stops tracking one randomly chosen connection while holding `lock`.
// The chosen connection has its pending messages cleared in the dispatcher and
// is erased from `available_connections`; the final assertion requires that
// exactly one map entry was removed. When neither endpoint's default policy
// has `server` set, the reverse connection (second -> first) is also looked
// up, cleared in the dispatcher and erased.
// NOTE(review): the statement guarded by `size() < 10` (presumably an early
// return) and any mark_down() calls are not visible here -- confirm against
// the full source.
1096 void drop_connection() {
1097 Mutex::Locker
l(lock
);
1098 if (available_connections
.size() < 10)
1100 ConnectionRef conn
= _get_random_connection();
1101 dispatcher
.clear_pending(conn
);
// p is the (first, second) messenger pair recorded for this connection.
1103 pair
<Messenger
*, Messenger
*> &p
= available_connections
[conn
];
1104 // it's a lossless policy, so we need to mark down each side
1105 if (!p
.first
->get_default_policy().server
&& !p
.second
->get_default_policy().server
) {
1106 ASSERT_EQ(conn
->get_messenger(), p
.first
);
1107 ConnectionRef peer
= p
.second
->get_connection(p
.first
->get_myinst());
1109 dispatcher
.clear_pending(peer
);
1110 available_connections
.erase(peer
);
1112 ASSERT_EQ(available_connections
.erase(conn
), 1U);
// Logs, under `lock`, the number of tracked connections and the dispatcher's
// pending ("inflight") message count via lderr.
// `detail` (default false) enables an extra branch that is entered only when
// at least one connection is tracked.
// NOTE(review): the body of the detail branch is not visible here.
1115 void print_internal_state(bool detail
=false) {
1116 Mutex::Locker
l(lock
);
1117 lderr(g_ceph_context
) << "available_connections: " << available_connections
.size()
1118 << " inflight messages: " << dispatcher
.get_pending() << dendl
;
1119 if (detail
&& !available_connections
.empty()) {
// Waits for the dispatcher to report no pending messages, then checks that
// every messenger is idle.
// The wait loop runs while dispatcher.get_pending() is non-zero and subtracts
// one 100 ms tick from a 5 minute budget per iteration; it also contains a
// detailed state dump and an assert carrying a "loop time exceed 5 mins"
// message. Afterwards each messenger in `available_servers` and
// `available_clients` is asserted to have an empty dispatch queue, and both
// sets are cleared.
// NOTE(review): the wait between polls, the condition guarding the dump and
// assert, and any shutdown/delete of the messengers are not visible here --
// confirm against the full source.
1124 void wait_for_done() {
1125 int64_t tick_us
= 1000 * 100; // 100ms
1126 int64_t timeout_us
= 5 * 60 * 1000 * 1000; // 5 mins
1128 while (dispatcher
.get_pending()) {
1130 timeout_us
-= tick_us
;
1132 print_internal_state(true);
1134 assert(0 == " loop time exceed 5 mins, it looks we stuck into some problems!");
// Server side: every dispatch queue must be empty.
1136 for (set
<Messenger
*>::iterator it
= available_servers
.begin();
1137 it
!= available_servers
.end(); ++it
) {
1140 ASSERT_EQ((*it
)->get_dispatch_queue_len(), 0);
1143 available_servers
.clear();
// Client side: same check.
1145 for (set
<Messenger
*>::iterator it
= available_clients
.begin();
1146 it
!= available_clients
.end(); ++it
) {
1149 ASSERT_EQ((*it
)->get_dispatch_queue_len(), 0);
1152 available_clients
.clear();
// Under `lock`, erases `con` from `available_connections` and clears its
// pending messages in the dispatcher.
1155 void handle_reset(Connection
*con
) {
1156 Mutex::Locker
l(lock
);
1157 available_connections
.erase(con
);
1158 dispatcher
.clear_pending(con
);
// Forwards the reset notification for `con` to `workload` via handle_reset().
// NOTE(review): the return statement is not visible here.
1162 bool SyntheticDispatcher::ms_handle_reset(Connection
*con
) {
1163 workload
->handle_reset(con
);
// Randomized stress test using a SyntheticWorkload built with
// (8, 32, GetParam(), 100) and the stateful_server / lossless_client policies.
// Seeds 100 connections, then performs 5000 randomized steps: each step draws
// `val` uniformly from [0, 99] and generates a connection, drops one
// (val > 80), sends a message (val > 10), or sleeps. Finishes with
// wait_for_done().
// NOTE(review): the condition of the first branch, the guard around the
// per-op logging and the branch owning the usleep() are not visible here.
1167 TEST_P(MessengerTest
, SyntheticStressTest
) {
1168 SyntheticWorkload
test_msg(8, 32, GetParam(), 100,
1169 Messenger::Policy::stateful_server(0),
1170 Messenger::Policy::lossless_client(0));
// Seeding phase; progress is logged on every 10th iteration.
1171 for (int i
= 0; i
< 100; ++i
) {
1172 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
1173 test_msg
.generate_connection();
// The operation picker is seeded from wall-clock time, so runs differ.
1175 gen_type
rng(time(NULL
));
1176 for (int i
= 0; i
< 5000; ++i
) {
1178 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
1179 test_msg
.print_internal_state();
1181 boost::uniform_int
<> true_false(0, 99);
1182 int val
= true_false(rng
);
1184 test_msg
.generate_connection();
1185 } else if (val
> 80) {
1186 test_msg
.drop_connection();
1187 } else if (val
> 10) {
1188 test_msg
.send_message();
// Sleeps between 500 and 1499 microseconds.
1190 usleep(rand() % 1000 + 500);
1193 test_msg
.wait_for_done();
// Randomized stress test using a SyntheticWorkload built with
// (16, 32, GetParam(), 100) and lossless_peer_reuse policies on both sides.
// Seeds 10 connections, then performs 10000 randomized steps: each step draws
// `val` uniformly from [0, 99] and generates a connection, drops one
// (val > 60), sends a message (val > 10), or sleeps. Finishes with
// wait_for_done().
// NOTE(review): the condition of the first branch, the guard around the
// per-op logging and the branch owning the usleep() are not visible here.
1196 TEST_P(MessengerTest
, SyntheticStressTest1
) {
1197 SyntheticWorkload
test_msg(16, 32, GetParam(), 100,
1198 Messenger::Policy::lossless_peer_reuse(0),
1199 Messenger::Policy::lossless_peer_reuse(0));
// Seeding phase; with only 10 iterations the log line fires for i == 0 only.
1200 for (int i
= 0; i
< 10; ++i
) {
1201 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
1202 test_msg
.generate_connection();
// The operation picker is seeded from wall-clock time, so runs differ.
1204 gen_type
rng(time(NULL
));
1205 for (int i
= 0; i
< 10000; ++i
) {
1207 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
1208 test_msg
.print_internal_state();
1210 boost::uniform_int
<> true_false(0, 99);
1211 int val
= true_false(rng
);
1213 test_msg
.generate_connection();
1214 } else if (val
> 60) {
1215 test_msg
.drop_connection();
1216 } else if (val
> 10) {
1217 test_msg
.send_message();
// Sleeps between 500 and 1499 microseconds.
1219 usleep(rand() % 1000 + 500);
1222 test_msg
.wait_for_done();
// Randomized workload run with failure injection enabled.
// Before the run: remembers the current ms_dispatch_throttle_bytes, then sets
// ms_inject_socket_failures=30, ms_inject_internal_delays=0.1 and
// ms_dispatch_throttle_bytes=16777216. The workload is built with
// (8, 32, GetParam(), 100) and stateful_server / lossless_client policies,
// seeded with 100 connections and driven for 1000 randomized steps
// (drop when val > 80, send when val > 10). After wait_for_done() the two
// injection options are set back to "0" and the throttle to its saved value.
// NOTE(review): the condition of the first branch, the guard around the
// per-op logging and the branch owning the usleep() are not visible here.
// The settings are restored only at the end of the body, so they stay changed
// if the test exits earlier.
1226 TEST_P(MessengerTest
, SyntheticInjectTest
) {
1227 uint64_t dispatch_throttle_bytes
= g_ceph_context
->_conf
->ms_dispatch_throttle_bytes
;
1228 g_ceph_context
->_conf
->set_val("ms_inject_socket_failures", "30");
1229 g_ceph_context
->_conf
->set_val("ms_inject_internal_delays", "0.1");
1230 g_ceph_context
->_conf
->set_val("ms_dispatch_throttle_bytes", "16777216");
1231 SyntheticWorkload
test_msg(8, 32, GetParam(), 100,
1232 Messenger::Policy::stateful_server(0),
1233 Messenger::Policy::lossless_client(0));
// Seeding phase; progress is logged on every 10th iteration.
1234 for (int i
= 0; i
< 100; ++i
) {
1235 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
1236 test_msg
.generate_connection();
// The operation picker is seeded from wall-clock time, so runs differ.
1238 gen_type
rng(time(NULL
));
1239 for (int i
= 0; i
< 1000; ++i
) {
1241 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
1242 test_msg
.print_internal_state();
1244 boost::uniform_int
<> true_false(0, 99);
1245 int val
= true_false(rng
);
1247 test_msg
.generate_connection();
1248 } else if (val
> 80) {
1249 test_msg
.drop_connection();
1250 } else if (val
> 10) {
1251 test_msg
.send_message();
// Sleeps between 100 and 599 microseconds.
1253 usleep(rand() % 500 + 100);
1256 test_msg
.wait_for_done();
// Restore the configuration changed at the top of the test.
1257 g_ceph_context
->_conf
->set_val("ms_inject_socket_failures", "0");
1258 g_ceph_context
->_conf
->set_val("ms_inject_internal_delays", "0");
1259 g_ceph_context
->_conf
->set_val(
1260 "ms_dispatch_throttle_bytes", std::to_string(dispatch_throttle_bytes
));
// Randomized workload run with failure injection enabled and
// lossless_peer_reuse policies on both sides.
// Sets ms_inject_socket_failures=30 and ms_inject_internal_delays=0.1, builds
// the workload with (8, 16, GetParam(), 100), seeds 100 connections and drives
// 1000 randomized steps (drop when val > 80, send when val > 10). After
// wait_for_done() both injection options are set back to "0".
// NOTE(review): the condition of the first branch, the guard around the
// per-op logging and the branch owning the usleep() are not visible here.
1263 TEST_P(MessengerTest
, SyntheticInjectTest2
) {
1264 g_ceph_context
->_conf
->set_val("ms_inject_socket_failures", "30");
1265 g_ceph_context
->_conf
->set_val("ms_inject_internal_delays", "0.1");
1266 SyntheticWorkload
test_msg(8, 16, GetParam(), 100,
1267 Messenger::Policy::lossless_peer_reuse(0),
1268 Messenger::Policy::lossless_peer_reuse(0));
// Seeding phase; progress is logged on every 10th iteration.
1269 for (int i
= 0; i
< 100; ++i
) {
1270 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
1271 test_msg
.generate_connection();
// The operation picker is seeded from wall-clock time, so runs differ.
1273 gen_type
rng(time(NULL
));
1274 for (int i
= 0; i
< 1000; ++i
) {
1276 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
1277 test_msg
.print_internal_state();
1279 boost::uniform_int
<> true_false(0, 99);
1280 int val
= true_false(rng
);
1282 test_msg
.generate_connection();
1283 } else if (val
> 80) {
1284 test_msg
.drop_connection();
1285 } else if (val
> 10) {
1286 test_msg
.send_message();
// Sleeps between 100 and 599 microseconds.
1288 usleep(rand() % 500 + 100);
1291 test_msg
.wait_for_done();
// Restore the injection settings changed at the top of the test.
1292 g_ceph_context
->_conf
->set_val("ms_inject_socket_failures", "0");
1293 g_ceph_context
->_conf
->set_val("ms_inject_internal_delays", "0");
// Randomized workload run with failure injection enabled and
// stateless_server / lossy_client policies.
// Sets ms_inject_socket_failures=600 and ms_inject_internal_delays=0.1, builds
// the workload with (8, 16, GetParam(), 100), seeds 100 connections and drives
// 1000 randomized steps (drop when val > 80, send when val > 10). After
// wait_for_done() both injection options are set back to "0".
// NOTE(review): the condition of the first branch, the guard around the
// per-op logging and the branch owning the usleep() are not visible here.
1296 TEST_P(MessengerTest
, SyntheticInjectTest3
) {
1297 g_ceph_context
->_conf
->set_val("ms_inject_socket_failures", "600");
1298 g_ceph_context
->_conf
->set_val("ms_inject_internal_delays", "0.1");
1299 SyntheticWorkload
test_msg(8, 16, GetParam(), 100,
1300 Messenger::Policy::stateless_server(0),
1301 Messenger::Policy::lossy_client(0));
// Seeding phase; progress is logged on every 10th iteration.
1302 for (int i
= 0; i
< 100; ++i
) {
1303 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
1304 test_msg
.generate_connection();
// The operation picker is seeded from wall-clock time, so runs differ.
1306 gen_type
rng(time(NULL
));
1307 for (int i
= 0; i
< 1000; ++i
) {
1309 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
1310 test_msg
.print_internal_state();
1312 boost::uniform_int
<> true_false(0, 99);
1313 int val
= true_false(rng
);
1315 test_msg
.generate_connection();
1316 } else if (val
> 80) {
1317 test_msg
.drop_connection();
1318 } else if (val
> 10) {
1319 test_msg
.send_message();
// Sleeps between 100 and 599 microseconds.
1321 usleep(rand() % 500 + 100);
1324 test_msg
.wait_for_done();
// Restore the injection settings changed at the top of the test.
1325 g_ceph_context
->_conf
->set_val("ms_inject_socket_failures", "0");
1326 g_ceph_context
->_conf
->set_val("ms_inject_internal_delays", "0");
// Randomized workload run with socket-failure and delay injection enabled and
// lossless_peer policies on both sides.
// Sets ms_inject_socket_failures=30, ms_inject_internal_delays=0.1,
// ms_inject_delay_probability=1, ms_inject_delay_type="client osd" and
// ms_inject_delay_max=5, builds the workload with (16, 32, GetParam(), 100),
// seeds 100 connections and drives 1000 randomized steps. In this test the
// `val > 80` branch does nothing because the drop_connection() call is
// commented out; `val > 10` sends a message. After wait_for_done() all five
// options are reset.
// NOTE(review): the condition of the first branch, the guard around the
// per-op logging and the branch owning the usleep() are not visible here.
1330 TEST_P(MessengerTest
, SyntheticInjectTest4
) {
1331 g_ceph_context
->_conf
->set_val("ms_inject_socket_failures", "30");
1332 g_ceph_context
->_conf
->set_val("ms_inject_internal_delays", "0.1");
1333 g_ceph_context
->_conf
->set_val("ms_inject_delay_probability", "1");
1334 g_ceph_context
->_conf
->set_val("ms_inject_delay_type", "client osd", false);
1335 g_ceph_context
->_conf
->set_val("ms_inject_delay_max", "5");
1336 SyntheticWorkload
test_msg(16, 32, GetParam(), 100,
1337 Messenger::Policy::lossless_peer(0),
1338 Messenger::Policy::lossless_peer(0));
// Seeding phase; progress is logged on every 10th iteration.
1339 for (int i
= 0; i
< 100; ++i
) {
1340 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
1341 test_msg
.generate_connection();
// The operation picker is seeded from wall-clock time, so runs differ.
1343 gen_type
rng(time(NULL
));
1344 for (int i
= 0; i
< 1000; ++i
) {
1346 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
1347 test_msg
.print_internal_state();
1349 boost::uniform_int
<> true_false(0, 99);
1350 int val
= true_false(rng
);
1352 test_msg
.generate_connection();
1353 } else if (val
> 80) {
1354 // test_msg.drop_connection();
1355 } else if (val
> 10) {
1356 test_msg
.send_message();
// Sleeps between 100 and 599 microseconds.
1358 usleep(rand() % 500 + 100);
1361 test_msg
.wait_for_done();
// Restore the injection settings changed at the top of the test.
1362 g_ceph_context
->_conf
->set_val("ms_inject_socket_failures", "0");
1363 g_ceph_context
->_conf
->set_val("ms_inject_internal_delays", "0");
1364 g_ceph_context
->_conf
->set_val("ms_inject_delay_probability", "0");
1365 g_ceph_context
->_conf
->set_val("ms_inject_delay_type", "", false);
1366 g_ceph_context
->_conf
->set_val("ms_inject_delay_max", "0");
// Dispatcher used by MarkdownTest. It keeps the set of connections it has
// seen messages on (`conns`) and an atomic counter (`count`); its handlers
// take `lock` and several of them sleep for a random sub-millisecond interval.
// NOTE(review): most handler bodies are only partly visible here (the code
// that updates `count`, acts on the other connections, or returns values is
// not shown) -- confirm details against the full source.
1370 class MarkdownDispatcher
: public Dispatcher
{
// Connections on which ms_dispatch() has received a message.
1372 set
<ConnectionRef
> conns
;
// Atomic counter, initialised to 0; MarkdownTest reads it to detect progress.
1375 std::atomic
<uint64_t> count
= { 0 };
// NOTE(review): the use of `s` and the rest of the initializer list are not visible here.
1376 explicit MarkdownDispatcher(bool s
): Dispatcher(g_ceph_context
), lock("MarkdownDispatcher::lock"),
1378 bool ms_can_fast_dispatch_any() const override
{ return false; }
// Decides per message type; the switch cases are not visible here.
1379 bool ms_can_fast_dispatch(const Message
*m
) const override
{
1380 switch (m
->get_type()) {
// Logs the connection and takes `lock`.
1388 void ms_handle_fast_connect(Connection
*con
) override
{
1389 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
1390 Mutex::Locker
l(lock
);
1393 void ms_handle_fast_accept(Connection
*con
) override
{
1394 Mutex::Locker
l(lock
);
// Logs the message's connection and, under `lock`, records it in `conns`.
// There is a special case while fewer than two connections are known and
// `last_mark` is unset; otherwise it sleeps for up to 499 us and walks
// `conns`, singling out every connection other than the message's own.
// NOTE(review): what is done in either case is not visible here.
1397 bool ms_dispatch(Message
*m
) override
{
1398 lderr(g_ceph_context
) << __func__
<< " conn: " << m
->get_connection() << dendl
;
1399 Mutex::Locker
l(lock
);
1401 conns
.insert(m
->get_connection());
1402 if (conns
.size() < 2 && !last_mark
) {
1408 usleep(rand() % 500);
1409 for (set
<ConnectionRef
>::iterator it
= conns
.begin(); it
!= conns
.end(); ++it
) {
1410 if ((*it
) != m
->get_connection().get()) {
// Logs the connection, takes `lock` and sleeps for up to 499 us.
1421 bool ms_handle_reset(Connection
*con
) override
{
1422 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
1423 Mutex::Locker
l(lock
);
1425 usleep(rand() % 500);
// Takes `lock` and logs the connection.
1428 void ms_handle_remote_reset(Connection
*con
) override
{
1429 Mutex::Locker
l(lock
);
1431 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
1433 bool ms_handle_refused(Connection
*con
) override
{
1436 void ms_fast_dispatch(Message
*m
) override
{
1439 bool ms_verify_authorizer(Connection
*con
, int peer_type
, int protocol
,
1440 bufferlist
& authorizer
, bufferlist
& authorizer_reply
,
1441 bool& isvalid
, CryptoKey
& session_key
,
1442 std::unique_ptr
<AuthAuthorizerChallenge
> *challenge
) override
{
1449 // Markdown with external lock
// Runs one client against two servers that share a MarkdownDispatcher.
// Setup: server_msgr is bound to 127.0.0.1:16800 and a second messenger,
// server_msgr2, to 127.0.0.1:16801; both get `srv_dispatcher`, the client gets
// `cli_dispatcher`, and all three are started.
// Exercise: obtains a connection to each server, sends an MPing on each
// (both sends must return 0) and waits for srv_dispatcher.count to exceed
// `last + 1`. An unchanged count is logged, `last` is updated, and the test
// fails once `equal` is set and `equal_count` exceeds 3.
// Teardown: shuts down and waits for all three messengers, then deletes
// server_msgr2.
// NOTE(review): the loop header and the updates of `equal`/`equal_count` are
// not visible here. Both send_message() calls visibly pass `m`; whether `m`
// is re-assigned to a fresh MPing between them cannot be seen from here --
// confirm, since sending the same Message object twice would be a bug.
1450 TEST_P(MessengerTest
, MarkdownTest
) {
1451 Messenger
*server_msgr2
= Messenger::create(g_ceph_context
, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
1452 MarkdownDispatcher
cli_dispatcher(false), srv_dispatcher(true);
1453 entity_addr_t bind_addr
;
1454 bind_addr
.parse("127.0.0.1:16800");
1455 server_msgr
->bind(bind_addr
);
1456 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
1457 server_msgr
->start();
1458 bind_addr
.parse("127.0.0.1:16801");
1459 server_msgr2
->bind(bind_addr
);
1460 server_msgr2
->add_dispatcher_head(&srv_dispatcher
);
1461 server_msgr2
->start();
1463 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
1464 client_msgr
->start();
1469 uint64_t equal_count
= 0;
1471 ConnectionRef conn1
= client_msgr
->get_connection(server_msgr
->get_myinst());
1472 ConnectionRef conn2
= client_msgr
->get_connection(server_msgr2
->get_myinst());
1473 MPing
*m
= new MPing();
1474 ASSERT_EQ(conn1
->send_message(m
), 0);
1476 ASSERT_EQ(conn2
->send_message(m
), 0);
1477 CHECK_AND_WAIT_TRUE(srv_dispatcher
.count
> last
+ 1);
1478 if (srv_dispatcher
.count
== last
) {
1479 lderr(g_ceph_context
) << __func__
<< " last is " << last
<< dendl
;
1486 last
= srv_dispatcher
.count
;
1489 ASSERT_FALSE(equal
&& equal_count
> 3);
// Teardown: shut everything down first, then wait for each messenger.
1491 server_msgr
->shutdown();
1492 client_msgr
->shutdown();
1493 server_msgr2
->shutdown();
1494 server_msgr
->wait();
1495 client_msgr
->wait();
1496 server_msgr2
->wait();
1497 delete server_msgr2
;
1500 INSTANTIATE_TEST_CASE_P(
1511 // Google Test may not support value-parameterized tests with some
1512 // compilers. If we use conditional compilation to compile out all
1513 // code referring to the gtest_main library, MSVC linker will not link
1514 // that library at all and consequently complain about missing entry
1515 // point defined in that library (fatal error LNK1561: entry point
1516 // must be defined). This dummy test keeps gtest_main linked in.
// Intentionally empty placeholder test: the body contains no statements.
1517 TEST(DummyTest
, ValueParameterizedTestsAreNotSupportedOnThisPlatform
) {}
// Test binary entry point.
// Converts argv into an argument vector, initialises the global Ceph context
// through global_init() (client entity type, utility code environment), then
// overrides several options before common_init_finish(): the three auth_*
// "required" options are set to "none", the "ms-type-async" experimental
// feature is enabled, ms_die_on_bad_msg and ms_die_on_old_message are set to
// "true", and ms_max_backoff to "1". Finally hands control to Google Test
// and returns the result of RUN_ALL_TESTS().
1522 int main(int argc
, char **argv
) {
1523 vector
<const char*> args
;
1524 argv_to_vec(argc
, (const char **)argv
, args
);
1527 auto cct
= global_init(NULL
, args
, CEPH_ENTITY_TYPE_CLIENT
, CODE_ENVIRONMENT_UTILITY
, 0);
// Configuration overrides applied to every test in this binary.
1528 g_ceph_context
->_conf
->set_val("auth_cluster_required", "none");
1529 g_ceph_context
->_conf
->set_val("auth_service_required", "none");
1530 g_ceph_context
->_conf
->set_val("auth_client_required", "none");
1531 g_ceph_context
->_conf
->set_val("enable_experimental_unrecoverable_data_corrupting_features", "ms-type-async");
1532 g_ceph_context
->_conf
->set_val("ms_die_on_bad_msg", "true");
1533 g_ceph_context
->_conf
->set_val("ms_die_on_old_message", "true");
1534 g_ceph_context
->_conf
->set_val("ms_max_backoff", "1");
1535 common_init_finish(g_ceph_context
);
1537 ::testing::InitGoogleTest(&argc
, argv
);
1538 return RUN_ALL_TESTS();
1543 * compile-command: "cd ../.. ; make -j4 ceph_test_msgr && valgrind --tool=memcheck ./ceph_test_msgr"