]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/msgr/test_msgr.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / test / msgr / test_msgr.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #include <atomic>
18 #include <iostream>
19 #include <unistd.h>
20 #include <stdlib.h>
21 #include <time.h>
22 #include <set>
23 #include <list>
24 #include "common/ceph_mutex.h"
25 #include "common/ceph_argparse.h"
26 #include "global/global_init.h"
27 #include "msg/Dispatcher.h"
28 #include "msg/msg_types.h"
29 #include "msg/Message.h"
30 #include "msg/Messenger.h"
31 #include "msg/Connection.h"
32 #include "messages/MPing.h"
33 #include "messages/MCommand.h"
34
35 #include <boost/random/mersenne_twister.hpp>
36 #include <boost/random/uniform_int.hpp>
37 #include <boost/random/binomial_distribution.hpp>
38 #include <gtest/gtest.h>
39
40 typedef boost::mt11213b gen_type;
41
42 #include "common/dout.h"
43 #include "include/ceph_assert.h"
44
45 #include "auth/DummyAuth.h"
46
47 #define dout_subsys ceph_subsys_ms
48 #undef dout_prefix
49 #define dout_prefix *_dout << " ceph_test_msgr "
50
51
// Poll `expr` about once per millisecond, giving up after 999 polls (~1s).
// Use it as a statement: CHECK_AND_WAIT_TRUE(cond);
// The loop counter has a distinctive name so it cannot shadow a variable
// referenced by `expr`. There is no ';' after while (0), so the macro also
// works as the body of an unbraced if/else.
#define CHECK_AND_WAIT_TRUE(expr) do {  \
  int check_and_wait_tries_ = 1000;     \
  while (--check_and_wait_tries_) {     \
    if (expr)                           \
      break;                            \
    usleep(1000);                       \
  }                                     \
} while (0)
60
61 class MessengerTest : public ::testing::TestWithParam<const char*> {
62 public:
63 DummyAuthClientServer dummy_auth;
64 Messenger *server_msgr;
65 Messenger *client_msgr;
66
67 MessengerTest() : dummy_auth(g_ceph_context),
68 server_msgr(NULL), client_msgr(NULL) {
69 dummy_auth.auth_registry.refresh_config();
70 }
71 void SetUp() override {
72 lderr(g_ceph_context) << __func__ << " start set up " << GetParam() << dendl;
73 server_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
74 client_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid(), 0);
75 server_msgr->set_default_policy(Messenger::Policy::stateless_server(0));
76 client_msgr->set_default_policy(Messenger::Policy::lossy_client(0));
77 server_msgr->set_auth_client(&dummy_auth);
78 server_msgr->set_auth_server(&dummy_auth);
79 client_msgr->set_auth_client(&dummy_auth);
80 client_msgr->set_auth_server(&dummy_auth);
81 server_msgr->set_require_authorizer(false);
82 }
83 void TearDown() override {
84 ASSERT_EQ(server_msgr->get_dispatch_queue_len(), 0);
85 ASSERT_EQ(client_msgr->get_dispatch_queue_len(), 0);
86 delete server_msgr;
87 delete client_msgr;
88 }
89
90 };
91
92
93 class FakeDispatcher : public Dispatcher {
94 public:
95 struct Session : public RefCountedObject {
96 atomic<uint64_t> count;
97 ConnectionRef con;
98
99 explicit Session(ConnectionRef c): RefCountedObject(g_ceph_context), count(0), con(c) {
100 }
101 uint64_t get_count() { return count; }
102 };
103
104 ceph::mutex lock = ceph::make_mutex("FakeDispatcher::lock");
105 ceph::condition_variable cond;
106 bool is_server;
107 bool got_new;
108 bool got_remote_reset;
109 bool got_connect;
110 bool loopback;
111 entity_addrvec_t last_accept;
112 ConnectionRef *last_accept_con_ptr = nullptr;
113
114 explicit FakeDispatcher(bool s): Dispatcher(g_ceph_context),
115 is_server(s), got_new(false), got_remote_reset(false),
116 got_connect(false), loopback(false) {
117 }
118 bool ms_can_fast_dispatch_any() const override { return true; }
119 bool ms_can_fast_dispatch(const Message *m) const override {
120 switch (m->get_type()) {
121 case CEPH_MSG_PING:
122 return true;
123 default:
124 return false;
125 }
126 }
127
128 void ms_handle_fast_connect(Connection *con) override {
129 std::scoped_lock l{lock};
130 lderr(g_ceph_context) << __func__ << " " << con << dendl;
131 auto s = con->get_priv();
132 if (!s) {
133 auto session = new Session(con);
134 con->set_priv(RefCountedPtr{session, false});
135 lderr(g_ceph_context) << __func__ << " con: " << con
136 << " count: " << session->count << dendl;
137 }
138 got_connect = true;
139 cond.notify_all();
140 }
141 void ms_handle_fast_accept(Connection *con) override {
142 last_accept = con->get_peer_addrs();
143 if (last_accept_con_ptr) {
144 *last_accept_con_ptr = con;
145 }
146 if (!con->get_priv()) {
147 con->set_priv(RefCountedPtr{new Session(con), false});
148 }
149 }
150 bool ms_dispatch(Message *m) override {
151 auto priv = m->get_connection()->get_priv();
152 auto s = static_cast<Session*>(priv.get());
153 if (!s) {
154 s = new Session(m->get_connection());
155 priv.reset(s, false);
156 m->get_connection()->set_priv(priv);
157 }
158 s->count++;
159 lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl;
160 if (is_server) {
161 reply_message(m);
162 }
163 std::lock_guard l{lock};
164 got_new = true;
165 cond.notify_all();
166 m->put();
167 return true;
168 }
169 bool ms_handle_reset(Connection *con) override {
170 std::lock_guard l{lock};
171 lderr(g_ceph_context) << __func__ << " " << con << dendl;
172 auto priv = con->get_priv();
173 if (auto s = static_cast<Session*>(priv.get()); s) {
174 s->con.reset(); // break con <-> session ref cycle
175 con->set_priv(nullptr); // break ref <-> session cycle, if any
176 }
177 return true;
178 }
179 void ms_handle_remote_reset(Connection *con) override {
180 std::lock_guard l{lock};
181 lderr(g_ceph_context) << __func__ << " " << con << dendl;
182 auto priv = con->get_priv();
183 if (auto s = static_cast<Session*>(priv.get()); s) {
184 s->con.reset(); // break con <-> session ref cycle
185 con->set_priv(nullptr); // break ref <-> session cycle, if any
186 }
187 got_remote_reset = true;
188 cond.notify_all();
189 }
190 bool ms_handle_refused(Connection *con) override {
191 return false;
192 }
193 void ms_fast_dispatch(Message *m) override {
194 auto priv = m->get_connection()->get_priv();
195 auto s = static_cast<Session*>(priv.get());
196 if (!s) {
197 s = new Session(m->get_connection());
198 priv.reset(s, false);
199 m->get_connection()->set_priv(priv);
200 }
201 s->count++;
202 lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl;
203 if (is_server) {
204 if (loopback)
205 ceph_assert(m->get_source().is_osd());
206 else
207 reply_message(m);
208 } else if (loopback) {
209 ceph_assert(m->get_source().is_client());
210 }
211 m->put();
212 std::lock_guard l{lock};
213 got_new = true;
214 cond.notify_all();
215 }
216
217 int ms_handle_authentication(Connection *con) override {
218 return 1;
219 }
220
221 void reply_message(Message *m) {
222 MPing *rm = new MPing();
223 m->get_connection()->send_message(rm);
224 }
225 };
226
227 typedef FakeDispatcher::Session Session;
228
229 struct TestInterceptor : public Interceptor {
230
231 bool step_waiting = false;
232 bool waiting = true;
233 std::map<Connection *, uint32_t> current_step;
234 std::map<Connection *, std::list<uint32_t>> step_history;
235 std::map<uint32_t, std::optional<ACTION>> decisions;
236 std::set<uint32_t> breakpoints;
237
238 uint32_t count_step(Connection *conn, uint32_t step) {
239 uint32_t count = 0;
240 for (auto s : step_history[conn]) {
241 if (s == step) {
242 count++;
243 }
244 }
245 return count;
246 }
247
248 void breakpoint(uint32_t step) {
249 breakpoints.insert(step);
250 }
251
252 void remove_bp(uint32_t step) {
253 breakpoints.erase(step);
254 }
255
256 Connection *wait(uint32_t step, Connection *conn=nullptr) {
257 std::unique_lock<std::mutex> l(lock);
258 while(true) {
259 if (conn) {
260 auto it = current_step.find(conn);
261 if (it != current_step.end()) {
262 if (it->second == step) {
263 break;
264 }
265 }
266 } else {
267 for (auto it : current_step) {
268 if (it.second == step) {
269 conn = it.first;
270 break;
271 }
272 }
273 if (conn) {
274 break;
275 }
276 }
277 step_waiting = true;
278 cond_var.wait(l);
279 }
280 step_waiting = false;
281 return conn;
282 }
283
284 ACTION wait_for_decision(uint32_t step, std::unique_lock<std::mutex> &l) {
285 if (decisions[step]) {
286 return *(decisions[step]);
287 }
288 waiting = true;
289 cond_var.wait(l, [this] { return !waiting; });
290 return *(decisions[step]);
291 }
292
293 void proceed(uint32_t step, ACTION decision) {
294 std::unique_lock<std::mutex> l(lock);
295 decisions[step] = decision;
296 if (waiting) {
297 waiting = false;
298 cond_var.notify_one();
299 }
300 }
301
302 ACTION intercept(Connection *conn, uint32_t step) override {
303 lderr(g_ceph_context) << __func__ << " conn(" << conn
304 << ") intercept called on step=" << step << dendl;
305
306 {
307 std::unique_lock<std::mutex> l(lock);
308 step_history[conn].push_back(step);
309 current_step[conn] = step;
310 if (step_waiting) {
311 cond_var.notify_one();
312 }
313 }
314
315 std::unique_lock<std::mutex> l(lock);
316 ACTION decision = ACTION::CONTINUE;
317 if (breakpoints.find(step) != breakpoints.end()) {
318 lderr(g_ceph_context) << __func__ << " conn(" << conn
319 << ") pausing on step=" << step << dendl;
320 decision = wait_for_decision(step, l);
321 } else {
322 if (decisions[step]) {
323 decision = *(decisions[step]);
324 }
325 }
326 lderr(g_ceph_context) << __func__ << " conn(" << conn
327 << ") resuming step=" << step << " with decision="
328 << decision << dendl;
329 decisions[step].reset();
330 return decision;
331 }
332
333 };
334
335 /**
336 * Scenario: A connects to B, and B connects to A at the same time.
337 */
338 TEST_P(MessengerTest, ConnectionRaceTest) {
339 FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
340
341 TestInterceptor *cli_interceptor = new TestInterceptor();
342 TestInterceptor *srv_interceptor = new TestInterceptor();
343
344 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer_reuse(0));
345 server_msgr->interceptor = srv_interceptor;
346
347 client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer_reuse(0));
348 client_msgr->interceptor = cli_interceptor;
349
350 entity_addr_t bind_addr;
351 bind_addr.parse("v2:127.0.0.1:3300");
352 server_msgr->bind(bind_addr);
353 server_msgr->add_dispatcher_head(&srv_dispatcher);
354 server_msgr->start();
355
356 bind_addr.parse("v2:127.0.0.1:3301");
357 client_msgr->bind(bind_addr);
358 client_msgr->add_dispatcher_head(&cli_dispatcher);
359 client_msgr->start();
360
361 // pause before sending client_ident message
362 cli_interceptor->breakpoint(11);
363 // pause before sending client_ident message
364 srv_interceptor->breakpoint(11);
365
366 ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
367 server_msgr->get_myaddrs());
368 MPing *m1 = new MPing();
369 ASSERT_EQ(c2s->send_message(m1), 0);
370
371 ConnectionRef s2c = server_msgr->connect_to(client_msgr->get_mytype(),
372 client_msgr->get_myaddrs());
373 MPing *m2 = new MPing();
374 ASSERT_EQ(s2c->send_message(m2), 0);
375
376 cli_interceptor->wait(11, c2s.get());
377 srv_interceptor->wait(11, s2c.get());
378
379 // at this point both connections (A->B, B->A) are paused just before sending
380 // the client_ident message.
381
382 cli_interceptor->remove_bp(11);
383 srv_interceptor->remove_bp(11);
384
385 cli_interceptor->proceed(11, Interceptor::ACTION::CONTINUE);
386 srv_interceptor->proceed(11, Interceptor::ACTION::CONTINUE);
387
388 {
389 std::unique_lock l{cli_dispatcher.lock};
390 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
391 cli_dispatcher.got_new = false;
392 }
393
394 {
395 std::unique_lock l{srv_dispatcher.lock};
396 srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; });
397 srv_dispatcher.got_new = false;
398 }
399
400 ASSERT_TRUE(s2c->is_connected());
401 ASSERT_EQ(1u, static_cast<Session*>(s2c->get_priv().get())->get_count());
402 ASSERT_TRUE(s2c->peer_is_client());
403
404 ASSERT_TRUE(c2s->is_connected());
405 ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
406 ASSERT_TRUE(c2s->peer_is_osd());
407
408 client_msgr->shutdown();
409 client_msgr->wait();
410 server_msgr->shutdown();
411 server_msgr->wait();
412
413 delete cli_interceptor;
414 delete srv_interceptor;
415 }
416
417 /**
418 * Scenario:
419 * - A connects to B
420 * - A sends client_ident to B
421 * - B fails before sending server_ident to A
422 * - A reconnects
423 */
424 TEST_P(MessengerTest, MissingServerIdenTest) {
425 FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
426
427 TestInterceptor *cli_interceptor = new TestInterceptor();
428 TestInterceptor *srv_interceptor = new TestInterceptor();
429
430 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::stateful_server(0));
431 server_msgr->interceptor = srv_interceptor;
432
433 client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossy_client(0));
434 client_msgr->interceptor = cli_interceptor;
435
436 entity_addr_t bind_addr;
437 bind_addr.parse("v2:127.0.0.1:3300");
438 server_msgr->bind(bind_addr);
439 server_msgr->add_dispatcher_head(&srv_dispatcher);
440 server_msgr->start();
441
442 bind_addr.parse("v2:127.0.0.1:3301");
443 client_msgr->bind(bind_addr);
444 client_msgr->add_dispatcher_head(&cli_dispatcher);
445 client_msgr->start();
446
447 // pause before sending client_ident message
448 srv_interceptor->breakpoint(12);
449
450 ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
451 server_msgr->get_myaddrs());
452 MPing *m1 = new MPing();
453 ASSERT_EQ(c2s->send_message(m1), 0);
454
455 Connection *c2s_accepter = srv_interceptor->wait(12);
456 srv_interceptor->remove_bp(12);
457
458 // We inject a message from this side of the connection to force it to be
459 // in standby when we inject the failure below
460 MPing *m2 = new MPing();
461 ASSERT_EQ(c2s_accepter->send_message(m2), 0);
462
463 srv_interceptor->proceed(12, Interceptor::ACTION::FAIL);
464
465 {
466 std::unique_lock l{srv_dispatcher.lock};
467 srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; });
468 srv_dispatcher.got_new = false;
469 }
470
471 {
472 std::unique_lock l{cli_dispatcher.lock};
473 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
474 cli_dispatcher.got_new = false;
475 }
476
477 ASSERT_TRUE(c2s->is_connected());
478 ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
479 ASSERT_TRUE(c2s->peer_is_osd());
480
481 ASSERT_TRUE(c2s_accepter->is_connected());
482 ASSERT_EQ(1u, static_cast<Session*>(c2s_accepter->get_priv().get())->get_count());
483 ASSERT_TRUE(c2s_accepter->peer_is_client());
484
485 client_msgr->shutdown();
486 client_msgr->wait();
487 server_msgr->shutdown();
488 server_msgr->wait();
489
490 delete cli_interceptor;
491 delete srv_interceptor;
492 }
493
494 /**
495 * Scenario:
496 * - A connects to B
497 * - A sends client_ident to B
498 * - B fails before sending server_ident to A
499 * - A goes to standby
500 * - B reconnects to A
501 */
502 TEST_P(MessengerTest, MissingServerIdenTest2) {
503 FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
504
505 TestInterceptor *cli_interceptor = new TestInterceptor();
506 TestInterceptor *srv_interceptor = new TestInterceptor();
507
508 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer(0));
509 server_msgr->interceptor = srv_interceptor;
510
511 client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0));
512 client_msgr->interceptor = cli_interceptor;
513
514 entity_addr_t bind_addr;
515 bind_addr.parse("v2:127.0.0.1:3300");
516 server_msgr->bind(bind_addr);
517 server_msgr->add_dispatcher_head(&srv_dispatcher);
518 server_msgr->start();
519
520 bind_addr.parse("v2:127.0.0.1:3301");
521 client_msgr->bind(bind_addr);
522 client_msgr->add_dispatcher_head(&cli_dispatcher);
523 client_msgr->start();
524
525 // pause before sending client_ident message
526 srv_interceptor->breakpoint(12);
527
528 ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
529 server_msgr->get_myaddrs());
530
531 Connection *c2s_accepter = srv_interceptor->wait(12);
532 srv_interceptor->remove_bp(12);
533
534 // We inject a message from this side of the connection to force it to be
535 // in standby when we inject the failure below
536 MPing *m2 = new MPing();
537 ASSERT_EQ(c2s_accepter->send_message(m2), 0);
538
539 srv_interceptor->proceed(12, Interceptor::ACTION::FAIL);
540
541 {
542 std::unique_lock l{cli_dispatcher.lock};
543 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
544 cli_dispatcher.got_new = false;
545 }
546
547 ASSERT_TRUE(c2s->is_connected());
548 ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
549 ASSERT_TRUE(c2s->peer_is_osd());
550
551 ASSERT_TRUE(c2s_accepter->is_connected());
552 ASSERT_EQ(0u, static_cast<Session*>(c2s_accepter->get_priv().get())->get_count());
553 ASSERT_TRUE(c2s_accepter->peer_is_client());
554
555 client_msgr->shutdown();
556 client_msgr->wait();
557 server_msgr->shutdown();
558 server_msgr->wait();
559
560 delete cli_interceptor;
561 delete srv_interceptor;
562 }
563
564 /**
565 * Scenario:
566 * - A connects to B
567 * - A and B exchange messages
568 * - A fails
569 * - B goes into standby
570 * - A reconnects
571 */
572 TEST_P(MessengerTest, ReconnectTest) {
573 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
574
575 TestInterceptor *cli_interceptor = new TestInterceptor();
576 TestInterceptor *srv_interceptor = new TestInterceptor();
577
578 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::stateful_server(0));
579 server_msgr->interceptor = srv_interceptor;
580
581 client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0));
582 client_msgr->interceptor = cli_interceptor;
583
584 entity_addr_t bind_addr;
585 bind_addr.parse("v2:127.0.0.1:3300");
586 server_msgr->bind(bind_addr);
587 server_msgr->add_dispatcher_head(&srv_dispatcher);
588 server_msgr->start();
589
590 bind_addr.parse("v2:127.0.0.1:3301");
591 client_msgr->bind(bind_addr);
592 client_msgr->add_dispatcher_head(&cli_dispatcher);
593 client_msgr->start();
594
595 ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
596 server_msgr->get_myaddrs());
597
598 MPing *m1 = new MPing();
599 ASSERT_EQ(c2s->send_message(m1), 0);
600
601 {
602 std::unique_lock l{cli_dispatcher.lock};
603 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
604 cli_dispatcher.got_new = false;
605 }
606
607 ASSERT_TRUE(c2s->is_connected());
608 ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
609 ASSERT_TRUE(c2s->peer_is_osd());
610
611 cli_interceptor->breakpoint(16);
612
613 MPing *m2 = new MPing();
614 ASSERT_EQ(c2s->send_message(m2), 0);
615
616 cli_interceptor->wait(16, c2s.get());
617 cli_interceptor->remove_bp(16);
618
619 // at this point client and server are connected together
620
621 srv_interceptor->breakpoint(15);
622
623 // failing client
624 cli_interceptor->proceed(16, Interceptor::ACTION::FAIL);
625
626 MPing *m3 = new MPing();
627 ASSERT_EQ(c2s->send_message(m3), 0);
628
629 Connection *c2s_accepter = srv_interceptor->wait(15);
630 // the srv end of theconnection is now paused at ready
631 // this means that the reconnect was successful
632 srv_interceptor->remove_bp(15);
633
634 ASSERT_TRUE(c2s_accepter->peer_is_client());
635 // c2s_accepter sent 0 reconnect messages
636 ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, 13), 0u);
637 // c2s_accepter sent 1 reconnect_ok messages
638 ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, 14), 1u);
639 // c2s sent 1 reconnect messages
640 ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 13), 1u);
641 // c2s sent 0 reconnect_ok messages
642 ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 14), 0u);
643
644 srv_interceptor->proceed(15, Interceptor::ACTION::CONTINUE);
645
646 {
647 std::unique_lock l{cli_dispatcher.lock};
648 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
649 cli_dispatcher.got_new = false;
650 }
651
652 client_msgr->shutdown();
653 client_msgr->wait();
654 server_msgr->shutdown();
655 server_msgr->wait();
656
657 delete cli_interceptor;
658 delete srv_interceptor;
659 }
660
661 /**
662 * Scenario:
663 * - A connects to B
664 * - A and B exchange messages
665 * - A fails
666 * - A reconnects // B reconnects
667 */
668 TEST_P(MessengerTest, ReconnectRaceTest) {
669 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
670
671 TestInterceptor *cli_interceptor = new TestInterceptor();
672 TestInterceptor *srv_interceptor = new TestInterceptor();
673
674 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer(0));
675 server_msgr->interceptor = srv_interceptor;
676
677 client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0));
678 client_msgr->interceptor = cli_interceptor;
679
680 entity_addr_t bind_addr;
681 bind_addr.parse("v2:127.0.0.1:3300");
682 server_msgr->bind(bind_addr);
683 server_msgr->add_dispatcher_head(&srv_dispatcher);
684 server_msgr->start();
685
686 bind_addr.parse("v2:127.0.0.1:3301");
687 client_msgr->bind(bind_addr);
688 client_msgr->add_dispatcher_head(&cli_dispatcher);
689 client_msgr->start();
690
691 ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
692 server_msgr->get_myaddrs());
693
694 MPing *m1 = new MPing();
695 ASSERT_EQ(c2s->send_message(m1), 0);
696
697 {
698 std::unique_lock l{cli_dispatcher.lock};
699 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
700 cli_dispatcher.got_new = false;
701 }
702
703 ASSERT_TRUE(c2s->is_connected());
704 ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
705 ASSERT_TRUE(c2s->peer_is_osd());
706
707 cli_interceptor->breakpoint(16);
708
709 MPing *m2 = new MPing();
710 ASSERT_EQ(c2s->send_message(m2), 0);
711
712 cli_interceptor->wait(16, c2s.get());
713 cli_interceptor->remove_bp(16);
714
715 // at this point client and server are connected together
716
717 // force both client and server to race on reconnect
718 cli_interceptor->breakpoint(13);
719 srv_interceptor->breakpoint(13);
720
721 // failing client
722 // this will cause both client and server to reconnect at the same time
723 cli_interceptor->proceed(16, Interceptor::ACTION::FAIL);
724
725 MPing *m3 = new MPing();
726 ASSERT_EQ(c2s->send_message(m3), 0);
727
728 cli_interceptor->wait(13, c2s.get());
729 srv_interceptor->wait(13);
730
731 cli_interceptor->remove_bp(13);
732 srv_interceptor->remove_bp(13);
733
734 // pause on "ready"
735 srv_interceptor->breakpoint(15);
736
737 cli_interceptor->proceed(13, Interceptor::ACTION::CONTINUE);
738 srv_interceptor->proceed(13, Interceptor::ACTION::CONTINUE);
739
740 Connection *c2s_accepter = srv_interceptor->wait(15);
741
742 // the server has reconnected and is "ready"
743 srv_interceptor->remove_bp(15);
744
745 ASSERT_TRUE(c2s_accepter->peer_is_client());
746 ASSERT_TRUE(c2s->peer_is_osd());
747
748 // the server should win the reconnect race
749
750 // c2s_accepter sent 1 or 2 reconnect messages
751 ASSERT_LT(srv_interceptor->count_step(c2s_accepter, 13), 3u);
752 ASSERT_GT(srv_interceptor->count_step(c2s_accepter, 13), 0u);
753 // c2s_accepter sent 0 reconnect_ok messages
754 ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, 14), 0u);
755 // c2s sent 1 reconnect messages
756 ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 13), 1u);
757 // c2s sent 1 reconnect_ok messages
758 ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 14), 1u);
759
760 if (srv_interceptor->count_step(c2s_accepter, 13) == 2) {
761 // if the server send the reconnect message two times then
762 // the client must have sent a session retry message to the server
763 ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 18), 1u);
764 } else {
765 ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 18), 0u);
766 }
767
768 srv_interceptor->proceed(15, Interceptor::ACTION::CONTINUE);
769
770 {
771 std::unique_lock l{cli_dispatcher.lock};
772 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
773 cli_dispatcher.got_new = false;
774 }
775
776 client_msgr->shutdown();
777 client_msgr->wait();
778 server_msgr->shutdown();
779 server_msgr->wait();
780
781 delete cli_interceptor;
782 delete srv_interceptor;
783 }
784
785 TEST_P(MessengerTest, SimpleTest) {
786 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
787 entity_addr_t bind_addr;
788 bind_addr.parse("v2:127.0.0.1");
789 server_msgr->bind(bind_addr);
790 server_msgr->add_dispatcher_head(&srv_dispatcher);
791 server_msgr->start();
792
793 client_msgr->add_dispatcher_head(&cli_dispatcher);
794 client_msgr->start();
795
796 // 1. simple round trip
797 MPing *m = new MPing();
798 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
799 server_msgr->get_myaddrs());
800 {
801 ASSERT_EQ(conn->send_message(m), 0);
802 std::unique_lock l{cli_dispatcher.lock};
803 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
804 cli_dispatcher.got_new = false;
805 }
806 ASSERT_TRUE(conn->is_connected());
807 ASSERT_EQ(1u, static_cast<Session*>(conn->get_priv().get())->get_count());
808 ASSERT_TRUE(conn->peer_is_osd());
809
810 // 2. test rebind port
811 set<int> avoid_ports;
812 for (int i = 0; i < 10 ; i++) {
813 for (auto a : server_msgr->get_myaddrs().v) {
814 avoid_ports.insert(a.get_port() + i);
815 }
816 }
817 server_msgr->rebind(avoid_ports);
818 for (auto a : server_msgr->get_myaddrs().v) {
819 ASSERT_TRUE(avoid_ports.count(a.get_port()) == 0);
820 }
821
822 conn = client_msgr->connect_to(server_msgr->get_mytype(),
823 server_msgr->get_myaddrs());
824 {
825 m = new MPing();
826 ASSERT_EQ(conn->send_message(m), 0);
827 std::unique_lock l{cli_dispatcher.lock};
828 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
829 cli_dispatcher.got_new = false;
830 }
831 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
832
833 // 3. test markdown connection
834 conn->mark_down();
835 ASSERT_FALSE(conn->is_connected());
836
837 // 4. test failed connection
838 server_msgr->shutdown();
839 server_msgr->wait();
840
841 m = new MPing();
842 conn->send_message(m);
843 CHECK_AND_WAIT_TRUE(!conn->is_connected());
844 ASSERT_FALSE(conn->is_connected());
845
846 // 5. loopback connection
847 srv_dispatcher.loopback = true;
848 conn = client_msgr->get_loopback_connection();
849 {
850 m = new MPing();
851 ASSERT_EQ(conn->send_message(m), 0);
852 std::unique_lock l{cli_dispatcher.lock};
853 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
854 cli_dispatcher.got_new = false;
855 }
856 srv_dispatcher.loopback = false;
857 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
858 client_msgr->shutdown();
859 client_msgr->wait();
860 server_msgr->shutdown();
861 server_msgr->wait();
862 }
863
864 TEST_P(MessengerTest, SimpleMsgr2Test) {
865 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
866 entity_addr_t legacy_addr;
867 legacy_addr.parse("v1:127.0.0.1");
868 entity_addr_t msgr2_addr;
869 msgr2_addr.parse("v2:127.0.0.1");
870 entity_addrvec_t bind_addrs;
871 bind_addrs.v.push_back(legacy_addr);
872 bind_addrs.v.push_back(msgr2_addr);
873 server_msgr->bindv(bind_addrs);
874 server_msgr->add_dispatcher_head(&srv_dispatcher);
875 server_msgr->start();
876
877 client_msgr->add_dispatcher_head(&cli_dispatcher);
878 client_msgr->start();
879
880 // 1. simple round trip
881 MPing *m = new MPing();
882 ConnectionRef conn = client_msgr->connect_to(
883 server_msgr->get_mytype(),
884 server_msgr->get_myaddrs());
885 {
886 ASSERT_EQ(conn->send_message(m), 0);
887 std::unique_lock l{cli_dispatcher.lock};
888 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
889 cli_dispatcher.got_new = false;
890 }
891 ASSERT_TRUE(conn->is_connected());
892 ASSERT_EQ(1u, static_cast<Session*>(conn->get_priv().get())->get_count());
893 ASSERT_TRUE(conn->peer_is_osd());
894
895 // 2. test rebind port
896 set<int> avoid_ports;
897 for (int i = 0; i < 10 ; i++) {
898 for (auto a : server_msgr->get_myaddrs().v) {
899 avoid_ports.insert(a.get_port() + i);
900 }
901 }
902 server_msgr->rebind(avoid_ports);
903 for (auto a : server_msgr->get_myaddrs().v) {
904 ASSERT_TRUE(avoid_ports.count(a.get_port()) == 0);
905 }
906
907 conn = client_msgr->connect_to(
908 server_msgr->get_mytype(),
909 server_msgr->get_myaddrs());
910 {
911 m = new MPing();
912 ASSERT_EQ(conn->send_message(m), 0);
913 std::unique_lock l{cli_dispatcher.lock};
914 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
915 cli_dispatcher.got_new = false;
916 }
917 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
918
919 // 3. test markdown connection
920 conn->mark_down();
921 ASSERT_FALSE(conn->is_connected());
922
923 // 4. test failed connection
924 server_msgr->shutdown();
925 server_msgr->wait();
926
927 m = new MPing();
928 conn->send_message(m);
929 CHECK_AND_WAIT_TRUE(!conn->is_connected());
930 ASSERT_FALSE(conn->is_connected());
931
932 // 5. loopback connection
933 srv_dispatcher.loopback = true;
934 conn = client_msgr->get_loopback_connection();
935 {
936 m = new MPing();
937 ASSERT_EQ(conn->send_message(m), 0);
938 std::unique_lock l{cli_dispatcher.lock};
939 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
940 cli_dispatcher.got_new = false;
941 }
942 srv_dispatcher.loopback = false;
943 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
944 client_msgr->shutdown();
945 client_msgr->wait();
946 server_msgr->shutdown();
947 server_msgr->wait();
948 }
949
950 TEST_P(MessengerTest, FeatureTest) {
951 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
952 entity_addr_t bind_addr;
953 bind_addr.parse("v2:127.0.0.1");
954 uint64_t all_feature_supported, feature_required, feature_supported = 0;
955 for (int i = 0; i < 10; i++)
956 feature_supported |= 1ULL << i;
957 feature_supported |= CEPH_FEATUREMASK_MSG_ADDR2;
958 feature_supported |= CEPH_FEATUREMASK_SERVER_NAUTILUS;
959 feature_required = feature_supported | 1ULL << 13;
960 all_feature_supported = feature_required | 1ULL << 14;
961
962 Messenger::Policy p = server_msgr->get_policy(entity_name_t::TYPE_CLIENT);
963 p.features_required = feature_required;
964 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
965 server_msgr->bind(bind_addr);
966 server_msgr->add_dispatcher_head(&srv_dispatcher);
967 server_msgr->start();
968
969 // 1. Suppose if only support less than required
970 p = client_msgr->get_policy(entity_name_t::TYPE_OSD);
971 p.features_supported = feature_supported;
972 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
973 client_msgr->add_dispatcher_head(&cli_dispatcher);
974 client_msgr->start();
975
976 MPing *m = new MPing();
977 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
978 server_msgr->get_myaddrs());
979 conn->send_message(m);
980 CHECK_AND_WAIT_TRUE(!conn->is_connected());
981 // should failed build a connection
982 ASSERT_FALSE(conn->is_connected());
983
984 client_msgr->shutdown();
985 client_msgr->wait();
986
987 // 2. supported met required
988 p = client_msgr->get_policy(entity_name_t::TYPE_OSD);
989 p.features_supported = all_feature_supported;
990 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
991 client_msgr->start();
992
993 conn = client_msgr->connect_to(server_msgr->get_mytype(),
994 server_msgr->get_myaddrs());
995 {
996 m = new MPing();
997 ASSERT_EQ(conn->send_message(m), 0);
998 std::unique_lock l{cli_dispatcher.lock};
999 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1000 cli_dispatcher.got_new = false;
1001 }
1002 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1003
1004 server_msgr->shutdown();
1005 client_msgr->shutdown();
1006 server_msgr->wait();
1007 client_msgr->wait();
1008 }
1009
1010 TEST_P(MessengerTest, TimeoutTest) {
1011 g_ceph_context->_conf.set_val("ms_connection_idle_timeout", "1");
1012 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1013 entity_addr_t bind_addr;
1014 bind_addr.parse("v2:127.0.0.1");
1015 server_msgr->bind(bind_addr);
1016 server_msgr->add_dispatcher_head(&srv_dispatcher);
1017 server_msgr->start();
1018
1019 client_msgr->add_dispatcher_head(&cli_dispatcher);
1020 client_msgr->start();
1021
1022 // 1. build the connection
1023 MPing *m = new MPing();
1024 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1025 server_msgr->get_myaddrs());
1026 {
1027 ASSERT_EQ(conn->send_message(m), 0);
1028 std::unique_lock l{cli_dispatcher.lock};
1029 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1030 cli_dispatcher.got_new = false;
1031 }
1032 ASSERT_TRUE(conn->is_connected());
1033 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1034 ASSERT_TRUE(conn->peer_is_osd());
1035
1036 // 2. wait for idle
1037 usleep(2500*1000);
1038 ASSERT_FALSE(conn->is_connected());
1039
1040 server_msgr->shutdown();
1041 server_msgr->wait();
1042
1043 client_msgr->shutdown();
1044 client_msgr->wait();
1045 g_ceph_context->_conf.set_val("ms_connection_idle_timeout", "900");
1046 }
1047
1048 TEST_P(MessengerTest, StatefulTest) {
1049 Message *m;
1050 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1051 entity_addr_t bind_addr;
1052 bind_addr.parse("v2:127.0.0.1");
1053 Messenger::Policy p = Messenger::Policy::stateful_server(0);
1054 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
1055 p = Messenger::Policy::lossless_client(0);
1056 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
1057
1058 server_msgr->bind(bind_addr);
1059 server_msgr->add_dispatcher_head(&srv_dispatcher);
1060 server_msgr->start();
1061 client_msgr->add_dispatcher_head(&cli_dispatcher);
1062 client_msgr->start();
1063
1064 // 1. test for server standby
1065 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1066 server_msgr->get_myaddrs());
1067 {
1068 m = new MPing();
1069 ASSERT_EQ(conn->send_message(m), 0);
1070 std::unique_lock l{cli_dispatcher.lock};
1071 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1072 cli_dispatcher.got_new = false;
1073 }
1074 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1075 conn->mark_down();
1076 ASSERT_FALSE(conn->is_connected());
1077 ConnectionRef server_conn = server_msgr->connect_to(
1078 client_msgr->get_mytype(), srv_dispatcher.last_accept);
1079 // don't lose state
1080 ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
1081
1082 srv_dispatcher.got_new = false;
1083 conn = client_msgr->connect_to(server_msgr->get_mytype(),
1084 server_msgr->get_myaddrs());
1085 {
1086 m = new MPing();
1087 ASSERT_EQ(conn->send_message(m), 0);
1088 std::unique_lock l{cli_dispatcher.lock};
1089 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1090 cli_dispatcher.got_new = false;
1091 }
1092 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1093 server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
1094 srv_dispatcher.last_accept);
1095 {
1096 std::unique_lock l{srv_dispatcher.lock};
1097 srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_remote_reset; });
1098 }
1099
1100 // 2. test for client reconnect
1101 ASSERT_FALSE(cli_dispatcher.got_remote_reset);
1102 cli_dispatcher.got_connect = false;
1103 cli_dispatcher.got_new = false;
1104 cli_dispatcher.got_remote_reset = false;
1105 server_conn->mark_down();
1106 ASSERT_FALSE(server_conn->is_connected());
1107 // ensure client detect server socket closed
1108 {
1109 std::unique_lock l{cli_dispatcher.lock};
1110 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_remote_reset; });
1111 cli_dispatcher.got_remote_reset = false;
1112 }
1113 {
1114 std::unique_lock l{cli_dispatcher.lock};
1115 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_connect; });
1116 cli_dispatcher.got_connect = false;
1117 }
1118 CHECK_AND_WAIT_TRUE(conn->is_connected());
1119 ASSERT_TRUE(conn->is_connected());
1120
1121 {
1122 m = new MPing();
1123 ASSERT_EQ(conn->send_message(m), 0);
1124 ASSERT_TRUE(conn->is_connected());
1125 std::unique_lock l{cli_dispatcher.lock};
1126 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1127 cli_dispatcher.got_new = false;
1128 }
1129 // resetcheck happen
1130 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1131 server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
1132 srv_dispatcher.last_accept);
1133 ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
1134 cli_dispatcher.got_remote_reset = false;
1135
1136 server_msgr->shutdown();
1137 client_msgr->shutdown();
1138 server_msgr->wait();
1139 client_msgr->wait();
1140 }
1141
1142 TEST_P(MessengerTest, StatelessTest) {
1143 Message *m;
1144 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1145 entity_addr_t bind_addr;
1146 bind_addr.parse("v2:127.0.0.1");
1147 Messenger::Policy p = Messenger::Policy::stateless_server(0);
1148 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
1149 p = Messenger::Policy::lossy_client(0);
1150 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
1151
1152 server_msgr->bind(bind_addr);
1153 server_msgr->add_dispatcher_head(&srv_dispatcher);
1154 server_msgr->start();
1155 client_msgr->add_dispatcher_head(&cli_dispatcher);
1156 client_msgr->start();
1157
1158 // 1. test for server lose state
1159 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1160 server_msgr->get_myaddrs());
1161 {
1162 m = new MPing();
1163 ASSERT_EQ(conn->send_message(m), 0);
1164 std::unique_lock l{cli_dispatcher.lock};
1165 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1166 cli_dispatcher.got_new = false;
1167 }
1168 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1169 conn->mark_down();
1170 ASSERT_FALSE(conn->is_connected());
1171
1172 srv_dispatcher.got_new = false;
1173 ConnectionRef server_conn;
1174 srv_dispatcher.last_accept_con_ptr = &server_conn;
1175 conn = client_msgr->connect_to(server_msgr->get_mytype(),
1176 server_msgr->get_myaddrs());
1177 {
1178 m = new MPing();
1179 ASSERT_EQ(conn->send_message(m), 0);
1180 std::unique_lock l{cli_dispatcher.lock};
1181 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1182 cli_dispatcher.got_new = false;
1183 }
1184 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1185 ASSERT_TRUE(server_conn);
1186
1187 // server lose state
1188 {
1189 std::unique_lock l{srv_dispatcher.lock};
1190 srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; });
1191 }
1192 ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
1193
1194 // 2. test for client lossy
1195 server_conn->mark_down();
1196 ASSERT_FALSE(server_conn->is_connected());
1197 conn->send_keepalive();
1198 CHECK_AND_WAIT_TRUE(!conn->is_connected());
1199 ASSERT_FALSE(conn->is_connected());
1200 conn = client_msgr->connect_to(server_msgr->get_mytype(),
1201 server_msgr->get_myaddrs());
1202 {
1203 m = new MPing();
1204 ASSERT_EQ(conn->send_message(m), 0);
1205 std::unique_lock l{cli_dispatcher.lock};
1206 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1207 cli_dispatcher.got_new = false;
1208 }
1209 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1210
1211 server_msgr->shutdown();
1212 client_msgr->shutdown();
1213 server_msgr->wait();
1214 client_msgr->wait();
1215 }
1216
1217 TEST_P(MessengerTest, AnonTest) {
1218 Message *m;
1219 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1220 entity_addr_t bind_addr;
1221 bind_addr.parse("v2:127.0.0.1");
1222 Messenger::Policy p = Messenger::Policy::stateless_server(0);
1223 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
1224 p = Messenger::Policy::lossy_client(0);
1225 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
1226
1227 server_msgr->bind(bind_addr);
1228 server_msgr->add_dispatcher_head(&srv_dispatcher);
1229 server_msgr->start();
1230 client_msgr->add_dispatcher_head(&cli_dispatcher);
1231 client_msgr->start();
1232
1233 ConnectionRef server_con_a, server_con_b;
1234
1235 // a
1236 srv_dispatcher.last_accept_con_ptr = &server_con_a;
1237 ConnectionRef con_a = client_msgr->connect_to(server_msgr->get_mytype(),
1238 server_msgr->get_myaddrs(),
1239 true);
1240 {
1241 m = new MPing();
1242 ASSERT_EQ(con_a->send_message(m), 0);
1243 std::unique_lock l{cli_dispatcher.lock};
1244 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1245 cli_dispatcher.got_new = false;
1246 }
1247 ASSERT_EQ(1U, static_cast<Session*>(con_a->get_priv().get())->get_count());
1248
1249 // b
1250 srv_dispatcher.last_accept_con_ptr = &server_con_b;
1251 ConnectionRef con_b = client_msgr->connect_to(server_msgr->get_mytype(),
1252 server_msgr->get_myaddrs(),
1253 true);
1254 {
1255 m = new MPing();
1256 ASSERT_EQ(con_b->send_message(m), 0);
1257 std::unique_lock l{cli_dispatcher.lock};
1258 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1259 cli_dispatcher.got_new = false;
1260 }
1261 ASSERT_EQ(1U, static_cast<Session*>(con_b->get_priv().get())->get_count());
1262
1263 // these should be distinct
1264 ASSERT_NE(con_a, con_b);
1265 ASSERT_NE(server_con_a, server_con_b);
1266
1267 // and both connected
1268 {
1269 m = new MPing();
1270 ASSERT_EQ(con_a->send_message(m), 0);
1271 std::unique_lock l{cli_dispatcher.lock};
1272 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1273 cli_dispatcher.got_new = false;
1274 }
1275 {
1276 m = new MPing();
1277 ASSERT_EQ(con_b->send_message(m), 0);
1278 std::unique_lock l{cli_dispatcher.lock};
1279 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1280 cli_dispatcher.got_new = false;
1281 }
1282
1283 // clean up
1284 con_a->mark_down();
1285 ASSERT_FALSE(con_a->is_connected());
1286 con_b->mark_down();
1287 ASSERT_FALSE(con_b->is_connected());
1288
1289 server_msgr->shutdown();
1290 client_msgr->shutdown();
1291 server_msgr->wait();
1292 client_msgr->wait();
1293 }
1294
1295 TEST_P(MessengerTest, ClientStandbyTest) {
1296 Message *m;
1297 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1298 entity_addr_t bind_addr;
1299 bind_addr.parse("v2:127.0.0.1");
1300 Messenger::Policy p = Messenger::Policy::stateful_server(0);
1301 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
1302 p = Messenger::Policy::lossless_peer(0);
1303 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
1304
1305 server_msgr->bind(bind_addr);
1306 server_msgr->add_dispatcher_head(&srv_dispatcher);
1307 server_msgr->start();
1308 client_msgr->add_dispatcher_head(&cli_dispatcher);
1309 client_msgr->start();
1310
1311 // 1. test for client standby, resetcheck
1312 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1313 server_msgr->get_myaddrs());
1314 {
1315 m = new MPing();
1316 ASSERT_EQ(conn->send_message(m), 0);
1317 std::unique_lock l{cli_dispatcher.lock};
1318 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1319 cli_dispatcher.got_new = false;
1320 }
1321 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1322 ConnectionRef server_conn = server_msgr->connect_to(
1323 client_msgr->get_mytype(),
1324 srv_dispatcher.last_accept);
1325 ASSERT_FALSE(cli_dispatcher.got_remote_reset);
1326 cli_dispatcher.got_connect = false;
1327 server_conn->mark_down();
1328 ASSERT_FALSE(server_conn->is_connected());
1329 // client should be standby
1330 usleep(300*1000);
1331 // client should be standby, so we use original connection
1332 {
1333 // Try send message to verify got remote reset callback
1334 m = new MPing();
1335 ASSERT_EQ(conn->send_message(m), 0);
1336 {
1337 std::unique_lock l{cli_dispatcher.lock};
1338 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_remote_reset; });
1339 cli_dispatcher.got_remote_reset = false;
1340 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_connect; });
1341 cli_dispatcher.got_connect = false;
1342 }
1343 CHECK_AND_WAIT_TRUE(conn->is_connected());
1344 ASSERT_TRUE(conn->is_connected());
1345 m = new MPing();
1346 ASSERT_EQ(conn->send_message(m), 0);
1347 std::unique_lock l{cli_dispatcher.lock};
1348 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1349 cli_dispatcher.got_new = false;
1350 }
1351 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1352 server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
1353 srv_dispatcher.last_accept);
1354 ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
1355
1356 server_msgr->shutdown();
1357 client_msgr->shutdown();
1358 server_msgr->wait();
1359 client_msgr->wait();
1360 }
1361
1362 TEST_P(MessengerTest, AuthTest) {
1363 g_ceph_context->_conf.set_val("auth_cluster_required", "cephx");
1364 g_ceph_context->_conf.set_val("auth_service_required", "cephx");
1365 g_ceph_context->_conf.set_val("auth_client_required", "cephx");
1366 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1367 entity_addr_t bind_addr;
1368 bind_addr.parse("v2:127.0.0.1");
1369 server_msgr->bind(bind_addr);
1370 server_msgr->add_dispatcher_head(&srv_dispatcher);
1371 server_msgr->start();
1372
1373 client_msgr->add_dispatcher_head(&cli_dispatcher);
1374 client_msgr->start();
1375
1376 // 1. simple auth round trip
1377 MPing *m = new MPing();
1378 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1379 server_msgr->get_myaddrs());
1380 {
1381 ASSERT_EQ(conn->send_message(m), 0);
1382 std::unique_lock l{cli_dispatcher.lock};
1383 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1384 cli_dispatcher.got_new = false;
1385 }
1386 ASSERT_TRUE(conn->is_connected());
1387 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1388
1389 // 2. mix auth
1390 g_ceph_context->_conf.set_val("auth_cluster_required", "none");
1391 g_ceph_context->_conf.set_val("auth_service_required", "none");
1392 g_ceph_context->_conf.set_val("auth_client_required", "none");
1393 conn->mark_down();
1394 ASSERT_FALSE(conn->is_connected());
1395 conn = client_msgr->connect_to(server_msgr->get_mytype(),
1396 server_msgr->get_myaddrs());
1397 {
1398 MPing *m = new MPing();
1399 ASSERT_EQ(conn->send_message(m), 0);
1400 std::unique_lock l{cli_dispatcher.lock};
1401 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1402 cli_dispatcher.got_new = false;
1403 }
1404 ASSERT_TRUE(conn->is_connected());
1405 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1406 server_msgr->shutdown();
1407 client_msgr->shutdown();
1408 server_msgr->wait();
1409 client_msgr->wait();
1410 }
1411
1412 TEST_P(MessengerTest, MessageTest) {
1413 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1414 entity_addr_t bind_addr;
1415 bind_addr.parse("v2:127.0.0.1");
1416 Messenger::Policy p = Messenger::Policy::stateful_server(0);
1417 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
1418 p = Messenger::Policy::lossless_peer(0);
1419 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
1420
1421 server_msgr->bind(bind_addr);
1422 server_msgr->add_dispatcher_head(&srv_dispatcher);
1423 server_msgr->start();
1424 client_msgr->add_dispatcher_head(&cli_dispatcher);
1425 client_msgr->start();
1426
1427
1428 // 1. A very large "front"(as well as "payload")
1429 // Because a external message need to invade Messenger::decode_message,
1430 // here we only use existing message class(MCommand)
1431 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1432 server_msgr->get_myaddrs());
1433 {
1434 uuid_d uuid;
1435 uuid.generate_random();
1436 vector<string> cmds;
1437 string s("abcdefghijklmnopqrstuvwxyz");
1438 for (int i = 0; i < 1024*30; i++)
1439 cmds.push_back(s);
1440 MCommand *m = new MCommand(uuid);
1441 m->cmd = cmds;
1442 conn->send_message(m);
1443 std::unique_lock l{cli_dispatcher.lock};
1444 cli_dispatcher.cond.wait_for(l, 500s, [&] { return cli_dispatcher.got_new; });
1445 ASSERT_TRUE(cli_dispatcher.got_new);
1446 cli_dispatcher.got_new = false;
1447 }
1448
1449 // 2. A very large "data"
1450 {
1451 bufferlist bl;
1452 string s("abcdefghijklmnopqrstuvwxyz");
1453 for (int i = 0; i < 1024*30; i++)
1454 bl.append(s);
1455 MPing *m = new MPing();
1456 m->set_data(bl);
1457 conn->send_message(m);
1458 utime_t t;
1459 t += 1000*1000*500;
1460 std::unique_lock l{cli_dispatcher.lock};
1461 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1462 ASSERT_TRUE(cli_dispatcher.got_new);
1463 cli_dispatcher.got_new = false;
1464 }
1465 server_msgr->shutdown();
1466 client_msgr->shutdown();
1467 server_msgr->wait();
1468 client_msgr->wait();
1469 }
1470
1471
1472 class SyntheticWorkload;
1473
1474 struct Payload {
1475 enum Who : uint8_t {
1476 PING = 0,
1477 PONG = 1,
1478 };
1479 uint8_t who = 0;
1480 uint64_t seq = 0;
1481 bufferlist data;
1482
1483 Payload(Who who, uint64_t seq, const bufferlist& data)
1484 : who(who), seq(seq), data(data)
1485 {}
1486 Payload() = default;
1487 DENC(Payload, v, p) {
1488 DENC_START(1, 1, p);
1489 denc(v.who, p);
1490 denc(v.seq, p);
1491 denc(v.data, p);
1492 DENC_FINISH(p);
1493 }
1494 };
1495 WRITE_CLASS_DENC(Payload)
1496
1497 ostream& operator<<(ostream& out, const Payload &pl)
1498 {
1499 return out << "reply=" << pl.who << " i = " << pl.seq;
1500 }
1501
1502 class SyntheticDispatcher : public Dispatcher {
1503 public:
1504 ceph::mutex lock = ceph::make_mutex("SyntheticDispatcher::lock");
1505 ceph::condition_variable cond;
1506 bool is_server;
1507 bool got_new;
1508 bool got_remote_reset;
1509 bool got_connect;
1510 map<ConnectionRef, list<uint64_t> > conn_sent;
1511 map<uint64_t, bufferlist> sent;
1512 atomic<uint64_t> index;
1513 SyntheticWorkload *workload;
1514
1515 SyntheticDispatcher(bool s, SyntheticWorkload *wl):
1516 Dispatcher(g_ceph_context), is_server(s), got_new(false),
1517 got_remote_reset(false), got_connect(false), index(0), workload(wl) {
1518 }
1519 bool ms_can_fast_dispatch_any() const override { return true; }
1520 bool ms_can_fast_dispatch(const Message *m) const override {
1521 switch (m->get_type()) {
1522 case CEPH_MSG_PING:
1523 case MSG_COMMAND:
1524 return true;
1525 default:
1526 return false;
1527 }
1528 }
1529
1530 void ms_handle_fast_connect(Connection *con) override {
1531 std::lock_guard l{lock};
1532 list<uint64_t> c = conn_sent[con];
1533 for (list<uint64_t>::iterator it = c.begin();
1534 it != c.end(); ++it)
1535 sent.erase(*it);
1536 conn_sent.erase(con);
1537 got_connect = true;
1538 cond.notify_all();
1539 }
1540 void ms_handle_fast_accept(Connection *con) override {
1541 std::lock_guard l{lock};
1542 list<uint64_t> c = conn_sent[con];
1543 for (list<uint64_t>::iterator it = c.begin();
1544 it != c.end(); ++it)
1545 sent.erase(*it);
1546 conn_sent.erase(con);
1547 cond.notify_all();
1548 }
1549 bool ms_dispatch(Message *m) override {
1550 ceph_abort();
1551 }
1552 bool ms_handle_reset(Connection *con) override;
1553 void ms_handle_remote_reset(Connection *con) override {
1554 std::lock_guard l{lock};
1555 list<uint64_t> c = conn_sent[con];
1556 for (list<uint64_t>::iterator it = c.begin();
1557 it != c.end(); ++it)
1558 sent.erase(*it);
1559 conn_sent.erase(con);
1560 got_remote_reset = true;
1561 }
1562 bool ms_handle_refused(Connection *con) override {
1563 return false;
1564 }
1565 void ms_fast_dispatch(Message *m) override {
1566 // MSG_COMMAND is used to disorganize regular message flow
1567 if (m->get_type() == MSG_COMMAND) {
1568 m->put();
1569 return ;
1570 }
1571
1572 Payload pl;
1573 auto p = m->get_data().cbegin();
1574 decode(pl, p);
1575 if (pl.who == Payload::PING) {
1576 lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl;
1577 reply_message(m, pl);
1578 m->put();
1579 std::lock_guard l{lock};
1580 got_new = true;
1581 cond.notify_all();
1582 } else {
1583 std::lock_guard l{lock};
1584 if (sent.count(pl.seq)) {
1585 lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl;
1586 ASSERT_EQ(conn_sent[m->get_connection()].front(), pl.seq);
1587 ASSERT_TRUE(pl.data.contents_equal(sent[pl.seq]));
1588 conn_sent[m->get_connection()].pop_front();
1589 sent.erase(pl.seq);
1590 }
1591 m->put();
1592 got_new = true;
1593 cond.notify_all();
1594 }
1595 }
1596
1597 int ms_handle_authentication(Connection *con) override {
1598 return 1;
1599 }
1600
1601 void reply_message(const Message *m, Payload& pl) {
1602 pl.who = Payload::PONG;
1603 bufferlist bl;
1604 encode(pl, bl);
1605 MPing *rm = new MPing();
1606 rm->set_data(bl);
1607 m->get_connection()->send_message(rm);
1608 lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply m=" << m << " i=" << pl.seq << dendl;
1609 }
1610
1611 void send_message_wrap(ConnectionRef con, const bufferlist& data) {
1612 Message *m = new MPing();
1613 Payload pl{Payload::PING, index++, data};
1614 bufferlist bl;
1615 encode(pl, bl);
1616 m->set_data(bl);
1617 if (!con->get_messenger()->get_default_policy().lossy) {
1618 std::lock_guard l{lock};
1619 sent[pl.seq] = pl.data;
1620 conn_sent[con].push_back(pl.seq);
1621 }
1622 lderr(g_ceph_context) << __func__ << " conn=" << con.get() << " send m=" << m << " i=" << pl.seq << dendl;
1623 ASSERT_EQ(0, con->send_message(m));
1624 }
1625
1626 uint64_t get_pending() {
1627 std::lock_guard l{lock};
1628 return sent.size();
1629 }
1630
1631 void clear_pending(ConnectionRef con) {
1632 std::lock_guard l{lock};
1633
1634 for (list<uint64_t>::iterator it = conn_sent[con].begin();
1635 it != conn_sent[con].end(); ++it)
1636 sent.erase(*it);
1637 conn_sent.erase(con);
1638 }
1639
1640 void print() {
1641 for (auto && p : conn_sent) {
1642 if (!p.second.empty()) {
1643 lderr(g_ceph_context) << __func__ << " " << p.first << " wait " << p.second.size() << dendl;
1644 }
1645 }
1646 }
1647 };
1648
1649
1650 class SyntheticWorkload {
1651 ceph::mutex lock = ceph::make_mutex("SyntheticWorkload::lock");
1652 ceph::condition_variable cond;
1653 set<Messenger*> available_servers;
1654 set<Messenger*> available_clients;
1655 Messenger::Policy client_policy;
1656 map<ConnectionRef, pair<Messenger*, Messenger*> > available_connections;
1657 SyntheticDispatcher dispatcher;
1658 gen_type rng;
1659 vector<bufferlist> rand_data;
1660 DummyAuthClientServer dummy_auth;
1661
1662 public:
1663 static const unsigned max_in_flight = 64;
1664 static const unsigned max_connections = 128;
1665 static const unsigned max_message_len = 1024 * 1024 * 4;
1666
1667 SyntheticWorkload(int servers, int clients, string type, int random_num,
1668 Messenger::Policy srv_policy, Messenger::Policy cli_policy)
1669 : client_policy(cli_policy),
1670 dispatcher(false, this),
1671 rng(time(NULL)),
1672 dummy_auth(g_ceph_context) {
1673 dummy_auth.auth_registry.refresh_config();
1674 Messenger *msgr;
1675 int base_port = 16800;
1676 entity_addr_t bind_addr;
1677 char addr[64];
1678 for (int i = 0; i < servers; ++i) {
1679 msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0),
1680 "server", getpid()+i, 0);
1681 snprintf(addr, sizeof(addr), "v2:127.0.0.1:%d",
1682 base_port+i);
1683 bind_addr.parse(addr);
1684 msgr->bind(bind_addr);
1685 msgr->add_dispatcher_head(&dispatcher);
1686 msgr->set_auth_client(&dummy_auth);
1687 msgr->set_auth_server(&dummy_auth);
1688
1689 ceph_assert(msgr);
1690 msgr->set_default_policy(srv_policy);
1691 available_servers.insert(msgr);
1692 msgr->start();
1693 }
1694
1695 for (int i = 0; i < clients; ++i) {
1696 msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(-1),
1697 "client", getpid()+i+servers, 0);
1698 if (cli_policy.standby) {
1699 snprintf(addr, sizeof(addr), "v2:127.0.0.1:%d",
1700 base_port+i+servers);
1701 bind_addr.parse(addr);
1702 msgr->bind(bind_addr);
1703 }
1704 msgr->add_dispatcher_head(&dispatcher);
1705 msgr->set_auth_client(&dummy_auth);
1706 msgr->set_auth_server(&dummy_auth);
1707
1708 ceph_assert(msgr);
1709 msgr->set_default_policy(cli_policy);
1710 available_clients.insert(msgr);
1711 msgr->start();
1712 }
1713
1714 for (int i = 0; i < random_num; i++) {
1715 bufferlist bl;
1716 boost::uniform_int<> u(32, max_message_len);
1717 uint64_t value_len = u(rng);
1718 bufferptr bp(value_len);
1719 bp.zero();
1720 for (uint64_t j = 0; j < value_len-sizeof(i); ) {
1721 memcpy(bp.c_str()+j, &i, sizeof(i));
1722 j += 4096;
1723 }
1724
1725 bl.append(bp);
1726 rand_data.push_back(bl);
1727 }
1728 }
1729
1730 ConnectionRef _get_random_connection() {
1731 while (dispatcher.get_pending() > max_in_flight) {
1732 lock.unlock();
1733 usleep(500);
1734 lock.lock();
1735 }
1736 ceph_assert(ceph_mutex_is_locked(lock));
1737 boost::uniform_int<> choose(0, available_connections.size() - 1);
1738 int index = choose(rng);
1739 map<ConnectionRef, pair<Messenger*, Messenger*> >::iterator i = available_connections.begin();
1740 for (; index > 0; --index, ++i) ;
1741 return i->first;
1742 }
1743
1744 bool can_create_connection() {
1745 return available_connections.size() < max_connections;
1746 }
1747
1748 void generate_connection() {
1749 std::lock_guard l{lock};
1750 if (!can_create_connection())
1751 return ;
1752
1753 Messenger *server, *client;
1754 {
1755 boost::uniform_int<> choose(0, available_servers.size() - 1);
1756 int index = choose(rng);
1757 set<Messenger*>::iterator i = available_servers.begin();
1758 for (; index > 0; --index, ++i) ;
1759 server = *i;
1760 }
1761 {
1762 boost::uniform_int<> choose(0, available_clients.size() - 1);
1763 int index = choose(rng);
1764 set<Messenger*>::iterator i = available_clients.begin();
1765 for (; index > 0; --index, ++i) ;
1766 client = *i;
1767 }
1768
1769 pair<Messenger*, Messenger*> p;
1770 {
1771 boost::uniform_int<> choose(0, available_servers.size() - 1);
1772 if (server->get_default_policy().server) {
1773 p = make_pair(client, server);
1774 ConnectionRef conn = client->connect_to(server->get_mytype(),
1775 server->get_myaddrs());
1776 available_connections[conn] = p;
1777 } else {
1778 ConnectionRef conn = client->connect_to(server->get_mytype(),
1779 server->get_myaddrs());
1780 p = make_pair(client, server);
1781 available_connections[conn] = p;
1782 }
1783 }
1784 }
1785
1786 void send_message() {
1787 std::lock_guard l{lock};
1788 ConnectionRef conn = _get_random_connection();
1789 boost::uniform_int<> true_false(0, 99);
1790 int val = true_false(rng);
1791 if (val >= 95) {
1792 uuid_d uuid;
1793 uuid.generate_random();
1794 MCommand *m = new MCommand(uuid);
1795 vector<string> cmds;
1796 cmds.push_back("command");
1797 m->cmd = cmds;
1798 m->set_priority(200);
1799 conn->send_message(m);
1800 } else {
1801 boost::uniform_int<> u(0, rand_data.size()-1);
1802 dispatcher.send_message_wrap(conn, rand_data[u(rng)]);
1803 }
1804 }
1805
1806 void drop_connection() {
1807 std::lock_guard l{lock};
1808 if (available_connections.size() < 10)
1809 return;
1810 ConnectionRef conn = _get_random_connection();
1811 dispatcher.clear_pending(conn);
1812 conn->mark_down();
1813 if (!client_policy.server &&
1814 !client_policy.lossy &&
1815 client_policy.standby) {
1816 // it's a lossless policy, so we need to mark down each side
1817 pair<Messenger*, Messenger*> &p = available_connections[conn];
1818 if (!p.first->get_default_policy().server && !p.second->get_default_policy().server) {
1819 ASSERT_EQ(conn->get_messenger(), p.first);
1820 ConnectionRef peer = p.second->connect_to(p.first->get_mytype(),
1821 p.first->get_myaddrs());
1822 peer->mark_down();
1823 dispatcher.clear_pending(peer);
1824 available_connections.erase(peer);
1825 }
1826 }
1827 ASSERT_EQ(available_connections.erase(conn), 1U);
1828 }
1829
1830 void print_internal_state(bool detail=false) {
1831 std::lock_guard l{lock};
1832 lderr(g_ceph_context) << "available_connections: " << available_connections.size()
1833 << " inflight messages: " << dispatcher.get_pending() << dendl;
1834 if (detail && !available_connections.empty()) {
1835 dispatcher.print();
1836 }
1837 }
1838
1839 void wait_for_done() {
1840 int64_t tick_us = 1000 * 100; // 100ms
1841 int64_t timeout_us = 5 * 60 * 1000 * 1000; // 5 mins
1842 int i = 0;
1843 while (dispatcher.get_pending()) {
1844 usleep(tick_us);
1845 timeout_us -= tick_us;
1846 if (i++ % 50 == 0)
1847 print_internal_state(true);
1848 if (timeout_us < 0)
1849 ceph_abort_msg(" loop time exceed 5 mins, it looks we stuck into some problems!");
1850 }
1851 for (set<Messenger*>::iterator it = available_servers.begin();
1852 it != available_servers.end(); ++it) {
1853 (*it)->shutdown();
1854 (*it)->wait();
1855 ASSERT_EQ((*it)->get_dispatch_queue_len(), 0);
1856 delete (*it);
1857 }
1858 available_servers.clear();
1859
1860 for (set<Messenger*>::iterator it = available_clients.begin();
1861 it != available_clients.end(); ++it) {
1862 (*it)->shutdown();
1863 (*it)->wait();
1864 ASSERT_EQ((*it)->get_dispatch_queue_len(), 0);
1865 delete (*it);
1866 }
1867 available_clients.clear();
1868 }
1869
1870 void handle_reset(Connection *con) {
1871 std::lock_guard l{lock};
1872 available_connections.erase(con);
1873 dispatcher.clear_pending(con);
1874 }
1875 };
1876
1877 bool SyntheticDispatcher::ms_handle_reset(Connection *con) {
1878 workload->handle_reset(con);
1879 return true;
1880 }
1881
1882 TEST_P(MessengerTest, SyntheticStressTest) {
1883 SyntheticWorkload test_msg(8, 32, GetParam(), 100,
1884 Messenger::Policy::stateful_server(0),
1885 Messenger::Policy::lossless_client(0));
1886 for (int i = 0; i < 100; ++i) {
1887 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
1888 test_msg.generate_connection();
1889 }
1890 gen_type rng(time(NULL));
1891 for (int i = 0; i < 5000; ++i) {
1892 if (!(i % 10)) {
1893 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
1894 test_msg.print_internal_state();
1895 }
1896 boost::uniform_int<> true_false(0, 99);
1897 int val = true_false(rng);
1898 if (val > 90) {
1899 test_msg.generate_connection();
1900 } else if (val > 80) {
1901 test_msg.drop_connection();
1902 } else if (val > 10) {
1903 test_msg.send_message();
1904 } else {
1905 usleep(rand() % 1000 + 500);
1906 }
1907 }
1908 test_msg.wait_for_done();
1909 }
1910
1911 TEST_P(MessengerTest, SyntheticStressTest1) {
1912 SyntheticWorkload test_msg(16, 32, GetParam(), 100,
1913 Messenger::Policy::lossless_peer_reuse(0),
1914 Messenger::Policy::lossless_peer_reuse(0));
1915 for (int i = 0; i < 10; ++i) {
1916 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
1917 test_msg.generate_connection();
1918 }
1919 gen_type rng(time(NULL));
1920 for (int i = 0; i < 10000; ++i) {
1921 if (!(i % 10)) {
1922 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
1923 test_msg.print_internal_state();
1924 }
1925 boost::uniform_int<> true_false(0, 99);
1926 int val = true_false(rng);
1927 if (val > 80) {
1928 test_msg.generate_connection();
1929 } else if (val > 60) {
1930 test_msg.drop_connection();
1931 } else if (val > 10) {
1932 test_msg.send_message();
1933 } else {
1934 usleep(rand() % 1000 + 500);
1935 }
1936 }
1937 test_msg.wait_for_done();
1938 }
1939
1940
1941 TEST_P(MessengerTest, SyntheticInjectTest) {
1942 uint64_t dispatch_throttle_bytes = g_ceph_context->_conf->ms_dispatch_throttle_bytes;
1943 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "30");
1944 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1");
1945 g_ceph_context->_conf.set_val("ms_dispatch_throttle_bytes", "16777216");
1946 SyntheticWorkload test_msg(8, 32, GetParam(), 100,
1947 Messenger::Policy::stateful_server(0),
1948 Messenger::Policy::lossless_client(0));
1949 for (int i = 0; i < 100; ++i) {
1950 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
1951 test_msg.generate_connection();
1952 }
1953 gen_type rng(time(NULL));
1954 for (int i = 0; i < 1000; ++i) {
1955 if (!(i % 10)) {
1956 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
1957 test_msg.print_internal_state();
1958 }
1959 boost::uniform_int<> true_false(0, 99);
1960 int val = true_false(rng);
1961 if (val > 90) {
1962 test_msg.generate_connection();
1963 } else if (val > 80) {
1964 test_msg.drop_connection();
1965 } else if (val > 10) {
1966 test_msg.send_message();
1967 } else {
1968 usleep(rand() % 500 + 100);
1969 }
1970 }
1971 test_msg.wait_for_done();
1972 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0");
1973 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0");
1974 g_ceph_context->_conf.set_val(
1975 "ms_dispatch_throttle_bytes", std::to_string(dispatch_throttle_bytes));
1976 }
1977
1978 TEST_P(MessengerTest, SyntheticInjectTest2) {
1979 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "30");
1980 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1");
1981 SyntheticWorkload test_msg(8, 16, GetParam(), 100,
1982 Messenger::Policy::lossless_peer_reuse(0),
1983 Messenger::Policy::lossless_peer_reuse(0));
1984 for (int i = 0; i < 100; ++i) {
1985 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
1986 test_msg.generate_connection();
1987 }
1988 gen_type rng(time(NULL));
1989 for (int i = 0; i < 1000; ++i) {
1990 if (!(i % 10)) {
1991 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
1992 test_msg.print_internal_state();
1993 }
1994 boost::uniform_int<> true_false(0, 99);
1995 int val = true_false(rng);
1996 if (val > 90) {
1997 test_msg.generate_connection();
1998 } else if (val > 80) {
1999 test_msg.drop_connection();
2000 } else if (val > 10) {
2001 test_msg.send_message();
2002 } else {
2003 usleep(rand() % 500 + 100);
2004 }
2005 }
2006 test_msg.wait_for_done();
2007 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0");
2008 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0");
2009 }
2010
2011 TEST_P(MessengerTest, SyntheticInjectTest3) {
2012 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "600");
2013 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1");
2014 SyntheticWorkload test_msg(8, 16, GetParam(), 100,
2015 Messenger::Policy::stateless_server(0),
2016 Messenger::Policy::lossy_client(0));
2017 for (int i = 0; i < 100; ++i) {
2018 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
2019 test_msg.generate_connection();
2020 }
2021 gen_type rng(time(NULL));
2022 for (int i = 0; i < 1000; ++i) {
2023 if (!(i % 10)) {
2024 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
2025 test_msg.print_internal_state();
2026 }
2027 boost::uniform_int<> true_false(0, 99);
2028 int val = true_false(rng);
2029 if (val > 90) {
2030 test_msg.generate_connection();
2031 } else if (val > 80) {
2032 test_msg.drop_connection();
2033 } else if (val > 10) {
2034 test_msg.send_message();
2035 } else {
2036 usleep(rand() % 500 + 100);
2037 }
2038 }
2039 test_msg.wait_for_done();
2040 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0");
2041 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0");
2042 }
2043
2044
2045 TEST_P(MessengerTest, SyntheticInjectTest4) {
2046 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "30");
2047 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1");
2048 g_ceph_context->_conf.set_val("ms_inject_delay_probability", "1");
2049 g_ceph_context->_conf.set_val("ms_inject_delay_type", "client osd");
2050 g_ceph_context->_conf.set_val("ms_inject_delay_max", "5");
2051 SyntheticWorkload test_msg(16, 32, GetParam(), 100,
2052 Messenger::Policy::lossless_peer(0),
2053 Messenger::Policy::lossless_peer(0));
2054 for (int i = 0; i < 100; ++i) {
2055 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
2056 test_msg.generate_connection();
2057 }
2058 gen_type rng(time(NULL));
2059 for (int i = 0; i < 1000; ++i) {
2060 if (!(i % 10)) {
2061 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
2062 test_msg.print_internal_state();
2063 }
2064 boost::uniform_int<> true_false(0, 99);
2065 int val = true_false(rng);
2066 if (val > 95) {
2067 test_msg.generate_connection();
2068 } else if (val > 80) {
2069 // test_msg.drop_connection();
2070 } else if (val > 10) {
2071 test_msg.send_message();
2072 } else {
2073 usleep(rand() % 500 + 100);
2074 }
2075 }
2076 test_msg.wait_for_done();
2077 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0");
2078 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0");
2079 g_ceph_context->_conf.set_val("ms_inject_delay_probability", "0");
2080 g_ceph_context->_conf.set_val("ms_inject_delay_type", "");
2081 g_ceph_context->_conf.set_val("ms_inject_delay_max", "0");
2082 }
2083
2084
2085 class MarkdownDispatcher : public Dispatcher {
2086 ceph::mutex lock = ceph::make_mutex("MarkdownDispatcher::lock");
2087 set<ConnectionRef> conns;
2088 bool last_mark;
2089 public:
2090 std::atomic<uint64_t> count = { 0 };
2091 explicit MarkdownDispatcher(bool s): Dispatcher(g_ceph_context),
2092 last_mark(false) {
2093 }
2094 bool ms_can_fast_dispatch_any() const override { return false; }
2095 bool ms_can_fast_dispatch(const Message *m) const override {
2096 switch (m->get_type()) {
2097 case CEPH_MSG_PING:
2098 return true;
2099 default:
2100 return false;
2101 }
2102 }
2103
2104 void ms_handle_fast_connect(Connection *con) override {
2105 lderr(g_ceph_context) << __func__ << " " << con << dendl;
2106 std::lock_guard l{lock};
2107 conns.insert(con);
2108 }
2109 void ms_handle_fast_accept(Connection *con) override {
2110 std::lock_guard l{lock};
2111 conns.insert(con);
2112 }
2113 bool ms_dispatch(Message *m) override {
2114 lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << dendl;
2115 std::lock_guard l{lock};
2116 count++;
2117 conns.insert(m->get_connection());
2118 if (conns.size() < 2 && !last_mark) {
2119 m->put();
2120 return true;
2121 }
2122
2123 last_mark = true;
2124 usleep(rand() % 500);
2125 for (set<ConnectionRef>::iterator it = conns.begin(); it != conns.end(); ++it) {
2126 if ((*it) != m->get_connection().get()) {
2127 (*it)->mark_down();
2128 conns.erase(it);
2129 break;
2130 }
2131 }
2132 if (conns.empty())
2133 last_mark = false;
2134 m->put();
2135 return true;
2136 }
2137 bool ms_handle_reset(Connection *con) override {
2138 lderr(g_ceph_context) << __func__ << " " << con << dendl;
2139 std::lock_guard l{lock};
2140 conns.erase(con);
2141 usleep(rand() % 500);
2142 return true;
2143 }
2144 void ms_handle_remote_reset(Connection *con) override {
2145 std::lock_guard l{lock};
2146 conns.erase(con);
2147 lderr(g_ceph_context) << __func__ << " " << con << dendl;
2148 }
2149 bool ms_handle_refused(Connection *con) override {
2150 return false;
2151 }
2152 void ms_fast_dispatch(Message *m) override {
2153 ceph_abort();
2154 }
2155 int ms_handle_authentication(Connection *con) override {
2156 return 1;
2157 }
2158 };
2159
2160
2161 // Markdown with external lock
2162 TEST_P(MessengerTest, MarkdownTest) {
2163 Messenger *server_msgr2 = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
2164 MarkdownDispatcher cli_dispatcher(false), srv_dispatcher(true);
2165 DummyAuthClientServer dummy_auth(g_ceph_context);
2166 dummy_auth.auth_registry.refresh_config();
2167 entity_addr_t bind_addr;
2168 bind_addr.parse("v2:127.0.0.1:16800");
2169 server_msgr->bind(bind_addr);
2170 server_msgr->add_dispatcher_head(&srv_dispatcher);
2171 server_msgr->set_auth_client(&dummy_auth);
2172 server_msgr->set_auth_server(&dummy_auth);
2173 server_msgr->start();
2174 bind_addr.parse("v2:127.0.0.1:16801");
2175 server_msgr2->bind(bind_addr);
2176 server_msgr2->add_dispatcher_head(&srv_dispatcher);
2177 server_msgr2->set_auth_client(&dummy_auth);
2178 server_msgr2->set_auth_server(&dummy_auth);
2179 server_msgr2->start();
2180
2181 client_msgr->add_dispatcher_head(&cli_dispatcher);
2182 client_msgr->set_auth_client(&dummy_auth);
2183 client_msgr->set_auth_server(&dummy_auth);
2184 client_msgr->start();
2185
2186 int i = 1000;
2187 uint64_t last = 0;
2188 bool equal = false;
2189 uint64_t equal_count = 0;
2190 while (i--) {
2191 ConnectionRef conn1 = client_msgr->connect_to(server_msgr->get_mytype(),
2192 server_msgr->get_myaddrs());
2193 ConnectionRef conn2 = client_msgr->connect_to(server_msgr2->get_mytype(),
2194 server_msgr2->get_myaddrs());
2195 MPing *m = new MPing();
2196 ASSERT_EQ(conn1->send_message(m), 0);
2197 m = new MPing();
2198 ASSERT_EQ(conn2->send_message(m), 0);
2199 CHECK_AND_WAIT_TRUE(srv_dispatcher.count > last + 1);
2200 if (srv_dispatcher.count == last) {
2201 lderr(g_ceph_context) << __func__ << " last is " << last << dendl;
2202 equal = true;
2203 equal_count++;
2204 } else {
2205 equal = false;
2206 equal_count = 0;
2207 }
2208 last = srv_dispatcher.count;
2209 if (equal_count)
2210 usleep(1000*500);
2211 ASSERT_FALSE(equal && equal_count > 3);
2212 }
2213 server_msgr->shutdown();
2214 client_msgr->shutdown();
2215 server_msgr2->shutdown();
2216 server_msgr->wait();
2217 client_msgr->wait();
2218 server_msgr2->wait();
2219 delete server_msgr2;
2220 }
2221
// Instantiate every MessengerTest TEST_P once per messenger type listed
// below; the string is what GetParam() returns and is passed to
// Messenger::create / SyntheticWorkload by the tests.
INSTANTIATE_TEST_SUITE_P(
  Messenger,
  MessengerTest,
  ::testing::Values(
    "async+posix"
  )
);
2229
2230 int main(int argc, char **argv) {
2231 vector<const char*> args;
2232 argv_to_vec(argc, (const char **)argv, args);
2233
2234 auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
2235 CODE_ENVIRONMENT_UTILITY,
2236 CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
2237 g_ceph_context->_conf.set_val("auth_cluster_required", "none");
2238 g_ceph_context->_conf.set_val("auth_service_required", "none");
2239 g_ceph_context->_conf.set_val("auth_client_required", "none");
2240 g_ceph_context->_conf.set_val("keyring", "/dev/null");
2241 g_ceph_context->_conf.set_val("enable_experimental_unrecoverable_data_corrupting_features", "ms-type-async");
2242 g_ceph_context->_conf.set_val("ms_die_on_bad_msg", "true");
2243 g_ceph_context->_conf.set_val("ms_die_on_old_message", "true");
2244 g_ceph_context->_conf.set_val("ms_max_backoff", "1");
2245 common_init_finish(g_ceph_context);
2246
2247 ::testing::InitGoogleTest(&argc, argv);
2248 return RUN_ALL_TESTS();
2249 }
2250
2251 /*
2252 * Local Variables:
2253 * compile-command: "cd ../.. ; make -j4 ceph_test_msgr && valgrind --tool=memcheck ./ceph_test_msgr"
2254 * End:
2255 */