]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/msgr/test_msgr.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / test / msgr / test_msgr.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #include <atomic>
18 #include <iostream>
19 #include <unistd.h>
20 #include <stdlib.h>
21 #include <time.h>
22 #include <set>
23 #include <list>
24 #include "common/Mutex.h"
25 #include "common/Cond.h"
26 #include "common/ceph_argparse.h"
27 #include "global/global_init.h"
28 #include "msg/Dispatcher.h"
29 #include "msg/msg_types.h"
30 #include "msg/Message.h"
31 #include "msg/Messenger.h"
32 #include "msg/Connection.h"
33 #include "messages/MPing.h"
34 #include "messages/MCommand.h"
35
36 #include <boost/random/mersenne_twister.hpp>
37 #include <boost/random/uniform_int.hpp>
38 #include <boost/random/binomial_distribution.hpp>
39 #include <gtest/gtest.h>
40
41 typedef boost::mt11213b gen_type;
42
43 #include "common/dout.h"
44 #include "include/ceph_assert.h"
45
46 #include "auth/DummyAuth.h"
47
48 #define dout_subsys ceph_subsys_ms
49 #undef dout_prefix
50 #define dout_prefix *_dout << " ceph_test_msgr "
51
52
53 #if GTEST_HAS_PARAM_TEST
54
// Poll `expr` about once per millisecond, for at most 999 evaluations, and
// stop as soon as it evaluates to true.  The macro gives up silently when the
// budget is exhausted, so callers must re-check the condition with an
// assertion afterwards.
//
// There is deliberately no semicolon after `while (0)`: the caller's own `;`
// terminates the statement, which keeps the macro usable as the only
// statement of an unbraced if/else.  The counter has a macro-specific name so
// that it cannot capture a variable referenced by `expr`.
#define CHECK_AND_WAIT_TRUE(expr) do {            \
  int check_and_wait_tries_left_ = 1000;          \
  while (--check_and_wait_tries_left_) {          \
    if (expr)                                     \
      break;                                      \
    usleep(1000);                                 \
  }                                               \
} while (0)
63
64 class MessengerTest : public ::testing::TestWithParam<const char*> {
65 public:
66 DummyAuthClientServer dummy_auth;
67 Messenger *server_msgr;
68 Messenger *client_msgr;
69
70 MessengerTest() : dummy_auth(g_ceph_context),
71 server_msgr(NULL), client_msgr(NULL) {
72 dummy_auth.auth_registry.refresh_config();
73 }
74 void SetUp() override {
75 lderr(g_ceph_context) << __func__ << " start set up " << GetParam() << dendl;
76 server_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
77 client_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid(), 0);
78 server_msgr->set_default_policy(Messenger::Policy::stateless_server(0));
79 client_msgr->set_default_policy(Messenger::Policy::lossy_client(0));
80 server_msgr->set_auth_client(&dummy_auth);
81 server_msgr->set_auth_server(&dummy_auth);
82 client_msgr->set_auth_client(&dummy_auth);
83 client_msgr->set_auth_server(&dummy_auth);
84 }
85 void TearDown() override {
86 ASSERT_EQ(server_msgr->get_dispatch_queue_len(), 0);
87 ASSERT_EQ(client_msgr->get_dispatch_queue_len(), 0);
88 delete server_msgr;
89 delete client_msgr;
90 }
91
92 };
93
94
95 class FakeDispatcher : public Dispatcher {
96 public:
97 struct Session : public RefCountedObject {
98 atomic<uint64_t> count;
99 ConnectionRef con;
100
101 explicit Session(ConnectionRef c): RefCountedObject(g_ceph_context), count(0), con(c) {
102 }
103 uint64_t get_count() { return count; }
104 };
105
106 Mutex lock;
107 Cond cond;
108 bool is_server;
109 bool got_new;
110 bool got_remote_reset;
111 bool got_connect;
112 bool loopback;
113 entity_addrvec_t last_accept;
114
115 explicit FakeDispatcher(bool s): Dispatcher(g_ceph_context), lock("FakeDispatcher::lock"),
116 is_server(s), got_new(false), got_remote_reset(false),
117 got_connect(false), loopback(false) {
118 // don't need authorizers
119 ms_set_require_authorizer(false);
120 }
121 bool ms_can_fast_dispatch_any() const override { return true; }
122 bool ms_can_fast_dispatch(const Message *m) const override {
123 switch (m->get_type()) {
124 case CEPH_MSG_PING:
125 return true;
126 default:
127 return false;
128 }
129 }
130
131 void ms_handle_fast_connect(Connection *con) override {
132 lock.Lock();
133 lderr(g_ceph_context) << __func__ << " " << con << dendl;
134 auto s = con->get_priv();
135 if (!s) {
136 auto session = new Session(con);
137 con->set_priv(RefCountedPtr{session, false});
138 lderr(g_ceph_context) << __func__ << " con: " << con
139 << " count: " << session->count << dendl;
140 }
141 got_connect = true;
142 cond.Signal();
143 lock.Unlock();
144 }
145 void ms_handle_fast_accept(Connection *con) override {
146 last_accept = con->get_peer_addrs();
147 if (!con->get_priv()) {
148 con->set_priv(RefCountedPtr{new Session(con), false});
149 }
150 }
151 bool ms_dispatch(Message *m) override {
152 auto priv = m->get_connection()->get_priv();
153 auto s = static_cast<Session*>(priv.get());
154 if (!s) {
155 s = new Session(m->get_connection());
156 priv.reset(s, false);
157 m->get_connection()->set_priv(priv);
158 }
159 s->count++;
160 lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl;
161 if (is_server) {
162 reply_message(m);
163 }
164 Mutex::Locker l(lock);
165 got_new = true;
166 cond.Signal();
167 m->put();
168 return true;
169 }
170 bool ms_handle_reset(Connection *con) override {
171 Mutex::Locker l(lock);
172 lderr(g_ceph_context) << __func__ << " " << con << dendl;
173 auto priv = con->get_priv();
174 if (auto s = static_cast<Session*>(priv.get()); s) {
175 s->con.reset(); // break con <-> session ref cycle
176 con->set_priv(nullptr); // break ref <-> session cycle, if any
177 }
178 return true;
179 }
180 void ms_handle_remote_reset(Connection *con) override {
181 Mutex::Locker l(lock);
182 lderr(g_ceph_context) << __func__ << " " << con << dendl;
183 auto priv = con->get_priv();
184 if (auto s = static_cast<Session*>(priv.get()); s) {
185 s->con.reset(); // break con <-> session ref cycle
186 con->set_priv(nullptr); // break ref <-> session cycle, if any
187 }
188 got_remote_reset = true;
189 cond.Signal();
190 }
191 bool ms_handle_refused(Connection *con) override {
192 return false;
193 }
194 void ms_fast_dispatch(Message *m) override {
195 auto priv = m->get_connection()->get_priv();
196 auto s = static_cast<Session*>(priv.get());
197 if (!s) {
198 s = new Session(m->get_connection());
199 priv.reset(s, false);
200 m->get_connection()->set_priv(priv);
201 }
202 s->count++;
203 lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl;
204 if (is_server) {
205 if (loopback)
206 ceph_assert(m->get_source().is_osd());
207 else
208 reply_message(m);
209 } else if (loopback) {
210 ceph_assert(m->get_source().is_client());
211 }
212 m->put();
213 Mutex::Locker l(lock);
214 got_new = true;
215 cond.Signal();
216 }
217
218 int ms_handle_authentication(Connection *con) override {
219 return 1;
220 }
221
222 void reply_message(Message *m) {
223 MPing *rm = new MPing();
224 m->get_connection()->send_message(rm);
225 }
226 };
227
228 typedef FakeDispatcher::Session Session;
229
230 struct TestInterceptor : public Interceptor {
231
232 bool step_waiting = false;
233 bool waiting = true;
234 std::map<Connection *, uint32_t> current_step;
235 std::map<Connection *, std::list<uint32_t>> step_history;
236 std::map<uint32_t, std::optional<ACTION>> decisions;
237 std::set<uint32_t> breakpoints;
238
239 uint32_t count_step(Connection *conn, uint32_t step) {
240 uint32_t count = 0;
241 for (auto s : step_history[conn]) {
242 if (s == step) {
243 count++;
244 }
245 }
246 return count;
247 }
248
249 void breakpoint(uint32_t step) {
250 breakpoints.insert(step);
251 }
252
253 void remove_bp(uint32_t step) {
254 breakpoints.erase(step);
255 }
256
257 Connection *wait(uint32_t step, Connection *conn=nullptr) {
258 std::unique_lock<std::mutex> l(lock);
259 while(true) {
260 if (conn) {
261 auto it = current_step.find(conn);
262 if (it != current_step.end()) {
263 if (it->second == step) {
264 break;
265 }
266 }
267 } else {
268 for (auto it : current_step) {
269 if (it.second == step) {
270 conn = it.first;
271 break;
272 }
273 }
274 if (conn) {
275 break;
276 }
277 }
278 step_waiting = true;
279 cond_var.wait(l);
280 }
281 step_waiting = false;
282 return conn;
283 }
284
285 ACTION wait_for_decision(uint32_t step, std::unique_lock<std::mutex> &l) {
286 if (decisions[step]) {
287 return *(decisions[step]);
288 }
289 waiting = true;
290 while(waiting) {
291 cond_var.wait(l);
292 }
293 return *(decisions[step]);
294 }
295
296 void proceed(uint32_t step, ACTION decision) {
297 std::unique_lock<std::mutex> l(lock);
298 decisions[step] = decision;
299 if (waiting) {
300 waiting = false;
301 cond_var.notify_one();
302 }
303 }
304
305 ACTION intercept(Connection *conn, uint32_t step) override {
306 lderr(g_ceph_context) << __func__ << " conn(" << conn
307 << ") intercept called on step=" << step << dendl;
308
309 {
310 std::unique_lock<std::mutex> l(lock);
311 step_history[conn].push_back(step);
312 current_step[conn] = step;
313 if (step_waiting) {
314 cond_var.notify_one();
315 }
316 }
317
318 std::unique_lock<std::mutex> l(lock);
319 ACTION decision = ACTION::CONTINUE;
320 if (breakpoints.find(step) != breakpoints.end()) {
321 lderr(g_ceph_context) << __func__ << " conn(" << conn
322 << ") pausing on step=" << step << dendl;
323 decision = wait_for_decision(step, l);
324 } else {
325 if (decisions[step]) {
326 decision = *(decisions[step]);
327 }
328 }
329 lderr(g_ceph_context) << __func__ << " conn(" << conn
330 << ") resuming step=" << step << " with decision="
331 << decision << dendl;
332 decisions[step].reset();
333 return decision;
334 }
335
336 };
337
338 /**
339 * Scenario: A connects to B, and B connects to A at the same time.
340 */
341 TEST_P(MessengerTest, ConnectionRaceTest) {
342 if (string(GetParam()) == "simple") {
343 return;
344 }
345
346 FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
347
348 TestInterceptor *cli_interceptor = new TestInterceptor();
349 TestInterceptor *srv_interceptor = new TestInterceptor();
350
351 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer_reuse(0));
352 server_msgr->interceptor = srv_interceptor;
353
354 client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer_reuse(0));
355 client_msgr->interceptor = cli_interceptor;
356
357 entity_addr_t bind_addr;
358 bind_addr.parse("v2:127.0.0.1:3300");
359 server_msgr->bind(bind_addr);
360 server_msgr->add_dispatcher_head(&srv_dispatcher);
361 server_msgr->start();
362
363 bind_addr.parse("v2:127.0.0.1:3301");
364 client_msgr->bind(bind_addr);
365 client_msgr->add_dispatcher_head(&cli_dispatcher);
366 client_msgr->start();
367
368 // pause before sending client_ident message
369 cli_interceptor->breakpoint(11);
370 // pause before sending client_ident message
371 srv_interceptor->breakpoint(11);
372
373 ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
374 server_msgr->get_myaddrs());
375 MPing *m1 = new MPing();
376 ASSERT_EQ(c2s->send_message(m1), 0);
377
378 ConnectionRef s2c = server_msgr->connect_to(client_msgr->get_mytype(),
379 client_msgr->get_myaddrs());
380 MPing *m2 = new MPing();
381 ASSERT_EQ(s2c->send_message(m2), 0);
382
383 cli_interceptor->wait(11, c2s.get());
384 srv_interceptor->wait(11, s2c.get());
385
386 // at this point both connections (A->B, B->A) are paused just before sending
387 // the client_ident message.
388
389 cli_interceptor->remove_bp(11);
390 srv_interceptor->remove_bp(11);
391
392 cli_interceptor->proceed(11, Interceptor::ACTION::CONTINUE);
393 srv_interceptor->proceed(11, Interceptor::ACTION::CONTINUE);
394
395 {
396 Mutex::Locker l(cli_dispatcher.lock);
397 while (!cli_dispatcher.got_new)
398 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
399 cli_dispatcher.got_new = false;
400 }
401
402 {
403 Mutex::Locker l(srv_dispatcher.lock);
404 while (!srv_dispatcher.got_new)
405 srv_dispatcher.cond.Wait(srv_dispatcher.lock);
406 srv_dispatcher.got_new = false;
407 }
408
409 ASSERT_TRUE(s2c->is_connected());
410 ASSERT_EQ(1u, static_cast<Session*>(s2c->get_priv().get())->get_count());
411 ASSERT_TRUE(s2c->peer_is_client());
412
413 ASSERT_TRUE(c2s->is_connected());
414 ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
415 ASSERT_TRUE(c2s->peer_is_osd());
416
417 client_msgr->shutdown();
418 client_msgr->wait();
419 server_msgr->shutdown();
420 server_msgr->wait();
421
422 delete cli_interceptor;
423 delete srv_interceptor;
424 }
425
426 /**
427 * Scenario:
428 * - A connects to B
429 * - A sends client_ident to B
430 * - B fails before sending server_ident to A
431 * - A reconnects
432 */
433 TEST_P(MessengerTest, MissingServerIdenTest) {
434 if (string(GetParam()) == "simple") {
435 return;
436 }
437
438 FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
439
440 TestInterceptor *cli_interceptor = new TestInterceptor();
441 TestInterceptor *srv_interceptor = new TestInterceptor();
442
443 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::stateful_server(0));
444 server_msgr->interceptor = srv_interceptor;
445
446 client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossy_client(0));
447 client_msgr->interceptor = cli_interceptor;
448
449 entity_addr_t bind_addr;
450 bind_addr.parse("v2:127.0.0.1:3300");
451 server_msgr->bind(bind_addr);
452 server_msgr->add_dispatcher_head(&srv_dispatcher);
453 server_msgr->start();
454
455 bind_addr.parse("v2:127.0.0.1:3301");
456 client_msgr->bind(bind_addr);
457 client_msgr->add_dispatcher_head(&cli_dispatcher);
458 client_msgr->start();
459
460 // pause before sending client_ident message
461 srv_interceptor->breakpoint(12);
462
463 ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
464 server_msgr->get_myaddrs());
465 MPing *m1 = new MPing();
466 ASSERT_EQ(c2s->send_message(m1), 0);
467
468 Connection *c2s_accepter = srv_interceptor->wait(12);
469 srv_interceptor->remove_bp(12);
470
471 // We inject a message from this side of the connection to force it to be
472 // in standby when we inject the failure below
473 MPing *m2 = new MPing();
474 ASSERT_EQ(c2s_accepter->send_message(m2), 0);
475
476 srv_interceptor->proceed(12, Interceptor::ACTION::FAIL);
477
478 {
479 Mutex::Locker l(srv_dispatcher.lock);
480 while (!srv_dispatcher.got_new)
481 srv_dispatcher.cond.Wait(srv_dispatcher.lock);
482 srv_dispatcher.got_new = false;
483 }
484
485 {
486 Mutex::Locker l(cli_dispatcher.lock);
487 while (!cli_dispatcher.got_new)
488 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
489 cli_dispatcher.got_new = false;
490 }
491
492 ASSERT_TRUE(c2s->is_connected());
493 ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
494 ASSERT_TRUE(c2s->peer_is_osd());
495
496 ASSERT_TRUE(c2s_accepter->is_connected());
497 ASSERT_EQ(1u, static_cast<Session*>(c2s_accepter->get_priv().get())->get_count());
498 ASSERT_TRUE(c2s_accepter->peer_is_client());
499
500 client_msgr->shutdown();
501 client_msgr->wait();
502 server_msgr->shutdown();
503 server_msgr->wait();
504
505 delete cli_interceptor;
506 delete srv_interceptor;
507 }
508
509 /**
510 * Scenario:
511 * - A connects to B
512 * - A sends client_ident to B
513 * - B fails before sending server_ident to A
514 * - A goes to standby
515 * - B reconnects to A
516 */
517 TEST_P(MessengerTest, MissingServerIdenTest2) {
518 if (string(GetParam()) == "simple") {
519 return;
520 }
521
522 FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
523
524 TestInterceptor *cli_interceptor = new TestInterceptor();
525 TestInterceptor *srv_interceptor = new TestInterceptor();
526
527 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer(0));
528 server_msgr->interceptor = srv_interceptor;
529
530 client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0));
531 client_msgr->interceptor = cli_interceptor;
532
533 entity_addr_t bind_addr;
534 bind_addr.parse("v2:127.0.0.1:3300");
535 server_msgr->bind(bind_addr);
536 server_msgr->add_dispatcher_head(&srv_dispatcher);
537 server_msgr->start();
538
539 bind_addr.parse("v2:127.0.0.1:3301");
540 client_msgr->bind(bind_addr);
541 client_msgr->add_dispatcher_head(&cli_dispatcher);
542 client_msgr->start();
543
544 // pause before sending client_ident message
545 srv_interceptor->breakpoint(12);
546
547 ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
548 server_msgr->get_myaddrs());
549
550 Connection *c2s_accepter = srv_interceptor->wait(12);
551 srv_interceptor->remove_bp(12);
552
553 // We inject a message from this side of the connection to force it to be
554 // in standby when we inject the failure below
555 MPing *m2 = new MPing();
556 ASSERT_EQ(c2s_accepter->send_message(m2), 0);
557
558 srv_interceptor->proceed(12, Interceptor::ACTION::FAIL);
559
560 {
561 Mutex::Locker l(cli_dispatcher.lock);
562 while (!cli_dispatcher.got_new)
563 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
564 cli_dispatcher.got_new = false;
565 }
566
567 ASSERT_TRUE(c2s->is_connected());
568 ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
569 ASSERT_TRUE(c2s->peer_is_osd());
570
571 ASSERT_TRUE(c2s_accepter->is_connected());
572 ASSERT_EQ(0u, static_cast<Session*>(c2s_accepter->get_priv().get())->get_count());
573 ASSERT_TRUE(c2s_accepter->peer_is_client());
574
575 client_msgr->shutdown();
576 client_msgr->wait();
577 server_msgr->shutdown();
578 server_msgr->wait();
579
580 delete cli_interceptor;
581 delete srv_interceptor;
582 }
583
584 /**
585 * Scenario:
586 * - A connects to B
587 * - A and B exchange messages
588 * - A fails
589 * - B goes into standby
590 * - A reconnects
591 */
592 TEST_P(MessengerTest, ReconnectTest) {
593 if (string(GetParam()) == "simple") {
594 return;
595 }
596
597 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
598
599 TestInterceptor *cli_interceptor = new TestInterceptor();
600 TestInterceptor *srv_interceptor = new TestInterceptor();
601
602 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::stateful_server(0));
603 server_msgr->interceptor = srv_interceptor;
604
605 client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0));
606 client_msgr->interceptor = cli_interceptor;
607
608 entity_addr_t bind_addr;
609 bind_addr.parse("v2:127.0.0.1:3300");
610 server_msgr->bind(bind_addr);
611 server_msgr->add_dispatcher_head(&srv_dispatcher);
612 server_msgr->start();
613
614 bind_addr.parse("v2:127.0.0.1:3301");
615 client_msgr->bind(bind_addr);
616 client_msgr->add_dispatcher_head(&cli_dispatcher);
617 client_msgr->start();
618
619 ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
620 server_msgr->get_myaddrs());
621
622 MPing *m1 = new MPing();
623 ASSERT_EQ(c2s->send_message(m1), 0);
624
625 {
626 Mutex::Locker l(cli_dispatcher.lock);
627 while (!cli_dispatcher.got_new)
628 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
629 cli_dispatcher.got_new = false;
630 }
631
632 ASSERT_TRUE(c2s->is_connected());
633 ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
634 ASSERT_TRUE(c2s->peer_is_osd());
635
636 cli_interceptor->breakpoint(16);
637
638 MPing *m2 = new MPing();
639 ASSERT_EQ(c2s->send_message(m2), 0);
640
641 cli_interceptor->wait(16, c2s.get());
642 cli_interceptor->remove_bp(16);
643
644 // at this point client and server are connected together
645
646 srv_interceptor->breakpoint(15);
647
648 // failing client
649 cli_interceptor->proceed(16, Interceptor::ACTION::FAIL);
650
651 MPing *m3 = new MPing();
652 ASSERT_EQ(c2s->send_message(m3), 0);
653
654 Connection *c2s_accepter = srv_interceptor->wait(15);
655 // the srv end of theconnection is now paused at ready
656 // this means that the reconnect was successful
657 srv_interceptor->remove_bp(15);
658
659 ASSERT_TRUE(c2s_accepter->peer_is_client());
660 // c2s_accepter sent 0 reconnect messages
661 ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, 13), 0u);
662 // c2s_accepter sent 1 reconnect_ok messages
663 ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, 14), 1u);
664 // c2s sent 1 reconnect messages
665 ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 13), 1u);
666 // c2s sent 0 reconnect_ok messages
667 ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 14), 0u);
668
669 srv_interceptor->proceed(15, Interceptor::ACTION::CONTINUE);
670
671 {
672 Mutex::Locker l(cli_dispatcher.lock);
673 while (!cli_dispatcher.got_new)
674 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
675 cli_dispatcher.got_new = false;
676 }
677
678 client_msgr->shutdown();
679 client_msgr->wait();
680 server_msgr->shutdown();
681 server_msgr->wait();
682
683 delete cli_interceptor;
684 delete srv_interceptor;
685 }
686
687 /**
688 * Scenario:
689 * - A connects to B
690 * - A and B exchange messages
691 * - A fails
692 * - A reconnects // B reconnects
693 */
694 TEST_P(MessengerTest, ReconnectRaceTest) {
695 if (string(GetParam()) == "simple") {
696 return;
697 }
698
699 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
700
701 TestInterceptor *cli_interceptor = new TestInterceptor();
702 TestInterceptor *srv_interceptor = new TestInterceptor();
703
704 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer(0));
705 server_msgr->interceptor = srv_interceptor;
706
707 client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0));
708 client_msgr->interceptor = cli_interceptor;
709
710 entity_addr_t bind_addr;
711 bind_addr.parse("v2:127.0.0.1:3300");
712 server_msgr->bind(bind_addr);
713 server_msgr->add_dispatcher_head(&srv_dispatcher);
714 server_msgr->start();
715
716 bind_addr.parse("v2:127.0.0.1:3301");
717 client_msgr->bind(bind_addr);
718 client_msgr->add_dispatcher_head(&cli_dispatcher);
719 client_msgr->start();
720
721 ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
722 server_msgr->get_myaddrs());
723
724 MPing *m1 = new MPing();
725 ASSERT_EQ(c2s->send_message(m1), 0);
726
727 {
728 Mutex::Locker l(cli_dispatcher.lock);
729 while (!cli_dispatcher.got_new)
730 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
731 cli_dispatcher.got_new = false;
732 }
733
734 ASSERT_TRUE(c2s->is_connected());
735 ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
736 ASSERT_TRUE(c2s->peer_is_osd());
737
738 cli_interceptor->breakpoint(16);
739
740 MPing *m2 = new MPing();
741 ASSERT_EQ(c2s->send_message(m2), 0);
742
743 cli_interceptor->wait(16, c2s.get());
744 cli_interceptor->remove_bp(16);
745
746 // at this point client and server are connected together
747
748 // force both client and server to race on reconnect
749 cli_interceptor->breakpoint(13);
750 srv_interceptor->breakpoint(13);
751
752 // failing client
753 // this will cause both client and server to reconnect at the same time
754 cli_interceptor->proceed(16, Interceptor::ACTION::FAIL);
755
756 MPing *m3 = new MPing();
757 ASSERT_EQ(c2s->send_message(m3), 0);
758
759 cli_interceptor->wait(13, c2s.get());
760 srv_interceptor->wait(13);
761
762 cli_interceptor->remove_bp(13);
763 srv_interceptor->remove_bp(13);
764
765 // pause on "ready"
766 srv_interceptor->breakpoint(15);
767
768 cli_interceptor->proceed(13, Interceptor::ACTION::CONTINUE);
769 srv_interceptor->proceed(13, Interceptor::ACTION::CONTINUE);
770
771 Connection *c2s_accepter = srv_interceptor->wait(15);
772
773 // the server has reconnected and is "ready"
774 srv_interceptor->remove_bp(15);
775
776 ASSERT_TRUE(c2s_accepter->peer_is_client());
777 ASSERT_TRUE(c2s->peer_is_osd());
778
779 // the server should win the reconnect race
780
781 // c2s_accepter sent 1 or 2 reconnect messages
782 ASSERT_LT(srv_interceptor->count_step(c2s_accepter, 13), 3u);
783 ASSERT_GT(srv_interceptor->count_step(c2s_accepter, 13), 0u);
784 // c2s_accepter sent 0 reconnect_ok messages
785 ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, 14), 0u);
786 // c2s sent 1 reconnect messages
787 ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 13), 1u);
788 // c2s sent 1 reconnect_ok messages
789 ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 14), 1u);
790
791 if (srv_interceptor->count_step(c2s_accepter, 13) == 2) {
792 // if the server send the reconnect message two times then
793 // the client must have sent a session retry message to the server
794 ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 18), 1u);
795 } else {
796 ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 18), 0u);
797 }
798
799 srv_interceptor->proceed(15, Interceptor::ACTION::CONTINUE);
800
801 {
802 Mutex::Locker l(cli_dispatcher.lock);
803 while (!cli_dispatcher.got_new)
804 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
805 cli_dispatcher.got_new = false;
806 }
807
808 client_msgr->shutdown();
809 client_msgr->wait();
810 server_msgr->shutdown();
811 server_msgr->wait();
812
813 delete cli_interceptor;
814 delete srv_interceptor;
815 }
816
817 TEST_P(MessengerTest, SimpleTest) {
818 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
819 entity_addr_t bind_addr;
820 if (string(GetParam()) == "simple")
821 bind_addr.parse("v1:127.0.0.1");
822 else
823 bind_addr.parse("v2:127.0.0.1");
824 server_msgr->bind(bind_addr);
825 server_msgr->add_dispatcher_head(&srv_dispatcher);
826 server_msgr->start();
827
828 client_msgr->add_dispatcher_head(&cli_dispatcher);
829 client_msgr->start();
830
831 // 1. simple round trip
832 MPing *m = new MPing();
833 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
834 server_msgr->get_myaddrs());
835 {
836 ASSERT_EQ(conn->send_message(m), 0);
837 Mutex::Locker l(cli_dispatcher.lock);
838 while (!cli_dispatcher.got_new)
839 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
840 cli_dispatcher.got_new = false;
841 }
842 ASSERT_TRUE(conn->is_connected());
843 ASSERT_EQ(1u, static_cast<Session*>(conn->get_priv().get())->get_count());
844 ASSERT_TRUE(conn->peer_is_osd());
845
846 // 2. test rebind port
847 set<int> avoid_ports;
848 for (int i = 0; i < 10 ; i++) {
849 for (auto a : server_msgr->get_myaddrs().v) {
850 avoid_ports.insert(a.get_port() + i);
851 }
852 }
853 server_msgr->rebind(avoid_ports);
854 for (auto a : server_msgr->get_myaddrs().v) {
855 ASSERT_TRUE(avoid_ports.count(a.get_port()) == 0);
856 }
857
858 conn = client_msgr->connect_to(server_msgr->get_mytype(),
859 server_msgr->get_myaddrs());
860 {
861 m = new MPing();
862 ASSERT_EQ(conn->send_message(m), 0);
863 Mutex::Locker l(cli_dispatcher.lock);
864 while (!cli_dispatcher.got_new)
865 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
866 cli_dispatcher.got_new = false;
867 }
868 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
869
870 // 3. test markdown connection
871 conn->mark_down();
872 ASSERT_FALSE(conn->is_connected());
873
874 // 4. test failed connection
875 server_msgr->shutdown();
876 server_msgr->wait();
877
878 m = new MPing();
879 conn->send_message(m);
880 CHECK_AND_WAIT_TRUE(!conn->is_connected());
881 ASSERT_FALSE(conn->is_connected());
882
883 // 5. loopback connection
884 srv_dispatcher.loopback = true;
885 conn = client_msgr->get_loopback_connection();
886 {
887 m = new MPing();
888 ASSERT_EQ(conn->send_message(m), 0);
889 Mutex::Locker l(cli_dispatcher.lock);
890 while (!cli_dispatcher.got_new)
891 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
892 cli_dispatcher.got_new = false;
893 }
894 srv_dispatcher.loopback = false;
895 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
896 client_msgr->shutdown();
897 client_msgr->wait();
898 server_msgr->shutdown();
899 server_msgr->wait();
900 }
901
902 TEST_P(MessengerTest, SimpleMsgr2Test) {
903 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
904 entity_addr_t legacy_addr;
905 legacy_addr.parse("v1:127.0.0.1");
906 entity_addr_t msgr2_addr;
907 msgr2_addr.parse("v2:127.0.0.1");
908 entity_addrvec_t bind_addrs;
909 bind_addrs.v.push_back(legacy_addr);
910 bind_addrs.v.push_back(msgr2_addr);
911 server_msgr->bindv(bind_addrs);
912 server_msgr->add_dispatcher_head(&srv_dispatcher);
913 server_msgr->start();
914
915 client_msgr->add_dispatcher_head(&cli_dispatcher);
916 client_msgr->start();
917
918 // 1. simple round trip
919 MPing *m = new MPing();
920 ConnectionRef conn = client_msgr->connect_to(
921 server_msgr->get_mytype(),
922 server_msgr->get_myaddrs());
923 {
924 ASSERT_EQ(conn->send_message(m), 0);
925 Mutex::Locker l(cli_dispatcher.lock);
926 while (!cli_dispatcher.got_new)
927 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
928 cli_dispatcher.got_new = false;
929 }
930 ASSERT_TRUE(conn->is_connected());
931 ASSERT_EQ(1u, static_cast<Session*>(conn->get_priv().get())->get_count());
932 ASSERT_TRUE(conn->peer_is_osd());
933
934 // 2. test rebind port
935 set<int> avoid_ports;
936 for (int i = 0; i < 10 ; i++) {
937 for (auto a : server_msgr->get_myaddrs().v) {
938 avoid_ports.insert(a.get_port() + i);
939 }
940 }
941 server_msgr->rebind(avoid_ports);
942 for (auto a : server_msgr->get_myaddrs().v) {
943 ASSERT_TRUE(avoid_ports.count(a.get_port()) == 0);
944 }
945
946 conn = client_msgr->connect_to(
947 server_msgr->get_mytype(),
948 server_msgr->get_myaddrs());
949 {
950 m = new MPing();
951 ASSERT_EQ(conn->send_message(m), 0);
952 Mutex::Locker l(cli_dispatcher.lock);
953 while (!cli_dispatcher.got_new)
954 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
955 cli_dispatcher.got_new = false;
956 }
957 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
958
959 // 3. test markdown connection
960 conn->mark_down();
961 ASSERT_FALSE(conn->is_connected());
962
963 // 4. test failed connection
964 server_msgr->shutdown();
965 server_msgr->wait();
966
967 m = new MPing();
968 conn->send_message(m);
969 CHECK_AND_WAIT_TRUE(!conn->is_connected());
970 ASSERT_FALSE(conn->is_connected());
971
972 // 5. loopback connection
973 srv_dispatcher.loopback = true;
974 conn = client_msgr->get_loopback_connection();
975 {
976 m = new MPing();
977 ASSERT_EQ(conn->send_message(m), 0);
978 Mutex::Locker l(cli_dispatcher.lock);
979 while (!cli_dispatcher.got_new)
980 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
981 cli_dispatcher.got_new = false;
982 }
983 srv_dispatcher.loopback = false;
984 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
985 client_msgr->shutdown();
986 client_msgr->wait();
987 server_msgr->shutdown();
988 server_msgr->wait();
989 }
990
991 TEST_P(MessengerTest, NameAddrTest) {
992 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
993 entity_addr_t bind_addr;
994 if (string(GetParam()) == "simple")
995 bind_addr.parse("v1:127.0.0.1");
996 else
997 bind_addr.parse("v2:127.0.0.1");
998 server_msgr->bind(bind_addr);
999 server_msgr->add_dispatcher_head(&srv_dispatcher);
1000 server_msgr->start();
1001
1002 client_msgr->add_dispatcher_head(&cli_dispatcher);
1003 client_msgr->start();
1004
1005 MPing *m = new MPing();
1006 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1007 server_msgr->get_myaddrs());
1008 {
1009 ASSERT_EQ(conn->send_message(m), 0);
1010 Mutex::Locker l(cli_dispatcher.lock);
1011 while (!cli_dispatcher.got_new)
1012 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
1013 cli_dispatcher.got_new = false;
1014 }
1015 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1016 ASSERT_TRUE(conn->get_peer_addrs() == server_msgr->get_myaddrs());
1017 ConnectionRef server_conn = server_msgr->connect_to(
1018 client_msgr->get_mytype(), srv_dispatcher.last_accept);
1019 // Verify that server_conn is the one we already accepted from client,
1020 // so it means the session counter in server_conn is also incremented.
1021 ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
1022 server_msgr->shutdown();
1023 client_msgr->shutdown();
1024 server_msgr->wait();
1025 client_msgr->wait();
1026 }
1027
1028 TEST_P(MessengerTest, FeatureTest) {
1029 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1030 entity_addr_t bind_addr;
1031 if (string(GetParam()) == "simple")
1032 bind_addr.parse("v1:127.0.0.1");
1033 else
1034 bind_addr.parse("v2:127.0.0.1");
1035 uint64_t all_feature_supported, feature_required, feature_supported = 0;
1036 for (int i = 0; i < 10; i++)
1037 feature_supported |= 1ULL << i;
1038 feature_supported |= CEPH_FEATUREMASK_MSG_ADDR2;
1039 feature_supported |= CEPH_FEATUREMASK_SERVER_NAUTILUS;
1040 feature_required = feature_supported | 1ULL << 13;
1041 all_feature_supported = feature_required | 1ULL << 14;
1042
1043 Messenger::Policy p = server_msgr->get_policy(entity_name_t::TYPE_CLIENT);
1044 p.features_required = feature_required;
1045 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
1046 server_msgr->bind(bind_addr);
1047 server_msgr->add_dispatcher_head(&srv_dispatcher);
1048 server_msgr->start();
1049
1050 // 1. Suppose if only support less than required
1051 p = client_msgr->get_policy(entity_name_t::TYPE_OSD);
1052 p.features_supported = feature_supported;
1053 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
1054 client_msgr->add_dispatcher_head(&cli_dispatcher);
1055 client_msgr->start();
1056
1057 MPing *m = new MPing();
1058 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1059 server_msgr->get_myaddrs());
1060 conn->send_message(m);
1061 CHECK_AND_WAIT_TRUE(!conn->is_connected());
1062 // should failed build a connection
1063 ASSERT_FALSE(conn->is_connected());
1064
1065 client_msgr->shutdown();
1066 client_msgr->wait();
1067
1068 // 2. supported met required
1069 p = client_msgr->get_policy(entity_name_t::TYPE_OSD);
1070 p.features_supported = all_feature_supported;
1071 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
1072 client_msgr->start();
1073
1074 conn = client_msgr->connect_to(server_msgr->get_mytype(),
1075 server_msgr->get_myaddrs());
1076 {
1077 m = new MPing();
1078 ASSERT_EQ(conn->send_message(m), 0);
1079 Mutex::Locker l(cli_dispatcher.lock);
1080 while (!cli_dispatcher.got_new)
1081 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
1082 cli_dispatcher.got_new = false;
1083 }
1084 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1085
1086 server_msgr->shutdown();
1087 client_msgr->shutdown();
1088 server_msgr->wait();
1089 client_msgr->wait();
1090 }
1091
1092 TEST_P(MessengerTest, TimeoutTest) {
1093 g_ceph_context->_conf.set_val("ms_tcp_read_timeout", "1");
1094 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1095 entity_addr_t bind_addr;
1096 if (string(GetParam()) == "simple")
1097 bind_addr.parse("v1:127.0.0.1");
1098 else
1099 bind_addr.parse("v2:127.0.0.1");
1100 server_msgr->bind(bind_addr);
1101 server_msgr->add_dispatcher_head(&srv_dispatcher);
1102 server_msgr->start();
1103
1104 client_msgr->add_dispatcher_head(&cli_dispatcher);
1105 client_msgr->start();
1106
1107 // 1. build the connection
1108 MPing *m = new MPing();
1109 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1110 server_msgr->get_myaddrs());
1111 {
1112 ASSERT_EQ(conn->send_message(m), 0);
1113 Mutex::Locker l(cli_dispatcher.lock);
1114 while (!cli_dispatcher.got_new)
1115 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
1116 cli_dispatcher.got_new = false;
1117 }
1118 ASSERT_TRUE(conn->is_connected());
1119 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1120 ASSERT_TRUE(conn->peer_is_osd());
1121
1122 // 2. wait for idle
1123 usleep(2500*1000);
1124 ASSERT_FALSE(conn->is_connected());
1125
1126 server_msgr->shutdown();
1127 server_msgr->wait();
1128
1129 client_msgr->shutdown();
1130 client_msgr->wait();
1131 g_ceph_context->_conf.set_val("ms_tcp_read_timeout", "900");
1132 }
1133
1134 TEST_P(MessengerTest, StatefulTest) {
1135 Message *m;
1136 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1137 entity_addr_t bind_addr;
1138 if (string(GetParam()) == "simple")
1139 bind_addr.parse("v1:127.0.0.1");
1140 else
1141 bind_addr.parse("v2:127.0.0.1");
1142 Messenger::Policy p = Messenger::Policy::stateful_server(0);
1143 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
1144 p = Messenger::Policy::lossless_client(0);
1145 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
1146
1147 server_msgr->bind(bind_addr);
1148 server_msgr->add_dispatcher_head(&srv_dispatcher);
1149 server_msgr->start();
1150 client_msgr->add_dispatcher_head(&cli_dispatcher);
1151 client_msgr->start();
1152
1153 // 1. test for server standby
1154 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1155 server_msgr->get_myaddrs());
1156 {
1157 m = new MPing();
1158 ASSERT_EQ(conn->send_message(m), 0);
1159 Mutex::Locker l(cli_dispatcher.lock);
1160 while (!cli_dispatcher.got_new)
1161 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
1162 cli_dispatcher.got_new = false;
1163 }
1164 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1165 conn->mark_down();
1166 ASSERT_FALSE(conn->is_connected());
1167 ConnectionRef server_conn = server_msgr->connect_to(
1168 client_msgr->get_mytype(), srv_dispatcher.last_accept);
1169 // don't lose state
1170 ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
1171
1172 srv_dispatcher.got_new = false;
1173 conn = client_msgr->connect_to(server_msgr->get_mytype(),
1174 server_msgr->get_myaddrs());
1175 {
1176 m = new MPing();
1177 ASSERT_EQ(conn->send_message(m), 0);
1178 Mutex::Locker l(cli_dispatcher.lock);
1179 while (!cli_dispatcher.got_new)
1180 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
1181 cli_dispatcher.got_new = false;
1182 }
1183 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1184 server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
1185 srv_dispatcher.last_accept);
1186 {
1187 Mutex::Locker l(srv_dispatcher.lock);
1188 while (!srv_dispatcher.got_remote_reset)
1189 srv_dispatcher.cond.Wait(srv_dispatcher.lock);
1190 }
1191
1192 // 2. test for client reconnect
1193 ASSERT_FALSE(cli_dispatcher.got_remote_reset);
1194 cli_dispatcher.got_connect = false;
1195 cli_dispatcher.got_new = false;
1196 cli_dispatcher.got_remote_reset = false;
1197 server_conn->mark_down();
1198 ASSERT_FALSE(server_conn->is_connected());
1199 // ensure client detect server socket closed
1200 {
1201 Mutex::Locker l(cli_dispatcher.lock);
1202 while (!cli_dispatcher.got_remote_reset)
1203 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
1204 cli_dispatcher.got_remote_reset = false;
1205 }
1206 {
1207 Mutex::Locker l(cli_dispatcher.lock);
1208 while (!cli_dispatcher.got_connect)
1209 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
1210 cli_dispatcher.got_connect = false;
1211 }
1212 CHECK_AND_WAIT_TRUE(conn->is_connected());
1213 ASSERT_TRUE(conn->is_connected());
1214
1215 {
1216 m = new MPing();
1217 ASSERT_EQ(conn->send_message(m), 0);
1218 ASSERT_TRUE(conn->is_connected());
1219 Mutex::Locker l(cli_dispatcher.lock);
1220 while (!cli_dispatcher.got_new)
1221 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
1222 cli_dispatcher.got_new = false;
1223 }
1224 // resetcheck happen
1225 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1226 server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
1227 srv_dispatcher.last_accept);
1228 ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
1229 cli_dispatcher.got_remote_reset = false;
1230
1231 server_msgr->shutdown();
1232 client_msgr->shutdown();
1233 server_msgr->wait();
1234 client_msgr->wait();
1235 }
1236
1237 TEST_P(MessengerTest, StatelessTest) {
1238 Message *m;
1239 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1240 entity_addr_t bind_addr;
1241 if (string(GetParam()) == "simple")
1242 bind_addr.parse("v1:127.0.0.1");
1243 else
1244 bind_addr.parse("v2:127.0.0.1");
1245 Messenger::Policy p = Messenger::Policy::stateless_server(0);
1246 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
1247 p = Messenger::Policy::lossy_client(0);
1248 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
1249
1250 server_msgr->bind(bind_addr);
1251 server_msgr->add_dispatcher_head(&srv_dispatcher);
1252 server_msgr->start();
1253 client_msgr->add_dispatcher_head(&cli_dispatcher);
1254 client_msgr->start();
1255
1256 // 1. test for server lose state
1257 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1258 server_msgr->get_myaddrs());
1259 {
1260 m = new MPing();
1261 ASSERT_EQ(conn->send_message(m), 0);
1262 Mutex::Locker l(cli_dispatcher.lock);
1263 while (!cli_dispatcher.got_new)
1264 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
1265 cli_dispatcher.got_new = false;
1266 }
1267 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1268 conn->mark_down();
1269 ASSERT_FALSE(conn->is_connected());
1270
1271 srv_dispatcher.got_new = false;
1272 conn = client_msgr->connect_to(server_msgr->get_mytype(),
1273 server_msgr->get_myaddrs());
1274 {
1275 m = new MPing();
1276 ASSERT_EQ(conn->send_message(m), 0);
1277 Mutex::Locker l(cli_dispatcher.lock);
1278 while (!cli_dispatcher.got_new)
1279 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
1280 cli_dispatcher.got_new = false;
1281 }
1282 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1283 ConnectionRef server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
1284 srv_dispatcher.last_accept);
1285 // server lose state
1286 {
1287 Mutex::Locker l(srv_dispatcher.lock);
1288 while (!srv_dispatcher.got_new)
1289 srv_dispatcher.cond.Wait(srv_dispatcher.lock);
1290 }
1291 ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
1292
1293 // 2. test for client lossy
1294 server_conn->mark_down();
1295 ASSERT_FALSE(server_conn->is_connected());
1296 conn->send_keepalive();
1297 CHECK_AND_WAIT_TRUE(!conn->is_connected());
1298 ASSERT_FALSE(conn->is_connected());
1299 conn = client_msgr->connect_to(server_msgr->get_mytype(),
1300 server_msgr->get_myaddrs());
1301 {
1302 m = new MPing();
1303 ASSERT_EQ(conn->send_message(m), 0);
1304 Mutex::Locker l(cli_dispatcher.lock);
1305 while (!cli_dispatcher.got_new)
1306 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
1307 cli_dispatcher.got_new = false;
1308 }
1309 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1310
1311 server_msgr->shutdown();
1312 client_msgr->shutdown();
1313 server_msgr->wait();
1314 client_msgr->wait();
1315 }
1316
1317 TEST_P(MessengerTest, ClientStandbyTest) {
1318 Message *m;
1319 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1320 entity_addr_t bind_addr;
1321 if (string(GetParam()) == "simple")
1322 bind_addr.parse("v1:127.0.0.1");
1323 else
1324 bind_addr.parse("v2:127.0.0.1");
1325 Messenger::Policy p = Messenger::Policy::stateful_server(0);
1326 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
1327 p = Messenger::Policy::lossless_peer(0);
1328 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
1329
1330 server_msgr->bind(bind_addr);
1331 server_msgr->add_dispatcher_head(&srv_dispatcher);
1332 server_msgr->start();
1333 client_msgr->add_dispatcher_head(&cli_dispatcher);
1334 client_msgr->start();
1335
1336 // 1. test for client standby, resetcheck
1337 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1338 server_msgr->get_myaddrs());
1339 {
1340 m = new MPing();
1341 ASSERT_EQ(conn->send_message(m), 0);
1342 Mutex::Locker l(cli_dispatcher.lock);
1343 while (!cli_dispatcher.got_new)
1344 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
1345 cli_dispatcher.got_new = false;
1346 }
1347 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1348 ConnectionRef server_conn = server_msgr->connect_to(
1349 client_msgr->get_mytype(),
1350 srv_dispatcher.last_accept);
1351 ASSERT_FALSE(cli_dispatcher.got_remote_reset);
1352 cli_dispatcher.got_connect = false;
1353 server_conn->mark_down();
1354 ASSERT_FALSE(server_conn->is_connected());
1355 // client should be standby
1356 usleep(300*1000);
1357 // client should be standby, so we use original connection
1358 {
1359 // Try send message to verify got remote reset callback
1360 m = new MPing();
1361 ASSERT_EQ(conn->send_message(m), 0);
1362 {
1363 Mutex::Locker l(cli_dispatcher.lock);
1364 while (!cli_dispatcher.got_remote_reset)
1365 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
1366 cli_dispatcher.got_remote_reset = false;
1367 while (!cli_dispatcher.got_connect)
1368 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
1369 cli_dispatcher.got_connect = false;
1370 }
1371 CHECK_AND_WAIT_TRUE(conn->is_connected());
1372 ASSERT_TRUE(conn->is_connected());
1373 m = new MPing();
1374 ASSERT_EQ(conn->send_message(m), 0);
1375 Mutex::Locker l(cli_dispatcher.lock);
1376 while (!cli_dispatcher.got_new)
1377 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
1378 cli_dispatcher.got_new = false;
1379 }
1380 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1381 server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
1382 srv_dispatcher.last_accept);
1383 ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
1384
1385 server_msgr->shutdown();
1386 client_msgr->shutdown();
1387 server_msgr->wait();
1388 client_msgr->wait();
1389 }
1390
1391 TEST_P(MessengerTest, AuthTest) {
1392 g_ceph_context->_conf.set_val("auth_cluster_required", "cephx");
1393 g_ceph_context->_conf.set_val("auth_service_required", "cephx");
1394 g_ceph_context->_conf.set_val("auth_client_required", "cephx");
1395 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1396 entity_addr_t bind_addr;
1397 if (string(GetParam()) == "simple")
1398 bind_addr.parse("v1:127.0.0.1");
1399 else
1400 bind_addr.parse("v2:127.0.0.1");
1401 server_msgr->bind(bind_addr);
1402 server_msgr->add_dispatcher_head(&srv_dispatcher);
1403 server_msgr->start();
1404
1405 client_msgr->add_dispatcher_head(&cli_dispatcher);
1406 client_msgr->start();
1407
1408 // 1. simple auth round trip
1409 MPing *m = new MPing();
1410 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1411 server_msgr->get_myaddrs());
1412 {
1413 ASSERT_EQ(conn->send_message(m), 0);
1414 Mutex::Locker l(cli_dispatcher.lock);
1415 while (!cli_dispatcher.got_new)
1416 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
1417 cli_dispatcher.got_new = false;
1418 }
1419 ASSERT_TRUE(conn->is_connected());
1420 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1421
1422 // 2. mix auth
1423 g_ceph_context->_conf.set_val("auth_cluster_required", "none");
1424 g_ceph_context->_conf.set_val("auth_service_required", "none");
1425 g_ceph_context->_conf.set_val("auth_client_required", "none");
1426 conn->mark_down();
1427 ASSERT_FALSE(conn->is_connected());
1428 conn = client_msgr->connect_to(server_msgr->get_mytype(),
1429 server_msgr->get_myaddrs());
1430 {
1431 MPing *m = new MPing();
1432 ASSERT_EQ(conn->send_message(m), 0);
1433 Mutex::Locker l(cli_dispatcher.lock);
1434 while (!cli_dispatcher.got_new)
1435 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
1436 cli_dispatcher.got_new = false;
1437 }
1438 ASSERT_TRUE(conn->is_connected());
1439 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1440 server_msgr->shutdown();
1441 client_msgr->shutdown();
1442 server_msgr->wait();
1443 client_msgr->wait();
1444 }
1445
1446 TEST_P(MessengerTest, MessageTest) {
1447 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1448 entity_addr_t bind_addr;
1449 if (string(GetParam()) == "simple")
1450 bind_addr.parse("v1:127.0.0.1");
1451 else
1452 bind_addr.parse("v2:127.0.0.1");
1453 Messenger::Policy p = Messenger::Policy::stateful_server(0);
1454 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
1455 p = Messenger::Policy::lossless_peer(0);
1456 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
1457
1458 server_msgr->bind(bind_addr);
1459 server_msgr->add_dispatcher_head(&srv_dispatcher);
1460 server_msgr->start();
1461 client_msgr->add_dispatcher_head(&cli_dispatcher);
1462 client_msgr->start();
1463
1464
1465 // 1. A very large "front"(as well as "payload")
1466 // Because a external message need to invade Messenger::decode_message,
1467 // here we only use existing message class(MCommand)
1468 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1469 server_msgr->get_myaddrs());
1470 {
1471 uuid_d uuid;
1472 uuid.generate_random();
1473 vector<string> cmds;
1474 string s("abcdefghijklmnopqrstuvwxyz");
1475 for (int i = 0; i < 1024*30; i++)
1476 cmds.push_back(s);
1477 MCommand *m = new MCommand(uuid);
1478 m->cmd = cmds;
1479 conn->send_message(m);
1480 utime_t t;
1481 t += 1000*1000*500;
1482 Mutex::Locker l(cli_dispatcher.lock);
1483 while (!cli_dispatcher.got_new)
1484 cli_dispatcher.cond.WaitInterval(cli_dispatcher.lock, t);
1485 ASSERT_TRUE(cli_dispatcher.got_new);
1486 cli_dispatcher.got_new = false;
1487 }
1488
1489 // 2. A very large "data"
1490 {
1491 bufferlist bl;
1492 string s("abcdefghijklmnopqrstuvwxyz");
1493 for (int i = 0; i < 1024*30; i++)
1494 bl.append(s);
1495 MPing *m = new MPing();
1496 m->set_data(bl);
1497 conn->send_message(m);
1498 utime_t t;
1499 t += 1000*1000*500;
1500 Mutex::Locker l(cli_dispatcher.lock);
1501 while (!cli_dispatcher.got_new)
1502 cli_dispatcher.cond.WaitInterval(cli_dispatcher.lock, t);
1503 ASSERT_TRUE(cli_dispatcher.got_new);
1504 cli_dispatcher.got_new = false;
1505 }
1506 server_msgr->shutdown();
1507 client_msgr->shutdown();
1508 server_msgr->wait();
1509 client_msgr->wait();
1510 }
1511
1512
1513 class SyntheticWorkload;
1514
1515 struct Payload {
1516 enum Who : uint8_t {
1517 PING = 0,
1518 PONG = 1,
1519 };
1520 uint8_t who = 0;
1521 uint64_t seq = 0;
1522 bufferlist data;
1523
1524 Payload(Who who, uint64_t seq, const bufferlist& data)
1525 : who(who), seq(seq), data(data)
1526 {}
1527 Payload() = default;
1528 DENC(Payload, v, p) {
1529 DENC_START(1, 1, p);
1530 denc(v.who, p);
1531 denc(v.seq, p);
1532 denc(v.data, p);
1533 DENC_FINISH(p);
1534 }
1535 };
1536 WRITE_CLASS_DENC(Payload)
1537
1538 ostream& operator<<(ostream& out, const Payload &pl)
1539 {
1540 return out << "reply=" << pl.who << " i = " << pl.seq;
1541 }
1542
1543 class SyntheticDispatcher : public Dispatcher {
1544 public:
1545 Mutex lock;
1546 Cond cond;
1547 bool is_server;
1548 bool got_new;
1549 bool got_remote_reset;
1550 bool got_connect;
1551 map<ConnectionRef, list<uint64_t> > conn_sent;
1552 map<uint64_t, bufferlist> sent;
1553 atomic<uint64_t> index;
1554 SyntheticWorkload *workload;
1555
1556 SyntheticDispatcher(bool s, SyntheticWorkload *wl):
1557 Dispatcher(g_ceph_context), lock("SyntheticDispatcher::lock"), is_server(s), got_new(false),
1558 got_remote_reset(false), got_connect(false), index(0), workload(wl) {
1559 // don't need authorizers
1560 ms_set_require_authorizer(false);
1561 }
1562 bool ms_can_fast_dispatch_any() const override { return true; }
1563 bool ms_can_fast_dispatch(const Message *m) const override {
1564 switch (m->get_type()) {
1565 case CEPH_MSG_PING:
1566 case MSG_COMMAND:
1567 return true;
1568 default:
1569 return false;
1570 }
1571 }
1572
1573 void ms_handle_fast_connect(Connection *con) override {
1574 Mutex::Locker l(lock);
1575 list<uint64_t> c = conn_sent[con];
1576 for (list<uint64_t>::iterator it = c.begin();
1577 it != c.end(); ++it)
1578 sent.erase(*it);
1579 conn_sent.erase(con);
1580 got_connect = true;
1581 cond.Signal();
1582 }
1583 void ms_handle_fast_accept(Connection *con) override {
1584 Mutex::Locker l(lock);
1585 list<uint64_t> c = conn_sent[con];
1586 for (list<uint64_t>::iterator it = c.begin();
1587 it != c.end(); ++it)
1588 sent.erase(*it);
1589 conn_sent.erase(con);
1590 cond.Signal();
1591 }
1592 bool ms_dispatch(Message *m) override {
1593 ceph_abort();
1594 }
1595 bool ms_handle_reset(Connection *con) override;
1596 void ms_handle_remote_reset(Connection *con) override {
1597 Mutex::Locker l(lock);
1598 list<uint64_t> c = conn_sent[con];
1599 for (list<uint64_t>::iterator it = c.begin();
1600 it != c.end(); ++it)
1601 sent.erase(*it);
1602 conn_sent.erase(con);
1603 got_remote_reset = true;
1604 }
1605 bool ms_handle_refused(Connection *con) override {
1606 return false;
1607 }
1608 void ms_fast_dispatch(Message *m) override {
1609 // MSG_COMMAND is used to disorganize regular message flow
1610 if (m->get_type() == MSG_COMMAND) {
1611 m->put();
1612 return ;
1613 }
1614
1615 Payload pl;
1616 auto p = m->get_data().cbegin();
1617 decode(pl, p);
1618 if (pl.who == Payload::PING) {
1619 lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl;
1620 reply_message(m, pl);
1621 m->put();
1622 Mutex::Locker l(lock);
1623 got_new = true;
1624 cond.Signal();
1625 } else {
1626 Mutex::Locker l(lock);
1627 if (sent.count(pl.seq)) {
1628 lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl;
1629 ASSERT_EQ(conn_sent[m->get_connection()].front(), pl.seq);
1630 ASSERT_TRUE(pl.data.contents_equal(sent[pl.seq]));
1631 conn_sent[m->get_connection()].pop_front();
1632 sent.erase(pl.seq);
1633 }
1634 m->put();
1635 got_new = true;
1636 cond.Signal();
1637 }
1638 }
1639
1640 int ms_handle_authentication(Connection *con) override {
1641 return 1;
1642 }
1643
1644 void reply_message(const Message *m, Payload& pl) {
1645 pl.who = Payload::PONG;
1646 bufferlist bl;
1647 encode(pl, bl);
1648 MPing *rm = new MPing();
1649 rm->set_data(bl);
1650 m->get_connection()->send_message(rm);
1651 lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply m=" << m << " i=" << pl.seq << dendl;
1652 }
1653
1654 void send_message_wrap(ConnectionRef con, const bufferlist& data) {
1655 Message *m = new MPing();
1656 Payload pl{Payload::PING, index++, data};
1657 bufferlist bl;
1658 encode(pl, bl);
1659 m->set_data(bl);
1660 if (!con->get_messenger()->get_default_policy().lossy) {
1661 Mutex::Locker l(lock);
1662 sent[pl.seq] = pl.data;
1663 conn_sent[con].push_back(pl.seq);
1664 }
1665 lderr(g_ceph_context) << __func__ << " conn=" << con.get() << " send m=" << m << " i=" << pl.seq << dendl;
1666 ASSERT_EQ(0, con->send_message(m));
1667 }
1668
1669 uint64_t get_pending() {
1670 Mutex::Locker l(lock);
1671 return sent.size();
1672 }
1673
1674 void clear_pending(ConnectionRef con) {
1675 Mutex::Locker l(lock);
1676
1677 for (list<uint64_t>::iterator it = conn_sent[con].begin();
1678 it != conn_sent[con].end(); ++it)
1679 sent.erase(*it);
1680 conn_sent.erase(con);
1681 }
1682
1683 void print() {
1684 for (auto && p : conn_sent) {
1685 if (!p.second.empty()) {
1686 lderr(g_ceph_context) << __func__ << " " << p.first << " wait " << p.second.size() << dendl;
1687 }
1688 }
1689 }
1690 };
1691
1692
1693 class SyntheticWorkload {
1694 Mutex lock;
1695 Cond cond;
1696 set<Messenger*> available_servers;
1697 set<Messenger*> available_clients;
1698 Messenger::Policy client_policy;
1699 map<ConnectionRef, pair<Messenger*, Messenger*> > available_connections;
1700 SyntheticDispatcher dispatcher;
1701 gen_type rng;
1702 vector<bufferlist> rand_data;
1703 DummyAuthClientServer dummy_auth;
1704
1705 public:
1706 static const unsigned max_in_flight = 64;
1707 static const unsigned max_connections = 128;
1708 static const unsigned max_message_len = 1024 * 1024 * 4;
1709
1710 SyntheticWorkload(int servers, int clients, string type, int random_num,
1711 Messenger::Policy srv_policy, Messenger::Policy cli_policy)
1712 : lock("SyntheticWorkload::lock"),
1713 client_policy(cli_policy),
1714 dispatcher(false, this),
1715 rng(time(NULL)),
1716 dummy_auth(g_ceph_context) {
1717 dummy_auth.auth_registry.refresh_config();
1718 Messenger *msgr;
1719 int base_port = 16800;
1720 entity_addr_t bind_addr;
1721 char addr[64];
1722 for (int i = 0; i < servers; ++i) {
1723 msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0),
1724 "server", getpid()+i, 0);
1725 snprintf(addr, sizeof(addr), "%s127.0.0.1:%d",
1726 (type == "simple") ? "v1:":"v2:",
1727 base_port+i);
1728 bind_addr.parse(addr);
1729 msgr->bind(bind_addr);
1730 msgr->add_dispatcher_head(&dispatcher);
1731 msgr->set_auth_client(&dummy_auth);
1732 msgr->set_auth_server(&dummy_auth);
1733
1734 ceph_assert(msgr);
1735 msgr->set_default_policy(srv_policy);
1736 available_servers.insert(msgr);
1737 msgr->start();
1738 }
1739
1740 for (int i = 0; i < clients; ++i) {
1741 msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(-1),
1742 "client", getpid()+i+servers, 0);
1743 if (cli_policy.standby) {
1744 snprintf(addr, sizeof(addr), "%s127.0.0.1:%d",
1745 (type == "simple") ? "v1:":"v2:",
1746 base_port+i+servers);
1747 bind_addr.parse(addr);
1748 msgr->bind(bind_addr);
1749 }
1750 msgr->add_dispatcher_head(&dispatcher);
1751 msgr->set_auth_client(&dummy_auth);
1752 msgr->set_auth_server(&dummy_auth);
1753
1754 ceph_assert(msgr);
1755 msgr->set_default_policy(cli_policy);
1756 available_clients.insert(msgr);
1757 msgr->start();
1758 }
1759
1760 for (int i = 0; i < random_num; i++) {
1761 bufferlist bl;
1762 boost::uniform_int<> u(32, max_message_len);
1763 uint64_t value_len = u(rng);
1764 bufferptr bp(value_len);
1765 bp.zero();
1766 for (uint64_t j = 0; j < value_len-sizeof(i); ) {
1767 memcpy(bp.c_str()+j, &i, sizeof(i));
1768 j += 4096;
1769 }
1770
1771 bl.append(bp);
1772 rand_data.push_back(bl);
1773 }
1774 }
1775
1776 ConnectionRef _get_random_connection() {
1777 while (dispatcher.get_pending() > max_in_flight) {
1778 lock.Unlock();
1779 usleep(500);
1780 lock.Lock();
1781 }
1782 ceph_assert(lock.is_locked());
1783 boost::uniform_int<> choose(0, available_connections.size() - 1);
1784 int index = choose(rng);
1785 map<ConnectionRef, pair<Messenger*, Messenger*> >::iterator i = available_connections.begin();
1786 for (; index > 0; --index, ++i) ;
1787 return i->first;
1788 }
1789
1790 bool can_create_connection() {
1791 return available_connections.size() < max_connections;
1792 }
1793
1794 void generate_connection() {
1795 Mutex::Locker l(lock);
1796 if (!can_create_connection())
1797 return ;
1798
1799 Messenger *server, *client;
1800 {
1801 boost::uniform_int<> choose(0, available_servers.size() - 1);
1802 int index = choose(rng);
1803 set<Messenger*>::iterator i = available_servers.begin();
1804 for (; index > 0; --index, ++i) ;
1805 server = *i;
1806 }
1807 {
1808 boost::uniform_int<> choose(0, available_clients.size() - 1);
1809 int index = choose(rng);
1810 set<Messenger*>::iterator i = available_clients.begin();
1811 for (; index > 0; --index, ++i) ;
1812 client = *i;
1813 }
1814
1815 pair<Messenger*, Messenger*> p;
1816 {
1817 boost::uniform_int<> choose(0, available_servers.size() - 1);
1818 if (server->get_default_policy().server) {
1819 p = make_pair(client, server);
1820 ConnectionRef conn = client->connect_to(server->get_mytype(),
1821 server->get_myaddrs());
1822 available_connections[conn] = p;
1823 } else {
1824 ConnectionRef conn = client->connect_to(server->get_mytype(),
1825 server->get_myaddrs());
1826 p = make_pair(client, server);
1827 available_connections[conn] = p;
1828 }
1829 }
1830 }
1831
1832 void send_message() {
1833 Mutex::Locker l(lock);
1834 ConnectionRef conn = _get_random_connection();
1835 boost::uniform_int<> true_false(0, 99);
1836 int val = true_false(rng);
1837 if (val >= 95) {
1838 uuid_d uuid;
1839 uuid.generate_random();
1840 MCommand *m = new MCommand(uuid);
1841 vector<string> cmds;
1842 cmds.push_back("command");
1843 m->cmd = cmds;
1844 m->set_priority(200);
1845 conn->send_message(m);
1846 } else {
1847 boost::uniform_int<> u(0, rand_data.size()-1);
1848 dispatcher.send_message_wrap(conn, rand_data[u(rng)]);
1849 }
1850 }
1851
1852 void drop_connection() {
1853 Mutex::Locker l(lock);
1854 if (available_connections.size() < 10)
1855 return;
1856 ConnectionRef conn = _get_random_connection();
1857 dispatcher.clear_pending(conn);
1858 conn->mark_down();
1859 if (!client_policy.server &&
1860 !client_policy.lossy &&
1861 client_policy.standby) {
1862 // it's a lossless policy, so we need to mark down each side
1863 pair<Messenger*, Messenger*> &p = available_connections[conn];
1864 if (!p.first->get_default_policy().server && !p.second->get_default_policy().server) {
1865 ASSERT_EQ(conn->get_messenger(), p.first);
1866 ConnectionRef peer = p.second->connect_to(p.first->get_mytype(),
1867 p.first->get_myaddrs());
1868 peer->mark_down();
1869 dispatcher.clear_pending(peer);
1870 available_connections.erase(peer);
1871 }
1872 }
1873 ASSERT_EQ(available_connections.erase(conn), 1U);
1874 }
1875
1876 void print_internal_state(bool detail=false) {
1877 Mutex::Locker l(lock);
1878 lderr(g_ceph_context) << "available_connections: " << available_connections.size()
1879 << " inflight messages: " << dispatcher.get_pending() << dendl;
1880 if (detail && !available_connections.empty()) {
1881 dispatcher.print();
1882 }
1883 }
1884
1885 void wait_for_done() {
1886 int64_t tick_us = 1000 * 100; // 100ms
1887 int64_t timeout_us = 5 * 60 * 1000 * 1000; // 5 mins
1888 int i = 0;
1889 while (dispatcher.get_pending()) {
1890 usleep(tick_us);
1891 timeout_us -= tick_us;
1892 if (i++ % 50 == 0)
1893 print_internal_state(true);
1894 if (timeout_us < 0)
1895 ceph_abort_msg(" loop time exceed 5 mins, it looks we stuck into some problems!");
1896 }
1897 for (set<Messenger*>::iterator it = available_servers.begin();
1898 it != available_servers.end(); ++it) {
1899 (*it)->shutdown();
1900 (*it)->wait();
1901 ASSERT_EQ((*it)->get_dispatch_queue_len(), 0);
1902 delete (*it);
1903 }
1904 available_servers.clear();
1905
1906 for (set<Messenger*>::iterator it = available_clients.begin();
1907 it != available_clients.end(); ++it) {
1908 (*it)->shutdown();
1909 (*it)->wait();
1910 ASSERT_EQ((*it)->get_dispatch_queue_len(), 0);
1911 delete (*it);
1912 }
1913 available_clients.clear();
1914 }
1915
1916 void handle_reset(Connection *con) {
1917 Mutex::Locker l(lock);
1918 available_connections.erase(con);
1919 dispatcher.clear_pending(con);
1920 }
1921 };
1922
1923 bool SyntheticDispatcher::ms_handle_reset(Connection *con) {
1924 workload->handle_reset(con);
1925 return true;
1926 }
1927
1928 TEST_P(MessengerTest, SyntheticStressTest) {
1929 SyntheticWorkload test_msg(8, 32, GetParam(), 100,
1930 Messenger::Policy::stateful_server(0),
1931 Messenger::Policy::lossless_client(0));
1932 for (int i = 0; i < 100; ++i) {
1933 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
1934 test_msg.generate_connection();
1935 }
1936 gen_type rng(time(NULL));
1937 for (int i = 0; i < 5000; ++i) {
1938 if (!(i % 10)) {
1939 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
1940 test_msg.print_internal_state();
1941 }
1942 boost::uniform_int<> true_false(0, 99);
1943 int val = true_false(rng);
1944 if (val > 90) {
1945 test_msg.generate_connection();
1946 } else if (val > 80) {
1947 test_msg.drop_connection();
1948 } else if (val > 10) {
1949 test_msg.send_message();
1950 } else {
1951 usleep(rand() % 1000 + 500);
1952 }
1953 }
1954 test_msg.wait_for_done();
1955 }
1956
1957 TEST_P(MessengerTest, SyntheticStressTest1) {
1958 SyntheticWorkload test_msg(16, 32, GetParam(), 100,
1959 Messenger::Policy::lossless_peer_reuse(0),
1960 Messenger::Policy::lossless_peer_reuse(0));
1961 for (int i = 0; i < 10; ++i) {
1962 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
1963 test_msg.generate_connection();
1964 }
1965 gen_type rng(time(NULL));
1966 for (int i = 0; i < 10000; ++i) {
1967 if (!(i % 10)) {
1968 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
1969 test_msg.print_internal_state();
1970 }
1971 boost::uniform_int<> true_false(0, 99);
1972 int val = true_false(rng);
1973 if (val > 80) {
1974 test_msg.generate_connection();
1975 } else if (val > 60) {
1976 test_msg.drop_connection();
1977 } else if (val > 10) {
1978 test_msg.send_message();
1979 } else {
1980 usleep(rand() % 1000 + 500);
1981 }
1982 }
1983 test_msg.wait_for_done();
1984 }
1985
1986
1987 TEST_P(MessengerTest, SyntheticInjectTest) {
1988 uint64_t dispatch_throttle_bytes = g_ceph_context->_conf->ms_dispatch_throttle_bytes;
1989 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "30");
1990 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1");
1991 g_ceph_context->_conf.set_val("ms_dispatch_throttle_bytes", "16777216");
1992 SyntheticWorkload test_msg(8, 32, GetParam(), 100,
1993 Messenger::Policy::stateful_server(0),
1994 Messenger::Policy::lossless_client(0));
1995 for (int i = 0; i < 100; ++i) {
1996 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
1997 test_msg.generate_connection();
1998 }
1999 gen_type rng(time(NULL));
2000 for (int i = 0; i < 1000; ++i) {
2001 if (!(i % 10)) {
2002 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
2003 test_msg.print_internal_state();
2004 }
2005 boost::uniform_int<> true_false(0, 99);
2006 int val = true_false(rng);
2007 if (val > 90) {
2008 test_msg.generate_connection();
2009 } else if (val > 80) {
2010 test_msg.drop_connection();
2011 } else if (val > 10) {
2012 test_msg.send_message();
2013 } else {
2014 usleep(rand() % 500 + 100);
2015 }
2016 }
2017 test_msg.wait_for_done();
2018 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0");
2019 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0");
2020 g_ceph_context->_conf.set_val(
2021 "ms_dispatch_throttle_bytes", std::to_string(dispatch_throttle_bytes));
2022 }
2023
2024 TEST_P(MessengerTest, SyntheticInjectTest2) {
2025 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "30");
2026 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1");
2027 SyntheticWorkload test_msg(8, 16, GetParam(), 100,
2028 Messenger::Policy::lossless_peer_reuse(0),
2029 Messenger::Policy::lossless_peer_reuse(0));
2030 for (int i = 0; i < 100; ++i) {
2031 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
2032 test_msg.generate_connection();
2033 }
2034 gen_type rng(time(NULL));
2035 for (int i = 0; i < 1000; ++i) {
2036 if (!(i % 10)) {
2037 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
2038 test_msg.print_internal_state();
2039 }
2040 boost::uniform_int<> true_false(0, 99);
2041 int val = true_false(rng);
2042 if (val > 90) {
2043 test_msg.generate_connection();
2044 } else if (val > 80) {
2045 test_msg.drop_connection();
2046 } else if (val > 10) {
2047 test_msg.send_message();
2048 } else {
2049 usleep(rand() % 500 + 100);
2050 }
2051 }
2052 test_msg.wait_for_done();
2053 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0");
2054 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0");
2055 }
2056
2057 TEST_P(MessengerTest, SyntheticInjectTest3) {
2058 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "600");
2059 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1");
2060 SyntheticWorkload test_msg(8, 16, GetParam(), 100,
2061 Messenger::Policy::stateless_server(0),
2062 Messenger::Policy::lossy_client(0));
2063 for (int i = 0; i < 100; ++i) {
2064 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
2065 test_msg.generate_connection();
2066 }
2067 gen_type rng(time(NULL));
2068 for (int i = 0; i < 1000; ++i) {
2069 if (!(i % 10)) {
2070 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
2071 test_msg.print_internal_state();
2072 }
2073 boost::uniform_int<> true_false(0, 99);
2074 int val = true_false(rng);
2075 if (val > 90) {
2076 test_msg.generate_connection();
2077 } else if (val > 80) {
2078 test_msg.drop_connection();
2079 } else if (val > 10) {
2080 test_msg.send_message();
2081 } else {
2082 usleep(rand() % 500 + 100);
2083 }
2084 }
2085 test_msg.wait_for_done();
2086 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0");
2087 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0");
2088 }
2089
2090
2091 TEST_P(MessengerTest, SyntheticInjectTest4) {
2092 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "30");
2093 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1");
2094 g_ceph_context->_conf.set_val("ms_inject_delay_probability", "1");
2095 g_ceph_context->_conf.set_val("ms_inject_delay_type", "client osd");
2096 g_ceph_context->_conf.set_val("ms_inject_delay_max", "5");
2097 SyntheticWorkload test_msg(16, 32, GetParam(), 100,
2098 Messenger::Policy::lossless_peer(0),
2099 Messenger::Policy::lossless_peer(0));
2100 for (int i = 0; i < 100; ++i) {
2101 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
2102 test_msg.generate_connection();
2103 }
2104 gen_type rng(time(NULL));
2105 for (int i = 0; i < 1000; ++i) {
2106 if (!(i % 10)) {
2107 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
2108 test_msg.print_internal_state();
2109 }
2110 boost::uniform_int<> true_false(0, 99);
2111 int val = true_false(rng);
2112 if (val > 95) {
2113 test_msg.generate_connection();
2114 } else if (val > 80) {
2115 // test_msg.drop_connection();
2116 } else if (val > 10) {
2117 test_msg.send_message();
2118 } else {
2119 usleep(rand() % 500 + 100);
2120 }
2121 }
2122 test_msg.wait_for_done();
2123 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0");
2124 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0");
2125 g_ceph_context->_conf.set_val("ms_inject_delay_probability", "0");
2126 g_ceph_context->_conf.set_val("ms_inject_delay_type", "");
2127 g_ceph_context->_conf.set_val("ms_inject_delay_max", "0");
2128 }
2129
2130
2131 class MarkdownDispatcher : public Dispatcher {
2132 Mutex lock;
2133 set<ConnectionRef> conns;
2134 bool last_mark;
2135 public:
2136 std::atomic<uint64_t> count = { 0 };
2137 explicit MarkdownDispatcher(bool s): Dispatcher(g_ceph_context), lock("MarkdownDispatcher::lock"),
2138 last_mark(false) {
2139 // don't need authorizers
2140 ms_set_require_authorizer(false);
2141 }
2142 bool ms_can_fast_dispatch_any() const override { return false; }
2143 bool ms_can_fast_dispatch(const Message *m) const override {
2144 switch (m->get_type()) {
2145 case CEPH_MSG_PING:
2146 return true;
2147 default:
2148 return false;
2149 }
2150 }
2151
2152 void ms_handle_fast_connect(Connection *con) override {
2153 lderr(g_ceph_context) << __func__ << " " << con << dendl;
2154 Mutex::Locker l(lock);
2155 conns.insert(con);
2156 }
2157 void ms_handle_fast_accept(Connection *con) override {
2158 Mutex::Locker l(lock);
2159 conns.insert(con);
2160 }
2161 bool ms_dispatch(Message *m) override {
2162 lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << dendl;
2163 Mutex::Locker l(lock);
2164 count++;
2165 conns.insert(m->get_connection());
2166 if (conns.size() < 2 && !last_mark) {
2167 m->put();
2168 return true;
2169 }
2170
2171 last_mark = true;
2172 usleep(rand() % 500);
2173 for (set<ConnectionRef>::iterator it = conns.begin(); it != conns.end(); ++it) {
2174 if ((*it) != m->get_connection().get()) {
2175 (*it)->mark_down();
2176 conns.erase(it);
2177 break;
2178 }
2179 }
2180 if (conns.empty())
2181 last_mark = false;
2182 m->put();
2183 return true;
2184 }
2185 bool ms_handle_reset(Connection *con) override {
2186 lderr(g_ceph_context) << __func__ << " " << con << dendl;
2187 Mutex::Locker l(lock);
2188 conns.erase(con);
2189 usleep(rand() % 500);
2190 return true;
2191 }
2192 void ms_handle_remote_reset(Connection *con) override {
2193 Mutex::Locker l(lock);
2194 conns.erase(con);
2195 lderr(g_ceph_context) << __func__ << " " << con << dendl;
2196 }
2197 bool ms_handle_refused(Connection *con) override {
2198 return false;
2199 }
2200 void ms_fast_dispatch(Message *m) override {
2201 ceph_abort();
2202 }
2203 int ms_handle_authentication(Connection *con) override {
2204 return 1;
2205 }
2206 };
2207
2208
2209 // Markdown with external lock
2210 TEST_P(MessengerTest, MarkdownTest) {
2211 Messenger *server_msgr2 = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
2212 MarkdownDispatcher cli_dispatcher(false), srv_dispatcher(true);
2213 DummyAuthClientServer dummy_auth(g_ceph_context);
2214 dummy_auth.auth_registry.refresh_config();
2215 entity_addr_t bind_addr;
2216 if (string(GetParam()) == "simple")
2217 bind_addr.parse("v1:127.0.0.1:16800");
2218 else
2219 bind_addr.parse("v2:127.0.0.1:16800");
2220 server_msgr->bind(bind_addr);
2221 server_msgr->add_dispatcher_head(&srv_dispatcher);
2222 server_msgr->set_auth_client(&dummy_auth);
2223 server_msgr->set_auth_server(&dummy_auth);
2224 server_msgr->start();
2225 if (string(GetParam()) == "simple")
2226 bind_addr.parse("v1:127.0.0.1:16801");
2227 else
2228 bind_addr.parse("v2:127.0.0.1:16801");
2229 server_msgr2->bind(bind_addr);
2230 server_msgr2->add_dispatcher_head(&srv_dispatcher);
2231 server_msgr2->set_auth_client(&dummy_auth);
2232 server_msgr2->set_auth_server(&dummy_auth);
2233 server_msgr2->start();
2234
2235 client_msgr->add_dispatcher_head(&cli_dispatcher);
2236 client_msgr->set_auth_client(&dummy_auth);
2237 client_msgr->set_auth_server(&dummy_auth);
2238 client_msgr->start();
2239
2240 int i = 1000;
2241 uint64_t last = 0;
2242 bool equal = false;
2243 uint64_t equal_count = 0;
2244 while (i--) {
2245 ConnectionRef conn1 = client_msgr->connect_to(server_msgr->get_mytype(),
2246 server_msgr->get_myaddrs());
2247 ConnectionRef conn2 = client_msgr->connect_to(server_msgr2->get_mytype(),
2248 server_msgr2->get_myaddrs());
2249 MPing *m = new MPing();
2250 ASSERT_EQ(conn1->send_message(m), 0);
2251 m = new MPing();
2252 ASSERT_EQ(conn2->send_message(m), 0);
2253 CHECK_AND_WAIT_TRUE(srv_dispatcher.count > last + 1);
2254 if (srv_dispatcher.count == last) {
2255 lderr(g_ceph_context) << __func__ << " last is " << last << dendl;
2256 equal = true;
2257 equal_count++;
2258 } else {
2259 equal = false;
2260 equal_count = 0;
2261 }
2262 last = srv_dispatcher.count;
2263 if (equal_count)
2264 usleep(1000*500);
2265 ASSERT_FALSE(equal && equal_count > 3);
2266 }
2267 server_msgr->shutdown();
2268 client_msgr->shutdown();
2269 server_msgr2->shutdown();
2270 server_msgr->wait();
2271 client_msgr->wait();
2272 server_msgr2->wait();
2273 delete server_msgr2;
2274 }
2275
2276 INSTANTIATE_TEST_CASE_P(
2277 Messenger,
2278 MessengerTest,
2279 ::testing::Values(
2280 "async+posix",
2281 "simple"
2282 )
2283 );
2284
2285 #else
2286
2287 // Google Test may not support value-parameterized tests with some
2288 // compilers. If we use conditional compilation to compile out all
2289 // code referring to the gtest_main library, MSVC linker will not link
2290 // that library at all and consequently complain about missing entry
2291 // point defined in that library (fatal error LNK1561: entry point
2292 // must be defined). This dummy test keeps gtest_main linked in.
2293 TEST(DummyTest, ValueParameterizedTestsAreNotSupportedOnThisPlatform) {}
2294
2295 #endif
2296
2297
2298 int main(int argc, char **argv) {
2299 vector<const char*> args;
2300 argv_to_vec(argc, (const char **)argv, args);
2301
2302 auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
2303 CODE_ENVIRONMENT_UTILITY,
2304 CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
2305 g_ceph_context->_conf.set_val("auth_cluster_required", "none");
2306 g_ceph_context->_conf.set_val("auth_service_required", "none");
2307 g_ceph_context->_conf.set_val("auth_client_required", "none");
2308 g_ceph_context->_conf.set_val("keyring", "/dev/null");
2309 g_ceph_context->_conf.set_val("enable_experimental_unrecoverable_data_corrupting_features", "ms-type-async");
2310 g_ceph_context->_conf.set_val("ms_die_on_bad_msg", "true");
2311 g_ceph_context->_conf.set_val("ms_die_on_old_message", "true");
2312 g_ceph_context->_conf.set_val("ms_max_backoff", "1");
2313 common_init_finish(g_ceph_context);
2314
2315 ::testing::InitGoogleTest(&argc, argv);
2316 return RUN_ALL_TESTS();
2317 }
2318
2319 /*
2320 * Local Variables:
2321 * compile-command: "cd ../.. ; make -j4 ceph_test_msgr && valgrind --tool=memcheck ./ceph_test_msgr"
2322 * End:
2323 */