]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/msgr/test_msgr.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / test / msgr / test_msgr.cc
CommitLineData
11fdf7f2 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
7c673cae
FG
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17#include <atomic>
18#include <iostream>
20effc67 19#include <list>
f6b5b4d7 20#include <memory>
20effc67 21#include <set>
7c673cae
FG
22#include <stdlib.h>
23#include <time.h>
20effc67
TL
24#include <unistd.h>
25
26#include <boost/random/binomial_distribution.hpp>
27#include <boost/random/mersenne_twister.hpp>
28#include <boost/random/uniform_int.hpp>
29#include <gtest/gtest.h>
30
31#define MSG_POLICY_UNIT_TESTING
32
7c673cae 33#include "common/ceph_argparse.h"
20effc67 34#include "common/ceph_mutex.h"
7c673cae 35#include "global/global_init.h"
20effc67
TL
36#include "messages/MCommand.h"
37#include "messages/MPing.h"
38#include "msg/Connection.h"
7c673cae 39#include "msg/Dispatcher.h"
7c673cae
FG
40#include "msg/Message.h"
41#include "msg/Messenger.h"
20effc67 42#include "msg/msg_types.h"
7c673cae
FG
43
44typedef boost::mt11213b gen_type;
45
46#include "common/dout.h"
11fdf7f2
TL
47#include "include/ceph_assert.h"
48
49#include "auth/DummyAuth.h"
7c673cae
FG
50
51#define dout_subsys ceph_subsys_ms
52#undef dout_prefix
53#define dout_prefix *_dout << " ceph_test_msgr "
54
55
7c673cae
FG
// Poll `expr` until it evaluates to true, sleeping 1000 microseconds
// (usleep(1000)) after every failed check.  At most 999 checks are made,
// after which the macro simply falls through -- it does not report whether
// the condition was ever met.
// NOTE: the expansion already ends with ';'.
#define CHECK_AND_WAIT_TRUE(expr) do { \
  for (int n = 999; n > 0; --n) { \
    if (expr) \
      break; \
    usleep(1000); \
  } \
} while(0);
64
20effc67
TL
65using namespace std;
66
7c673cae
FG
67class MessengerTest : public ::testing::TestWithParam<const char*> {
68 public:
11fdf7f2 69 DummyAuthClientServer dummy_auth;
7c673cae
FG
70 Messenger *server_msgr;
71 Messenger *client_msgr;
72
11fdf7f2
TL
73 MessengerTest() : dummy_auth(g_ceph_context),
74 server_msgr(NULL), client_msgr(NULL) {
75 dummy_auth.auth_registry.refresh_config();
76 }
7c673cae
FG
77 void SetUp() override {
78 lderr(g_ceph_context) << __func__ << " start set up " << GetParam() << dendl;
f67539c2
TL
79 server_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid());
80 client_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid());
7c673cae
FG
81 server_msgr->set_default_policy(Messenger::Policy::stateless_server(0));
82 client_msgr->set_default_policy(Messenger::Policy::lossy_client(0));
11fdf7f2
TL
83 server_msgr->set_auth_client(&dummy_auth);
84 server_msgr->set_auth_server(&dummy_auth);
85 client_msgr->set_auth_client(&dummy_auth);
86 client_msgr->set_auth_server(&dummy_auth);
9f95a23c 87 server_msgr->set_require_authorizer(false);
7c673cae
FG
88 }
89 void TearDown() override {
90 ASSERT_EQ(server_msgr->get_dispatch_queue_len(), 0);
91 ASSERT_EQ(client_msgr->get_dispatch_queue_len(), 0);
92 delete server_msgr;
93 delete client_msgr;
94 }
95
96};
97
98
99class FakeDispatcher : public Dispatcher {
100 public:
101 struct Session : public RefCountedObject {
102 atomic<uint64_t> count;
103 ConnectionRef con;
104
105 explicit Session(ConnectionRef c): RefCountedObject(g_ceph_context), count(0), con(c) {
106 }
107 uint64_t get_count() { return count; }
108 };
109
9f95a23c
TL
110 ceph::mutex lock = ceph::make_mutex("FakeDispatcher::lock");
111 ceph::condition_variable cond;
7c673cae
FG
112 bool is_server;
113 bool got_new;
114 bool got_remote_reset;
115 bool got_connect;
116 bool loopback;
11fdf7f2 117 entity_addrvec_t last_accept;
9f95a23c 118 ConnectionRef *last_accept_con_ptr = nullptr;
7c673cae 119
9f95a23c 120 explicit FakeDispatcher(bool s): Dispatcher(g_ceph_context),
7c673cae 121 is_server(s), got_new(false), got_remote_reset(false),
11fdf7f2 122 got_connect(false), loopback(false) {
11fdf7f2 123 }
7c673cae
FG
124 bool ms_can_fast_dispatch_any() const override { return true; }
125 bool ms_can_fast_dispatch(const Message *m) const override {
126 switch (m->get_type()) {
127 case CEPH_MSG_PING:
128 return true;
129 default:
130 return false;
131 }
132 }
133
134 void ms_handle_fast_connect(Connection *con) override {
9f95a23c 135 std::scoped_lock l{lock};
7c673cae 136 lderr(g_ceph_context) << __func__ << " " << con << dendl;
11fdf7f2 137 auto s = con->get_priv();
7c673cae 138 if (!s) {
11fdf7f2
TL
139 auto session = new Session(con);
140 con->set_priv(RefCountedPtr{session, false});
141 lderr(g_ceph_context) << __func__ << " con: " << con
142 << " count: " << session->count << dendl;
7c673cae 143 }
7c673cae 144 got_connect = true;
9f95a23c 145 cond.notify_all();
7c673cae
FG
146 }
147 void ms_handle_fast_accept(Connection *con) override {
11fdf7f2 148 last_accept = con->get_peer_addrs();
9f95a23c
TL
149 if (last_accept_con_ptr) {
150 *last_accept_con_ptr = con;
151 }
11fdf7f2
TL
152 if (!con->get_priv()) {
153 con->set_priv(RefCountedPtr{new Session(con), false});
7c673cae 154 }
7c673cae
FG
155 }
156 bool ms_dispatch(Message *m) override {
11fdf7f2
TL
157 auto priv = m->get_connection()->get_priv();
158 auto s = static_cast<Session*>(priv.get());
7c673cae
FG
159 if (!s) {
160 s = new Session(m->get_connection());
11fdf7f2
TL
161 priv.reset(s, false);
162 m->get_connection()->set_priv(priv);
7c673cae 163 }
7c673cae
FG
164 s->count++;
165 lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl;
166 if (is_server) {
167 reply_message(m);
168 }
9f95a23c 169 std::lock_guard l{lock};
7c673cae 170 got_new = true;
9f95a23c 171 cond.notify_all();
7c673cae
FG
172 m->put();
173 return true;
174 }
175 bool ms_handle_reset(Connection *con) override {
9f95a23c 176 std::lock_guard l{lock};
7c673cae 177 lderr(g_ceph_context) << __func__ << " " << con << dendl;
11fdf7f2
TL
178 auto priv = con->get_priv();
179 if (auto s = static_cast<Session*>(priv.get()); s) {
180 s->con.reset(); // break con <-> session ref cycle
181 con->set_priv(nullptr); // break ref <-> session cycle, if any
7c673cae
FG
182 }
183 return true;
184 }
185 void ms_handle_remote_reset(Connection *con) override {
9f95a23c 186 std::lock_guard l{lock};
7c673cae 187 lderr(g_ceph_context) << __func__ << " " << con << dendl;
11fdf7f2
TL
188 auto priv = con->get_priv();
189 if (auto s = static_cast<Session*>(priv.get()); s) {
190 s->con.reset(); // break con <-> session ref cycle
191 con->set_priv(nullptr); // break ref <-> session cycle, if any
7c673cae
FG
192 }
193 got_remote_reset = true;
9f95a23c 194 cond.notify_all();
7c673cae
FG
195 }
196 bool ms_handle_refused(Connection *con) override {
197 return false;
198 }
199 void ms_fast_dispatch(Message *m) override {
11fdf7f2
TL
200 auto priv = m->get_connection()->get_priv();
201 auto s = static_cast<Session*>(priv.get());
7c673cae
FG
202 if (!s) {
203 s = new Session(m->get_connection());
11fdf7f2
TL
204 priv.reset(s, false);
205 m->get_connection()->set_priv(priv);
7c673cae 206 }
7c673cae
FG
207 s->count++;
208 lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl;
209 if (is_server) {
210 if (loopback)
11fdf7f2 211 ceph_assert(m->get_source().is_osd());
7c673cae
FG
212 else
213 reply_message(m);
214 } else if (loopback) {
11fdf7f2 215 ceph_assert(m->get_source().is_client());
7c673cae
FG
216 }
217 m->put();
9f95a23c 218 std::lock_guard l{lock};
7c673cae 219 got_new = true;
9f95a23c 220 cond.notify_all();
7c673cae
FG
221 }
222
11fdf7f2
TL
223 int ms_handle_authentication(Connection *con) override {
224 return 1;
7c673cae
FG
225 }
226
227 void reply_message(Message *m) {
228 MPing *rm = new MPing();
229 m->get_connection()->send_message(rm);
230 }
231};
232
233typedef FakeDispatcher::Session Session;
234
11fdf7f2
TL
235struct TestInterceptor : public Interceptor {
236
237 bool step_waiting = false;
238 bool waiting = true;
239 std::map<Connection *, uint32_t> current_step;
240 std::map<Connection *, std::list<uint32_t>> step_history;
241 std::map<uint32_t, std::optional<ACTION>> decisions;
242 std::set<uint32_t> breakpoints;
243
244 uint32_t count_step(Connection *conn, uint32_t step) {
245 uint32_t count = 0;
246 for (auto s : step_history[conn]) {
247 if (s == step) {
248 count++;
249 }
250 }
251 return count;
252 }
253
254 void breakpoint(uint32_t step) {
255 breakpoints.insert(step);
256 }
257
258 void remove_bp(uint32_t step) {
259 breakpoints.erase(step);
260 }
261
262 Connection *wait(uint32_t step, Connection *conn=nullptr) {
263 std::unique_lock<std::mutex> l(lock);
264 while(true) {
265 if (conn) {
266 auto it = current_step.find(conn);
267 if (it != current_step.end()) {
268 if (it->second == step) {
269 break;
270 }
271 }
272 } else {
273 for (auto it : current_step) {
274 if (it.second == step) {
275 conn = it.first;
276 break;
277 }
278 }
279 if (conn) {
280 break;
281 }
282 }
283 step_waiting = true;
284 cond_var.wait(l);
285 }
286 step_waiting = false;
287 return conn;
288 }
289
290 ACTION wait_for_decision(uint32_t step, std::unique_lock<std::mutex> &l) {
291 if (decisions[step]) {
292 return *(decisions[step]);
293 }
294 waiting = true;
9f95a23c 295 cond_var.wait(l, [this] { return !waiting; });
11fdf7f2
TL
296 return *(decisions[step]);
297 }
298
299 void proceed(uint32_t step, ACTION decision) {
300 std::unique_lock<std::mutex> l(lock);
301 decisions[step] = decision;
302 if (waiting) {
303 waiting = false;
304 cond_var.notify_one();
305 }
306 }
307
308 ACTION intercept(Connection *conn, uint32_t step) override {
309 lderr(g_ceph_context) << __func__ << " conn(" << conn
310 << ") intercept called on step=" << step << dendl;
311
312 {
313 std::unique_lock<std::mutex> l(lock);
314 step_history[conn].push_back(step);
315 current_step[conn] = step;
316 if (step_waiting) {
317 cond_var.notify_one();
318 }
319 }
320
321 std::unique_lock<std::mutex> l(lock);
322 ACTION decision = ACTION::CONTINUE;
323 if (breakpoints.find(step) != breakpoints.end()) {
324 lderr(g_ceph_context) << __func__ << " conn(" << conn
325 << ") pausing on step=" << step << dendl;
326 decision = wait_for_decision(step, l);
327 } else {
328 if (decisions[step]) {
329 decision = *(decisions[step]);
330 }
331 }
332 lderr(g_ceph_context) << __func__ << " conn(" << conn
333 << ") resuming step=" << step << " with decision="
334 << decision << dendl;
335 decisions[step].reset();
336 return decision;
337 }
338
339};
340
341/**
342 * Scenario: A connects to B, and B connects to A at the same time.
343 */
344TEST_P(MessengerTest, ConnectionRaceTest) {
11fdf7f2
TL
345 FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
346
347 TestInterceptor *cli_interceptor = new TestInterceptor();
348 TestInterceptor *srv_interceptor = new TestInterceptor();
349
350 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer_reuse(0));
351 server_msgr->interceptor = srv_interceptor;
352
353 client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer_reuse(0));
354 client_msgr->interceptor = cli_interceptor;
355
356 entity_addr_t bind_addr;
357 bind_addr.parse("v2:127.0.0.1:3300");
358 server_msgr->bind(bind_addr);
359 server_msgr->add_dispatcher_head(&srv_dispatcher);
360 server_msgr->start();
361
362 bind_addr.parse("v2:127.0.0.1:3301");
363 client_msgr->bind(bind_addr);
364 client_msgr->add_dispatcher_head(&cli_dispatcher);
365 client_msgr->start();
366
367 // pause before sending client_ident message
f67539c2 368 cli_interceptor->breakpoint(Interceptor::STEP::SEND_CLIENT_IDENTITY);
11fdf7f2 369 // pause before sending client_ident message
f67539c2 370 srv_interceptor->breakpoint(Interceptor::STEP::SEND_CLIENT_IDENTITY);
11fdf7f2
TL
371
372 ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
373 server_msgr->get_myaddrs());
374 MPing *m1 = new MPing();
375 ASSERT_EQ(c2s->send_message(m1), 0);
376
377 ConnectionRef s2c = server_msgr->connect_to(client_msgr->get_mytype(),
378 client_msgr->get_myaddrs());
379 MPing *m2 = new MPing();
380 ASSERT_EQ(s2c->send_message(m2), 0);
381
f67539c2
TL
382 cli_interceptor->wait(Interceptor::STEP::SEND_CLIENT_IDENTITY, c2s.get());
383 srv_interceptor->wait(Interceptor::STEP::SEND_CLIENT_IDENTITY, s2c.get());
11fdf7f2
TL
384
385 // at this point both connections (A->B, B->A) are paused just before sending
386 // the client_ident message.
387
f67539c2
TL
388 cli_interceptor->remove_bp(Interceptor::STEP::SEND_CLIENT_IDENTITY);
389 srv_interceptor->remove_bp(Interceptor::STEP::SEND_CLIENT_IDENTITY);
11fdf7f2 390
f67539c2
TL
391 cli_interceptor->proceed(Interceptor::STEP::SEND_CLIENT_IDENTITY, Interceptor::ACTION::CONTINUE);
392 srv_interceptor->proceed(Interceptor::STEP::SEND_CLIENT_IDENTITY, Interceptor::ACTION::CONTINUE);
11fdf7f2
TL
393
394 {
9f95a23c
TL
395 std::unique_lock l{cli_dispatcher.lock};
396 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
11fdf7f2
TL
397 cli_dispatcher.got_new = false;
398 }
399
400 {
9f95a23c
TL
401 std::unique_lock l{srv_dispatcher.lock};
402 srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; });
11fdf7f2
TL
403 srv_dispatcher.got_new = false;
404 }
405
406 ASSERT_TRUE(s2c->is_connected());
407 ASSERT_EQ(1u, static_cast<Session*>(s2c->get_priv().get())->get_count());
408 ASSERT_TRUE(s2c->peer_is_client());
409
410 ASSERT_TRUE(c2s->is_connected());
411 ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
412 ASSERT_TRUE(c2s->peer_is_osd());
413
414 client_msgr->shutdown();
415 client_msgr->wait();
416 server_msgr->shutdown();
417 server_msgr->wait();
418
419 delete cli_interceptor;
420 delete srv_interceptor;
421}
422
f6b5b4d7
TL
423/**
424 * Scenario: A connects to B, and B connects to A at the same time.
425 * The first (A -> B) connection gets to message flow handshake, the
426 * second (B -> A) connection is stuck waiting for a banner from A.
427 * After A sends client_ident to B, the first connection wins and B
428 * calls reuse_connection() to replace the second connection's socket
429 * while the second connection is still in BANNER_CONNECTING.
430 */
431TEST_P(MessengerTest, ConnectionRaceReuseBannerTest) {
432 FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
433
434 auto cli_interceptor = std::make_unique<TestInterceptor>();
435 auto srv_interceptor = std::make_unique<TestInterceptor>();
436
437 server_msgr->set_policy(entity_name_t::TYPE_CLIENT,
438 Messenger::Policy::lossless_peer_reuse(0));
439 server_msgr->interceptor = srv_interceptor.get();
440
441 client_msgr->set_policy(entity_name_t::TYPE_OSD,
442 Messenger::Policy::lossless_peer_reuse(0));
443 client_msgr->interceptor = cli_interceptor.get();
444
445 entity_addr_t bind_addr;
446 bind_addr.parse("v2:127.0.0.1:3300");
447 server_msgr->bind(bind_addr);
448 server_msgr->add_dispatcher_head(&srv_dispatcher);
449 server_msgr->start();
450
451 bind_addr.parse("v2:127.0.0.1:3301");
452 client_msgr->bind(bind_addr);
453 client_msgr->add_dispatcher_head(&cli_dispatcher);
454 client_msgr->start();
455
456 // pause before sending client_ident message
f67539c2 457 srv_interceptor->breakpoint(Interceptor::STEP::SEND_CLIENT_IDENTITY);
f6b5b4d7
TL
458
459 ConnectionRef s2c = server_msgr->connect_to(client_msgr->get_mytype(),
460 client_msgr->get_myaddrs());
461 MPing *m1 = new MPing();
462 ASSERT_EQ(s2c->send_message(m1), 0);
463
f67539c2
TL
464 srv_interceptor->wait(Interceptor::STEP::SEND_CLIENT_IDENTITY);
465 srv_interceptor->remove_bp(Interceptor::STEP::SEND_CLIENT_IDENTITY);
f6b5b4d7
TL
466
467 // pause before sending banner
f67539c2 468 cli_interceptor->breakpoint(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING);
f6b5b4d7
TL
469
470 ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
471 server_msgr->get_myaddrs());
472 MPing *m2 = new MPing();
473 ASSERT_EQ(c2s->send_message(m2), 0);
474
f67539c2
TL
475 cli_interceptor->wait(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING);
476 cli_interceptor->remove_bp(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING);
f6b5b4d7
TL
477
478 // second connection is in BANNER_CONNECTING, ensure it stays so
479 // and send client_ident
f67539c2
TL
480 srv_interceptor->breakpoint(Interceptor::STEP::BANNER_EXCHANGE);
481 srv_interceptor->proceed(Interceptor::STEP::SEND_CLIENT_IDENTITY, Interceptor::ACTION::CONTINUE);
f6b5b4d7
TL
482
483 // handle client_ident -- triggers reuse_connection() with exproto
484 // in BANNER_CONNECTING
f67539c2
TL
485 cli_interceptor->breakpoint(Interceptor::STEP::READY);
486 cli_interceptor->proceed(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING, Interceptor::ACTION::CONTINUE);
f6b5b4d7 487
f67539c2
TL
488 cli_interceptor->wait(Interceptor::STEP::READY);
489 cli_interceptor->remove_bp(Interceptor::STEP::READY);
f6b5b4d7
TL
490
491 // first connection is in READY
f67539c2
TL
492 Connection *s2c_accepter = srv_interceptor->wait(Interceptor::STEP::BANNER_EXCHANGE);
493 srv_interceptor->remove_bp(Interceptor::STEP::BANNER_EXCHANGE);
f6b5b4d7 494
f67539c2
TL
495 srv_interceptor->proceed(Interceptor::STEP::BANNER_EXCHANGE, Interceptor::ACTION::CONTINUE);
496 cli_interceptor->proceed(Interceptor::STEP::READY, Interceptor::ACTION::CONTINUE);
f6b5b4d7
TL
497
498 {
499 std::unique_lock l{cli_dispatcher.lock};
500 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
501 cli_dispatcher.got_new = false;
502 }
503
504 {
505 std::unique_lock l{srv_dispatcher.lock};
506 srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; });
507 srv_dispatcher.got_new = false;
508 }
509
510 EXPECT_TRUE(s2c->is_connected());
511 EXPECT_EQ(1u, static_cast<Session*>(s2c->get_priv().get())->get_count());
512 EXPECT_TRUE(s2c->peer_is_client());
513
514 EXPECT_TRUE(c2s->is_connected());
515 EXPECT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
516 EXPECT_TRUE(c2s->peer_is_osd());
517
518 // closed in reuse_connection() -- EPIPE when writing banner/hello
519 EXPECT_FALSE(s2c_accepter->is_connected());
520
521 // established exactly once, never faulted and reconnected
f67539c2
TL
522 EXPECT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::START_CLIENT_BANNER_EXCHANGE), 1u);
523 EXPECT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT), 0u);
524 EXPECT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::READY), 1u);
f6b5b4d7
TL
525
526 client_msgr->shutdown();
527 client_msgr->wait();
528 server_msgr->shutdown();
529 server_msgr->wait();
530}
531
11fdf7f2
TL
532/**
533 * Scenario:
534 * - A connects to B
535 * - A sends client_ident to B
536 * - B fails before sending server_ident to A
537 * - A reconnects
538 */
539TEST_P(MessengerTest, MissingServerIdenTest) {
11fdf7f2
TL
540 FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
541
542 TestInterceptor *cli_interceptor = new TestInterceptor();
543 TestInterceptor *srv_interceptor = new TestInterceptor();
544
545 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::stateful_server(0));
546 server_msgr->interceptor = srv_interceptor;
547
548 client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossy_client(0));
549 client_msgr->interceptor = cli_interceptor;
550
551 entity_addr_t bind_addr;
552 bind_addr.parse("v2:127.0.0.1:3300");
553 server_msgr->bind(bind_addr);
554 server_msgr->add_dispatcher_head(&srv_dispatcher);
555 server_msgr->start();
556
557 bind_addr.parse("v2:127.0.0.1:3301");
558 client_msgr->bind(bind_addr);
559 client_msgr->add_dispatcher_head(&cli_dispatcher);
560 client_msgr->start();
561
f67539c2
TL
562 // pause before sending server_ident message
563 srv_interceptor->breakpoint(Interceptor::STEP::SEND_SERVER_IDENTITY);
11fdf7f2
TL
564
565 ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
566 server_msgr->get_myaddrs());
567 MPing *m1 = new MPing();
568 ASSERT_EQ(c2s->send_message(m1), 0);
569
f67539c2
TL
570 Connection *c2s_accepter = srv_interceptor->wait(Interceptor::STEP::SEND_SERVER_IDENTITY);
571 srv_interceptor->remove_bp(Interceptor::STEP::SEND_SERVER_IDENTITY);
11fdf7f2
TL
572
573 // We inject a message from this side of the connection to force it to be
574 // in standby when we inject the failure below
575 MPing *m2 = new MPing();
576 ASSERT_EQ(c2s_accepter->send_message(m2), 0);
577
f67539c2 578 srv_interceptor->proceed(Interceptor::STEP::SEND_SERVER_IDENTITY, Interceptor::ACTION::FAIL);
11fdf7f2
TL
579
580 {
9f95a23c
TL
581 std::unique_lock l{srv_dispatcher.lock};
582 srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; });
11fdf7f2
TL
583 srv_dispatcher.got_new = false;
584 }
585
586 {
9f95a23c
TL
587 std::unique_lock l{cli_dispatcher.lock};
588 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
11fdf7f2
TL
589 cli_dispatcher.got_new = false;
590 }
591
592 ASSERT_TRUE(c2s->is_connected());
593 ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
594 ASSERT_TRUE(c2s->peer_is_osd());
595
596 ASSERT_TRUE(c2s_accepter->is_connected());
597 ASSERT_EQ(1u, static_cast<Session*>(c2s_accepter->get_priv().get())->get_count());
598 ASSERT_TRUE(c2s_accepter->peer_is_client());
599
600 client_msgr->shutdown();
601 client_msgr->wait();
602 server_msgr->shutdown();
603 server_msgr->wait();
604
605 delete cli_interceptor;
606 delete srv_interceptor;
607}
608
609/**
610 * Scenario:
611 * - A connects to B
612 * - A sends client_ident to B
613 * - B fails before sending server_ident to A
614 * - A goes to standby
615 * - B reconnects to A
616 */
617TEST_P(MessengerTest, MissingServerIdenTest2) {
11fdf7f2
TL
618 FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
619
620 TestInterceptor *cli_interceptor = new TestInterceptor();
621 TestInterceptor *srv_interceptor = new TestInterceptor();
622
623 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer(0));
624 server_msgr->interceptor = srv_interceptor;
625
626 client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0));
627 client_msgr->interceptor = cli_interceptor;
628
629 entity_addr_t bind_addr;
630 bind_addr.parse("v2:127.0.0.1:3300");
631 server_msgr->bind(bind_addr);
632 server_msgr->add_dispatcher_head(&srv_dispatcher);
633 server_msgr->start();
634
635 bind_addr.parse("v2:127.0.0.1:3301");
636 client_msgr->bind(bind_addr);
637 client_msgr->add_dispatcher_head(&cli_dispatcher);
638 client_msgr->start();
639
f67539c2
TL
640 // pause before sending server_ident message
641 srv_interceptor->breakpoint(Interceptor::STEP::SEND_SERVER_IDENTITY);
11fdf7f2
TL
642
643 ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
644 server_msgr->get_myaddrs());
645
f67539c2
TL
646 Connection *c2s_accepter = srv_interceptor->wait(Interceptor::STEP::SEND_SERVER_IDENTITY);
647 srv_interceptor->remove_bp(Interceptor::STEP::SEND_SERVER_IDENTITY);
11fdf7f2
TL
648
649 // We inject a message from this side of the connection to force it to be
650 // in standby when we inject the failure below
651 MPing *m2 = new MPing();
652 ASSERT_EQ(c2s_accepter->send_message(m2), 0);
653
f67539c2 654 srv_interceptor->proceed(Interceptor::STEP::SEND_SERVER_IDENTITY, Interceptor::ACTION::FAIL);
11fdf7f2
TL
655
656 {
9f95a23c
TL
657 std::unique_lock l{cli_dispatcher.lock};
658 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
11fdf7f2
TL
659 cli_dispatcher.got_new = false;
660 }
661
662 ASSERT_TRUE(c2s->is_connected());
663 ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
664 ASSERT_TRUE(c2s->peer_is_osd());
665
666 ASSERT_TRUE(c2s_accepter->is_connected());
667 ASSERT_EQ(0u, static_cast<Session*>(c2s_accepter->get_priv().get())->get_count());
668 ASSERT_TRUE(c2s_accepter->peer_is_client());
669
670 client_msgr->shutdown();
671 client_msgr->wait();
672 server_msgr->shutdown();
673 server_msgr->wait();
674
675 delete cli_interceptor;
676 delete srv_interceptor;
677}
678
679/**
680 * Scenario:
681 * - A connects to B
682 * - A and B exchange messages
683 * - A fails
684 * - B goes into standby
685 * - A reconnects
686 */
687TEST_P(MessengerTest, ReconnectTest) {
11fdf7f2
TL
688 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
689
690 TestInterceptor *cli_interceptor = new TestInterceptor();
691 TestInterceptor *srv_interceptor = new TestInterceptor();
692
693 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::stateful_server(0));
694 server_msgr->interceptor = srv_interceptor;
695
696 client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0));
697 client_msgr->interceptor = cli_interceptor;
698
699 entity_addr_t bind_addr;
700 bind_addr.parse("v2:127.0.0.1:3300");
701 server_msgr->bind(bind_addr);
702 server_msgr->add_dispatcher_head(&srv_dispatcher);
703 server_msgr->start();
704
705 bind_addr.parse("v2:127.0.0.1:3301");
706 client_msgr->bind(bind_addr);
707 client_msgr->add_dispatcher_head(&cli_dispatcher);
708 client_msgr->start();
709
710 ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
711 server_msgr->get_myaddrs());
712
713 MPing *m1 = new MPing();
714 ASSERT_EQ(c2s->send_message(m1), 0);
715
716 {
9f95a23c
TL
717 std::unique_lock l{cli_dispatcher.lock};
718 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
11fdf7f2
TL
719 cli_dispatcher.got_new = false;
720 }
721
722 ASSERT_TRUE(c2s->is_connected());
723 ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
724 ASSERT_TRUE(c2s->peer_is_osd());
725
f67539c2 726 cli_interceptor->breakpoint(Interceptor::STEP::HANDLE_MESSAGE);
11fdf7f2
TL
727
728 MPing *m2 = new MPing();
729 ASSERT_EQ(c2s->send_message(m2), 0);
730
f67539c2
TL
731 cli_interceptor->wait(Interceptor::STEP::HANDLE_MESSAGE, c2s.get());
732 cli_interceptor->remove_bp(Interceptor::STEP::HANDLE_MESSAGE);
11fdf7f2
TL
733
734 // at this point client and server are connected together
735
f67539c2 736 srv_interceptor->breakpoint(Interceptor::STEP::READY);
11fdf7f2
TL
737
738 // failing client
f67539c2 739 cli_interceptor->proceed(Interceptor::STEP::HANDLE_MESSAGE, Interceptor::ACTION::FAIL);
11fdf7f2
TL
740
741 MPing *m3 = new MPing();
742 ASSERT_EQ(c2s->send_message(m3), 0);
743
f67539c2 744 Connection *c2s_accepter = srv_interceptor->wait(Interceptor::STEP::READY);
11fdf7f2
TL
745 // the srv end of theconnection is now paused at ready
746 // this means that the reconnect was successful
f67539c2 747 srv_interceptor->remove_bp(Interceptor::STEP::READY);
11fdf7f2
TL
748
749 ASSERT_TRUE(c2s_accepter->peer_is_client());
750 // c2s_accepter sent 0 reconnect messages
f67539c2 751 ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT), 0u);
11fdf7f2 752 // c2s_accepter sent 1 reconnect_ok messages
f67539c2 753 ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT_OK), 1u);
11fdf7f2 754 // c2s sent 1 reconnect messages
f67539c2 755 ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT), 1u);
11fdf7f2 756 // c2s sent 0 reconnect_ok messages
f67539c2 757 ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT_OK), 0u);
11fdf7f2
TL
758
759 srv_interceptor->proceed(15, Interceptor::ACTION::CONTINUE);
760
761 {
9f95a23c
TL
762 std::unique_lock l{cli_dispatcher.lock};
763 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
11fdf7f2
TL
764 cli_dispatcher.got_new = false;
765 }
766
767 client_msgr->shutdown();
768 client_msgr->wait();
769 server_msgr->shutdown();
770 server_msgr->wait();
771
772 delete cli_interceptor;
773 delete srv_interceptor;
774}
775
776/**
777 * Scenario:
778 * - A connects to B
779 * - A and B exchange messages
780 * - A fails
781 * - A reconnects // B reconnects
782 */
783TEST_P(MessengerTest, ReconnectRaceTest) {
11fdf7f2
TL
784 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
785
786 TestInterceptor *cli_interceptor = new TestInterceptor();
787 TestInterceptor *srv_interceptor = new TestInterceptor();
788
789 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer(0));
790 server_msgr->interceptor = srv_interceptor;
791
792 client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0));
793 client_msgr->interceptor = cli_interceptor;
794
795 entity_addr_t bind_addr;
796 bind_addr.parse("v2:127.0.0.1:3300");
797 server_msgr->bind(bind_addr);
798 server_msgr->add_dispatcher_head(&srv_dispatcher);
799 server_msgr->start();
800
801 bind_addr.parse("v2:127.0.0.1:3301");
802 client_msgr->bind(bind_addr);
803 client_msgr->add_dispatcher_head(&cli_dispatcher);
804 client_msgr->start();
805
806 ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
807 server_msgr->get_myaddrs());
808
809 MPing *m1 = new MPing();
810 ASSERT_EQ(c2s->send_message(m1), 0);
811
812 {
9f95a23c
TL
813 std::unique_lock l{cli_dispatcher.lock};
814 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
11fdf7f2
TL
815 cli_dispatcher.got_new = false;
816 }
817
818 ASSERT_TRUE(c2s->is_connected());
819 ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
820 ASSERT_TRUE(c2s->peer_is_osd());
821
f67539c2 822 cli_interceptor->breakpoint(Interceptor::STEP::HANDLE_MESSAGE);
11fdf7f2
TL
823
824 MPing *m2 = new MPing();
825 ASSERT_EQ(c2s->send_message(m2), 0);
826
f67539c2
TL
827 cli_interceptor->wait(Interceptor::STEP::HANDLE_MESSAGE, c2s.get());
828 cli_interceptor->remove_bp(Interceptor::STEP::HANDLE_MESSAGE);
11fdf7f2
TL
829
830 // at this point client and server are connected together
831
832 // force both client and server to race on reconnect
f67539c2
TL
833 cli_interceptor->breakpoint(Interceptor::STEP::SEND_RECONNECT);
834 srv_interceptor->breakpoint(Interceptor::STEP::SEND_RECONNECT);
11fdf7f2
TL
835
836 // failing client
837 // this will cause both client and server to reconnect at the same time
f67539c2 838 cli_interceptor->proceed(Interceptor::STEP::HANDLE_MESSAGE, Interceptor::ACTION::FAIL);
11fdf7f2
TL
839
840 MPing *m3 = new MPing();
841 ASSERT_EQ(c2s->send_message(m3), 0);
842
f67539c2
TL
843 cli_interceptor->wait(Interceptor::STEP::SEND_RECONNECT, c2s.get());
844 srv_interceptor->wait(Interceptor::STEP::SEND_RECONNECT);
11fdf7f2 845
f67539c2
TL
846 cli_interceptor->remove_bp(Interceptor::STEP::SEND_RECONNECT);
847 srv_interceptor->remove_bp(Interceptor::STEP::SEND_RECONNECT);
11fdf7f2
TL
848
849 // pause on "ready"
f67539c2 850 srv_interceptor->breakpoint(Interceptor::STEP::READY);
11fdf7f2 851
f67539c2
TL
852 cli_interceptor->proceed(Interceptor::STEP::SEND_RECONNECT, Interceptor::ACTION::CONTINUE);
853 srv_interceptor->proceed(Interceptor::STEP::SEND_RECONNECT, Interceptor::ACTION::CONTINUE);
11fdf7f2 854
f67539c2 855 Connection *c2s_accepter = srv_interceptor->wait(Interceptor::STEP::READY);
11fdf7f2
TL
856
857 // the server has reconnected and is "ready"
f67539c2 858 srv_interceptor->remove_bp(Interceptor::STEP::READY);
11fdf7f2
TL
859
860 ASSERT_TRUE(c2s_accepter->peer_is_client());
861 ASSERT_TRUE(c2s->peer_is_osd());
862
863 // the server should win the reconnect race
864
865 // c2s_accepter sent 1 or 2 reconnect messages
f67539c2
TL
866 ASSERT_LT(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT), 3u);
867 ASSERT_GT(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT), 0u);
11fdf7f2 868 // c2s_accepter sent 0 reconnect_ok messages
f67539c2 869 ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT_OK), 0u);
11fdf7f2 870 // c2s sent 1 reconnect messages
f67539c2 871 ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT), 1u);
11fdf7f2 872 // c2s sent 1 reconnect_ok messages
f67539c2 873 ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT_OK), 1u);
11fdf7f2 874
f67539c2 875 if (srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT) == 2) {
11fdf7f2
TL
876 // if the server send the reconnect message two times then
877 // the client must have sent a session retry message to the server
f67539c2 878 ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SESSION_RETRY), 1u);
11fdf7f2 879 } else {
f67539c2 880 ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SESSION_RETRY), 0u);
11fdf7f2
TL
881 }
882
f67539c2 883 srv_interceptor->proceed(Interceptor::STEP::READY, Interceptor::ACTION::CONTINUE);
11fdf7f2
TL
884
885 {
9f95a23c
TL
886 std::unique_lock l{cli_dispatcher.lock};
887 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
11fdf7f2
TL
888 cli_dispatcher.got_new = false;
889 }
890
891 client_msgr->shutdown();
892 client_msgr->wait();
893 server_msgr->shutdown();
894 server_msgr->wait();
895
896 delete cli_interceptor;
897 delete srv_interceptor;
898}
899
7c673cae
FG
900TEST_P(MessengerTest, SimpleTest) {
901 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
902 entity_addr_t bind_addr;
9f95a23c 903 bind_addr.parse("v2:127.0.0.1");
7c673cae
FG
904 server_msgr->bind(bind_addr);
905 server_msgr->add_dispatcher_head(&srv_dispatcher);
906 server_msgr->start();
907
908 client_msgr->add_dispatcher_head(&cli_dispatcher);
909 client_msgr->start();
910
911 // 1. simple round trip
912 MPing *m = new MPing();
11fdf7f2
TL
913 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
914 server_msgr->get_myaddrs());
7c673cae
FG
915 {
916 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
917 std::unique_lock l{cli_dispatcher.lock};
918 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
919 cli_dispatcher.got_new = false;
920 }
921 ASSERT_TRUE(conn->is_connected());
11fdf7f2 922 ASSERT_EQ(1u, static_cast<Session*>(conn->get_priv().get())->get_count());
7c673cae
FG
923 ASSERT_TRUE(conn->peer_is_osd());
924
925 // 2. test rebind port
926 set<int> avoid_ports;
11fdf7f2
TL
927 for (int i = 0; i < 10 ; i++) {
928 for (auto a : server_msgr->get_myaddrs().v) {
929 avoid_ports.insert(a.get_port() + i);
930 }
931 }
7c673cae 932 server_msgr->rebind(avoid_ports);
11fdf7f2
TL
933 for (auto a : server_msgr->get_myaddrs().v) {
934 ASSERT_TRUE(avoid_ports.count(a.get_port()) == 0);
935 }
7c673cae 936
11fdf7f2
TL
937 conn = client_msgr->connect_to(server_msgr->get_mytype(),
938 server_msgr->get_myaddrs());
7c673cae
FG
939 {
940 m = new MPing();
941 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
942 std::unique_lock l{cli_dispatcher.lock};
943 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
944 cli_dispatcher.got_new = false;
945 }
11fdf7f2 946 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
7c673cae
FG
947
948 // 3. test markdown connection
949 conn->mark_down();
950 ASSERT_FALSE(conn->is_connected());
951
952 // 4. test failed connection
953 server_msgr->shutdown();
954 server_msgr->wait();
955
956 m = new MPing();
957 conn->send_message(m);
958 CHECK_AND_WAIT_TRUE(!conn->is_connected());
959 ASSERT_FALSE(conn->is_connected());
960
961 // 5. loopback connection
962 srv_dispatcher.loopback = true;
963 conn = client_msgr->get_loopback_connection();
964 {
965 m = new MPing();
966 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
967 std::unique_lock l{cli_dispatcher.lock};
968 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
969 cli_dispatcher.got_new = false;
970 }
971 srv_dispatcher.loopback = false;
11fdf7f2
TL
972 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
973 client_msgr->shutdown();
974 client_msgr->wait();
975 server_msgr->shutdown();
976 server_msgr->wait();
977}
978
979TEST_P(MessengerTest, SimpleMsgr2Test) {
980 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
981 entity_addr_t legacy_addr;
982 legacy_addr.parse("v1:127.0.0.1");
983 entity_addr_t msgr2_addr;
984 msgr2_addr.parse("v2:127.0.0.1");
985 entity_addrvec_t bind_addrs;
986 bind_addrs.v.push_back(legacy_addr);
987 bind_addrs.v.push_back(msgr2_addr);
988 server_msgr->bindv(bind_addrs);
989 server_msgr->add_dispatcher_head(&srv_dispatcher);
990 server_msgr->start();
991
992 client_msgr->add_dispatcher_head(&cli_dispatcher);
993 client_msgr->start();
994
995 // 1. simple round trip
996 MPing *m = new MPing();
997 ConnectionRef conn = client_msgr->connect_to(
998 server_msgr->get_mytype(),
999 server_msgr->get_myaddrs());
1000 {
1001 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
1002 std::unique_lock l{cli_dispatcher.lock};
1003 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
11fdf7f2
TL
1004 cli_dispatcher.got_new = false;
1005 }
1006 ASSERT_TRUE(conn->is_connected());
1007 ASSERT_EQ(1u, static_cast<Session*>(conn->get_priv().get())->get_count());
1008 ASSERT_TRUE(conn->peer_is_osd());
1009
1010 // 2. test rebind port
1011 set<int> avoid_ports;
1012 for (int i = 0; i < 10 ; i++) {
1013 for (auto a : server_msgr->get_myaddrs().v) {
1014 avoid_ports.insert(a.get_port() + i);
1015 }
1016 }
1017 server_msgr->rebind(avoid_ports);
1018 for (auto a : server_msgr->get_myaddrs().v) {
1019 ASSERT_TRUE(avoid_ports.count(a.get_port()) == 0);
1020 }
1021
1022 conn = client_msgr->connect_to(
1023 server_msgr->get_mytype(),
1024 server_msgr->get_myaddrs());
1025 {
1026 m = new MPing();
1027 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
1028 std::unique_lock l{cli_dispatcher.lock};
1029 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
11fdf7f2
TL
1030 cli_dispatcher.got_new = false;
1031 }
1032 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1033
1034 // 3. test markdown connection
1035 conn->mark_down();
1036 ASSERT_FALSE(conn->is_connected());
1037
1038 // 4. test failed connection
1039 server_msgr->shutdown();
1040 server_msgr->wait();
1041
1042 m = new MPing();
1043 conn->send_message(m);
1044 CHECK_AND_WAIT_TRUE(!conn->is_connected());
1045 ASSERT_FALSE(conn->is_connected());
1046
1047 // 5. loopback connection
1048 srv_dispatcher.loopback = true;
1049 conn = client_msgr->get_loopback_connection();
1050 {
1051 m = new MPing();
1052 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
1053 std::unique_lock l{cli_dispatcher.lock};
1054 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
11fdf7f2
TL
1055 cli_dispatcher.got_new = false;
1056 }
1057 srv_dispatcher.loopback = false;
1058 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
7c673cae
FG
1059 client_msgr->shutdown();
1060 client_msgr->wait();
1061 server_msgr->shutdown();
1062 server_msgr->wait();
1063}
1064
7c673cae
FG
1065TEST_P(MessengerTest, FeatureTest) {
1066 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1067 entity_addr_t bind_addr;
9f95a23c 1068 bind_addr.parse("v2:127.0.0.1");
7c673cae
FG
1069 uint64_t all_feature_supported, feature_required, feature_supported = 0;
1070 for (int i = 0; i < 10; i++)
1071 feature_supported |= 1ULL << i;
11fdf7f2
TL
1072 feature_supported |= CEPH_FEATUREMASK_MSG_ADDR2;
1073 feature_supported |= CEPH_FEATUREMASK_SERVER_NAUTILUS;
7c673cae
FG
1074 feature_required = feature_supported | 1ULL << 13;
1075 all_feature_supported = feature_required | 1ULL << 14;
1076
1077 Messenger::Policy p = server_msgr->get_policy(entity_name_t::TYPE_CLIENT);
1078 p.features_required = feature_required;
1079 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
1080 server_msgr->bind(bind_addr);
1081 server_msgr->add_dispatcher_head(&srv_dispatcher);
1082 server_msgr->start();
1083
1084 // 1. Suppose if only support less than required
1085 p = client_msgr->get_policy(entity_name_t::TYPE_OSD);
1086 p.features_supported = feature_supported;
1087 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
1088 client_msgr->add_dispatcher_head(&cli_dispatcher);
1089 client_msgr->start();
1090
1091 MPing *m = new MPing();
11fdf7f2
TL
1092 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1093 server_msgr->get_myaddrs());
7c673cae
FG
1094 conn->send_message(m);
1095 CHECK_AND_WAIT_TRUE(!conn->is_connected());
1096 // should failed build a connection
1097 ASSERT_FALSE(conn->is_connected());
1098
1099 client_msgr->shutdown();
1100 client_msgr->wait();
1101
1102 // 2. supported met required
1103 p = client_msgr->get_policy(entity_name_t::TYPE_OSD);
1104 p.features_supported = all_feature_supported;
1105 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
1106 client_msgr->start();
1107
11fdf7f2
TL
1108 conn = client_msgr->connect_to(server_msgr->get_mytype(),
1109 server_msgr->get_myaddrs());
7c673cae
FG
1110 {
1111 m = new MPing();
1112 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
1113 std::unique_lock l{cli_dispatcher.lock};
1114 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
1115 cli_dispatcher.got_new = false;
1116 }
11fdf7f2 1117 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
7c673cae
FG
1118
1119 server_msgr->shutdown();
1120 client_msgr->shutdown();
1121 server_msgr->wait();
1122 client_msgr->wait();
1123}
1124
1125TEST_P(MessengerTest, TimeoutTest) {
81eedcae 1126 g_ceph_context->_conf.set_val("ms_connection_idle_timeout", "1");
7c673cae
FG
1127 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1128 entity_addr_t bind_addr;
9f95a23c 1129 bind_addr.parse("v2:127.0.0.1");
7c673cae
FG
1130 server_msgr->bind(bind_addr);
1131 server_msgr->add_dispatcher_head(&srv_dispatcher);
1132 server_msgr->start();
1133
1134 client_msgr->add_dispatcher_head(&cli_dispatcher);
1135 client_msgr->start();
1136
1137 // 1. build the connection
1138 MPing *m = new MPing();
11fdf7f2
TL
1139 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1140 server_msgr->get_myaddrs());
7c673cae
FG
1141 {
1142 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
1143 std::unique_lock l{cli_dispatcher.lock};
1144 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
1145 cli_dispatcher.got_new = false;
1146 }
1147 ASSERT_TRUE(conn->is_connected());
11fdf7f2 1148 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
7c673cae
FG
1149 ASSERT_TRUE(conn->peer_is_osd());
1150
1151 // 2. wait for idle
1152 usleep(2500*1000);
1153 ASSERT_FALSE(conn->is_connected());
1154
1155 server_msgr->shutdown();
1156 server_msgr->wait();
1157
1158 client_msgr->shutdown();
1159 client_msgr->wait();
81eedcae 1160 g_ceph_context->_conf.set_val("ms_connection_idle_timeout", "900");
7c673cae
FG
1161}
1162
1163TEST_P(MessengerTest, StatefulTest) {
1164 Message *m;
1165 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1166 entity_addr_t bind_addr;
9f95a23c 1167 bind_addr.parse("v2:127.0.0.1");
7c673cae
FG
1168 Messenger::Policy p = Messenger::Policy::stateful_server(0);
1169 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
1170 p = Messenger::Policy::lossless_client(0);
1171 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
1172
1173 server_msgr->bind(bind_addr);
1174 server_msgr->add_dispatcher_head(&srv_dispatcher);
1175 server_msgr->start();
1176 client_msgr->add_dispatcher_head(&cli_dispatcher);
1177 client_msgr->start();
1178
1179 // 1. test for server standby
11fdf7f2
TL
1180 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1181 server_msgr->get_myaddrs());
7c673cae
FG
1182 {
1183 m = new MPing();
1184 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
1185 std::unique_lock l{cli_dispatcher.lock};
1186 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
1187 cli_dispatcher.got_new = false;
1188 }
11fdf7f2 1189 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
7c673cae
FG
1190 conn->mark_down();
1191 ASSERT_FALSE(conn->is_connected());
11fdf7f2
TL
1192 ConnectionRef server_conn = server_msgr->connect_to(
1193 client_msgr->get_mytype(), srv_dispatcher.last_accept);
7c673cae 1194 // don't lose state
11fdf7f2 1195 ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
7c673cae
FG
1196
1197 srv_dispatcher.got_new = false;
11fdf7f2
TL
1198 conn = client_msgr->connect_to(server_msgr->get_mytype(),
1199 server_msgr->get_myaddrs());
7c673cae
FG
1200 {
1201 m = new MPing();
1202 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
1203 std::unique_lock l{cli_dispatcher.lock};
1204 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
1205 cli_dispatcher.got_new = false;
1206 }
11fdf7f2
TL
1207 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1208 server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
1209 srv_dispatcher.last_accept);
7c673cae 1210 {
9f95a23c
TL
1211 std::unique_lock l{srv_dispatcher.lock};
1212 srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_remote_reset; });
7c673cae
FG
1213 }
1214
1215 // 2. test for client reconnect
1216 ASSERT_FALSE(cli_dispatcher.got_remote_reset);
1217 cli_dispatcher.got_connect = false;
1218 cli_dispatcher.got_new = false;
1219 cli_dispatcher.got_remote_reset = false;
1220 server_conn->mark_down();
1221 ASSERT_FALSE(server_conn->is_connected());
1222 // ensure client detect server socket closed
1223 {
9f95a23c
TL
1224 std::unique_lock l{cli_dispatcher.lock};
1225 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_remote_reset; });
7c673cae
FG
1226 cli_dispatcher.got_remote_reset = false;
1227 }
1228 {
9f95a23c
TL
1229 std::unique_lock l{cli_dispatcher.lock};
1230 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_connect; });
7c673cae
FG
1231 cli_dispatcher.got_connect = false;
1232 }
1233 CHECK_AND_WAIT_TRUE(conn->is_connected());
1234 ASSERT_TRUE(conn->is_connected());
1235
1236 {
1237 m = new MPing();
1238 ASSERT_EQ(conn->send_message(m), 0);
1239 ASSERT_TRUE(conn->is_connected());
9f95a23c
TL
1240 std::unique_lock l{cli_dispatcher.lock};
1241 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
1242 cli_dispatcher.got_new = false;
1243 }
1244 // resetcheck happen
11fdf7f2
TL
1245 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1246 server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
1247 srv_dispatcher.last_accept);
1248 ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
7c673cae
FG
1249 cli_dispatcher.got_remote_reset = false;
1250
1251 server_msgr->shutdown();
1252 client_msgr->shutdown();
1253 server_msgr->wait();
1254 client_msgr->wait();
1255}
1256
1257TEST_P(MessengerTest, StatelessTest) {
1258 Message *m;
1259 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1260 entity_addr_t bind_addr;
9f95a23c 1261 bind_addr.parse("v2:127.0.0.1");
7c673cae
FG
1262 Messenger::Policy p = Messenger::Policy::stateless_server(0);
1263 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
1264 p = Messenger::Policy::lossy_client(0);
1265 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
1266
1267 server_msgr->bind(bind_addr);
1268 server_msgr->add_dispatcher_head(&srv_dispatcher);
1269 server_msgr->start();
1270 client_msgr->add_dispatcher_head(&cli_dispatcher);
1271 client_msgr->start();
1272
1273 // 1. test for server lose state
11fdf7f2
TL
1274 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1275 server_msgr->get_myaddrs());
7c673cae
FG
1276 {
1277 m = new MPing();
1278 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
1279 std::unique_lock l{cli_dispatcher.lock};
1280 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
1281 cli_dispatcher.got_new = false;
1282 }
11fdf7f2 1283 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
7c673cae
FG
1284 conn->mark_down();
1285 ASSERT_FALSE(conn->is_connected());
1286
1287 srv_dispatcher.got_new = false;
9f95a23c
TL
1288 ConnectionRef server_conn;
1289 srv_dispatcher.last_accept_con_ptr = &server_conn;
11fdf7f2
TL
1290 conn = client_msgr->connect_to(server_msgr->get_mytype(),
1291 server_msgr->get_myaddrs());
7c673cae
FG
1292 {
1293 m = new MPing();
1294 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
1295 std::unique_lock l{cli_dispatcher.lock};
1296 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
1297 cli_dispatcher.got_new = false;
1298 }
11fdf7f2 1299 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
9f95a23c
TL
1300 ASSERT_TRUE(server_conn);
1301
7c673cae
FG
1302 // server lose state
1303 {
9f95a23c
TL
1304 std::unique_lock l{srv_dispatcher.lock};
1305 srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; });
7c673cae 1306 }
11fdf7f2 1307 ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
7c673cae
FG
1308
1309 // 2. test for client lossy
1310 server_conn->mark_down();
1311 ASSERT_FALSE(server_conn->is_connected());
1312 conn->send_keepalive();
1313 CHECK_AND_WAIT_TRUE(!conn->is_connected());
1314 ASSERT_FALSE(conn->is_connected());
11fdf7f2
TL
1315 conn = client_msgr->connect_to(server_msgr->get_mytype(),
1316 server_msgr->get_myaddrs());
7c673cae
FG
1317 {
1318 m = new MPing();
1319 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
1320 std::unique_lock l{cli_dispatcher.lock};
1321 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
1322 cli_dispatcher.got_new = false;
1323 }
11fdf7f2 1324 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
7c673cae
FG
1325
1326 server_msgr->shutdown();
1327 client_msgr->shutdown();
1328 server_msgr->wait();
1329 client_msgr->wait();
1330}
1331
9f95a23c
TL
1332TEST_P(MessengerTest, AnonTest) {
1333 Message *m;
1334 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1335 entity_addr_t bind_addr;
1336 bind_addr.parse("v2:127.0.0.1");
1337 Messenger::Policy p = Messenger::Policy::stateless_server(0);
1338 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
1339 p = Messenger::Policy::lossy_client(0);
1340 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
1341
1342 server_msgr->bind(bind_addr);
1343 server_msgr->add_dispatcher_head(&srv_dispatcher);
1344 server_msgr->start();
1345 client_msgr->add_dispatcher_head(&cli_dispatcher);
1346 client_msgr->start();
1347
1348 ConnectionRef server_con_a, server_con_b;
1349
1350 // a
1351 srv_dispatcher.last_accept_con_ptr = &server_con_a;
1352 ConnectionRef con_a = client_msgr->connect_to(server_msgr->get_mytype(),
1353 server_msgr->get_myaddrs(),
1354 true);
1355 {
1356 m = new MPing();
1357 ASSERT_EQ(con_a->send_message(m), 0);
1358 std::unique_lock l{cli_dispatcher.lock};
1359 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1360 cli_dispatcher.got_new = false;
1361 }
1362 ASSERT_EQ(1U, static_cast<Session*>(con_a->get_priv().get())->get_count());
1363
1364 // b
1365 srv_dispatcher.last_accept_con_ptr = &server_con_b;
1366 ConnectionRef con_b = client_msgr->connect_to(server_msgr->get_mytype(),
1367 server_msgr->get_myaddrs(),
1368 true);
1369 {
1370 m = new MPing();
1371 ASSERT_EQ(con_b->send_message(m), 0);
1372 std::unique_lock l{cli_dispatcher.lock};
1373 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1374 cli_dispatcher.got_new = false;
1375 }
1376 ASSERT_EQ(1U, static_cast<Session*>(con_b->get_priv().get())->get_count());
1377
1378 // these should be distinct
1379 ASSERT_NE(con_a, con_b);
1380 ASSERT_NE(server_con_a, server_con_b);
1381
1382 // and both connected
1383 {
1384 m = new MPing();
1385 ASSERT_EQ(con_a->send_message(m), 0);
1386 std::unique_lock l{cli_dispatcher.lock};
1387 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1388 cli_dispatcher.got_new = false;
1389 }
1390 {
1391 m = new MPing();
1392 ASSERT_EQ(con_b->send_message(m), 0);
1393 std::unique_lock l{cli_dispatcher.lock};
1394 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1395 cli_dispatcher.got_new = false;
1396 }
1397
1398 // clean up
1399 con_a->mark_down();
1400 ASSERT_FALSE(con_a->is_connected());
1401 con_b->mark_down();
1402 ASSERT_FALSE(con_b->is_connected());
1403
1404 server_msgr->shutdown();
1405 client_msgr->shutdown();
1406 server_msgr->wait();
1407 client_msgr->wait();
1408}
1409
7c673cae
FG
1410TEST_P(MessengerTest, ClientStandbyTest) {
1411 Message *m;
1412 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1413 entity_addr_t bind_addr;
9f95a23c 1414 bind_addr.parse("v2:127.0.0.1");
7c673cae
FG
1415 Messenger::Policy p = Messenger::Policy::stateful_server(0);
1416 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
1417 p = Messenger::Policy::lossless_peer(0);
1418 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
1419
1420 server_msgr->bind(bind_addr);
1421 server_msgr->add_dispatcher_head(&srv_dispatcher);
1422 server_msgr->start();
1423 client_msgr->add_dispatcher_head(&cli_dispatcher);
1424 client_msgr->start();
1425
1426 // 1. test for client standby, resetcheck
11fdf7f2
TL
1427 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1428 server_msgr->get_myaddrs());
7c673cae
FG
1429 {
1430 m = new MPing();
1431 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
1432 std::unique_lock l{cli_dispatcher.lock};
1433 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
1434 cli_dispatcher.got_new = false;
1435 }
11fdf7f2
TL
1436 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1437 ConnectionRef server_conn = server_msgr->connect_to(
1438 client_msgr->get_mytype(),
1439 srv_dispatcher.last_accept);
7c673cae
FG
1440 ASSERT_FALSE(cli_dispatcher.got_remote_reset);
1441 cli_dispatcher.got_connect = false;
1442 server_conn->mark_down();
1443 ASSERT_FALSE(server_conn->is_connected());
1444 // client should be standby
1445 usleep(300*1000);
1446 // client should be standby, so we use original connection
1447 {
1448 // Try send message to verify got remote reset callback
1449 m = new MPing();
1450 ASSERT_EQ(conn->send_message(m), 0);
1451 {
9f95a23c
TL
1452 std::unique_lock l{cli_dispatcher.lock};
1453 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_remote_reset; });
7c673cae 1454 cli_dispatcher.got_remote_reset = false;
9f95a23c 1455 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_connect; });
7c673cae
FG
1456 cli_dispatcher.got_connect = false;
1457 }
1458 CHECK_AND_WAIT_TRUE(conn->is_connected());
1459 ASSERT_TRUE(conn->is_connected());
1460 m = new MPing();
1461 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
1462 std::unique_lock l{cli_dispatcher.lock};
1463 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
1464 cli_dispatcher.got_new = false;
1465 }
11fdf7f2
TL
1466 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1467 server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
1468 srv_dispatcher.last_accept);
1469 ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
7c673cae
FG
1470
1471 server_msgr->shutdown();
1472 client_msgr->shutdown();
1473 server_msgr->wait();
1474 client_msgr->wait();
1475}
1476
1477TEST_P(MessengerTest, AuthTest) {
11fdf7f2
TL
1478 g_ceph_context->_conf.set_val("auth_cluster_required", "cephx");
1479 g_ceph_context->_conf.set_val("auth_service_required", "cephx");
1480 g_ceph_context->_conf.set_val("auth_client_required", "cephx");
7c673cae
FG
1481 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1482 entity_addr_t bind_addr;
9f95a23c 1483 bind_addr.parse("v2:127.0.0.1");
7c673cae
FG
1484 server_msgr->bind(bind_addr);
1485 server_msgr->add_dispatcher_head(&srv_dispatcher);
1486 server_msgr->start();
1487
1488 client_msgr->add_dispatcher_head(&cli_dispatcher);
1489 client_msgr->start();
1490
1491 // 1. simple auth round trip
1492 MPing *m = new MPing();
11fdf7f2
TL
1493 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1494 server_msgr->get_myaddrs());
7c673cae
FG
1495 {
1496 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
1497 std::unique_lock l{cli_dispatcher.lock};
1498 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
1499 cli_dispatcher.got_new = false;
1500 }
1501 ASSERT_TRUE(conn->is_connected());
11fdf7f2 1502 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
7c673cae
FG
1503
1504 // 2. mix auth
11fdf7f2
TL
1505 g_ceph_context->_conf.set_val("auth_cluster_required", "none");
1506 g_ceph_context->_conf.set_val("auth_service_required", "none");
1507 g_ceph_context->_conf.set_val("auth_client_required", "none");
7c673cae
FG
1508 conn->mark_down();
1509 ASSERT_FALSE(conn->is_connected());
11fdf7f2
TL
1510 conn = client_msgr->connect_to(server_msgr->get_mytype(),
1511 server_msgr->get_myaddrs());
7c673cae
FG
1512 {
1513 MPing *m = new MPing();
1514 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
1515 std::unique_lock l{cli_dispatcher.lock};
1516 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
1517 cli_dispatcher.got_new = false;
1518 }
1519 ASSERT_TRUE(conn->is_connected());
11fdf7f2 1520 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
7c673cae
FG
1521 server_msgr->shutdown();
1522 client_msgr->shutdown();
1523 server_msgr->wait();
1524 client_msgr->wait();
1525}
1526
1527TEST_P(MessengerTest, MessageTest) {
1528 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1529 entity_addr_t bind_addr;
9f95a23c 1530 bind_addr.parse("v2:127.0.0.1");
7c673cae
FG
1531 Messenger::Policy p = Messenger::Policy::stateful_server(0);
1532 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
1533 p = Messenger::Policy::lossless_peer(0);
1534 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
1535
1536 server_msgr->bind(bind_addr);
1537 server_msgr->add_dispatcher_head(&srv_dispatcher);
1538 server_msgr->start();
1539 client_msgr->add_dispatcher_head(&cli_dispatcher);
1540 client_msgr->start();
1541
1542
1543 // 1. A very large "front"(as well as "payload")
1544 // Because a external message need to invade Messenger::decode_message,
1545 // here we only use existing message class(MCommand)
11fdf7f2
TL
1546 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1547 server_msgr->get_myaddrs());
7c673cae
FG
1548 {
1549 uuid_d uuid;
1550 uuid.generate_random();
1551 vector<string> cmds;
1552 string s("abcdefghijklmnopqrstuvwxyz");
1553 for (int i = 0; i < 1024*30; i++)
1554 cmds.push_back(s);
1555 MCommand *m = new MCommand(uuid);
1556 m->cmd = cmds;
1557 conn->send_message(m);
9f95a23c
TL
1558 std::unique_lock l{cli_dispatcher.lock};
1559 cli_dispatcher.cond.wait_for(l, 500s, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
1560 ASSERT_TRUE(cli_dispatcher.got_new);
1561 cli_dispatcher.got_new = false;
1562 }
1563
1564 // 2. A very large "data"
1565 {
1566 bufferlist bl;
1567 string s("abcdefghijklmnopqrstuvwxyz");
1568 for (int i = 0; i < 1024*30; i++)
1569 bl.append(s);
1570 MPing *m = new MPing();
1571 m->set_data(bl);
1572 conn->send_message(m);
1573 utime_t t;
1574 t += 1000*1000*500;
9f95a23c
TL
1575 std::unique_lock l{cli_dispatcher.lock};
1576 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
1577 ASSERT_TRUE(cli_dispatcher.got_new);
1578 cli_dispatcher.got_new = false;
1579 }
1580 server_msgr->shutdown();
1581 client_msgr->shutdown();
1582 server_msgr->wait();
1583 client_msgr->wait();
1584}
1585
1586
1587class SyntheticWorkload;
1588
1589struct Payload {
1590 enum Who : uint8_t {
1591 PING = 0,
1592 PONG = 1,
1593 };
11fdf7f2
TL
1594 uint8_t who = 0;
1595 uint64_t seq = 0;
7c673cae
FG
1596 bufferlist data;
1597
1598 Payload(Who who, uint64_t seq, const bufferlist& data)
1599 : who(who), seq(seq), data(data)
1600 {}
1601 Payload() = default;
1602 DENC(Payload, v, p) {
1603 DENC_START(1, 1, p);
1604 denc(v.who, p);
1605 denc(v.seq, p);
1606 denc(v.data, p);
1607 DENC_FINISH(p);
1608 }
1609};
1610WRITE_CLASS_DENC(Payload)
1611
1612ostream& operator<<(ostream& out, const Payload &pl)
1613{
1614 return out << "reply=" << pl.who << " i = " << pl.seq;
1615}
1616
1617class SyntheticDispatcher : public Dispatcher {
1618 public:
9f95a23c
TL
1619 ceph::mutex lock = ceph::make_mutex("SyntheticDispatcher::lock");
1620 ceph::condition_variable cond;
7c673cae
FG
1621 bool is_server;
1622 bool got_new;
1623 bool got_remote_reset;
1624 bool got_connect;
1625 map<ConnectionRef, list<uint64_t> > conn_sent;
1626 map<uint64_t, bufferlist> sent;
1627 atomic<uint64_t> index;
1628 SyntheticWorkload *workload;
1629
1630 SyntheticDispatcher(bool s, SyntheticWorkload *wl):
9f95a23c 1631 Dispatcher(g_ceph_context), is_server(s), got_new(false),
11fdf7f2 1632 got_remote_reset(false), got_connect(false), index(0), workload(wl) {
11fdf7f2 1633 }
7c673cae
FG
1634 bool ms_can_fast_dispatch_any() const override { return true; }
1635 bool ms_can_fast_dispatch(const Message *m) const override {
1636 switch (m->get_type()) {
1637 case CEPH_MSG_PING:
1638 case MSG_COMMAND:
1639 return true;
1640 default:
1641 return false;
1642 }
1643 }
1644
1645 void ms_handle_fast_connect(Connection *con) override {
9f95a23c 1646 std::lock_guard l{lock};
7c673cae
FG
1647 list<uint64_t> c = conn_sent[con];
1648 for (list<uint64_t>::iterator it = c.begin();
1649 it != c.end(); ++it)
1650 sent.erase(*it);
1651 conn_sent.erase(con);
1652 got_connect = true;
9f95a23c 1653 cond.notify_all();
7c673cae
FG
1654 }
1655 void ms_handle_fast_accept(Connection *con) override {
9f95a23c 1656 std::lock_guard l{lock};
7c673cae
FG
1657 list<uint64_t> c = conn_sent[con];
1658 for (list<uint64_t>::iterator it = c.begin();
1659 it != c.end(); ++it)
1660 sent.erase(*it);
1661 conn_sent.erase(con);
9f95a23c 1662 cond.notify_all();
7c673cae
FG
1663 }
1664 bool ms_dispatch(Message *m) override {
1665 ceph_abort();
1666 }
1667 bool ms_handle_reset(Connection *con) override;
1668 void ms_handle_remote_reset(Connection *con) override {
9f95a23c 1669 std::lock_guard l{lock};
7c673cae
FG
1670 list<uint64_t> c = conn_sent[con];
1671 for (list<uint64_t>::iterator it = c.begin();
1672 it != c.end(); ++it)
1673 sent.erase(*it);
1674 conn_sent.erase(con);
1675 got_remote_reset = true;
1676 }
1677 bool ms_handle_refused(Connection *con) override {
1678 return false;
1679 }
1680 void ms_fast_dispatch(Message *m) override {
1681 // MSG_COMMAND is used to disorganize regular message flow
1682 if (m->get_type() == MSG_COMMAND) {
1683 m->put();
1684 return ;
1685 }
1686
1687 Payload pl;
11fdf7f2
TL
1688 auto p = m->get_data().cbegin();
1689 decode(pl, p);
7c673cae
FG
1690 if (pl.who == Payload::PING) {
1691 lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl;
1692 reply_message(m, pl);
1693 m->put();
9f95a23c 1694 std::lock_guard l{lock};
7c673cae 1695 got_new = true;
9f95a23c 1696 cond.notify_all();
7c673cae 1697 } else {
9f95a23c 1698 std::lock_guard l{lock};
7c673cae
FG
1699 if (sent.count(pl.seq)) {
1700 lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl;
1701 ASSERT_EQ(conn_sent[m->get_connection()].front(), pl.seq);
1702 ASSERT_TRUE(pl.data.contents_equal(sent[pl.seq]));
1703 conn_sent[m->get_connection()].pop_front();
1704 sent.erase(pl.seq);
1705 }
1706 m->put();
1707 got_new = true;
9f95a23c 1708 cond.notify_all();
7c673cae
FG
1709 }
1710 }
1711
11fdf7f2
TL
1712 int ms_handle_authentication(Connection *con) override {
1713 return 1;
7c673cae
FG
1714 }
1715
1716 void reply_message(const Message *m, Payload& pl) {
1717 pl.who = Payload::PONG;
1718 bufferlist bl;
11fdf7f2 1719 encode(pl, bl);
7c673cae
FG
1720 MPing *rm = new MPing();
1721 rm->set_data(bl);
1722 m->get_connection()->send_message(rm);
1723 lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply m=" << m << " i=" << pl.seq << dendl;
1724 }
1725
1726 void send_message_wrap(ConnectionRef con, const bufferlist& data) {
1727 Message *m = new MPing();
1728 Payload pl{Payload::PING, index++, data};
1729 bufferlist bl;
11fdf7f2 1730 encode(pl, bl);
7c673cae
FG
1731 m->set_data(bl);
1732 if (!con->get_messenger()->get_default_policy().lossy) {
9f95a23c 1733 std::lock_guard l{lock};
7c673cae
FG
1734 sent[pl.seq] = pl.data;
1735 conn_sent[con].push_back(pl.seq);
1736 }
1737 lderr(g_ceph_context) << __func__ << " conn=" << con.get() << " send m=" << m << " i=" << pl.seq << dendl;
1738 ASSERT_EQ(0, con->send_message(m));
1739 }
1740
1741 uint64_t get_pending() {
9f95a23c 1742 std::lock_guard l{lock};
7c673cae
FG
1743 return sent.size();
1744 }
1745
1746 void clear_pending(ConnectionRef con) {
9f95a23c 1747 std::lock_guard l{lock};
7c673cae
FG
1748
1749 for (list<uint64_t>::iterator it = conn_sent[con].begin();
1750 it != conn_sent[con].end(); ++it)
1751 sent.erase(*it);
1752 conn_sent.erase(con);
1753 }
1754
1755 void print() {
1756 for (auto && p : conn_sent) {
1757 if (!p.second.empty()) {
1758 lderr(g_ceph_context) << __func__ << " " << p.first << " wait " << p.second.size() << dendl;
1759 }
1760 }
1761 }
1762};
1763
1764
1765class SyntheticWorkload {
9f95a23c
TL
1766 ceph::mutex lock = ceph::make_mutex("SyntheticWorkload::lock");
1767 ceph::condition_variable cond;
7c673cae
FG
1768 set<Messenger*> available_servers;
1769 set<Messenger*> available_clients;
11fdf7f2 1770 Messenger::Policy client_policy;
7c673cae
FG
1771 map<ConnectionRef, pair<Messenger*, Messenger*> > available_connections;
1772 SyntheticDispatcher dispatcher;
1773 gen_type rng;
1774 vector<bufferlist> rand_data;
11fdf7f2 1775 DummyAuthClientServer dummy_auth;
7c673cae
FG
1776
1777 public:
1778 static const unsigned max_in_flight = 64;
1779 static const unsigned max_connections = 128;
1780 static const unsigned max_message_len = 1024 * 1024 * 4;
1781
1782 SyntheticWorkload(int servers, int clients, string type, int random_num,
11fdf7f2 1783 Messenger::Policy srv_policy, Messenger::Policy cli_policy)
9f95a23c 1784 : client_policy(cli_policy),
11fdf7f2
TL
1785 dispatcher(false, this),
1786 rng(time(NULL)),
1787 dummy_auth(g_ceph_context) {
1788 dummy_auth.auth_registry.refresh_config();
7c673cae
FG
1789 Messenger *msgr;
1790 int base_port = 16800;
1791 entity_addr_t bind_addr;
1792 char addr[64];
1793 for (int i = 0; i < servers; ++i) {
1794 msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0),
f67539c2 1795 "server", getpid()+i);
9f95a23c 1796 snprintf(addr, sizeof(addr), "v2:127.0.0.1:%d",
11fdf7f2 1797 base_port+i);
7c673cae
FG
1798 bind_addr.parse(addr);
1799 msgr->bind(bind_addr);
1800 msgr->add_dispatcher_head(&dispatcher);
11fdf7f2
TL
1801 msgr->set_auth_client(&dummy_auth);
1802 msgr->set_auth_server(&dummy_auth);
7c673cae 1803
11fdf7f2 1804 ceph_assert(msgr);
7c673cae
FG
1805 msgr->set_default_policy(srv_policy);
1806 available_servers.insert(msgr);
1807 msgr->start();
1808 }
1809
1810 for (int i = 0; i < clients; ++i) {
1811 msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(-1),
f67539c2 1812 "client", getpid()+i+servers);
7c673cae 1813 if (cli_policy.standby) {
9f95a23c 1814 snprintf(addr, sizeof(addr), "v2:127.0.0.1:%d",
11fdf7f2 1815 base_port+i+servers);
7c673cae
FG
1816 bind_addr.parse(addr);
1817 msgr->bind(bind_addr);
1818 }
1819 msgr->add_dispatcher_head(&dispatcher);
11fdf7f2
TL
1820 msgr->set_auth_client(&dummy_auth);
1821 msgr->set_auth_server(&dummy_auth);
7c673cae 1822
11fdf7f2 1823 ceph_assert(msgr);
7c673cae
FG
1824 msgr->set_default_policy(cli_policy);
1825 available_clients.insert(msgr);
1826 msgr->start();
1827 }
1828
1829 for (int i = 0; i < random_num; i++) {
1830 bufferlist bl;
1831 boost::uniform_int<> u(32, max_message_len);
1832 uint64_t value_len = u(rng);
1833 bufferptr bp(value_len);
1834 bp.zero();
1835 for (uint64_t j = 0; j < value_len-sizeof(i); ) {
1836 memcpy(bp.c_str()+j, &i, sizeof(i));
1837 j += 4096;
1838 }
1839
1840 bl.append(bp);
1841 rand_data.push_back(bl);
1842 }
1843 }
1844
1845 ConnectionRef _get_random_connection() {
1846 while (dispatcher.get_pending() > max_in_flight) {
9f95a23c 1847 lock.unlock();
7c673cae 1848 usleep(500);
9f95a23c 1849 lock.lock();
7c673cae 1850 }
9f95a23c 1851 ceph_assert(ceph_mutex_is_locked(lock));
7c673cae
FG
1852 boost::uniform_int<> choose(0, available_connections.size() - 1);
1853 int index = choose(rng);
1854 map<ConnectionRef, pair<Messenger*, Messenger*> >::iterator i = available_connections.begin();
1855 for (; index > 0; --index, ++i) ;
1856 return i->first;
1857 }
1858
1859 bool can_create_connection() {
1860 return available_connections.size() < max_connections;
1861 }
1862
1863 void generate_connection() {
9f95a23c 1864 std::lock_guard l{lock};
7c673cae
FG
1865 if (!can_create_connection())
1866 return ;
1867
1868 Messenger *server, *client;
1869 {
1870 boost::uniform_int<> choose(0, available_servers.size() - 1);
1871 int index = choose(rng);
1872 set<Messenger*>::iterator i = available_servers.begin();
1873 for (; index > 0; --index, ++i) ;
1874 server = *i;
1875 }
1876 {
1877 boost::uniform_int<> choose(0, available_clients.size() - 1);
1878 int index = choose(rng);
1879 set<Messenger*>::iterator i = available_clients.begin();
1880 for (; index > 0; --index, ++i) ;
1881 client = *i;
1882 }
1883
1884 pair<Messenger*, Messenger*> p;
1885 {
1886 boost::uniform_int<> choose(0, available_servers.size() - 1);
1887 if (server->get_default_policy().server) {
1888 p = make_pair(client, server);
11fdf7f2
TL
1889 ConnectionRef conn = client->connect_to(server->get_mytype(),
1890 server->get_myaddrs());
1891 available_connections[conn] = p;
7c673cae 1892 } else {
11fdf7f2
TL
1893 ConnectionRef conn = client->connect_to(server->get_mytype(),
1894 server->get_myaddrs());
1895 p = make_pair(client, server);
1896 available_connections[conn] = p;
7c673cae
FG
1897 }
1898 }
7c673cae
FG
1899 }
1900
1901 void send_message() {
9f95a23c 1902 std::lock_guard l{lock};
7c673cae
FG
1903 ConnectionRef conn = _get_random_connection();
1904 boost::uniform_int<> true_false(0, 99);
1905 int val = true_false(rng);
1906 if (val >= 95) {
1907 uuid_d uuid;
1908 uuid.generate_random();
1909 MCommand *m = new MCommand(uuid);
1910 vector<string> cmds;
1911 cmds.push_back("command");
1912 m->cmd = cmds;
1913 m->set_priority(200);
1914 conn->send_message(m);
1915 } else {
1916 boost::uniform_int<> u(0, rand_data.size()-1);
1917 dispatcher.send_message_wrap(conn, rand_data[u(rng)]);
1918 }
1919 }
1920
1921 void drop_connection() {
9f95a23c 1922 std::lock_guard l{lock};
7c673cae
FG
1923 if (available_connections.size() < 10)
1924 return;
1925 ConnectionRef conn = _get_random_connection();
1926 dispatcher.clear_pending(conn);
1927 conn->mark_down();
11fdf7f2
TL
1928 if (!client_policy.server &&
1929 !client_policy.lossy &&
1930 client_policy.standby) {
1931 // it's a lossless policy, so we need to mark down each side
1932 pair<Messenger*, Messenger*> &p = available_connections[conn];
1933 if (!p.first->get_default_policy().server && !p.second->get_default_policy().server) {
1934 ASSERT_EQ(conn->get_messenger(), p.first);
1935 ConnectionRef peer = p.second->connect_to(p.first->get_mytype(),
1936 p.first->get_myaddrs());
1937 peer->mark_down();
1938 dispatcher.clear_pending(peer);
1939 available_connections.erase(peer);
1940 }
7c673cae
FG
1941 }
1942 ASSERT_EQ(available_connections.erase(conn), 1U);
1943 }
1944
1945 void print_internal_state(bool detail=false) {
9f95a23c 1946 std::lock_guard l{lock};
7c673cae
FG
1947 lderr(g_ceph_context) << "available_connections: " << available_connections.size()
1948 << " inflight messages: " << dispatcher.get_pending() << dendl;
1949 if (detail && !available_connections.empty()) {
1950 dispatcher.print();
1951 }
1952 }
1953
1954 void wait_for_done() {
1955 int64_t tick_us = 1000 * 100; // 100ms
1956 int64_t timeout_us = 5 * 60 * 1000 * 1000; // 5 mins
1957 int i = 0;
1958 while (dispatcher.get_pending()) {
1959 usleep(tick_us);
1960 timeout_us -= tick_us;
1961 if (i++ % 50 == 0)
1962 print_internal_state(true);
1963 if (timeout_us < 0)
11fdf7f2 1964 ceph_abort_msg(" loop time exceed 5 mins, it looks we stuck into some problems!");
7c673cae
FG
1965 }
1966 for (set<Messenger*>::iterator it = available_servers.begin();
1967 it != available_servers.end(); ++it) {
1968 (*it)->shutdown();
1969 (*it)->wait();
1970 ASSERT_EQ((*it)->get_dispatch_queue_len(), 0);
1971 delete (*it);
1972 }
1973 available_servers.clear();
1974
1975 for (set<Messenger*>::iterator it = available_clients.begin();
1976 it != available_clients.end(); ++it) {
1977 (*it)->shutdown();
1978 (*it)->wait();
1979 ASSERT_EQ((*it)->get_dispatch_queue_len(), 0);
1980 delete (*it);
1981 }
1982 available_clients.clear();
1983 }
1984
1985 void handle_reset(Connection *con) {
9f95a23c 1986 std::lock_guard l{lock};
7c673cae
FG
1987 available_connections.erase(con);
1988 dispatcher.clear_pending(con);
1989 }
1990};
1991
1992bool SyntheticDispatcher::ms_handle_reset(Connection *con) {
1993 workload->handle_reset(con);
1994 return true;
1995}
1996
1997TEST_P(MessengerTest, SyntheticStressTest) {
1998 SyntheticWorkload test_msg(8, 32, GetParam(), 100,
1999 Messenger::Policy::stateful_server(0),
2000 Messenger::Policy::lossless_client(0));
2001 for (int i = 0; i < 100; ++i) {
2002 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
2003 test_msg.generate_connection();
2004 }
2005 gen_type rng(time(NULL));
2006 for (int i = 0; i < 5000; ++i) {
2007 if (!(i % 10)) {
2008 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
2009 test_msg.print_internal_state();
2010 }
2011 boost::uniform_int<> true_false(0, 99);
2012 int val = true_false(rng);
2013 if (val > 90) {
2014 test_msg.generate_connection();
2015 } else if (val > 80) {
2016 test_msg.drop_connection();
2017 } else if (val > 10) {
2018 test_msg.send_message();
2019 } else {
2020 usleep(rand() % 1000 + 500);
2021 }
2022 }
2023 test_msg.wait_for_done();
2024}
2025
2026TEST_P(MessengerTest, SyntheticStressTest1) {
2027 SyntheticWorkload test_msg(16, 32, GetParam(), 100,
2028 Messenger::Policy::lossless_peer_reuse(0),
2029 Messenger::Policy::lossless_peer_reuse(0));
2030 for (int i = 0; i < 10; ++i) {
2031 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
2032 test_msg.generate_connection();
2033 }
2034 gen_type rng(time(NULL));
2035 for (int i = 0; i < 10000; ++i) {
2036 if (!(i % 10)) {
2037 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
2038 test_msg.print_internal_state();
2039 }
2040 boost::uniform_int<> true_false(0, 99);
2041 int val = true_false(rng);
2042 if (val > 80) {
2043 test_msg.generate_connection();
2044 } else if (val > 60) {
2045 test_msg.drop_connection();
2046 } else if (val > 10) {
2047 test_msg.send_message();
2048 } else {
2049 usleep(rand() % 1000 + 500);
2050 }
2051 }
2052 test_msg.wait_for_done();
2053}
2054
2055
2056TEST_P(MessengerTest, SyntheticInjectTest) {
2057 uint64_t dispatch_throttle_bytes = g_ceph_context->_conf->ms_dispatch_throttle_bytes;
11fdf7f2
TL
2058 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "30");
2059 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1");
2060 g_ceph_context->_conf.set_val("ms_dispatch_throttle_bytes", "16777216");
7c673cae
FG
2061 SyntheticWorkload test_msg(8, 32, GetParam(), 100,
2062 Messenger::Policy::stateful_server(0),
2063 Messenger::Policy::lossless_client(0));
2064 for (int i = 0; i < 100; ++i) {
2065 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
2066 test_msg.generate_connection();
2067 }
2068 gen_type rng(time(NULL));
2069 for (int i = 0; i < 1000; ++i) {
2070 if (!(i % 10)) {
2071 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
2072 test_msg.print_internal_state();
2073 }
2074 boost::uniform_int<> true_false(0, 99);
2075 int val = true_false(rng);
2076 if (val > 90) {
2077 test_msg.generate_connection();
2078 } else if (val > 80) {
2079 test_msg.drop_connection();
2080 } else if (val > 10) {
2081 test_msg.send_message();
2082 } else {
2083 usleep(rand() % 500 + 100);
2084 }
2085 }
2086 test_msg.wait_for_done();
11fdf7f2
TL
2087 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0");
2088 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0");
2089 g_ceph_context->_conf.set_val(
7c673cae
FG
2090 "ms_dispatch_throttle_bytes", std::to_string(dispatch_throttle_bytes));
2091}
2092
2093TEST_P(MessengerTest, SyntheticInjectTest2) {
11fdf7f2
TL
2094 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "30");
2095 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1");
7c673cae
FG
2096 SyntheticWorkload test_msg(8, 16, GetParam(), 100,
2097 Messenger::Policy::lossless_peer_reuse(0),
2098 Messenger::Policy::lossless_peer_reuse(0));
2099 for (int i = 0; i < 100; ++i) {
2100 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
2101 test_msg.generate_connection();
2102 }
2103 gen_type rng(time(NULL));
2104 for (int i = 0; i < 1000; ++i) {
2105 if (!(i % 10)) {
2106 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
2107 test_msg.print_internal_state();
2108 }
2109 boost::uniform_int<> true_false(0, 99);
2110 int val = true_false(rng);
2111 if (val > 90) {
2112 test_msg.generate_connection();
2113 } else if (val > 80) {
2114 test_msg.drop_connection();
2115 } else if (val > 10) {
2116 test_msg.send_message();
2117 } else {
2118 usleep(rand() % 500 + 100);
2119 }
2120 }
2121 test_msg.wait_for_done();
11fdf7f2
TL
2122 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0");
2123 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0");
7c673cae
FG
2124}
2125
2126TEST_P(MessengerTest, SyntheticInjectTest3) {
11fdf7f2
TL
2127 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "600");
2128 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1");
7c673cae
FG
2129 SyntheticWorkload test_msg(8, 16, GetParam(), 100,
2130 Messenger::Policy::stateless_server(0),
2131 Messenger::Policy::lossy_client(0));
2132 for (int i = 0; i < 100; ++i) {
2133 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
2134 test_msg.generate_connection();
2135 }
2136 gen_type rng(time(NULL));
2137 for (int i = 0; i < 1000; ++i) {
2138 if (!(i % 10)) {
2139 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
2140 test_msg.print_internal_state();
2141 }
2142 boost::uniform_int<> true_false(0, 99);
2143 int val = true_false(rng);
2144 if (val > 90) {
2145 test_msg.generate_connection();
2146 } else if (val > 80) {
2147 test_msg.drop_connection();
2148 } else if (val > 10) {
2149 test_msg.send_message();
2150 } else {
2151 usleep(rand() % 500 + 100);
2152 }
2153 }
2154 test_msg.wait_for_done();
11fdf7f2
TL
2155 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0");
2156 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0");
7c673cae
FG
2157}
2158
2159
2160TEST_P(MessengerTest, SyntheticInjectTest4) {
11fdf7f2
TL
2161 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "30");
2162 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1");
2163 g_ceph_context->_conf.set_val("ms_inject_delay_probability", "1");
2164 g_ceph_context->_conf.set_val("ms_inject_delay_type", "client osd");
2165 g_ceph_context->_conf.set_val("ms_inject_delay_max", "5");
7c673cae
FG
2166 SyntheticWorkload test_msg(16, 32, GetParam(), 100,
2167 Messenger::Policy::lossless_peer(0),
2168 Messenger::Policy::lossless_peer(0));
2169 for (int i = 0; i < 100; ++i) {
2170 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
2171 test_msg.generate_connection();
2172 }
2173 gen_type rng(time(NULL));
2174 for (int i = 0; i < 1000; ++i) {
2175 if (!(i % 10)) {
2176 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
2177 test_msg.print_internal_state();
2178 }
2179 boost::uniform_int<> true_false(0, 99);
2180 int val = true_false(rng);
2181 if (val > 95) {
2182 test_msg.generate_connection();
2183 } else if (val > 80) {
2184 // test_msg.drop_connection();
2185 } else if (val > 10) {
2186 test_msg.send_message();
2187 } else {
2188 usleep(rand() % 500 + 100);
2189 }
2190 }
2191 test_msg.wait_for_done();
11fdf7f2
TL
2192 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0");
2193 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0");
2194 g_ceph_context->_conf.set_val("ms_inject_delay_probability", "0");
2195 g_ceph_context->_conf.set_val("ms_inject_delay_type", "");
2196 g_ceph_context->_conf.set_val("ms_inject_delay_max", "0");
7c673cae
FG
2197}
2198
2199
2200class MarkdownDispatcher : public Dispatcher {
9f95a23c 2201 ceph::mutex lock = ceph::make_mutex("MarkdownDispatcher::lock");
7c673cae
FG
2202 set<ConnectionRef> conns;
2203 bool last_mark;
2204 public:
31f18b77 2205 std::atomic<uint64_t> count = { 0 };
9f95a23c 2206 explicit MarkdownDispatcher(bool s): Dispatcher(g_ceph_context),
11fdf7f2 2207 last_mark(false) {
11fdf7f2 2208 }
7c673cae
FG
2209 bool ms_can_fast_dispatch_any() const override { return false; }
2210 bool ms_can_fast_dispatch(const Message *m) const override {
2211 switch (m->get_type()) {
2212 case CEPH_MSG_PING:
2213 return true;
2214 default:
2215 return false;
2216 }
2217 }
2218
2219 void ms_handle_fast_connect(Connection *con) override {
2220 lderr(g_ceph_context) << __func__ << " " << con << dendl;
9f95a23c 2221 std::lock_guard l{lock};
7c673cae
FG
2222 conns.insert(con);
2223 }
2224 void ms_handle_fast_accept(Connection *con) override {
9f95a23c 2225 std::lock_guard l{lock};
7c673cae
FG
2226 conns.insert(con);
2227 }
2228 bool ms_dispatch(Message *m) override {
2229 lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << dendl;
9f95a23c 2230 std::lock_guard l{lock};
31f18b77 2231 count++;
7c673cae
FG
2232 conns.insert(m->get_connection());
2233 if (conns.size() < 2 && !last_mark) {
2234 m->put();
2235 return true;
2236 }
2237
2238 last_mark = true;
2239 usleep(rand() % 500);
2240 for (set<ConnectionRef>::iterator it = conns.begin(); it != conns.end(); ++it) {
2241 if ((*it) != m->get_connection().get()) {
2242 (*it)->mark_down();
2243 conns.erase(it);
2244 break;
2245 }
2246 }
2247 if (conns.empty())
2248 last_mark = false;
2249 m->put();
2250 return true;
2251 }
2252 bool ms_handle_reset(Connection *con) override {
2253 lderr(g_ceph_context) << __func__ << " " << con << dendl;
9f95a23c 2254 std::lock_guard l{lock};
7c673cae
FG
2255 conns.erase(con);
2256 usleep(rand() % 500);
2257 return true;
2258 }
2259 void ms_handle_remote_reset(Connection *con) override {
9f95a23c 2260 std::lock_guard l{lock};
7c673cae
FG
2261 conns.erase(con);
2262 lderr(g_ceph_context) << __func__ << " " << con << dendl;
2263 }
2264 bool ms_handle_refused(Connection *con) override {
2265 return false;
2266 }
2267 void ms_fast_dispatch(Message *m) override {
2268 ceph_abort();
2269 }
11fdf7f2
TL
2270 int ms_handle_authentication(Connection *con) override {
2271 return 1;
7c673cae
FG
2272 }
2273};
2274
2275
2276// Markdown with external lock
2277TEST_P(MessengerTest, MarkdownTest) {
f67539c2 2278 Messenger *server_msgr2 = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid());
7c673cae 2279 MarkdownDispatcher cli_dispatcher(false), srv_dispatcher(true);
11fdf7f2
TL
2280 DummyAuthClientServer dummy_auth(g_ceph_context);
2281 dummy_auth.auth_registry.refresh_config();
7c673cae 2282 entity_addr_t bind_addr;
9f95a23c 2283 bind_addr.parse("v2:127.0.0.1:16800");
7c673cae
FG
2284 server_msgr->bind(bind_addr);
2285 server_msgr->add_dispatcher_head(&srv_dispatcher);
11fdf7f2
TL
2286 server_msgr->set_auth_client(&dummy_auth);
2287 server_msgr->set_auth_server(&dummy_auth);
7c673cae 2288 server_msgr->start();
9f95a23c 2289 bind_addr.parse("v2:127.0.0.1:16801");
7c673cae
FG
2290 server_msgr2->bind(bind_addr);
2291 server_msgr2->add_dispatcher_head(&srv_dispatcher);
11fdf7f2
TL
2292 server_msgr2->set_auth_client(&dummy_auth);
2293 server_msgr2->set_auth_server(&dummy_auth);
7c673cae
FG
2294 server_msgr2->start();
2295
2296 client_msgr->add_dispatcher_head(&cli_dispatcher);
11fdf7f2
TL
2297 client_msgr->set_auth_client(&dummy_auth);
2298 client_msgr->set_auth_server(&dummy_auth);
7c673cae
FG
2299 client_msgr->start();
2300
2301 int i = 1000;
2302 uint64_t last = 0;
2303 bool equal = false;
2304 uint64_t equal_count = 0;
2305 while (i--) {
11fdf7f2
TL
2306 ConnectionRef conn1 = client_msgr->connect_to(server_msgr->get_mytype(),
2307 server_msgr->get_myaddrs());
2308 ConnectionRef conn2 = client_msgr->connect_to(server_msgr2->get_mytype(),
2309 server_msgr2->get_myaddrs());
7c673cae
FG
2310 MPing *m = new MPing();
2311 ASSERT_EQ(conn1->send_message(m), 0);
2312 m = new MPing();
2313 ASSERT_EQ(conn2->send_message(m), 0);
31f18b77
FG
2314 CHECK_AND_WAIT_TRUE(srv_dispatcher.count > last + 1);
2315 if (srv_dispatcher.count == last) {
7c673cae
FG
2316 lderr(g_ceph_context) << __func__ << " last is " << last << dendl;
2317 equal = true;
2318 equal_count++;
2319 } else {
2320 equal = false;
2321 equal_count = 0;
2322 }
31f18b77 2323 last = srv_dispatcher.count;
7c673cae
FG
2324 if (equal_count)
2325 usleep(1000*500);
2326 ASSERT_FALSE(equal && equal_count > 3);
2327 }
2328 server_msgr->shutdown();
2329 client_msgr->shutdown();
2330 server_msgr2->shutdown();
2331 server_msgr->wait();
2332 client_msgr->wait();
2333 server_msgr2->wait();
2334 delete server_msgr2;
2335}
2336
9f95a23c 2337INSTANTIATE_TEST_SUITE_P(
7c673cae
FG
2338 Messenger,
2339 MessengerTest,
2340 ::testing::Values(
9f95a23c 2341 "async+posix"
7c673cae
FG
2342 )
2343);
2344
7c673cae 2345int main(int argc, char **argv) {
20effc67 2346 auto args = argv_to_vec(argc, argv);
11fdf7f2
TL
2347
2348 auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
2349 CODE_ENVIRONMENT_UTILITY,
2350 CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
2351 g_ceph_context->_conf.set_val("auth_cluster_required", "none");
2352 g_ceph_context->_conf.set_val("auth_service_required", "none");
2353 g_ceph_context->_conf.set_val("auth_client_required", "none");
2354 g_ceph_context->_conf.set_val("keyring", "/dev/null");
2355 g_ceph_context->_conf.set_val("enable_experimental_unrecoverable_data_corrupting_features", "ms-type-async");
2356 g_ceph_context->_conf.set_val("ms_die_on_bad_msg", "true");
2357 g_ceph_context->_conf.set_val("ms_die_on_old_message", "true");
2358 g_ceph_context->_conf.set_val("ms_max_backoff", "1");
7c673cae
FG
2359 common_init_finish(g_ceph_context);
2360
2361 ::testing::InitGoogleTest(&argc, argv);
2362 return RUN_ALL_TESTS();
2363}
2364
2365/*
2366 * Local Variables:
2367 * compile-command: "cd ../.. ; make -j4 ceph_test_msgr && valgrind --tool=memcheck ./ceph_test_msgr"
2368 * End:
2369 */