]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/msgr/test_msgr.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / test / msgr / test_msgr.cc
CommitLineData
11fdf7f2 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
7c673cae
FG
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17#include <atomic>
18#include <iostream>
f6b5b4d7 19#include <memory>
7c673cae
FG
20#include <unistd.h>
21#include <stdlib.h>
22#include <time.h>
11fdf7f2
TL
23#include <set>
24#include <list>
9f95a23c 25#include "common/ceph_mutex.h"
7c673cae
FG
26#include "common/ceph_argparse.h"
27#include "global/global_init.h"
28#include "msg/Dispatcher.h"
29#include "msg/msg_types.h"
30#include "msg/Message.h"
31#include "msg/Messenger.h"
32#include "msg/Connection.h"
33#include "messages/MPing.h"
34#include "messages/MCommand.h"
35
36#include <boost/random/mersenne_twister.hpp>
37#include <boost/random/uniform_int.hpp>
38#include <boost/random/binomial_distribution.hpp>
39#include <gtest/gtest.h>
40
41typedef boost::mt11213b gen_type;
42
43#include "common/dout.h"
11fdf7f2
TL
44#include "include/ceph_assert.h"
45
46#include "auth/DummyAuth.h"
7c673cae
FG
47
48#define dout_subsys ceph_subsys_ms
49#undef dout_prefix
50#define dout_prefix *_dout << " ceph_test_msgr "
51
52
7c673cae
FG
// Poll `expr` until it becomes true, sleeping 1000 microseconds between
// attempts and giving up after 999 attempts.  `expr` is re-evaluated on every
// attempt.  Note that the expansion already ends with a ';'.
#define CHECK_AND_WAIT_TRUE(expr) do { \
  for (int n = 999; n > 0; --n) { \
    if (expr) \
      break; \
    usleep(1000); \
  } \
} while(0);
61
62class MessengerTest : public ::testing::TestWithParam<const char*> {
63 public:
11fdf7f2 64 DummyAuthClientServer dummy_auth;
7c673cae
FG
65 Messenger *server_msgr;
66 Messenger *client_msgr;
67
11fdf7f2
TL
68 MessengerTest() : dummy_auth(g_ceph_context),
69 server_msgr(NULL), client_msgr(NULL) {
70 dummy_auth.auth_registry.refresh_config();
71 }
7c673cae
FG
72 void SetUp() override {
73 lderr(g_ceph_context) << __func__ << " start set up " << GetParam() << dendl;
f67539c2
TL
74 server_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid());
75 client_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid());
7c673cae
FG
76 server_msgr->set_default_policy(Messenger::Policy::stateless_server(0));
77 client_msgr->set_default_policy(Messenger::Policy::lossy_client(0));
11fdf7f2
TL
78 server_msgr->set_auth_client(&dummy_auth);
79 server_msgr->set_auth_server(&dummy_auth);
80 client_msgr->set_auth_client(&dummy_auth);
81 client_msgr->set_auth_server(&dummy_auth);
9f95a23c 82 server_msgr->set_require_authorizer(false);
7c673cae
FG
83 }
84 void TearDown() override {
85 ASSERT_EQ(server_msgr->get_dispatch_queue_len(), 0);
86 ASSERT_EQ(client_msgr->get_dispatch_queue_len(), 0);
87 delete server_msgr;
88 delete client_msgr;
89 }
90
91};
92
93
94class FakeDispatcher : public Dispatcher {
95 public:
96 struct Session : public RefCountedObject {
97 atomic<uint64_t> count;
98 ConnectionRef con;
99
100 explicit Session(ConnectionRef c): RefCountedObject(g_ceph_context), count(0), con(c) {
101 }
102 uint64_t get_count() { return count; }
103 };
104
9f95a23c
TL
105 ceph::mutex lock = ceph::make_mutex("FakeDispatcher::lock");
106 ceph::condition_variable cond;
7c673cae
FG
107 bool is_server;
108 bool got_new;
109 bool got_remote_reset;
110 bool got_connect;
111 bool loopback;
11fdf7f2 112 entity_addrvec_t last_accept;
9f95a23c 113 ConnectionRef *last_accept_con_ptr = nullptr;
7c673cae 114
9f95a23c 115 explicit FakeDispatcher(bool s): Dispatcher(g_ceph_context),
7c673cae 116 is_server(s), got_new(false), got_remote_reset(false),
11fdf7f2 117 got_connect(false), loopback(false) {
11fdf7f2 118 }
7c673cae
FG
119 bool ms_can_fast_dispatch_any() const override { return true; }
120 bool ms_can_fast_dispatch(const Message *m) const override {
121 switch (m->get_type()) {
122 case CEPH_MSG_PING:
123 return true;
124 default:
125 return false;
126 }
127 }
128
129 void ms_handle_fast_connect(Connection *con) override {
9f95a23c 130 std::scoped_lock l{lock};
7c673cae 131 lderr(g_ceph_context) << __func__ << " " << con << dendl;
11fdf7f2 132 auto s = con->get_priv();
7c673cae 133 if (!s) {
11fdf7f2
TL
134 auto session = new Session(con);
135 con->set_priv(RefCountedPtr{session, false});
136 lderr(g_ceph_context) << __func__ << " con: " << con
137 << " count: " << session->count << dendl;
7c673cae 138 }
7c673cae 139 got_connect = true;
9f95a23c 140 cond.notify_all();
7c673cae
FG
141 }
142 void ms_handle_fast_accept(Connection *con) override {
11fdf7f2 143 last_accept = con->get_peer_addrs();
9f95a23c
TL
144 if (last_accept_con_ptr) {
145 *last_accept_con_ptr = con;
146 }
11fdf7f2
TL
147 if (!con->get_priv()) {
148 con->set_priv(RefCountedPtr{new Session(con), false});
7c673cae 149 }
7c673cae
FG
150 }
151 bool ms_dispatch(Message *m) override {
11fdf7f2
TL
152 auto priv = m->get_connection()->get_priv();
153 auto s = static_cast<Session*>(priv.get());
7c673cae
FG
154 if (!s) {
155 s = new Session(m->get_connection());
11fdf7f2
TL
156 priv.reset(s, false);
157 m->get_connection()->set_priv(priv);
7c673cae 158 }
7c673cae
FG
159 s->count++;
160 lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl;
161 if (is_server) {
162 reply_message(m);
163 }
9f95a23c 164 std::lock_guard l{lock};
7c673cae 165 got_new = true;
9f95a23c 166 cond.notify_all();
7c673cae
FG
167 m->put();
168 return true;
169 }
170 bool ms_handle_reset(Connection *con) override {
9f95a23c 171 std::lock_guard l{lock};
7c673cae 172 lderr(g_ceph_context) << __func__ << " " << con << dendl;
11fdf7f2
TL
173 auto priv = con->get_priv();
174 if (auto s = static_cast<Session*>(priv.get()); s) {
175 s->con.reset(); // break con <-> session ref cycle
176 con->set_priv(nullptr); // break ref <-> session cycle, if any
7c673cae
FG
177 }
178 return true;
179 }
180 void ms_handle_remote_reset(Connection *con) override {
9f95a23c 181 std::lock_guard l{lock};
7c673cae 182 lderr(g_ceph_context) << __func__ << " " << con << dendl;
11fdf7f2
TL
183 auto priv = con->get_priv();
184 if (auto s = static_cast<Session*>(priv.get()); s) {
185 s->con.reset(); // break con <-> session ref cycle
186 con->set_priv(nullptr); // break ref <-> session cycle, if any
7c673cae
FG
187 }
188 got_remote_reset = true;
9f95a23c 189 cond.notify_all();
7c673cae
FG
190 }
191 bool ms_handle_refused(Connection *con) override {
192 return false;
193 }
194 void ms_fast_dispatch(Message *m) override {
11fdf7f2
TL
195 auto priv = m->get_connection()->get_priv();
196 auto s = static_cast<Session*>(priv.get());
7c673cae
FG
197 if (!s) {
198 s = new Session(m->get_connection());
11fdf7f2
TL
199 priv.reset(s, false);
200 m->get_connection()->set_priv(priv);
7c673cae 201 }
7c673cae
FG
202 s->count++;
203 lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl;
204 if (is_server) {
205 if (loopback)
11fdf7f2 206 ceph_assert(m->get_source().is_osd());
7c673cae
FG
207 else
208 reply_message(m);
209 } else if (loopback) {
11fdf7f2 210 ceph_assert(m->get_source().is_client());
7c673cae
FG
211 }
212 m->put();
9f95a23c 213 std::lock_guard l{lock};
7c673cae 214 got_new = true;
9f95a23c 215 cond.notify_all();
7c673cae
FG
216 }
217
11fdf7f2
TL
218 int ms_handle_authentication(Connection *con) override {
219 return 1;
7c673cae
FG
220 }
221
222 void reply_message(Message *m) {
223 MPing *rm = new MPing();
224 m->get_connection()->send_message(rm);
225 }
226};
227
228typedef FakeDispatcher::Session Session;
229
11fdf7f2
TL
230struct TestInterceptor : public Interceptor {
231
232 bool step_waiting = false;
233 bool waiting = true;
234 std::map<Connection *, uint32_t> current_step;
235 std::map<Connection *, std::list<uint32_t>> step_history;
236 std::map<uint32_t, std::optional<ACTION>> decisions;
237 std::set<uint32_t> breakpoints;
238
239 uint32_t count_step(Connection *conn, uint32_t step) {
240 uint32_t count = 0;
241 for (auto s : step_history[conn]) {
242 if (s == step) {
243 count++;
244 }
245 }
246 return count;
247 }
248
249 void breakpoint(uint32_t step) {
250 breakpoints.insert(step);
251 }
252
253 void remove_bp(uint32_t step) {
254 breakpoints.erase(step);
255 }
256
257 Connection *wait(uint32_t step, Connection *conn=nullptr) {
258 std::unique_lock<std::mutex> l(lock);
259 while(true) {
260 if (conn) {
261 auto it = current_step.find(conn);
262 if (it != current_step.end()) {
263 if (it->second == step) {
264 break;
265 }
266 }
267 } else {
268 for (auto it : current_step) {
269 if (it.second == step) {
270 conn = it.first;
271 break;
272 }
273 }
274 if (conn) {
275 break;
276 }
277 }
278 step_waiting = true;
279 cond_var.wait(l);
280 }
281 step_waiting = false;
282 return conn;
283 }
284
285 ACTION wait_for_decision(uint32_t step, std::unique_lock<std::mutex> &l) {
286 if (decisions[step]) {
287 return *(decisions[step]);
288 }
289 waiting = true;
9f95a23c 290 cond_var.wait(l, [this] { return !waiting; });
11fdf7f2
TL
291 return *(decisions[step]);
292 }
293
294 void proceed(uint32_t step, ACTION decision) {
295 std::unique_lock<std::mutex> l(lock);
296 decisions[step] = decision;
297 if (waiting) {
298 waiting = false;
299 cond_var.notify_one();
300 }
301 }
302
303 ACTION intercept(Connection *conn, uint32_t step) override {
304 lderr(g_ceph_context) << __func__ << " conn(" << conn
305 << ") intercept called on step=" << step << dendl;
306
307 {
308 std::unique_lock<std::mutex> l(lock);
309 step_history[conn].push_back(step);
310 current_step[conn] = step;
311 if (step_waiting) {
312 cond_var.notify_one();
313 }
314 }
315
316 std::unique_lock<std::mutex> l(lock);
317 ACTION decision = ACTION::CONTINUE;
318 if (breakpoints.find(step) != breakpoints.end()) {
319 lderr(g_ceph_context) << __func__ << " conn(" << conn
320 << ") pausing on step=" << step << dendl;
321 decision = wait_for_decision(step, l);
322 } else {
323 if (decisions[step]) {
324 decision = *(decisions[step]);
325 }
326 }
327 lderr(g_ceph_context) << __func__ << " conn(" << conn
328 << ") resuming step=" << step << " with decision="
329 << decision << dendl;
330 decisions[step].reset();
331 return decision;
332 }
333
334};
335
336/**
337 * Scenario: A connects to B, and B connects to A at the same time.
338 */
339TEST_P(MessengerTest, ConnectionRaceTest) {
11fdf7f2
TL
340 FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
341
342 TestInterceptor *cli_interceptor = new TestInterceptor();
343 TestInterceptor *srv_interceptor = new TestInterceptor();
344
345 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer_reuse(0));
346 server_msgr->interceptor = srv_interceptor;
347
348 client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer_reuse(0));
349 client_msgr->interceptor = cli_interceptor;
350
351 entity_addr_t bind_addr;
352 bind_addr.parse("v2:127.0.0.1:3300");
353 server_msgr->bind(bind_addr);
354 server_msgr->add_dispatcher_head(&srv_dispatcher);
355 server_msgr->start();
356
357 bind_addr.parse("v2:127.0.0.1:3301");
358 client_msgr->bind(bind_addr);
359 client_msgr->add_dispatcher_head(&cli_dispatcher);
360 client_msgr->start();
361
362 // pause before sending client_ident message
f67539c2 363 cli_interceptor->breakpoint(Interceptor::STEP::SEND_CLIENT_IDENTITY);
11fdf7f2 364 // pause before sending client_ident message
f67539c2 365 srv_interceptor->breakpoint(Interceptor::STEP::SEND_CLIENT_IDENTITY);
11fdf7f2
TL
366
367 ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
368 server_msgr->get_myaddrs());
369 MPing *m1 = new MPing();
370 ASSERT_EQ(c2s->send_message(m1), 0);
371
372 ConnectionRef s2c = server_msgr->connect_to(client_msgr->get_mytype(),
373 client_msgr->get_myaddrs());
374 MPing *m2 = new MPing();
375 ASSERT_EQ(s2c->send_message(m2), 0);
376
f67539c2
TL
377 cli_interceptor->wait(Interceptor::STEP::SEND_CLIENT_IDENTITY, c2s.get());
378 srv_interceptor->wait(Interceptor::STEP::SEND_CLIENT_IDENTITY, s2c.get());
11fdf7f2
TL
379
380 // at this point both connections (A->B, B->A) are paused just before sending
381 // the client_ident message.
382
f67539c2
TL
383 cli_interceptor->remove_bp(Interceptor::STEP::SEND_CLIENT_IDENTITY);
384 srv_interceptor->remove_bp(Interceptor::STEP::SEND_CLIENT_IDENTITY);
11fdf7f2 385
f67539c2
TL
386 cli_interceptor->proceed(Interceptor::STEP::SEND_CLIENT_IDENTITY, Interceptor::ACTION::CONTINUE);
387 srv_interceptor->proceed(Interceptor::STEP::SEND_CLIENT_IDENTITY, Interceptor::ACTION::CONTINUE);
11fdf7f2
TL
388
389 {
9f95a23c
TL
390 std::unique_lock l{cli_dispatcher.lock};
391 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
11fdf7f2
TL
392 cli_dispatcher.got_new = false;
393 }
394
395 {
9f95a23c
TL
396 std::unique_lock l{srv_dispatcher.lock};
397 srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; });
11fdf7f2
TL
398 srv_dispatcher.got_new = false;
399 }
400
401 ASSERT_TRUE(s2c->is_connected());
402 ASSERT_EQ(1u, static_cast<Session*>(s2c->get_priv().get())->get_count());
403 ASSERT_TRUE(s2c->peer_is_client());
404
405 ASSERT_TRUE(c2s->is_connected());
406 ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
407 ASSERT_TRUE(c2s->peer_is_osd());
408
409 client_msgr->shutdown();
410 client_msgr->wait();
411 server_msgr->shutdown();
412 server_msgr->wait();
413
414 delete cli_interceptor;
415 delete srv_interceptor;
416}
417
f6b5b4d7
TL
418/**
419 * Scenario: A connects to B, and B connects to A at the same time.
420 * The first (A -> B) connection gets to message flow handshake, the
421 * second (B -> A) connection is stuck waiting for a banner from A.
422 * After A sends client_ident to B, the first connection wins and B
423 * calls reuse_connection() to replace the second connection's socket
424 * while the second connection is still in BANNER_CONNECTING.
425 */
426TEST_P(MessengerTest, ConnectionRaceReuseBannerTest) {
427 FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
428
429 auto cli_interceptor = std::make_unique<TestInterceptor>();
430 auto srv_interceptor = std::make_unique<TestInterceptor>();
431
432 server_msgr->set_policy(entity_name_t::TYPE_CLIENT,
433 Messenger::Policy::lossless_peer_reuse(0));
434 server_msgr->interceptor = srv_interceptor.get();
435
436 client_msgr->set_policy(entity_name_t::TYPE_OSD,
437 Messenger::Policy::lossless_peer_reuse(0));
438 client_msgr->interceptor = cli_interceptor.get();
439
440 entity_addr_t bind_addr;
441 bind_addr.parse("v2:127.0.0.1:3300");
442 server_msgr->bind(bind_addr);
443 server_msgr->add_dispatcher_head(&srv_dispatcher);
444 server_msgr->start();
445
446 bind_addr.parse("v2:127.0.0.1:3301");
447 client_msgr->bind(bind_addr);
448 client_msgr->add_dispatcher_head(&cli_dispatcher);
449 client_msgr->start();
450
451 // pause before sending client_ident message
f67539c2 452 srv_interceptor->breakpoint(Interceptor::STEP::SEND_CLIENT_IDENTITY);
f6b5b4d7
TL
453
454 ConnectionRef s2c = server_msgr->connect_to(client_msgr->get_mytype(),
455 client_msgr->get_myaddrs());
456 MPing *m1 = new MPing();
457 ASSERT_EQ(s2c->send_message(m1), 0);
458
f67539c2
TL
459 srv_interceptor->wait(Interceptor::STEP::SEND_CLIENT_IDENTITY);
460 srv_interceptor->remove_bp(Interceptor::STEP::SEND_CLIENT_IDENTITY);
f6b5b4d7
TL
461
462 // pause before sending banner
f67539c2 463 cli_interceptor->breakpoint(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING);
f6b5b4d7
TL
464
465 ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
466 server_msgr->get_myaddrs());
467 MPing *m2 = new MPing();
468 ASSERT_EQ(c2s->send_message(m2), 0);
469
f67539c2
TL
470 cli_interceptor->wait(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING);
471 cli_interceptor->remove_bp(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING);
f6b5b4d7
TL
472
473 // second connection is in BANNER_CONNECTING, ensure it stays so
474 // and send client_ident
f67539c2
TL
475 srv_interceptor->breakpoint(Interceptor::STEP::BANNER_EXCHANGE);
476 srv_interceptor->proceed(Interceptor::STEP::SEND_CLIENT_IDENTITY, Interceptor::ACTION::CONTINUE);
f6b5b4d7
TL
477
478 // handle client_ident -- triggers reuse_connection() with exproto
479 // in BANNER_CONNECTING
f67539c2
TL
480 cli_interceptor->breakpoint(Interceptor::STEP::READY);
481 cli_interceptor->proceed(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING, Interceptor::ACTION::CONTINUE);
f6b5b4d7 482
f67539c2
TL
483 cli_interceptor->wait(Interceptor::STEP::READY);
484 cli_interceptor->remove_bp(Interceptor::STEP::READY);
f6b5b4d7
TL
485
486 // first connection is in READY
f67539c2
TL
487 Connection *s2c_accepter = srv_interceptor->wait(Interceptor::STEP::BANNER_EXCHANGE);
488 srv_interceptor->remove_bp(Interceptor::STEP::BANNER_EXCHANGE);
f6b5b4d7 489
f67539c2
TL
490 srv_interceptor->proceed(Interceptor::STEP::BANNER_EXCHANGE, Interceptor::ACTION::CONTINUE);
491 cli_interceptor->proceed(Interceptor::STEP::READY, Interceptor::ACTION::CONTINUE);
f6b5b4d7
TL
492
493 {
494 std::unique_lock l{cli_dispatcher.lock};
495 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
496 cli_dispatcher.got_new = false;
497 }
498
499 {
500 std::unique_lock l{srv_dispatcher.lock};
501 srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; });
502 srv_dispatcher.got_new = false;
503 }
504
505 EXPECT_TRUE(s2c->is_connected());
506 EXPECT_EQ(1u, static_cast<Session*>(s2c->get_priv().get())->get_count());
507 EXPECT_TRUE(s2c->peer_is_client());
508
509 EXPECT_TRUE(c2s->is_connected());
510 EXPECT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
511 EXPECT_TRUE(c2s->peer_is_osd());
512
513 // closed in reuse_connection() -- EPIPE when writing banner/hello
514 EXPECT_FALSE(s2c_accepter->is_connected());
515
516 // established exactly once, never faulted and reconnected
f67539c2
TL
517 EXPECT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::START_CLIENT_BANNER_EXCHANGE), 1u);
518 EXPECT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT), 0u);
519 EXPECT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::READY), 1u);
f6b5b4d7
TL
520
521 client_msgr->shutdown();
522 client_msgr->wait();
523 server_msgr->shutdown();
524 server_msgr->wait();
525}
526
11fdf7f2
TL
527/**
528 * Scenario:
529 * - A connects to B
530 * - A sends client_ident to B
531 * - B fails before sending server_ident to A
532 * - A reconnects
533 */
534TEST_P(MessengerTest, MissingServerIdenTest) {
11fdf7f2
TL
535 FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
536
537 TestInterceptor *cli_interceptor = new TestInterceptor();
538 TestInterceptor *srv_interceptor = new TestInterceptor();
539
540 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::stateful_server(0));
541 server_msgr->interceptor = srv_interceptor;
542
543 client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossy_client(0));
544 client_msgr->interceptor = cli_interceptor;
545
546 entity_addr_t bind_addr;
547 bind_addr.parse("v2:127.0.0.1:3300");
548 server_msgr->bind(bind_addr);
549 server_msgr->add_dispatcher_head(&srv_dispatcher);
550 server_msgr->start();
551
552 bind_addr.parse("v2:127.0.0.1:3301");
553 client_msgr->bind(bind_addr);
554 client_msgr->add_dispatcher_head(&cli_dispatcher);
555 client_msgr->start();
556
f67539c2
TL
557 // pause before sending server_ident message
558 srv_interceptor->breakpoint(Interceptor::STEP::SEND_SERVER_IDENTITY);
11fdf7f2
TL
559
560 ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
561 server_msgr->get_myaddrs());
562 MPing *m1 = new MPing();
563 ASSERT_EQ(c2s->send_message(m1), 0);
564
f67539c2
TL
565 Connection *c2s_accepter = srv_interceptor->wait(Interceptor::STEP::SEND_SERVER_IDENTITY);
566 srv_interceptor->remove_bp(Interceptor::STEP::SEND_SERVER_IDENTITY);
11fdf7f2
TL
567
568 // We inject a message from this side of the connection to force it to be
569 // in standby when we inject the failure below
570 MPing *m2 = new MPing();
571 ASSERT_EQ(c2s_accepter->send_message(m2), 0);
572
f67539c2 573 srv_interceptor->proceed(Interceptor::STEP::SEND_SERVER_IDENTITY, Interceptor::ACTION::FAIL);
11fdf7f2
TL
574
575 {
9f95a23c
TL
576 std::unique_lock l{srv_dispatcher.lock};
577 srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; });
11fdf7f2
TL
578 srv_dispatcher.got_new = false;
579 }
580
581 {
9f95a23c
TL
582 std::unique_lock l{cli_dispatcher.lock};
583 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
11fdf7f2
TL
584 cli_dispatcher.got_new = false;
585 }
586
587 ASSERT_TRUE(c2s->is_connected());
588 ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
589 ASSERT_TRUE(c2s->peer_is_osd());
590
591 ASSERT_TRUE(c2s_accepter->is_connected());
592 ASSERT_EQ(1u, static_cast<Session*>(c2s_accepter->get_priv().get())->get_count());
593 ASSERT_TRUE(c2s_accepter->peer_is_client());
594
595 client_msgr->shutdown();
596 client_msgr->wait();
597 server_msgr->shutdown();
598 server_msgr->wait();
599
600 delete cli_interceptor;
601 delete srv_interceptor;
602}
603
604/**
605 * Scenario:
606 * - A connects to B
607 * - A sends client_ident to B
608 * - B fails before sending server_ident to A
609 * - A goes to standby
610 * - B reconnects to A
611 */
612TEST_P(MessengerTest, MissingServerIdenTest2) {
11fdf7f2
TL
613 FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
614
615 TestInterceptor *cli_interceptor = new TestInterceptor();
616 TestInterceptor *srv_interceptor = new TestInterceptor();
617
618 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer(0));
619 server_msgr->interceptor = srv_interceptor;
620
621 client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0));
622 client_msgr->interceptor = cli_interceptor;
623
624 entity_addr_t bind_addr;
625 bind_addr.parse("v2:127.0.0.1:3300");
626 server_msgr->bind(bind_addr);
627 server_msgr->add_dispatcher_head(&srv_dispatcher);
628 server_msgr->start();
629
630 bind_addr.parse("v2:127.0.0.1:3301");
631 client_msgr->bind(bind_addr);
632 client_msgr->add_dispatcher_head(&cli_dispatcher);
633 client_msgr->start();
634
f67539c2
TL
635 // pause before sending server_ident message
636 srv_interceptor->breakpoint(Interceptor::STEP::SEND_SERVER_IDENTITY);
11fdf7f2
TL
637
638 ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
639 server_msgr->get_myaddrs());
640
f67539c2
TL
641 Connection *c2s_accepter = srv_interceptor->wait(Interceptor::STEP::SEND_SERVER_IDENTITY);
642 srv_interceptor->remove_bp(Interceptor::STEP::SEND_SERVER_IDENTITY);
11fdf7f2
TL
643
644 // We inject a message from this side of the connection to force it to be
645 // in standby when we inject the failure below
646 MPing *m2 = new MPing();
647 ASSERT_EQ(c2s_accepter->send_message(m2), 0);
648
f67539c2 649 srv_interceptor->proceed(Interceptor::STEP::SEND_SERVER_IDENTITY, Interceptor::ACTION::FAIL);
11fdf7f2
TL
650
651 {
9f95a23c
TL
652 std::unique_lock l{cli_dispatcher.lock};
653 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
11fdf7f2
TL
654 cli_dispatcher.got_new = false;
655 }
656
657 ASSERT_TRUE(c2s->is_connected());
658 ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
659 ASSERT_TRUE(c2s->peer_is_osd());
660
661 ASSERT_TRUE(c2s_accepter->is_connected());
662 ASSERT_EQ(0u, static_cast<Session*>(c2s_accepter->get_priv().get())->get_count());
663 ASSERT_TRUE(c2s_accepter->peer_is_client());
664
665 client_msgr->shutdown();
666 client_msgr->wait();
667 server_msgr->shutdown();
668 server_msgr->wait();
669
670 delete cli_interceptor;
671 delete srv_interceptor;
672}
673
674/**
675 * Scenario:
676 * - A connects to B
677 * - A and B exchange messages
678 * - A fails
679 * - B goes into standby
680 * - A reconnects
681 */
682TEST_P(MessengerTest, ReconnectTest) {
11fdf7f2
TL
683 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
684
685 TestInterceptor *cli_interceptor = new TestInterceptor();
686 TestInterceptor *srv_interceptor = new TestInterceptor();
687
688 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::stateful_server(0));
689 server_msgr->interceptor = srv_interceptor;
690
691 client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0));
692 client_msgr->interceptor = cli_interceptor;
693
694 entity_addr_t bind_addr;
695 bind_addr.parse("v2:127.0.0.1:3300");
696 server_msgr->bind(bind_addr);
697 server_msgr->add_dispatcher_head(&srv_dispatcher);
698 server_msgr->start();
699
700 bind_addr.parse("v2:127.0.0.1:3301");
701 client_msgr->bind(bind_addr);
702 client_msgr->add_dispatcher_head(&cli_dispatcher);
703 client_msgr->start();
704
705 ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
706 server_msgr->get_myaddrs());
707
708 MPing *m1 = new MPing();
709 ASSERT_EQ(c2s->send_message(m1), 0);
710
711 {
9f95a23c
TL
712 std::unique_lock l{cli_dispatcher.lock};
713 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
11fdf7f2
TL
714 cli_dispatcher.got_new = false;
715 }
716
717 ASSERT_TRUE(c2s->is_connected());
718 ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
719 ASSERT_TRUE(c2s->peer_is_osd());
720
f67539c2 721 cli_interceptor->breakpoint(Interceptor::STEP::HANDLE_MESSAGE);
11fdf7f2
TL
722
723 MPing *m2 = new MPing();
724 ASSERT_EQ(c2s->send_message(m2), 0);
725
f67539c2
TL
726 cli_interceptor->wait(Interceptor::STEP::HANDLE_MESSAGE, c2s.get());
727 cli_interceptor->remove_bp(Interceptor::STEP::HANDLE_MESSAGE);
11fdf7f2
TL
728
729 // at this point client and server are connected together
730
f67539c2 731 srv_interceptor->breakpoint(Interceptor::STEP::READY);
11fdf7f2
TL
732
733 // failing client
f67539c2 734 cli_interceptor->proceed(Interceptor::STEP::HANDLE_MESSAGE, Interceptor::ACTION::FAIL);
11fdf7f2
TL
735
736 MPing *m3 = new MPing();
737 ASSERT_EQ(c2s->send_message(m3), 0);
738
f67539c2 739 Connection *c2s_accepter = srv_interceptor->wait(Interceptor::STEP::READY);
11fdf7f2
TL
740 // the srv end of theconnection is now paused at ready
741 // this means that the reconnect was successful
f67539c2 742 srv_interceptor->remove_bp(Interceptor::STEP::READY);
11fdf7f2
TL
743
744 ASSERT_TRUE(c2s_accepter->peer_is_client());
745 // c2s_accepter sent 0 reconnect messages
f67539c2 746 ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT), 0u);
11fdf7f2 747 // c2s_accepter sent 1 reconnect_ok messages
f67539c2 748 ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT_OK), 1u);
11fdf7f2 749 // c2s sent 1 reconnect messages
f67539c2 750 ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT), 1u);
11fdf7f2 751 // c2s sent 0 reconnect_ok messages
f67539c2 752 ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT_OK), 0u);
11fdf7f2
TL
753
754 srv_interceptor->proceed(15, Interceptor::ACTION::CONTINUE);
755
756 {
9f95a23c
TL
757 std::unique_lock l{cli_dispatcher.lock};
758 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
11fdf7f2
TL
759 cli_dispatcher.got_new = false;
760 }
761
762 client_msgr->shutdown();
763 client_msgr->wait();
764 server_msgr->shutdown();
765 server_msgr->wait();
766
767 delete cli_interceptor;
768 delete srv_interceptor;
769}
770
771/**
772 * Scenario:
773 * - A connects to B
774 * - A and B exchange messages
775 * - A fails
776 * - A reconnects // B reconnects
777 */
778TEST_P(MessengerTest, ReconnectRaceTest) {
11fdf7f2
TL
779 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
780
781 TestInterceptor *cli_interceptor = new TestInterceptor();
782 TestInterceptor *srv_interceptor = new TestInterceptor();
783
784 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer(0));
785 server_msgr->interceptor = srv_interceptor;
786
787 client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0));
788 client_msgr->interceptor = cli_interceptor;
789
790 entity_addr_t bind_addr;
791 bind_addr.parse("v2:127.0.0.1:3300");
792 server_msgr->bind(bind_addr);
793 server_msgr->add_dispatcher_head(&srv_dispatcher);
794 server_msgr->start();
795
796 bind_addr.parse("v2:127.0.0.1:3301");
797 client_msgr->bind(bind_addr);
798 client_msgr->add_dispatcher_head(&cli_dispatcher);
799 client_msgr->start();
800
801 ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
802 server_msgr->get_myaddrs());
803
804 MPing *m1 = new MPing();
805 ASSERT_EQ(c2s->send_message(m1), 0);
806
807 {
9f95a23c
TL
808 std::unique_lock l{cli_dispatcher.lock};
809 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
11fdf7f2
TL
810 cli_dispatcher.got_new = false;
811 }
812
813 ASSERT_TRUE(c2s->is_connected());
814 ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
815 ASSERT_TRUE(c2s->peer_is_osd());
816
f67539c2 817 cli_interceptor->breakpoint(Interceptor::STEP::HANDLE_MESSAGE);
11fdf7f2
TL
818
819 MPing *m2 = new MPing();
820 ASSERT_EQ(c2s->send_message(m2), 0);
821
f67539c2
TL
822 cli_interceptor->wait(Interceptor::STEP::HANDLE_MESSAGE, c2s.get());
823 cli_interceptor->remove_bp(Interceptor::STEP::HANDLE_MESSAGE);
11fdf7f2
TL
824
825 // at this point client and server are connected together
826
827 // force both client and server to race on reconnect
f67539c2
TL
828 cli_interceptor->breakpoint(Interceptor::STEP::SEND_RECONNECT);
829 srv_interceptor->breakpoint(Interceptor::STEP::SEND_RECONNECT);
11fdf7f2
TL
830
831 // failing client
832 // this will cause both client and server to reconnect at the same time
f67539c2 833 cli_interceptor->proceed(Interceptor::STEP::HANDLE_MESSAGE, Interceptor::ACTION::FAIL);
11fdf7f2
TL
834
835 MPing *m3 = new MPing();
836 ASSERT_EQ(c2s->send_message(m3), 0);
837
f67539c2
TL
838 cli_interceptor->wait(Interceptor::STEP::SEND_RECONNECT, c2s.get());
839 srv_interceptor->wait(Interceptor::STEP::SEND_RECONNECT);
11fdf7f2 840
f67539c2
TL
841 cli_interceptor->remove_bp(Interceptor::STEP::SEND_RECONNECT);
842 srv_interceptor->remove_bp(Interceptor::STEP::SEND_RECONNECT);
11fdf7f2
TL
843
844 // pause on "ready"
f67539c2 845 srv_interceptor->breakpoint(Interceptor::STEP::READY);
11fdf7f2 846
f67539c2
TL
847 cli_interceptor->proceed(Interceptor::STEP::SEND_RECONNECT, Interceptor::ACTION::CONTINUE);
848 srv_interceptor->proceed(Interceptor::STEP::SEND_RECONNECT, Interceptor::ACTION::CONTINUE);
11fdf7f2 849
f67539c2 850 Connection *c2s_accepter = srv_interceptor->wait(Interceptor::STEP::READY);
11fdf7f2
TL
851
852 // the server has reconnected and is "ready"
f67539c2 853 srv_interceptor->remove_bp(Interceptor::STEP::READY);
11fdf7f2
TL
854
855 ASSERT_TRUE(c2s_accepter->peer_is_client());
856 ASSERT_TRUE(c2s->peer_is_osd());
857
858 // the server should win the reconnect race
859
860 // c2s_accepter sent 1 or 2 reconnect messages
f67539c2
TL
861 ASSERT_LT(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT), 3u);
862 ASSERT_GT(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT), 0u);
11fdf7f2 863 // c2s_accepter sent 0 reconnect_ok messages
f67539c2 864 ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT_OK), 0u);
11fdf7f2 865 // c2s sent 1 reconnect messages
f67539c2 866 ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT), 1u);
11fdf7f2 867 // c2s sent 1 reconnect_ok messages
f67539c2 868 ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT_OK), 1u);
11fdf7f2 869
f67539c2 870 if (srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT) == 2) {
11fdf7f2
TL
871 // if the server send the reconnect message two times then
872 // the client must have sent a session retry message to the server
f67539c2 873 ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SESSION_RETRY), 1u);
11fdf7f2 874 } else {
f67539c2 875 ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SESSION_RETRY), 0u);
11fdf7f2
TL
876 }
877
f67539c2 878 srv_interceptor->proceed(Interceptor::STEP::READY, Interceptor::ACTION::CONTINUE);
11fdf7f2
TL
879
880 {
9f95a23c
TL
881 std::unique_lock l{cli_dispatcher.lock};
882 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
11fdf7f2
TL
883 cli_dispatcher.got_new = false;
884 }
885
886 client_msgr->shutdown();
887 client_msgr->wait();
888 server_msgr->shutdown();
889 server_msgr->wait();
890
891 delete cli_interceptor;
892 delete srv_interceptor;
893}
894
7c673cae
FG
895TEST_P(MessengerTest, SimpleTest) {
896 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
897 entity_addr_t bind_addr;
9f95a23c 898 bind_addr.parse("v2:127.0.0.1");
7c673cae
FG
899 server_msgr->bind(bind_addr);
900 server_msgr->add_dispatcher_head(&srv_dispatcher);
901 server_msgr->start();
902
903 client_msgr->add_dispatcher_head(&cli_dispatcher);
904 client_msgr->start();
905
906 // 1. simple round trip
907 MPing *m = new MPing();
11fdf7f2
TL
908 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
909 server_msgr->get_myaddrs());
7c673cae
FG
910 {
911 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
912 std::unique_lock l{cli_dispatcher.lock};
913 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
914 cli_dispatcher.got_new = false;
915 }
916 ASSERT_TRUE(conn->is_connected());
11fdf7f2 917 ASSERT_EQ(1u, static_cast<Session*>(conn->get_priv().get())->get_count());
7c673cae
FG
918 ASSERT_TRUE(conn->peer_is_osd());
919
920 // 2. test rebind port
921 set<int> avoid_ports;
11fdf7f2
TL
922 for (int i = 0; i < 10 ; i++) {
923 for (auto a : server_msgr->get_myaddrs().v) {
924 avoid_ports.insert(a.get_port() + i);
925 }
926 }
7c673cae 927 server_msgr->rebind(avoid_ports);
11fdf7f2
TL
928 for (auto a : server_msgr->get_myaddrs().v) {
929 ASSERT_TRUE(avoid_ports.count(a.get_port()) == 0);
930 }
7c673cae 931
11fdf7f2
TL
932 conn = client_msgr->connect_to(server_msgr->get_mytype(),
933 server_msgr->get_myaddrs());
7c673cae
FG
934 {
935 m = new MPing();
936 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
937 std::unique_lock l{cli_dispatcher.lock};
938 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
939 cli_dispatcher.got_new = false;
940 }
11fdf7f2 941 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
7c673cae
FG
942
943 // 3. test markdown connection
944 conn->mark_down();
945 ASSERT_FALSE(conn->is_connected());
946
947 // 4. test failed connection
948 server_msgr->shutdown();
949 server_msgr->wait();
950
951 m = new MPing();
952 conn->send_message(m);
953 CHECK_AND_WAIT_TRUE(!conn->is_connected());
954 ASSERT_FALSE(conn->is_connected());
955
956 // 5. loopback connection
957 srv_dispatcher.loopback = true;
958 conn = client_msgr->get_loopback_connection();
959 {
960 m = new MPing();
961 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
962 std::unique_lock l{cli_dispatcher.lock};
963 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
964 cli_dispatcher.got_new = false;
965 }
966 srv_dispatcher.loopback = false;
11fdf7f2
TL
967 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
968 client_msgr->shutdown();
969 client_msgr->wait();
970 server_msgr->shutdown();
971 server_msgr->wait();
972}
973
974TEST_P(MessengerTest, SimpleMsgr2Test) {
975 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
976 entity_addr_t legacy_addr;
977 legacy_addr.parse("v1:127.0.0.1");
978 entity_addr_t msgr2_addr;
979 msgr2_addr.parse("v2:127.0.0.1");
980 entity_addrvec_t bind_addrs;
981 bind_addrs.v.push_back(legacy_addr);
982 bind_addrs.v.push_back(msgr2_addr);
983 server_msgr->bindv(bind_addrs);
984 server_msgr->add_dispatcher_head(&srv_dispatcher);
985 server_msgr->start();
986
987 client_msgr->add_dispatcher_head(&cli_dispatcher);
988 client_msgr->start();
989
990 // 1. simple round trip
991 MPing *m = new MPing();
992 ConnectionRef conn = client_msgr->connect_to(
993 server_msgr->get_mytype(),
994 server_msgr->get_myaddrs());
995 {
996 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
997 std::unique_lock l{cli_dispatcher.lock};
998 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
11fdf7f2
TL
999 cli_dispatcher.got_new = false;
1000 }
1001 ASSERT_TRUE(conn->is_connected());
1002 ASSERT_EQ(1u, static_cast<Session*>(conn->get_priv().get())->get_count());
1003 ASSERT_TRUE(conn->peer_is_osd());
1004
1005 // 2. test rebind port
1006 set<int> avoid_ports;
1007 for (int i = 0; i < 10 ; i++) {
1008 for (auto a : server_msgr->get_myaddrs().v) {
1009 avoid_ports.insert(a.get_port() + i);
1010 }
1011 }
1012 server_msgr->rebind(avoid_ports);
1013 for (auto a : server_msgr->get_myaddrs().v) {
1014 ASSERT_TRUE(avoid_ports.count(a.get_port()) == 0);
1015 }
1016
1017 conn = client_msgr->connect_to(
1018 server_msgr->get_mytype(),
1019 server_msgr->get_myaddrs());
1020 {
1021 m = new MPing();
1022 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
1023 std::unique_lock l{cli_dispatcher.lock};
1024 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
11fdf7f2
TL
1025 cli_dispatcher.got_new = false;
1026 }
1027 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1028
1029 // 3. test markdown connection
1030 conn->mark_down();
1031 ASSERT_FALSE(conn->is_connected());
1032
1033 // 4. test failed connection
1034 server_msgr->shutdown();
1035 server_msgr->wait();
1036
1037 m = new MPing();
1038 conn->send_message(m);
1039 CHECK_AND_WAIT_TRUE(!conn->is_connected());
1040 ASSERT_FALSE(conn->is_connected());
1041
1042 // 5. loopback connection
1043 srv_dispatcher.loopback = true;
1044 conn = client_msgr->get_loopback_connection();
1045 {
1046 m = new MPing();
1047 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
1048 std::unique_lock l{cli_dispatcher.lock};
1049 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
11fdf7f2
TL
1050 cli_dispatcher.got_new = false;
1051 }
1052 srv_dispatcher.loopback = false;
1053 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
7c673cae
FG
1054 client_msgr->shutdown();
1055 client_msgr->wait();
1056 server_msgr->shutdown();
1057 server_msgr->wait();
1058}
1059
7c673cae
FG
1060TEST_P(MessengerTest, FeatureTest) {
1061 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1062 entity_addr_t bind_addr;
9f95a23c 1063 bind_addr.parse("v2:127.0.0.1");
7c673cae
FG
1064 uint64_t all_feature_supported, feature_required, feature_supported = 0;
1065 for (int i = 0; i < 10; i++)
1066 feature_supported |= 1ULL << i;
11fdf7f2
TL
1067 feature_supported |= CEPH_FEATUREMASK_MSG_ADDR2;
1068 feature_supported |= CEPH_FEATUREMASK_SERVER_NAUTILUS;
7c673cae
FG
1069 feature_required = feature_supported | 1ULL << 13;
1070 all_feature_supported = feature_required | 1ULL << 14;
1071
1072 Messenger::Policy p = server_msgr->get_policy(entity_name_t::TYPE_CLIENT);
1073 p.features_required = feature_required;
1074 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
1075 server_msgr->bind(bind_addr);
1076 server_msgr->add_dispatcher_head(&srv_dispatcher);
1077 server_msgr->start();
1078
1079 // 1. Suppose if only support less than required
1080 p = client_msgr->get_policy(entity_name_t::TYPE_OSD);
1081 p.features_supported = feature_supported;
1082 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
1083 client_msgr->add_dispatcher_head(&cli_dispatcher);
1084 client_msgr->start();
1085
1086 MPing *m = new MPing();
11fdf7f2
TL
1087 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1088 server_msgr->get_myaddrs());
7c673cae
FG
1089 conn->send_message(m);
1090 CHECK_AND_WAIT_TRUE(!conn->is_connected());
1091 // should failed build a connection
1092 ASSERT_FALSE(conn->is_connected());
1093
1094 client_msgr->shutdown();
1095 client_msgr->wait();
1096
1097 // 2. supported met required
1098 p = client_msgr->get_policy(entity_name_t::TYPE_OSD);
1099 p.features_supported = all_feature_supported;
1100 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
1101 client_msgr->start();
1102
11fdf7f2
TL
1103 conn = client_msgr->connect_to(server_msgr->get_mytype(),
1104 server_msgr->get_myaddrs());
7c673cae
FG
1105 {
1106 m = new MPing();
1107 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
1108 std::unique_lock l{cli_dispatcher.lock};
1109 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
1110 cli_dispatcher.got_new = false;
1111 }
11fdf7f2 1112 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
7c673cae
FG
1113
1114 server_msgr->shutdown();
1115 client_msgr->shutdown();
1116 server_msgr->wait();
1117 client_msgr->wait();
1118}
1119
1120TEST_P(MessengerTest, TimeoutTest) {
81eedcae 1121 g_ceph_context->_conf.set_val("ms_connection_idle_timeout", "1");
7c673cae
FG
1122 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1123 entity_addr_t bind_addr;
9f95a23c 1124 bind_addr.parse("v2:127.0.0.1");
7c673cae
FG
1125 server_msgr->bind(bind_addr);
1126 server_msgr->add_dispatcher_head(&srv_dispatcher);
1127 server_msgr->start();
1128
1129 client_msgr->add_dispatcher_head(&cli_dispatcher);
1130 client_msgr->start();
1131
1132 // 1. build the connection
1133 MPing *m = new MPing();
11fdf7f2
TL
1134 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1135 server_msgr->get_myaddrs());
7c673cae
FG
1136 {
1137 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
1138 std::unique_lock l{cli_dispatcher.lock};
1139 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
1140 cli_dispatcher.got_new = false;
1141 }
1142 ASSERT_TRUE(conn->is_connected());
11fdf7f2 1143 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
7c673cae
FG
1144 ASSERT_TRUE(conn->peer_is_osd());
1145
1146 // 2. wait for idle
1147 usleep(2500*1000);
1148 ASSERT_FALSE(conn->is_connected());
1149
1150 server_msgr->shutdown();
1151 server_msgr->wait();
1152
1153 client_msgr->shutdown();
1154 client_msgr->wait();
81eedcae 1155 g_ceph_context->_conf.set_val("ms_connection_idle_timeout", "900");
7c673cae
FG
1156}
1157
1158TEST_P(MessengerTest, StatefulTest) {
1159 Message *m;
1160 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1161 entity_addr_t bind_addr;
9f95a23c 1162 bind_addr.parse("v2:127.0.0.1");
7c673cae
FG
1163 Messenger::Policy p = Messenger::Policy::stateful_server(0);
1164 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
1165 p = Messenger::Policy::lossless_client(0);
1166 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
1167
1168 server_msgr->bind(bind_addr);
1169 server_msgr->add_dispatcher_head(&srv_dispatcher);
1170 server_msgr->start();
1171 client_msgr->add_dispatcher_head(&cli_dispatcher);
1172 client_msgr->start();
1173
1174 // 1. test for server standby
11fdf7f2
TL
1175 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1176 server_msgr->get_myaddrs());
7c673cae
FG
1177 {
1178 m = new MPing();
1179 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
1180 std::unique_lock l{cli_dispatcher.lock};
1181 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
1182 cli_dispatcher.got_new = false;
1183 }
11fdf7f2 1184 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
7c673cae
FG
1185 conn->mark_down();
1186 ASSERT_FALSE(conn->is_connected());
11fdf7f2
TL
1187 ConnectionRef server_conn = server_msgr->connect_to(
1188 client_msgr->get_mytype(), srv_dispatcher.last_accept);
7c673cae 1189 // don't lose state
11fdf7f2 1190 ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
7c673cae
FG
1191
1192 srv_dispatcher.got_new = false;
11fdf7f2
TL
1193 conn = client_msgr->connect_to(server_msgr->get_mytype(),
1194 server_msgr->get_myaddrs());
7c673cae
FG
1195 {
1196 m = new MPing();
1197 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
1198 std::unique_lock l{cli_dispatcher.lock};
1199 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
1200 cli_dispatcher.got_new = false;
1201 }
11fdf7f2
TL
1202 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1203 server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
1204 srv_dispatcher.last_accept);
7c673cae 1205 {
9f95a23c
TL
1206 std::unique_lock l{srv_dispatcher.lock};
1207 srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_remote_reset; });
7c673cae
FG
1208 }
1209
1210 // 2. test for client reconnect
1211 ASSERT_FALSE(cli_dispatcher.got_remote_reset);
1212 cli_dispatcher.got_connect = false;
1213 cli_dispatcher.got_new = false;
1214 cli_dispatcher.got_remote_reset = false;
1215 server_conn->mark_down();
1216 ASSERT_FALSE(server_conn->is_connected());
1217 // ensure client detect server socket closed
1218 {
9f95a23c
TL
1219 std::unique_lock l{cli_dispatcher.lock};
1220 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_remote_reset; });
7c673cae
FG
1221 cli_dispatcher.got_remote_reset = false;
1222 }
1223 {
9f95a23c
TL
1224 std::unique_lock l{cli_dispatcher.lock};
1225 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_connect; });
7c673cae
FG
1226 cli_dispatcher.got_connect = false;
1227 }
1228 CHECK_AND_WAIT_TRUE(conn->is_connected());
1229 ASSERT_TRUE(conn->is_connected());
1230
1231 {
1232 m = new MPing();
1233 ASSERT_EQ(conn->send_message(m), 0);
1234 ASSERT_TRUE(conn->is_connected());
9f95a23c
TL
1235 std::unique_lock l{cli_dispatcher.lock};
1236 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
1237 cli_dispatcher.got_new = false;
1238 }
1239 // resetcheck happen
11fdf7f2
TL
1240 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1241 server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
1242 srv_dispatcher.last_accept);
1243 ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
7c673cae
FG
1244 cli_dispatcher.got_remote_reset = false;
1245
1246 server_msgr->shutdown();
1247 client_msgr->shutdown();
1248 server_msgr->wait();
1249 client_msgr->wait();
1250}
1251
1252TEST_P(MessengerTest, StatelessTest) {
1253 Message *m;
1254 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1255 entity_addr_t bind_addr;
9f95a23c 1256 bind_addr.parse("v2:127.0.0.1");
7c673cae
FG
1257 Messenger::Policy p = Messenger::Policy::stateless_server(0);
1258 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
1259 p = Messenger::Policy::lossy_client(0);
1260 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
1261
1262 server_msgr->bind(bind_addr);
1263 server_msgr->add_dispatcher_head(&srv_dispatcher);
1264 server_msgr->start();
1265 client_msgr->add_dispatcher_head(&cli_dispatcher);
1266 client_msgr->start();
1267
1268 // 1. test for server lose state
11fdf7f2
TL
1269 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1270 server_msgr->get_myaddrs());
7c673cae
FG
1271 {
1272 m = new MPing();
1273 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
1274 std::unique_lock l{cli_dispatcher.lock};
1275 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
1276 cli_dispatcher.got_new = false;
1277 }
11fdf7f2 1278 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
7c673cae
FG
1279 conn->mark_down();
1280 ASSERT_FALSE(conn->is_connected());
1281
1282 srv_dispatcher.got_new = false;
9f95a23c
TL
1283 ConnectionRef server_conn;
1284 srv_dispatcher.last_accept_con_ptr = &server_conn;
11fdf7f2
TL
1285 conn = client_msgr->connect_to(server_msgr->get_mytype(),
1286 server_msgr->get_myaddrs());
7c673cae
FG
1287 {
1288 m = new MPing();
1289 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
1290 std::unique_lock l{cli_dispatcher.lock};
1291 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
1292 cli_dispatcher.got_new = false;
1293 }
11fdf7f2 1294 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
9f95a23c
TL
1295 ASSERT_TRUE(server_conn);
1296
7c673cae
FG
1297 // server lose state
1298 {
9f95a23c
TL
1299 std::unique_lock l{srv_dispatcher.lock};
1300 srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; });
7c673cae 1301 }
11fdf7f2 1302 ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
7c673cae
FG
1303
1304 // 2. test for client lossy
1305 server_conn->mark_down();
1306 ASSERT_FALSE(server_conn->is_connected());
1307 conn->send_keepalive();
1308 CHECK_AND_WAIT_TRUE(!conn->is_connected());
1309 ASSERT_FALSE(conn->is_connected());
11fdf7f2
TL
1310 conn = client_msgr->connect_to(server_msgr->get_mytype(),
1311 server_msgr->get_myaddrs());
7c673cae
FG
1312 {
1313 m = new MPing();
1314 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
1315 std::unique_lock l{cli_dispatcher.lock};
1316 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
1317 cli_dispatcher.got_new = false;
1318 }
11fdf7f2 1319 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
7c673cae
FG
1320
1321 server_msgr->shutdown();
1322 client_msgr->shutdown();
1323 server_msgr->wait();
1324 client_msgr->wait();
1325}
1326
9f95a23c
TL
1327TEST_P(MessengerTest, AnonTest) {
1328 Message *m;
1329 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1330 entity_addr_t bind_addr;
1331 bind_addr.parse("v2:127.0.0.1");
1332 Messenger::Policy p = Messenger::Policy::stateless_server(0);
1333 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
1334 p = Messenger::Policy::lossy_client(0);
1335 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
1336
1337 server_msgr->bind(bind_addr);
1338 server_msgr->add_dispatcher_head(&srv_dispatcher);
1339 server_msgr->start();
1340 client_msgr->add_dispatcher_head(&cli_dispatcher);
1341 client_msgr->start();
1342
1343 ConnectionRef server_con_a, server_con_b;
1344
1345 // a
1346 srv_dispatcher.last_accept_con_ptr = &server_con_a;
1347 ConnectionRef con_a = client_msgr->connect_to(server_msgr->get_mytype(),
1348 server_msgr->get_myaddrs(),
1349 true);
1350 {
1351 m = new MPing();
1352 ASSERT_EQ(con_a->send_message(m), 0);
1353 std::unique_lock l{cli_dispatcher.lock};
1354 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1355 cli_dispatcher.got_new = false;
1356 }
1357 ASSERT_EQ(1U, static_cast<Session*>(con_a->get_priv().get())->get_count());
1358
1359 // b
1360 srv_dispatcher.last_accept_con_ptr = &server_con_b;
1361 ConnectionRef con_b = client_msgr->connect_to(server_msgr->get_mytype(),
1362 server_msgr->get_myaddrs(),
1363 true);
1364 {
1365 m = new MPing();
1366 ASSERT_EQ(con_b->send_message(m), 0);
1367 std::unique_lock l{cli_dispatcher.lock};
1368 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1369 cli_dispatcher.got_new = false;
1370 }
1371 ASSERT_EQ(1U, static_cast<Session*>(con_b->get_priv().get())->get_count());
1372
1373 // these should be distinct
1374 ASSERT_NE(con_a, con_b);
1375 ASSERT_NE(server_con_a, server_con_b);
1376
1377 // and both connected
1378 {
1379 m = new MPing();
1380 ASSERT_EQ(con_a->send_message(m), 0);
1381 std::unique_lock l{cli_dispatcher.lock};
1382 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1383 cli_dispatcher.got_new = false;
1384 }
1385 {
1386 m = new MPing();
1387 ASSERT_EQ(con_b->send_message(m), 0);
1388 std::unique_lock l{cli_dispatcher.lock};
1389 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
1390 cli_dispatcher.got_new = false;
1391 }
1392
1393 // clean up
1394 con_a->mark_down();
1395 ASSERT_FALSE(con_a->is_connected());
1396 con_b->mark_down();
1397 ASSERT_FALSE(con_b->is_connected());
1398
1399 server_msgr->shutdown();
1400 client_msgr->shutdown();
1401 server_msgr->wait();
1402 client_msgr->wait();
1403}
1404
7c673cae
FG
1405TEST_P(MessengerTest, ClientStandbyTest) {
1406 Message *m;
1407 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1408 entity_addr_t bind_addr;
9f95a23c 1409 bind_addr.parse("v2:127.0.0.1");
7c673cae
FG
1410 Messenger::Policy p = Messenger::Policy::stateful_server(0);
1411 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
1412 p = Messenger::Policy::lossless_peer(0);
1413 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
1414
1415 server_msgr->bind(bind_addr);
1416 server_msgr->add_dispatcher_head(&srv_dispatcher);
1417 server_msgr->start();
1418 client_msgr->add_dispatcher_head(&cli_dispatcher);
1419 client_msgr->start();
1420
1421 // 1. test for client standby, resetcheck
11fdf7f2
TL
1422 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1423 server_msgr->get_myaddrs());
7c673cae
FG
1424 {
1425 m = new MPing();
1426 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
1427 std::unique_lock l{cli_dispatcher.lock};
1428 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
1429 cli_dispatcher.got_new = false;
1430 }
11fdf7f2
TL
1431 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1432 ConnectionRef server_conn = server_msgr->connect_to(
1433 client_msgr->get_mytype(),
1434 srv_dispatcher.last_accept);
7c673cae
FG
1435 ASSERT_FALSE(cli_dispatcher.got_remote_reset);
1436 cli_dispatcher.got_connect = false;
1437 server_conn->mark_down();
1438 ASSERT_FALSE(server_conn->is_connected());
1439 // client should be standby
1440 usleep(300*1000);
1441 // client should be standby, so we use original connection
1442 {
1443 // Try send message to verify got remote reset callback
1444 m = new MPing();
1445 ASSERT_EQ(conn->send_message(m), 0);
1446 {
9f95a23c
TL
1447 std::unique_lock l{cli_dispatcher.lock};
1448 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_remote_reset; });
7c673cae 1449 cli_dispatcher.got_remote_reset = false;
9f95a23c 1450 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_connect; });
7c673cae
FG
1451 cli_dispatcher.got_connect = false;
1452 }
1453 CHECK_AND_WAIT_TRUE(conn->is_connected());
1454 ASSERT_TRUE(conn->is_connected());
1455 m = new MPing();
1456 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
1457 std::unique_lock l{cli_dispatcher.lock};
1458 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
1459 cli_dispatcher.got_new = false;
1460 }
11fdf7f2
TL
1461 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
1462 server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
1463 srv_dispatcher.last_accept);
1464 ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
7c673cae
FG
1465
1466 server_msgr->shutdown();
1467 client_msgr->shutdown();
1468 server_msgr->wait();
1469 client_msgr->wait();
1470}
1471
1472TEST_P(MessengerTest, AuthTest) {
11fdf7f2
TL
1473 g_ceph_context->_conf.set_val("auth_cluster_required", "cephx");
1474 g_ceph_context->_conf.set_val("auth_service_required", "cephx");
1475 g_ceph_context->_conf.set_val("auth_client_required", "cephx");
7c673cae
FG
1476 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1477 entity_addr_t bind_addr;
9f95a23c 1478 bind_addr.parse("v2:127.0.0.1");
7c673cae
FG
1479 server_msgr->bind(bind_addr);
1480 server_msgr->add_dispatcher_head(&srv_dispatcher);
1481 server_msgr->start();
1482
1483 client_msgr->add_dispatcher_head(&cli_dispatcher);
1484 client_msgr->start();
1485
1486 // 1. simple auth round trip
1487 MPing *m = new MPing();
11fdf7f2
TL
1488 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1489 server_msgr->get_myaddrs());
7c673cae
FG
1490 {
1491 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
1492 std::unique_lock l{cli_dispatcher.lock};
1493 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
1494 cli_dispatcher.got_new = false;
1495 }
1496 ASSERT_TRUE(conn->is_connected());
11fdf7f2 1497 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
7c673cae
FG
1498
1499 // 2. mix auth
11fdf7f2
TL
1500 g_ceph_context->_conf.set_val("auth_cluster_required", "none");
1501 g_ceph_context->_conf.set_val("auth_service_required", "none");
1502 g_ceph_context->_conf.set_val("auth_client_required", "none");
7c673cae
FG
1503 conn->mark_down();
1504 ASSERT_FALSE(conn->is_connected());
11fdf7f2
TL
1505 conn = client_msgr->connect_to(server_msgr->get_mytype(),
1506 server_msgr->get_myaddrs());
7c673cae
FG
1507 {
1508 MPing *m = new MPing();
1509 ASSERT_EQ(conn->send_message(m), 0);
9f95a23c
TL
1510 std::unique_lock l{cli_dispatcher.lock};
1511 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
1512 cli_dispatcher.got_new = false;
1513 }
1514 ASSERT_TRUE(conn->is_connected());
11fdf7f2 1515 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
7c673cae
FG
1516 server_msgr->shutdown();
1517 client_msgr->shutdown();
1518 server_msgr->wait();
1519 client_msgr->wait();
1520}
1521
1522TEST_P(MessengerTest, MessageTest) {
1523 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
1524 entity_addr_t bind_addr;
9f95a23c 1525 bind_addr.parse("v2:127.0.0.1");
7c673cae
FG
1526 Messenger::Policy p = Messenger::Policy::stateful_server(0);
1527 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
1528 p = Messenger::Policy::lossless_peer(0);
1529 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
1530
1531 server_msgr->bind(bind_addr);
1532 server_msgr->add_dispatcher_head(&srv_dispatcher);
1533 server_msgr->start();
1534 client_msgr->add_dispatcher_head(&cli_dispatcher);
1535 client_msgr->start();
1536
1537
1538 // 1. A very large "front"(as well as "payload")
1539 // Because a external message need to invade Messenger::decode_message,
1540 // here we only use existing message class(MCommand)
11fdf7f2
TL
1541 ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
1542 server_msgr->get_myaddrs());
7c673cae
FG
1543 {
1544 uuid_d uuid;
1545 uuid.generate_random();
1546 vector<string> cmds;
1547 string s("abcdefghijklmnopqrstuvwxyz");
1548 for (int i = 0; i < 1024*30; i++)
1549 cmds.push_back(s);
1550 MCommand *m = new MCommand(uuid);
1551 m->cmd = cmds;
1552 conn->send_message(m);
9f95a23c
TL
1553 std::unique_lock l{cli_dispatcher.lock};
1554 cli_dispatcher.cond.wait_for(l, 500s, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
1555 ASSERT_TRUE(cli_dispatcher.got_new);
1556 cli_dispatcher.got_new = false;
1557 }
1558
1559 // 2. A very large "data"
1560 {
1561 bufferlist bl;
1562 string s("abcdefghijklmnopqrstuvwxyz");
1563 for (int i = 0; i < 1024*30; i++)
1564 bl.append(s);
1565 MPing *m = new MPing();
1566 m->set_data(bl);
1567 conn->send_message(m);
1568 utime_t t;
1569 t += 1000*1000*500;
9f95a23c
TL
1570 std::unique_lock l{cli_dispatcher.lock};
1571 cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
7c673cae
FG
1572 ASSERT_TRUE(cli_dispatcher.got_new);
1573 cli_dispatcher.got_new = false;
1574 }
1575 server_msgr->shutdown();
1576 client_msgr->shutdown();
1577 server_msgr->wait();
1578 client_msgr->wait();
1579}
1580
1581
1582class SyntheticWorkload;
1583
1584struct Payload {
1585 enum Who : uint8_t {
1586 PING = 0,
1587 PONG = 1,
1588 };
11fdf7f2
TL
1589 uint8_t who = 0;
1590 uint64_t seq = 0;
7c673cae
FG
1591 bufferlist data;
1592
1593 Payload(Who who, uint64_t seq, const bufferlist& data)
1594 : who(who), seq(seq), data(data)
1595 {}
1596 Payload() = default;
1597 DENC(Payload, v, p) {
1598 DENC_START(1, 1, p);
1599 denc(v.who, p);
1600 denc(v.seq, p);
1601 denc(v.data, p);
1602 DENC_FINISH(p);
1603 }
1604};
1605WRITE_CLASS_DENC(Payload)
1606
1607ostream& operator<<(ostream& out, const Payload &pl)
1608{
1609 return out << "reply=" << pl.who << " i = " << pl.seq;
1610}
1611
1612class SyntheticDispatcher : public Dispatcher {
1613 public:
9f95a23c
TL
1614 ceph::mutex lock = ceph::make_mutex("SyntheticDispatcher::lock");
1615 ceph::condition_variable cond;
7c673cae
FG
1616 bool is_server;
1617 bool got_new;
1618 bool got_remote_reset;
1619 bool got_connect;
1620 map<ConnectionRef, list<uint64_t> > conn_sent;
1621 map<uint64_t, bufferlist> sent;
1622 atomic<uint64_t> index;
1623 SyntheticWorkload *workload;
1624
1625 SyntheticDispatcher(bool s, SyntheticWorkload *wl):
9f95a23c 1626 Dispatcher(g_ceph_context), is_server(s), got_new(false),
11fdf7f2 1627 got_remote_reset(false), got_connect(false), index(0), workload(wl) {
11fdf7f2 1628 }
7c673cae
FG
1629 bool ms_can_fast_dispatch_any() const override { return true; }
1630 bool ms_can_fast_dispatch(const Message *m) const override {
1631 switch (m->get_type()) {
1632 case CEPH_MSG_PING:
1633 case MSG_COMMAND:
1634 return true;
1635 default:
1636 return false;
1637 }
1638 }
1639
1640 void ms_handle_fast_connect(Connection *con) override {
9f95a23c 1641 std::lock_guard l{lock};
7c673cae
FG
1642 list<uint64_t> c = conn_sent[con];
1643 for (list<uint64_t>::iterator it = c.begin();
1644 it != c.end(); ++it)
1645 sent.erase(*it);
1646 conn_sent.erase(con);
1647 got_connect = true;
9f95a23c 1648 cond.notify_all();
7c673cae
FG
1649 }
1650 void ms_handle_fast_accept(Connection *con) override {
9f95a23c 1651 std::lock_guard l{lock};
7c673cae
FG
1652 list<uint64_t> c = conn_sent[con];
1653 for (list<uint64_t>::iterator it = c.begin();
1654 it != c.end(); ++it)
1655 sent.erase(*it);
1656 conn_sent.erase(con);
9f95a23c 1657 cond.notify_all();
7c673cae
FG
1658 }
1659 bool ms_dispatch(Message *m) override {
1660 ceph_abort();
1661 }
1662 bool ms_handle_reset(Connection *con) override;
1663 void ms_handle_remote_reset(Connection *con) override {
9f95a23c 1664 std::lock_guard l{lock};
7c673cae
FG
1665 list<uint64_t> c = conn_sent[con];
1666 for (list<uint64_t>::iterator it = c.begin();
1667 it != c.end(); ++it)
1668 sent.erase(*it);
1669 conn_sent.erase(con);
1670 got_remote_reset = true;
1671 }
1672 bool ms_handle_refused(Connection *con) override {
1673 return false;
1674 }
1675 void ms_fast_dispatch(Message *m) override {
1676 // MSG_COMMAND is used to disorganize regular message flow
1677 if (m->get_type() == MSG_COMMAND) {
1678 m->put();
1679 return ;
1680 }
1681
1682 Payload pl;
11fdf7f2
TL
1683 auto p = m->get_data().cbegin();
1684 decode(pl, p);
7c673cae
FG
1685 if (pl.who == Payload::PING) {
1686 lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl;
1687 reply_message(m, pl);
1688 m->put();
9f95a23c 1689 std::lock_guard l{lock};
7c673cae 1690 got_new = true;
9f95a23c 1691 cond.notify_all();
7c673cae 1692 } else {
9f95a23c 1693 std::lock_guard l{lock};
7c673cae
FG
1694 if (sent.count(pl.seq)) {
1695 lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl;
1696 ASSERT_EQ(conn_sent[m->get_connection()].front(), pl.seq);
1697 ASSERT_TRUE(pl.data.contents_equal(sent[pl.seq]));
1698 conn_sent[m->get_connection()].pop_front();
1699 sent.erase(pl.seq);
1700 }
1701 m->put();
1702 got_new = true;
9f95a23c 1703 cond.notify_all();
7c673cae
FG
1704 }
1705 }
1706
11fdf7f2
TL
1707 int ms_handle_authentication(Connection *con) override {
1708 return 1;
7c673cae
FG
1709 }
1710
1711 void reply_message(const Message *m, Payload& pl) {
1712 pl.who = Payload::PONG;
1713 bufferlist bl;
11fdf7f2 1714 encode(pl, bl);
7c673cae
FG
1715 MPing *rm = new MPing();
1716 rm->set_data(bl);
1717 m->get_connection()->send_message(rm);
1718 lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply m=" << m << " i=" << pl.seq << dendl;
1719 }
1720
1721 void send_message_wrap(ConnectionRef con, const bufferlist& data) {
1722 Message *m = new MPing();
1723 Payload pl{Payload::PING, index++, data};
1724 bufferlist bl;
11fdf7f2 1725 encode(pl, bl);
7c673cae
FG
1726 m->set_data(bl);
1727 if (!con->get_messenger()->get_default_policy().lossy) {
9f95a23c 1728 std::lock_guard l{lock};
7c673cae
FG
1729 sent[pl.seq] = pl.data;
1730 conn_sent[con].push_back(pl.seq);
1731 }
1732 lderr(g_ceph_context) << __func__ << " conn=" << con.get() << " send m=" << m << " i=" << pl.seq << dendl;
1733 ASSERT_EQ(0, con->send_message(m));
1734 }
1735
1736 uint64_t get_pending() {
9f95a23c 1737 std::lock_guard l{lock};
7c673cae
FG
1738 return sent.size();
1739 }
1740
1741 void clear_pending(ConnectionRef con) {
9f95a23c 1742 std::lock_guard l{lock};
7c673cae
FG
1743
1744 for (list<uint64_t>::iterator it = conn_sent[con].begin();
1745 it != conn_sent[con].end(); ++it)
1746 sent.erase(*it);
1747 conn_sent.erase(con);
1748 }
1749
1750 void print() {
1751 for (auto && p : conn_sent) {
1752 if (!p.second.empty()) {
1753 lderr(g_ceph_context) << __func__ << " " << p.first << " wait " << p.second.size() << dendl;
1754 }
1755 }
1756 }
1757};
1758
1759
1760class SyntheticWorkload {
9f95a23c
TL
1761 ceph::mutex lock = ceph::make_mutex("SyntheticWorkload::lock");
1762 ceph::condition_variable cond;
7c673cae
FG
1763 set<Messenger*> available_servers;
1764 set<Messenger*> available_clients;
11fdf7f2 1765 Messenger::Policy client_policy;
7c673cae
FG
1766 map<ConnectionRef, pair<Messenger*, Messenger*> > available_connections;
1767 SyntheticDispatcher dispatcher;
1768 gen_type rng;
1769 vector<bufferlist> rand_data;
11fdf7f2 1770 DummyAuthClientServer dummy_auth;
7c673cae
FG
1771
1772 public:
1773 static const unsigned max_in_flight = 64;
1774 static const unsigned max_connections = 128;
1775 static const unsigned max_message_len = 1024 * 1024 * 4;
1776
1777 SyntheticWorkload(int servers, int clients, string type, int random_num,
11fdf7f2 1778 Messenger::Policy srv_policy, Messenger::Policy cli_policy)
9f95a23c 1779 : client_policy(cli_policy),
11fdf7f2
TL
1780 dispatcher(false, this),
1781 rng(time(NULL)),
1782 dummy_auth(g_ceph_context) {
1783 dummy_auth.auth_registry.refresh_config();
7c673cae
FG
1784 Messenger *msgr;
1785 int base_port = 16800;
1786 entity_addr_t bind_addr;
1787 char addr[64];
1788 for (int i = 0; i < servers; ++i) {
1789 msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0),
f67539c2 1790 "server", getpid()+i);
9f95a23c 1791 snprintf(addr, sizeof(addr), "v2:127.0.0.1:%d",
11fdf7f2 1792 base_port+i);
7c673cae
FG
1793 bind_addr.parse(addr);
1794 msgr->bind(bind_addr);
1795 msgr->add_dispatcher_head(&dispatcher);
11fdf7f2
TL
1796 msgr->set_auth_client(&dummy_auth);
1797 msgr->set_auth_server(&dummy_auth);
7c673cae 1798
11fdf7f2 1799 ceph_assert(msgr);
7c673cae
FG
1800 msgr->set_default_policy(srv_policy);
1801 available_servers.insert(msgr);
1802 msgr->start();
1803 }
1804
1805 for (int i = 0; i < clients; ++i) {
1806 msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(-1),
f67539c2 1807 "client", getpid()+i+servers);
7c673cae 1808 if (cli_policy.standby) {
9f95a23c 1809 snprintf(addr, sizeof(addr), "v2:127.0.0.1:%d",
11fdf7f2 1810 base_port+i+servers);
7c673cae
FG
1811 bind_addr.parse(addr);
1812 msgr->bind(bind_addr);
1813 }
1814 msgr->add_dispatcher_head(&dispatcher);
11fdf7f2
TL
1815 msgr->set_auth_client(&dummy_auth);
1816 msgr->set_auth_server(&dummy_auth);
7c673cae 1817
11fdf7f2 1818 ceph_assert(msgr);
7c673cae
FG
1819 msgr->set_default_policy(cli_policy);
1820 available_clients.insert(msgr);
1821 msgr->start();
1822 }
1823
1824 for (int i = 0; i < random_num; i++) {
1825 bufferlist bl;
1826 boost::uniform_int<> u(32, max_message_len);
1827 uint64_t value_len = u(rng);
1828 bufferptr bp(value_len);
1829 bp.zero();
1830 for (uint64_t j = 0; j < value_len-sizeof(i); ) {
1831 memcpy(bp.c_str()+j, &i, sizeof(i));
1832 j += 4096;
1833 }
1834
1835 bl.append(bp);
1836 rand_data.push_back(bl);
1837 }
1838 }
1839
1840 ConnectionRef _get_random_connection() {
1841 while (dispatcher.get_pending() > max_in_flight) {
9f95a23c 1842 lock.unlock();
7c673cae 1843 usleep(500);
9f95a23c 1844 lock.lock();
7c673cae 1845 }
9f95a23c 1846 ceph_assert(ceph_mutex_is_locked(lock));
7c673cae
FG
1847 boost::uniform_int<> choose(0, available_connections.size() - 1);
1848 int index = choose(rng);
1849 map<ConnectionRef, pair<Messenger*, Messenger*> >::iterator i = available_connections.begin();
1850 for (; index > 0; --index, ++i) ;
1851 return i->first;
1852 }
1853
1854 bool can_create_connection() {
1855 return available_connections.size() < max_connections;
1856 }
1857
1858 void generate_connection() {
9f95a23c 1859 std::lock_guard l{lock};
7c673cae
FG
1860 if (!can_create_connection())
1861 return ;
1862
1863 Messenger *server, *client;
1864 {
1865 boost::uniform_int<> choose(0, available_servers.size() - 1);
1866 int index = choose(rng);
1867 set<Messenger*>::iterator i = available_servers.begin();
1868 for (; index > 0; --index, ++i) ;
1869 server = *i;
1870 }
1871 {
1872 boost::uniform_int<> choose(0, available_clients.size() - 1);
1873 int index = choose(rng);
1874 set<Messenger*>::iterator i = available_clients.begin();
1875 for (; index > 0; --index, ++i) ;
1876 client = *i;
1877 }
1878
1879 pair<Messenger*, Messenger*> p;
1880 {
1881 boost::uniform_int<> choose(0, available_servers.size() - 1);
1882 if (server->get_default_policy().server) {
1883 p = make_pair(client, server);
11fdf7f2
TL
1884 ConnectionRef conn = client->connect_to(server->get_mytype(),
1885 server->get_myaddrs());
1886 available_connections[conn] = p;
7c673cae 1887 } else {
11fdf7f2
TL
1888 ConnectionRef conn = client->connect_to(server->get_mytype(),
1889 server->get_myaddrs());
1890 p = make_pair(client, server);
1891 available_connections[conn] = p;
7c673cae
FG
1892 }
1893 }
7c673cae
FG
1894 }
1895
1896 void send_message() {
9f95a23c 1897 std::lock_guard l{lock};
7c673cae
FG
1898 ConnectionRef conn = _get_random_connection();
1899 boost::uniform_int<> true_false(0, 99);
1900 int val = true_false(rng);
1901 if (val >= 95) {
1902 uuid_d uuid;
1903 uuid.generate_random();
1904 MCommand *m = new MCommand(uuid);
1905 vector<string> cmds;
1906 cmds.push_back("command");
1907 m->cmd = cmds;
1908 m->set_priority(200);
1909 conn->send_message(m);
1910 } else {
1911 boost::uniform_int<> u(0, rand_data.size()-1);
1912 dispatcher.send_message_wrap(conn, rand_data[u(rng)]);
1913 }
1914 }
1915
1916 void drop_connection() {
9f95a23c 1917 std::lock_guard l{lock};
7c673cae
FG
1918 if (available_connections.size() < 10)
1919 return;
1920 ConnectionRef conn = _get_random_connection();
1921 dispatcher.clear_pending(conn);
1922 conn->mark_down();
11fdf7f2
TL
1923 if (!client_policy.server &&
1924 !client_policy.lossy &&
1925 client_policy.standby) {
1926 // it's a lossless policy, so we need to mark down each side
1927 pair<Messenger*, Messenger*> &p = available_connections[conn];
1928 if (!p.first->get_default_policy().server && !p.second->get_default_policy().server) {
1929 ASSERT_EQ(conn->get_messenger(), p.first);
1930 ConnectionRef peer = p.second->connect_to(p.first->get_mytype(),
1931 p.first->get_myaddrs());
1932 peer->mark_down();
1933 dispatcher.clear_pending(peer);
1934 available_connections.erase(peer);
1935 }
7c673cae
FG
1936 }
1937 ASSERT_EQ(available_connections.erase(conn), 1U);
1938 }
1939
1940 void print_internal_state(bool detail=false) {
9f95a23c 1941 std::lock_guard l{lock};
7c673cae
FG
1942 lderr(g_ceph_context) << "available_connections: " << available_connections.size()
1943 << " inflight messages: " << dispatcher.get_pending() << dendl;
1944 if (detail && !available_connections.empty()) {
1945 dispatcher.print();
1946 }
1947 }
1948
1949 void wait_for_done() {
1950 int64_t tick_us = 1000 * 100; // 100ms
1951 int64_t timeout_us = 5 * 60 * 1000 * 1000; // 5 mins
1952 int i = 0;
1953 while (dispatcher.get_pending()) {
1954 usleep(tick_us);
1955 timeout_us -= tick_us;
1956 if (i++ % 50 == 0)
1957 print_internal_state(true);
1958 if (timeout_us < 0)
11fdf7f2 1959 ceph_abort_msg(" loop time exceed 5 mins, it looks we stuck into some problems!");
7c673cae
FG
1960 }
1961 for (set<Messenger*>::iterator it = available_servers.begin();
1962 it != available_servers.end(); ++it) {
1963 (*it)->shutdown();
1964 (*it)->wait();
1965 ASSERT_EQ((*it)->get_dispatch_queue_len(), 0);
1966 delete (*it);
1967 }
1968 available_servers.clear();
1969
1970 for (set<Messenger*>::iterator it = available_clients.begin();
1971 it != available_clients.end(); ++it) {
1972 (*it)->shutdown();
1973 (*it)->wait();
1974 ASSERT_EQ((*it)->get_dispatch_queue_len(), 0);
1975 delete (*it);
1976 }
1977 available_clients.clear();
1978 }
1979
1980 void handle_reset(Connection *con) {
9f95a23c 1981 std::lock_guard l{lock};
7c673cae
FG
1982 available_connections.erase(con);
1983 dispatcher.clear_pending(con);
1984 }
1985};
1986
1987bool SyntheticDispatcher::ms_handle_reset(Connection *con) {
1988 workload->handle_reset(con);
1989 return true;
1990}
1991
1992TEST_P(MessengerTest, SyntheticStressTest) {
1993 SyntheticWorkload test_msg(8, 32, GetParam(), 100,
1994 Messenger::Policy::stateful_server(0),
1995 Messenger::Policy::lossless_client(0));
1996 for (int i = 0; i < 100; ++i) {
1997 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
1998 test_msg.generate_connection();
1999 }
2000 gen_type rng(time(NULL));
2001 for (int i = 0; i < 5000; ++i) {
2002 if (!(i % 10)) {
2003 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
2004 test_msg.print_internal_state();
2005 }
2006 boost::uniform_int<> true_false(0, 99);
2007 int val = true_false(rng);
2008 if (val > 90) {
2009 test_msg.generate_connection();
2010 } else if (val > 80) {
2011 test_msg.drop_connection();
2012 } else if (val > 10) {
2013 test_msg.send_message();
2014 } else {
2015 usleep(rand() % 1000 + 500);
2016 }
2017 }
2018 test_msg.wait_for_done();
2019}
2020
2021TEST_P(MessengerTest, SyntheticStressTest1) {
2022 SyntheticWorkload test_msg(16, 32, GetParam(), 100,
2023 Messenger::Policy::lossless_peer_reuse(0),
2024 Messenger::Policy::lossless_peer_reuse(0));
2025 for (int i = 0; i < 10; ++i) {
2026 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
2027 test_msg.generate_connection();
2028 }
2029 gen_type rng(time(NULL));
2030 for (int i = 0; i < 10000; ++i) {
2031 if (!(i % 10)) {
2032 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
2033 test_msg.print_internal_state();
2034 }
2035 boost::uniform_int<> true_false(0, 99);
2036 int val = true_false(rng);
2037 if (val > 80) {
2038 test_msg.generate_connection();
2039 } else if (val > 60) {
2040 test_msg.drop_connection();
2041 } else if (val > 10) {
2042 test_msg.send_message();
2043 } else {
2044 usleep(rand() % 1000 + 500);
2045 }
2046 }
2047 test_msg.wait_for_done();
2048}
2049
2050
2051TEST_P(MessengerTest, SyntheticInjectTest) {
2052 uint64_t dispatch_throttle_bytes = g_ceph_context->_conf->ms_dispatch_throttle_bytes;
11fdf7f2
TL
2053 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "30");
2054 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1");
2055 g_ceph_context->_conf.set_val("ms_dispatch_throttle_bytes", "16777216");
7c673cae
FG
2056 SyntheticWorkload test_msg(8, 32, GetParam(), 100,
2057 Messenger::Policy::stateful_server(0),
2058 Messenger::Policy::lossless_client(0));
2059 for (int i = 0; i < 100; ++i) {
2060 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
2061 test_msg.generate_connection();
2062 }
2063 gen_type rng(time(NULL));
2064 for (int i = 0; i < 1000; ++i) {
2065 if (!(i % 10)) {
2066 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
2067 test_msg.print_internal_state();
2068 }
2069 boost::uniform_int<> true_false(0, 99);
2070 int val = true_false(rng);
2071 if (val > 90) {
2072 test_msg.generate_connection();
2073 } else if (val > 80) {
2074 test_msg.drop_connection();
2075 } else if (val > 10) {
2076 test_msg.send_message();
2077 } else {
2078 usleep(rand() % 500 + 100);
2079 }
2080 }
2081 test_msg.wait_for_done();
11fdf7f2
TL
2082 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0");
2083 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0");
2084 g_ceph_context->_conf.set_val(
7c673cae
FG
2085 "ms_dispatch_throttle_bytes", std::to_string(dispatch_throttle_bytes));
2086}
2087
2088TEST_P(MessengerTest, SyntheticInjectTest2) {
11fdf7f2
TL
2089 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "30");
2090 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1");
7c673cae
FG
2091 SyntheticWorkload test_msg(8, 16, GetParam(), 100,
2092 Messenger::Policy::lossless_peer_reuse(0),
2093 Messenger::Policy::lossless_peer_reuse(0));
2094 for (int i = 0; i < 100; ++i) {
2095 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
2096 test_msg.generate_connection();
2097 }
2098 gen_type rng(time(NULL));
2099 for (int i = 0; i < 1000; ++i) {
2100 if (!(i % 10)) {
2101 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
2102 test_msg.print_internal_state();
2103 }
2104 boost::uniform_int<> true_false(0, 99);
2105 int val = true_false(rng);
2106 if (val > 90) {
2107 test_msg.generate_connection();
2108 } else if (val > 80) {
2109 test_msg.drop_connection();
2110 } else if (val > 10) {
2111 test_msg.send_message();
2112 } else {
2113 usleep(rand() % 500 + 100);
2114 }
2115 }
2116 test_msg.wait_for_done();
11fdf7f2
TL
2117 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0");
2118 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0");
7c673cae
FG
2119}
2120
2121TEST_P(MessengerTest, SyntheticInjectTest3) {
11fdf7f2
TL
2122 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "600");
2123 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1");
7c673cae
FG
2124 SyntheticWorkload test_msg(8, 16, GetParam(), 100,
2125 Messenger::Policy::stateless_server(0),
2126 Messenger::Policy::lossy_client(0));
2127 for (int i = 0; i < 100; ++i) {
2128 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
2129 test_msg.generate_connection();
2130 }
2131 gen_type rng(time(NULL));
2132 for (int i = 0; i < 1000; ++i) {
2133 if (!(i % 10)) {
2134 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
2135 test_msg.print_internal_state();
2136 }
2137 boost::uniform_int<> true_false(0, 99);
2138 int val = true_false(rng);
2139 if (val > 90) {
2140 test_msg.generate_connection();
2141 } else if (val > 80) {
2142 test_msg.drop_connection();
2143 } else if (val > 10) {
2144 test_msg.send_message();
2145 } else {
2146 usleep(rand() % 500 + 100);
2147 }
2148 }
2149 test_msg.wait_for_done();
11fdf7f2
TL
2150 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0");
2151 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0");
7c673cae
FG
2152}
2153
2154
2155TEST_P(MessengerTest, SyntheticInjectTest4) {
11fdf7f2
TL
2156 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "30");
2157 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1");
2158 g_ceph_context->_conf.set_val("ms_inject_delay_probability", "1");
2159 g_ceph_context->_conf.set_val("ms_inject_delay_type", "client osd");
2160 g_ceph_context->_conf.set_val("ms_inject_delay_max", "5");
7c673cae
FG
2161 SyntheticWorkload test_msg(16, 32, GetParam(), 100,
2162 Messenger::Policy::lossless_peer(0),
2163 Messenger::Policy::lossless_peer(0));
2164 for (int i = 0; i < 100; ++i) {
2165 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
2166 test_msg.generate_connection();
2167 }
2168 gen_type rng(time(NULL));
2169 for (int i = 0; i < 1000; ++i) {
2170 if (!(i % 10)) {
2171 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
2172 test_msg.print_internal_state();
2173 }
2174 boost::uniform_int<> true_false(0, 99);
2175 int val = true_false(rng);
2176 if (val > 95) {
2177 test_msg.generate_connection();
2178 } else if (val > 80) {
2179 // test_msg.drop_connection();
2180 } else if (val > 10) {
2181 test_msg.send_message();
2182 } else {
2183 usleep(rand() % 500 + 100);
2184 }
2185 }
2186 test_msg.wait_for_done();
11fdf7f2
TL
2187 g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0");
2188 g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0");
2189 g_ceph_context->_conf.set_val("ms_inject_delay_probability", "0");
2190 g_ceph_context->_conf.set_val("ms_inject_delay_type", "");
2191 g_ceph_context->_conf.set_val("ms_inject_delay_max", "0");
7c673cae
FG
2192}
2193
2194
2195class MarkdownDispatcher : public Dispatcher {
9f95a23c 2196 ceph::mutex lock = ceph::make_mutex("MarkdownDispatcher::lock");
7c673cae
FG
2197 set<ConnectionRef> conns;
2198 bool last_mark;
2199 public:
31f18b77 2200 std::atomic<uint64_t> count = { 0 };
9f95a23c 2201 explicit MarkdownDispatcher(bool s): Dispatcher(g_ceph_context),
11fdf7f2 2202 last_mark(false) {
11fdf7f2 2203 }
7c673cae
FG
2204 bool ms_can_fast_dispatch_any() const override { return false; }
2205 bool ms_can_fast_dispatch(const Message *m) const override {
2206 switch (m->get_type()) {
2207 case CEPH_MSG_PING:
2208 return true;
2209 default:
2210 return false;
2211 }
2212 }
2213
2214 void ms_handle_fast_connect(Connection *con) override {
2215 lderr(g_ceph_context) << __func__ << " " << con << dendl;
9f95a23c 2216 std::lock_guard l{lock};
7c673cae
FG
2217 conns.insert(con);
2218 }
2219 void ms_handle_fast_accept(Connection *con) override {
9f95a23c 2220 std::lock_guard l{lock};
7c673cae
FG
2221 conns.insert(con);
2222 }
2223 bool ms_dispatch(Message *m) override {
2224 lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << dendl;
9f95a23c 2225 std::lock_guard l{lock};
31f18b77 2226 count++;
7c673cae
FG
2227 conns.insert(m->get_connection());
2228 if (conns.size() < 2 && !last_mark) {
2229 m->put();
2230 return true;
2231 }
2232
2233 last_mark = true;
2234 usleep(rand() % 500);
2235 for (set<ConnectionRef>::iterator it = conns.begin(); it != conns.end(); ++it) {
2236 if ((*it) != m->get_connection().get()) {
2237 (*it)->mark_down();
2238 conns.erase(it);
2239 break;
2240 }
2241 }
2242 if (conns.empty())
2243 last_mark = false;
2244 m->put();
2245 return true;
2246 }
2247 bool ms_handle_reset(Connection *con) override {
2248 lderr(g_ceph_context) << __func__ << " " << con << dendl;
9f95a23c 2249 std::lock_guard l{lock};
7c673cae
FG
2250 conns.erase(con);
2251 usleep(rand() % 500);
2252 return true;
2253 }
2254 void ms_handle_remote_reset(Connection *con) override {
9f95a23c 2255 std::lock_guard l{lock};
7c673cae
FG
2256 conns.erase(con);
2257 lderr(g_ceph_context) << __func__ << " " << con << dendl;
2258 }
2259 bool ms_handle_refused(Connection *con) override {
2260 return false;
2261 }
2262 void ms_fast_dispatch(Message *m) override {
2263 ceph_abort();
2264 }
11fdf7f2
TL
2265 int ms_handle_authentication(Connection *con) override {
2266 return 1;
7c673cae
FG
2267 }
2268};
2269
2270
2271// Markdown with external lock
2272TEST_P(MessengerTest, MarkdownTest) {
f67539c2 2273 Messenger *server_msgr2 = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid());
7c673cae 2274 MarkdownDispatcher cli_dispatcher(false), srv_dispatcher(true);
11fdf7f2
TL
2275 DummyAuthClientServer dummy_auth(g_ceph_context);
2276 dummy_auth.auth_registry.refresh_config();
7c673cae 2277 entity_addr_t bind_addr;
9f95a23c 2278 bind_addr.parse("v2:127.0.0.1:16800");
7c673cae
FG
2279 server_msgr->bind(bind_addr);
2280 server_msgr->add_dispatcher_head(&srv_dispatcher);
11fdf7f2
TL
2281 server_msgr->set_auth_client(&dummy_auth);
2282 server_msgr->set_auth_server(&dummy_auth);
7c673cae 2283 server_msgr->start();
9f95a23c 2284 bind_addr.parse("v2:127.0.0.1:16801");
7c673cae
FG
2285 server_msgr2->bind(bind_addr);
2286 server_msgr2->add_dispatcher_head(&srv_dispatcher);
11fdf7f2
TL
2287 server_msgr2->set_auth_client(&dummy_auth);
2288 server_msgr2->set_auth_server(&dummy_auth);
7c673cae
FG
2289 server_msgr2->start();
2290
2291 client_msgr->add_dispatcher_head(&cli_dispatcher);
11fdf7f2
TL
2292 client_msgr->set_auth_client(&dummy_auth);
2293 client_msgr->set_auth_server(&dummy_auth);
7c673cae
FG
2294 client_msgr->start();
2295
2296 int i = 1000;
2297 uint64_t last = 0;
2298 bool equal = false;
2299 uint64_t equal_count = 0;
2300 while (i--) {
11fdf7f2
TL
2301 ConnectionRef conn1 = client_msgr->connect_to(server_msgr->get_mytype(),
2302 server_msgr->get_myaddrs());
2303 ConnectionRef conn2 = client_msgr->connect_to(server_msgr2->get_mytype(),
2304 server_msgr2->get_myaddrs());
7c673cae
FG
2305 MPing *m = new MPing();
2306 ASSERT_EQ(conn1->send_message(m), 0);
2307 m = new MPing();
2308 ASSERT_EQ(conn2->send_message(m), 0);
31f18b77
FG
2309 CHECK_AND_WAIT_TRUE(srv_dispatcher.count > last + 1);
2310 if (srv_dispatcher.count == last) {
7c673cae
FG
2311 lderr(g_ceph_context) << __func__ << " last is " << last << dendl;
2312 equal = true;
2313 equal_count++;
2314 } else {
2315 equal = false;
2316 equal_count = 0;
2317 }
31f18b77 2318 last = srv_dispatcher.count;
7c673cae
FG
2319 if (equal_count)
2320 usleep(1000*500);
2321 ASSERT_FALSE(equal && equal_count > 3);
2322 }
2323 server_msgr->shutdown();
2324 client_msgr->shutdown();
2325 server_msgr2->shutdown();
2326 server_msgr->wait();
2327 client_msgr->wait();
2328 server_msgr2->wait();
2329 delete server_msgr2;
2330}
2331
9f95a23c 2332INSTANTIATE_TEST_SUITE_P(
7c673cae
FG
2333 Messenger,
2334 MessengerTest,
2335 ::testing::Values(
9f95a23c 2336 "async+posix"
7c673cae
FG
2337 )
2338);
2339
7c673cae
FG
2340int main(int argc, char **argv) {
2341 vector<const char*> args;
2342 argv_to_vec(argc, (const char **)argv, args);
11fdf7f2
TL
2343
2344 auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
2345 CODE_ENVIRONMENT_UTILITY,
2346 CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
2347 g_ceph_context->_conf.set_val("auth_cluster_required", "none");
2348 g_ceph_context->_conf.set_val("auth_service_required", "none");
2349 g_ceph_context->_conf.set_val("auth_client_required", "none");
2350 g_ceph_context->_conf.set_val("keyring", "/dev/null");
2351 g_ceph_context->_conf.set_val("enable_experimental_unrecoverable_data_corrupting_features", "ms-type-async");
2352 g_ceph_context->_conf.set_val("ms_die_on_bad_msg", "true");
2353 g_ceph_context->_conf.set_val("ms_die_on_old_message", "true");
2354 g_ceph_context->_conf.set_val("ms_max_backoff", "1");
7c673cae
FG
2355 common_init_finish(g_ceph_context);
2356
2357 ::testing::InitGoogleTest(&argc, argv);
2358 return RUN_ALL_TESTS();
2359}
2360
2361/*
2362 * Local Variables:
2363 * compile-command: "cd ../.. ; make -j4 ceph_test_msgr && valgrind --tool=memcheck ./ceph_test_msgr"
2364 * End:
2365 */