1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
22 #include "common/Mutex.h"
23 #include "common/Cond.h"
24 #include "common/ceph_argparse.h"
25 #include "global/global_init.h"
26 #include "msg/Dispatcher.h"
27 #include "msg/msg_types.h"
28 #include "msg/Message.h"
29 #include "msg/Messenger.h"
30 #include "msg/Connection.h"
31 #include "messages/MPing.h"
32 #include "messages/MCommand.h"
34 #include <boost/random/mersenne_twister.hpp>
35 #include <boost/random/uniform_int.hpp>
36 #include <boost/random/binomial_distribution.hpp>
37 #include <gtest/gtest.h>
// Random-number engine type used in this file; instances are seeded with
// time(NULL) where they are constructed (e.g. `gen_type rng(time(NULL))`).
39 typedef boost::mt11213b gen_type
;
41 #include "common/dout.h"
42 #include "include/assert.h"
44 #define dout_subsys ceph_subsys_ms
46 #define dout_prefix *_dout << " ceph_test_msgr "
49 #if GTEST_HAS_PARAM_TEST
51 #define CHECK_AND_WAIT_TRUE(expr) do { \
60 class MessengerTest
: public ::testing::TestWithParam
<const char*> {
62 Messenger
*server_msgr
;
63 Messenger
*client_msgr
;
// Both messenger pointers start out NULL; SetUp() assigns them from
// Messenger::create().
65 MessengerTest(): server_msgr(NULL
), client_msgr(NULL
) {}
66 void SetUp() override
{
67 lderr(g_ceph_context
) << __func__
<< " start set up " << GetParam() << dendl
;
68 server_msgr
= Messenger::create(g_ceph_context
, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
69 client_msgr
= Messenger::create(g_ceph_context
, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid(), 0);
70 server_msgr
->set_default_policy(Messenger::Policy::stateless_server(0));
71 client_msgr
->set_default_policy(Messenger::Policy::lossy_client(0));
73 void TearDown() override
{
74 ASSERT_EQ(server_msgr
->get_dispatch_queue_len(), 0);
75 ASSERT_EQ(client_msgr
->get_dispatch_queue_len(), 0);
83 class FakeDispatcher
: public Dispatcher
{
85 struct Session
: public RefCountedObject
{
86 atomic
<uint64_t> count
;
89 explicit Session(ConnectionRef c
): RefCountedObject(g_ceph_context
), count(0), con(c
) {
// Returns the current value of the atomic `count` member.
91 uint64_t get_count() { return count
; }
98 bool got_remote_reset
;
// Builds a dispatcher bound to g_ceph_context.
// `s` is stored in `is_server`; the got_new / got_remote_reset / got_connect
// flags and `loopback` all start out false.
102 explicit FakeDispatcher(bool s
): Dispatcher(g_ceph_context
), lock("FakeDispatcher::lock"),
103 is_server(s
), got_new(false), got_remote_reset(false),
104 got_connect(false), loopback(false) {}
// Unconditionally returns true.
// NOTE(review): presumably this tells the messenger that the dispatcher is
// willing to take fast dispatch -- confirm against the Dispatcher interface.
105 bool ms_can_fast_dispatch_any() const override
{ return true; }
106 bool ms_can_fast_dispatch(const Message
*m
) const override
{
107 switch (m
->get_type()) {
115 void ms_handle_fast_connect(Connection
*con
) override
{
117 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
118 Session
*s
= static_cast<Session
*>(con
->get_priv());
120 s
= new Session(con
);
121 con
->set_priv(s
->get());
122 lderr(g_ceph_context
) << __func__
<< " con: " << con
<< " count: " << s
->count
<< dendl
;
129 void ms_handle_fast_accept(Connection
*con
) override
{
130 Session
*s
= static_cast<Session
*>(con
->get_priv());
132 s
= new Session(con
);
133 con
->set_priv(s
->get());
137 bool ms_dispatch(Message
*m
) override
{
138 Session
*s
= static_cast<Session
*>(m
->get_connection()->get_priv());
140 s
= new Session(m
->get_connection());
141 m
->get_connection()->set_priv(s
->get());
145 lderr(g_ceph_context
) << __func__
<< " conn: " << m
->get_connection() << " session " << s
<< " count: " << s
->count
<< dendl
;
149 Mutex::Locker
l(lock
);
155 bool ms_handle_reset(Connection
*con
) override
{
156 Mutex::Locker
l(lock
);
157 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
158 Session
*s
= static_cast<Session
*>(con
->get_priv());
160 s
->con
.reset(NULL
); // break con <-> session ref cycle
161 con
->set_priv(NULL
); // break ref <-> session cycle, if any
166 void ms_handle_remote_reset(Connection
*con
) override
{
167 Mutex::Locker
l(lock
);
168 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
169 Session
*s
= static_cast<Session
*>(con
->get_priv());
171 s
->con
.reset(NULL
); // break con <-> session ref cycle
172 con
->set_priv(NULL
); // break ref <-> session cycle, if any
175 got_remote_reset
= true;
178 bool ms_handle_refused(Connection
*con
) override
{
181 void ms_fast_dispatch(Message
*m
) override
{
182 Session
*s
= static_cast<Session
*>(m
->get_connection()->get_priv());
184 s
= new Session(m
->get_connection());
185 m
->get_connection()->set_priv(s
->get());
189 lderr(g_ceph_context
) << __func__
<< " conn: " << m
->get_connection() << " session " << s
<< " count: " << s
->count
<< dendl
;
192 assert(m
->get_source().is_osd());
195 } else if (loopback
) {
196 assert(m
->get_source().is_client());
199 Mutex::Locker
l(lock
);
204 bool ms_verify_authorizer(Connection
*con
, int peer_type
, int protocol
,
205 bufferlist
& authorizer
, bufferlist
& authorizer_reply
,
206 bool& isvalid
, CryptoKey
& session_key
) override
{
211 void reply_message(Message
*m
) {
212 MPing
*rm
= new MPing();
213 m
->get_connection()->send_message(rm
);
217 typedef FakeDispatcher::Session Session
;
219 TEST_P(MessengerTest
, SimpleTest
) {
220 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
221 entity_addr_t bind_addr
;
222 bind_addr
.parse("127.0.0.1");
223 server_msgr
->bind(bind_addr
);
224 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
225 server_msgr
->start();
227 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
228 client_msgr
->start();
230 // 1. simple round trip
231 MPing
*m
= new MPing();
232 ConnectionRef conn
= client_msgr
->get_connection(server_msgr
->get_myinst());
234 ASSERT_EQ(conn
->send_message(m
), 0);
235 Mutex::Locker
l(cli_dispatcher
.lock
);
236 while (!cli_dispatcher
.got_new
)
237 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
238 cli_dispatcher
.got_new
= false;
240 ASSERT_TRUE(conn
->is_connected());
241 ASSERT_TRUE((static_cast<Session
*>(conn
->get_priv()))->get_count() == 1);
242 ASSERT_TRUE(conn
->peer_is_osd());
244 // 2. test rebind port
245 set
<int> avoid_ports
;
246 for (int i
= 0; i
< 10 ; i
++)
247 avoid_ports
.insert(server_msgr
->get_myaddr().get_port() + i
);
248 server_msgr
->rebind(avoid_ports
);
249 ASSERT_TRUE(avoid_ports
.count(server_msgr
->get_myaddr().get_port()) == 0);
251 conn
= client_msgr
->get_connection(server_msgr
->get_myinst());
254 ASSERT_EQ(conn
->send_message(m
), 0);
255 Mutex::Locker
l(cli_dispatcher
.lock
);
256 while (!cli_dispatcher
.got_new
)
257 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
258 cli_dispatcher
.got_new
= false;
260 ASSERT_TRUE(static_cast<Session
*>(conn
->get_priv())->get_count() == 1);
262 // 3. test markdown connection
264 ASSERT_FALSE(conn
->is_connected());
266 // 4. test failed connection
267 server_msgr
->shutdown();
271 conn
->send_message(m
);
272 CHECK_AND_WAIT_TRUE(!conn
->is_connected());
273 ASSERT_FALSE(conn
->is_connected());
275 // 5. loopback connection
276 srv_dispatcher
.loopback
= true;
277 conn
= client_msgr
->get_loopback_connection();
280 ASSERT_EQ(conn
->send_message(m
), 0);
281 Mutex::Locker
l(cli_dispatcher
.lock
);
282 while (!cli_dispatcher
.got_new
)
283 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
284 cli_dispatcher
.got_new
= false;
286 srv_dispatcher
.loopback
= false;
287 ASSERT_TRUE(static_cast<Session
*>(conn
->get_priv())->get_count() == 1);
288 client_msgr
->shutdown();
290 server_msgr
->shutdown();
294 TEST_P(MessengerTest
, NameAddrTest
) {
295 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
296 entity_addr_t bind_addr
;
297 bind_addr
.parse("127.0.0.1");
298 server_msgr
->bind(bind_addr
);
299 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
300 server_msgr
->start();
302 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
303 client_msgr
->start();
305 MPing
*m
= new MPing();
306 ConnectionRef conn
= client_msgr
->get_connection(server_msgr
->get_myinst());
308 ASSERT_EQ(conn
->send_message(m
), 0);
309 Mutex::Locker
l(cli_dispatcher
.lock
);
310 while (!cli_dispatcher
.got_new
)
311 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
312 cli_dispatcher
.got_new
= false;
314 ASSERT_TRUE(static_cast<Session
*>(conn
->get_priv())->get_count() == 1);
315 ASSERT_TRUE(conn
->get_peer_addr() == server_msgr
->get_myaddr());
316 ConnectionRef server_conn
= server_msgr
->get_connection(client_msgr
->get_myinst());
317 // Make sure server_conn is the one we already accepted from the client,
318 // which means client_msgr has the same addr that the server connection has
319 ASSERT_TRUE(static_cast<Session
*>(conn
->get_priv())->get_count() == 1);
320 server_msgr
->shutdown();
321 client_msgr
->shutdown();
326 TEST_P(MessengerTest
, FeatureTest
) {
327 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
328 entity_addr_t bind_addr
;
329 bind_addr
.parse("127.0.0.1");
330 uint64_t all_feature_supported
, feature_required
, feature_supported
= 0;
331 for (int i
= 0; i
< 10; i
++)
332 feature_supported
|= 1ULL << i
;
333 feature_required
= feature_supported
| 1ULL << 13;
334 all_feature_supported
= feature_required
| 1ULL << 14;
336 Messenger::Policy p
= server_msgr
->get_policy(entity_name_t::TYPE_CLIENT
);
337 p
.features_required
= feature_required
;
338 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, p
);
339 server_msgr
->bind(bind_addr
);
340 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
341 server_msgr
->start();
343 // 1. The client supports fewer features than the server requires
344 p
= client_msgr
->get_policy(entity_name_t::TYPE_OSD
);
345 p
.features_supported
= feature_supported
;
346 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
347 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
348 client_msgr
->start();
350 MPing
*m
= new MPing();
351 ConnectionRef conn
= client_msgr
->get_connection(server_msgr
->get_myinst());
352 conn
->send_message(m
);
353 CHECK_AND_WAIT_TRUE(!conn
->is_connected());
354 // should fail to build a connection
355 ASSERT_FALSE(conn
->is_connected());
357 client_msgr
->shutdown();
360 // 2. Supported features meet the required ones
361 p
= client_msgr
->get_policy(entity_name_t::TYPE_OSD
);
362 p
.features_supported
= all_feature_supported
;
363 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
364 client_msgr
->start();
366 conn
= client_msgr
->get_connection(server_msgr
->get_myinst());
369 ASSERT_EQ(conn
->send_message(m
), 0);
370 Mutex::Locker
l(cli_dispatcher
.lock
);
371 while (!cli_dispatcher
.got_new
)
372 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
373 cli_dispatcher
.got_new
= false;
375 ASSERT_TRUE(static_cast<Session
*>(conn
->get_priv())->get_count() == 1);
377 server_msgr
->shutdown();
378 client_msgr
->shutdown();
383 TEST_P(MessengerTest
, TimeoutTest
) {
384 g_ceph_context
->_conf
->set_val("ms_tcp_read_timeout", "1");
385 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
386 entity_addr_t bind_addr
;
387 bind_addr
.parse("127.0.0.1");
388 server_msgr
->bind(bind_addr
);
389 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
390 server_msgr
->start();
392 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
393 client_msgr
->start();
395 // 1. build the connection
396 MPing
*m
= new MPing();
397 ConnectionRef conn
= client_msgr
->get_connection(server_msgr
->get_myinst());
399 ASSERT_EQ(conn
->send_message(m
), 0);
400 Mutex::Locker
l(cli_dispatcher
.lock
);
401 while (!cli_dispatcher
.got_new
)
402 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
403 cli_dispatcher
.got_new
= false;
405 ASSERT_TRUE(conn
->is_connected());
406 ASSERT_TRUE((static_cast<Session
*>(conn
->get_priv()))->get_count() == 1);
407 ASSERT_TRUE(conn
->peer_is_osd());
411 ASSERT_FALSE(conn
->is_connected());
413 server_msgr
->shutdown();
416 client_msgr
->shutdown();
418 g_ceph_context
->_conf
->set_val("ms_tcp_read_timeout", "900");
421 TEST_P(MessengerTest
, StatefulTest
) {
423 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
424 entity_addr_t bind_addr
;
425 bind_addr
.parse("127.0.0.1");
426 Messenger::Policy p
= Messenger::Policy::stateful_server(0);
427 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, p
);
428 p
= Messenger::Policy::lossless_client(0);
429 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
431 server_msgr
->bind(bind_addr
);
432 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
433 server_msgr
->start();
434 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
435 client_msgr
->start();
437 // 1. test for server standby
438 ConnectionRef conn
= client_msgr
->get_connection(server_msgr
->get_myinst());
441 ASSERT_EQ(conn
->send_message(m
), 0);
442 Mutex::Locker
l(cli_dispatcher
.lock
);
443 while (!cli_dispatcher
.got_new
)
444 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
445 cli_dispatcher
.got_new
= false;
447 ASSERT_TRUE(static_cast<Session
*>(conn
->get_priv())->get_count() == 1);
449 ASSERT_FALSE(conn
->is_connected());
450 ConnectionRef server_conn
= server_msgr
->get_connection(client_msgr
->get_myinst());
452 ASSERT_TRUE(static_cast<Session
*>(server_conn
->get_priv())->get_count() == 1);
454 srv_dispatcher
.got_new
= false;
455 conn
= client_msgr
->get_connection(server_msgr
->get_myinst());
458 ASSERT_EQ(conn
->send_message(m
), 0);
459 Mutex::Locker
l(cli_dispatcher
.lock
);
460 while (!cli_dispatcher
.got_new
)
461 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
462 cli_dispatcher
.got_new
= false;
464 ASSERT_TRUE(static_cast<Session
*>(conn
->get_priv())->get_count() == 1);
465 server_conn
= server_msgr
->get_connection(client_msgr
->get_myinst());
467 Mutex::Locker
l(srv_dispatcher
.lock
);
468 while (!srv_dispatcher
.got_remote_reset
)
469 srv_dispatcher
.cond
.Wait(srv_dispatcher
.lock
);
472 // 2. test for client reconnect
473 ASSERT_FALSE(cli_dispatcher
.got_remote_reset
);
474 cli_dispatcher
.got_connect
= false;
475 cli_dispatcher
.got_new
= false;
476 cli_dispatcher
.got_remote_reset
= false;
477 server_conn
->mark_down();
478 ASSERT_FALSE(server_conn
->is_connected());
479 // ensure the client detects that the server socket was closed
481 Mutex::Locker
l(cli_dispatcher
.lock
);
482 while (!cli_dispatcher
.got_remote_reset
)
483 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
484 cli_dispatcher
.got_remote_reset
= false;
487 Mutex::Locker
l(cli_dispatcher
.lock
);
488 while (!cli_dispatcher
.got_connect
)
489 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
490 cli_dispatcher
.got_connect
= false;
492 CHECK_AND_WAIT_TRUE(conn
->is_connected());
493 ASSERT_TRUE(conn
->is_connected());
497 ASSERT_EQ(conn
->send_message(m
), 0);
498 ASSERT_TRUE(conn
->is_connected());
499 Mutex::Locker
l(cli_dispatcher
.lock
);
500 while (!cli_dispatcher
.got_new
)
501 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
502 cli_dispatcher
.got_new
= false;
505 ASSERT_EQ(1U, static_cast<Session
*>(conn
->get_priv())->get_count());
506 server_conn
= server_msgr
->get_connection(client_msgr
->get_myinst());
507 ASSERT_EQ(1U, static_cast<Session
*>(server_conn
->get_priv())->get_count());
508 cli_dispatcher
.got_remote_reset
= false;
510 server_msgr
->shutdown();
511 client_msgr
->shutdown();
516 TEST_P(MessengerTest
, StatelessTest
) {
518 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
519 entity_addr_t bind_addr
;
520 bind_addr
.parse("127.0.0.1");
521 Messenger::Policy p
= Messenger::Policy::stateless_server(0);
522 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, p
);
523 p
= Messenger::Policy::lossy_client(0);
524 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
526 server_msgr
->bind(bind_addr
);
527 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
528 server_msgr
->start();
529 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
530 client_msgr
->start();
532 // 1. test for server losing state
533 ConnectionRef conn
= client_msgr
->get_connection(server_msgr
->get_myinst());
536 ASSERT_EQ(conn
->send_message(m
), 0);
537 Mutex::Locker
l(cli_dispatcher
.lock
);
538 while (!cli_dispatcher
.got_new
)
539 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
540 cli_dispatcher
.got_new
= false;
542 ASSERT_TRUE(static_cast<Session
*>(conn
->get_priv())->get_count() == 1);
544 ASSERT_FALSE(conn
->is_connected());
546 srv_dispatcher
.got_new
= false;
547 conn
= client_msgr
->get_connection(server_msgr
->get_myinst());
550 ASSERT_EQ(conn
->send_message(m
), 0);
551 Mutex::Locker
l(cli_dispatcher
.lock
);
552 while (!cli_dispatcher
.got_new
)
553 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
554 cli_dispatcher
.got_new
= false;
556 ASSERT_TRUE(static_cast<Session
*>(conn
->get_priv())->get_count() == 1);
557 ConnectionRef server_conn
= server_msgr
->get_connection(client_msgr
->get_myinst());
560 Mutex::Locker
l(srv_dispatcher
.lock
);
561 while (!srv_dispatcher
.got_new
)
562 srv_dispatcher
.cond
.Wait(srv_dispatcher
.lock
);
564 ASSERT_EQ(1U, static_cast<Session
*>(server_conn
->get_priv())->get_count());
566 // 2. test for client lossy
567 server_conn
->mark_down();
568 ASSERT_FALSE(server_conn
->is_connected());
569 conn
->send_keepalive();
570 CHECK_AND_WAIT_TRUE(!conn
->is_connected());
571 ASSERT_FALSE(conn
->is_connected());
572 conn
= client_msgr
->get_connection(server_msgr
->get_myinst());
575 ASSERT_EQ(conn
->send_message(m
), 0);
576 Mutex::Locker
l(cli_dispatcher
.lock
);
577 while (!cli_dispatcher
.got_new
)
578 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
579 cli_dispatcher
.got_new
= false;
581 ASSERT_TRUE(static_cast<Session
*>(conn
->get_priv())->get_count() == 1);
583 server_msgr
->shutdown();
584 client_msgr
->shutdown();
589 TEST_P(MessengerTest
, ClientStandbyTest
) {
591 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
592 entity_addr_t bind_addr
;
593 bind_addr
.parse("127.0.0.1");
594 Messenger::Policy p
= Messenger::Policy::stateful_server(0);
595 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, p
);
596 p
= Messenger::Policy::lossless_peer(0);
597 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
599 server_msgr
->bind(bind_addr
);
600 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
601 server_msgr
->start();
602 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
603 client_msgr
->start();
605 // 1. test for client standby, resetcheck
606 ConnectionRef conn
= client_msgr
->get_connection(server_msgr
->get_myinst());
609 ASSERT_EQ(conn
->send_message(m
), 0);
610 Mutex::Locker
l(cli_dispatcher
.lock
);
611 while (!cli_dispatcher
.got_new
)
612 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
613 cli_dispatcher
.got_new
= false;
615 ASSERT_TRUE(static_cast<Session
*>(conn
->get_priv())->get_count() == 1);
616 ConnectionRef server_conn
= server_msgr
->get_connection(client_msgr
->get_myinst());
617 ASSERT_FALSE(cli_dispatcher
.got_remote_reset
);
618 cli_dispatcher
.got_connect
= false;
619 server_conn
->mark_down();
620 ASSERT_FALSE(server_conn
->is_connected());
621 // client should be standby
623 // client should be standby, so we use original connection
625 // Try sending a message to verify we get the remote reset callback
627 ASSERT_EQ(conn
->send_message(m
), 0);
629 Mutex::Locker
l(cli_dispatcher
.lock
);
630 while (!cli_dispatcher
.got_remote_reset
)
631 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
632 cli_dispatcher
.got_remote_reset
= false;
633 while (!cli_dispatcher
.got_connect
)
634 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
635 cli_dispatcher
.got_connect
= false;
637 CHECK_AND_WAIT_TRUE(conn
->is_connected());
638 ASSERT_TRUE(conn
->is_connected());
640 ASSERT_EQ(conn
->send_message(m
), 0);
641 Mutex::Locker
l(cli_dispatcher
.lock
);
642 while (!cli_dispatcher
.got_new
)
643 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
644 cli_dispatcher
.got_new
= false;
646 ASSERT_TRUE(static_cast<Session
*>(conn
->get_priv())->get_count() == 1);
647 server_conn
= server_msgr
->get_connection(client_msgr
->get_myinst());
648 ASSERT_TRUE(static_cast<Session
*>(server_conn
->get_priv())->get_count() == 1);
650 server_msgr
->shutdown();
651 client_msgr
->shutdown();
656 TEST_P(MessengerTest
, AuthTest
) {
657 g_ceph_context
->_conf
->set_val("auth_cluster_required", "cephx");
658 g_ceph_context
->_conf
->set_val("auth_service_required", "cephx");
659 g_ceph_context
->_conf
->set_val("auth_client_required", "cephx");
660 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
661 entity_addr_t bind_addr
;
662 bind_addr
.parse("127.0.0.1");
663 server_msgr
->bind(bind_addr
);
664 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
665 server_msgr
->start();
667 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
668 client_msgr
->start();
670 // 1. simple auth round trip
671 MPing
*m
= new MPing();
672 ConnectionRef conn
= client_msgr
->get_connection(server_msgr
->get_myinst());
674 ASSERT_EQ(conn
->send_message(m
), 0);
675 Mutex::Locker
l(cli_dispatcher
.lock
);
676 while (!cli_dispatcher
.got_new
)
677 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
678 cli_dispatcher
.got_new
= false;
680 ASSERT_TRUE(conn
->is_connected());
681 ASSERT_TRUE((static_cast<Session
*>(conn
->get_priv()))->get_count() == 1);
684 g_ceph_context
->_conf
->set_val("auth_cluster_required", "none");
685 g_ceph_context
->_conf
->set_val("auth_service_required", "none");
686 g_ceph_context
->_conf
->set_val("auth_client_required", "none");
688 ASSERT_FALSE(conn
->is_connected());
689 conn
= client_msgr
->get_connection(server_msgr
->get_myinst());
691 MPing
*m
= new MPing();
692 ASSERT_EQ(conn
->send_message(m
), 0);
693 Mutex::Locker
l(cli_dispatcher
.lock
);
694 while (!cli_dispatcher
.got_new
)
695 cli_dispatcher
.cond
.Wait(cli_dispatcher
.lock
);
696 cli_dispatcher
.got_new
= false;
698 ASSERT_TRUE(conn
->is_connected());
699 ASSERT_TRUE((static_cast<Session
*>(conn
->get_priv()))->get_count() == 1);
701 server_msgr
->shutdown();
702 client_msgr
->shutdown();
707 TEST_P(MessengerTest
, MessageTest
) {
708 FakeDispatcher
cli_dispatcher(false), srv_dispatcher(true);
709 entity_addr_t bind_addr
;
710 bind_addr
.parse("127.0.0.1");
711 Messenger::Policy p
= Messenger::Policy::stateful_server(0);
712 server_msgr
->set_policy(entity_name_t::TYPE_CLIENT
, p
);
713 p
= Messenger::Policy::lossless_peer(0);
714 client_msgr
->set_policy(entity_name_t::TYPE_OSD
, p
);
716 server_msgr
->bind(bind_addr
);
717 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
718 server_msgr
->start();
719 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
720 client_msgr
->start();
723 // 1. A very large "front"(as well as "payload")
724 // Because an external message type would have to intrude into Messenger::decode_message,
725 // here we only use an existing message class (MCommand)
726 ConnectionRef conn
= client_msgr
->get_connection(server_msgr
->get_myinst());
729 uuid
.generate_random();
731 string
s("abcdefghijklmnopqrstuvwxyz");
732 for (int i
= 0; i
< 1024*30; i
++)
734 MCommand
*m
= new MCommand(uuid
);
736 conn
->send_message(m
);
739 Mutex::Locker
l(cli_dispatcher
.lock
);
740 while (!cli_dispatcher
.got_new
)
741 cli_dispatcher
.cond
.WaitInterval(cli_dispatcher
.lock
, t
);
742 ASSERT_TRUE(cli_dispatcher
.got_new
);
743 cli_dispatcher
.got_new
= false;
746 // 2. A very large "data"
749 string
s("abcdefghijklmnopqrstuvwxyz");
750 for (int i
= 0; i
< 1024*30; i
++)
752 MPing
*m
= new MPing();
754 conn
->send_message(m
);
757 Mutex::Locker
l(cli_dispatcher
.lock
);
758 while (!cli_dispatcher
.got_new
)
759 cli_dispatcher
.cond
.WaitInterval(cli_dispatcher
.lock
, t
);
760 ASSERT_TRUE(cli_dispatcher
.got_new
);
761 cli_dispatcher
.got_new
= false;
763 server_msgr
->shutdown();
764 client_msgr
->shutdown();
770 class SyntheticWorkload
;
781 Payload(Who who
, uint64_t seq
, const bufferlist
& data
)
782 : who(who
), seq(seq
), data(data
)
785 DENC(Payload
, v
, p
) {
793 WRITE_CLASS_DENC(Payload
)
795 ostream
& operator<<(ostream
& out
, const Payload
&pl
)
797 return out
<< "reply=" << pl
.who
<< " i = " << pl
.seq
;
800 class SyntheticDispatcher
: public Dispatcher
{
806 bool got_remote_reset
;
808 map
<ConnectionRef
, list
<uint64_t> > conn_sent
;
809 map
<uint64_t, bufferlist
> sent
;
810 atomic
<uint64_t> index
;
811 SyntheticWorkload
*workload
;
// Builds a dispatcher bound to g_ceph_context.
// `s` is stored in `is_server` and `wl` in `workload` (ms_handle_reset forwards
// resets to it); the got_* flags start out false and `index` starts at 0.
813 SyntheticDispatcher(bool s
, SyntheticWorkload
*wl
):
814 Dispatcher(g_ceph_context
), lock("SyntheticDispatcher::lock"), is_server(s
), got_new(false),
815 got_remote_reset(false), got_connect(false), index(0), workload(wl
) {}
// Unconditionally returns true.
// NOTE(review): presumably this tells the messenger that the dispatcher is
// willing to take fast dispatch -- confirm against the Dispatcher interface.
816 bool ms_can_fast_dispatch_any() const override
{ return true; }
817 bool ms_can_fast_dispatch(const Message
*m
) const override
{
818 switch (m
->get_type()) {
827 void ms_handle_fast_connect(Connection
*con
) override
{
828 Mutex::Locker
l(lock
);
829 list
<uint64_t> c
= conn_sent
[con
];
830 for (list
<uint64_t>::iterator it
= c
.begin();
833 conn_sent
.erase(con
);
837 void ms_handle_fast_accept(Connection
*con
) override
{
838 Mutex::Locker
l(lock
);
839 list
<uint64_t> c
= conn_sent
[con
];
840 for (list
<uint64_t>::iterator it
= c
.begin();
843 conn_sent
.erase(con
);
846 bool ms_dispatch(Message
*m
) override
{
849 bool ms_handle_reset(Connection
*con
) override
;
850 void ms_handle_remote_reset(Connection
*con
) override
{
851 Mutex::Locker
l(lock
);
852 list
<uint64_t> c
= conn_sent
[con
];
853 for (list
<uint64_t>::iterator it
= c
.begin();
856 conn_sent
.erase(con
);
857 got_remote_reset
= true;
859 bool ms_handle_refused(Connection
*con
) override
{
862 void ms_fast_dispatch(Message
*m
) override
{
863 // MSG_COMMAND is used to disorganize regular message flow
864 if (m
->get_type() == MSG_COMMAND
) {
870 auto p
= m
->get_data().begin();
872 if (pl
.who
== Payload::PING
) {
873 lderr(g_ceph_context
) << __func__
<< " conn=" << m
->get_connection() << pl
<< dendl
;
874 reply_message(m
, pl
);
876 Mutex::Locker
l(lock
);
880 Mutex::Locker
l(lock
);
881 if (sent
.count(pl
.seq
)) {
882 lderr(g_ceph_context
) << __func__
<< " conn=" << m
->get_connection() << pl
<< dendl
;
883 ASSERT_EQ(conn_sent
[m
->get_connection()].front(), pl
.seq
);
884 ASSERT_TRUE(pl
.data
.contents_equal(sent
[pl
.seq
]));
885 conn_sent
[m
->get_connection()].pop_front();
894 bool ms_verify_authorizer(Connection
*con
, int peer_type
, int protocol
,
895 bufferlist
& authorizer
, bufferlist
& authorizer_reply
,
896 bool& isvalid
, CryptoKey
& session_key
) override
{
901 void reply_message(const Message
*m
, Payload
& pl
) {
902 pl
.who
= Payload::PONG
;
905 MPing
*rm
= new MPing();
907 m
->get_connection()->send_message(rm
);
908 lderr(g_ceph_context
) << __func__
<< " conn=" << m
->get_connection() << " reply m=" << m
<< " i=" << pl
.seq
<< dendl
;
911 void send_message_wrap(ConnectionRef con
, const bufferlist
& data
) {
912 Message
*m
= new MPing();
913 Payload pl
{Payload::PING
, index
++, data
};
917 if (!con
->get_messenger()->get_default_policy().lossy
) {
918 Mutex::Locker
l(lock
);
919 sent
[pl
.seq
] = pl
.data
;
920 conn_sent
[con
].push_back(pl
.seq
);
922 lderr(g_ceph_context
) << __func__
<< " conn=" << con
.get() << " send m=" << m
<< " i=" << pl
.seq
<< dendl
;
923 ASSERT_EQ(0, con
->send_message(m
));
926 uint64_t get_pending() {
927 Mutex::Locker
l(lock
);
931 void clear_pending(ConnectionRef con
) {
932 Mutex::Locker
l(lock
);
934 for (list
<uint64_t>::iterator it
= conn_sent
[con
].begin();
935 it
!= conn_sent
[con
].end(); ++it
)
937 conn_sent
.erase(con
);
941 for (auto && p
: conn_sent
) {
942 if (!p
.second
.empty()) {
943 lderr(g_ceph_context
) << __func__
<< " " << p
.first
<< " wait " << p
.second
.size() << dendl
;
950 class SyntheticWorkload
{
953 set
<Messenger
*> available_servers
;
954 set
<Messenger
*> available_clients
;
955 map
<ConnectionRef
, pair
<Messenger
*, Messenger
*> > available_connections
;
956 SyntheticDispatcher dispatcher
;
958 vector
<bufferlist
> rand_data
;
961 static const unsigned max_in_flight
= 64;
962 static const unsigned max_connections
= 128;
963 static const unsigned max_message_len
= 1024 * 1024 * 4;
965 SyntheticWorkload(int servers
, int clients
, string type
, int random_num
,
966 Messenger::Policy srv_policy
, Messenger::Policy cli_policy
):
967 lock("SyntheticWorkload::lock"), dispatcher(false, this), rng(time(NULL
)) {
969 int base_port
= 16800;
970 entity_addr_t bind_addr
;
972 for (int i
= 0; i
< servers
; ++i
) {
973 msgr
= Messenger::create(g_ceph_context
, type
, entity_name_t::OSD(0),
974 "server", getpid()+i
, 0);
975 snprintf(addr
, sizeof(addr
), "127.0.0.1:%d", base_port
+i
);
976 bind_addr
.parse(addr
);
977 msgr
->bind(bind_addr
);
978 msgr
->add_dispatcher_head(&dispatcher
);
981 msgr
->set_default_policy(srv_policy
);
982 available_servers
.insert(msgr
);
986 for (int i
= 0; i
< clients
; ++i
) {
987 msgr
= Messenger::create(g_ceph_context
, type
, entity_name_t::CLIENT(-1),
988 "client", getpid()+i
+servers
, 0);
989 if (cli_policy
.standby
) {
990 snprintf(addr
, sizeof(addr
), "127.0.0.1:%d", base_port
+i
+servers
);
991 bind_addr
.parse(addr
);
992 msgr
->bind(bind_addr
);
994 msgr
->add_dispatcher_head(&dispatcher
);
997 msgr
->set_default_policy(cli_policy
);
998 available_clients
.insert(msgr
);
1002 for (int i
= 0; i
< random_num
; i
++) {
1004 boost::uniform_int
<> u(32, max_message_len
);
1005 uint64_t value_len
= u(rng
);
1006 bufferptr
bp(value_len
);
1008 for (uint64_t j
= 0; j
< value_len
-sizeof(i
); ) {
1009 memcpy(bp
.c_str()+j
, &i
, sizeof(i
));
1014 rand_data
.push_back(bl
);
1018 ConnectionRef
_get_random_connection() {
1019 while (dispatcher
.get_pending() > max_in_flight
) {
1024 assert(lock
.is_locked());
1025 boost::uniform_int
<> choose(0, available_connections
.size() - 1);
1026 int index
= choose(rng
);
1027 map
<ConnectionRef
, pair
<Messenger
*, Messenger
*> >::iterator i
= available_connections
.begin();
1028 for (; index
> 0; --index
, ++i
) ;
1032 bool can_create_connection() {
1033 return available_connections
.size() < max_connections
;
1036 void generate_connection() {
1037 Mutex::Locker
l(lock
);
1038 if (!can_create_connection())
1041 Messenger
*server
, *client
;
1043 boost::uniform_int
<> choose(0, available_servers
.size() - 1);
1044 int index
= choose(rng
);
1045 set
<Messenger
*>::iterator i
= available_servers
.begin();
1046 for (; index
> 0; --index
, ++i
) ;
1050 boost::uniform_int
<> choose(0, available_clients
.size() - 1);
1051 int index
= choose(rng
);
1052 set
<Messenger
*>::iterator i
= available_clients
.begin();
1053 for (; index
> 0; --index
, ++i
) ;
1057 pair
<Messenger
*, Messenger
*> p
;
1059 boost::uniform_int
<> choose(0, available_servers
.size() - 1);
1060 if (server
->get_default_policy().server
) {
1061 p
= make_pair(client
, server
);
1063 ConnectionRef conn
= client
->get_connection(server
->get_myinst());
1064 if (available_connections
.count(conn
) || choose(rng
) % 2)
1065 p
= make_pair(client
, server
);
1067 p
= make_pair(server
, client
);
1070 ConnectionRef conn
= p
.first
->get_connection(p
.second
->get_myinst());
1071 available_connections
[conn
] = p
;
1074 void send_message() {
1075 Mutex::Locker
l(lock
);
1076 ConnectionRef conn
= _get_random_connection();
1077 boost::uniform_int
<> true_false(0, 99);
1078 int val
= true_false(rng
);
1081 uuid
.generate_random();
1082 MCommand
*m
= new MCommand(uuid
);
1083 vector
<string
> cmds
;
1084 cmds
.push_back("command");
1086 m
->set_priority(200);
1087 conn
->send_message(m
);
1089 boost::uniform_int
<> u(0, rand_data
.size()-1);
1090 dispatcher
.send_message_wrap(conn
, rand_data
[u(rng
)]);
// Removes one randomly selected connection from the workload's bookkeeping,
// while holding `lock`.
//
// Visible steps: pick a random connection, clear its pending state in the
// dispatcher, look up the (Messenger*, Messenger*) pair recorded for it in
// available_connections and, when neither endpoint's default policy has
// `server` set, also clear and erase the reverse-direction connection
// obtained from the second messenger. Finally the chosen connection itself
// must be erased exactly once (asserted).
// NOTE(review): the statement controlled by the size check and the calls
// that actually mark the connections down are not visible in this excerpt.
1094 void drop_connection() {
1095 Mutex::Locker
l(lock
);
// Pool-size guard: fewer than 10 tracked connections. The guarded statement
// is not visible here (presumably an early return — confirm).
1096 if (available_connections
.size() < 10)
1098 ConnectionRef conn
= _get_random_connection();
1099 dispatcher
.clear_pending(conn
);
1101 pair
<Messenger
*, Messenger
*> &p
= available_connections
[conn
];
1102 // it's a lossless policy, so we need to mark down each side
1103 if (!p
.first
->get_default_policy().server
&& !p
.second
->get_default_policy().server
) {
// The chosen connection is expected to belong to the first messenger of the
// recorded pair; the peer connection is looked up from the second one.
1104 ASSERT_EQ(conn
->get_messenger(), p
.first
);
1105 ConnectionRef peer
= p
.second
->get_connection(p
.first
->get_myinst());
1107 dispatcher
.clear_pending(peer
);
1108 available_connections
.erase(peer
);
// erase() returns the number of removed entries; exactly one is required.
1110 ASSERT_EQ(available_connections
.erase(conn
), 1U);
// Logs a one-line summary of the workload state through lderr while holding
// `lock`: the number of entries in available_connections and the value of
// dispatcher.get_pending().
//
// detail: defaults to false; when true and at least one connection is
//         tracked, an additional block runs.
// NOTE(review): the body of that detail block is not visible in this excerpt.
1113 void print_internal_state(bool detail
=false) {
1114 Mutex::Locker
l(lock
);
1115 lderr(g_ceph_context
) << "available_connections: " << available_connections
.size()
1116 << " inflight messages: " << dispatcher
.get_pending() << dendl
;
1117 if (detail
&& !available_connections
.empty()) {
// Waits until the dispatcher reports no pending messages, then checks and
// forgets every server and client messenger.
//
// The wait loop runs while dispatcher.get_pending() is non-zero. A budget of
// timeout_us (5 minutes) is reduced by tick_us (100 ms) in the loop; the
// visible code also dumps the detailed internal state and contains an
// assert(0 == "...") that aborts with a "loop time exceed 5 mins" message.
// Afterwards each messenger in available_servers and available_clients is
// asserted to have an empty dispatch queue and both sets are cleared.
// NOTE(review): the sleep inside the loop, the condition guarding the
// state dump / assert, and any per-messenger statements other than the
// queue-length assertion are not visible in this excerpt.
1122 void wait_for_done() {
1123 int64_t tick_us
= 1000 * 100; // 100ms
1124 int64_t timeout_us
= 5 * 60 * 1000 * 1000; // 5 mins
1126 while (dispatcher
.get_pending()) {
1128 timeout_us
-= tick_us
;
1130 print_internal_state(true);
// The string literal only serves as the failure message; the comparison with
// 0 is always false, so reaching this line aborts.
1132 assert(0 == " loop time exceed 5 mins, it looks we stuck into some problems!");
// Server side: every messenger must have drained its dispatch queue.
1134 for (set
<Messenger
*>::iterator it
= available_servers
.begin();
1135 it
!= available_servers
.end(); ++it
) {
1138 ASSERT_EQ((*it
)->get_dispatch_queue_len(), 0);
1141 available_servers
.clear();
// Client side: same check, then drop the bookkeeping.
1143 for (set
<Messenger
*>::iterator it
= available_clients
.begin();
1144 it
!= available_clients
.end(); ++it
) {
1147 ASSERT_EQ((*it
)->get_dispatch_queue_len(), 0);
1150 available_clients
.clear();
// Reset notification for `con`: under `lock`, removes the connection from
// available_connections and clears its pending state in the dispatcher.
1153 void handle_reset(Connection
*con
) {
1154 Mutex::Locker
l(lock
);
1155 available_connections
.erase(con
);
1156 dispatcher
.clear_pending(con
);
// Out-of-class definition of the dispatcher's reset hook: forwards the reset
// connection to the owning workload's handle_reset().
// NOTE(review): the return statement is not visible in this excerpt.
1160 bool SyntheticDispatcher::ms_handle_reset(Connection
*con
) {
1161 workload
->handle_reset(con
);
// Randomized stress run over a SyntheticWorkload constructed with the
// stateful_server and lossless_client policies for the messenger type given
// by GetParam().
//
// Phase 1 seeds 100 connections (logging every 10th). Phase 2 performs 5000
// iterations; each draws a value uniformly from [0, 99] and dispatches to
// generate_connection(), drop_connection() (val > 80), send_message()
// (val > 10) or a short usleep of rand() % 1000 + 500 microseconds.
// Phase 3 blocks in wait_for_done().
// NOTE(review): the meaning of the numeric constructor arguments
// (8, 32, 100), the condition of the first branch and the condition guarding
// the "Op" log line are not visible in this excerpt.
1165 TEST_P(MessengerTest
, SyntheticStressTest
) {
1166 SyntheticWorkload
test_msg(8, 32, GetParam(), 100,
1167 Messenger::Policy::stateful_server(0),
1168 Messenger::Policy::lossless_client(0));
// Seed the workload with an initial set of connections.
1169 for (int i
= 0; i
< 100; ++i
) {
1170 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
1171 test_msg
.generate_connection();
// Local generator seeded from the wall clock, used only to pick operations.
1173 gen_type
rng(time(NULL
));
1174 for (int i
= 0; i
< 5000; ++i
) {
1176 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
1177 test_msg
.print_internal_state();
1179 boost::uniform_int
<> true_false(0, 99);
1180 int val
= true_false(rng
);
1182 test_msg
.generate_connection();
1183 } else if (val
> 80) {
1184 test_msg
.drop_connection();
1185 } else if (val
> 10) {
1186 test_msg
.send_message();
1188 usleep(rand() % 1000 + 500);
1191 test_msg
.wait_for_done();
// Variant of the randomized stress run that uses the lossless_peer_reuse
// policy for both policy arguments of the SyntheticWorkload.
//
// Seeds 10 connections, then performs 10000 iterations; each draws a value
// uniformly from [0, 99] and dispatches to generate_connection(),
// drop_connection() (val > 60), send_message() (val > 10) or a usleep of
// rand() % 1000 + 500 microseconds, and finally blocks in wait_for_done().
// NOTE(review): the meaning of the numeric constructor arguments
// (16, 32, 100), the condition of the first branch and the condition guarding
// the "Op" log line are not visible in this excerpt.
1194 TEST_P(MessengerTest
, SyntheticStressTest1
) {
1195 SyntheticWorkload
test_msg(16, 32, GetParam(), 100,
1196 Messenger::Policy::lossless_peer_reuse(0),
1197 Messenger::Policy::lossless_peer_reuse(0));
// With only 10 iterations the `i % 10` check logs just the first one.
1198 for (int i
= 0; i
< 10; ++i
) {
1199 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
1200 test_msg
.generate_connection();
// Local generator seeded from the wall clock, used only to pick operations.
1202 gen_type
rng(time(NULL
));
1203 for (int i
= 0; i
< 10000; ++i
) {
1205 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
1206 test_msg
.print_internal_state();
1208 boost::uniform_int
<> true_false(0, 99);
1209 int val
= true_false(rng
);
1211 test_msg
.generate_connection();
1212 } else if (val
> 60) {
1213 test_msg
.drop_connection();
1214 } else if (val
> 10) {
1215 test_msg
.send_message();
1217 usleep(rand() % 1000 + 500);
1220 test_msg
.wait_for_done();
// Randomized workload run with fault injection enabled through the global
// configuration.
//
// Setup: remembers the current ms_dispatch_throttle_bytes, then sets
// ms_inject_socket_failures to "30", ms_inject_internal_delays to "0.1" and
// ms_dispatch_throttle_bytes to "16777216".
// Workload: stateful_server / lossless_client policies, 100 seeded
// connections, 1000 iterations choosing between generate_connection(),
// drop_connection() (val > 80), send_message() (val > 10) and a usleep of
// rand() % 500 + 100 microseconds, followed by wait_for_done().
// Teardown: both injection options are set back to "0" and the throttle is
// restored to the remembered value.
// NOTE(review): the teardown is plain trailing code, so it is skipped if the
// test body exits early; the condition of the first branch and the condition
// guarding the "Op" log line are not visible in this excerpt.
1224 TEST_P(MessengerTest
, SyntheticInjectTest
) {
// Saved so the original throttle can be written back at the end.
1225 uint64_t dispatch_throttle_bytes
= g_ceph_context
->_conf
->ms_dispatch_throttle_bytes
;
1226 g_ceph_context
->_conf
->set_val("ms_inject_socket_failures", "30");
1227 g_ceph_context
->_conf
->set_val("ms_inject_internal_delays", "0.1");
1228 g_ceph_context
->_conf
->set_val("ms_dispatch_throttle_bytes", "16777216");
1229 SyntheticWorkload
test_msg(8, 32, GetParam(), 100,
1230 Messenger::Policy::stateful_server(0),
1231 Messenger::Policy::lossless_client(0));
1232 for (int i
= 0; i
< 100; ++i
) {
1233 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
1234 test_msg
.generate_connection();
// Local generator seeded from the wall clock, used only to pick operations.
1236 gen_type
rng(time(NULL
));
1237 for (int i
= 0; i
< 1000; ++i
) {
1239 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
1240 test_msg
.print_internal_state();
1242 boost::uniform_int
<> true_false(0, 99);
1243 int val
= true_false(rng
);
1245 test_msg
.generate_connection();
1246 } else if (val
> 80) {
1247 test_msg
.drop_connection();
1248 } else if (val
> 10) {
1249 test_msg
.send_message();
1251 usleep(rand() % 500 + 100);
1254 test_msg
.wait_for_done();
// Undo the configuration changes made at the top of the test.
1255 g_ceph_context
->_conf
->set_val("ms_inject_socket_failures", "0");
1256 g_ceph_context
->_conf
->set_val("ms_inject_internal_delays", "0");
1257 g_ceph_context
->_conf
->set_val(
1258 "ms_dispatch_throttle_bytes", std::to_string(dispatch_throttle_bytes
));
// Fault-injection run that uses the lossless_peer_reuse policy for both
// policy arguments of the SyntheticWorkload.
//
// Sets ms_inject_socket_failures to "30" and ms_inject_internal_delays to
// "0.1", seeds 100 connections, performs 1000 iterations choosing between
// generate_connection(), drop_connection() (val > 80), send_message()
// (val > 10) and a usleep of rand() % 500 + 100 microseconds, waits in
// wait_for_done(), then sets both injection options back to "0".
// NOTE(review): the condition of the first branch and the condition guarding
// the "Op" log line are not visible in this excerpt.
1261 TEST_P(MessengerTest
, SyntheticInjectTest2
) {
1262 g_ceph_context
->_conf
->set_val("ms_inject_socket_failures", "30");
1263 g_ceph_context
->_conf
->set_val("ms_inject_internal_delays", "0.1");
1264 SyntheticWorkload
test_msg(8, 16, GetParam(), 100,
1265 Messenger::Policy::lossless_peer_reuse(0),
1266 Messenger::Policy::lossless_peer_reuse(0));
1267 for (int i
= 0; i
< 100; ++i
) {
1268 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
1269 test_msg
.generate_connection();
// Local generator seeded from the wall clock, used only to pick operations.
1271 gen_type
rng(time(NULL
));
1272 for (int i
= 0; i
< 1000; ++i
) {
1274 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
1275 test_msg
.print_internal_state();
1277 boost::uniform_int
<> true_false(0, 99);
1278 int val
= true_false(rng
);
1280 test_msg
.generate_connection();
1281 } else if (val
> 80) {
1282 test_msg
.drop_connection();
1283 } else if (val
> 10) {
1284 test_msg
.send_message();
1286 usleep(rand() % 500 + 100);
1289 test_msg
.wait_for_done();
// Switch fault injection back off for whatever runs next.
1290 g_ceph_context
->_conf
->set_val("ms_inject_socket_failures", "0");
1291 g_ceph_context
->_conf
->set_val("ms_inject_internal_delays", "0");
// Fault-injection run that uses the stateless_server and lossy_client
// policies.
//
// Sets ms_inject_socket_failures to "600" and ms_inject_internal_delays to
// "0.1", seeds 100 connections, performs 1000 iterations choosing between
// generate_connection(), drop_connection() (val > 80), send_message()
// (val > 10) and a usleep of rand() % 500 + 100 microseconds, waits in
// wait_for_done(), then sets both injection options back to "0".
// NOTE(review): the condition of the first branch and the condition guarding
// the "Op" log line are not visible in this excerpt.
1294 TEST_P(MessengerTest
, SyntheticInjectTest3
) {
1295 g_ceph_context
->_conf
->set_val("ms_inject_socket_failures", "600");
1296 g_ceph_context
->_conf
->set_val("ms_inject_internal_delays", "0.1");
1297 SyntheticWorkload
test_msg(8, 16, GetParam(), 100,
1298 Messenger::Policy::stateless_server(0),
1299 Messenger::Policy::lossy_client(0));
1300 for (int i
= 0; i
< 100; ++i
) {
1301 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
1302 test_msg
.generate_connection();
// Local generator seeded from the wall clock, used only to pick operations.
1304 gen_type
rng(time(NULL
));
1305 for (int i
= 0; i
< 1000; ++i
) {
1307 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
1308 test_msg
.print_internal_state();
1310 boost::uniform_int
<> true_false(0, 99);
1311 int val
= true_false(rng
);
1313 test_msg
.generate_connection();
1314 } else if (val
> 80) {
1315 test_msg
.drop_connection();
1316 } else if (val
> 10) {
1317 test_msg
.send_message();
1319 usleep(rand() % 500 + 100);
1322 test_msg
.wait_for_done();
// Switch fault injection back off for whatever runs next.
1323 g_ceph_context
->_conf
->set_val("ms_inject_socket_failures", "0");
1324 g_ceph_context
->_conf
->set_val("ms_inject_internal_delays", "0");
// Fault-injection run that additionally enables delay injection and uses
// the lossless_peer policy for both policy arguments of the workload.
//
// Setup: ms_inject_socket_failures "30", ms_inject_internal_delays "0.1",
// ms_inject_delay_probability "1", ms_inject_delay_type "client osd" and
// ms_inject_delay_max "5".
// Workload: 100 seeded connections, then 1000 iterations choosing between
// generate_connection(), send_message() (val > 10) and a usleep of
// rand() % 500 + 100 microseconds; the val > 80 branch is intentionally a
// no-op because its drop_connection() call is commented out. The test then
// waits in wait_for_done().
// Teardown: the five options are set back to "0" / "".
// NOTE(review): the purpose of the extra `false` argument passed to set_val
// for ms_inject_delay_type, the condition of the first branch and the
// condition guarding the "Op" log line are not visible in this excerpt.
1328 TEST_P(MessengerTest
, SyntheticInjectTest4
) {
1329 g_ceph_context
->_conf
->set_val("ms_inject_socket_failures", "30");
1330 g_ceph_context
->_conf
->set_val("ms_inject_internal_delays", "0.1");
1331 g_ceph_context
->_conf
->set_val("ms_inject_delay_probability", "1");
1332 g_ceph_context
->_conf
->set_val("ms_inject_delay_type", "client osd", false);
1333 g_ceph_context
->_conf
->set_val("ms_inject_delay_max", "5");
1334 SyntheticWorkload
test_msg(16, 32, GetParam(), 100,
1335 Messenger::Policy::lossless_peer(0),
1336 Messenger::Policy::lossless_peer(0));
1337 for (int i
= 0; i
< 100; ++i
) {
1338 if (!(i
% 10)) lderr(g_ceph_context
) << "seeding connection " << i
<< dendl
;
1339 test_msg
.generate_connection();
// Local generator seeded from the wall clock, used only to pick operations.
1341 gen_type
rng(time(NULL
));
1342 for (int i
= 0; i
< 1000; ++i
) {
1344 lderr(g_ceph_context
) << "Op " << i
<< ": " << dendl
;
1345 test_msg
.print_internal_state();
1347 boost::uniform_int
<> true_false(0, 99);
1348 int val
= true_false(rng
);
1350 test_msg
.generate_connection();
1351 } else if (val
> 80) {
1352 // test_msg.drop_connection();
1353 } else if (val
> 10) {
1354 test_msg
.send_message();
1356 usleep(rand() % 500 + 100);
1359 test_msg
.wait_for_done();
// Undo every configuration change made at the top of the test.
1360 g_ceph_context
->_conf
->set_val("ms_inject_socket_failures", "0");
1361 g_ceph_context
->_conf
->set_val("ms_inject_internal_delays", "0");
1362 g_ceph_context
->_conf
->set_val("ms_inject_delay_probability", "0");
1363 g_ceph_context
->_conf
->set_val("ms_inject_delay_type", "", false);
1364 g_ceph_context
->_conf
->set_val("ms_inject_delay_max", "0");
// Dispatcher used by the mark-down test. It keeps the set of connections it
// has seen in `conns`, exposes an atomic `count`, and serialises its
// callbacks with a Mutex named "MarkdownDispatcher::lock".
// NOTE(review): most method bodies, the remaining data members and the rest
// of the constructor's initializer list are not visible in this excerpt, so
// the comments below describe only the statements that are shown.
1368 class MarkdownDispatcher
: public Dispatcher
{
// Connections recorded by ms_dispatch().
1370 set
<ConnectionRef
> conns
;
// Atomic counter, zero-initialised; read from outside the class as
// `srv_dispatcher.count`. Where it is incremented is not visible here.
1373 std::atomic
<uint64_t> count
= { 0 };
// `s` is a construction-time flag; how it is stored is not visible here.
1374 explicit MarkdownDispatcher(bool s
): Dispatcher(g_ceph_context
), lock("MarkdownDispatcher::lock"),
// Fast dispatch is never available for "any" message.
1376 bool ms_can_fast_dispatch_any() const override
{ return false; }
// Decides per message type; the switch cases are not visible here.
1377 bool ms_can_fast_dispatch(const Message
*m
) const override
{
1378 switch (m
->get_type()) {
// Logs the new outgoing connection, then takes the lock.
1386 void ms_handle_fast_connect(Connection
*con
) override
{
1387 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
1388 Mutex::Locker
l(lock
);
// Takes the lock for an accepted connection.
1391 void ms_handle_fast_accept(Connection
*con
) override
{
1392 Mutex::Locker
l(lock
);
// Logs the message's connection, records it in `conns` under the lock, and
// checks whether fewer than two connections are known and `last_mark` is
// unset. After a random sleep of up to 499 microseconds it walks `conns`
// and singles out every connection other than the one that delivered `m`.
// NOTE(review): what is done in each of those branches is not visible here.
1395 bool ms_dispatch(Message
*m
) override
{
1396 lderr(g_ceph_context
) << __func__
<< " conn: " << m
->get_connection() << dendl
;
1397 Mutex::Locker
l(lock
);
1399 conns
.insert(m
->get_connection());
1400 if (conns
.size() < 2 && !last_mark
) {
1406 usleep(rand() % 500);
1407 for (set
<ConnectionRef
>::iterator it
= conns
.begin(); it
!= conns
.end(); ++it
) {
1408 if ((*it
) != m
->get_connection().get()) {
// Logs the reset, takes the lock and sleeps for up to 499 microseconds.
1419 bool ms_handle_reset(Connection
*con
) override
{
1420 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
1421 Mutex::Locker
l(lock
);
1423 usleep(rand() % 500);
// Takes the lock and logs the remote reset.
1426 void ms_handle_remote_reset(Connection
*con
) override
{
1427 Mutex::Locker
l(lock
);
1429 lderr(g_ceph_context
) << __func__
<< " " << con
<< dendl
;
// Body not visible in this excerpt.
1431 bool ms_handle_refused(Connection
*con
) override
{
// Body not visible in this excerpt.
1434 void ms_fast_dispatch(Message
*m
) override
{
// Authorizer hook; body not visible in this excerpt.
1437 bool ms_verify_authorizer(Connection
*con
, int peer_type
, int protocol
,
1438 bufferlist
& authorizer
, bufferlist
& authorizer_reply
,
1439 bool& isvalid
, CryptoKey
& session_key
) override
{
1446 // Markdown with external lock
// Exercises two server messengers that share one MarkdownDispatcher, with a
// single client messenger talking to both.
//
// Setup: a second server messenger is created alongside the fixture's
// server_msgr; they are bound to 127.0.0.1:16800 and 127.0.0.1:16801 and both
// register srv_dispatcher, while client_msgr registers cli_dispatcher.
// Body: the client obtains a connection to each server, sends an MPing on
// each, and waits for srv_dispatcher.count to exceed `last + 1`. The counter
// is compared with `last` to detect a stalled round, and the test fails when
// `equal` is set and equal_count exceeds 3.
// Teardown: all three messengers are shut down and waited on, and the
// locally created server_msgr2 is deleted.
// NOTE(review): the enclosing loop, the declarations of `last` and `equal`,
// the allocation of the second MPing and the updates to equal_count are not
// visible in this excerpt.
1447 TEST_P(MessengerTest
, MarkdownTest
) {
1448 Messenger
*server_msgr2
= Messenger::create(g_ceph_context
, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
// One dispatcher per role; both servers share srv_dispatcher.
1449 MarkdownDispatcher
cli_dispatcher(false), srv_dispatcher(true);
1450 entity_addr_t bind_addr
;
1451 bind_addr
.parse("127.0.0.1:16800");
1452 server_msgr
->bind(bind_addr
);
1453 server_msgr
->add_dispatcher_head(&srv_dispatcher
);
1454 server_msgr
->start();
1455 bind_addr
.parse("127.0.0.1:16801");
1456 server_msgr2
->bind(bind_addr
);
1457 server_msgr2
->add_dispatcher_head(&srv_dispatcher
);
1458 server_msgr2
->start();
1460 client_msgr
->add_dispatcher_head(&cli_dispatcher
);
1461 client_msgr
->start();
// Number of rounds in which the server-side counter did not advance.
1466 uint64_t equal_count
= 0;
1468 ConnectionRef conn1
= client_msgr
->get_connection(server_msgr
->get_myinst());
1469 ConnectionRef conn2
= client_msgr
->get_connection(server_msgr2
->get_myinst());
1470 MPing
*m
= new MPing();
1471 ASSERT_EQ(conn1
->send_message(m
), 0);
1473 ASSERT_EQ(conn2
->send_message(m
), 0);
1474 CHECK_AND_WAIT_TRUE(srv_dispatcher
.count
> last
+ 1);
1475 if (srv_dispatcher
.count
== last
) {
1476 lderr(g_ceph_context
) << __func__
<< " last is " << last
<< dendl
;
1483 last
= srv_dispatcher
.count
;
1486 ASSERT_FALSE(equal
&& equal_count
> 3);
// Stop everything first, then wait for each messenger to finish.
1488 server_msgr
->shutdown();
1489 client_msgr
->shutdown();
1490 server_msgr2
->shutdown();
1491 server_msgr
->wait();
1492 client_msgr
->wait();
1493 server_msgr2
->wait();
// Only server_msgr2 was created in this test, so only it is deleted here.
1494 delete server_msgr2
;
// Google Test instantiation of a value-parameterized test case.
// NOTE(review): the macro's arguments (prefix, test case name and parameter
// generator) are not visible in this excerpt.
1497 INSTANTIATE_TEST_CASE_P(
1508 // Google Test may not support value-parameterized tests with some
1509 // compilers. If we use conditional compilation to compile out all
1510 // code referring to the gtest_main library, MSVC linker will not link
1511 // that library at all and consequently complain about missing entry
1512 // point defined in that library (fatal error LNK1561: entry point
1513 // must be defined). This dummy test keeps gtest_main linked in.
// Intentionally empty placeholder test.
// NOTE(review): presumably compiled only in the fallback arm of the
// GTEST_HAS_PARAM_TEST conditional; the preprocessor lines are not visible
// in this excerpt.
1514 TEST(DummyTest
, ValueParameterizedTestsAreNotSupportedOnThisPlatform
) {}
// Test binary entry point.
//
// Copies argv into a vector, initialises the global Ceph context as a client
// utility, overrides a handful of configuration options for the test run,
// finishes common initialisation and then hands control to Google Test.
// Returns the result of RUN_ALL_TESTS().
1519 int main(int argc
, char **argv
) {
1520 vector
<const char*> args
;
1521 argv_to_vec(argc
, (const char **)argv
, args
);
// `cct` is held for the rest of main; the tests themselves use
// g_ceph_context.
1524 auto cct
= global_init(NULL
, args
, CEPH_ENTITY_TYPE_CLIENT
, CODE_ENVIRONMENT_UTILITY
, 0);
// Authentication is set to "none" for cluster, service and client.
1525 g_ceph_context
->_conf
->set_val("auth_cluster_required", "none");
1526 g_ceph_context
->_conf
->set_val("auth_service_required", "none");
1527 g_ceph_context
->_conf
->set_val("auth_client_required", "none");
1528 g_ceph_context
->_conf
->set_val("enable_experimental_unrecoverable_data_corrupting_features", "ms-type-async");
// Messenger options: die on bad or old messages, and ms_max_backoff of "1".
1529 g_ceph_context
->_conf
->set_val("ms_die_on_bad_msg", "true");
1530 g_ceph_context
->_conf
->set_val("ms_die_on_old_message", "true");
1531 g_ceph_context
->_conf
->set_val("ms_max_backoff", "1");
1532 common_init_finish(g_ceph_context
);
1534 ::testing::InitGoogleTest(&argc
, argv
);
1535 return RUN_ALL_TESTS();
1540 * compile-command: "cd ../.. ; make -j4 ceph_test_msgr && valgrind --tool=memcheck ./ceph_test_msgr"