]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/msgr/test_msgr.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / test / msgr / test_msgr.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #include <atomic>
18 #include <iostream>
19 #include <list>
20 #include <memory>
21 #include <set>
22 #include <stdlib.h>
23 #include <time.h>
24 #include <unistd.h>
25
26 #include <boost/random/binomial_distribution.hpp>
27 #include <boost/random/mersenne_twister.hpp>
28 #include <boost/random/uniform_int.hpp>
29 #include <gtest/gtest.h>
30
31 #define MSG_POLICY_UNIT_TESTING
32
33 #include "common/ceph_argparse.h"
34 #include "common/ceph_mutex.h"
35 #include "global/global_init.h"
36 #include "messages/MCommand.h"
37 #include "messages/MPing.h"
38 #include "msg/Connection.h"
39 #include "msg/Dispatcher.h"
40 #include "msg/Message.h"
41 #include "msg/Messenger.h"
42 #include "msg/msg_types.h"
43
44 typedef boost::mt11213b gen_type;
45
46 #include "common/dout.h"
47 #include "include/ceph_assert.h"
48
49 #include "auth/DummyAuth.h"
50
51 #define dout_subsys ceph_subsys_ms
52 #undef dout_prefix
53 #define dout_prefix *_dout << " ceph_test_msgr "
54
55
// Poll `expr` about once per millisecond until it becomes true, giving up
// after 999 attempts. Execution continues either way, so callers are expected
// to follow up with their own assertion on the condition.
//
// The loop counter has a deliberately unusual name: with a plain `n`, an
// `expr` that mentions a caller variable called `n` would silently be
// evaluated against the macro's counter instead of the caller's variable.
//
// NOTE(review): the trailing `;` after `while(0)` is kept so that any call
// site outside this view that omits its own semicolon keeps compiling; it
// does mean the macro cannot be used as the sole statement of an `if` that
// has an `else`.
#define CHECK_AND_WAIT_TRUE(expr) do { \
  int check_and_wait_tries_left_ = 1000; \
  while (--check_and_wait_tries_left_) { \
    if (expr) \
      break; \
    usleep(1000); \
  } \
} while(0);
64
65 using namespace std;
66
67 class MessengerTest : public ::testing::TestWithParam<const char*> {
68 public:
69 DummyAuthClientServer dummy_auth;
70 Messenger *server_msgr;
71 Messenger *client_msgr;
72
73 MessengerTest() : dummy_auth(g_ceph_context),
74 server_msgr(NULL), client_msgr(NULL) {
75 dummy_auth.auth_registry.refresh_config();
76 }
77 void SetUp() override {
78 lderr(g_ceph_context) << __func__ << " start set up " << GetParam() << dendl;
79 server_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid());
80 client_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid());
81 server_msgr->set_default_policy(Messenger::Policy::stateless_server(0));
82 client_msgr->set_default_policy(Messenger::Policy::lossy_client(0));
83 server_msgr->set_auth_client(&dummy_auth);
84 server_msgr->set_auth_server(&dummy_auth);
85 client_msgr->set_auth_client(&dummy_auth);
86 client_msgr->set_auth_server(&dummy_auth);
87 server_msgr->set_require_authorizer(false);
88 }
89 void TearDown() override {
90 ASSERT_EQ(server_msgr->get_dispatch_queue_len(), 0);
91 ASSERT_EQ(client_msgr->get_dispatch_queue_len(), 0);
92 delete server_msgr;
93 delete client_msgr;
94 }
95
96 };
97
98
99 class FakeDispatcher : public Dispatcher {
100 public:
101 struct Session : public RefCountedObject {
102 atomic<uint64_t> count;
103 ConnectionRef con;
104
105 explicit Session(ConnectionRef c): RefCountedObject(g_ceph_context), count(0), con(c) {
106 }
107 uint64_t get_count() { return count; }
108 };
109
110 ceph::mutex lock = ceph::make_mutex("FakeDispatcher::lock");
111 ceph::condition_variable cond;
112 bool is_server;
113 bool got_new;
114 bool got_remote_reset;
115 bool got_connect;
116 bool loopback;
117 entity_addrvec_t last_accept;
118 ConnectionRef *last_accept_con_ptr = nullptr;
119
120 explicit FakeDispatcher(bool s): Dispatcher(g_ceph_context),
121 is_server(s), got_new(false), got_remote_reset(false),
122 got_connect(false), loopback(false) {
123 }
124 bool ms_can_fast_dispatch_any() const override { return true; }
125 bool ms_can_fast_dispatch(const Message *m) const override {
126 switch (m->get_type()) {
127 case CEPH_MSG_PING:
128 return true;
129 default:
130 return false;
131 }
132 }
133
134 void ms_handle_fast_connect(Connection *con) override {
135 std::scoped_lock l{lock};
136 lderr(g_ceph_context) << __func__ << " " << con << dendl;
137 auto s = con->get_priv();
138 if (!s) {
139 auto session = new Session(con);
140 con->set_priv(RefCountedPtr{session, false});
141 lderr(g_ceph_context) << __func__ << " con: " << con
142 << " count: " << session->count << dendl;
143 }
144 got_connect = true;
145 cond.notify_all();
146 }
147 void ms_handle_fast_accept(Connection *con) override {
148 last_accept = con->get_peer_addrs();
149 if (last_accept_con_ptr) {
150 *last_accept_con_ptr = con;
151 }
152 if (!con->get_priv()) {
153 con->set_priv(RefCountedPtr{new Session(con), false});
154 }
155 }
156 bool ms_dispatch(Message *m) override {
157 auto priv = m->get_connection()->get_priv();
158 auto s = static_cast<Session*>(priv.get());
159 if (!s) {
160 s = new Session(m->get_connection());
161 priv.reset(s, false);
162 m->get_connection()->set_priv(priv);
163 }
164 s->count++;
165 lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl;
166 if (is_server) {
167 reply_message(m);
168 }
169 std::lock_guard l{lock};
170 got_new = true;
171 cond.notify_all();
172 m->put();
173 return true;
174 }
175 bool ms_handle_reset(Connection *con) override {
176 std::lock_guard l{lock};
177 lderr(g_ceph_context) << __func__ << " " << con << dendl;
178 auto priv = con->get_priv();
179 if (auto s = static_cast<Session*>(priv.get()); s) {
180 s->con.reset(); // break con <-> session ref cycle
181 con->set_priv(nullptr); // break ref <-> session cycle, if any
182 }
183 return true;
184 }
185 void ms_handle_remote_reset(Connection *con) override {
186 std::lock_guard l{lock};
187 lderr(g_ceph_context) << __func__ << " " << con << dendl;
188 auto priv = con->get_priv();
189 if (auto s = static_cast<Session*>(priv.get()); s) {
190 s->con.reset(); // break con <-> session ref cycle
191 con->set_priv(nullptr); // break ref <-> session cycle, if any
192 }
193 got_remote_reset = true;
194 cond.notify_all();
195 }
196 bool ms_handle_refused(Connection *con) override {
197 return false;
198 }
199 void ms_fast_dispatch(Message *m) override {
200 auto priv = m->get_connection()->get_priv();
201 auto s = static_cast<Session*>(priv.get());
202 if (!s) {
203 s = new Session(m->get_connection());
204 priv.reset(s, false);
205 m->get_connection()->set_priv(priv);
206 }
207 s->count++;
208 lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl;
209 if (is_server) {
210 if (loopback)
211 ceph_assert(m->get_source().is_osd());
212 else
213 reply_message(m);
214 } else if (loopback) {
215 ceph_assert(m->get_source().is_client());
216 }
217 m->put();
218 std::lock_guard l{lock};
219 got_new = true;
220 cond.notify_all();
221 }
222
223 int ms_handle_fast_authentication(Connection *con) override {
224 return 1;
225 }
226
227 void reply_message(Message *m) {
228 MPing *rm = new MPing();
229 m->get_connection()->send_message(rm);
230 }
231 };
232
233 typedef FakeDispatcher::Session Session;
234
235 struct TestInterceptor : public Interceptor {
236
237 bool step_waiting = false;
238 bool waiting = true;
239 std::map<Connection *, uint32_t> current_step;
240 std::map<Connection *, std::list<uint32_t>> step_history;
241 std::map<uint32_t, std::optional<ACTION>> decisions;
242 std::set<uint32_t> breakpoints;
243
244 uint32_t count_step(Connection *conn, uint32_t step) {
245 uint32_t count = 0;
246 for (auto s : step_history[conn]) {
247 if (s == step) {
248 count++;
249 }
250 }
251 return count;
252 }
253
254 void breakpoint(uint32_t step) {
255 breakpoints.insert(step);
256 }
257
258 void remove_bp(uint32_t step) {
259 breakpoints.erase(step);
260 }
261
262 Connection *wait(uint32_t step, Connection *conn=nullptr) {
263 std::unique_lock<std::mutex> l(lock);
264 while(true) {
265 if (conn) {
266 auto it = current_step.find(conn);
267 if (it != current_step.end()) {
268 if (it->second == step) {
269 break;
270 }
271 }
272 } else {
273 for (auto it : current_step) {
274 if (it.second == step) {
275 conn = it.first;
276 break;
277 }
278 }
279 if (conn) {
280 break;
281 }
282 }
283 step_waiting = true;
284 cond_var.wait(l);
285 }
286 step_waiting = false;
287 return conn;
288 }
289
290 ACTION wait_for_decision(uint32_t step, std::unique_lock<std::mutex> &l) {
291 if (decisions[step]) {
292 return *(decisions[step]);
293 }
294 waiting = true;
295 cond_var.wait(l, [this] { return !waiting; });
296 return *(decisions[step]);
297 }
298
299 void proceed(uint32_t step, ACTION decision) {
300 std::unique_lock<std::mutex> l(lock);
301 decisions[step] = decision;
302 if (waiting) {
303 waiting = false;
304 cond_var.notify_one();
305 }
306 }
307
308 ACTION intercept(Connection *conn, uint32_t step) override {
309 lderr(g_ceph_context) << __func__ << " conn(" << conn
310 << ") intercept called on step=" << step << dendl;
311
312 {
313 std::unique_lock<std::mutex> l(lock);
314 step_history[conn].push_back(step);
315 current_step[conn] = step;
316 if (step_waiting) {
317 cond_var.notify_one();
318 }
319 }
320
321 std::unique_lock<std::mutex> l(lock);
322 ACTION decision = ACTION::CONTINUE;
323 if (breakpoints.find(step) != breakpoints.end()) {
324 lderr(g_ceph_context) << __func__ << " conn(" << conn
325 << ") pausing on step=" << step << dendl;
326 decision = wait_for_decision(step, l);
327 } else {
328 if (decisions[step]) {
329 decision = *(decisions[step]);
330 }
331 }
332 lderr(g_ceph_context) << __func__ << " conn(" << conn
333 << ") resuming step=" << step << " with decision="
334 << decision << dendl;
335 decisions[step].reset();
336 return decision;
337 }
338
339 };
340
341 /**
342 * Scenario: A connects to B, and B connects to A at the same time.
343 */
344 TEST_P(MessengerTest, ConnectionRaceTest) {
345 FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
346
347 TestInterceptor *cli_interceptor = new TestInterceptor();
348 TestInterceptor *srv_interceptor = new TestInterceptor();
349
350 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer_reuse(0));
351 server_msgr->interceptor = srv_interceptor;
352
353 client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer_reuse(0));
354 client_msgr->interceptor = cli_interceptor;
355
356 entity_addr_t bind_addr;
357 bind_addr.parse("v2:127.0.0.1:3300");
358 server_msgr->bind(bind_addr);
359 server_msgr->add_dispatcher_head(&srv_dispatcher);
360 server_msgr->start();
361
362 bind_addr.parse("v2:127.0.0.1:3301");
363 client_msgr->bind(bind_addr);
364 client_msgr->add_dispatcher_head(&cli_dispatcher);
365 client_msgr->start();
366
367 // pause before sending client_ident message
368 cli_interceptor->breakpoint(Interceptor::STEP::SEND_CLIENT_IDENTITY);
369 // pause before sending client_ident message
370 srv_interceptor->breakpoint(Interceptor::STEP::SEND_CLIENT_IDENTITY);
371
372 ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
373 server_msgr->get_myaddrs());
374 MPing *m1 = new MPing();
375 ASSERT_EQ(c2s->send_message(m1), 0);
376
377 ConnectionRef s2c = server_msgr->connect_to(client_msgr->get_mytype(),
378 client_msgr->get_myaddrs());
379 MPing *m2 = new MPing();
380 ASSERT_EQ(s2c->send_message(m2), 0);
381
382 cli_interceptor->wait(Interceptor::STEP::SEND_CLIENT_IDENTITY, c2s.get());
383 srv_interceptor->wait(Interceptor::STEP::SEND_CLIENT_IDENTITY, s2c.get());
384
385 // at this point both connections (A->B, B->A) are paused just before sending
386 // the client_ident message.
387
388 cli_interceptor->remove_bp(Interceptor::STEP::SEND_CLIENT_IDENTITY);
389 srv_interceptor->remove_bp(Interceptor::STEP::SEND_CLIENT_IDENTITY);
390
391 cli_interceptor->proceed(Interceptor::STEP::SEND_CLIENT_IDENTITY, Interceptor::ACTION::CONTINUE);
392 srv_interceptor->proceed(Interceptor::STEP::SEND_CLIENT_IDENTITY, Interceptor::ACTION::CONTINUE);
393
394 {
395 std::unique_lock l{cli_dispatcher.lock};
396 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
397 cli_dispatcher.got_new = false;
398 }
399
400 {
401 std::unique_lock l{srv_dispatcher.lock};
402 srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; });
403 srv_dispatcher.got_new = false;
404 }
405
406 ASSERT_TRUE(s2c->is_connected());
407 ASSERT_EQ(1u, static_cast<Session*>(s2c->get_priv().get())->get_count());
408 ASSERT_TRUE(s2c->peer_is_client());
409
410 ASSERT_TRUE(c2s->is_connected());
411 ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
412 ASSERT_TRUE(c2s->peer_is_osd());
413
414 client_msgr->shutdown();
415 client_msgr->wait();
416 server_msgr->shutdown();
417 server_msgr->wait();
418
419 delete cli_interceptor;
420 delete srv_interceptor;
421 }
422
423 /**
424 * Scenario: A connects to B, and B connects to A at the same time.
425 * The first (A -> B) connection gets to message flow handshake, the
426 * second (B -> A) connection is stuck waiting for a banner from A.
427 * After A sends client_ident to B, the first connection wins and B
428 * calls reuse_connection() to replace the second connection's socket
429 * while the second connection is still in BANNER_CONNECTING.
430 */
431 TEST_P(MessengerTest, ConnectionRaceReuseBannerTest) {
432 FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
433
434 auto cli_interceptor = std::make_unique<TestInterceptor>();
435 auto srv_interceptor = std::make_unique<TestInterceptor>();
436
437 server_msgr->set_policy(entity_name_t::TYPE_CLIENT,
438 Messenger::Policy::lossless_peer_reuse(0));
439 server_msgr->interceptor = srv_interceptor.get();
440
441 client_msgr->set_policy(entity_name_t::TYPE_OSD,
442 Messenger::Policy::lossless_peer_reuse(0));
443 client_msgr->interceptor = cli_interceptor.get();
444
445 entity_addr_t bind_addr;
446 bind_addr.parse("v2:127.0.0.1:3300");
447 server_msgr->bind(bind_addr);
448 server_msgr->add_dispatcher_head(&srv_dispatcher);
449 server_msgr->start();
450
451 bind_addr.parse("v2:127.0.0.1:3301");
452 client_msgr->bind(bind_addr);
453 client_msgr->add_dispatcher_head(&cli_dispatcher);
454 client_msgr->start();
455
456 // pause before sending client_ident message
457 srv_interceptor->breakpoint(Interceptor::STEP::SEND_CLIENT_IDENTITY);
458
459 ConnectionRef s2c = server_msgr->connect_to(client_msgr->get_mytype(),
460 client_msgr->get_myaddrs());
461 MPing *m1 = new MPing();
462 ASSERT_EQ(s2c->send_message(m1), 0);
463
464 srv_interceptor->wait(Interceptor::STEP::SEND_CLIENT_IDENTITY);
465 srv_interceptor->remove_bp(Interceptor::STEP::SEND_CLIENT_IDENTITY);
466
467 // pause before sending banner
468 cli_interceptor->breakpoint(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING);
469
470 ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
471 server_msgr->get_myaddrs());
472 MPing *m2 = new MPing();
473 ASSERT_EQ(c2s->send_message(m2), 0);
474
475 cli_interceptor->wait(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING);
476 cli_interceptor->remove_bp(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING);
477
478 // second connection is in BANNER_CONNECTING, ensure it stays so
479 // and send client_ident
480 srv_interceptor->breakpoint(Interceptor::STEP::BANNER_EXCHANGE);
481 srv_interceptor->proceed(Interceptor::STEP::SEND_CLIENT_IDENTITY, Interceptor::ACTION::CONTINUE);
482
483 // handle client_ident -- triggers reuse_connection() with exproto
484 // in BANNER_CONNECTING
485 cli_interceptor->breakpoint(Interceptor::STEP::READY);
486 cli_interceptor->proceed(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING, Interceptor::ACTION::CONTINUE);
487
488 cli_interceptor->wait(Interceptor::STEP::READY);
489 cli_interceptor->remove_bp(Interceptor::STEP::READY);
490
491 // first connection is in READY
492 Connection *s2c_accepter = srv_interceptor->wait(Interceptor::STEP::BANNER_EXCHANGE);
493 srv_interceptor->remove_bp(Interceptor::STEP::BANNER_EXCHANGE);
494
495 srv_interceptor->proceed(Interceptor::STEP::BANNER_EXCHANGE, Interceptor::ACTION::CONTINUE);
496 cli_interceptor->proceed(Interceptor::STEP::READY, Interceptor::ACTION::CONTINUE);
497
498 {
499 std::unique_lock l{cli_dispatcher.lock};
500 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
501 cli_dispatcher.got_new = false;
502 }
503
504 {
505 std::unique_lock l{srv_dispatcher.lock};
506 srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; });
507 srv_dispatcher.got_new = false;
508 }
509
510 EXPECT_TRUE(s2c->is_connected());
511 EXPECT_EQ(1u, static_cast<Session*>(s2c->get_priv().get())->get_count());
512 EXPECT_TRUE(s2c->peer_is_client());
513
514 EXPECT_TRUE(c2s->is_connected());
515 EXPECT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
516 EXPECT_TRUE(c2s->peer_is_osd());
517
518 // closed in reuse_connection() -- EPIPE when writing banner/hello
519 EXPECT_FALSE(s2c_accepter->is_connected());
520
521 // established exactly once, never faulted and reconnected
522 EXPECT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::START_CLIENT_BANNER_EXCHANGE), 1u);
523 EXPECT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT), 0u);
524 EXPECT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::READY), 1u);
525
526 client_msgr->shutdown();
527 client_msgr->wait();
528 server_msgr->shutdown();
529 server_msgr->wait();
530 }
531
532 /**
533 * Scenario:
534 * - A connects to B
535 * - A sends client_ident to B
536 * - B fails before sending server_ident to A
537 * - A reconnects
538 */
539 TEST_P(MessengerTest, MissingServerIdenTest) {
540 FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
541
542 TestInterceptor *cli_interceptor = new TestInterceptor();
543 TestInterceptor *srv_interceptor = new TestInterceptor();
544
545 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::stateful_server(0));
546 server_msgr->interceptor = srv_interceptor;
547
548 client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossy_client(0));
549 client_msgr->interceptor = cli_interceptor;
550
551 entity_addr_t bind_addr;
552 bind_addr.parse("v2:127.0.0.1:3300");
553 server_msgr->bind(bind_addr);
554 server_msgr->add_dispatcher_head(&srv_dispatcher);
555 server_msgr->start();
556
557 bind_addr.parse("v2:127.0.0.1:3301");
558 client_msgr->bind(bind_addr);
559 client_msgr->add_dispatcher_head(&cli_dispatcher);
560 client_msgr->start();
561
562 // pause before sending server_ident message
563 srv_interceptor->breakpoint(Interceptor::STEP::SEND_SERVER_IDENTITY);
564
565 ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
566 server_msgr->get_myaddrs());
567 MPing *m1 = new MPing();
568 ASSERT_EQ(c2s->send_message(m1), 0);
569
570 Connection *c2s_accepter = srv_interceptor->wait(Interceptor::STEP::SEND_SERVER_IDENTITY);
571 srv_interceptor->remove_bp(Interceptor::STEP::SEND_SERVER_IDENTITY);
572
573 // We inject a message from this side of the connection to force it to be
574 // in standby when we inject the failure below
575 MPing *m2 = new MPing();
576 ASSERT_EQ(c2s_accepter->send_message(m2), 0);
577
578 srv_interceptor->proceed(Interceptor::STEP::SEND_SERVER_IDENTITY, Interceptor::ACTION::FAIL);
579
580 {
581 std::unique_lock l{srv_dispatcher.lock};
582 srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; });
583 srv_dispatcher.got_new = false;
584 }
585
586 {
587 std::unique_lock l{cli_dispatcher.lock};
588 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
589 cli_dispatcher.got_new = false;
590 }
591
592 ASSERT_TRUE(c2s->is_connected());
593 ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
594 ASSERT_TRUE(c2s->peer_is_osd());
595
596 ASSERT_TRUE(c2s_accepter->is_connected());
597 ASSERT_EQ(1u, static_cast<Session*>(c2s_accepter->get_priv().get())->get_count());
598 ASSERT_TRUE(c2s_accepter->peer_is_client());
599
600 client_msgr->shutdown();
601 client_msgr->wait();
602 server_msgr->shutdown();
603 server_msgr->wait();
604
605 delete cli_interceptor;
606 delete srv_interceptor;
607 }
608
609 /**
610 * Scenario:
611 * - A connects to B
612 * - A sends client_ident to B
613 * - B fails before sending server_ident to A
614 * - A goes to standby
615 * - B reconnects to A
616 */
617 TEST_P(MessengerTest, MissingServerIdenTest2) {
618 FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
619
620 TestInterceptor *cli_interceptor = new TestInterceptor();
621 TestInterceptor *srv_interceptor = new TestInterceptor();
622
623 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer(0));
624 server_msgr->interceptor = srv_interceptor;
625
626 client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0));
627 client_msgr->interceptor = cli_interceptor;
628
629 entity_addr_t bind_addr;
630 bind_addr.parse("v2:127.0.0.1:3300");
631 server_msgr->bind(bind_addr);
632 server_msgr->add_dispatcher_head(&srv_dispatcher);
633 server_msgr->start();
634
635 bind_addr.parse("v2:127.0.0.1:3301");
636 client_msgr->bind(bind_addr);
637 client_msgr->add_dispatcher_head(&cli_dispatcher);
638 client_msgr->start();
639
640 // pause before sending server_ident message
641 srv_interceptor->breakpoint(Interceptor::STEP::SEND_SERVER_IDENTITY);
642
643 ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
644 server_msgr->get_myaddrs());
645
646 Connection *c2s_accepter = srv_interceptor->wait(Interceptor::STEP::SEND_SERVER_IDENTITY);
647 srv_interceptor->remove_bp(Interceptor::STEP::SEND_SERVER_IDENTITY);
648
649 // We inject a message from this side of the connection to force it to be
650 // in standby when we inject the failure below
651 MPing *m2 = new MPing();
652 ASSERT_EQ(c2s_accepter->send_message(m2), 0);
653
654 srv_interceptor->proceed(Interceptor::STEP::SEND_SERVER_IDENTITY, Interceptor::ACTION::FAIL);
655
656 {
657 std::unique_lock l{cli_dispatcher.lock};
658 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
659 cli_dispatcher.got_new = false;
660 }
661
662 ASSERT_TRUE(c2s->is_connected());
663 ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
664 ASSERT_TRUE(c2s->peer_is_osd());
665
666 ASSERT_TRUE(c2s_accepter->is_connected());
667 ASSERT_EQ(0u, static_cast<Session*>(c2s_accepter->get_priv().get())->get_count());
668 ASSERT_TRUE(c2s_accepter->peer_is_client());
669
670 client_msgr->shutdown();
671 client_msgr->wait();
672 server_msgr->shutdown();
673 server_msgr->wait();
674
675 delete cli_interceptor;
676 delete srv_interceptor;
677 }
678
679 /**
680 * Scenario:
681 * - A connects to B
682 * - A and B exchange messages
683 * - A fails
684 * - B goes into standby
685 * - A reconnects
686 */
687 TEST_P(MessengerTest, ReconnectTest) {
688 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
689
690 TestInterceptor *cli_interceptor = new TestInterceptor();
691 TestInterceptor *srv_interceptor = new TestInterceptor();
692
693 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::stateful_server(0));
694 server_msgr->interceptor = srv_interceptor;
695
696 client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0));
697 client_msgr->interceptor = cli_interceptor;
698
699 entity_addr_t bind_addr;
700 bind_addr.parse("v2:127.0.0.1:3300");
701 server_msgr->bind(bind_addr);
702 server_msgr->add_dispatcher_head(&srv_dispatcher);
703 server_msgr->start();
704
705 bind_addr.parse("v2:127.0.0.1:3301");
706 client_msgr->bind(bind_addr);
707 client_msgr->add_dispatcher_head(&cli_dispatcher);
708 client_msgr->start();
709
710 ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
711 server_msgr->get_myaddrs());
712
713 MPing *m1 = new MPing();
714 ASSERT_EQ(c2s->send_message(m1), 0);
715
716 {
717 std::unique_lock l{cli_dispatcher.lock};
718 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
719 cli_dispatcher.got_new = false;
720 }
721
722 ASSERT_TRUE(c2s->is_connected());
723 ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
724 ASSERT_TRUE(c2s->peer_is_osd());
725
726 cli_interceptor->breakpoint(Interceptor::STEP::HANDLE_MESSAGE);
727
728 MPing *m2 = new MPing();
729 ASSERT_EQ(c2s->send_message(m2), 0);
730
731 cli_interceptor->wait(Interceptor::STEP::HANDLE_MESSAGE, c2s.get());
732 cli_interceptor->remove_bp(Interceptor::STEP::HANDLE_MESSAGE);
733
734 // at this point client and server are connected together
735
736 srv_interceptor->breakpoint(Interceptor::STEP::READY);
737
738 // failing client
739 cli_interceptor->proceed(Interceptor::STEP::HANDLE_MESSAGE, Interceptor::ACTION::FAIL);
740
741 MPing *m3 = new MPing();
742 ASSERT_EQ(c2s->send_message(m3), 0);
743
744 Connection *c2s_accepter = srv_interceptor->wait(Interceptor::STEP::READY);
745 // the srv end of theconnection is now paused at ready
746 // this means that the reconnect was successful
747 srv_interceptor->remove_bp(Interceptor::STEP::READY);
748
749 ASSERT_TRUE(c2s_accepter->peer_is_client());
750 // c2s_accepter sent 0 reconnect messages
751 ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT), 0u);
752 // c2s_accepter sent 1 reconnect_ok messages
753 ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT_OK), 1u);
754 // c2s sent 1 reconnect messages
755 ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT), 1u);
756 // c2s sent 0 reconnect_ok messages
757 ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT_OK), 0u);
758
759 srv_interceptor->proceed(15, Interceptor::ACTION::CONTINUE);
760
761 {
762 std::unique_lock l{cli_dispatcher.lock};
763 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
764 cli_dispatcher.got_new = false;
765 }
766
767 client_msgr->shutdown();
768 client_msgr->wait();
769 server_msgr->shutdown();
770 server_msgr->wait();
771
772 delete cli_interceptor;
773 delete srv_interceptor;
774 }
775
776 /**
777 * Scenario:
778 * - A connects to B
779 * - A and B exchange messages
780 * - A fails
781 * - A reconnects // B reconnects
782 */
783 TEST_P(MessengerTest, ReconnectRaceTest) {
784 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
785
786 TestInterceptor *cli_interceptor = new TestInterceptor();
787 TestInterceptor *srv_interceptor = new TestInterceptor();
788
789 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer(0));
790 server_msgr->interceptor = srv_interceptor;
791
792 client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0));
793 client_msgr->interceptor = cli_interceptor;
794
795 entity_addr_t bind_addr;
796 bind_addr.parse("v2:127.0.0.1:3300");
797 server_msgr->bind(bind_addr);
798 server_msgr->add_dispatcher_head(&srv_dispatcher);
799 server_msgr->start();
800
801 bind_addr.parse("v2:127.0.0.1:3301");
802 client_msgr->bind(bind_addr);
803 client_msgr->add_dispatcher_head(&cli_dispatcher);
804 client_msgr->start();
805
806 ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
807 server_msgr->get_myaddrs());
808
809 MPing *m1 = new MPing();
810 ASSERT_EQ(c2s->send_message(m1), 0);
811
812 {
813 std::unique_lock l{cli_dispatcher.lock};
814 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
815 cli_dispatcher.got_new = false;
816 }
817
818 ASSERT_TRUE(c2s->is_connected());
819 ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
820 ASSERT_TRUE(c2s->peer_is_osd());
821
822 cli_interceptor->breakpoint(Interceptor::STEP::HANDLE_MESSAGE);
823
824 MPing *m2 = new MPing();
825 ASSERT_EQ(c2s->send_message(m2), 0);
826
827 cli_interceptor->wait(Interceptor::STEP::HANDLE_MESSAGE, c2s.get());
828 cli_interceptor->remove_bp(Interceptor::STEP::HANDLE_MESSAGE);
829
830 // at this point client and server are connected together
831
832 // force both client and server to race on reconnect
833 cli_interceptor->breakpoint(Interceptor::STEP::SEND_RECONNECT);
834 srv_interceptor->breakpoint(Interceptor::STEP::SEND_RECONNECT);
835
836 // failing client
837 // this will cause both client and server to reconnect at the same time
838 cli_interceptor->proceed(Interceptor::STEP::HANDLE_MESSAGE, Interceptor::ACTION::FAIL);
839
840 MPing *m3 = new MPing();
841 ASSERT_EQ(c2s->send_message(m3), 0);
842
843 cli_interceptor->wait(Interceptor::STEP::SEND_RECONNECT, c2s.get());
844 srv_interceptor->wait(Interceptor::STEP::SEND_RECONNECT);
845
846 cli_interceptor->remove_bp(Interceptor::STEP::SEND_RECONNECT);
847 srv_interceptor->remove_bp(Interceptor::STEP::SEND_RECONNECT);
848
849 // pause on "ready"
850 srv_interceptor->breakpoint(Interceptor::STEP::READY);
851
852 cli_interceptor->proceed(Interceptor::STEP::SEND_RECONNECT, Interceptor::ACTION::CONTINUE);
853 srv_interceptor->proceed(Interceptor::STEP::SEND_RECONNECT, Interceptor::ACTION::CONTINUE);
854
855 Connection *c2s_accepter = srv_interceptor->wait(Interceptor::STEP::READY);
856
857 // the server has reconnected and is "ready"
858 srv_interceptor->remove_bp(Interceptor::STEP::READY);
859
860 ASSERT_TRUE(c2s_accepter->peer_is_client());
861 ASSERT_TRUE(c2s->peer_is_osd());
862
863 // the server should win the reconnect race
864
865 // c2s_accepter sent 1 or 2 reconnect messages
866 ASSERT_LT(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT), 3u);
867 ASSERT_GT(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT), 0u);
868 // c2s_accepter sent 0 reconnect_ok messages
869 ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT_OK), 0u);
870 // c2s sent 1 reconnect messages
871 ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT), 1u);
872 // c2s sent 1 reconnect_ok messages
873 ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT_OK), 1u);
874
875 if (srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT) == 2) {
876 // if the server send the reconnect message two times then
877 // the client must have sent a session retry message to the server
878 ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SESSION_RETRY), 1u);
879 } else {
880 ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SESSION_RETRY), 0u);
881 }
882
883 srv_interceptor->proceed(Interceptor::STEP::READY, Interceptor::ACTION::CONTINUE);
884
885 {
886 std::unique_lock l{cli_dispatcher.lock};
887 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
888 cli_dispatcher.got_new = false;
889 }
890
891 client_msgr->shutdown();
892 client_msgr->wait();
893 server_msgr->shutdown();
894 server_msgr->wait();
895
896 delete cli_interceptor;
897 delete srv_interceptor;
898 }
899
900 TEST_P(MessengerTest, SimpleTest) {
901 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
902 entity_addr_t bind_addr;
903 bind_addr.parse("v2:127.0.0.1");
904 server_msgr->bind(bind_addr);
905 server_msgr->add_dispatcher_head(&srv_dispatcher);
906 server_msgr->start();
907
908 client_msgr->add_dispatcher_head(&cli_dispatcher);
909 client_msgr->start();
910
911 // 1. simple round trip
912 MPing *m = new MPing();
913 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
914 server_msgr->get_myaddrs());
915 {
916 ASSERT_EQ(conn->send_message(m), 0);
917 std::unique_lock l{cli_dispatcher.lock};
918 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
919 cli_dispatcher.got_new = false;
920 }
921 ASSERT_TRUE(conn->is_connected());
922 ASSERT_EQ(1u, static_cast<Session*>(conn->get_priv().get())->get_count());
923 ASSERT_TRUE(conn->peer_is_osd());
924
925 // 2. test rebind port
926 set<int> avoid_ports;
927 for (int i = 0; i < 10 ; i++) {
928 for (auto a : server_msgr->get_myaddrs().v) {
929 avoid_ports.insert(a.get_port() + i);
930 }
931 }
932 server_msgr->rebind(avoid_ports);
933 for (auto a : server_msgr->get_myaddrs().v) {
934 ASSERT_TRUE(avoid_ports.count(a.get_port()) == 0);
935 }
936
937 conn = client_msgr->connect_to(server_msgr->get_mytype(),
938 server_msgr->get_myaddrs());
939 {
940 m = new MPing();
941 ASSERT_EQ(conn->send_message(m), 0);
942 std::unique_lock l{cli_dispatcher.lock};
943 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
944 cli_dispatcher.got_new = false;
945 }
946 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
947
948 // 3. test markdown connection
949 conn->mark_down();
950 ASSERT_FALSE(conn->is_connected());
951
952 // 4. test failed connection
953 server_msgr->shutdown();
954 server_msgr->wait();
955
956 m = new MPing();
957 conn->send_message(m);
958 CHECK_AND_WAIT_TRUE(!conn->is_connected());
959 ASSERT_FALSE(conn->is_connected());
960
961 // 5. loopback connection
962 srv_dispatcher.loopback = true;
963 conn = client_msgr->get_loopback_connection();
964 {
965 m = new MPing();
966 ASSERT_EQ(conn->send_message(m), 0);
967 std::unique_lock l{cli_dispatcher.lock};
968 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
969 cli_dispatcher.got_new = false;
970 }
971 srv_dispatcher.loopback = false;
972 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
973 client_msgr->shutdown();
974 client_msgr->wait();
975 server_msgr->shutdown();
976 server_msgr->wait();
977 }
978
979 TEST_P(MessengerTest, SimpleMsgr2Test) {
980 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
981 entity_addr_t legacy_addr;
982 legacy_addr.parse("v1:127.0.0.1");
983 entity_addr_t msgr2_addr;
984 msgr2_addr.parse("v2:127.0.0.1");
985 entity_addrvec_t bind_addrs;
986 bind_addrs.v.push_back(legacy_addr);
987 bind_addrs.v.push_back(msgr2_addr);
988 server_msgr->bindv(bind_addrs);
989 server_msgr->add_dispatcher_head(&srv_dispatcher);
990 server_msgr->start();
991
992 client_msgr->add_dispatcher_head(&cli_dispatcher);
993 client_msgr->start();
994
995 // 1. simple round trip
996 MPing *m = new MPing();
997 ConnectionRef conn = client_msgr->connect_to(
998 server_msgr->get_mytype(),
999 server_msgr->get_myaddrs());
1000 {
1001 ASSERT_EQ(conn->send_message(m), 0);
1002 std::unique_lock l{cli_dispatcher.lock};
1003 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1004 cli_dispatcher.got_new = false;
1005 }
1006 ASSERT_TRUE(conn->is_connected());
1007 ASSERT_EQ(1u, static_cast<Session*>(conn->get_priv().get())->get_count());
1008 ASSERT_TRUE(conn->peer_is_osd());
1009
1010 // 2. test rebind port
1011 set<int> avoid_ports;
1012 for (int i = 0; i < 10 ; i++) {
1013 for (auto a : server_msgr->get_myaddrs().v) {
1014 avoid_ports.insert(a.get_port() + i);
1015 }
1016 }
1017 server_msgr->rebind(avoid_ports);
1018 for (auto a : server_msgr->get_myaddrs().v) {
1019 ASSERT_TRUE(avoid_ports.count(a.get_port()) == 0);
1020 }
1021
1022 conn = client_msgr->connect_to(
1023 server_msgr->get_mytype(),
1024 server_msgr->get_myaddrs());
1025 {
1026 m = new MPing();
1027 ASSERT_EQ(conn->send_message(m), 0);
1028 std::unique_lock l{cli_dispatcher.lock};
1029 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1030 cli_dispatcher.got_new = false;
1031 }
1032 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1033
1034 // 3. test markdown connection
1035 conn->mark_down();
1036 ASSERT_FALSE(conn->is_connected());
1037
1038 // 4. test failed connection
1039 server_msgr->shutdown();
1040 server_msgr->wait();
1041
1042 m = new MPing();
1043 conn->send_message(m);
1044 CHECK_AND_WAIT_TRUE(!conn->is_connected());
1045 ASSERT_FALSE(conn->is_connected());
1046
1047 // 5. loopback connection
1048 srv_dispatcher.loopback = true;
1049 conn = client_msgr->get_loopback_connection();
1050 {
1051 m = new MPing();
1052 ASSERT_EQ(conn->send_message(m), 0);
1053 std::unique_lock l{cli_dispatcher.lock};
1054 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1055 cli_dispatcher.got_new = false;
1056 }
1057 srv_dispatcher.loopback = false;
1058 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1059 client_msgr->shutdown();
1060 client_msgr->wait();
1061 server_msgr->shutdown();
1062 server_msgr->wait();
1063 }
1064
1065 TEST_P(MessengerTest, FeatureTest) {
1066 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1067 entity_addr_t bind_addr;
1068 bind_addr.parse("v2:127.0.0.1");
1069 uint64_t all_feature_supported, feature_required, feature_supported = 0;
1070 for (int i = 0; i < 10; i++)
1071 feature_supported |= 1ULL << i;
1072 feature_supported |= CEPH_FEATUREMASK_MSG_ADDR2;
1073 feature_supported |= CEPH_FEATUREMASK_SERVER_NAUTILUS;
1074 feature_required = feature_supported | 1ULL << 13;
1075 all_feature_supported = feature_required | 1ULL << 14;
1076
1077 Messenger::Policy p = server_msgr->get_policy(entity_name_t::TYPE_CLIENT);
1078 p.features_required = feature_required;
1079 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
1080 server_msgr->bind(bind_addr);
1081 server_msgr->add_dispatcher_head(&srv_dispatcher);
1082 server_msgr->start();
1083
1084 // 1. Suppose if only support less than required
1085 p = client_msgr->get_policy(entity_name_t::TYPE_OSD);
1086 p.features_supported = feature_supported;
1087 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
1088 client_msgr->add_dispatcher_head(&cli_dispatcher);
1089 client_msgr->start();
1090
1091 MPing *m = new MPing();
1092 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1093 server_msgr->get_myaddrs());
1094 conn->send_message(m);
1095 CHECK_AND_WAIT_TRUE(!conn->is_connected());
1096 // should failed build a connection
1097 ASSERT_FALSE(conn->is_connected());
1098
1099 client_msgr->shutdown();
1100 client_msgr->wait();
1101
1102 // 2. supported met required
1103 p = client_msgr->get_policy(entity_name_t::TYPE_OSD);
1104 p.features_supported = all_feature_supported;
1105 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
1106 client_msgr->start();
1107
1108 conn = client_msgr->connect_to(server_msgr->get_mytype(),
1109 server_msgr->get_myaddrs());
1110 {
1111 m = new MPing();
1112 ASSERT_EQ(conn->send_message(m), 0);
1113 std::unique_lock l{cli_dispatcher.lock};
1114 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1115 cli_dispatcher.got_new = false;
1116 }
1117 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1118
1119 server_msgr->shutdown();
1120 client_msgr->shutdown();
1121 server_msgr->wait();
1122 client_msgr->wait();
1123 }
1124
1125 TEST_P(MessengerTest, TimeoutTest) {
1126 g_ceph_context->_conf.set_val("ms_connection_idle_timeout", "1");
1127 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1128 entity_addr_t bind_addr;
1129 bind_addr.parse("v2:127.0.0.1");
1130 server_msgr->bind(bind_addr);
1131 server_msgr->add_dispatcher_head(&srv_dispatcher);
1132 server_msgr->start();
1133
1134 client_msgr->add_dispatcher_head(&cli_dispatcher);
1135 client_msgr->start();
1136
1137 // 1. build the connection
1138 MPing *m = new MPing();
1139 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1140 server_msgr->get_myaddrs());
1141 {
1142 ASSERT_EQ(conn->send_message(m), 0);
1143 std::unique_lock l{cli_dispatcher.lock};
1144 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1145 cli_dispatcher.got_new = false;
1146 }
1147 ASSERT_TRUE(conn->is_connected());
1148 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1149 ASSERT_TRUE(conn->peer_is_osd());
1150
1151 // 2. wait for idle
1152 usleep(2500*1000);
1153 ASSERT_FALSE(conn->is_connected());
1154
1155 server_msgr->shutdown();
1156 server_msgr->wait();
1157
1158 client_msgr->shutdown();
1159 client_msgr->wait();
1160 g_ceph_context->_conf.set_val("ms_connection_idle_timeout", "900");
1161 }
1162
1163 TEST_P(MessengerTest, StatefulTest) {
1164 Message *m;
1165 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1166 entity_addr_t bind_addr;
1167 bind_addr.parse("v2:127.0.0.1");
1168 Messenger::Policy p = Messenger::Policy::stateful_server(0);
1169 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
1170 p = Messenger::Policy::lossless_client(0);
1171 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
1172
1173 server_msgr->bind(bind_addr);
1174 server_msgr->add_dispatcher_head(&srv_dispatcher);
1175 server_msgr->start();
1176 client_msgr->add_dispatcher_head(&cli_dispatcher);
1177 client_msgr->start();
1178
1179 // 1. test for server standby
1180 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1181 server_msgr->get_myaddrs());
1182 {
1183 m = new MPing();
1184 ASSERT_EQ(conn->send_message(m), 0);
1185 std::unique_lock l{cli_dispatcher.lock};
1186 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1187 cli_dispatcher.got_new = false;
1188 }
1189 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1190 conn->mark_down();
1191 ASSERT_FALSE(conn->is_connected());
1192 ConnectionRef server_conn = server_msgr->connect_to(
1193 client_msgr->get_mytype(), srv_dispatcher.last_accept);
1194 // don't lose state
1195 ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
1196
1197 srv_dispatcher.got_new = false;
1198 conn = client_msgr->connect_to(server_msgr->get_mytype(),
1199 server_msgr->get_myaddrs());
1200 {
1201 m = new MPing();
1202 ASSERT_EQ(conn->send_message(m), 0);
1203 std::unique_lock l{cli_dispatcher.lock};
1204 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1205 cli_dispatcher.got_new = false;
1206 }
1207 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1208 server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
1209 srv_dispatcher.last_accept);
1210 {
1211 std::unique_lock l{srv_dispatcher.lock};
1212 srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_remote_reset; });
1213 }
1214
1215 // 2. test for client reconnect
1216 ASSERT_FALSE(cli_dispatcher.got_remote_reset);
1217 cli_dispatcher.got_connect = false;
1218 cli_dispatcher.got_new = false;
1219 cli_dispatcher.got_remote_reset = false;
1220 server_conn->mark_down();
1221 ASSERT_FALSE(server_conn->is_connected());
1222 // ensure client detect server socket closed
1223 {
1224 std::unique_lock l{cli_dispatcher.lock};
1225 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_remote_reset; });
1226 cli_dispatcher.got_remote_reset = false;
1227 }
1228 {
1229 std::unique_lock l{cli_dispatcher.lock};
1230 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_connect; });
1231 cli_dispatcher.got_connect = false;
1232 }
1233 CHECK_AND_WAIT_TRUE(conn->is_connected());
1234 ASSERT_TRUE(conn->is_connected());
1235
1236 {
1237 m = new MPing();
1238 ASSERT_EQ(conn->send_message(m), 0);
1239 ASSERT_TRUE(conn->is_connected());
1240 std::unique_lock l{cli_dispatcher.lock};
1241 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1242 cli_dispatcher.got_new = false;
1243 }
1244 // resetcheck happen
1245 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1246 server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
1247 srv_dispatcher.last_accept);
1248 ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
1249 cli_dispatcher.got_remote_reset = false;
1250
1251 server_msgr->shutdown();
1252 client_msgr->shutdown();
1253 server_msgr->wait();
1254 client_msgr->wait();
1255 }
1256
1257 TEST_P(MessengerTest, StatelessTest) {
1258 Message *m;
1259 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1260 entity_addr_t bind_addr;
1261 bind_addr.parse("v2:127.0.0.1");
1262 Messenger::Policy p = Messenger::Policy::stateless_server(0);
1263 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
1264 p = Messenger::Policy::lossy_client(0);
1265 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
1266
1267 server_msgr->bind(bind_addr);
1268 server_msgr->add_dispatcher_head(&srv_dispatcher);
1269 server_msgr->start();
1270 client_msgr->add_dispatcher_head(&cli_dispatcher);
1271 client_msgr->start();
1272
1273 // 1. test for server lose state
1274 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1275 server_msgr->get_myaddrs());
1276 {
1277 m = new MPing();
1278 ASSERT_EQ(conn->send_message(m), 0);
1279 std::unique_lock l{cli_dispatcher.lock};
1280 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1281 cli_dispatcher.got_new = false;
1282 }
1283 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1284 conn->mark_down();
1285 ASSERT_FALSE(conn->is_connected());
1286
1287 srv_dispatcher.got_new = false;
1288 ConnectionRef server_conn;
1289 srv_dispatcher.last_accept_con_ptr = &server_conn;
1290 conn = client_msgr->connect_to(server_msgr->get_mytype(),
1291 server_msgr->get_myaddrs());
1292 {
1293 m = new MPing();
1294 ASSERT_EQ(conn->send_message(m), 0);
1295 std::unique_lock l{cli_dispatcher.lock};
1296 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1297 cli_dispatcher.got_new = false;
1298 }
1299 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1300 ASSERT_TRUE(server_conn);
1301
1302 // server lose state
1303 {
1304 std::unique_lock l{srv_dispatcher.lock};
1305 srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; });
1306 }
1307 ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
1308
1309 // 2. test for client lossy
1310 server_conn->mark_down();
1311 ASSERT_FALSE(server_conn->is_connected());
1312 conn->send_keepalive();
1313 CHECK_AND_WAIT_TRUE(!conn->is_connected());
1314 ASSERT_FALSE(conn->is_connected());
1315 conn = client_msgr->connect_to(server_msgr->get_mytype(),
1316 server_msgr->get_myaddrs());
1317 {
1318 m = new MPing();
1319 ASSERT_EQ(conn->send_message(m), 0);
1320 std::unique_lock l{cli_dispatcher.lock};
1321 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1322 cli_dispatcher.got_new = false;
1323 }
1324 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1325
1326 server_msgr->shutdown();
1327 client_msgr->shutdown();
1328 server_msgr->wait();
1329 client_msgr->wait();
1330 }
1331
1332 TEST_P(MessengerTest, AnonTest) {
1333 Message *m;
1334 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1335 entity_addr_t bind_addr;
1336 bind_addr.parse("v2:127.0.0.1");
1337 Messenger::Policy p = Messenger::Policy::stateless_server(0);
1338 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
1339 p = Messenger::Policy::lossy_client(0);
1340 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
1341
1342 server_msgr->bind(bind_addr);
1343 server_msgr->add_dispatcher_head(&srv_dispatcher);
1344 server_msgr->start();
1345 client_msgr->add_dispatcher_head(&cli_dispatcher);
1346 client_msgr->start();
1347
1348 ConnectionRef server_con_a, server_con_b;
1349
1350 // a
1351 srv_dispatcher.last_accept_con_ptr = &server_con_a;
1352 ConnectionRef con_a = client_msgr->connect_to(server_msgr->get_mytype(),
1353 server_msgr->get_myaddrs(),
1354 true);
1355 {
1356 m = new MPing();
1357 ASSERT_EQ(con_a->send_message(m), 0);
1358 std::unique_lock l{cli_dispatcher.lock};
1359 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1360 cli_dispatcher.got_new = false;
1361 }
1362 ASSERT_EQ(1U, static_cast<Session*>(con_a->get_priv().get())->get_count());
1363
1364 // b
1365 srv_dispatcher.last_accept_con_ptr = &server_con_b;
1366 ConnectionRef con_b = client_msgr->connect_to(server_msgr->get_mytype(),
1367 server_msgr->get_myaddrs(),
1368 true);
1369 {
1370 m = new MPing();
1371 ASSERT_EQ(con_b->send_message(m), 0);
1372 std::unique_lock l{cli_dispatcher.lock};
1373 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1374 cli_dispatcher.got_new = false;
1375 }
1376 ASSERT_EQ(1U, static_cast<Session*>(con_b->get_priv().get())->get_count());
1377
1378 // these should be distinct
1379 ASSERT_NE(con_a, con_b);
1380 ASSERT_NE(server_con_a, server_con_b);
1381
1382 // and both connected
1383 {
1384 m = new MPing();
1385 ASSERT_EQ(con_a->send_message(m), 0);
1386 std::unique_lock l{cli_dispatcher.lock};
1387 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1388 cli_dispatcher.got_new = false;
1389 }
1390 {
1391 m = new MPing();
1392 ASSERT_EQ(con_b->send_message(m), 0);
1393 std::unique_lock l{cli_dispatcher.lock};
1394 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1395 cli_dispatcher.got_new = false;
1396 }
1397
1398 // clean up
1399 con_a->mark_down();
1400 ASSERT_FALSE(con_a->is_connected());
1401 con_b->mark_down();
1402 ASSERT_FALSE(con_b->is_connected());
1403
1404 server_msgr->shutdown();
1405 client_msgr->shutdown();
1406 server_msgr->wait();
1407 client_msgr->wait();
1408 }
1409
1410 TEST_P(MessengerTest, ClientStandbyTest) {
1411 Message *m;
1412 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1413 entity_addr_t bind_addr;
1414 bind_addr.parse("v2:127.0.0.1");
1415 Messenger::Policy p = Messenger::Policy::stateful_server(0);
1416 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
1417 p = Messenger::Policy::lossless_peer(0);
1418 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
1419
1420 server_msgr->bind(bind_addr);
1421 server_msgr->add_dispatcher_head(&srv_dispatcher);
1422 server_msgr->start();
1423 client_msgr->add_dispatcher_head(&cli_dispatcher);
1424 client_msgr->start();
1425
1426 // 1. test for client standby, resetcheck
1427 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1428 server_msgr->get_myaddrs());
1429 {
1430 m = new MPing();
1431 ASSERT_EQ(conn->send_message(m), 0);
1432 std::unique_lock l{cli_dispatcher.lock};
1433 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1434 cli_dispatcher.got_new = false;
1435 }
1436 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1437 ConnectionRef server_conn = server_msgr->connect_to(
1438 client_msgr->get_mytype(),
1439 srv_dispatcher.last_accept);
1440 ASSERT_FALSE(cli_dispatcher.got_remote_reset);
1441 cli_dispatcher.got_connect = false;
1442 server_conn->mark_down();
1443 ASSERT_FALSE(server_conn->is_connected());
1444 // client should be standby
1445 usleep(300*1000);
1446 // client should be standby, so we use original connection
1447 {
1448 // Try send message to verify got remote reset callback
1449 m = new MPing();
1450 ASSERT_EQ(conn->send_message(m), 0);
1451 {
1452 std::unique_lock l{cli_dispatcher.lock};
1453 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_remote_reset; });
1454 cli_dispatcher.got_remote_reset = false;
1455 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_connect; });
1456 cli_dispatcher.got_connect = false;
1457 }
1458 CHECK_AND_WAIT_TRUE(conn->is_connected());
1459 ASSERT_TRUE(conn->is_connected());
1460 m = new MPing();
1461 ASSERT_EQ(conn->send_message(m), 0);
1462 std::unique_lock l{cli_dispatcher.lock};
1463 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1464 cli_dispatcher.got_new = false;
1465 }
1466 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1467 server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
1468 srv_dispatcher.last_accept);
1469 ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
1470
1471 server_msgr->shutdown();
1472 client_msgr->shutdown();
1473 server_msgr->wait();
1474 client_msgr->wait();
1475 }
1476
1477 TEST_P(MessengerTest, AuthTest) {
1478 g_ceph_context->_conf.set_val("auth_cluster_required", "cephx");
1479 g_ceph_context->_conf.set_val("auth_service_required", "cephx");
1480 g_ceph_context->_conf.set_val("auth_client_required", "cephx");
1481 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1482 entity_addr_t bind_addr;
1483 bind_addr.parse("v2:127.0.0.1");
1484 server_msgr->bind(bind_addr);
1485 server_msgr->add_dispatcher_head(&srv_dispatcher);
1486 server_msgr->start();
1487
1488 client_msgr->add_dispatcher_head(&cli_dispatcher);
1489 client_msgr->start();
1490
1491 // 1. simple auth round trip
1492 MPing *m = new MPing();
1493 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1494 server_msgr->get_myaddrs());
1495 {
1496 ASSERT_EQ(conn->send_message(m), 0);
1497 std::unique_lock l{cli_dispatcher.lock};
1498 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1499 cli_dispatcher.got_new = false;
1500 }
1501 ASSERT_TRUE(conn->is_connected());
1502 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1503
1504 // 2. mix auth
1505 g_ceph_context->_conf.set_val("auth_cluster_required", "none");
1506 g_ceph_context->_conf.set_val("auth_service_required", "none");
1507 g_ceph_context->_conf.set_val("auth_client_required", "none");
1508 conn->mark_down();
1509 ASSERT_FALSE(conn->is_connected());
1510 conn = client_msgr->connect_to(server_msgr->get_mytype(),
1511 server_msgr->get_myaddrs());
1512 {
1513 MPing *m = new MPing();
1514 ASSERT_EQ(conn->send_message(m), 0);
1515 std::unique_lock l{cli_dispatcher.lock};
1516 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1517 cli_dispatcher.got_new = false;
1518 }
1519 ASSERT_TRUE(conn->is_connected());
1520 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1521 server_msgr->shutdown();
1522 client_msgr->shutdown();
1523 server_msgr->wait();
1524 client_msgr->wait();
1525 }
1526
1527 TEST_P(MessengerTest, MessageTest) {
1528 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1529 entity_addr_t bind_addr;
1530 bind_addr.parse("v2:127.0.0.1");
1531 Messenger::Policy p = Messenger::Policy::stateful_server(0);
1532 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
1533 p = Messenger::Policy::lossless_peer(0);
1534 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
1535
1536 server_msgr->bind(bind_addr);
1537 server_msgr->add_dispatcher_head(&srv_dispatcher);
1538 server_msgr->start();
1539 client_msgr->add_dispatcher_head(&cli_dispatcher);
1540 client_msgr->start();
1541
1542
1543 // 1. A very large "front"(as well as "payload")
1544 // Because a external message need to invade Messenger::decode_message,
1545 // here we only use existing message class(MCommand)
1546 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1547 server_msgr->get_myaddrs());
1548 {
1549 uuid_d uuid;
1550 uuid.generate_random();
1551 vector<string> cmds;
1552 string s("abcdefghijklmnopqrstuvwxyz");
1553 for (int i = 0; i < 1024*30; i++)
1554 cmds.push_back(s);
1555 MCommand *m = new MCommand(uuid);
1556 m->cmd = cmds;
1557 conn->send_message(m);
1558 std::unique_lock l{cli_dispatcher.lock};
1559 cli_dispatcher.cond.wait_for(l, 500s, [&] { return cli_dispatcher.got_new; });
1560 ASSERT_TRUE(cli_dispatcher.got_new);
1561 cli_dispatcher.got_new = false;
1562 }
1563
1564 // 2. A very large "data"
1565 {
1566 bufferlist bl;
1567 string s("abcdefghijklmnopqrstuvwxyz");
1568 for (int i = 0; i < 1024*30; i++)
1569 bl.append(s);
1570 MPing *m = new MPing();
1571 m->set_data(bl);
1572 conn->send_message(m);
1573 utime_t t;
1574 t += 1000*1000*500;
1575 std::unique_lock l{cli_dispatcher.lock};
1576 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1577 ASSERT_TRUE(cli_dispatcher.got_new);
1578 cli_dispatcher.got_new = false;
1579 }
1580 server_msgr->shutdown();
1581 client_msgr->shutdown();
1582 server_msgr->wait();
1583 client_msgr->wait();
1584 }
1585
1586
1587 class SyntheticWorkload;
1588
1589 struct Payload {
1590 enum Who : uint8_t {
1591 PING = 0,
1592 PONG = 1,
1593 };
1594 uint8_t who = 0;
1595 uint64_t seq = 0;
1596 bufferlist data;
1597
1598 Payload(Who who, uint64_t seq, const bufferlist& data)
1599 : who(who), seq(seq), data(data)
1600 {}
1601 Payload() = default;
1602 DENC(Payload, v, p) {
1603 DENC_START(1, 1, p);
1604 denc(v.who, p);
1605 denc(v.seq, p);
1606 denc(v.data, p);
1607 DENC_FINISH(p);
1608 }
1609 };
1610 WRITE_CLASS_DENC(Payload)
1611
1612 ostream& operator<<(ostream& out, const Payload &pl)
1613 {
1614 return out << "reply=" << pl.who << " i = " << pl.seq;
1615 }
1616
1617 class SyntheticDispatcher : public Dispatcher {
1618 public:
1619 ceph::mutex lock = ceph::make_mutex("SyntheticDispatcher::lock");
1620 ceph::condition_variable cond;
1621 bool is_server;
1622 bool got_new;
1623 bool got_remote_reset;
1624 bool got_connect;
1625 map<ConnectionRef, list<uint64_t> > conn_sent;
1626 map<uint64_t, bufferlist> sent;
1627 atomic<uint64_t> index;
1628 SyntheticWorkload *workload;
1629
1630 SyntheticDispatcher(bool s, SyntheticWorkload *wl):
1631 Dispatcher(g_ceph_context), is_server(s), got_new(false),
1632 got_remote_reset(false), got_connect(false), index(0), workload(wl) {
1633 }
1634 bool ms_can_fast_dispatch_any() const override { return true; }
1635 bool ms_can_fast_dispatch(const Message *m) const override {
1636 switch (m->get_type()) {
1637 case CEPH_MSG_PING:
1638 case MSG_COMMAND:
1639 return true;
1640 default:
1641 return false;
1642 }
1643 }
1644
1645 void ms_handle_fast_connect(Connection *con) override {
1646 std::lock_guard l{lock};
1647 list<uint64_t> c = conn_sent[con];
1648 for (list<uint64_t>::iterator it = c.begin();
1649 it != c.end(); ++it)
1650 sent.erase(*it);
1651 conn_sent.erase(con);
1652 got_connect = true;
1653 cond.notify_all();
1654 }
1655 void ms_handle_fast_accept(Connection *con) override {
1656 std::lock_guard l{lock};
1657 list<uint64_t> c = conn_sent[con];
1658 for (list<uint64_t>::iterator it = c.begin();
1659 it != c.end(); ++it)
1660 sent.erase(*it);
1661 conn_sent.erase(con);
1662 cond.notify_all();
1663 }
1664 bool ms_dispatch(Message *m) override {
1665 ceph_abort();
1666 }
1667 bool ms_handle_reset(Connection *con) override;
1668 void ms_handle_remote_reset(Connection *con) override {
1669 std::lock_guard l{lock};
1670 list<uint64_t> c = conn_sent[con];
1671 for (list<uint64_t>::iterator it = c.begin();
1672 it != c.end(); ++it)
1673 sent.erase(*it);
1674 conn_sent.erase(con);
1675 got_remote_reset = true;
1676 }
1677 bool ms_handle_refused(Connection *con) override {
1678 return false;
1679 }
1680 void ms_fast_dispatch(Message *m) override {
1681 // MSG_COMMAND is used to disorganize regular message flow
1682 if (m->get_type() == MSG_COMMAND) {
1683 m->put();
1684 return ;
1685 }
1686
1687 Payload pl;
1688 auto p = m->get_data().cbegin();
1689 decode(pl, p);
1690 if (pl.who == Payload::PING) {
1691 lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl;
1692 reply_message(m, pl);
1693 m->put();
1694 std::lock_guard l{lock};
1695 got_new = true;
1696 cond.notify_all();
1697 } else {
1698 std::lock_guard l{lock};
1699 if (sent.count(pl.seq)) {
1700 lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl;
1701 ASSERT_EQ(conn_sent[m->get_connection()].front(), pl.seq);
1702 ASSERT_TRUE(pl.data.contents_equal(sent[pl.seq]));
1703 conn_sent[m->get_connection()].pop_front();
1704 sent.erase(pl.seq);
1705 }
1706 m->put();
1707 got_new = true;
1708 cond.notify_all();
1709 }
1710 }
1711
1712 int ms_handle_fast_authentication(Connection *con) override {
1713 return 1;
1714 }
1715
1716 void reply_message(const Message *m, Payload& pl) {
1717 pl.who = Payload::PONG;
1718 bufferlist bl;
1719 encode(pl, bl);
1720 MPing *rm = new MPing();
1721 rm->set_data(bl);
1722 m->get_connection()->send_message(rm);
1723 lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply m=" << m << " i=" << pl.seq << dendl;
1724 }
1725
1726 void send_message_wrap(ConnectionRef con, const bufferlist& data) {
1727 Message *m = new MPing();
1728 Payload pl{Payload::PING, index++, data};
1729 bufferlist bl;
1730 encode(pl, bl);
1731 m->set_data(bl);
1732 if (!con->get_messenger()->get_default_policy().lossy) {
1733 std::lock_guard l{lock};
1734 sent[pl.seq] = pl.data;
1735 conn_sent[con].push_back(pl.seq);
1736 }
1737 lderr(g_ceph_context) << __func__ << " conn=" << con.get() << " send m=" << m << " i=" << pl.seq << dendl;
1738 ASSERT_EQ(0, con->send_message(m));
1739 }
1740
1741 uint64_t get_pending() {
1742 std::lock_guard l{lock};
1743 return sent.size();
1744 }
1745
1746 void clear_pending(ConnectionRef con) {
1747 std::lock_guard l{lock};
1748
1749 for (list<uint64_t>::iterator it = conn_sent[con].begin();
1750 it != conn_sent[con].end(); ++it)
1751 sent.erase(*it);
1752 conn_sent.erase(con);
1753 }
1754
1755 void print() {
1756 for (auto && p : conn_sent) {
1757 if (!p.second.empty()) {
1758 lderr(g_ceph_context) << __func__ << " " << p.first << " wait " << p.second.size() << dendl;
1759 }
1760 }
1761 }
1762 };
1763
1764
1765 class SyntheticWorkload {
1766 ceph::mutex lock = ceph::make_mutex("SyntheticWorkload::lock");
1767 ceph::condition_variable cond;
1768 set<Messenger*> available_servers;
1769 set<Messenger*> available_clients;
1770 Messenger::Policy client_policy;
1771 map<ConnectionRef, pair<Messenger*, Messenger*> > available_connections;
1772 SyntheticDispatcher dispatcher;
1773 gen_type rng;
1774 vector<bufferlist> rand_data;
1775 DummyAuthClientServer dummy_auth;
1776
1777 public:
1778 const unsigned max_in_flight = 0;
1779 const unsigned max_connections = 0;
1780 static const unsigned max_message_len = 1024 * 1024 * 4;
1781
1782 SyntheticWorkload(int servers, int clients, string type, int random_num,
1783 Messenger::Policy srv_policy, Messenger::Policy cli_policy,
1784 int _max_in_flight = 64, int _max_connections = 128)
1785 : client_policy(cli_policy),
1786 dispatcher(false, this),
1787 rng(time(NULL)),
1788 dummy_auth(g_ceph_context),
1789 max_in_flight(_max_in_flight),
1790 max_connections(_max_connections) {
1791
1792 dummy_auth.auth_registry.refresh_config();
1793 Messenger *msgr;
1794 int base_port = 16800;
1795 entity_addr_t bind_addr;
1796 char addr[64];
1797 for (int i = 0; i < servers; ++i) {
1798 msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0),
1799 "server", getpid()+i);
1800 snprintf(addr, sizeof(addr), "v2:127.0.0.1:%d",
1801 base_port+i);
1802 bind_addr.parse(addr);
1803 msgr->bind(bind_addr);
1804 msgr->add_dispatcher_head(&dispatcher);
1805 msgr->set_auth_client(&dummy_auth);
1806 msgr->set_auth_server(&dummy_auth);
1807
1808 ceph_assert(msgr);
1809 msgr->set_default_policy(srv_policy);
1810 available_servers.insert(msgr);
1811 msgr->start();
1812 }
1813
1814 for (int i = 0; i < clients; ++i) {
1815 msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(-1),
1816 "client", getpid()+i+servers);
1817 if (cli_policy.standby) {
1818 snprintf(addr, sizeof(addr), "v2:127.0.0.1:%d",
1819 base_port+i+servers);
1820 bind_addr.parse(addr);
1821 msgr->bind(bind_addr);
1822 }
1823 msgr->add_dispatcher_head(&dispatcher);
1824 msgr->set_auth_client(&dummy_auth);
1825 msgr->set_auth_server(&dummy_auth);
1826
1827 ceph_assert(msgr);
1828 msgr->set_default_policy(cli_policy);
1829 available_clients.insert(msgr);
1830 msgr->start();
1831 }
1832
1833 for (int i = 0; i < random_num; i++) {
1834 bufferlist bl;
1835 boost::uniform_int<> u(32, max_message_len);
1836 uint64_t value_len = u(rng);
1837 bufferptr bp(value_len);
1838 bp.zero();
1839 for (uint64_t j = 0; j < value_len-sizeof(i); ) {
1840 memcpy(bp.c_str()+j, &i, sizeof(i));
1841 j += 4096;
1842 }
1843
1844 bl.append(bp);
1845 rand_data.push_back(bl);
1846 }
1847 }
1848
1849 ConnectionRef _get_random_connection() {
1850 while (dispatcher.get_pending() > max_in_flight) {
1851 lock.unlock();
1852 usleep(500);
1853 lock.lock();
1854 }
1855 ceph_assert(ceph_mutex_is_locked(lock));
1856 boost::uniform_int<> choose(0, available_connections.size() - 1);
1857 int index = choose(rng);
1858 map<ConnectionRef, pair<Messenger*, Messenger*> >::iterator i = available_connections.begin();
1859 for (; index > 0; --index, ++i) ;
1860 return i->first;
1861 }
1862
1863 bool can_create_connection() {
1864 return available_connections.size() < max_connections;
1865 }
1866
1867 void generate_connection() {
1868 std::lock_guard l{lock};
1869 if (!can_create_connection())
1870 return ;
1871
1872 Messenger *server, *client;
1873 {
1874 boost::uniform_int<> choose(0, available_servers.size() - 1);
1875 int index = choose(rng);
1876 set<Messenger*>::iterator i = available_servers.begin();
1877 for (; index > 0; --index, ++i) ;
1878 server = *i;
1879 }
1880 {
1881 boost::uniform_int<> choose(0, available_clients.size() - 1);
1882 int index = choose(rng);
1883 set<Messenger*>::iterator i = available_clients.begin();
1884 for (; index > 0; --index, ++i) ;
1885 client = *i;
1886 }
1887
1888 pair<Messenger*, Messenger*> p;
1889 {
1890 boost::uniform_int<> choose(0, available_servers.size() - 1);
1891 if (server->get_default_policy().server) {
1892 p = make_pair(client, server);
1893 ConnectionRef conn = client->connect_to(server->get_mytype(),
1894 server->get_myaddrs());
1895 available_connections[conn] = p;
1896 } else {
1897 ConnectionRef conn = client->connect_to(server->get_mytype(),
1898 server->get_myaddrs());
1899 p = make_pair(client, server);
1900 available_connections[conn] = p;
1901 }
1902 }
1903 }
1904
1905 void send_message() {
1906 std::lock_guard l{lock};
1907 ConnectionRef conn = _get_random_connection();
1908 boost::uniform_int<> true_false(0, 99);
1909 int val = true_false(rng);
1910 if (val >= 95) {
1911 uuid_d uuid;
1912 uuid.generate_random();
1913 MCommand *m = new MCommand(uuid);
1914 vector<string> cmds;
1915 cmds.push_back("command");
1916 m->cmd = cmds;
1917 m->set_priority(200);
1918 conn->send_message(m);
1919 } else {
1920 boost::uniform_int<> u(0, rand_data.size()-1);
1921 dispatcher.send_message_wrap(conn, rand_data[u(rng)]);
1922 }
1923 }
1924
1925 void send_large_message(bool inject_network_congestion=false) {
1926 std::lock_guard l{lock};
1927 ConnectionRef conn = _get_random_connection();
1928 uuid_d uuid;
1929 uuid.generate_random();
1930 MCommand *m = new MCommand(uuid);
1931 vector<string> cmds;
1932 cmds.push_back("command");
1933 // set the random data to make the large message
1934 bufferlist bl;
1935 string s("abcdefghijklmnopqrstuvwxyz");
1936 for (int i = 0; i < 1024*256; i++)
1937 bl.append(s);
1938 // bl is around 6M
1939 m->set_data(bl);
1940 m->cmd = cmds;
1941 m->set_priority(200);
1942 // setup after connection is ready
1943 if (inject_network_congestion && conn->is_connected()) {
1944 g_ceph_context->_conf.set_val("ms_inject_network_congestion", "100");
1945 } else {
1946 g_ceph_context->_conf.set_val("ms_inject_network_congestion", "0");
1947 }
1948 conn->send_message(m);
1949 }
1950
1951 void drop_connection() {
1952 std::lock_guard l{lock};
1953 if (available_connections.size() < 10)
1954 return;
1955 ConnectionRef conn = _get_random_connection();
1956 dispatcher.clear_pending(conn);
1957 conn->mark_down();
1958 if (!client_policy.server &&
1959 !client_policy.lossy &&
1960 client_policy.standby) {
1961 // it's a lossless policy, so we need to mark down each side
1962 pair<Messenger*, Messenger*> &p = available_connections[conn];
1963 if (!p.first->get_default_policy().server && !p.second->get_default_policy().server) {
1964 ASSERT_EQ(conn->get_messenger(), p.first);
1965 ConnectionRef peer = p.second->connect_to(p.first->get_mytype(),
1966 p.first->get_myaddrs());
1967 peer->mark_down();
1968 dispatcher.clear_pending(peer);
1969 available_connections.erase(peer);
1970 }
1971 }
1972 ASSERT_EQ(available_connections.erase(conn), 1U);
1973 }
1974
1975 void print_internal_state(bool detail=false) {
1976 std::lock_guard l{lock};
1977 lderr(g_ceph_context) << "available_connections: " << available_connections.size()
1978 << " inflight messages: " << dispatcher.get_pending() << dendl;
1979 if (detail && !available_connections.empty()) {
1980 dispatcher.print();
1981 }
1982 }
1983
1984 void wait_for_done() {
1985 int64_t tick_us = 1000 * 100; // 100ms
1986 int64_t timeout_us = 5 * 60 * 1000 * 1000; // 5 mins
1987 int i = 0;
1988 while (dispatcher.get_pending()) {
1989 usleep(tick_us);
1990 timeout_us -= tick_us;
1991 if (i++ % 50 == 0)
1992 print_internal_state(true);
1993 if (timeout_us < 0)
1994 ceph_abort_msg(" loop time exceed 5 mins, it looks we stuck into some problems!");
1995 }
1996 for (set<Messenger*>::iterator it = available_servers.begin();
1997 it != available_servers.end(); ++it) {
1998 (*it)->shutdown();
1999 (*it)->wait();
2000 ASSERT_EQ((*it)->get_dispatch_queue_len(), 0);
2001 delete (*it);
2002 }
2003 available_servers.clear();
2004
2005 for (set<Messenger*>::iterator it = available_clients.begin();
2006 it != available_clients.end(); ++it) {
2007 (*it)->shutdown();
2008 (*it)->wait();
2009 ASSERT_EQ((*it)->get_dispatch_queue_len(), 0);
2010 delete (*it);
2011 }
2012 available_clients.clear();
2013 }
2014
2015 void handle_reset(Connection *con) {
2016 std::lock_guard l{lock};
2017 available_connections.erase(con);
2018 dispatcher.clear_pending(con);
2019 }
2020 };
2021
2022 bool SyntheticDispatcher::ms_handle_reset(Connection *con) {
2023 workload->handle_reset(con);
2024 return true;
2025 }
2026
2027 TEST_P(MessengerTest, SyntheticStressTest) {
2028 SyntheticWorkload test_msg(8, 32, GetParam(), 100,
2029 Messenger::Policy::stateful_server(0),
2030 Messenger::Policy::lossless_client(0));
2031 for (int i = 0; i < 100; ++i) {
2032 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
2033 test_msg.generate_connection();
2034 }
2035 gen_type rng(time(NULL));
2036 for (int i = 0; i < 5000; ++i) {
2037 if (!(i % 10)) {
2038 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
2039 test_msg.print_internal_state();
2040 }
2041 boost::uniform_int<> true_false(0, 99);
2042 int val = true_false(rng);
2043 if (val > 90) {
2044 test_msg.generate_connection();
2045 } else if (val > 80) {
2046 test_msg.drop_connection();
2047 } else if (val > 10) {
2048 test_msg.send_message();
2049 } else {
2050 usleep(rand() % 1000 + 500);
2051 }
2052 }
2053 test_msg.wait_for_done();
2054 }
2055
2056 TEST_P(MessengerTest, SyntheticStressTest1) {
2057 SyntheticWorkload test_msg(16, 32, GetParam(), 100,
2058 Messenger::Policy::lossless_peer_reuse(0),
2059 Messenger::Policy::lossless_peer_reuse(0));
2060 for (int i = 0; i < 10; ++i) {
2061 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
2062 test_msg.generate_connection();
2063 }
2064 gen_type rng(time(NULL));
2065 for (int i = 0; i < 10000; ++i) {
2066 if (!(i % 10)) {
2067 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
2068 test_msg.print_internal_state();
2069 }
2070 boost::uniform_int<> true_false(0, 99);
2071 int val = true_false(rng);
2072 if (val > 80) {
2073 test_msg.generate_connection();
2074 } else if (val > 60) {
2075 test_msg.drop_connection();
2076 } else if (val > 10) {
2077 test_msg.send_message();
2078 } else {
2079 usleep(rand() % 1000 + 500);
2080 }
2081 }
2082 test_msg.wait_for_done();
2083 }
2084
2085
2086 TEST_P(MessengerTest, SyntheticInjectTest) {
2087 uint64_t dispatch_throttle_bytes = g_ceph_context->_conf->ms_dispatch_throttle_bytes;
2088 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "30");
2089 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1");
2090 g_ceph_context->_conf.set_val("ms_dispatch_throttle_bytes", "16777216");
2091 SyntheticWorkload test_msg(8, 32, GetParam(), 100,
2092 Messenger::Policy::stateful_server(0),
2093 Messenger::Policy::lossless_client(0));
2094 for (int i = 0; i < 100; ++i) {
2095 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
2096 test_msg.generate_connection();
2097 }
2098 gen_type rng(time(NULL));
2099 for (int i = 0; i < 1000; ++i) {
2100 if (!(i % 10)) {
2101 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
2102 test_msg.print_internal_state();
2103 }
2104 boost::uniform_int<> true_false(0, 99);
2105 int val = true_false(rng);
2106 if (val > 90) {
2107 test_msg.generate_connection();
2108 } else if (val > 80) {
2109 test_msg.drop_connection();
2110 } else if (val > 10) {
2111 test_msg.send_message();
2112 } else {
2113 usleep(rand() % 500 + 100);
2114 }
2115 }
2116 test_msg.wait_for_done();
2117 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0");
2118 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0");
2119 g_ceph_context->_conf.set_val(
2120 "ms_dispatch_throttle_bytes", std::to_string(dispatch_throttle_bytes));
2121 }
2122
2123 TEST_P(MessengerTest, SyntheticInjectTest2) {
2124 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "30");
2125 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1");
2126 SyntheticWorkload test_msg(8, 16, GetParam(), 100,
2127 Messenger::Policy::lossless_peer_reuse(0),
2128 Messenger::Policy::lossless_peer_reuse(0));
2129 for (int i = 0; i < 100; ++i) {
2130 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
2131 test_msg.generate_connection();
2132 }
2133 gen_type rng(time(NULL));
2134 for (int i = 0; i < 1000; ++i) {
2135 if (!(i % 10)) {
2136 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
2137 test_msg.print_internal_state();
2138 }
2139 boost::uniform_int<> true_false(0, 99);
2140 int val = true_false(rng);
2141 if (val > 90) {
2142 test_msg.generate_connection();
2143 } else if (val > 80) {
2144 test_msg.drop_connection();
2145 } else if (val > 10) {
2146 test_msg.send_message();
2147 } else {
2148 usleep(rand() % 500 + 100);
2149 }
2150 }
2151 test_msg.wait_for_done();
2152 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0");
2153 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0");
2154 }
2155
2156 TEST_P(MessengerTest, SyntheticInjectTest3) {
2157 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "600");
2158 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1");
2159 SyntheticWorkload test_msg(8, 16, GetParam(), 100,
2160 Messenger::Policy::stateless_server(0),
2161 Messenger::Policy::lossy_client(0));
2162 for (int i = 0; i < 100; ++i) {
2163 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
2164 test_msg.generate_connection();
2165 }
2166 gen_type rng(time(NULL));
2167 for (int i = 0; i < 1000; ++i) {
2168 if (!(i % 10)) {
2169 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
2170 test_msg.print_internal_state();
2171 }
2172 boost::uniform_int<> true_false(0, 99);
2173 int val = true_false(rng);
2174 if (val > 90) {
2175 test_msg.generate_connection();
2176 } else if (val > 80) {
2177 test_msg.drop_connection();
2178 } else if (val > 10) {
2179 test_msg.send_message();
2180 } else {
2181 usleep(rand() % 500 + 100);
2182 }
2183 }
2184 test_msg.wait_for_done();
2185 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0");
2186 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0");
2187 }
2188
2189
2190 TEST_P(MessengerTest, SyntheticInjectTest4) {
2191 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "30");
2192 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1");
2193 g_ceph_context->_conf.set_val("ms_inject_delay_probability", "1");
2194 g_ceph_context->_conf.set_val("ms_inject_delay_type", "client osd");
2195 g_ceph_context->_conf.set_val("ms_inject_delay_max", "5");
2196 SyntheticWorkload test_msg(16, 32, GetParam(), 100,
2197 Messenger::Policy::lossless_peer(0),
2198 Messenger::Policy::lossless_peer(0));
2199 for (int i = 0; i < 100; ++i) {
2200 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
2201 test_msg.generate_connection();
2202 }
2203 gen_type rng(time(NULL));
2204 for (int i = 0; i < 1000; ++i) {
2205 if (!(i % 10)) {
2206 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
2207 test_msg.print_internal_state();
2208 }
2209 boost::uniform_int<> true_false(0, 99);
2210 int val = true_false(rng);
2211 if (val > 95) {
2212 test_msg.generate_connection();
2213 } else if (val > 80) {
2214 // test_msg.drop_connection();
2215 } else if (val > 10) {
2216 test_msg.send_message();
2217 } else {
2218 usleep(rand() % 500 + 100);
2219 }
2220 }
2221 test_msg.wait_for_done();
2222 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0");
2223 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0");
2224 g_ceph_context->_conf.set_val("ms_inject_delay_probability", "0");
2225 g_ceph_context->_conf.set_val("ms_inject_delay_type", "");
2226 g_ceph_context->_conf.set_val("ms_inject_delay_max", "0");
2227 }
2228
2229 // This is test for network block, means ::send return EAGAIN
2230 TEST_P(MessengerTest, SyntheticInjectTest5) {
2231 SyntheticWorkload test_msg(1, 8, GetParam(), 100,
2232 Messenger::Policy::stateful_server(0),
2233 Messenger::Policy::lossless_client(0),
2234 64, 2);
2235 bool simulate_network_congestion = true;
2236 for (int i = 0; i < 2; ++i)
2237 test_msg.generate_connection();
2238 for (int i = 0; i < 5000; ++i) {
2239 if (!(i % 10)) {
2240 ldout(g_ceph_context, 0) << "Op " << i << ": " << dendl;
2241 test_msg.print_internal_state();
2242 }
2243 if (i < 1600) {
2244 // means that we would stuck 1600 * 6M (9.6G) around with 2 connections
2245 test_msg.send_large_message(simulate_network_congestion);
2246 } else {
2247 simulate_network_congestion = false;
2248 test_msg.send_large_message(simulate_network_congestion);
2249 }
2250 }
2251 test_msg.wait_for_done();
2252 }
2253
2254
2255 class MarkdownDispatcher : public Dispatcher {
2256 ceph::mutex lock = ceph::make_mutex("MarkdownDispatcher::lock");
2257 set<ConnectionRef> conns;
2258 bool last_mark;
2259 public:
2260 std::atomic<uint64_t> count = { 0 };
2261 explicit MarkdownDispatcher(bool s): Dispatcher(g_ceph_context),
2262 last_mark(false) {
2263 }
2264 bool ms_can_fast_dispatch_any() const override { return false; }
2265 bool ms_can_fast_dispatch(const Message *m) const override {
2266 switch (m->get_type()) {
2267 case CEPH_MSG_PING:
2268 return true;
2269 default:
2270 return false;
2271 }
2272 }
2273
2274 void ms_handle_fast_connect(Connection *con) override {
2275 lderr(g_ceph_context) << __func__ << " " << con << dendl;
2276 std::lock_guard l{lock};
2277 conns.insert(con);
2278 }
2279 void ms_handle_fast_accept(Connection *con) override {
2280 std::lock_guard l{lock};
2281 conns.insert(con);
2282 }
2283 bool ms_dispatch(Message *m) override {
2284 lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << dendl;
2285 std::lock_guard l{lock};
2286 count++;
2287 conns.insert(m->get_connection());
2288 if (conns.size() < 2 && !last_mark) {
2289 m->put();
2290 return true;
2291 }
2292
2293 last_mark = true;
2294 usleep(rand() % 500);
2295 for (set<ConnectionRef>::iterator it = conns.begin(); it != conns.end(); ++it) {
2296 if ((*it) != m->get_connection().get()) {
2297 (*it)->mark_down();
2298 conns.erase(it);
2299 break;
2300 }
2301 }
2302 if (conns.empty())
2303 last_mark = false;
2304 m->put();
2305 return true;
2306 }
2307 bool ms_handle_reset(Connection *con) override {
2308 lderr(g_ceph_context) << __func__ << " " << con << dendl;
2309 std::lock_guard l{lock};
2310 conns.erase(con);
2311 usleep(rand() % 500);
2312 return true;
2313 }
2314 void ms_handle_remote_reset(Connection *con) override {
2315 std::lock_guard l{lock};
2316 conns.erase(con);
2317 lderr(g_ceph_context) << __func__ << " " << con << dendl;
2318 }
2319 bool ms_handle_refused(Connection *con) override {
2320 return false;
2321 }
2322 void ms_fast_dispatch(Message *m) override {
2323 ceph_abort();
2324 }
2325 int ms_handle_fast_authentication(Connection *con) override {
2326 return 1;
2327 }
2328 };
2329
2330
2331 // Markdown with external lock
2332 TEST_P(MessengerTest, MarkdownTest) {
2333 Messenger *server_msgr2 = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid());
2334 MarkdownDispatcher cli_dispatcher(false), srv_dispatcher(true);
2335 DummyAuthClientServer dummy_auth(g_ceph_context);
2336 dummy_auth.auth_registry.refresh_config();
2337 entity_addr_t bind_addr;
2338 bind_addr.parse("v2:127.0.0.1:16800");
2339 server_msgr->bind(bind_addr);
2340 server_msgr->add_dispatcher_head(&srv_dispatcher);
2341 server_msgr->set_auth_client(&dummy_auth);
2342 server_msgr->set_auth_server(&dummy_auth);
2343 server_msgr->start();
2344 bind_addr.parse("v2:127.0.0.1:16801");
2345 server_msgr2->bind(bind_addr);
2346 server_msgr2->add_dispatcher_head(&srv_dispatcher);
2347 server_msgr2->set_auth_client(&dummy_auth);
2348 server_msgr2->set_auth_server(&dummy_auth);
2349 server_msgr2->start();
2350
2351 client_msgr->add_dispatcher_head(&cli_dispatcher);
2352 client_msgr->set_auth_client(&dummy_auth);
2353 client_msgr->set_auth_server(&dummy_auth);
2354 client_msgr->start();
2355
2356 int i = 1000;
2357 uint64_t last = 0;
2358 bool equal = false;
2359 uint64_t equal_count = 0;
2360 while (i--) {
2361 ConnectionRef conn1 = client_msgr->connect_to(server_msgr->get_mytype(),
2362 server_msgr->get_myaddrs());
2363 ConnectionRef conn2 = client_msgr->connect_to(server_msgr2->get_mytype(),
2364 server_msgr2->get_myaddrs());
2365 MPing *m = new MPing();
2366 ASSERT_EQ(conn1->send_message(m), 0);
2367 m = new MPing();
2368 ASSERT_EQ(conn2->send_message(m), 0);
2369 CHECK_AND_WAIT_TRUE(srv_dispatcher.count > last + 1);
2370 if (srv_dispatcher.count == last) {
2371 lderr(g_ceph_context) << __func__ << " last is " << last << dendl;
2372 equal = true;
2373 equal_count++;
2374 } else {
2375 equal = false;
2376 equal_count = 0;
2377 }
2378 last = srv_dispatcher.count;
2379 if (equal_count)
2380 usleep(1000*500);
2381 ASSERT_FALSE(equal && equal_count > 3);
2382 }
2383 server_msgr->shutdown();
2384 client_msgr->shutdown();
2385 server_msgr2->shutdown();
2386 server_msgr->wait();
2387 client_msgr->wait();
2388 server_msgr2->wait();
2389 delete server_msgr2;
2390 }
2391
2392 INSTANTIATE_TEST_SUITE_P(
2393 Messenger,
2394 MessengerTest,
2395 ::testing::Values(
2396 "async+posix"
2397 )
2398 );
2399
2400 int main(int argc, char **argv) {
2401 auto args = argv_to_vec(argc, argv);
2402
2403 auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
2404 CODE_ENVIRONMENT_UTILITY,
2405 CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
2406 g_ceph_context->_conf.set_val("auth_cluster_required", "none");
2407 g_ceph_context->_conf.set_val("auth_service_required", "none");
2408 g_ceph_context->_conf.set_val("auth_client_required", "none");
2409 g_ceph_context->_conf.set_val("keyring", "/dev/null");
2410 g_ceph_context->_conf.set_val("enable_experimental_unrecoverable_data_corrupting_features", "ms-type-async");
2411 g_ceph_context->_conf.set_val("ms_die_on_bad_msg", "true");
2412 g_ceph_context->_conf.set_val("ms_die_on_old_message", "true");
2413 g_ceph_context->_conf.set_val("ms_max_backoff", "1");
2414 common_init_finish(g_ceph_context);
2415
2416 ::testing::InitGoogleTest(&argc, argv);
2417 return RUN_ALL_TESTS();
2418 }
2419
2420 /*
2421 * Local Variables:
2422 * compile-command: "cd ../.. ; make -j4 ceph_test_msgr && valgrind --tool=memcheck ./ceph_test_msgr"
2423 * End:
2424 */