]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/msgr/test_msgr.cc
update sources to v12.1.0
[ceph.git] / ceph / src / test / msgr / test_msgr.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #include <atomic>
18 #include <iostream>
19 #include <unistd.h>
20 #include <stdlib.h>
21 #include <time.h>
22 #include "common/Mutex.h"
23 #include "common/Cond.h"
24 #include "common/ceph_argparse.h"
25 #include "global/global_init.h"
26 #include "msg/Dispatcher.h"
27 #include "msg/msg_types.h"
28 #include "msg/Message.h"
29 #include "msg/Messenger.h"
30 #include "msg/Connection.h"
31 #include "messages/MPing.h"
32 #include "messages/MCommand.h"
33
34 #include <boost/random/mersenne_twister.hpp>
35 #include <boost/random/uniform_int.hpp>
36 #include <boost/random/binomial_distribution.hpp>
37 #include <gtest/gtest.h>
38
39 typedef boost::mt11213b gen_type;
40
41 #include "common/dout.h"
42 #include "include/assert.h"
43
44 #define dout_subsys ceph_subsys_ms
45 #undef dout_prefix
46 #define dout_prefix *_dout << " ceph_test_msgr "
47
48
49 #if GTEST_HAS_PARAM_TEST
50
// Polls `expr` until it evaluates to true, sleeping 1000 microseconds between
// attempts and giving up silently after 999 evaluations. The macro does not
// report failure itself; callers follow it with an ASSERT on the same
// condition.
//
// The expansion deliberately has no trailing semicolon so the macro behaves
// as one statement (safe inside un-braced if/else), and the counter has a
// name unlikely to collide with identifiers used inside `expr`.
#define CHECK_AND_WAIT_TRUE(expr) do {            \
  int check_and_wait_tries_left_ = 1000;          \
  while (--check_and_wait_tries_left_) {          \
    if (expr)                                     \
      break;                                      \
    usleep(1000);                                 \
  }                                               \
} while(0)
59
60 class MessengerTest : public ::testing::TestWithParam<const char*> {
61 public:
62 Messenger *server_msgr;
63 Messenger *client_msgr;
64
65 MessengerTest(): server_msgr(NULL), client_msgr(NULL) {}
66 void SetUp() override {
67 lderr(g_ceph_context) << __func__ << " start set up " << GetParam() << dendl;
68 server_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
69 client_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid(), 0);
70 server_msgr->set_default_policy(Messenger::Policy::stateless_server(0));
71 client_msgr->set_default_policy(Messenger::Policy::lossy_client(0));
72 }
73 void TearDown() override {
74 ASSERT_EQ(server_msgr->get_dispatch_queue_len(), 0);
75 ASSERT_EQ(client_msgr->get_dispatch_queue_len(), 0);
76 delete server_msgr;
77 delete client_msgr;
78 }
79
80 };
81
82
83 class FakeDispatcher : public Dispatcher {
84 public:
85 struct Session : public RefCountedObject {
86 atomic<uint64_t> count;
87 ConnectionRef con;
88
89 explicit Session(ConnectionRef c): RefCountedObject(g_ceph_context), count(0), con(c) {
90 }
91 uint64_t get_count() { return count; }
92 };
93
94 Mutex lock;
95 Cond cond;
96 bool is_server;
97 bool got_new;
98 bool got_remote_reset;
99 bool got_connect;
100 bool loopback;
101
102 explicit FakeDispatcher(bool s): Dispatcher(g_ceph_context), lock("FakeDispatcher::lock"),
103 is_server(s), got_new(false), got_remote_reset(false),
104 got_connect(false), loopback(false) {}
105 bool ms_can_fast_dispatch_any() const override { return true; }
106 bool ms_can_fast_dispatch(const Message *m) const override {
107 switch (m->get_type()) {
108 case CEPH_MSG_PING:
109 return true;
110 default:
111 return false;
112 }
113 }
114
115 void ms_handle_fast_connect(Connection *con) override {
116 lock.Lock();
117 lderr(g_ceph_context) << __func__ << " " << con << dendl;
118 Session *s = static_cast<Session*>(con->get_priv());
119 if (!s) {
120 s = new Session(con);
121 con->set_priv(s->get());
122 lderr(g_ceph_context) << __func__ << " con: " << con << " count: " << s->count << dendl;
123 }
124 s->put();
125 got_connect = true;
126 cond.Signal();
127 lock.Unlock();
128 }
129 void ms_handle_fast_accept(Connection *con) override {
130 Session *s = static_cast<Session*>(con->get_priv());
131 if (!s) {
132 s = new Session(con);
133 con->set_priv(s->get());
134 }
135 s->put();
136 }
137 bool ms_dispatch(Message *m) override {
138 Session *s = static_cast<Session*>(m->get_connection()->get_priv());
139 if (!s) {
140 s = new Session(m->get_connection());
141 m->get_connection()->set_priv(s->get());
142 }
143 s->put();
144 s->count++;
145 lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl;
146 if (is_server) {
147 reply_message(m);
148 }
149 Mutex::Locker l(lock);
150 got_new = true;
151 cond.Signal();
152 m->put();
153 return true;
154 }
155 bool ms_handle_reset(Connection *con) override {
156 Mutex::Locker l(lock);
157 lderr(g_ceph_context) << __func__ << " " << con << dendl;
158 Session *s = static_cast<Session*>(con->get_priv());
159 if (s) {
160 s->con.reset(NULL); // break con <-> session ref cycle
161 con->set_priv(NULL); // break ref <-> session cycle, if any
162 s->put();
163 }
164 return true;
165 }
166 void ms_handle_remote_reset(Connection *con) override {
167 Mutex::Locker l(lock);
168 lderr(g_ceph_context) << __func__ << " " << con << dendl;
169 Session *s = static_cast<Session*>(con->get_priv());
170 if (s) {
171 s->con.reset(NULL); // break con <-> session ref cycle
172 con->set_priv(NULL); // break ref <-> session cycle, if any
173 s->put();
174 }
175 got_remote_reset = true;
176 cond.Signal();
177 }
178 bool ms_handle_refused(Connection *con) override {
179 return false;
180 }
181 void ms_fast_dispatch(Message *m) override {
182 Session *s = static_cast<Session*>(m->get_connection()->get_priv());
183 if (!s) {
184 s = new Session(m->get_connection());
185 m->get_connection()->set_priv(s->get());
186 }
187 s->put();
188 s->count++;
189 lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl;
190 if (is_server) {
191 if (loopback)
192 assert(m->get_source().is_osd());
193 else
194 reply_message(m);
195 } else if (loopback) {
196 assert(m->get_source().is_client());
197 }
198 m->put();
199 Mutex::Locker l(lock);
200 got_new = true;
201 cond.Signal();
202 }
203
204 bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
205 bufferlist& authorizer, bufferlist& authorizer_reply,
206 bool& isvalid, CryptoKey& session_key) override {
207 isvalid = true;
208 return true;
209 }
210
211 void reply_message(Message *m) {
212 MPing *rm = new MPing();
213 m->get_connection()->send_message(rm);
214 }
215 };
216
217 typedef FakeDispatcher::Session Session;
218
219 TEST_P(MessengerTest, SimpleTest) {
220 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
221 entity_addr_t bind_addr;
222 bind_addr.parse("127.0.0.1");
223 server_msgr->bind(bind_addr);
224 server_msgr->add_dispatcher_head(&srv_dispatcher);
225 server_msgr->start();
226
227 client_msgr->add_dispatcher_head(&cli_dispatcher);
228 client_msgr->start();
229
230 // 1. simple round trip
231 MPing *m = new MPing();
232 ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
233 {
234 ASSERT_EQ(conn->send_message(m), 0);
235 Mutex::Locker l(cli_dispatcher.lock);
236 while (!cli_dispatcher.got_new)
237 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
238 cli_dispatcher.got_new = false;
239 }
240 ASSERT_TRUE(conn->is_connected());
241 ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
242 ASSERT_TRUE(conn->peer_is_osd());
243
244 // 2. test rebind port
245 set<int> avoid_ports;
246 for (int i = 0; i < 10 ; i++)
247 avoid_ports.insert(server_msgr->get_myaddr().get_port() + i);
248 server_msgr->rebind(avoid_ports);
249 ASSERT_TRUE(avoid_ports.count(server_msgr->get_myaddr().get_port()) == 0);
250
251 conn = client_msgr->get_connection(server_msgr->get_myinst());
252 {
253 m = new MPing();
254 ASSERT_EQ(conn->send_message(m), 0);
255 Mutex::Locker l(cli_dispatcher.lock);
256 while (!cli_dispatcher.got_new)
257 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
258 cli_dispatcher.got_new = false;
259 }
260 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
261
262 // 3. test markdown connection
263 conn->mark_down();
264 ASSERT_FALSE(conn->is_connected());
265
266 // 4. test failed connection
267 server_msgr->shutdown();
268 server_msgr->wait();
269
270 m = new MPing();
271 conn->send_message(m);
272 CHECK_AND_WAIT_TRUE(!conn->is_connected());
273 ASSERT_FALSE(conn->is_connected());
274
275 // 5. loopback connection
276 srv_dispatcher.loopback = true;
277 conn = client_msgr->get_loopback_connection();
278 {
279 m = new MPing();
280 ASSERT_EQ(conn->send_message(m), 0);
281 Mutex::Locker l(cli_dispatcher.lock);
282 while (!cli_dispatcher.got_new)
283 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
284 cli_dispatcher.got_new = false;
285 }
286 srv_dispatcher.loopback = false;
287 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
288 client_msgr->shutdown();
289 client_msgr->wait();
290 server_msgr->shutdown();
291 server_msgr->wait();
292 }
293
294 TEST_P(MessengerTest, NameAddrTest) {
295 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
296 entity_addr_t bind_addr;
297 bind_addr.parse("127.0.0.1");
298 server_msgr->bind(bind_addr);
299 server_msgr->add_dispatcher_head(&srv_dispatcher);
300 server_msgr->start();
301
302 client_msgr->add_dispatcher_head(&cli_dispatcher);
303 client_msgr->start();
304
305 MPing *m = new MPing();
306 ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
307 {
308 ASSERT_EQ(conn->send_message(m), 0);
309 Mutex::Locker l(cli_dispatcher.lock);
310 while (!cli_dispatcher.got_new)
311 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
312 cli_dispatcher.got_new = false;
313 }
314 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
315 ASSERT_TRUE(conn->get_peer_addr() == server_msgr->get_myaddr());
316 ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
317 // Make should server_conn is the one we already accepted from client,
318 // so it means client_msgr has the same addr when server connection has
319 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
320 server_msgr->shutdown();
321 client_msgr->shutdown();
322 server_msgr->wait();
323 client_msgr->wait();
324 }
325
326 TEST_P(MessengerTest, FeatureTest) {
327 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
328 entity_addr_t bind_addr;
329 bind_addr.parse("127.0.0.1");
330 uint64_t all_feature_supported, feature_required, feature_supported = 0;
331 for (int i = 0; i < 10; i++)
332 feature_supported |= 1ULL << i;
333 feature_required = feature_supported | 1ULL << 13;
334 all_feature_supported = feature_required | 1ULL << 14;
335
336 Messenger::Policy p = server_msgr->get_policy(entity_name_t::TYPE_CLIENT);
337 p.features_required = feature_required;
338 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
339 server_msgr->bind(bind_addr);
340 server_msgr->add_dispatcher_head(&srv_dispatcher);
341 server_msgr->start();
342
343 // 1. Suppose if only support less than required
344 p = client_msgr->get_policy(entity_name_t::TYPE_OSD);
345 p.features_supported = feature_supported;
346 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
347 client_msgr->add_dispatcher_head(&cli_dispatcher);
348 client_msgr->start();
349
350 MPing *m = new MPing();
351 ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
352 conn->send_message(m);
353 CHECK_AND_WAIT_TRUE(!conn->is_connected());
354 // should failed build a connection
355 ASSERT_FALSE(conn->is_connected());
356
357 client_msgr->shutdown();
358 client_msgr->wait();
359
360 // 2. supported met required
361 p = client_msgr->get_policy(entity_name_t::TYPE_OSD);
362 p.features_supported = all_feature_supported;
363 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
364 client_msgr->start();
365
366 conn = client_msgr->get_connection(server_msgr->get_myinst());
367 {
368 m = new MPing();
369 ASSERT_EQ(conn->send_message(m), 0);
370 Mutex::Locker l(cli_dispatcher.lock);
371 while (!cli_dispatcher.got_new)
372 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
373 cli_dispatcher.got_new = false;
374 }
375 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
376
377 server_msgr->shutdown();
378 client_msgr->shutdown();
379 server_msgr->wait();
380 client_msgr->wait();
381 }
382
383 TEST_P(MessengerTest, TimeoutTest) {
384 g_ceph_context->_conf->set_val("ms_tcp_read_timeout", "1");
385 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
386 entity_addr_t bind_addr;
387 bind_addr.parse("127.0.0.1");
388 server_msgr->bind(bind_addr);
389 server_msgr->add_dispatcher_head(&srv_dispatcher);
390 server_msgr->start();
391
392 client_msgr->add_dispatcher_head(&cli_dispatcher);
393 client_msgr->start();
394
395 // 1. build the connection
396 MPing *m = new MPing();
397 ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
398 {
399 ASSERT_EQ(conn->send_message(m), 0);
400 Mutex::Locker l(cli_dispatcher.lock);
401 while (!cli_dispatcher.got_new)
402 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
403 cli_dispatcher.got_new = false;
404 }
405 ASSERT_TRUE(conn->is_connected());
406 ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
407 ASSERT_TRUE(conn->peer_is_osd());
408
409 // 2. wait for idle
410 usleep(2500*1000);
411 ASSERT_FALSE(conn->is_connected());
412
413 server_msgr->shutdown();
414 server_msgr->wait();
415
416 client_msgr->shutdown();
417 client_msgr->wait();
418 g_ceph_context->_conf->set_val("ms_tcp_read_timeout", "900");
419 }
420
421 TEST_P(MessengerTest, StatefulTest) {
422 Message *m;
423 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
424 entity_addr_t bind_addr;
425 bind_addr.parse("127.0.0.1");
426 Messenger::Policy p = Messenger::Policy::stateful_server(0);
427 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
428 p = Messenger::Policy::lossless_client(0);
429 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
430
431 server_msgr->bind(bind_addr);
432 server_msgr->add_dispatcher_head(&srv_dispatcher);
433 server_msgr->start();
434 client_msgr->add_dispatcher_head(&cli_dispatcher);
435 client_msgr->start();
436
437 // 1. test for server standby
438 ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
439 {
440 m = new MPing();
441 ASSERT_EQ(conn->send_message(m), 0);
442 Mutex::Locker l(cli_dispatcher.lock);
443 while (!cli_dispatcher.got_new)
444 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
445 cli_dispatcher.got_new = false;
446 }
447 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
448 conn->mark_down();
449 ASSERT_FALSE(conn->is_connected());
450 ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
451 // don't lose state
452 ASSERT_TRUE(static_cast<Session*>(server_conn->get_priv())->get_count() == 1);
453
454 srv_dispatcher.got_new = false;
455 conn = client_msgr->get_connection(server_msgr->get_myinst());
456 {
457 m = new MPing();
458 ASSERT_EQ(conn->send_message(m), 0);
459 Mutex::Locker l(cli_dispatcher.lock);
460 while (!cli_dispatcher.got_new)
461 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
462 cli_dispatcher.got_new = false;
463 }
464 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
465 server_conn = server_msgr->get_connection(client_msgr->get_myinst());
466 {
467 Mutex::Locker l(srv_dispatcher.lock);
468 while (!srv_dispatcher.got_remote_reset)
469 srv_dispatcher.cond.Wait(srv_dispatcher.lock);
470 }
471
472 // 2. test for client reconnect
473 ASSERT_FALSE(cli_dispatcher.got_remote_reset);
474 cli_dispatcher.got_connect = false;
475 cli_dispatcher.got_new = false;
476 cli_dispatcher.got_remote_reset = false;
477 server_conn->mark_down();
478 ASSERT_FALSE(server_conn->is_connected());
479 // ensure client detect server socket closed
480 {
481 Mutex::Locker l(cli_dispatcher.lock);
482 while (!cli_dispatcher.got_remote_reset)
483 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
484 cli_dispatcher.got_remote_reset = false;
485 }
486 {
487 Mutex::Locker l(cli_dispatcher.lock);
488 while (!cli_dispatcher.got_connect)
489 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
490 cli_dispatcher.got_connect = false;
491 }
492 CHECK_AND_WAIT_TRUE(conn->is_connected());
493 ASSERT_TRUE(conn->is_connected());
494
495 {
496 m = new MPing();
497 ASSERT_EQ(conn->send_message(m), 0);
498 ASSERT_TRUE(conn->is_connected());
499 Mutex::Locker l(cli_dispatcher.lock);
500 while (!cli_dispatcher.got_new)
501 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
502 cli_dispatcher.got_new = false;
503 }
504 // resetcheck happen
505 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv())->get_count());
506 server_conn = server_msgr->get_connection(client_msgr->get_myinst());
507 ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv())->get_count());
508 cli_dispatcher.got_remote_reset = false;
509
510 server_msgr->shutdown();
511 client_msgr->shutdown();
512 server_msgr->wait();
513 client_msgr->wait();
514 }
515
516 TEST_P(MessengerTest, StatelessTest) {
517 Message *m;
518 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
519 entity_addr_t bind_addr;
520 bind_addr.parse("127.0.0.1");
521 Messenger::Policy p = Messenger::Policy::stateless_server(0);
522 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
523 p = Messenger::Policy::lossy_client(0);
524 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
525
526 server_msgr->bind(bind_addr);
527 server_msgr->add_dispatcher_head(&srv_dispatcher);
528 server_msgr->start();
529 client_msgr->add_dispatcher_head(&cli_dispatcher);
530 client_msgr->start();
531
532 // 1. test for server lose state
533 ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
534 {
535 m = new MPing();
536 ASSERT_EQ(conn->send_message(m), 0);
537 Mutex::Locker l(cli_dispatcher.lock);
538 while (!cli_dispatcher.got_new)
539 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
540 cli_dispatcher.got_new = false;
541 }
542 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
543 conn->mark_down();
544 ASSERT_FALSE(conn->is_connected());
545
546 srv_dispatcher.got_new = false;
547 conn = client_msgr->get_connection(server_msgr->get_myinst());
548 {
549 m = new MPing();
550 ASSERT_EQ(conn->send_message(m), 0);
551 Mutex::Locker l(cli_dispatcher.lock);
552 while (!cli_dispatcher.got_new)
553 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
554 cli_dispatcher.got_new = false;
555 }
556 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
557 ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
558 // server lose state
559 {
560 Mutex::Locker l(srv_dispatcher.lock);
561 while (!srv_dispatcher.got_new)
562 srv_dispatcher.cond.Wait(srv_dispatcher.lock);
563 }
564 ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv())->get_count());
565
566 // 2. test for client lossy
567 server_conn->mark_down();
568 ASSERT_FALSE(server_conn->is_connected());
569 conn->send_keepalive();
570 CHECK_AND_WAIT_TRUE(!conn->is_connected());
571 ASSERT_FALSE(conn->is_connected());
572 conn = client_msgr->get_connection(server_msgr->get_myinst());
573 {
574 m = new MPing();
575 ASSERT_EQ(conn->send_message(m), 0);
576 Mutex::Locker l(cli_dispatcher.lock);
577 while (!cli_dispatcher.got_new)
578 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
579 cli_dispatcher.got_new = false;
580 }
581 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
582
583 server_msgr->shutdown();
584 client_msgr->shutdown();
585 server_msgr->wait();
586 client_msgr->wait();
587 }
588
589 TEST_P(MessengerTest, ClientStandbyTest) {
590 Message *m;
591 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
592 entity_addr_t bind_addr;
593 bind_addr.parse("127.0.0.1");
594 Messenger::Policy p = Messenger::Policy::stateful_server(0);
595 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
596 p = Messenger::Policy::lossless_peer(0);
597 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
598
599 server_msgr->bind(bind_addr);
600 server_msgr->add_dispatcher_head(&srv_dispatcher);
601 server_msgr->start();
602 client_msgr->add_dispatcher_head(&cli_dispatcher);
603 client_msgr->start();
604
605 // 1. test for client standby, resetcheck
606 ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
607 {
608 m = new MPing();
609 ASSERT_EQ(conn->send_message(m), 0);
610 Mutex::Locker l(cli_dispatcher.lock);
611 while (!cli_dispatcher.got_new)
612 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
613 cli_dispatcher.got_new = false;
614 }
615 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
616 ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
617 ASSERT_FALSE(cli_dispatcher.got_remote_reset);
618 cli_dispatcher.got_connect = false;
619 server_conn->mark_down();
620 ASSERT_FALSE(server_conn->is_connected());
621 // client should be standby
622 usleep(300*1000);
623 // client should be standby, so we use original connection
624 {
625 // Try send message to verify got remote reset callback
626 m = new MPing();
627 ASSERT_EQ(conn->send_message(m), 0);
628 {
629 Mutex::Locker l(cli_dispatcher.lock);
630 while (!cli_dispatcher.got_remote_reset)
631 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
632 cli_dispatcher.got_remote_reset = false;
633 while (!cli_dispatcher.got_connect)
634 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
635 cli_dispatcher.got_connect = false;
636 }
637 CHECK_AND_WAIT_TRUE(conn->is_connected());
638 ASSERT_TRUE(conn->is_connected());
639 m = new MPing();
640 ASSERT_EQ(conn->send_message(m), 0);
641 Mutex::Locker l(cli_dispatcher.lock);
642 while (!cli_dispatcher.got_new)
643 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
644 cli_dispatcher.got_new = false;
645 }
646 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
647 server_conn = server_msgr->get_connection(client_msgr->get_myinst());
648 ASSERT_TRUE(static_cast<Session*>(server_conn->get_priv())->get_count() == 1);
649
650 server_msgr->shutdown();
651 client_msgr->shutdown();
652 server_msgr->wait();
653 client_msgr->wait();
654 }
655
656 TEST_P(MessengerTest, AuthTest) {
657 g_ceph_context->_conf->set_val("auth_cluster_required", "cephx");
658 g_ceph_context->_conf->set_val("auth_service_required", "cephx");
659 g_ceph_context->_conf->set_val("auth_client_required", "cephx");
660 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
661 entity_addr_t bind_addr;
662 bind_addr.parse("127.0.0.1");
663 server_msgr->bind(bind_addr);
664 server_msgr->add_dispatcher_head(&srv_dispatcher);
665 server_msgr->start();
666
667 client_msgr->add_dispatcher_head(&cli_dispatcher);
668 client_msgr->start();
669
670 // 1. simple auth round trip
671 MPing *m = new MPing();
672 ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
673 {
674 ASSERT_EQ(conn->send_message(m), 0);
675 Mutex::Locker l(cli_dispatcher.lock);
676 while (!cli_dispatcher.got_new)
677 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
678 cli_dispatcher.got_new = false;
679 }
680 ASSERT_TRUE(conn->is_connected());
681 ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
682
683 // 2. mix auth
684 g_ceph_context->_conf->set_val("auth_cluster_required", "none");
685 g_ceph_context->_conf->set_val("auth_service_required", "none");
686 g_ceph_context->_conf->set_val("auth_client_required", "none");
687 conn->mark_down();
688 ASSERT_FALSE(conn->is_connected());
689 conn = client_msgr->get_connection(server_msgr->get_myinst());
690 {
691 MPing *m = new MPing();
692 ASSERT_EQ(conn->send_message(m), 0);
693 Mutex::Locker l(cli_dispatcher.lock);
694 while (!cli_dispatcher.got_new)
695 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
696 cli_dispatcher.got_new = false;
697 }
698 ASSERT_TRUE(conn->is_connected());
699 ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
700
701 server_msgr->shutdown();
702 client_msgr->shutdown();
703 server_msgr->wait();
704 client_msgr->wait();
705 }
706
707 TEST_P(MessengerTest, MessageTest) {
708 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
709 entity_addr_t bind_addr;
710 bind_addr.parse("127.0.0.1");
711 Messenger::Policy p = Messenger::Policy::stateful_server(0);
712 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
713 p = Messenger::Policy::lossless_peer(0);
714 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
715
716 server_msgr->bind(bind_addr);
717 server_msgr->add_dispatcher_head(&srv_dispatcher);
718 server_msgr->start();
719 client_msgr->add_dispatcher_head(&cli_dispatcher);
720 client_msgr->start();
721
722
723 // 1. A very large "front"(as well as "payload")
724 // Because a external message need to invade Messenger::decode_message,
725 // here we only use existing message class(MCommand)
726 ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
727 {
728 uuid_d uuid;
729 uuid.generate_random();
730 vector<string> cmds;
731 string s("abcdefghijklmnopqrstuvwxyz");
732 for (int i = 0; i < 1024*30; i++)
733 cmds.push_back(s);
734 MCommand *m = new MCommand(uuid);
735 m->cmd = cmds;
736 conn->send_message(m);
737 utime_t t;
738 t += 1000*1000*500;
739 Mutex::Locker l(cli_dispatcher.lock);
740 while (!cli_dispatcher.got_new)
741 cli_dispatcher.cond.WaitInterval(cli_dispatcher.lock, t);
742 ASSERT_TRUE(cli_dispatcher.got_new);
743 cli_dispatcher.got_new = false;
744 }
745
746 // 2. A very large "data"
747 {
748 bufferlist bl;
749 string s("abcdefghijklmnopqrstuvwxyz");
750 for (int i = 0; i < 1024*30; i++)
751 bl.append(s);
752 MPing *m = new MPing();
753 m->set_data(bl);
754 conn->send_message(m);
755 utime_t t;
756 t += 1000*1000*500;
757 Mutex::Locker l(cli_dispatcher.lock);
758 while (!cli_dispatcher.got_new)
759 cli_dispatcher.cond.WaitInterval(cli_dispatcher.lock, t);
760 ASSERT_TRUE(cli_dispatcher.got_new);
761 cli_dispatcher.got_new = false;
762 }
763 server_msgr->shutdown();
764 client_msgr->shutdown();
765 server_msgr->wait();
766 client_msgr->wait();
767 }
768
769
770 class SyntheticWorkload;
771
772 struct Payload {
773 enum Who : uint8_t {
774 PING = 0,
775 PONG = 1,
776 };
777 uint8_t who;
778 uint64_t seq;
779 bufferlist data;
780
781 Payload(Who who, uint64_t seq, const bufferlist& data)
782 : who(who), seq(seq), data(data)
783 {}
784 Payload() = default;
785 DENC(Payload, v, p) {
786 DENC_START(1, 1, p);
787 denc(v.who, p);
788 denc(v.seq, p);
789 denc(v.data, p);
790 DENC_FINISH(p);
791 }
792 };
793 WRITE_CLASS_DENC(Payload)
794
795 ostream& operator<<(ostream& out, const Payload &pl)
796 {
797 return out << "reply=" << pl.who << " i = " << pl.seq;
798 }
799
800 class SyntheticDispatcher : public Dispatcher {
801 public:
802 Mutex lock;
803 Cond cond;
804 bool is_server;
805 bool got_new;
806 bool got_remote_reset;
807 bool got_connect;
808 map<ConnectionRef, list<uint64_t> > conn_sent;
809 map<uint64_t, bufferlist> sent;
810 atomic<uint64_t> index;
811 SyntheticWorkload *workload;
812
813 SyntheticDispatcher(bool s, SyntheticWorkload *wl):
814 Dispatcher(g_ceph_context), lock("SyntheticDispatcher::lock"), is_server(s), got_new(false),
815 got_remote_reset(false), got_connect(false), index(0), workload(wl) {}
816 bool ms_can_fast_dispatch_any() const override { return true; }
817 bool ms_can_fast_dispatch(const Message *m) const override {
818 switch (m->get_type()) {
819 case CEPH_MSG_PING:
820 case MSG_COMMAND:
821 return true;
822 default:
823 return false;
824 }
825 }
826
827 void ms_handle_fast_connect(Connection *con) override {
828 Mutex::Locker l(lock);
829 list<uint64_t> c = conn_sent[con];
830 for (list<uint64_t>::iterator it = c.begin();
831 it != c.end(); ++it)
832 sent.erase(*it);
833 conn_sent.erase(con);
834 got_connect = true;
835 cond.Signal();
836 }
837 void ms_handle_fast_accept(Connection *con) override {
838 Mutex::Locker l(lock);
839 list<uint64_t> c = conn_sent[con];
840 for (list<uint64_t>::iterator it = c.begin();
841 it != c.end(); ++it)
842 sent.erase(*it);
843 conn_sent.erase(con);
844 cond.Signal();
845 }
846 bool ms_dispatch(Message *m) override {
847 ceph_abort();
848 }
849 bool ms_handle_reset(Connection *con) override;
850 void ms_handle_remote_reset(Connection *con) override {
851 Mutex::Locker l(lock);
852 list<uint64_t> c = conn_sent[con];
853 for (list<uint64_t>::iterator it = c.begin();
854 it != c.end(); ++it)
855 sent.erase(*it);
856 conn_sent.erase(con);
857 got_remote_reset = true;
858 }
859 bool ms_handle_refused(Connection *con) override {
860 return false;
861 }
862 void ms_fast_dispatch(Message *m) override {
863 // MSG_COMMAND is used to disorganize regular message flow
864 if (m->get_type() == MSG_COMMAND) {
865 m->put();
866 return ;
867 }
868
869 Payload pl;
870 auto p = m->get_data().begin();
871 ::decode(pl, p);
872 if (pl.who == Payload::PING) {
873 lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl;
874 reply_message(m, pl);
875 m->put();
876 Mutex::Locker l(lock);
877 got_new = true;
878 cond.Signal();
879 } else {
880 Mutex::Locker l(lock);
881 if (sent.count(pl.seq)) {
882 lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl;
883 ASSERT_EQ(conn_sent[m->get_connection()].front(), pl.seq);
884 ASSERT_TRUE(pl.data.contents_equal(sent[pl.seq]));
885 conn_sent[m->get_connection()].pop_front();
886 sent.erase(pl.seq);
887 }
888 m->put();
889 got_new = true;
890 cond.Signal();
891 }
892 }
893
894 bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
895 bufferlist& authorizer, bufferlist& authorizer_reply,
896 bool& isvalid, CryptoKey& session_key) override {
897 isvalid = true;
898 return true;
899 }
900
901 void reply_message(const Message *m, Payload& pl) {
902 pl.who = Payload::PONG;
903 bufferlist bl;
904 ::encode(pl, bl);
905 MPing *rm = new MPing();
906 rm->set_data(bl);
907 m->get_connection()->send_message(rm);
908 lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply m=" << m << " i=" << pl.seq << dendl;
909 }
910
911 void send_message_wrap(ConnectionRef con, const bufferlist& data) {
912 Message *m = new MPing();
913 Payload pl{Payload::PING, index++, data};
914 bufferlist bl;
915 ::encode(pl, bl);
916 m->set_data(bl);
917 if (!con->get_messenger()->get_default_policy().lossy) {
918 Mutex::Locker l(lock);
919 sent[pl.seq] = pl.data;
920 conn_sent[con].push_back(pl.seq);
921 }
922 lderr(g_ceph_context) << __func__ << " conn=" << con.get() << " send m=" << m << " i=" << pl.seq << dendl;
923 ASSERT_EQ(0, con->send_message(m));
924 }
925
926 uint64_t get_pending() {
927 Mutex::Locker l(lock);
928 return sent.size();
929 }
930
931 void clear_pending(ConnectionRef con) {
932 Mutex::Locker l(lock);
933
934 for (list<uint64_t>::iterator it = conn_sent[con].begin();
935 it != conn_sent[con].end(); ++it)
936 sent.erase(*it);
937 conn_sent.erase(con);
938 }
939
940 void print() {
941 for (auto && p : conn_sent) {
942 if (!p.second.empty()) {
943 lderr(g_ceph_context) << __func__ << " " << p.first << " wait " << p.second.size() << dendl;
944 }
945 }
946 }
947 };
948
949
950 class SyntheticWorkload {
951 Mutex lock;
952 Cond cond;
953 set<Messenger*> available_servers;
954 set<Messenger*> available_clients;
955 map<ConnectionRef, pair<Messenger*, Messenger*> > available_connections;
956 SyntheticDispatcher dispatcher;
957 gen_type rng;
958 vector<bufferlist> rand_data;
959
960 public:
961 static const unsigned max_in_flight = 64;
962 static const unsigned max_connections = 128;
963 static const unsigned max_message_len = 1024 * 1024 * 4;
964
965 SyntheticWorkload(int servers, int clients, string type, int random_num,
966 Messenger::Policy srv_policy, Messenger::Policy cli_policy):
967 lock("SyntheticWorkload::lock"), dispatcher(false, this), rng(time(NULL)) {
968 Messenger *msgr;
969 int base_port = 16800;
970 entity_addr_t bind_addr;
971 char addr[64];
972 for (int i = 0; i < servers; ++i) {
973 msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0),
974 "server", getpid()+i, 0);
975 snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i);
976 bind_addr.parse(addr);
977 msgr->bind(bind_addr);
978 msgr->add_dispatcher_head(&dispatcher);
979
980 assert(msgr);
981 msgr->set_default_policy(srv_policy);
982 available_servers.insert(msgr);
983 msgr->start();
984 }
985
986 for (int i = 0; i < clients; ++i) {
987 msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(-1),
988 "client", getpid()+i+servers, 0);
989 if (cli_policy.standby) {
990 snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i+servers);
991 bind_addr.parse(addr);
992 msgr->bind(bind_addr);
993 }
994 msgr->add_dispatcher_head(&dispatcher);
995
996 assert(msgr);
997 msgr->set_default_policy(cli_policy);
998 available_clients.insert(msgr);
999 msgr->start();
1000 }
1001
1002 for (int i = 0; i < random_num; i++) {
1003 bufferlist bl;
1004 boost::uniform_int<> u(32, max_message_len);
1005 uint64_t value_len = u(rng);
1006 bufferptr bp(value_len);
1007 bp.zero();
1008 for (uint64_t j = 0; j < value_len-sizeof(i); ) {
1009 memcpy(bp.c_str()+j, &i, sizeof(i));
1010 j += 4096;
1011 }
1012
1013 bl.append(bp);
1014 rand_data.push_back(bl);
1015 }
1016 }
1017
1018 ConnectionRef _get_random_connection() {
1019 while (dispatcher.get_pending() > max_in_flight) {
1020 lock.Unlock();
1021 usleep(500);
1022 lock.Lock();
1023 }
1024 assert(lock.is_locked());
1025 boost::uniform_int<> choose(0, available_connections.size() - 1);
1026 int index = choose(rng);
1027 map<ConnectionRef, pair<Messenger*, Messenger*> >::iterator i = available_connections.begin();
1028 for (; index > 0; --index, ++i) ;
1029 return i->first;
1030 }
1031
1032 bool can_create_connection() {
1033 return available_connections.size() < max_connections;
1034 }
1035
1036 void generate_connection() {
1037 Mutex::Locker l(lock);
1038 if (!can_create_connection())
1039 return ;
1040
1041 Messenger *server, *client;
1042 {
1043 boost::uniform_int<> choose(0, available_servers.size() - 1);
1044 int index = choose(rng);
1045 set<Messenger*>::iterator i = available_servers.begin();
1046 for (; index > 0; --index, ++i) ;
1047 server = *i;
1048 }
1049 {
1050 boost::uniform_int<> choose(0, available_clients.size() - 1);
1051 int index = choose(rng);
1052 set<Messenger*>::iterator i = available_clients.begin();
1053 for (; index > 0; --index, ++i) ;
1054 client = *i;
1055 }
1056
1057 pair<Messenger*, Messenger*> p;
1058 {
1059 boost::uniform_int<> choose(0, available_servers.size() - 1);
1060 if (server->get_default_policy().server) {
1061 p = make_pair(client, server);
1062 } else {
1063 ConnectionRef conn = client->get_connection(server->get_myinst());
1064 if (available_connections.count(conn) || choose(rng) % 2)
1065 p = make_pair(client, server);
1066 else
1067 p = make_pair(server, client);
1068 }
1069 }
1070 ConnectionRef conn = p.first->get_connection(p.second->get_myinst());
1071 available_connections[conn] = p;
1072 }
1073
1074 void send_message() {
1075 Mutex::Locker l(lock);
1076 ConnectionRef conn = _get_random_connection();
1077 boost::uniform_int<> true_false(0, 99);
1078 int val = true_false(rng);
1079 if (val >= 95) {
1080 uuid_d uuid;
1081 uuid.generate_random();
1082 MCommand *m = new MCommand(uuid);
1083 vector<string> cmds;
1084 cmds.push_back("command");
1085 m->cmd = cmds;
1086 m->set_priority(200);
1087 conn->send_message(m);
1088 } else {
1089 boost::uniform_int<> u(0, rand_data.size()-1);
1090 dispatcher.send_message_wrap(conn, rand_data[u(rng)]);
1091 }
1092 }
1093
1094 void drop_connection() {
1095 Mutex::Locker l(lock);
1096 if (available_connections.size() < 10)
1097 return;
1098 ConnectionRef conn = _get_random_connection();
1099 dispatcher.clear_pending(conn);
1100 conn->mark_down();
1101 pair<Messenger*, Messenger*> &p = available_connections[conn];
1102 // it's a lossless policy, so we need to mark down each side
1103 if (!p.first->get_default_policy().server && !p.second->get_default_policy().server) {
1104 ASSERT_EQ(conn->get_messenger(), p.first);
1105 ConnectionRef peer = p.second->get_connection(p.first->get_myinst());
1106 peer->mark_down();
1107 dispatcher.clear_pending(peer);
1108 available_connections.erase(peer);
1109 }
1110 ASSERT_EQ(available_connections.erase(conn), 1U);
1111 }
1112
1113 void print_internal_state(bool detail=false) {
1114 Mutex::Locker l(lock);
1115 lderr(g_ceph_context) << "available_connections: " << available_connections.size()
1116 << " inflight messages: " << dispatcher.get_pending() << dendl;
1117 if (detail && !available_connections.empty()) {
1118 dispatcher.print();
1119 }
1120 }
1121
1122 void wait_for_done() {
1123 int64_t tick_us = 1000 * 100; // 100ms
1124 int64_t timeout_us = 5 * 60 * 1000 * 1000; // 5 mins
1125 int i = 0;
1126 while (dispatcher.get_pending()) {
1127 usleep(tick_us);
1128 timeout_us -= tick_us;
1129 if (i++ % 50 == 0)
1130 print_internal_state(true);
1131 if (timeout_us < 0)
1132 assert(0 == " loop time exceed 5 mins, it looks we stuck into some problems!");
1133 }
1134 for (set<Messenger*>::iterator it = available_servers.begin();
1135 it != available_servers.end(); ++it) {
1136 (*it)->shutdown();
1137 (*it)->wait();
1138 ASSERT_EQ((*it)->get_dispatch_queue_len(), 0);
1139 delete (*it);
1140 }
1141 available_servers.clear();
1142
1143 for (set<Messenger*>::iterator it = available_clients.begin();
1144 it != available_clients.end(); ++it) {
1145 (*it)->shutdown();
1146 (*it)->wait();
1147 ASSERT_EQ((*it)->get_dispatch_queue_len(), 0);
1148 delete (*it);
1149 }
1150 available_clients.clear();
1151 }
1152
1153 void handle_reset(Connection *con) {
1154 Mutex::Locker l(lock);
1155 available_connections.erase(con);
1156 dispatcher.clear_pending(con);
1157 }
1158 };
1159
1160 bool SyntheticDispatcher::ms_handle_reset(Connection *con) {
1161 workload->handle_reset(con);
1162 return true;
1163 }
1164
1165 TEST_P(MessengerTest, SyntheticStressTest) {
1166 SyntheticWorkload test_msg(8, 32, GetParam(), 100,
1167 Messenger::Policy::stateful_server(0),
1168 Messenger::Policy::lossless_client(0));
1169 for (int i = 0; i < 100; ++i) {
1170 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
1171 test_msg.generate_connection();
1172 }
1173 gen_type rng(time(NULL));
1174 for (int i = 0; i < 5000; ++i) {
1175 if (!(i % 10)) {
1176 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
1177 test_msg.print_internal_state();
1178 }
1179 boost::uniform_int<> true_false(0, 99);
1180 int val = true_false(rng);
1181 if (val > 90) {
1182 test_msg.generate_connection();
1183 } else if (val > 80) {
1184 test_msg.drop_connection();
1185 } else if (val > 10) {
1186 test_msg.send_message();
1187 } else {
1188 usleep(rand() % 1000 + 500);
1189 }
1190 }
1191 test_msg.wait_for_done();
1192 }
1193
1194 TEST_P(MessengerTest, SyntheticStressTest1) {
1195 SyntheticWorkload test_msg(16, 32, GetParam(), 100,
1196 Messenger::Policy::lossless_peer_reuse(0),
1197 Messenger::Policy::lossless_peer_reuse(0));
1198 for (int i = 0; i < 10; ++i) {
1199 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
1200 test_msg.generate_connection();
1201 }
1202 gen_type rng(time(NULL));
1203 for (int i = 0; i < 10000; ++i) {
1204 if (!(i % 10)) {
1205 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
1206 test_msg.print_internal_state();
1207 }
1208 boost::uniform_int<> true_false(0, 99);
1209 int val = true_false(rng);
1210 if (val > 80) {
1211 test_msg.generate_connection();
1212 } else if (val > 60) {
1213 test_msg.drop_connection();
1214 } else if (val > 10) {
1215 test_msg.send_message();
1216 } else {
1217 usleep(rand() % 1000 + 500);
1218 }
1219 }
1220 test_msg.wait_for_done();
1221 }
1222
1223
1224 TEST_P(MessengerTest, SyntheticInjectTest) {
1225 uint64_t dispatch_throttle_bytes = g_ceph_context->_conf->ms_dispatch_throttle_bytes;
1226 g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30");
1227 g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
1228 g_ceph_context->_conf->set_val("ms_dispatch_throttle_bytes", "16777216");
1229 SyntheticWorkload test_msg(8, 32, GetParam(), 100,
1230 Messenger::Policy::stateful_server(0),
1231 Messenger::Policy::lossless_client(0));
1232 for (int i = 0; i < 100; ++i) {
1233 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
1234 test_msg.generate_connection();
1235 }
1236 gen_type rng(time(NULL));
1237 for (int i = 0; i < 1000; ++i) {
1238 if (!(i % 10)) {
1239 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
1240 test_msg.print_internal_state();
1241 }
1242 boost::uniform_int<> true_false(0, 99);
1243 int val = true_false(rng);
1244 if (val > 90) {
1245 test_msg.generate_connection();
1246 } else if (val > 80) {
1247 test_msg.drop_connection();
1248 } else if (val > 10) {
1249 test_msg.send_message();
1250 } else {
1251 usleep(rand() % 500 + 100);
1252 }
1253 }
1254 test_msg.wait_for_done();
1255 g_ceph_context->_conf->set_val("ms_inject_socket_failures", "0");
1256 g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0");
1257 g_ceph_context->_conf->set_val(
1258 "ms_dispatch_throttle_bytes", std::to_string(dispatch_throttle_bytes));
1259 }
1260
1261 TEST_P(MessengerTest, SyntheticInjectTest2) {
1262 g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30");
1263 g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
1264 SyntheticWorkload test_msg(8, 16, GetParam(), 100,
1265 Messenger::Policy::lossless_peer_reuse(0),
1266 Messenger::Policy::lossless_peer_reuse(0));
1267 for (int i = 0; i < 100; ++i) {
1268 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
1269 test_msg.generate_connection();
1270 }
1271 gen_type rng(time(NULL));
1272 for (int i = 0; i < 1000; ++i) {
1273 if (!(i % 10)) {
1274 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
1275 test_msg.print_internal_state();
1276 }
1277 boost::uniform_int<> true_false(0, 99);
1278 int val = true_false(rng);
1279 if (val > 90) {
1280 test_msg.generate_connection();
1281 } else if (val > 80) {
1282 test_msg.drop_connection();
1283 } else if (val > 10) {
1284 test_msg.send_message();
1285 } else {
1286 usleep(rand() % 500 + 100);
1287 }
1288 }
1289 test_msg.wait_for_done();
1290 g_ceph_context->_conf->set_val("ms_inject_socket_failures", "0");
1291 g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0");
1292 }
1293
1294 TEST_P(MessengerTest, SyntheticInjectTest3) {
1295 g_ceph_context->_conf->set_val("ms_inject_socket_failures", "600");
1296 g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
1297 SyntheticWorkload test_msg(8, 16, GetParam(), 100,
1298 Messenger::Policy::stateless_server(0),
1299 Messenger::Policy::lossy_client(0));
1300 for (int i = 0; i < 100; ++i) {
1301 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
1302 test_msg.generate_connection();
1303 }
1304 gen_type rng(time(NULL));
1305 for (int i = 0; i < 1000; ++i) {
1306 if (!(i % 10)) {
1307 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
1308 test_msg.print_internal_state();
1309 }
1310 boost::uniform_int<> true_false(0, 99);
1311 int val = true_false(rng);
1312 if (val > 90) {
1313 test_msg.generate_connection();
1314 } else if (val > 80) {
1315 test_msg.drop_connection();
1316 } else if (val > 10) {
1317 test_msg.send_message();
1318 } else {
1319 usleep(rand() % 500 + 100);
1320 }
1321 }
1322 test_msg.wait_for_done();
1323 g_ceph_context->_conf->set_val("ms_inject_socket_failures", "0");
1324 g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0");
1325 }
1326
1327
1328 TEST_P(MessengerTest, SyntheticInjectTest4) {
1329 g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30");
1330 g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
1331 g_ceph_context->_conf->set_val("ms_inject_delay_probability", "1");
1332 g_ceph_context->_conf->set_val("ms_inject_delay_type", "client osd", false);
1333 g_ceph_context->_conf->set_val("ms_inject_delay_max", "5");
1334 SyntheticWorkload test_msg(16, 32, GetParam(), 100,
1335 Messenger::Policy::lossless_peer(0),
1336 Messenger::Policy::lossless_peer(0));
1337 for (int i = 0; i < 100; ++i) {
1338 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
1339 test_msg.generate_connection();
1340 }
1341 gen_type rng(time(NULL));
1342 for (int i = 0; i < 1000; ++i) {
1343 if (!(i % 10)) {
1344 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
1345 test_msg.print_internal_state();
1346 }
1347 boost::uniform_int<> true_false(0, 99);
1348 int val = true_false(rng);
1349 if (val > 95) {
1350 test_msg.generate_connection();
1351 } else if (val > 80) {
1352 // test_msg.drop_connection();
1353 } else if (val > 10) {
1354 test_msg.send_message();
1355 } else {
1356 usleep(rand() % 500 + 100);
1357 }
1358 }
1359 test_msg.wait_for_done();
1360 g_ceph_context->_conf->set_val("ms_inject_socket_failures", "0");
1361 g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0");
1362 g_ceph_context->_conf->set_val("ms_inject_delay_probability", "0");
1363 g_ceph_context->_conf->set_val("ms_inject_delay_type", "", false);
1364 g_ceph_context->_conf->set_val("ms_inject_delay_max", "0");
1365 }
1366
1367
1368 class MarkdownDispatcher : public Dispatcher {
1369 Mutex lock;
1370 set<ConnectionRef> conns;
1371 bool last_mark;
1372 public:
1373 std::atomic<uint64_t> count = { 0 };
1374 explicit MarkdownDispatcher(bool s): Dispatcher(g_ceph_context), lock("MarkdownDispatcher::lock"),
1375 last_mark(false) {}
1376 bool ms_can_fast_dispatch_any() const override { return false; }
1377 bool ms_can_fast_dispatch(const Message *m) const override {
1378 switch (m->get_type()) {
1379 case CEPH_MSG_PING:
1380 return true;
1381 default:
1382 return false;
1383 }
1384 }
1385
1386 void ms_handle_fast_connect(Connection *con) override {
1387 lderr(g_ceph_context) << __func__ << " " << con << dendl;
1388 Mutex::Locker l(lock);
1389 conns.insert(con);
1390 }
1391 void ms_handle_fast_accept(Connection *con) override {
1392 Mutex::Locker l(lock);
1393 conns.insert(con);
1394 }
1395 bool ms_dispatch(Message *m) override {
1396 lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << dendl;
1397 Mutex::Locker l(lock);
1398 count++;
1399 conns.insert(m->get_connection());
1400 if (conns.size() < 2 && !last_mark) {
1401 m->put();
1402 return true;
1403 }
1404
1405 last_mark = true;
1406 usleep(rand() % 500);
1407 for (set<ConnectionRef>::iterator it = conns.begin(); it != conns.end(); ++it) {
1408 if ((*it) != m->get_connection().get()) {
1409 (*it)->mark_down();
1410 conns.erase(it);
1411 break;
1412 }
1413 }
1414 if (conns.empty())
1415 last_mark = false;
1416 m->put();
1417 return true;
1418 }
1419 bool ms_handle_reset(Connection *con) override {
1420 lderr(g_ceph_context) << __func__ << " " << con << dendl;
1421 Mutex::Locker l(lock);
1422 conns.erase(con);
1423 usleep(rand() % 500);
1424 return true;
1425 }
1426 void ms_handle_remote_reset(Connection *con) override {
1427 Mutex::Locker l(lock);
1428 conns.erase(con);
1429 lderr(g_ceph_context) << __func__ << " " << con << dendl;
1430 }
1431 bool ms_handle_refused(Connection *con) override {
1432 return false;
1433 }
1434 void ms_fast_dispatch(Message *m) override {
1435 ceph_abort();
1436 }
1437 bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
1438 bufferlist& authorizer, bufferlist& authorizer_reply,
1439 bool& isvalid, CryptoKey& session_key) override {
1440 isvalid = true;
1441 return true;
1442 }
1443 };
1444
1445
1446 // Markdown with external lock
1447 TEST_P(MessengerTest, MarkdownTest) {
1448 Messenger *server_msgr2 = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
1449 MarkdownDispatcher cli_dispatcher(false), srv_dispatcher(true);
1450 entity_addr_t bind_addr;
1451 bind_addr.parse("127.0.0.1:16800");
1452 server_msgr->bind(bind_addr);
1453 server_msgr->add_dispatcher_head(&srv_dispatcher);
1454 server_msgr->start();
1455 bind_addr.parse("127.0.0.1:16801");
1456 server_msgr2->bind(bind_addr);
1457 server_msgr2->add_dispatcher_head(&srv_dispatcher);
1458 server_msgr2->start();
1459
1460 client_msgr->add_dispatcher_head(&cli_dispatcher);
1461 client_msgr->start();
1462
1463 int i = 1000;
1464 uint64_t last = 0;
1465 bool equal = false;
1466 uint64_t equal_count = 0;
1467 while (i--) {
1468 ConnectionRef conn1 = client_msgr->get_connection(server_msgr->get_myinst());
1469 ConnectionRef conn2 = client_msgr->get_connection(server_msgr2->get_myinst());
1470 MPing *m = new MPing();
1471 ASSERT_EQ(conn1->send_message(m), 0);
1472 m = new MPing();
1473 ASSERT_EQ(conn2->send_message(m), 0);
1474 CHECK_AND_WAIT_TRUE(srv_dispatcher.count > last + 1);
1475 if (srv_dispatcher.count == last) {
1476 lderr(g_ceph_context) << __func__ << " last is " << last << dendl;
1477 equal = true;
1478 equal_count++;
1479 } else {
1480 equal = false;
1481 equal_count = 0;
1482 }
1483 last = srv_dispatcher.count;
1484 if (equal_count)
1485 usleep(1000*500);
1486 ASSERT_FALSE(equal && equal_count > 3);
1487 }
1488 server_msgr->shutdown();
1489 client_msgr->shutdown();
1490 server_msgr2->shutdown();
1491 server_msgr->wait();
1492 client_msgr->wait();
1493 server_msgr2->wait();
1494 delete server_msgr2;
1495 }
1496
1497 INSTANTIATE_TEST_CASE_P(
1498 Messenger,
1499 MessengerTest,
1500 ::testing::Values(
1501 "async+posix",
1502 "simple"
1503 )
1504 );
1505
1506 #else
1507
1508 // Google Test may not support value-parameterized tests with some
1509 // compilers. If we use conditional compilation to compile out all
1510 // code referring to the gtest_main library, MSVC linker will not link
1511 // that library at all and consequently complain about missing entry
1512 // point defined in that library (fatal error LNK1561: entry point
1513 // must be defined). This dummy test keeps gtest_main linked in.
1514 TEST(DummyTest, ValueParameterizedTestsAreNotSupportedOnThisPlatform) {}
1515
1516 #endif
1517
1518
1519 int main(int argc, char **argv) {
1520 vector<const char*> args;
1521 argv_to_vec(argc, (const char **)argv, args);
1522 env_to_vec(args);
1523
1524 auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0);
1525 g_ceph_context->_conf->set_val("auth_cluster_required", "none");
1526 g_ceph_context->_conf->set_val("auth_service_required", "none");
1527 g_ceph_context->_conf->set_val("auth_client_required", "none");
1528 g_ceph_context->_conf->set_val("enable_experimental_unrecoverable_data_corrupting_features", "ms-type-async");
1529 g_ceph_context->_conf->set_val("ms_die_on_bad_msg", "true");
1530 g_ceph_context->_conf->set_val("ms_die_on_old_message", "true");
1531 g_ceph_context->_conf->set_val("ms_max_backoff", "1");
1532 common_init_finish(g_ceph_context);
1533
1534 ::testing::InitGoogleTest(&argc, argv);
1535 return RUN_ALL_TESTS();
1536 }
1537
1538 /*
1539 * Local Variables:
1540 * compile-command: "cd ../.. ; make -j4 ceph_test_msgr && valgrind --tool=memcheck ./ceph_test_msgr"
1541 * End:
1542 */