]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com> | |
7 | * | |
8 | * Author: Haomai Wang <haomaiwang@gmail.com> | |
9 | * | |
10 | * This is free software; you can redistribute it and/or | |
11 | * modify it under the terms of the GNU Lesser General Public | |
12 | * License version 2.1, as published by the Free Software | |
13 | * Foundation. See file COPYING. | |
14 | * | |
15 | */ | |
16 | ||
17 | #include <atomic> | |
18 | #include <iostream> | |
19 | #include <unistd.h> | |
20 | #include <stdlib.h> | |
21 | #include <time.h> | |
22 | #include "common/Mutex.h" | |
23 | #include "common/Cond.h" | |
24 | #include "common/ceph_argparse.h" | |
25 | #include "global/global_init.h" | |
26 | #include "msg/Dispatcher.h" | |
27 | #include "msg/msg_types.h" | |
28 | #include "msg/Message.h" | |
29 | #include "msg/Messenger.h" | |
30 | #include "msg/Connection.h" | |
31 | #include "messages/MPing.h" | |
32 | #include "messages/MCommand.h" | |
33 | ||
34 | #include <boost/random/mersenne_twister.hpp> | |
35 | #include <boost/random/uniform_int.hpp> | |
36 | #include <boost/random/binomial_distribution.hpp> | |
37 | #include <gtest/gtest.h> | |
38 | ||
39 | typedef boost::mt11213b gen_type; | |
40 | ||
41 | #include "common/dout.h" | |
42 | #include "include/assert.h" | |
43 | ||
44 | #define dout_subsys ceph_subsys_ms | |
45 | #undef dout_prefix | |
46 | #define dout_prefix *_dout << " ceph_test_msgr " | |
47 | ||
48 | ||
49 | #if GTEST_HAS_PARAM_TEST | |
50 | ||
// Polls `expr` about once per millisecond and stops as soon as it is true,
// giving up after 999 attempts (roughly one second of usleep).  The macro
// does not report whether the condition was ever met; callers re-check it
// with an ASSERT_* right afterwards.
//
// Expands to a single statement WITHOUT a trailing semicolon, so the
// caller's own `;` terminates it and the macro is safe in unbraced if/else.
// The retry counter has a macro-specific name so that an `expr` mentioning
// a caller variable called `n` is not silently shadowed.
#define CHECK_AND_WAIT_TRUE(expr) do {  \
  int check_wait_tries_ = 1000;         \
  while (--check_wait_tries_) {         \
    if (expr)                           \
      break;                            \
    usleep(1000);                       \
  }                                     \
} while(0)
59 | ||
60 | class MessengerTest : public ::testing::TestWithParam<const char*> { | |
61 | public: | |
62 | Messenger *server_msgr; | |
63 | Messenger *client_msgr; | |
64 | ||
65 | MessengerTest(): server_msgr(NULL), client_msgr(NULL) {} | |
66 | void SetUp() override { | |
67 | lderr(g_ceph_context) << __func__ << " start set up " << GetParam() << dendl; | |
68 | server_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0); | |
69 | client_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid(), 0); | |
70 | server_msgr->set_default_policy(Messenger::Policy::stateless_server(0)); | |
71 | client_msgr->set_default_policy(Messenger::Policy::lossy_client(0)); | |
72 | } | |
73 | void TearDown() override { | |
74 | ASSERT_EQ(server_msgr->get_dispatch_queue_len(), 0); | |
75 | ASSERT_EQ(client_msgr->get_dispatch_queue_len(), 0); | |
76 | delete server_msgr; | |
77 | delete client_msgr; | |
78 | } | |
79 | ||
80 | }; | |
81 | ||
82 | ||
83 | class FakeDispatcher : public Dispatcher { | |
84 | public: | |
85 | struct Session : public RefCountedObject { | |
86 | atomic<uint64_t> count; | |
87 | ConnectionRef con; | |
88 | ||
89 | explicit Session(ConnectionRef c): RefCountedObject(g_ceph_context), count(0), con(c) { | |
90 | } | |
91 | uint64_t get_count() { return count; } | |
92 | }; | |
93 | ||
94 | Mutex lock; | |
95 | Cond cond; | |
96 | bool is_server; | |
97 | bool got_new; | |
98 | bool got_remote_reset; | |
99 | bool got_connect; | |
100 | bool loopback; | |
101 | ||
102 | explicit FakeDispatcher(bool s): Dispatcher(g_ceph_context), lock("FakeDispatcher::lock"), | |
103 | is_server(s), got_new(false), got_remote_reset(false), | |
104 | got_connect(false), loopback(false) {} | |
105 | bool ms_can_fast_dispatch_any() const override { return true; } | |
106 | bool ms_can_fast_dispatch(const Message *m) const override { | |
107 | switch (m->get_type()) { | |
108 | case CEPH_MSG_PING: | |
109 | return true; | |
110 | default: | |
111 | return false; | |
112 | } | |
113 | } | |
114 | ||
115 | void ms_handle_fast_connect(Connection *con) override { | |
116 | lock.Lock(); | |
117 | lderr(g_ceph_context) << __func__ << " " << con << dendl; | |
118 | Session *s = static_cast<Session*>(con->get_priv()); | |
119 | if (!s) { | |
120 | s = new Session(con); | |
121 | con->set_priv(s->get()); | |
122 | lderr(g_ceph_context) << __func__ << " con: " << con << " count: " << s->count << dendl; | |
123 | } | |
124 | s->put(); | |
125 | got_connect = true; | |
126 | cond.Signal(); | |
127 | lock.Unlock(); | |
128 | } | |
129 | void ms_handle_fast_accept(Connection *con) override { | |
130 | Session *s = static_cast<Session*>(con->get_priv()); | |
131 | if (!s) { | |
132 | s = new Session(con); | |
133 | con->set_priv(s->get()); | |
134 | } | |
135 | s->put(); | |
136 | } | |
137 | bool ms_dispatch(Message *m) override { | |
138 | Session *s = static_cast<Session*>(m->get_connection()->get_priv()); | |
139 | if (!s) { | |
140 | s = new Session(m->get_connection()); | |
141 | m->get_connection()->set_priv(s->get()); | |
142 | } | |
143 | s->put(); | |
144 | s->count++; | |
145 | lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl; | |
146 | if (is_server) { | |
147 | reply_message(m); | |
148 | } | |
149 | Mutex::Locker l(lock); | |
150 | got_new = true; | |
151 | cond.Signal(); | |
152 | m->put(); | |
153 | return true; | |
154 | } | |
155 | bool ms_handle_reset(Connection *con) override { | |
156 | Mutex::Locker l(lock); | |
157 | lderr(g_ceph_context) << __func__ << " " << con << dendl; | |
158 | Session *s = static_cast<Session*>(con->get_priv()); | |
159 | if (s) { | |
160 | s->con.reset(NULL); // break con <-> session ref cycle | |
161 | con->set_priv(NULL); // break ref <-> session cycle, if any | |
162 | s->put(); | |
163 | } | |
164 | return true; | |
165 | } | |
166 | void ms_handle_remote_reset(Connection *con) override { | |
167 | Mutex::Locker l(lock); | |
168 | lderr(g_ceph_context) << __func__ << " " << con << dendl; | |
169 | Session *s = static_cast<Session*>(con->get_priv()); | |
170 | if (s) { | |
171 | s->con.reset(NULL); // break con <-> session ref cycle | |
172 | con->set_priv(NULL); // break ref <-> session cycle, if any | |
173 | s->put(); | |
174 | } | |
175 | got_remote_reset = true; | |
176 | cond.Signal(); | |
177 | } | |
178 | bool ms_handle_refused(Connection *con) override { | |
179 | return false; | |
180 | } | |
181 | void ms_fast_dispatch(Message *m) override { | |
182 | Session *s = static_cast<Session*>(m->get_connection()->get_priv()); | |
183 | if (!s) { | |
184 | s = new Session(m->get_connection()); | |
185 | m->get_connection()->set_priv(s->get()); | |
186 | } | |
187 | s->put(); | |
188 | s->count++; | |
189 | lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl; | |
190 | if (is_server) { | |
191 | if (loopback) | |
192 | assert(m->get_source().is_osd()); | |
193 | else | |
194 | reply_message(m); | |
195 | } else if (loopback) { | |
196 | assert(m->get_source().is_client()); | |
197 | } | |
198 | m->put(); | |
199 | Mutex::Locker l(lock); | |
200 | got_new = true; | |
201 | cond.Signal(); | |
202 | } | |
203 | ||
204 | bool ms_verify_authorizer(Connection *con, int peer_type, int protocol, | |
205 | bufferlist& authorizer, bufferlist& authorizer_reply, | |
206 | bool& isvalid, CryptoKey& session_key) override { | |
207 | isvalid = true; | |
208 | return true; | |
209 | } | |
210 | ||
211 | void reply_message(Message *m) { | |
212 | MPing *rm = new MPing(); | |
213 | m->get_connection()->send_message(rm); | |
214 | } | |
215 | }; | |
216 | ||
217 | typedef FakeDispatcher::Session Session; | |
218 | ||
219 | TEST_P(MessengerTest, SimpleTest) { | |
220 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
221 | entity_addr_t bind_addr; | |
222 | bind_addr.parse("127.0.0.1"); | |
223 | server_msgr->bind(bind_addr); | |
224 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
225 | server_msgr->start(); | |
226 | ||
227 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
228 | client_msgr->start(); | |
229 | ||
230 | // 1. simple round trip | |
231 | MPing *m = new MPing(); | |
232 | ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst()); | |
233 | { | |
234 | ASSERT_EQ(conn->send_message(m), 0); | |
235 | Mutex::Locker l(cli_dispatcher.lock); | |
236 | while (!cli_dispatcher.got_new) | |
237 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
238 | cli_dispatcher.got_new = false; | |
239 | } | |
240 | ASSERT_TRUE(conn->is_connected()); | |
241 | ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1); | |
242 | ASSERT_TRUE(conn->peer_is_osd()); | |
243 | ||
244 | // 2. test rebind port | |
245 | set<int> avoid_ports; | |
246 | for (int i = 0; i < 10 ; i++) | |
247 | avoid_ports.insert(server_msgr->get_myaddr().get_port() + i); | |
248 | server_msgr->rebind(avoid_ports); | |
249 | ASSERT_TRUE(avoid_ports.count(server_msgr->get_myaddr().get_port()) == 0); | |
250 | ||
251 | conn = client_msgr->get_connection(server_msgr->get_myinst()); | |
252 | { | |
253 | m = new MPing(); | |
254 | ASSERT_EQ(conn->send_message(m), 0); | |
255 | Mutex::Locker l(cli_dispatcher.lock); | |
256 | while (!cli_dispatcher.got_new) | |
257 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
258 | cli_dispatcher.got_new = false; | |
259 | } | |
260 | ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1); | |
261 | ||
262 | // 3. test markdown connection | |
263 | conn->mark_down(); | |
264 | ASSERT_FALSE(conn->is_connected()); | |
265 | ||
266 | // 4. test failed connection | |
267 | server_msgr->shutdown(); | |
268 | server_msgr->wait(); | |
269 | ||
270 | m = new MPing(); | |
271 | conn->send_message(m); | |
272 | CHECK_AND_WAIT_TRUE(!conn->is_connected()); | |
273 | ASSERT_FALSE(conn->is_connected()); | |
274 | ||
275 | // 5. loopback connection | |
276 | srv_dispatcher.loopback = true; | |
277 | conn = client_msgr->get_loopback_connection(); | |
278 | { | |
279 | m = new MPing(); | |
280 | ASSERT_EQ(conn->send_message(m), 0); | |
281 | Mutex::Locker l(cli_dispatcher.lock); | |
282 | while (!cli_dispatcher.got_new) | |
283 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
284 | cli_dispatcher.got_new = false; | |
285 | } | |
286 | srv_dispatcher.loopback = false; | |
287 | ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1); | |
288 | client_msgr->shutdown(); | |
289 | client_msgr->wait(); | |
290 | server_msgr->shutdown(); | |
291 | server_msgr->wait(); | |
292 | } | |
293 | ||
294 | TEST_P(MessengerTest, NameAddrTest) { | |
295 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
296 | entity_addr_t bind_addr; | |
297 | bind_addr.parse("127.0.0.1"); | |
298 | server_msgr->bind(bind_addr); | |
299 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
300 | server_msgr->start(); | |
301 | ||
302 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
303 | client_msgr->start(); | |
304 | ||
305 | MPing *m = new MPing(); | |
306 | ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst()); | |
307 | { | |
308 | ASSERT_EQ(conn->send_message(m), 0); | |
309 | Mutex::Locker l(cli_dispatcher.lock); | |
310 | while (!cli_dispatcher.got_new) | |
311 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
312 | cli_dispatcher.got_new = false; | |
313 | } | |
314 | ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1); | |
315 | ASSERT_TRUE(conn->get_peer_addr() == server_msgr->get_myaddr()); | |
316 | ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst()); | |
317 | // Make should server_conn is the one we already accepted from client, | |
318 | // so it means client_msgr has the same addr when server connection has | |
319 | ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1); | |
320 | server_msgr->shutdown(); | |
321 | client_msgr->shutdown(); | |
322 | server_msgr->wait(); | |
323 | client_msgr->wait(); | |
324 | } | |
325 | ||
326 | TEST_P(MessengerTest, FeatureTest) { | |
327 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
328 | entity_addr_t bind_addr; | |
329 | bind_addr.parse("127.0.0.1"); | |
330 | uint64_t all_feature_supported, feature_required, feature_supported = 0; | |
331 | for (int i = 0; i < 10; i++) | |
332 | feature_supported |= 1ULL << i; | |
333 | feature_required = feature_supported | 1ULL << 13; | |
334 | all_feature_supported = feature_required | 1ULL << 14; | |
335 | ||
336 | Messenger::Policy p = server_msgr->get_policy(entity_name_t::TYPE_CLIENT); | |
337 | p.features_required = feature_required; | |
338 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); | |
339 | server_msgr->bind(bind_addr); | |
340 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
341 | server_msgr->start(); | |
342 | ||
343 | // 1. Suppose if only support less than required | |
344 | p = client_msgr->get_policy(entity_name_t::TYPE_OSD); | |
345 | p.features_supported = feature_supported; | |
346 | client_msgr->set_policy(entity_name_t::TYPE_OSD, p); | |
347 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
348 | client_msgr->start(); | |
349 | ||
350 | MPing *m = new MPing(); | |
351 | ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst()); | |
352 | conn->send_message(m); | |
353 | CHECK_AND_WAIT_TRUE(!conn->is_connected()); | |
354 | // should failed build a connection | |
355 | ASSERT_FALSE(conn->is_connected()); | |
356 | ||
357 | client_msgr->shutdown(); | |
358 | client_msgr->wait(); | |
359 | ||
360 | // 2. supported met required | |
361 | p = client_msgr->get_policy(entity_name_t::TYPE_OSD); | |
362 | p.features_supported = all_feature_supported; | |
363 | client_msgr->set_policy(entity_name_t::TYPE_OSD, p); | |
364 | client_msgr->start(); | |
365 | ||
366 | conn = client_msgr->get_connection(server_msgr->get_myinst()); | |
367 | { | |
368 | m = new MPing(); | |
369 | ASSERT_EQ(conn->send_message(m), 0); | |
370 | Mutex::Locker l(cli_dispatcher.lock); | |
371 | while (!cli_dispatcher.got_new) | |
372 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
373 | cli_dispatcher.got_new = false; | |
374 | } | |
375 | ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1); | |
376 | ||
377 | server_msgr->shutdown(); | |
378 | client_msgr->shutdown(); | |
379 | server_msgr->wait(); | |
380 | client_msgr->wait(); | |
381 | } | |
382 | ||
383 | TEST_P(MessengerTest, TimeoutTest) { | |
384 | g_ceph_context->_conf->set_val("ms_tcp_read_timeout", "1"); | |
385 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
386 | entity_addr_t bind_addr; | |
387 | bind_addr.parse("127.0.0.1"); | |
388 | server_msgr->bind(bind_addr); | |
389 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
390 | server_msgr->start(); | |
391 | ||
392 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
393 | client_msgr->start(); | |
394 | ||
395 | // 1. build the connection | |
396 | MPing *m = new MPing(); | |
397 | ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst()); | |
398 | { | |
399 | ASSERT_EQ(conn->send_message(m), 0); | |
400 | Mutex::Locker l(cli_dispatcher.lock); | |
401 | while (!cli_dispatcher.got_new) | |
402 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
403 | cli_dispatcher.got_new = false; | |
404 | } | |
405 | ASSERT_TRUE(conn->is_connected()); | |
406 | ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1); | |
407 | ASSERT_TRUE(conn->peer_is_osd()); | |
408 | ||
409 | // 2. wait for idle | |
410 | usleep(2500*1000); | |
411 | ASSERT_FALSE(conn->is_connected()); | |
412 | ||
413 | server_msgr->shutdown(); | |
414 | server_msgr->wait(); | |
415 | ||
416 | client_msgr->shutdown(); | |
417 | client_msgr->wait(); | |
418 | g_ceph_context->_conf->set_val("ms_tcp_read_timeout", "900"); | |
419 | } | |
420 | ||
421 | TEST_P(MessengerTest, StatefulTest) { | |
422 | Message *m; | |
423 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
424 | entity_addr_t bind_addr; | |
425 | bind_addr.parse("127.0.0.1"); | |
426 | Messenger::Policy p = Messenger::Policy::stateful_server(0); | |
427 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); | |
428 | p = Messenger::Policy::lossless_client(0); | |
429 | client_msgr->set_policy(entity_name_t::TYPE_OSD, p); | |
430 | ||
431 | server_msgr->bind(bind_addr); | |
432 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
433 | server_msgr->start(); | |
434 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
435 | client_msgr->start(); | |
436 | ||
437 | // 1. test for server standby | |
438 | ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst()); | |
439 | { | |
440 | m = new MPing(); | |
441 | ASSERT_EQ(conn->send_message(m), 0); | |
442 | Mutex::Locker l(cli_dispatcher.lock); | |
443 | while (!cli_dispatcher.got_new) | |
444 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
445 | cli_dispatcher.got_new = false; | |
446 | } | |
447 | ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1); | |
448 | conn->mark_down(); | |
449 | ASSERT_FALSE(conn->is_connected()); | |
450 | ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst()); | |
451 | // don't lose state | |
452 | ASSERT_TRUE(static_cast<Session*>(server_conn->get_priv())->get_count() == 1); | |
453 | ||
454 | srv_dispatcher.got_new = false; | |
455 | conn = client_msgr->get_connection(server_msgr->get_myinst()); | |
456 | { | |
457 | m = new MPing(); | |
458 | ASSERT_EQ(conn->send_message(m), 0); | |
459 | Mutex::Locker l(cli_dispatcher.lock); | |
460 | while (!cli_dispatcher.got_new) | |
461 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
462 | cli_dispatcher.got_new = false; | |
463 | } | |
464 | ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1); | |
465 | server_conn = server_msgr->get_connection(client_msgr->get_myinst()); | |
466 | { | |
467 | Mutex::Locker l(srv_dispatcher.lock); | |
468 | while (!srv_dispatcher.got_remote_reset) | |
469 | srv_dispatcher.cond.Wait(srv_dispatcher.lock); | |
470 | } | |
471 | ||
472 | // 2. test for client reconnect | |
473 | ASSERT_FALSE(cli_dispatcher.got_remote_reset); | |
474 | cli_dispatcher.got_connect = false; | |
475 | cli_dispatcher.got_new = false; | |
476 | cli_dispatcher.got_remote_reset = false; | |
477 | server_conn->mark_down(); | |
478 | ASSERT_FALSE(server_conn->is_connected()); | |
479 | // ensure client detect server socket closed | |
480 | { | |
481 | Mutex::Locker l(cli_dispatcher.lock); | |
482 | while (!cli_dispatcher.got_remote_reset) | |
483 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
484 | cli_dispatcher.got_remote_reset = false; | |
485 | } | |
486 | { | |
487 | Mutex::Locker l(cli_dispatcher.lock); | |
488 | while (!cli_dispatcher.got_connect) | |
489 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
490 | cli_dispatcher.got_connect = false; | |
491 | } | |
492 | CHECK_AND_WAIT_TRUE(conn->is_connected()); | |
493 | ASSERT_TRUE(conn->is_connected()); | |
494 | ||
495 | { | |
496 | m = new MPing(); | |
497 | ASSERT_EQ(conn->send_message(m), 0); | |
498 | ASSERT_TRUE(conn->is_connected()); | |
499 | Mutex::Locker l(cli_dispatcher.lock); | |
500 | while (!cli_dispatcher.got_new) | |
501 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
502 | cli_dispatcher.got_new = false; | |
503 | } | |
504 | // resetcheck happen | |
505 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv())->get_count()); | |
506 | server_conn = server_msgr->get_connection(client_msgr->get_myinst()); | |
507 | ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv())->get_count()); | |
508 | cli_dispatcher.got_remote_reset = false; | |
509 | ||
510 | server_msgr->shutdown(); | |
511 | client_msgr->shutdown(); | |
512 | server_msgr->wait(); | |
513 | client_msgr->wait(); | |
514 | } | |
515 | ||
516 | TEST_P(MessengerTest, StatelessTest) { | |
517 | Message *m; | |
518 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
519 | entity_addr_t bind_addr; | |
520 | bind_addr.parse("127.0.0.1"); | |
521 | Messenger::Policy p = Messenger::Policy::stateless_server(0); | |
522 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); | |
523 | p = Messenger::Policy::lossy_client(0); | |
524 | client_msgr->set_policy(entity_name_t::TYPE_OSD, p); | |
525 | ||
526 | server_msgr->bind(bind_addr); | |
527 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
528 | server_msgr->start(); | |
529 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
530 | client_msgr->start(); | |
531 | ||
532 | // 1. test for server lose state | |
533 | ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst()); | |
534 | { | |
535 | m = new MPing(); | |
536 | ASSERT_EQ(conn->send_message(m), 0); | |
537 | Mutex::Locker l(cli_dispatcher.lock); | |
538 | while (!cli_dispatcher.got_new) | |
539 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
540 | cli_dispatcher.got_new = false; | |
541 | } | |
542 | ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1); | |
543 | conn->mark_down(); | |
544 | ASSERT_FALSE(conn->is_connected()); | |
545 | ||
546 | srv_dispatcher.got_new = false; | |
547 | conn = client_msgr->get_connection(server_msgr->get_myinst()); | |
548 | { | |
549 | m = new MPing(); | |
550 | ASSERT_EQ(conn->send_message(m), 0); | |
551 | Mutex::Locker l(cli_dispatcher.lock); | |
552 | while (!cli_dispatcher.got_new) | |
553 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
554 | cli_dispatcher.got_new = false; | |
555 | } | |
556 | ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1); | |
557 | ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst()); | |
558 | // server lose state | |
559 | { | |
560 | Mutex::Locker l(srv_dispatcher.lock); | |
561 | while (!srv_dispatcher.got_new) | |
562 | srv_dispatcher.cond.Wait(srv_dispatcher.lock); | |
563 | } | |
564 | ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv())->get_count()); | |
565 | ||
566 | // 2. test for client lossy | |
567 | server_conn->mark_down(); | |
568 | ASSERT_FALSE(server_conn->is_connected()); | |
569 | conn->send_keepalive(); | |
570 | CHECK_AND_WAIT_TRUE(!conn->is_connected()); | |
571 | ASSERT_FALSE(conn->is_connected()); | |
572 | conn = client_msgr->get_connection(server_msgr->get_myinst()); | |
573 | { | |
574 | m = new MPing(); | |
575 | ASSERT_EQ(conn->send_message(m), 0); | |
576 | Mutex::Locker l(cli_dispatcher.lock); | |
577 | while (!cli_dispatcher.got_new) | |
578 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
579 | cli_dispatcher.got_new = false; | |
580 | } | |
581 | ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1); | |
582 | ||
583 | server_msgr->shutdown(); | |
584 | client_msgr->shutdown(); | |
585 | server_msgr->wait(); | |
586 | client_msgr->wait(); | |
587 | } | |
588 | ||
589 | TEST_P(MessengerTest, ClientStandbyTest) { | |
590 | Message *m; | |
591 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
592 | entity_addr_t bind_addr; | |
593 | bind_addr.parse("127.0.0.1"); | |
594 | Messenger::Policy p = Messenger::Policy::stateful_server(0); | |
595 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); | |
596 | p = Messenger::Policy::lossless_peer(0); | |
597 | client_msgr->set_policy(entity_name_t::TYPE_OSD, p); | |
598 | ||
599 | server_msgr->bind(bind_addr); | |
600 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
601 | server_msgr->start(); | |
602 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
603 | client_msgr->start(); | |
604 | ||
605 | // 1. test for client standby, resetcheck | |
606 | ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst()); | |
607 | { | |
608 | m = new MPing(); | |
609 | ASSERT_EQ(conn->send_message(m), 0); | |
610 | Mutex::Locker l(cli_dispatcher.lock); | |
611 | while (!cli_dispatcher.got_new) | |
612 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
613 | cli_dispatcher.got_new = false; | |
614 | } | |
615 | ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1); | |
616 | ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst()); | |
617 | ASSERT_FALSE(cli_dispatcher.got_remote_reset); | |
618 | cli_dispatcher.got_connect = false; | |
619 | server_conn->mark_down(); | |
620 | ASSERT_FALSE(server_conn->is_connected()); | |
621 | // client should be standby | |
622 | usleep(300*1000); | |
623 | // client should be standby, so we use original connection | |
624 | { | |
625 | // Try send message to verify got remote reset callback | |
626 | m = new MPing(); | |
627 | ASSERT_EQ(conn->send_message(m), 0); | |
628 | { | |
629 | Mutex::Locker l(cli_dispatcher.lock); | |
630 | while (!cli_dispatcher.got_remote_reset) | |
631 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
632 | cli_dispatcher.got_remote_reset = false; | |
633 | while (!cli_dispatcher.got_connect) | |
634 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
635 | cli_dispatcher.got_connect = false; | |
636 | } | |
637 | CHECK_AND_WAIT_TRUE(conn->is_connected()); | |
638 | ASSERT_TRUE(conn->is_connected()); | |
639 | m = new MPing(); | |
640 | ASSERT_EQ(conn->send_message(m), 0); | |
641 | Mutex::Locker l(cli_dispatcher.lock); | |
642 | while (!cli_dispatcher.got_new) | |
643 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
644 | cli_dispatcher.got_new = false; | |
645 | } | |
646 | ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1); | |
647 | server_conn = server_msgr->get_connection(client_msgr->get_myinst()); | |
648 | ASSERT_TRUE(static_cast<Session*>(server_conn->get_priv())->get_count() == 1); | |
649 | ||
650 | server_msgr->shutdown(); | |
651 | client_msgr->shutdown(); | |
652 | server_msgr->wait(); | |
653 | client_msgr->wait(); | |
654 | } | |
655 | ||
656 | TEST_P(MessengerTest, AuthTest) { | |
657 | g_ceph_context->_conf->set_val("auth_cluster_required", "cephx"); | |
658 | g_ceph_context->_conf->set_val("auth_service_required", "cephx"); | |
659 | g_ceph_context->_conf->set_val("auth_client_required", "cephx"); | |
660 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
661 | entity_addr_t bind_addr; | |
662 | bind_addr.parse("127.0.0.1"); | |
663 | server_msgr->bind(bind_addr); | |
664 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
665 | server_msgr->start(); | |
666 | ||
667 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
668 | client_msgr->start(); | |
669 | ||
670 | // 1. simple auth round trip | |
671 | MPing *m = new MPing(); | |
672 | ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst()); | |
673 | { | |
674 | ASSERT_EQ(conn->send_message(m), 0); | |
675 | Mutex::Locker l(cli_dispatcher.lock); | |
676 | while (!cli_dispatcher.got_new) | |
677 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
678 | cli_dispatcher.got_new = false; | |
679 | } | |
680 | ASSERT_TRUE(conn->is_connected()); | |
681 | ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1); | |
682 | ||
683 | // 2. mix auth | |
684 | g_ceph_context->_conf->set_val("auth_cluster_required", "none"); | |
685 | g_ceph_context->_conf->set_val("auth_service_required", "none"); | |
686 | g_ceph_context->_conf->set_val("auth_client_required", "none"); | |
687 | conn->mark_down(); | |
688 | ASSERT_FALSE(conn->is_connected()); | |
689 | conn = client_msgr->get_connection(server_msgr->get_myinst()); | |
690 | { | |
691 | MPing *m = new MPing(); | |
692 | ASSERT_EQ(conn->send_message(m), 0); | |
693 | Mutex::Locker l(cli_dispatcher.lock); | |
694 | while (!cli_dispatcher.got_new) | |
695 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
696 | cli_dispatcher.got_new = false; | |
697 | } | |
698 | ASSERT_TRUE(conn->is_connected()); | |
699 | ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1); | |
700 | ||
701 | server_msgr->shutdown(); | |
702 | client_msgr->shutdown(); | |
703 | server_msgr->wait(); | |
704 | client_msgr->wait(); | |
705 | } | |
706 | ||
707 | TEST_P(MessengerTest, MessageTest) { | |
708 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
709 | entity_addr_t bind_addr; | |
710 | bind_addr.parse("127.0.0.1"); | |
711 | Messenger::Policy p = Messenger::Policy::stateful_server(0); | |
712 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); | |
713 | p = Messenger::Policy::lossless_peer(0); | |
714 | client_msgr->set_policy(entity_name_t::TYPE_OSD, p); | |
715 | ||
716 | server_msgr->bind(bind_addr); | |
717 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
718 | server_msgr->start(); | |
719 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
720 | client_msgr->start(); | |
721 | ||
722 | ||
723 | // 1. A very large "front"(as well as "payload") | |
724 | // Because a external message need to invade Messenger::decode_message, | |
725 | // here we only use existing message class(MCommand) | |
726 | ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst()); | |
727 | { | |
728 | uuid_d uuid; | |
729 | uuid.generate_random(); | |
730 | vector<string> cmds; | |
731 | string s("abcdefghijklmnopqrstuvwxyz"); | |
732 | for (int i = 0; i < 1024*30; i++) | |
733 | cmds.push_back(s); | |
734 | MCommand *m = new MCommand(uuid); | |
735 | m->cmd = cmds; | |
736 | conn->send_message(m); | |
737 | utime_t t; | |
738 | t += 1000*1000*500; | |
739 | Mutex::Locker l(cli_dispatcher.lock); | |
740 | while (!cli_dispatcher.got_new) | |
741 | cli_dispatcher.cond.WaitInterval(cli_dispatcher.lock, t); | |
742 | ASSERT_TRUE(cli_dispatcher.got_new); | |
743 | cli_dispatcher.got_new = false; | |
744 | } | |
745 | ||
746 | // 2. A very large "data" | |
747 | { | |
748 | bufferlist bl; | |
749 | string s("abcdefghijklmnopqrstuvwxyz"); | |
750 | for (int i = 0; i < 1024*30; i++) | |
751 | bl.append(s); | |
752 | MPing *m = new MPing(); | |
753 | m->set_data(bl); | |
754 | conn->send_message(m); | |
755 | utime_t t; | |
756 | t += 1000*1000*500; | |
757 | Mutex::Locker l(cli_dispatcher.lock); | |
758 | while (!cli_dispatcher.got_new) | |
759 | cli_dispatcher.cond.WaitInterval(cli_dispatcher.lock, t); | |
760 | ASSERT_TRUE(cli_dispatcher.got_new); | |
761 | cli_dispatcher.got_new = false; | |
762 | } | |
763 | server_msgr->shutdown(); | |
764 | client_msgr->shutdown(); | |
765 | server_msgr->wait(); | |
766 | client_msgr->wait(); | |
767 | } | |
768 | ||
769 | ||
770 | class SyntheticWorkload; | |
771 | ||
772 | struct Payload { | |
773 | enum Who : uint8_t { | |
774 | PING = 0, | |
775 | PONG = 1, | |
776 | }; | |
777 | uint8_t who; | |
778 | uint64_t seq; | |
779 | bufferlist data; | |
780 | ||
781 | Payload(Who who, uint64_t seq, const bufferlist& data) | |
782 | : who(who), seq(seq), data(data) | |
783 | {} | |
784 | Payload() = default; | |
785 | DENC(Payload, v, p) { | |
786 | DENC_START(1, 1, p); | |
787 | denc(v.who, p); | |
788 | denc(v.seq, p); | |
789 | denc(v.data, p); | |
790 | DENC_FINISH(p); | |
791 | } | |
792 | }; | |
793 | WRITE_CLASS_DENC(Payload) | |
794 | ||
795 | ostream& operator<<(ostream& out, const Payload &pl) | |
796 | { | |
797 | return out << "reply=" << pl.who << " i = " << pl.seq; | |
798 | } | |
799 | ||
800 | class SyntheticDispatcher : public Dispatcher { | |
801 | public: | |
802 | Mutex lock; | |
803 | Cond cond; | |
804 | bool is_server; | |
805 | bool got_new; | |
806 | bool got_remote_reset; | |
807 | bool got_connect; | |
808 | map<ConnectionRef, list<uint64_t> > conn_sent; | |
809 | map<uint64_t, bufferlist> sent; | |
810 | atomic<uint64_t> index; | |
811 | SyntheticWorkload *workload; | |
812 | ||
813 | SyntheticDispatcher(bool s, SyntheticWorkload *wl): | |
814 | Dispatcher(g_ceph_context), lock("SyntheticDispatcher::lock"), is_server(s), got_new(false), | |
815 | got_remote_reset(false), got_connect(false), index(0), workload(wl) {} | |
816 | bool ms_can_fast_dispatch_any() const override { return true; } | |
817 | bool ms_can_fast_dispatch(const Message *m) const override { | |
818 | switch (m->get_type()) { | |
819 | case CEPH_MSG_PING: | |
820 | case MSG_COMMAND: | |
821 | return true; | |
822 | default: | |
823 | return false; | |
824 | } | |
825 | } | |
826 | ||
827 | void ms_handle_fast_connect(Connection *con) override { | |
828 | Mutex::Locker l(lock); | |
829 | list<uint64_t> c = conn_sent[con]; | |
830 | for (list<uint64_t>::iterator it = c.begin(); | |
831 | it != c.end(); ++it) | |
832 | sent.erase(*it); | |
833 | conn_sent.erase(con); | |
834 | got_connect = true; | |
835 | cond.Signal(); | |
836 | } | |
837 | void ms_handle_fast_accept(Connection *con) override { | |
838 | Mutex::Locker l(lock); | |
839 | list<uint64_t> c = conn_sent[con]; | |
840 | for (list<uint64_t>::iterator it = c.begin(); | |
841 | it != c.end(); ++it) | |
842 | sent.erase(*it); | |
843 | conn_sent.erase(con); | |
844 | cond.Signal(); | |
845 | } | |
846 | bool ms_dispatch(Message *m) override { | |
847 | ceph_abort(); | |
848 | } | |
849 | bool ms_handle_reset(Connection *con) override; | |
850 | void ms_handle_remote_reset(Connection *con) override { | |
851 | Mutex::Locker l(lock); | |
852 | list<uint64_t> c = conn_sent[con]; | |
853 | for (list<uint64_t>::iterator it = c.begin(); | |
854 | it != c.end(); ++it) | |
855 | sent.erase(*it); | |
856 | conn_sent.erase(con); | |
857 | got_remote_reset = true; | |
858 | } | |
859 | bool ms_handle_refused(Connection *con) override { | |
860 | return false; | |
861 | } | |
862 | void ms_fast_dispatch(Message *m) override { | |
863 | // MSG_COMMAND is used to disorganize regular message flow | |
864 | if (m->get_type() == MSG_COMMAND) { | |
865 | m->put(); | |
866 | return ; | |
867 | } | |
868 | ||
869 | Payload pl; | |
870 | auto p = m->get_data().begin(); | |
871 | ::decode(pl, p); | |
872 | if (pl.who == Payload::PING) { | |
873 | lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl; | |
874 | reply_message(m, pl); | |
875 | m->put(); | |
876 | Mutex::Locker l(lock); | |
877 | got_new = true; | |
878 | cond.Signal(); | |
879 | } else { | |
880 | Mutex::Locker l(lock); | |
881 | if (sent.count(pl.seq)) { | |
882 | lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl; | |
883 | ASSERT_EQ(conn_sent[m->get_connection()].front(), pl.seq); | |
884 | ASSERT_TRUE(pl.data.contents_equal(sent[pl.seq])); | |
885 | conn_sent[m->get_connection()].pop_front(); | |
886 | sent.erase(pl.seq); | |
887 | } | |
888 | m->put(); | |
889 | got_new = true; | |
890 | cond.Signal(); | |
891 | } | |
892 | } | |
893 | ||
894 | bool ms_verify_authorizer(Connection *con, int peer_type, int protocol, | |
895 | bufferlist& authorizer, bufferlist& authorizer_reply, | |
896 | bool& isvalid, CryptoKey& session_key) override { | |
897 | isvalid = true; | |
898 | return true; | |
899 | } | |
900 | ||
901 | void reply_message(const Message *m, Payload& pl) { | |
902 | pl.who = Payload::PONG; | |
903 | bufferlist bl; | |
904 | ::encode(pl, bl); | |
905 | MPing *rm = new MPing(); | |
906 | rm->set_data(bl); | |
907 | m->get_connection()->send_message(rm); | |
908 | lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply m=" << m << " i=" << pl.seq << dendl; | |
909 | } | |
910 | ||
911 | void send_message_wrap(ConnectionRef con, const bufferlist& data) { | |
912 | Message *m = new MPing(); | |
913 | Payload pl{Payload::PING, index++, data}; | |
914 | bufferlist bl; | |
915 | ::encode(pl, bl); | |
916 | m->set_data(bl); | |
917 | if (!con->get_messenger()->get_default_policy().lossy) { | |
918 | Mutex::Locker l(lock); | |
919 | sent[pl.seq] = pl.data; | |
920 | conn_sent[con].push_back(pl.seq); | |
921 | } | |
922 | lderr(g_ceph_context) << __func__ << " conn=" << con.get() << " send m=" << m << " i=" << pl.seq << dendl; | |
923 | ASSERT_EQ(0, con->send_message(m)); | |
924 | } | |
925 | ||
926 | uint64_t get_pending() { | |
927 | Mutex::Locker l(lock); | |
928 | return sent.size(); | |
929 | } | |
930 | ||
931 | void clear_pending(ConnectionRef con) { | |
932 | Mutex::Locker l(lock); | |
933 | ||
934 | for (list<uint64_t>::iterator it = conn_sent[con].begin(); | |
935 | it != conn_sent[con].end(); ++it) | |
936 | sent.erase(*it); | |
937 | conn_sent.erase(con); | |
938 | } | |
939 | ||
940 | void print() { | |
941 | for (auto && p : conn_sent) { | |
942 | if (!p.second.empty()) { | |
943 | lderr(g_ceph_context) << __func__ << " " << p.first << " wait " << p.second.size() << dendl; | |
944 | } | |
945 | } | |
946 | } | |
947 | }; | |
948 | ||
949 | ||
950 | class SyntheticWorkload { | |
951 | Mutex lock; | |
952 | Cond cond; | |
953 | set<Messenger*> available_servers; | |
954 | set<Messenger*> available_clients; | |
955 | map<ConnectionRef, pair<Messenger*, Messenger*> > available_connections; | |
956 | SyntheticDispatcher dispatcher; | |
957 | gen_type rng; | |
958 | vector<bufferlist> rand_data; | |
959 | ||
960 | public: | |
961 | static const unsigned max_in_flight = 64; | |
962 | static const unsigned max_connections = 128; | |
963 | static const unsigned max_message_len = 1024 * 1024 * 4; | |
964 | ||
965 | SyntheticWorkload(int servers, int clients, string type, int random_num, | |
966 | Messenger::Policy srv_policy, Messenger::Policy cli_policy): | |
967 | lock("SyntheticWorkload::lock"), dispatcher(false, this), rng(time(NULL)) { | |
968 | Messenger *msgr; | |
969 | int base_port = 16800; | |
970 | entity_addr_t bind_addr; | |
971 | char addr[64]; | |
972 | for (int i = 0; i < servers; ++i) { | |
973 | msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0), | |
974 | "server", getpid()+i, 0); | |
975 | snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i); | |
976 | bind_addr.parse(addr); | |
977 | msgr->bind(bind_addr); | |
978 | msgr->add_dispatcher_head(&dispatcher); | |
979 | ||
980 | assert(msgr); | |
981 | msgr->set_default_policy(srv_policy); | |
982 | available_servers.insert(msgr); | |
983 | msgr->start(); | |
984 | } | |
985 | ||
986 | for (int i = 0; i < clients; ++i) { | |
987 | msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(-1), | |
988 | "client", getpid()+i+servers, 0); | |
989 | if (cli_policy.standby) { | |
990 | snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i+servers); | |
991 | bind_addr.parse(addr); | |
992 | msgr->bind(bind_addr); | |
993 | } | |
994 | msgr->add_dispatcher_head(&dispatcher); | |
995 | ||
996 | assert(msgr); | |
997 | msgr->set_default_policy(cli_policy); | |
998 | available_clients.insert(msgr); | |
999 | msgr->start(); | |
1000 | } | |
1001 | ||
1002 | for (int i = 0; i < random_num; i++) { | |
1003 | bufferlist bl; | |
1004 | boost::uniform_int<> u(32, max_message_len); | |
1005 | uint64_t value_len = u(rng); | |
1006 | bufferptr bp(value_len); | |
1007 | bp.zero(); | |
1008 | for (uint64_t j = 0; j < value_len-sizeof(i); ) { | |
1009 | memcpy(bp.c_str()+j, &i, sizeof(i)); | |
1010 | j += 4096; | |
1011 | } | |
1012 | ||
1013 | bl.append(bp); | |
1014 | rand_data.push_back(bl); | |
1015 | } | |
1016 | } | |
1017 | ||
1018 | ConnectionRef _get_random_connection() { | |
1019 | while (dispatcher.get_pending() > max_in_flight) { | |
1020 | lock.Unlock(); | |
1021 | usleep(500); | |
1022 | lock.Lock(); | |
1023 | } | |
1024 | assert(lock.is_locked()); | |
1025 | boost::uniform_int<> choose(0, available_connections.size() - 1); | |
1026 | int index = choose(rng); | |
1027 | map<ConnectionRef, pair<Messenger*, Messenger*> >::iterator i = available_connections.begin(); | |
1028 | for (; index > 0; --index, ++i) ; | |
1029 | return i->first; | |
1030 | } | |
1031 | ||
1032 | bool can_create_connection() { | |
1033 | return available_connections.size() < max_connections; | |
1034 | } | |
1035 | ||
1036 | void generate_connection() { | |
1037 | Mutex::Locker l(lock); | |
1038 | if (!can_create_connection()) | |
1039 | return ; | |
1040 | ||
1041 | Messenger *server, *client; | |
1042 | { | |
1043 | boost::uniform_int<> choose(0, available_servers.size() - 1); | |
1044 | int index = choose(rng); | |
1045 | set<Messenger*>::iterator i = available_servers.begin(); | |
1046 | for (; index > 0; --index, ++i) ; | |
1047 | server = *i; | |
1048 | } | |
1049 | { | |
1050 | boost::uniform_int<> choose(0, available_clients.size() - 1); | |
1051 | int index = choose(rng); | |
1052 | set<Messenger*>::iterator i = available_clients.begin(); | |
1053 | for (; index > 0; --index, ++i) ; | |
1054 | client = *i; | |
1055 | } | |
1056 | ||
1057 | pair<Messenger*, Messenger*> p; | |
1058 | { | |
1059 | boost::uniform_int<> choose(0, available_servers.size() - 1); | |
1060 | if (server->get_default_policy().server) { | |
1061 | p = make_pair(client, server); | |
1062 | } else { | |
1063 | ConnectionRef conn = client->get_connection(server->get_myinst()); | |
1064 | if (available_connections.count(conn) || choose(rng) % 2) | |
1065 | p = make_pair(client, server); | |
1066 | else | |
1067 | p = make_pair(server, client); | |
1068 | } | |
1069 | } | |
1070 | ConnectionRef conn = p.first->get_connection(p.second->get_myinst()); | |
1071 | available_connections[conn] = p; | |
1072 | } | |
1073 | ||
1074 | void send_message() { | |
1075 | Mutex::Locker l(lock); | |
1076 | ConnectionRef conn = _get_random_connection(); | |
1077 | boost::uniform_int<> true_false(0, 99); | |
1078 | int val = true_false(rng); | |
1079 | if (val >= 95) { | |
1080 | uuid_d uuid; | |
1081 | uuid.generate_random(); | |
1082 | MCommand *m = new MCommand(uuid); | |
1083 | vector<string> cmds; | |
1084 | cmds.push_back("command"); | |
1085 | m->cmd = cmds; | |
1086 | m->set_priority(200); | |
1087 | conn->send_message(m); | |
1088 | } else { | |
1089 | boost::uniform_int<> u(0, rand_data.size()-1); | |
1090 | dispatcher.send_message_wrap(conn, rand_data[u(rng)]); | |
1091 | } | |
1092 | } | |
1093 | ||
1094 | void drop_connection() { | |
1095 | Mutex::Locker l(lock); | |
1096 | if (available_connections.size() < 10) | |
1097 | return; | |
1098 | ConnectionRef conn = _get_random_connection(); | |
1099 | dispatcher.clear_pending(conn); | |
1100 | conn->mark_down(); | |
1101 | pair<Messenger*, Messenger*> &p = available_connections[conn]; | |
1102 | // it's a lossless policy, so we need to mark down each side | |
1103 | if (!p.first->get_default_policy().server && !p.second->get_default_policy().server) { | |
1104 | ASSERT_EQ(conn->get_messenger(), p.first); | |
1105 | ConnectionRef peer = p.second->get_connection(p.first->get_myinst()); | |
1106 | peer->mark_down(); | |
1107 | dispatcher.clear_pending(peer); | |
1108 | available_connections.erase(peer); | |
1109 | } | |
1110 | ASSERT_EQ(available_connections.erase(conn), 1U); | |
1111 | } | |
1112 | ||
1113 | void print_internal_state(bool detail=false) { | |
1114 | Mutex::Locker l(lock); | |
1115 | lderr(g_ceph_context) << "available_connections: " << available_connections.size() | |
1116 | << " inflight messages: " << dispatcher.get_pending() << dendl; | |
1117 | if (detail && !available_connections.empty()) { | |
1118 | dispatcher.print(); | |
1119 | } | |
1120 | } | |
1121 | ||
1122 | void wait_for_done() { | |
1123 | int64_t tick_us = 1000 * 100; // 100ms | |
1124 | int64_t timeout_us = 5 * 60 * 1000 * 1000; // 5 mins | |
1125 | int i = 0; | |
1126 | while (dispatcher.get_pending()) { | |
1127 | usleep(tick_us); | |
1128 | timeout_us -= tick_us; | |
1129 | if (i++ % 50 == 0) | |
1130 | print_internal_state(true); | |
1131 | if (timeout_us < 0) | |
1132 | assert(0 == " loop time exceed 5 mins, it looks we stuck into some problems!"); | |
1133 | } | |
1134 | for (set<Messenger*>::iterator it = available_servers.begin(); | |
1135 | it != available_servers.end(); ++it) { | |
1136 | (*it)->shutdown(); | |
1137 | (*it)->wait(); | |
1138 | ASSERT_EQ((*it)->get_dispatch_queue_len(), 0); | |
1139 | delete (*it); | |
1140 | } | |
1141 | available_servers.clear(); | |
1142 | ||
1143 | for (set<Messenger*>::iterator it = available_clients.begin(); | |
1144 | it != available_clients.end(); ++it) { | |
1145 | (*it)->shutdown(); | |
1146 | (*it)->wait(); | |
1147 | ASSERT_EQ((*it)->get_dispatch_queue_len(), 0); | |
1148 | delete (*it); | |
1149 | } | |
1150 | available_clients.clear(); | |
1151 | } | |
1152 | ||
1153 | void handle_reset(Connection *con) { | |
1154 | Mutex::Locker l(lock); | |
1155 | available_connections.erase(con); | |
1156 | dispatcher.clear_pending(con); | |
1157 | } | |
1158 | }; | |
1159 | ||
1160 | bool SyntheticDispatcher::ms_handle_reset(Connection *con) { | |
1161 | workload->handle_reset(con); | |
1162 | return true; | |
1163 | } | |
1164 | ||
1165 | TEST_P(MessengerTest, SyntheticStressTest) { | |
1166 | SyntheticWorkload test_msg(8, 32, GetParam(), 100, | |
1167 | Messenger::Policy::stateful_server(0), | |
1168 | Messenger::Policy::lossless_client(0)); | |
1169 | for (int i = 0; i < 100; ++i) { | |
1170 | if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl; | |
1171 | test_msg.generate_connection(); | |
1172 | } | |
1173 | gen_type rng(time(NULL)); | |
1174 | for (int i = 0; i < 5000; ++i) { | |
1175 | if (!(i % 10)) { | |
1176 | lderr(g_ceph_context) << "Op " << i << ": " << dendl; | |
1177 | test_msg.print_internal_state(); | |
1178 | } | |
1179 | boost::uniform_int<> true_false(0, 99); | |
1180 | int val = true_false(rng); | |
1181 | if (val > 90) { | |
1182 | test_msg.generate_connection(); | |
1183 | } else if (val > 80) { | |
1184 | test_msg.drop_connection(); | |
1185 | } else if (val > 10) { | |
1186 | test_msg.send_message(); | |
1187 | } else { | |
1188 | usleep(rand() % 1000 + 500); | |
1189 | } | |
1190 | } | |
1191 | test_msg.wait_for_done(); | |
1192 | } | |
1193 | ||
1194 | TEST_P(MessengerTest, SyntheticStressTest1) { | |
1195 | SyntheticWorkload test_msg(16, 32, GetParam(), 100, | |
1196 | Messenger::Policy::lossless_peer_reuse(0), | |
1197 | Messenger::Policy::lossless_peer_reuse(0)); | |
1198 | for (int i = 0; i < 10; ++i) { | |
1199 | if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl; | |
1200 | test_msg.generate_connection(); | |
1201 | } | |
1202 | gen_type rng(time(NULL)); | |
1203 | for (int i = 0; i < 10000; ++i) { | |
1204 | if (!(i % 10)) { | |
1205 | lderr(g_ceph_context) << "Op " << i << ": " << dendl; | |
1206 | test_msg.print_internal_state(); | |
1207 | } | |
1208 | boost::uniform_int<> true_false(0, 99); | |
1209 | int val = true_false(rng); | |
1210 | if (val > 80) { | |
1211 | test_msg.generate_connection(); | |
1212 | } else if (val > 60) { | |
1213 | test_msg.drop_connection(); | |
1214 | } else if (val > 10) { | |
1215 | test_msg.send_message(); | |
1216 | } else { | |
1217 | usleep(rand() % 1000 + 500); | |
1218 | } | |
1219 | } | |
1220 | test_msg.wait_for_done(); | |
1221 | } | |
1222 | ||
1223 | ||
1224 | TEST_P(MessengerTest, SyntheticInjectTest) { | |
1225 | uint64_t dispatch_throttle_bytes = g_ceph_context->_conf->ms_dispatch_throttle_bytes; | |
1226 | g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30"); | |
1227 | g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1"); | |
1228 | g_ceph_context->_conf->set_val("ms_dispatch_throttle_bytes", "16777216"); | |
1229 | SyntheticWorkload test_msg(8, 32, GetParam(), 100, | |
1230 | Messenger::Policy::stateful_server(0), | |
1231 | Messenger::Policy::lossless_client(0)); | |
1232 | for (int i = 0; i < 100; ++i) { | |
1233 | if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl; | |
1234 | test_msg.generate_connection(); | |
1235 | } | |
1236 | gen_type rng(time(NULL)); | |
1237 | for (int i = 0; i < 1000; ++i) { | |
1238 | if (!(i % 10)) { | |
1239 | lderr(g_ceph_context) << "Op " << i << ": " << dendl; | |
1240 | test_msg.print_internal_state(); | |
1241 | } | |
1242 | boost::uniform_int<> true_false(0, 99); | |
1243 | int val = true_false(rng); | |
1244 | if (val > 90) { | |
1245 | test_msg.generate_connection(); | |
1246 | } else if (val > 80) { | |
1247 | test_msg.drop_connection(); | |
1248 | } else if (val > 10) { | |
1249 | test_msg.send_message(); | |
1250 | } else { | |
1251 | usleep(rand() % 500 + 100); | |
1252 | } | |
1253 | } | |
1254 | test_msg.wait_for_done(); | |
1255 | g_ceph_context->_conf->set_val("ms_inject_socket_failures", "0"); | |
1256 | g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0"); | |
1257 | g_ceph_context->_conf->set_val( | |
1258 | "ms_dispatch_throttle_bytes", std::to_string(dispatch_throttle_bytes)); | |
1259 | } | |
1260 | ||
1261 | TEST_P(MessengerTest, SyntheticInjectTest2) { | |
1262 | g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30"); | |
1263 | g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1"); | |
1264 | SyntheticWorkload test_msg(8, 16, GetParam(), 100, | |
1265 | Messenger::Policy::lossless_peer_reuse(0), | |
1266 | Messenger::Policy::lossless_peer_reuse(0)); | |
1267 | for (int i = 0; i < 100; ++i) { | |
1268 | if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl; | |
1269 | test_msg.generate_connection(); | |
1270 | } | |
1271 | gen_type rng(time(NULL)); | |
1272 | for (int i = 0; i < 1000; ++i) { | |
1273 | if (!(i % 10)) { | |
1274 | lderr(g_ceph_context) << "Op " << i << ": " << dendl; | |
1275 | test_msg.print_internal_state(); | |
1276 | } | |
1277 | boost::uniform_int<> true_false(0, 99); | |
1278 | int val = true_false(rng); | |
1279 | if (val > 90) { | |
1280 | test_msg.generate_connection(); | |
1281 | } else if (val > 80) { | |
1282 | test_msg.drop_connection(); | |
1283 | } else if (val > 10) { | |
1284 | test_msg.send_message(); | |
1285 | } else { | |
1286 | usleep(rand() % 500 + 100); | |
1287 | } | |
1288 | } | |
1289 | test_msg.wait_for_done(); | |
1290 | g_ceph_context->_conf->set_val("ms_inject_socket_failures", "0"); | |
1291 | g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0"); | |
1292 | } | |
1293 | ||
1294 | TEST_P(MessengerTest, SyntheticInjectTest3) { | |
1295 | g_ceph_context->_conf->set_val("ms_inject_socket_failures", "600"); | |
1296 | g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1"); | |
1297 | SyntheticWorkload test_msg(8, 16, GetParam(), 100, | |
1298 | Messenger::Policy::stateless_server(0), | |
1299 | Messenger::Policy::lossy_client(0)); | |
1300 | for (int i = 0; i < 100; ++i) { | |
1301 | if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl; | |
1302 | test_msg.generate_connection(); | |
1303 | } | |
1304 | gen_type rng(time(NULL)); | |
1305 | for (int i = 0; i < 1000; ++i) { | |
1306 | if (!(i % 10)) { | |
1307 | lderr(g_ceph_context) << "Op " << i << ": " << dendl; | |
1308 | test_msg.print_internal_state(); | |
1309 | } | |
1310 | boost::uniform_int<> true_false(0, 99); | |
1311 | int val = true_false(rng); | |
1312 | if (val > 90) { | |
1313 | test_msg.generate_connection(); | |
1314 | } else if (val > 80) { | |
1315 | test_msg.drop_connection(); | |
1316 | } else if (val > 10) { | |
1317 | test_msg.send_message(); | |
1318 | } else { | |
1319 | usleep(rand() % 500 + 100); | |
1320 | } | |
1321 | } | |
1322 | test_msg.wait_for_done(); | |
1323 | g_ceph_context->_conf->set_val("ms_inject_socket_failures", "0"); | |
1324 | g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0"); | |
1325 | } | |
1326 | ||
1327 | ||
1328 | TEST_P(MessengerTest, SyntheticInjectTest4) { | |
1329 | g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30"); | |
1330 | g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1"); | |
1331 | g_ceph_context->_conf->set_val("ms_inject_delay_probability", "1"); | |
1332 | g_ceph_context->_conf->set_val("ms_inject_delay_type", "client osd", false); | |
1333 | g_ceph_context->_conf->set_val("ms_inject_delay_max", "5"); | |
1334 | SyntheticWorkload test_msg(16, 32, GetParam(), 100, | |
1335 | Messenger::Policy::lossless_peer(0), | |
1336 | Messenger::Policy::lossless_peer(0)); | |
1337 | for (int i = 0; i < 100; ++i) { | |
1338 | if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl; | |
1339 | test_msg.generate_connection(); | |
1340 | } | |
1341 | gen_type rng(time(NULL)); | |
1342 | for (int i = 0; i < 1000; ++i) { | |
1343 | if (!(i % 10)) { | |
1344 | lderr(g_ceph_context) << "Op " << i << ": " << dendl; | |
1345 | test_msg.print_internal_state(); | |
1346 | } | |
1347 | boost::uniform_int<> true_false(0, 99); | |
1348 | int val = true_false(rng); | |
1349 | if (val > 95) { | |
1350 | test_msg.generate_connection(); | |
1351 | } else if (val > 80) { | |
1352 | // test_msg.drop_connection(); | |
1353 | } else if (val > 10) { | |
1354 | test_msg.send_message(); | |
1355 | } else { | |
1356 | usleep(rand() % 500 + 100); | |
1357 | } | |
1358 | } | |
1359 | test_msg.wait_for_done(); | |
1360 | g_ceph_context->_conf->set_val("ms_inject_socket_failures", "0"); | |
1361 | g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0"); | |
1362 | g_ceph_context->_conf->set_val("ms_inject_delay_probability", "0"); | |
1363 | g_ceph_context->_conf->set_val("ms_inject_delay_type", "", false); | |
1364 | g_ceph_context->_conf->set_val("ms_inject_delay_max", "0"); | |
1365 | } | |
1366 | ||
1367 | ||
1368 | class MarkdownDispatcher : public Dispatcher { | |
1369 | Mutex lock; | |
1370 | set<ConnectionRef> conns; | |
1371 | bool last_mark; | |
1372 | public: | |
31f18b77 | 1373 | std::atomic<uint64_t> count = { 0 }; |
7c673cae | 1374 | explicit MarkdownDispatcher(bool s): Dispatcher(g_ceph_context), lock("MarkdownDispatcher::lock"), |
31f18b77 | 1375 | last_mark(false) {} |
7c673cae FG |
1376 | bool ms_can_fast_dispatch_any() const override { return false; } |
1377 | bool ms_can_fast_dispatch(const Message *m) const override { | |
1378 | switch (m->get_type()) { | |
1379 | case CEPH_MSG_PING: | |
1380 | return true; | |
1381 | default: | |
1382 | return false; | |
1383 | } | |
1384 | } | |
1385 | ||
1386 | void ms_handle_fast_connect(Connection *con) override { | |
1387 | lderr(g_ceph_context) << __func__ << " " << con << dendl; | |
1388 | Mutex::Locker l(lock); | |
1389 | conns.insert(con); | |
1390 | } | |
1391 | void ms_handle_fast_accept(Connection *con) override { | |
1392 | Mutex::Locker l(lock); | |
1393 | conns.insert(con); | |
1394 | } | |
1395 | bool ms_dispatch(Message *m) override { | |
1396 | lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << dendl; | |
1397 | Mutex::Locker l(lock); | |
31f18b77 | 1398 | count++; |
7c673cae FG |
1399 | conns.insert(m->get_connection()); |
1400 | if (conns.size() < 2 && !last_mark) { | |
1401 | m->put(); | |
1402 | return true; | |
1403 | } | |
1404 | ||
1405 | last_mark = true; | |
1406 | usleep(rand() % 500); | |
1407 | for (set<ConnectionRef>::iterator it = conns.begin(); it != conns.end(); ++it) { | |
1408 | if ((*it) != m->get_connection().get()) { | |
1409 | (*it)->mark_down(); | |
1410 | conns.erase(it); | |
1411 | break; | |
1412 | } | |
1413 | } | |
1414 | if (conns.empty()) | |
1415 | last_mark = false; | |
1416 | m->put(); | |
1417 | return true; | |
1418 | } | |
1419 | bool ms_handle_reset(Connection *con) override { | |
1420 | lderr(g_ceph_context) << __func__ << " " << con << dendl; | |
1421 | Mutex::Locker l(lock); | |
1422 | conns.erase(con); | |
1423 | usleep(rand() % 500); | |
1424 | return true; | |
1425 | } | |
1426 | void ms_handle_remote_reset(Connection *con) override { | |
1427 | Mutex::Locker l(lock); | |
1428 | conns.erase(con); | |
1429 | lderr(g_ceph_context) << __func__ << " " << con << dendl; | |
1430 | } | |
1431 | bool ms_handle_refused(Connection *con) override { | |
1432 | return false; | |
1433 | } | |
1434 | void ms_fast_dispatch(Message *m) override { | |
1435 | ceph_abort(); | |
1436 | } | |
1437 | bool ms_verify_authorizer(Connection *con, int peer_type, int protocol, | |
1438 | bufferlist& authorizer, bufferlist& authorizer_reply, | |
1439 | bool& isvalid, CryptoKey& session_key) override { | |
1440 | isvalid = true; | |
1441 | return true; | |
1442 | } | |
1443 | }; | |
1444 | ||
1445 | ||
1446 | // Markdown with external lock | |
1447 | TEST_P(MessengerTest, MarkdownTest) { | |
1448 | Messenger *server_msgr2 = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0); | |
1449 | MarkdownDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
1450 | entity_addr_t bind_addr; | |
1451 | bind_addr.parse("127.0.0.1:16800"); | |
1452 | server_msgr->bind(bind_addr); | |
1453 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
1454 | server_msgr->start(); | |
1455 | bind_addr.parse("127.0.0.1:16801"); | |
1456 | server_msgr2->bind(bind_addr); | |
1457 | server_msgr2->add_dispatcher_head(&srv_dispatcher); | |
1458 | server_msgr2->start(); | |
1459 | ||
1460 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
1461 | client_msgr->start(); | |
1462 | ||
1463 | int i = 1000; | |
1464 | uint64_t last = 0; | |
1465 | bool equal = false; | |
1466 | uint64_t equal_count = 0; | |
1467 | while (i--) { | |
1468 | ConnectionRef conn1 = client_msgr->get_connection(server_msgr->get_myinst()); | |
1469 | ConnectionRef conn2 = client_msgr->get_connection(server_msgr2->get_myinst()); | |
1470 | MPing *m = new MPing(); | |
1471 | ASSERT_EQ(conn1->send_message(m), 0); | |
1472 | m = new MPing(); | |
1473 | ASSERT_EQ(conn2->send_message(m), 0); | |
31f18b77 FG |
1474 | CHECK_AND_WAIT_TRUE(srv_dispatcher.count > last + 1); |
1475 | if (srv_dispatcher.count == last) { | |
7c673cae FG |
1476 | lderr(g_ceph_context) << __func__ << " last is " << last << dendl; |
1477 | equal = true; | |
1478 | equal_count++; | |
1479 | } else { | |
1480 | equal = false; | |
1481 | equal_count = 0; | |
1482 | } | |
31f18b77 | 1483 | last = srv_dispatcher.count; |
7c673cae FG |
1484 | if (equal_count) |
1485 | usleep(1000*500); | |
1486 | ASSERT_FALSE(equal && equal_count > 3); | |
1487 | } | |
1488 | server_msgr->shutdown(); | |
1489 | client_msgr->shutdown(); | |
1490 | server_msgr2->shutdown(); | |
1491 | server_msgr->wait(); | |
1492 | client_msgr->wait(); | |
1493 | server_msgr2->wait(); | |
1494 | delete server_msgr2; | |
1495 | } | |
1496 | ||
1497 | INSTANTIATE_TEST_CASE_P( | |
1498 | Messenger, | |
1499 | MessengerTest, | |
1500 | ::testing::Values( | |
1501 | "async+posix", | |
1502 | "simple" | |
1503 | ) | |
1504 | ); | |
1505 | ||
1506 | #else | |
1507 | ||
1508 | // Google Test may not support value-parameterized tests with some | |
1509 | // compilers. If we use conditional compilation to compile out all | |
1510 | // code referring to the gtest_main library, MSVC linker will not link | |
1511 | // that library at all and consequently complain about missing entry | |
1512 | // point defined in that library (fatal error LNK1561: entry point | |
1513 | // must be defined). This dummy test keeps gtest_main linked in. | |
1514 | TEST(DummyTest, ValueParameterizedTestsAreNotSupportedOnThisPlatform) {} | |
1515 | ||
1516 | #endif | |
1517 | ||
1518 | ||
1519 | int main(int argc, char **argv) { | |
1520 | vector<const char*> args; | |
1521 | argv_to_vec(argc, (const char **)argv, args); | |
1522 | env_to_vec(args); | |
1523 | ||
1524 | auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0); | |
1525 | g_ceph_context->_conf->set_val("auth_cluster_required", "none"); | |
1526 | g_ceph_context->_conf->set_val("auth_service_required", "none"); | |
1527 | g_ceph_context->_conf->set_val("auth_client_required", "none"); | |
1528 | g_ceph_context->_conf->set_val("enable_experimental_unrecoverable_data_corrupting_features", "ms-type-async"); | |
1529 | g_ceph_context->_conf->set_val("ms_die_on_bad_msg", "true"); | |
1530 | g_ceph_context->_conf->set_val("ms_die_on_old_message", "true"); | |
1531 | g_ceph_context->_conf->set_val("ms_max_backoff", "1"); | |
1532 | common_init_finish(g_ceph_context); | |
1533 | ||
1534 | ::testing::InitGoogleTest(&argc, argv); | |
1535 | return RUN_ALL_TESTS(); | |
1536 | } | |
1537 | ||
1538 | /* | |
1539 | * Local Variables: | |
1540 | * compile-command: "cd ../.. ; make -j4 ceph_test_msgr && valgrind --tool=memcheck ./ceph_test_msgr" | |
1541 | * End: | |
1542 | */ |