]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/msgr/test_msgr.cc
update sources to 12.2.7
[ceph.git] / ceph / src / test / msgr / test_msgr.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #include <atomic>
18 #include <iostream>
19 #include <unistd.h>
20 #include <stdlib.h>
21 #include <time.h>
22 #include "common/Mutex.h"
23 #include "common/Cond.h"
24 #include "common/ceph_argparse.h"
25 #include "global/global_init.h"
26 #include "msg/Dispatcher.h"
27 #include "msg/msg_types.h"
28 #include "msg/Message.h"
29 #include "msg/Messenger.h"
30 #include "msg/Connection.h"
31 #include "messages/MPing.h"
32 #include "messages/MCommand.h"
33
34 #include <boost/random/mersenne_twister.hpp>
35 #include <boost/random/uniform_int.hpp>
36 #include <boost/random/binomial_distribution.hpp>
37 #include <gtest/gtest.h>
38
39 typedef boost::mt11213b gen_type;
40
41 #include "common/dout.h"
42 #include "include/assert.h"
43
44 #define dout_subsys ceph_subsys_ms
45 #undef dout_prefix
46 #define dout_prefix *_dout << " ceph_test_msgr "
47
48
49 #if GTEST_HAS_PARAM_TEST
50
51 #define CHECK_AND_WAIT_TRUE(expr) do { \
52 int n = 1000; \
53 while (--n) { \
54 if (expr) \
55 break; \
56 usleep(1000); \
57 } \
58 } while(0);
59
60 class MessengerTest : public ::testing::TestWithParam<const char*> {
61 public:
62 Messenger *server_msgr;
63 Messenger *client_msgr;
64
65 MessengerTest(): server_msgr(NULL), client_msgr(NULL) {}
66 void SetUp() override {
67 lderr(g_ceph_context) << __func__ << " start set up " << GetParam() << dendl;
68 server_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
69 client_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid(), 0);
70 server_msgr->set_default_policy(Messenger::Policy::stateless_server(0));
71 client_msgr->set_default_policy(Messenger::Policy::lossy_client(0));
72 }
73 void TearDown() override {
74 ASSERT_EQ(server_msgr->get_dispatch_queue_len(), 0);
75 ASSERT_EQ(client_msgr->get_dispatch_queue_len(), 0);
76 delete server_msgr;
77 delete client_msgr;
78 }
79
80 };
81
82
83 class FakeDispatcher : public Dispatcher {
84 public:
85 struct Session : public RefCountedObject {
86 atomic<uint64_t> count;
87 ConnectionRef con;
88
89 explicit Session(ConnectionRef c): RefCountedObject(g_ceph_context), count(0), con(c) {
90 }
91 uint64_t get_count() { return count; }
92 };
93
94 Mutex lock;
95 Cond cond;
96 bool is_server;
97 bool got_new;
98 bool got_remote_reset;
99 bool got_connect;
100 bool loopback;
101
102 explicit FakeDispatcher(bool s): Dispatcher(g_ceph_context), lock("FakeDispatcher::lock"),
103 is_server(s), got_new(false), got_remote_reset(false),
104 got_connect(false), loopback(false) {}
105 bool ms_can_fast_dispatch_any() const override { return true; }
106 bool ms_can_fast_dispatch(const Message *m) const override {
107 switch (m->get_type()) {
108 case CEPH_MSG_PING:
109 return true;
110 default:
111 return false;
112 }
113 }
114
115 void ms_handle_fast_connect(Connection *con) override {
116 lock.Lock();
117 lderr(g_ceph_context) << __func__ << " " << con << dendl;
118 Session *s = static_cast<Session*>(con->get_priv());
119 if (!s) {
120 s = new Session(con);
121 con->set_priv(s->get());
122 lderr(g_ceph_context) << __func__ << " con: " << con << " count: " << s->count << dendl;
123 }
124 s->put();
125 got_connect = true;
126 cond.Signal();
127 lock.Unlock();
128 }
129 void ms_handle_fast_accept(Connection *con) override {
130 Session *s = static_cast<Session*>(con->get_priv());
131 if (!s) {
132 s = new Session(con);
133 con->set_priv(s->get());
134 }
135 s->put();
136 }
137 bool ms_dispatch(Message *m) override {
138 Session *s = static_cast<Session*>(m->get_connection()->get_priv());
139 if (!s) {
140 s = new Session(m->get_connection());
141 m->get_connection()->set_priv(s->get());
142 }
143 s->put();
144 s->count++;
145 lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl;
146 if (is_server) {
147 reply_message(m);
148 }
149 Mutex::Locker l(lock);
150 got_new = true;
151 cond.Signal();
152 m->put();
153 return true;
154 }
155 bool ms_handle_reset(Connection *con) override {
156 Mutex::Locker l(lock);
157 lderr(g_ceph_context) << __func__ << " " << con << dendl;
158 Session *s = static_cast<Session*>(con->get_priv());
159 if (s) {
160 s->con.reset(NULL); // break con <-> session ref cycle
161 con->set_priv(NULL); // break ref <-> session cycle, if any
162 s->put();
163 }
164 return true;
165 }
166 void ms_handle_remote_reset(Connection *con) override {
167 Mutex::Locker l(lock);
168 lderr(g_ceph_context) << __func__ << " " << con << dendl;
169 Session *s = static_cast<Session*>(con->get_priv());
170 if (s) {
171 s->con.reset(NULL); // break con <-> session ref cycle
172 con->set_priv(NULL); // break ref <-> session cycle, if any
173 s->put();
174 }
175 got_remote_reset = true;
176 cond.Signal();
177 }
178 bool ms_handle_refused(Connection *con) override {
179 return false;
180 }
181 void ms_fast_dispatch(Message *m) override {
182 Session *s = static_cast<Session*>(m->get_connection()->get_priv());
183 if (!s) {
184 s = new Session(m->get_connection());
185 m->get_connection()->set_priv(s->get());
186 }
187 s->put();
188 s->count++;
189 lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl;
190 if (is_server) {
191 if (loopback)
192 assert(m->get_source().is_osd());
193 else
194 reply_message(m);
195 } else if (loopback) {
196 assert(m->get_source().is_client());
197 }
198 m->put();
199 Mutex::Locker l(lock);
200 got_new = true;
201 cond.Signal();
202 }
203
204 bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
205 bufferlist& authorizer, bufferlist& authorizer_reply,
206 bool& isvalid, CryptoKey& session_key,
207 std::unique_ptr<AuthAuthorizerChallenge> *challenge) override {
208 isvalid = true;
209 return true;
210 }
211
212 void reply_message(Message *m) {
213 MPing *rm = new MPing();
214 m->get_connection()->send_message(rm);
215 }
216 };
217
218 typedef FakeDispatcher::Session Session;
219
220 TEST_P(MessengerTest, SimpleTest) {
221 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
222 entity_addr_t bind_addr;
223 bind_addr.parse("127.0.0.1");
224 server_msgr->bind(bind_addr);
225 server_msgr->add_dispatcher_head(&srv_dispatcher);
226 server_msgr->start();
227
228 client_msgr->add_dispatcher_head(&cli_dispatcher);
229 client_msgr->start();
230
231 // 1. simple round trip
232 MPing *m = new MPing();
233 ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
234 {
235 ASSERT_EQ(conn->send_message(m), 0);
236 Mutex::Locker l(cli_dispatcher.lock);
237 while (!cli_dispatcher.got_new)
238 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
239 cli_dispatcher.got_new = false;
240 }
241 ASSERT_TRUE(conn->is_connected());
242 ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
243 ASSERT_TRUE(conn->peer_is_osd());
244
245 // 2. test rebind port
246 set<int> avoid_ports;
247 for (int i = 0; i < 10 ; i++)
248 avoid_ports.insert(server_msgr->get_myaddr().get_port() + i);
249 server_msgr->rebind(avoid_ports);
250 ASSERT_TRUE(avoid_ports.count(server_msgr->get_myaddr().get_port()) == 0);
251
252 conn = client_msgr->get_connection(server_msgr->get_myinst());
253 {
254 m = new MPing();
255 ASSERT_EQ(conn->send_message(m), 0);
256 Mutex::Locker l(cli_dispatcher.lock);
257 while (!cli_dispatcher.got_new)
258 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
259 cli_dispatcher.got_new = false;
260 }
261 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
262
263 // 3. test markdown connection
264 conn->mark_down();
265 ASSERT_FALSE(conn->is_connected());
266
267 // 4. test failed connection
268 server_msgr->shutdown();
269 server_msgr->wait();
270
271 m = new MPing();
272 conn->send_message(m);
273 CHECK_AND_WAIT_TRUE(!conn->is_connected());
274 ASSERT_FALSE(conn->is_connected());
275
276 // 5. loopback connection
277 srv_dispatcher.loopback = true;
278 conn = client_msgr->get_loopback_connection();
279 {
280 m = new MPing();
281 ASSERT_EQ(conn->send_message(m), 0);
282 Mutex::Locker l(cli_dispatcher.lock);
283 while (!cli_dispatcher.got_new)
284 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
285 cli_dispatcher.got_new = false;
286 }
287 srv_dispatcher.loopback = false;
288 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
289 client_msgr->shutdown();
290 client_msgr->wait();
291 server_msgr->shutdown();
292 server_msgr->wait();
293 }
294
295 TEST_P(MessengerTest, NameAddrTest) {
296 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
297 entity_addr_t bind_addr;
298 bind_addr.parse("127.0.0.1");
299 server_msgr->bind(bind_addr);
300 server_msgr->add_dispatcher_head(&srv_dispatcher);
301 server_msgr->start();
302
303 client_msgr->add_dispatcher_head(&cli_dispatcher);
304 client_msgr->start();
305
306 MPing *m = new MPing();
307 ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
308 {
309 ASSERT_EQ(conn->send_message(m), 0);
310 Mutex::Locker l(cli_dispatcher.lock);
311 while (!cli_dispatcher.got_new)
312 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
313 cli_dispatcher.got_new = false;
314 }
315 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
316 ASSERT_TRUE(conn->get_peer_addr() == server_msgr->get_myaddr());
317 ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
318 // Make should server_conn is the one we already accepted from client,
319 // so it means client_msgr has the same addr when server connection has
320 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
321 server_msgr->shutdown();
322 client_msgr->shutdown();
323 server_msgr->wait();
324 client_msgr->wait();
325 }
326
327 TEST_P(MessengerTest, FeatureTest) {
328 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
329 entity_addr_t bind_addr;
330 bind_addr.parse("127.0.0.1");
331 uint64_t all_feature_supported, feature_required, feature_supported = 0;
332 for (int i = 0; i < 10; i++)
333 feature_supported |= 1ULL << i;
334 feature_required = feature_supported | 1ULL << 13;
335 all_feature_supported = feature_required | 1ULL << 14;
336
337 Messenger::Policy p = server_msgr->get_policy(entity_name_t::TYPE_CLIENT);
338 p.features_required = feature_required;
339 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
340 server_msgr->bind(bind_addr);
341 server_msgr->add_dispatcher_head(&srv_dispatcher);
342 server_msgr->start();
343
344 // 1. Suppose if only support less than required
345 p = client_msgr->get_policy(entity_name_t::TYPE_OSD);
346 p.features_supported = feature_supported;
347 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
348 client_msgr->add_dispatcher_head(&cli_dispatcher);
349 client_msgr->start();
350
351 MPing *m = new MPing();
352 ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
353 conn->send_message(m);
354 CHECK_AND_WAIT_TRUE(!conn->is_connected());
355 // should failed build a connection
356 ASSERT_FALSE(conn->is_connected());
357
358 client_msgr->shutdown();
359 client_msgr->wait();
360
361 // 2. supported met required
362 p = client_msgr->get_policy(entity_name_t::TYPE_OSD);
363 p.features_supported = all_feature_supported;
364 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
365 client_msgr->start();
366
367 conn = client_msgr->get_connection(server_msgr->get_myinst());
368 {
369 m = new MPing();
370 ASSERT_EQ(conn->send_message(m), 0);
371 Mutex::Locker l(cli_dispatcher.lock);
372 while (!cli_dispatcher.got_new)
373 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
374 cli_dispatcher.got_new = false;
375 }
376 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
377
378 server_msgr->shutdown();
379 client_msgr->shutdown();
380 server_msgr->wait();
381 client_msgr->wait();
382 }
383
384 TEST_P(MessengerTest, TimeoutTest) {
385 g_ceph_context->_conf->set_val("ms_tcp_read_timeout", "1");
386 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
387 entity_addr_t bind_addr;
388 bind_addr.parse("127.0.0.1");
389 server_msgr->bind(bind_addr);
390 server_msgr->add_dispatcher_head(&srv_dispatcher);
391 server_msgr->start();
392
393 client_msgr->add_dispatcher_head(&cli_dispatcher);
394 client_msgr->start();
395
396 // 1. build the connection
397 MPing *m = new MPing();
398 ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
399 {
400 ASSERT_EQ(conn->send_message(m), 0);
401 Mutex::Locker l(cli_dispatcher.lock);
402 while (!cli_dispatcher.got_new)
403 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
404 cli_dispatcher.got_new = false;
405 }
406 ASSERT_TRUE(conn->is_connected());
407 ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
408 ASSERT_TRUE(conn->peer_is_osd());
409
410 // 2. wait for idle
411 usleep(2500*1000);
412 ASSERT_FALSE(conn->is_connected());
413
414 server_msgr->shutdown();
415 server_msgr->wait();
416
417 client_msgr->shutdown();
418 client_msgr->wait();
419 g_ceph_context->_conf->set_val("ms_tcp_read_timeout", "900");
420 }
421
422 TEST_P(MessengerTest, StatefulTest) {
423 Message *m;
424 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
425 entity_addr_t bind_addr;
426 bind_addr.parse("127.0.0.1");
427 Messenger::Policy p = Messenger::Policy::stateful_server(0);
428 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
429 p = Messenger::Policy::lossless_client(0);
430 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
431
432 server_msgr->bind(bind_addr);
433 server_msgr->add_dispatcher_head(&srv_dispatcher);
434 server_msgr->start();
435 client_msgr->add_dispatcher_head(&cli_dispatcher);
436 client_msgr->start();
437
438 // 1. test for server standby
439 ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
440 {
441 m = new MPing();
442 ASSERT_EQ(conn->send_message(m), 0);
443 Mutex::Locker l(cli_dispatcher.lock);
444 while (!cli_dispatcher.got_new)
445 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
446 cli_dispatcher.got_new = false;
447 }
448 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
449 conn->mark_down();
450 ASSERT_FALSE(conn->is_connected());
451 ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
452 // don't lose state
453 ASSERT_TRUE(static_cast<Session*>(server_conn->get_priv())->get_count() == 1);
454
455 srv_dispatcher.got_new = false;
456 conn = client_msgr->get_connection(server_msgr->get_myinst());
457 {
458 m = new MPing();
459 ASSERT_EQ(conn->send_message(m), 0);
460 Mutex::Locker l(cli_dispatcher.lock);
461 while (!cli_dispatcher.got_new)
462 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
463 cli_dispatcher.got_new = false;
464 }
465 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
466 server_conn = server_msgr->get_connection(client_msgr->get_myinst());
467 {
468 Mutex::Locker l(srv_dispatcher.lock);
469 while (!srv_dispatcher.got_remote_reset)
470 srv_dispatcher.cond.Wait(srv_dispatcher.lock);
471 }
472
473 // 2. test for client reconnect
474 ASSERT_FALSE(cli_dispatcher.got_remote_reset);
475 cli_dispatcher.got_connect = false;
476 cli_dispatcher.got_new = false;
477 cli_dispatcher.got_remote_reset = false;
478 server_conn->mark_down();
479 ASSERT_FALSE(server_conn->is_connected());
480 // ensure client detect server socket closed
481 {
482 Mutex::Locker l(cli_dispatcher.lock);
483 while (!cli_dispatcher.got_remote_reset)
484 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
485 cli_dispatcher.got_remote_reset = false;
486 }
487 {
488 Mutex::Locker l(cli_dispatcher.lock);
489 while (!cli_dispatcher.got_connect)
490 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
491 cli_dispatcher.got_connect = false;
492 }
493 CHECK_AND_WAIT_TRUE(conn->is_connected());
494 ASSERT_TRUE(conn->is_connected());
495
496 {
497 m = new MPing();
498 ASSERT_EQ(conn->send_message(m), 0);
499 ASSERT_TRUE(conn->is_connected());
500 Mutex::Locker l(cli_dispatcher.lock);
501 while (!cli_dispatcher.got_new)
502 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
503 cli_dispatcher.got_new = false;
504 }
505 // resetcheck happen
506 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv())->get_count());
507 server_conn = server_msgr->get_connection(client_msgr->get_myinst());
508 ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv())->get_count());
509 cli_dispatcher.got_remote_reset = false;
510
511 server_msgr->shutdown();
512 client_msgr->shutdown();
513 server_msgr->wait();
514 client_msgr->wait();
515 }
516
517 TEST_P(MessengerTest, StatelessTest) {
518 Message *m;
519 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
520 entity_addr_t bind_addr;
521 bind_addr.parse("127.0.0.1");
522 Messenger::Policy p = Messenger::Policy::stateless_server(0);
523 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
524 p = Messenger::Policy::lossy_client(0);
525 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
526
527 server_msgr->bind(bind_addr);
528 server_msgr->add_dispatcher_head(&srv_dispatcher);
529 server_msgr->start();
530 client_msgr->add_dispatcher_head(&cli_dispatcher);
531 client_msgr->start();
532
533 // 1. test for server lose state
534 ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
535 {
536 m = new MPing();
537 ASSERT_EQ(conn->send_message(m), 0);
538 Mutex::Locker l(cli_dispatcher.lock);
539 while (!cli_dispatcher.got_new)
540 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
541 cli_dispatcher.got_new = false;
542 }
543 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
544 conn->mark_down();
545 ASSERT_FALSE(conn->is_connected());
546
547 srv_dispatcher.got_new = false;
548 conn = client_msgr->get_connection(server_msgr->get_myinst());
549 {
550 m = new MPing();
551 ASSERT_EQ(conn->send_message(m), 0);
552 Mutex::Locker l(cli_dispatcher.lock);
553 while (!cli_dispatcher.got_new)
554 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
555 cli_dispatcher.got_new = false;
556 }
557 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
558 ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
559 // server lose state
560 {
561 Mutex::Locker l(srv_dispatcher.lock);
562 while (!srv_dispatcher.got_new)
563 srv_dispatcher.cond.Wait(srv_dispatcher.lock);
564 }
565 ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv())->get_count());
566
567 // 2. test for client lossy
568 server_conn->mark_down();
569 ASSERT_FALSE(server_conn->is_connected());
570 conn->send_keepalive();
571 CHECK_AND_WAIT_TRUE(!conn->is_connected());
572 ASSERT_FALSE(conn->is_connected());
573 conn = client_msgr->get_connection(server_msgr->get_myinst());
574 {
575 m = new MPing();
576 ASSERT_EQ(conn->send_message(m), 0);
577 Mutex::Locker l(cli_dispatcher.lock);
578 while (!cli_dispatcher.got_new)
579 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
580 cli_dispatcher.got_new = false;
581 }
582 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
583
584 server_msgr->shutdown();
585 client_msgr->shutdown();
586 server_msgr->wait();
587 client_msgr->wait();
588 }
589
590 TEST_P(MessengerTest, ClientStandbyTest) {
591 Message *m;
592 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
593 entity_addr_t bind_addr;
594 bind_addr.parse("127.0.0.1");
595 Messenger::Policy p = Messenger::Policy::stateful_server(0);
596 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
597 p = Messenger::Policy::lossless_peer(0);
598 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
599
600 server_msgr->bind(bind_addr);
601 server_msgr->add_dispatcher_head(&srv_dispatcher);
602 server_msgr->start();
603 client_msgr->add_dispatcher_head(&cli_dispatcher);
604 client_msgr->start();
605
606 // 1. test for client standby, resetcheck
607 ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
608 {
609 m = new MPing();
610 ASSERT_EQ(conn->send_message(m), 0);
611 Mutex::Locker l(cli_dispatcher.lock);
612 while (!cli_dispatcher.got_new)
613 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
614 cli_dispatcher.got_new = false;
615 }
616 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
617 ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
618 ASSERT_FALSE(cli_dispatcher.got_remote_reset);
619 cli_dispatcher.got_connect = false;
620 server_conn->mark_down();
621 ASSERT_FALSE(server_conn->is_connected());
622 // client should be standby
623 usleep(300*1000);
624 // client should be standby, so we use original connection
625 {
626 // Try send message to verify got remote reset callback
627 m = new MPing();
628 ASSERT_EQ(conn->send_message(m), 0);
629 {
630 Mutex::Locker l(cli_dispatcher.lock);
631 while (!cli_dispatcher.got_remote_reset)
632 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
633 cli_dispatcher.got_remote_reset = false;
634 while (!cli_dispatcher.got_connect)
635 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
636 cli_dispatcher.got_connect = false;
637 }
638 CHECK_AND_WAIT_TRUE(conn->is_connected());
639 ASSERT_TRUE(conn->is_connected());
640 m = new MPing();
641 ASSERT_EQ(conn->send_message(m), 0);
642 Mutex::Locker l(cli_dispatcher.lock);
643 while (!cli_dispatcher.got_new)
644 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
645 cli_dispatcher.got_new = false;
646 }
647 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
648 server_conn = server_msgr->get_connection(client_msgr->get_myinst());
649 ASSERT_TRUE(static_cast<Session*>(server_conn->get_priv())->get_count() == 1);
650
651 server_msgr->shutdown();
652 client_msgr->shutdown();
653 server_msgr->wait();
654 client_msgr->wait();
655 }
656
657 TEST_P(MessengerTest, AuthTest) {
658 g_ceph_context->_conf->set_val("auth_cluster_required", "cephx");
659 g_ceph_context->_conf->set_val("auth_service_required", "cephx");
660 g_ceph_context->_conf->set_val("auth_client_required", "cephx");
661 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
662 entity_addr_t bind_addr;
663 bind_addr.parse("127.0.0.1");
664 server_msgr->bind(bind_addr);
665 server_msgr->add_dispatcher_head(&srv_dispatcher);
666 server_msgr->start();
667
668 client_msgr->add_dispatcher_head(&cli_dispatcher);
669 client_msgr->start();
670
671 // 1. simple auth round trip
672 MPing *m = new MPing();
673 ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
674 {
675 ASSERT_EQ(conn->send_message(m), 0);
676 Mutex::Locker l(cli_dispatcher.lock);
677 while (!cli_dispatcher.got_new)
678 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
679 cli_dispatcher.got_new = false;
680 }
681 ASSERT_TRUE(conn->is_connected());
682 ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
683
684 // 2. mix auth
685 g_ceph_context->_conf->set_val("auth_cluster_required", "none");
686 g_ceph_context->_conf->set_val("auth_service_required", "none");
687 g_ceph_context->_conf->set_val("auth_client_required", "none");
688 conn->mark_down();
689 ASSERT_FALSE(conn->is_connected());
690 conn = client_msgr->get_connection(server_msgr->get_myinst());
691 {
692 MPing *m = new MPing();
693 ASSERT_EQ(conn->send_message(m), 0);
694 Mutex::Locker l(cli_dispatcher.lock);
695 while (!cli_dispatcher.got_new)
696 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
697 cli_dispatcher.got_new = false;
698 }
699 ASSERT_TRUE(conn->is_connected());
700 ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
701
702 server_msgr->shutdown();
703 client_msgr->shutdown();
704 server_msgr->wait();
705 client_msgr->wait();
706 }
707
708 TEST_P(MessengerTest, MessageTest) {
709 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
710 entity_addr_t bind_addr;
711 bind_addr.parse("127.0.0.1");
712 Messenger::Policy p = Messenger::Policy::stateful_server(0);
713 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
714 p = Messenger::Policy::lossless_peer(0);
715 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
716
717 server_msgr->bind(bind_addr);
718 server_msgr->add_dispatcher_head(&srv_dispatcher);
719 server_msgr->start();
720 client_msgr->add_dispatcher_head(&cli_dispatcher);
721 client_msgr->start();
722
723
724 // 1. A very large "front"(as well as "payload")
725 // Because a external message need to invade Messenger::decode_message,
726 // here we only use existing message class(MCommand)
727 ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
728 {
729 uuid_d uuid;
730 uuid.generate_random();
731 vector<string> cmds;
732 string s("abcdefghijklmnopqrstuvwxyz");
733 for (int i = 0; i < 1024*30; i++)
734 cmds.push_back(s);
735 MCommand *m = new MCommand(uuid);
736 m->cmd = cmds;
737 conn->send_message(m);
738 utime_t t;
739 t += 1000*1000*500;
740 Mutex::Locker l(cli_dispatcher.lock);
741 while (!cli_dispatcher.got_new)
742 cli_dispatcher.cond.WaitInterval(cli_dispatcher.lock, t);
743 ASSERT_TRUE(cli_dispatcher.got_new);
744 cli_dispatcher.got_new = false;
745 }
746
747 // 2. A very large "data"
748 {
749 bufferlist bl;
750 string s("abcdefghijklmnopqrstuvwxyz");
751 for (int i = 0; i < 1024*30; i++)
752 bl.append(s);
753 MPing *m = new MPing();
754 m->set_data(bl);
755 conn->send_message(m);
756 utime_t t;
757 t += 1000*1000*500;
758 Mutex::Locker l(cli_dispatcher.lock);
759 while (!cli_dispatcher.got_new)
760 cli_dispatcher.cond.WaitInterval(cli_dispatcher.lock, t);
761 ASSERT_TRUE(cli_dispatcher.got_new);
762 cli_dispatcher.got_new = false;
763 }
764 server_msgr->shutdown();
765 client_msgr->shutdown();
766 server_msgr->wait();
767 client_msgr->wait();
768 }
769
770
771 class SyntheticWorkload;
772
773 struct Payload {
774 enum Who : uint8_t {
775 PING = 0,
776 PONG = 1,
777 };
778 uint8_t who;
779 uint64_t seq;
780 bufferlist data;
781
782 Payload(Who who, uint64_t seq, const bufferlist& data)
783 : who(who), seq(seq), data(data)
784 {}
785 Payload() = default;
786 DENC(Payload, v, p) {
787 DENC_START(1, 1, p);
788 denc(v.who, p);
789 denc(v.seq, p);
790 denc(v.data, p);
791 DENC_FINISH(p);
792 }
793 };
794 WRITE_CLASS_DENC(Payload)
795
796 ostream& operator<<(ostream& out, const Payload &pl)
797 {
798 return out << "reply=" << pl.who << " i = " << pl.seq;
799 }
800
801 class SyntheticDispatcher : public Dispatcher {
802 public:
803 Mutex lock;
804 Cond cond;
805 bool is_server;
806 bool got_new;
807 bool got_remote_reset;
808 bool got_connect;
809 map<ConnectionRef, list<uint64_t> > conn_sent;
810 map<uint64_t, bufferlist> sent;
811 atomic<uint64_t> index;
812 SyntheticWorkload *workload;
813
814 SyntheticDispatcher(bool s, SyntheticWorkload *wl):
815 Dispatcher(g_ceph_context), lock("SyntheticDispatcher::lock"), is_server(s), got_new(false),
816 got_remote_reset(false), got_connect(false), index(0), workload(wl) {}
817 bool ms_can_fast_dispatch_any() const override { return true; }
818 bool ms_can_fast_dispatch(const Message *m) const override {
819 switch (m->get_type()) {
820 case CEPH_MSG_PING:
821 case MSG_COMMAND:
822 return true;
823 default:
824 return false;
825 }
826 }
827
828 void ms_handle_fast_connect(Connection *con) override {
829 Mutex::Locker l(lock);
830 list<uint64_t> c = conn_sent[con];
831 for (list<uint64_t>::iterator it = c.begin();
832 it != c.end(); ++it)
833 sent.erase(*it);
834 conn_sent.erase(con);
835 got_connect = true;
836 cond.Signal();
837 }
838 void ms_handle_fast_accept(Connection *con) override {
839 Mutex::Locker l(lock);
840 list<uint64_t> c = conn_sent[con];
841 for (list<uint64_t>::iterator it = c.begin();
842 it != c.end(); ++it)
843 sent.erase(*it);
844 conn_sent.erase(con);
845 cond.Signal();
846 }
847 bool ms_dispatch(Message *m) override {
848 ceph_abort();
849 }
850 bool ms_handle_reset(Connection *con) override;
851 void ms_handle_remote_reset(Connection *con) override {
852 Mutex::Locker l(lock);
853 list<uint64_t> c = conn_sent[con];
854 for (list<uint64_t>::iterator it = c.begin();
855 it != c.end(); ++it)
856 sent.erase(*it);
857 conn_sent.erase(con);
858 got_remote_reset = true;
859 }
860 bool ms_handle_refused(Connection *con) override {
861 return false;
862 }
863 void ms_fast_dispatch(Message *m) override {
864 // MSG_COMMAND is used to disorganize regular message flow
865 if (m->get_type() == MSG_COMMAND) {
866 m->put();
867 return ;
868 }
869
870 Payload pl;
871 auto p = m->get_data().begin();
872 ::decode(pl, p);
873 if (pl.who == Payload::PING) {
874 lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl;
875 reply_message(m, pl);
876 m->put();
877 Mutex::Locker l(lock);
878 got_new = true;
879 cond.Signal();
880 } else {
881 Mutex::Locker l(lock);
882 if (sent.count(pl.seq)) {
883 lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl;
884 ASSERT_EQ(conn_sent[m->get_connection()].front(), pl.seq);
885 ASSERT_TRUE(pl.data.contents_equal(sent[pl.seq]));
886 conn_sent[m->get_connection()].pop_front();
887 sent.erase(pl.seq);
888 }
889 m->put();
890 got_new = true;
891 cond.Signal();
892 }
893 }
894
895 bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
896 bufferlist& authorizer, bufferlist& authorizer_reply,
897 bool& isvalid, CryptoKey& session_key,
898 std::unique_ptr<AuthAuthorizerChallenge> *challenge) override {
899 isvalid = true;
900 return true;
901 }
902
903 void reply_message(const Message *m, Payload& pl) {
904 pl.who = Payload::PONG;
905 bufferlist bl;
906 ::encode(pl, bl);
907 MPing *rm = new MPing();
908 rm->set_data(bl);
909 m->get_connection()->send_message(rm);
910 lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply m=" << m << " i=" << pl.seq << dendl;
911 }
912
913 void send_message_wrap(ConnectionRef con, const bufferlist& data) {
914 Message *m = new MPing();
915 Payload pl{Payload::PING, index++, data};
916 bufferlist bl;
917 ::encode(pl, bl);
918 m->set_data(bl);
919 if (!con->get_messenger()->get_default_policy().lossy) {
920 Mutex::Locker l(lock);
921 sent[pl.seq] = pl.data;
922 conn_sent[con].push_back(pl.seq);
923 }
924 lderr(g_ceph_context) << __func__ << " conn=" << con.get() << " send m=" << m << " i=" << pl.seq << dendl;
925 ASSERT_EQ(0, con->send_message(m));
926 }
927
928 uint64_t get_pending() {
929 Mutex::Locker l(lock);
930 return sent.size();
931 }
932
933 void clear_pending(ConnectionRef con) {
934 Mutex::Locker l(lock);
935
936 for (list<uint64_t>::iterator it = conn_sent[con].begin();
937 it != conn_sent[con].end(); ++it)
938 sent.erase(*it);
939 conn_sent.erase(con);
940 }
941
942 void print() {
943 for (auto && p : conn_sent) {
944 if (!p.second.empty()) {
945 lderr(g_ceph_context) << __func__ << " " << p.first << " wait " << p.second.size() << dendl;
946 }
947 }
948 }
949 };
950
951
952 class SyntheticWorkload {
953 Mutex lock;
954 Cond cond;
955 set<Messenger*> available_servers;
956 set<Messenger*> available_clients;
957 map<ConnectionRef, pair<Messenger*, Messenger*> > available_connections;
958 SyntheticDispatcher dispatcher;
959 gen_type rng;
960 vector<bufferlist> rand_data;
961
962 public:
963 static const unsigned max_in_flight = 64;
964 static const unsigned max_connections = 128;
965 static const unsigned max_message_len = 1024 * 1024 * 4;
966
967 SyntheticWorkload(int servers, int clients, string type, int random_num,
968 Messenger::Policy srv_policy, Messenger::Policy cli_policy):
969 lock("SyntheticWorkload::lock"), dispatcher(false, this), rng(time(NULL)) {
970 Messenger *msgr;
971 int base_port = 16800;
972 entity_addr_t bind_addr;
973 char addr[64];
974 for (int i = 0; i < servers; ++i) {
975 msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0),
976 "server", getpid()+i, 0);
977 snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i);
978 bind_addr.parse(addr);
979 msgr->bind(bind_addr);
980 msgr->add_dispatcher_head(&dispatcher);
981
982 assert(msgr);
983 msgr->set_default_policy(srv_policy);
984 available_servers.insert(msgr);
985 msgr->start();
986 }
987
988 for (int i = 0; i < clients; ++i) {
989 msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(-1),
990 "client", getpid()+i+servers, 0);
991 if (cli_policy.standby) {
992 snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i+servers);
993 bind_addr.parse(addr);
994 msgr->bind(bind_addr);
995 }
996 msgr->add_dispatcher_head(&dispatcher);
997
998 assert(msgr);
999 msgr->set_default_policy(cli_policy);
1000 available_clients.insert(msgr);
1001 msgr->start();
1002 }
1003
1004 for (int i = 0; i < random_num; i++) {
1005 bufferlist bl;
1006 boost::uniform_int<> u(32, max_message_len);
1007 uint64_t value_len = u(rng);
1008 bufferptr bp(value_len);
1009 bp.zero();
1010 for (uint64_t j = 0; j < value_len-sizeof(i); ) {
1011 memcpy(bp.c_str()+j, &i, sizeof(i));
1012 j += 4096;
1013 }
1014
1015 bl.append(bp);
1016 rand_data.push_back(bl);
1017 }
1018 }
1019
1020 ConnectionRef _get_random_connection() {
1021 while (dispatcher.get_pending() > max_in_flight) {
1022 lock.Unlock();
1023 usleep(500);
1024 lock.Lock();
1025 }
1026 assert(lock.is_locked());
1027 boost::uniform_int<> choose(0, available_connections.size() - 1);
1028 int index = choose(rng);
1029 map<ConnectionRef, pair<Messenger*, Messenger*> >::iterator i = available_connections.begin();
1030 for (; index > 0; --index, ++i) ;
1031 return i->first;
1032 }
1033
1034 bool can_create_connection() {
1035 return available_connections.size() < max_connections;
1036 }
1037
1038 void generate_connection() {
1039 Mutex::Locker l(lock);
1040 if (!can_create_connection())
1041 return ;
1042
1043 Messenger *server, *client;
1044 {
1045 boost::uniform_int<> choose(0, available_servers.size() - 1);
1046 int index = choose(rng);
1047 set<Messenger*>::iterator i = available_servers.begin();
1048 for (; index > 0; --index, ++i) ;
1049 server = *i;
1050 }
1051 {
1052 boost::uniform_int<> choose(0, available_clients.size() - 1);
1053 int index = choose(rng);
1054 set<Messenger*>::iterator i = available_clients.begin();
1055 for (; index > 0; --index, ++i) ;
1056 client = *i;
1057 }
1058
1059 pair<Messenger*, Messenger*> p;
1060 {
1061 boost::uniform_int<> choose(0, available_servers.size() - 1);
1062 if (server->get_default_policy().server) {
1063 p = make_pair(client, server);
1064 } else {
1065 ConnectionRef conn = client->get_connection(server->get_myinst());
1066 if (available_connections.count(conn) || choose(rng) % 2)
1067 p = make_pair(client, server);
1068 else
1069 p = make_pair(server, client);
1070 }
1071 }
1072 ConnectionRef conn = p.first->get_connection(p.second->get_myinst());
1073 available_connections[conn] = p;
1074 }
1075
1076 void send_message() {
1077 Mutex::Locker l(lock);
1078 ConnectionRef conn = _get_random_connection();
1079 boost::uniform_int<> true_false(0, 99);
1080 int val = true_false(rng);
1081 if (val >= 95) {
1082 uuid_d uuid;
1083 uuid.generate_random();
1084 MCommand *m = new MCommand(uuid);
1085 vector<string> cmds;
1086 cmds.push_back("command");
1087 m->cmd = cmds;
1088 m->set_priority(200);
1089 conn->send_message(m);
1090 } else {
1091 boost::uniform_int<> u(0, rand_data.size()-1);
1092 dispatcher.send_message_wrap(conn, rand_data[u(rng)]);
1093 }
1094 }
1095
1096 void drop_connection() {
1097 Mutex::Locker l(lock);
1098 if (available_connections.size() < 10)
1099 return;
1100 ConnectionRef conn = _get_random_connection();
1101 dispatcher.clear_pending(conn);
1102 conn->mark_down();
1103 pair<Messenger*, Messenger*> &p = available_connections[conn];
1104 // it's a lossless policy, so we need to mark down each side
1105 if (!p.first->get_default_policy().server && !p.second->get_default_policy().server) {
1106 ASSERT_EQ(conn->get_messenger(), p.first);
1107 ConnectionRef peer = p.second->get_connection(p.first->get_myinst());
1108 peer->mark_down();
1109 dispatcher.clear_pending(peer);
1110 available_connections.erase(peer);
1111 }
1112 ASSERT_EQ(available_connections.erase(conn), 1U);
1113 }
1114
1115 void print_internal_state(bool detail=false) {
1116 Mutex::Locker l(lock);
1117 lderr(g_ceph_context) << "available_connections: " << available_connections.size()
1118 << " inflight messages: " << dispatcher.get_pending() << dendl;
1119 if (detail && !available_connections.empty()) {
1120 dispatcher.print();
1121 }
1122 }
1123
1124 void wait_for_done() {
1125 int64_t tick_us = 1000 * 100; // 100ms
1126 int64_t timeout_us = 5 * 60 * 1000 * 1000; // 5 mins
1127 int i = 0;
1128 while (dispatcher.get_pending()) {
1129 usleep(tick_us);
1130 timeout_us -= tick_us;
1131 if (i++ % 50 == 0)
1132 print_internal_state(true);
1133 if (timeout_us < 0)
1134 assert(0 == " loop time exceed 5 mins, it looks we stuck into some problems!");
1135 }
1136 for (set<Messenger*>::iterator it = available_servers.begin();
1137 it != available_servers.end(); ++it) {
1138 (*it)->shutdown();
1139 (*it)->wait();
1140 ASSERT_EQ((*it)->get_dispatch_queue_len(), 0);
1141 delete (*it);
1142 }
1143 available_servers.clear();
1144
1145 for (set<Messenger*>::iterator it = available_clients.begin();
1146 it != available_clients.end(); ++it) {
1147 (*it)->shutdown();
1148 (*it)->wait();
1149 ASSERT_EQ((*it)->get_dispatch_queue_len(), 0);
1150 delete (*it);
1151 }
1152 available_clients.clear();
1153 }
1154
1155 void handle_reset(Connection *con) {
1156 Mutex::Locker l(lock);
1157 available_connections.erase(con);
1158 dispatcher.clear_pending(con);
1159 }
1160 };
1161
1162 bool SyntheticDispatcher::ms_handle_reset(Connection *con) {
1163 workload->handle_reset(con);
1164 return true;
1165 }
1166
1167 TEST_P(MessengerTest, SyntheticStressTest) {
1168 SyntheticWorkload test_msg(8, 32, GetParam(), 100,
1169 Messenger::Policy::stateful_server(0),
1170 Messenger::Policy::lossless_client(0));
1171 for (int i = 0; i < 100; ++i) {
1172 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
1173 test_msg.generate_connection();
1174 }
1175 gen_type rng(time(NULL));
1176 for (int i = 0; i < 5000; ++i) {
1177 if (!(i % 10)) {
1178 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
1179 test_msg.print_internal_state();
1180 }
1181 boost::uniform_int<> true_false(0, 99);
1182 int val = true_false(rng);
1183 if (val > 90) {
1184 test_msg.generate_connection();
1185 } else if (val > 80) {
1186 test_msg.drop_connection();
1187 } else if (val > 10) {
1188 test_msg.send_message();
1189 } else {
1190 usleep(rand() % 1000 + 500);
1191 }
1192 }
1193 test_msg.wait_for_done();
1194 }
1195
1196 TEST_P(MessengerTest, SyntheticStressTest1) {
1197 SyntheticWorkload test_msg(16, 32, GetParam(), 100,
1198 Messenger::Policy::lossless_peer_reuse(0),
1199 Messenger::Policy::lossless_peer_reuse(0));
1200 for (int i = 0; i < 10; ++i) {
1201 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
1202 test_msg.generate_connection();
1203 }
1204 gen_type rng(time(NULL));
1205 for (int i = 0; i < 10000; ++i) {
1206 if (!(i % 10)) {
1207 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
1208 test_msg.print_internal_state();
1209 }
1210 boost::uniform_int<> true_false(0, 99);
1211 int val = true_false(rng);
1212 if (val > 80) {
1213 test_msg.generate_connection();
1214 } else if (val > 60) {
1215 test_msg.drop_connection();
1216 } else if (val > 10) {
1217 test_msg.send_message();
1218 } else {
1219 usleep(rand() % 1000 + 500);
1220 }
1221 }
1222 test_msg.wait_for_done();
1223 }
1224
1225
1226 TEST_P(MessengerTest, SyntheticInjectTest) {
1227 uint64_t dispatch_throttle_bytes = g_ceph_context->_conf->ms_dispatch_throttle_bytes;
1228 g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30");
1229 g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
1230 g_ceph_context->_conf->set_val("ms_dispatch_throttle_bytes", "16777216");
1231 SyntheticWorkload test_msg(8, 32, GetParam(), 100,
1232 Messenger::Policy::stateful_server(0),
1233 Messenger::Policy::lossless_client(0));
1234 for (int i = 0; i < 100; ++i) {
1235 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
1236 test_msg.generate_connection();
1237 }
1238 gen_type rng(time(NULL));
1239 for (int i = 0; i < 1000; ++i) {
1240 if (!(i % 10)) {
1241 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
1242 test_msg.print_internal_state();
1243 }
1244 boost::uniform_int<> true_false(0, 99);
1245 int val = true_false(rng);
1246 if (val > 90) {
1247 test_msg.generate_connection();
1248 } else if (val > 80) {
1249 test_msg.drop_connection();
1250 } else if (val > 10) {
1251 test_msg.send_message();
1252 } else {
1253 usleep(rand() % 500 + 100);
1254 }
1255 }
1256 test_msg.wait_for_done();
1257 g_ceph_context->_conf->set_val("ms_inject_socket_failures", "0");
1258 g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0");
1259 g_ceph_context->_conf->set_val(
1260 "ms_dispatch_throttle_bytes", std::to_string(dispatch_throttle_bytes));
1261 }
1262
1263 TEST_P(MessengerTest, SyntheticInjectTest2) {
1264 g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30");
1265 g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
1266 SyntheticWorkload test_msg(8, 16, GetParam(), 100,
1267 Messenger::Policy::lossless_peer_reuse(0),
1268 Messenger::Policy::lossless_peer_reuse(0));
1269 for (int i = 0; i < 100; ++i) {
1270 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
1271 test_msg.generate_connection();
1272 }
1273 gen_type rng(time(NULL));
1274 for (int i = 0; i < 1000; ++i) {
1275 if (!(i % 10)) {
1276 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
1277 test_msg.print_internal_state();
1278 }
1279 boost::uniform_int<> true_false(0, 99);
1280 int val = true_false(rng);
1281 if (val > 90) {
1282 test_msg.generate_connection();
1283 } else if (val > 80) {
1284 test_msg.drop_connection();
1285 } else if (val > 10) {
1286 test_msg.send_message();
1287 } else {
1288 usleep(rand() % 500 + 100);
1289 }
1290 }
1291 test_msg.wait_for_done();
1292 g_ceph_context->_conf->set_val("ms_inject_socket_failures", "0");
1293 g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0");
1294 }
1295
1296 TEST_P(MessengerTest, SyntheticInjectTest3) {
1297 g_ceph_context->_conf->set_val("ms_inject_socket_failures", "600");
1298 g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
1299 SyntheticWorkload test_msg(8, 16, GetParam(), 100,
1300 Messenger::Policy::stateless_server(0),
1301 Messenger::Policy::lossy_client(0));
1302 for (int i = 0; i < 100; ++i) {
1303 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
1304 test_msg.generate_connection();
1305 }
1306 gen_type rng(time(NULL));
1307 for (int i = 0; i < 1000; ++i) {
1308 if (!(i % 10)) {
1309 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
1310 test_msg.print_internal_state();
1311 }
1312 boost::uniform_int<> true_false(0, 99);
1313 int val = true_false(rng);
1314 if (val > 90) {
1315 test_msg.generate_connection();
1316 } else if (val > 80) {
1317 test_msg.drop_connection();
1318 } else if (val > 10) {
1319 test_msg.send_message();
1320 } else {
1321 usleep(rand() % 500 + 100);
1322 }
1323 }
1324 test_msg.wait_for_done();
1325 g_ceph_context->_conf->set_val("ms_inject_socket_failures", "0");
1326 g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0");
1327 }
1328
1329
1330 TEST_P(MessengerTest, SyntheticInjectTest4) {
1331 g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30");
1332 g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
1333 g_ceph_context->_conf->set_val("ms_inject_delay_probability", "1");
1334 g_ceph_context->_conf->set_val("ms_inject_delay_type", "client osd", false);
1335 g_ceph_context->_conf->set_val("ms_inject_delay_max", "5");
1336 SyntheticWorkload test_msg(16, 32, GetParam(), 100,
1337 Messenger::Policy::lossless_peer(0),
1338 Messenger::Policy::lossless_peer(0));
1339 for (int i = 0; i < 100; ++i) {
1340 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
1341 test_msg.generate_connection();
1342 }
1343 gen_type rng(time(NULL));
1344 for (int i = 0; i < 1000; ++i) {
1345 if (!(i % 10)) {
1346 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
1347 test_msg.print_internal_state();
1348 }
1349 boost::uniform_int<> true_false(0, 99);
1350 int val = true_false(rng);
1351 if (val > 95) {
1352 test_msg.generate_connection();
1353 } else if (val > 80) {
1354 // test_msg.drop_connection();
1355 } else if (val > 10) {
1356 test_msg.send_message();
1357 } else {
1358 usleep(rand() % 500 + 100);
1359 }
1360 }
1361 test_msg.wait_for_done();
1362 g_ceph_context->_conf->set_val("ms_inject_socket_failures", "0");
1363 g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0");
1364 g_ceph_context->_conf->set_val("ms_inject_delay_probability", "0");
1365 g_ceph_context->_conf->set_val("ms_inject_delay_type", "", false);
1366 g_ceph_context->_conf->set_val("ms_inject_delay_max", "0");
1367 }
1368
1369
1370 class MarkdownDispatcher : public Dispatcher {
1371 Mutex lock;
1372 set<ConnectionRef> conns;
1373 bool last_mark;
1374 public:
1375 std::atomic<uint64_t> count = { 0 };
1376 explicit MarkdownDispatcher(bool s): Dispatcher(g_ceph_context), lock("MarkdownDispatcher::lock"),
1377 last_mark(false) {}
1378 bool ms_can_fast_dispatch_any() const override { return false; }
1379 bool ms_can_fast_dispatch(const Message *m) const override {
1380 switch (m->get_type()) {
1381 case CEPH_MSG_PING:
1382 return true;
1383 default:
1384 return false;
1385 }
1386 }
1387
1388 void ms_handle_fast_connect(Connection *con) override {
1389 lderr(g_ceph_context) << __func__ << " " << con << dendl;
1390 Mutex::Locker l(lock);
1391 conns.insert(con);
1392 }
1393 void ms_handle_fast_accept(Connection *con) override {
1394 Mutex::Locker l(lock);
1395 conns.insert(con);
1396 }
1397 bool ms_dispatch(Message *m) override {
1398 lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << dendl;
1399 Mutex::Locker l(lock);
1400 count++;
1401 conns.insert(m->get_connection());
1402 if (conns.size() < 2 && !last_mark) {
1403 m->put();
1404 return true;
1405 }
1406
1407 last_mark = true;
1408 usleep(rand() % 500);
1409 for (set<ConnectionRef>::iterator it = conns.begin(); it != conns.end(); ++it) {
1410 if ((*it) != m->get_connection().get()) {
1411 (*it)->mark_down();
1412 conns.erase(it);
1413 break;
1414 }
1415 }
1416 if (conns.empty())
1417 last_mark = false;
1418 m->put();
1419 return true;
1420 }
1421 bool ms_handle_reset(Connection *con) override {
1422 lderr(g_ceph_context) << __func__ << " " << con << dendl;
1423 Mutex::Locker l(lock);
1424 conns.erase(con);
1425 usleep(rand() % 500);
1426 return true;
1427 }
1428 void ms_handle_remote_reset(Connection *con) override {
1429 Mutex::Locker l(lock);
1430 conns.erase(con);
1431 lderr(g_ceph_context) << __func__ << " " << con << dendl;
1432 }
1433 bool ms_handle_refused(Connection *con) override {
1434 return false;
1435 }
1436 void ms_fast_dispatch(Message *m) override {
1437 ceph_abort();
1438 }
1439 bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
1440 bufferlist& authorizer, bufferlist& authorizer_reply,
1441 bool& isvalid, CryptoKey& session_key,
1442 std::unique_ptr<AuthAuthorizerChallenge> *challenge) override {
1443 isvalid = true;
1444 return true;
1445 }
1446 };
1447
1448
1449 // Markdown with external lock
1450 TEST_P(MessengerTest, MarkdownTest) {
1451 Messenger *server_msgr2 = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
1452 MarkdownDispatcher cli_dispatcher(false), srv_dispatcher(true);
1453 entity_addr_t bind_addr;
1454 bind_addr.parse("127.0.0.1:16800");
1455 server_msgr->bind(bind_addr);
1456 server_msgr->add_dispatcher_head(&srv_dispatcher);
1457 server_msgr->start();
1458 bind_addr.parse("127.0.0.1:16801");
1459 server_msgr2->bind(bind_addr);
1460 server_msgr2->add_dispatcher_head(&srv_dispatcher);
1461 server_msgr2->start();
1462
1463 client_msgr->add_dispatcher_head(&cli_dispatcher);
1464 client_msgr->start();
1465
1466 int i = 1000;
1467 uint64_t last = 0;
1468 bool equal = false;
1469 uint64_t equal_count = 0;
1470 while (i--) {
1471 ConnectionRef conn1 = client_msgr->get_connection(server_msgr->get_myinst());
1472 ConnectionRef conn2 = client_msgr->get_connection(server_msgr2->get_myinst());
1473 MPing *m = new MPing();
1474 ASSERT_EQ(conn1->send_message(m), 0);
1475 m = new MPing();
1476 ASSERT_EQ(conn2->send_message(m), 0);
1477 CHECK_AND_WAIT_TRUE(srv_dispatcher.count > last + 1);
1478 if (srv_dispatcher.count == last) {
1479 lderr(g_ceph_context) << __func__ << " last is " << last << dendl;
1480 equal = true;
1481 equal_count++;
1482 } else {
1483 equal = false;
1484 equal_count = 0;
1485 }
1486 last = srv_dispatcher.count;
1487 if (equal_count)
1488 usleep(1000*500);
1489 ASSERT_FALSE(equal && equal_count > 3);
1490 }
1491 server_msgr->shutdown();
1492 client_msgr->shutdown();
1493 server_msgr2->shutdown();
1494 server_msgr->wait();
1495 client_msgr->wait();
1496 server_msgr2->wait();
1497 delete server_msgr2;
1498 }
1499
1500 INSTANTIATE_TEST_CASE_P(
1501 Messenger,
1502 MessengerTest,
1503 ::testing::Values(
1504 "async+posix",
1505 "simple"
1506 )
1507 );
1508
1509 #else
1510
1511 // Google Test may not support value-parameterized tests with some
1512 // compilers. If we use conditional compilation to compile out all
1513 // code referring to the gtest_main library, MSVC linker will not link
1514 // that library at all and consequently complain about missing entry
1515 // point defined in that library (fatal error LNK1561: entry point
1516 // must be defined). This dummy test keeps gtest_main linked in.
1517 TEST(DummyTest, ValueParameterizedTestsAreNotSupportedOnThisPlatform) {}
1518
1519 #endif
1520
1521
1522 int main(int argc, char **argv) {
1523 vector<const char*> args;
1524 argv_to_vec(argc, (const char **)argv, args);
1525 env_to_vec(args);
1526
1527 auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0);
1528 g_ceph_context->_conf->set_val("auth_cluster_required", "none");
1529 g_ceph_context->_conf->set_val("auth_service_required", "none");
1530 g_ceph_context->_conf->set_val("auth_client_required", "none");
1531 g_ceph_context->_conf->set_val("enable_experimental_unrecoverable_data_corrupting_features", "ms-type-async");
1532 g_ceph_context->_conf->set_val("ms_die_on_bad_msg", "true");
1533 g_ceph_context->_conf->set_val("ms_die_on_old_message", "true");
1534 g_ceph_context->_conf->set_val("ms_max_backoff", "1");
1535 common_init_finish(g_ceph_context);
1536
1537 ::testing::InitGoogleTest(&argc, argv);
1538 return RUN_ALL_TESTS();
1539 }
1540
1541 /*
1542 * Local Variables:
1543 * compile-command: "cd ../.. ; make -j4 ceph_test_msgr && valgrind --tool=memcheck ./ceph_test_msgr"
1544 * End:
1545 */