]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com> | |
7 | * | |
8 | * Author: Haomai Wang <haomaiwang@gmail.com> | |
9 | * | |
10 | * This is free software; you can redistribute it and/or | |
11 | * modify it under the terms of the GNU Lesser General Public | |
12 | * License version 2.1, as published by the Free Software | |
13 | * Foundation. See file COPYING. | |
14 | * | |
15 | */ | |
16 | ||
17 | #include <atomic> | |
18 | #include <iostream> | |
19 | #include <unistd.h> | |
20 | #include <stdlib.h> | |
21 | #include <time.h> | |
22 | #include "common/Mutex.h" | |
23 | #include "common/Cond.h" | |
24 | #include "common/ceph_argparse.h" | |
25 | #include "global/global_init.h" | |
26 | #include "msg/Dispatcher.h" | |
27 | #include "msg/msg_types.h" | |
28 | #include "msg/Message.h" | |
29 | #include "msg/Messenger.h" | |
30 | #include "msg/Connection.h" | |
31 | #include "messages/MPing.h" | |
32 | #include "messages/MCommand.h" | |
33 | ||
34 | #include <boost/random/mersenne_twister.hpp> | |
35 | #include <boost/random/uniform_int.hpp> | |
36 | #include <boost/random/binomial_distribution.hpp> | |
37 | #include <gtest/gtest.h> | |
38 | ||
39 | typedef boost::mt11213b gen_type; | |
40 | ||
41 | #include "common/dout.h" | |
42 | #include "include/assert.h" | |
43 | ||
44 | #define dout_subsys ceph_subsys_ms | |
45 | #undef dout_prefix | |
46 | #define dout_prefix *_dout << " ceph_test_msgr " | |
47 | ||
48 | ||
49 | #if GTEST_HAS_PARAM_TEST | |
50 | ||
// Poll `expr` up to 1000 times at 1ms intervals (~1s worst case) and stop
// early once it becomes true.  Deliberately does not assert: callers follow
// up with an explicit ASSERT_* on the condition.
// NOTE: no trailing semicolon after while(0) — the caller supplies it, so the
// macro expands to exactly one statement and stays safe inside if/else.
#define CHECK_AND_WAIT_TRUE(expr) do { \
  int n = 1000;                        \
  while (--n) {                        \
    if (expr)                          \
      break;                           \
    usleep(1000);                      \
  }                                    \
} while(0)
59 | ||
60 | class MessengerTest : public ::testing::TestWithParam<const char*> { | |
61 | public: | |
62 | Messenger *server_msgr; | |
63 | Messenger *client_msgr; | |
64 | ||
65 | MessengerTest(): server_msgr(NULL), client_msgr(NULL) {} | |
66 | void SetUp() override { | |
67 | lderr(g_ceph_context) << __func__ << " start set up " << GetParam() << dendl; | |
68 | server_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0); | |
69 | client_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid(), 0); | |
70 | server_msgr->set_default_policy(Messenger::Policy::stateless_server(0)); | |
71 | client_msgr->set_default_policy(Messenger::Policy::lossy_client(0)); | |
72 | } | |
73 | void TearDown() override { | |
74 | ASSERT_EQ(server_msgr->get_dispatch_queue_len(), 0); | |
75 | ASSERT_EQ(client_msgr->get_dispatch_queue_len(), 0); | |
76 | delete server_msgr; | |
77 | delete client_msgr; | |
78 | } | |
79 | ||
80 | }; | |
81 | ||
82 | ||
83 | class FakeDispatcher : public Dispatcher { | |
84 | public: | |
85 | struct Session : public RefCountedObject { | |
86 | atomic<uint64_t> count; | |
87 | ConnectionRef con; | |
88 | ||
89 | explicit Session(ConnectionRef c): RefCountedObject(g_ceph_context), count(0), con(c) { | |
90 | } | |
91 | uint64_t get_count() { return count; } | |
92 | }; | |
93 | ||
94 | Mutex lock; | |
95 | Cond cond; | |
96 | bool is_server; | |
97 | bool got_new; | |
98 | bool got_remote_reset; | |
99 | bool got_connect; | |
100 | bool loopback; | |
101 | ||
102 | explicit FakeDispatcher(bool s): Dispatcher(g_ceph_context), lock("FakeDispatcher::lock"), | |
103 | is_server(s), got_new(false), got_remote_reset(false), | |
104 | got_connect(false), loopback(false) {} | |
105 | bool ms_can_fast_dispatch_any() const override { return true; } | |
106 | bool ms_can_fast_dispatch(const Message *m) const override { | |
107 | switch (m->get_type()) { | |
108 | case CEPH_MSG_PING: | |
109 | return true; | |
110 | default: | |
111 | return false; | |
112 | } | |
113 | } | |
114 | ||
115 | void ms_handle_fast_connect(Connection *con) override { | |
116 | lock.Lock(); | |
117 | lderr(g_ceph_context) << __func__ << " " << con << dendl; | |
118 | Session *s = static_cast<Session*>(con->get_priv()); | |
119 | if (!s) { | |
120 | s = new Session(con); | |
121 | con->set_priv(s->get()); | |
122 | lderr(g_ceph_context) << __func__ << " con: " << con << " count: " << s->count << dendl; | |
123 | } | |
124 | s->put(); | |
125 | got_connect = true; | |
126 | cond.Signal(); | |
127 | lock.Unlock(); | |
128 | } | |
129 | void ms_handle_fast_accept(Connection *con) override { | |
130 | Session *s = static_cast<Session*>(con->get_priv()); | |
131 | if (!s) { | |
132 | s = new Session(con); | |
133 | con->set_priv(s->get()); | |
134 | } | |
135 | s->put(); | |
136 | } | |
137 | bool ms_dispatch(Message *m) override { | |
138 | Session *s = static_cast<Session*>(m->get_connection()->get_priv()); | |
139 | if (!s) { | |
140 | s = new Session(m->get_connection()); | |
141 | m->get_connection()->set_priv(s->get()); | |
142 | } | |
143 | s->put(); | |
144 | s->count++; | |
145 | lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl; | |
146 | if (is_server) { | |
147 | reply_message(m); | |
148 | } | |
149 | Mutex::Locker l(lock); | |
150 | got_new = true; | |
151 | cond.Signal(); | |
152 | m->put(); | |
153 | return true; | |
154 | } | |
155 | bool ms_handle_reset(Connection *con) override { | |
156 | Mutex::Locker l(lock); | |
157 | lderr(g_ceph_context) << __func__ << " " << con << dendl; | |
158 | Session *s = static_cast<Session*>(con->get_priv()); | |
159 | if (s) { | |
160 | s->con.reset(NULL); // break con <-> session ref cycle | |
161 | con->set_priv(NULL); // break ref <-> session cycle, if any | |
162 | s->put(); | |
163 | } | |
164 | return true; | |
165 | } | |
166 | void ms_handle_remote_reset(Connection *con) override { | |
167 | Mutex::Locker l(lock); | |
168 | lderr(g_ceph_context) << __func__ << " " << con << dendl; | |
169 | Session *s = static_cast<Session*>(con->get_priv()); | |
170 | if (s) { | |
171 | s->con.reset(NULL); // break con <-> session ref cycle | |
172 | con->set_priv(NULL); // break ref <-> session cycle, if any | |
173 | s->put(); | |
174 | } | |
175 | got_remote_reset = true; | |
176 | cond.Signal(); | |
177 | } | |
178 | bool ms_handle_refused(Connection *con) override { | |
179 | return false; | |
180 | } | |
181 | void ms_fast_dispatch(Message *m) override { | |
182 | Session *s = static_cast<Session*>(m->get_connection()->get_priv()); | |
183 | if (!s) { | |
184 | s = new Session(m->get_connection()); | |
185 | m->get_connection()->set_priv(s->get()); | |
186 | } | |
187 | s->put(); | |
188 | s->count++; | |
189 | lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl; | |
190 | if (is_server) { | |
191 | if (loopback) | |
192 | assert(m->get_source().is_osd()); | |
193 | else | |
194 | reply_message(m); | |
195 | } else if (loopback) { | |
196 | assert(m->get_source().is_client()); | |
197 | } | |
198 | m->put(); | |
199 | Mutex::Locker l(lock); | |
200 | got_new = true; | |
201 | cond.Signal(); | |
202 | } | |
203 | ||
204 | bool ms_verify_authorizer(Connection *con, int peer_type, int protocol, | |
205 | bufferlist& authorizer, bufferlist& authorizer_reply, | |
28e407b8 AA |
206 | bool& isvalid, CryptoKey& session_key, |
207 | std::unique_ptr<AuthAuthorizerChallenge> *challenge) override { | |
7c673cae FG |
208 | isvalid = true; |
209 | return true; | |
210 | } | |
211 | ||
212 | void reply_message(Message *m) { | |
213 | MPing *rm = new MPing(); | |
214 | m->get_connection()->send_message(rm); | |
215 | } | |
216 | }; | |
217 | ||
218 | typedef FakeDispatcher::Session Session; | |
219 | ||
220 | TEST_P(MessengerTest, SimpleTest) { | |
221 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
222 | entity_addr_t bind_addr; | |
223 | bind_addr.parse("127.0.0.1"); | |
224 | server_msgr->bind(bind_addr); | |
225 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
226 | server_msgr->start(); | |
227 | ||
228 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
229 | client_msgr->start(); | |
230 | ||
231 | // 1. simple round trip | |
232 | MPing *m = new MPing(); | |
233 | ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst()); | |
234 | { | |
235 | ASSERT_EQ(conn->send_message(m), 0); | |
236 | Mutex::Locker l(cli_dispatcher.lock); | |
237 | while (!cli_dispatcher.got_new) | |
238 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
239 | cli_dispatcher.got_new = false; | |
240 | } | |
241 | ASSERT_TRUE(conn->is_connected()); | |
242 | ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1); | |
243 | ASSERT_TRUE(conn->peer_is_osd()); | |
244 | ||
245 | // 2. test rebind port | |
246 | set<int> avoid_ports; | |
247 | for (int i = 0; i < 10 ; i++) | |
248 | avoid_ports.insert(server_msgr->get_myaddr().get_port() + i); | |
249 | server_msgr->rebind(avoid_ports); | |
250 | ASSERT_TRUE(avoid_ports.count(server_msgr->get_myaddr().get_port()) == 0); | |
251 | ||
252 | conn = client_msgr->get_connection(server_msgr->get_myinst()); | |
253 | { | |
254 | m = new MPing(); | |
255 | ASSERT_EQ(conn->send_message(m), 0); | |
256 | Mutex::Locker l(cli_dispatcher.lock); | |
257 | while (!cli_dispatcher.got_new) | |
258 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
259 | cli_dispatcher.got_new = false; | |
260 | } | |
261 | ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1); | |
262 | ||
263 | // 3. test markdown connection | |
264 | conn->mark_down(); | |
265 | ASSERT_FALSE(conn->is_connected()); | |
266 | ||
267 | // 4. test failed connection | |
268 | server_msgr->shutdown(); | |
269 | server_msgr->wait(); | |
270 | ||
271 | m = new MPing(); | |
272 | conn->send_message(m); | |
273 | CHECK_AND_WAIT_TRUE(!conn->is_connected()); | |
274 | ASSERT_FALSE(conn->is_connected()); | |
275 | ||
276 | // 5. loopback connection | |
277 | srv_dispatcher.loopback = true; | |
278 | conn = client_msgr->get_loopback_connection(); | |
279 | { | |
280 | m = new MPing(); | |
281 | ASSERT_EQ(conn->send_message(m), 0); | |
282 | Mutex::Locker l(cli_dispatcher.lock); | |
283 | while (!cli_dispatcher.got_new) | |
284 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
285 | cli_dispatcher.got_new = false; | |
286 | } | |
287 | srv_dispatcher.loopback = false; | |
288 | ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1); | |
289 | client_msgr->shutdown(); | |
290 | client_msgr->wait(); | |
291 | server_msgr->shutdown(); | |
292 | server_msgr->wait(); | |
293 | } | |
294 | ||
295 | TEST_P(MessengerTest, NameAddrTest) { | |
296 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
297 | entity_addr_t bind_addr; | |
298 | bind_addr.parse("127.0.0.1"); | |
299 | server_msgr->bind(bind_addr); | |
300 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
301 | server_msgr->start(); | |
302 | ||
303 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
304 | client_msgr->start(); | |
305 | ||
306 | MPing *m = new MPing(); | |
307 | ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst()); | |
308 | { | |
309 | ASSERT_EQ(conn->send_message(m), 0); | |
310 | Mutex::Locker l(cli_dispatcher.lock); | |
311 | while (!cli_dispatcher.got_new) | |
312 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
313 | cli_dispatcher.got_new = false; | |
314 | } | |
315 | ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1); | |
316 | ASSERT_TRUE(conn->get_peer_addr() == server_msgr->get_myaddr()); | |
317 | ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst()); | |
318 | // Make should server_conn is the one we already accepted from client, | |
319 | // so it means client_msgr has the same addr when server connection has | |
320 | ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1); | |
321 | server_msgr->shutdown(); | |
322 | client_msgr->shutdown(); | |
323 | server_msgr->wait(); | |
324 | client_msgr->wait(); | |
325 | } | |
326 | ||
327 | TEST_P(MessengerTest, FeatureTest) { | |
328 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
329 | entity_addr_t bind_addr; | |
330 | bind_addr.parse("127.0.0.1"); | |
331 | uint64_t all_feature_supported, feature_required, feature_supported = 0; | |
332 | for (int i = 0; i < 10; i++) | |
333 | feature_supported |= 1ULL << i; | |
334 | feature_required = feature_supported | 1ULL << 13; | |
335 | all_feature_supported = feature_required | 1ULL << 14; | |
336 | ||
337 | Messenger::Policy p = server_msgr->get_policy(entity_name_t::TYPE_CLIENT); | |
338 | p.features_required = feature_required; | |
339 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); | |
340 | server_msgr->bind(bind_addr); | |
341 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
342 | server_msgr->start(); | |
343 | ||
344 | // 1. Suppose if only support less than required | |
345 | p = client_msgr->get_policy(entity_name_t::TYPE_OSD); | |
346 | p.features_supported = feature_supported; | |
347 | client_msgr->set_policy(entity_name_t::TYPE_OSD, p); | |
348 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
349 | client_msgr->start(); | |
350 | ||
351 | MPing *m = new MPing(); | |
352 | ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst()); | |
353 | conn->send_message(m); | |
354 | CHECK_AND_WAIT_TRUE(!conn->is_connected()); | |
355 | // should failed build a connection | |
356 | ASSERT_FALSE(conn->is_connected()); | |
357 | ||
358 | client_msgr->shutdown(); | |
359 | client_msgr->wait(); | |
360 | ||
361 | // 2. supported met required | |
362 | p = client_msgr->get_policy(entity_name_t::TYPE_OSD); | |
363 | p.features_supported = all_feature_supported; | |
364 | client_msgr->set_policy(entity_name_t::TYPE_OSD, p); | |
365 | client_msgr->start(); | |
366 | ||
367 | conn = client_msgr->get_connection(server_msgr->get_myinst()); | |
368 | { | |
369 | m = new MPing(); | |
370 | ASSERT_EQ(conn->send_message(m), 0); | |
371 | Mutex::Locker l(cli_dispatcher.lock); | |
372 | while (!cli_dispatcher.got_new) | |
373 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
374 | cli_dispatcher.got_new = false; | |
375 | } | |
376 | ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1); | |
377 | ||
378 | server_msgr->shutdown(); | |
379 | client_msgr->shutdown(); | |
380 | server_msgr->wait(); | |
381 | client_msgr->wait(); | |
382 | } | |
383 | ||
384 | TEST_P(MessengerTest, TimeoutTest) { | |
385 | g_ceph_context->_conf->set_val("ms_tcp_read_timeout", "1"); | |
386 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
387 | entity_addr_t bind_addr; | |
388 | bind_addr.parse("127.0.0.1"); | |
389 | server_msgr->bind(bind_addr); | |
390 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
391 | server_msgr->start(); | |
392 | ||
393 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
394 | client_msgr->start(); | |
395 | ||
396 | // 1. build the connection | |
397 | MPing *m = new MPing(); | |
398 | ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst()); | |
399 | { | |
400 | ASSERT_EQ(conn->send_message(m), 0); | |
401 | Mutex::Locker l(cli_dispatcher.lock); | |
402 | while (!cli_dispatcher.got_new) | |
403 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
404 | cli_dispatcher.got_new = false; | |
405 | } | |
406 | ASSERT_TRUE(conn->is_connected()); | |
407 | ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1); | |
408 | ASSERT_TRUE(conn->peer_is_osd()); | |
409 | ||
410 | // 2. wait for idle | |
411 | usleep(2500*1000); | |
412 | ASSERT_FALSE(conn->is_connected()); | |
413 | ||
414 | server_msgr->shutdown(); | |
415 | server_msgr->wait(); | |
416 | ||
417 | client_msgr->shutdown(); | |
418 | client_msgr->wait(); | |
419 | g_ceph_context->_conf->set_val("ms_tcp_read_timeout", "900"); | |
420 | } | |
421 | ||
422 | TEST_P(MessengerTest, StatefulTest) { | |
423 | Message *m; | |
424 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
425 | entity_addr_t bind_addr; | |
426 | bind_addr.parse("127.0.0.1"); | |
427 | Messenger::Policy p = Messenger::Policy::stateful_server(0); | |
428 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); | |
429 | p = Messenger::Policy::lossless_client(0); | |
430 | client_msgr->set_policy(entity_name_t::TYPE_OSD, p); | |
431 | ||
432 | server_msgr->bind(bind_addr); | |
433 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
434 | server_msgr->start(); | |
435 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
436 | client_msgr->start(); | |
437 | ||
438 | // 1. test for server standby | |
439 | ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst()); | |
440 | { | |
441 | m = new MPing(); | |
442 | ASSERT_EQ(conn->send_message(m), 0); | |
443 | Mutex::Locker l(cli_dispatcher.lock); | |
444 | while (!cli_dispatcher.got_new) | |
445 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
446 | cli_dispatcher.got_new = false; | |
447 | } | |
448 | ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1); | |
449 | conn->mark_down(); | |
450 | ASSERT_FALSE(conn->is_connected()); | |
451 | ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst()); | |
452 | // don't lose state | |
453 | ASSERT_TRUE(static_cast<Session*>(server_conn->get_priv())->get_count() == 1); | |
454 | ||
455 | srv_dispatcher.got_new = false; | |
456 | conn = client_msgr->get_connection(server_msgr->get_myinst()); | |
457 | { | |
458 | m = new MPing(); | |
459 | ASSERT_EQ(conn->send_message(m), 0); | |
460 | Mutex::Locker l(cli_dispatcher.lock); | |
461 | while (!cli_dispatcher.got_new) | |
462 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
463 | cli_dispatcher.got_new = false; | |
464 | } | |
465 | ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1); | |
466 | server_conn = server_msgr->get_connection(client_msgr->get_myinst()); | |
467 | { | |
468 | Mutex::Locker l(srv_dispatcher.lock); | |
469 | while (!srv_dispatcher.got_remote_reset) | |
470 | srv_dispatcher.cond.Wait(srv_dispatcher.lock); | |
471 | } | |
472 | ||
473 | // 2. test for client reconnect | |
474 | ASSERT_FALSE(cli_dispatcher.got_remote_reset); | |
475 | cli_dispatcher.got_connect = false; | |
476 | cli_dispatcher.got_new = false; | |
477 | cli_dispatcher.got_remote_reset = false; | |
478 | server_conn->mark_down(); | |
479 | ASSERT_FALSE(server_conn->is_connected()); | |
480 | // ensure client detect server socket closed | |
481 | { | |
482 | Mutex::Locker l(cli_dispatcher.lock); | |
483 | while (!cli_dispatcher.got_remote_reset) | |
484 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
485 | cli_dispatcher.got_remote_reset = false; | |
486 | } | |
487 | { | |
488 | Mutex::Locker l(cli_dispatcher.lock); | |
489 | while (!cli_dispatcher.got_connect) | |
490 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
491 | cli_dispatcher.got_connect = false; | |
492 | } | |
493 | CHECK_AND_WAIT_TRUE(conn->is_connected()); | |
494 | ASSERT_TRUE(conn->is_connected()); | |
495 | ||
496 | { | |
497 | m = new MPing(); | |
498 | ASSERT_EQ(conn->send_message(m), 0); | |
499 | ASSERT_TRUE(conn->is_connected()); | |
500 | Mutex::Locker l(cli_dispatcher.lock); | |
501 | while (!cli_dispatcher.got_new) | |
502 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
503 | cli_dispatcher.got_new = false; | |
504 | } | |
505 | // resetcheck happen | |
506 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv())->get_count()); | |
507 | server_conn = server_msgr->get_connection(client_msgr->get_myinst()); | |
508 | ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv())->get_count()); | |
509 | cli_dispatcher.got_remote_reset = false; | |
510 | ||
511 | server_msgr->shutdown(); | |
512 | client_msgr->shutdown(); | |
513 | server_msgr->wait(); | |
514 | client_msgr->wait(); | |
515 | } | |
516 | ||
517 | TEST_P(MessengerTest, StatelessTest) { | |
518 | Message *m; | |
519 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
520 | entity_addr_t bind_addr; | |
521 | bind_addr.parse("127.0.0.1"); | |
522 | Messenger::Policy p = Messenger::Policy::stateless_server(0); | |
523 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); | |
524 | p = Messenger::Policy::lossy_client(0); | |
525 | client_msgr->set_policy(entity_name_t::TYPE_OSD, p); | |
526 | ||
527 | server_msgr->bind(bind_addr); | |
528 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
529 | server_msgr->start(); | |
530 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
531 | client_msgr->start(); | |
532 | ||
533 | // 1. test for server lose state | |
534 | ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst()); | |
535 | { | |
536 | m = new MPing(); | |
537 | ASSERT_EQ(conn->send_message(m), 0); | |
538 | Mutex::Locker l(cli_dispatcher.lock); | |
539 | while (!cli_dispatcher.got_new) | |
540 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
541 | cli_dispatcher.got_new = false; | |
542 | } | |
543 | ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1); | |
544 | conn->mark_down(); | |
545 | ASSERT_FALSE(conn->is_connected()); | |
546 | ||
547 | srv_dispatcher.got_new = false; | |
548 | conn = client_msgr->get_connection(server_msgr->get_myinst()); | |
549 | { | |
550 | m = new MPing(); | |
551 | ASSERT_EQ(conn->send_message(m), 0); | |
552 | Mutex::Locker l(cli_dispatcher.lock); | |
553 | while (!cli_dispatcher.got_new) | |
554 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
555 | cli_dispatcher.got_new = false; | |
556 | } | |
557 | ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1); | |
558 | ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst()); | |
559 | // server lose state | |
560 | { | |
561 | Mutex::Locker l(srv_dispatcher.lock); | |
562 | while (!srv_dispatcher.got_new) | |
563 | srv_dispatcher.cond.Wait(srv_dispatcher.lock); | |
564 | } | |
565 | ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv())->get_count()); | |
566 | ||
567 | // 2. test for client lossy | |
568 | server_conn->mark_down(); | |
569 | ASSERT_FALSE(server_conn->is_connected()); | |
570 | conn->send_keepalive(); | |
571 | CHECK_AND_WAIT_TRUE(!conn->is_connected()); | |
572 | ASSERT_FALSE(conn->is_connected()); | |
573 | conn = client_msgr->get_connection(server_msgr->get_myinst()); | |
574 | { | |
575 | m = new MPing(); | |
576 | ASSERT_EQ(conn->send_message(m), 0); | |
577 | Mutex::Locker l(cli_dispatcher.lock); | |
578 | while (!cli_dispatcher.got_new) | |
579 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
580 | cli_dispatcher.got_new = false; | |
581 | } | |
582 | ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1); | |
583 | ||
584 | server_msgr->shutdown(); | |
585 | client_msgr->shutdown(); | |
586 | server_msgr->wait(); | |
587 | client_msgr->wait(); | |
588 | } | |
589 | ||
590 | TEST_P(MessengerTest, ClientStandbyTest) { | |
591 | Message *m; | |
592 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
593 | entity_addr_t bind_addr; | |
594 | bind_addr.parse("127.0.0.1"); | |
595 | Messenger::Policy p = Messenger::Policy::stateful_server(0); | |
596 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); | |
597 | p = Messenger::Policy::lossless_peer(0); | |
598 | client_msgr->set_policy(entity_name_t::TYPE_OSD, p); | |
599 | ||
600 | server_msgr->bind(bind_addr); | |
601 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
602 | server_msgr->start(); | |
603 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
604 | client_msgr->start(); | |
605 | ||
606 | // 1. test for client standby, resetcheck | |
607 | ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst()); | |
608 | { | |
609 | m = new MPing(); | |
610 | ASSERT_EQ(conn->send_message(m), 0); | |
611 | Mutex::Locker l(cli_dispatcher.lock); | |
612 | while (!cli_dispatcher.got_new) | |
613 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
614 | cli_dispatcher.got_new = false; | |
615 | } | |
616 | ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1); | |
617 | ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst()); | |
618 | ASSERT_FALSE(cli_dispatcher.got_remote_reset); | |
619 | cli_dispatcher.got_connect = false; | |
620 | server_conn->mark_down(); | |
621 | ASSERT_FALSE(server_conn->is_connected()); | |
622 | // client should be standby | |
623 | usleep(300*1000); | |
624 | // client should be standby, so we use original connection | |
625 | { | |
626 | // Try send message to verify got remote reset callback | |
627 | m = new MPing(); | |
628 | ASSERT_EQ(conn->send_message(m), 0); | |
629 | { | |
630 | Mutex::Locker l(cli_dispatcher.lock); | |
631 | while (!cli_dispatcher.got_remote_reset) | |
632 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
633 | cli_dispatcher.got_remote_reset = false; | |
634 | while (!cli_dispatcher.got_connect) | |
635 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
636 | cli_dispatcher.got_connect = false; | |
637 | } | |
638 | CHECK_AND_WAIT_TRUE(conn->is_connected()); | |
639 | ASSERT_TRUE(conn->is_connected()); | |
640 | m = new MPing(); | |
641 | ASSERT_EQ(conn->send_message(m), 0); | |
642 | Mutex::Locker l(cli_dispatcher.lock); | |
643 | while (!cli_dispatcher.got_new) | |
644 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
645 | cli_dispatcher.got_new = false; | |
646 | } | |
647 | ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1); | |
648 | server_conn = server_msgr->get_connection(client_msgr->get_myinst()); | |
649 | ASSERT_TRUE(static_cast<Session*>(server_conn->get_priv())->get_count() == 1); | |
650 | ||
651 | server_msgr->shutdown(); | |
652 | client_msgr->shutdown(); | |
653 | server_msgr->wait(); | |
654 | client_msgr->wait(); | |
655 | } | |
656 | ||
657 | TEST_P(MessengerTest, AuthTest) { | |
658 | g_ceph_context->_conf->set_val("auth_cluster_required", "cephx"); | |
659 | g_ceph_context->_conf->set_val("auth_service_required", "cephx"); | |
660 | g_ceph_context->_conf->set_val("auth_client_required", "cephx"); | |
661 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
662 | entity_addr_t bind_addr; | |
663 | bind_addr.parse("127.0.0.1"); | |
664 | server_msgr->bind(bind_addr); | |
665 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
666 | server_msgr->start(); | |
667 | ||
668 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
669 | client_msgr->start(); | |
670 | ||
671 | // 1. simple auth round trip | |
672 | MPing *m = new MPing(); | |
673 | ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst()); | |
674 | { | |
675 | ASSERT_EQ(conn->send_message(m), 0); | |
676 | Mutex::Locker l(cli_dispatcher.lock); | |
677 | while (!cli_dispatcher.got_new) | |
678 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
679 | cli_dispatcher.got_new = false; | |
680 | } | |
681 | ASSERT_TRUE(conn->is_connected()); | |
682 | ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1); | |
683 | ||
684 | // 2. mix auth | |
685 | g_ceph_context->_conf->set_val("auth_cluster_required", "none"); | |
686 | g_ceph_context->_conf->set_val("auth_service_required", "none"); | |
687 | g_ceph_context->_conf->set_val("auth_client_required", "none"); | |
688 | conn->mark_down(); | |
689 | ASSERT_FALSE(conn->is_connected()); | |
690 | conn = client_msgr->get_connection(server_msgr->get_myinst()); | |
691 | { | |
692 | MPing *m = new MPing(); | |
693 | ASSERT_EQ(conn->send_message(m), 0); | |
694 | Mutex::Locker l(cli_dispatcher.lock); | |
695 | while (!cli_dispatcher.got_new) | |
696 | cli_dispatcher.cond.Wait(cli_dispatcher.lock); | |
697 | cli_dispatcher.got_new = false; | |
698 | } | |
699 | ASSERT_TRUE(conn->is_connected()); | |
700 | ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1); | |
701 | ||
702 | server_msgr->shutdown(); | |
703 | client_msgr->shutdown(); | |
704 | server_msgr->wait(); | |
705 | client_msgr->wait(); | |
706 | } | |
707 | ||
708 | TEST_P(MessengerTest, MessageTest) { | |
709 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
710 | entity_addr_t bind_addr; | |
711 | bind_addr.parse("127.0.0.1"); | |
712 | Messenger::Policy p = Messenger::Policy::stateful_server(0); | |
713 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); | |
714 | p = Messenger::Policy::lossless_peer(0); | |
715 | client_msgr->set_policy(entity_name_t::TYPE_OSD, p); | |
716 | ||
717 | server_msgr->bind(bind_addr); | |
718 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
719 | server_msgr->start(); | |
720 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
721 | client_msgr->start(); | |
722 | ||
723 | ||
724 | // 1. A very large "front"(as well as "payload") | |
725 | // Because a external message need to invade Messenger::decode_message, | |
726 | // here we only use existing message class(MCommand) | |
727 | ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst()); | |
728 | { | |
729 | uuid_d uuid; | |
730 | uuid.generate_random(); | |
731 | vector<string> cmds; | |
732 | string s("abcdefghijklmnopqrstuvwxyz"); | |
733 | for (int i = 0; i < 1024*30; i++) | |
734 | cmds.push_back(s); | |
735 | MCommand *m = new MCommand(uuid); | |
736 | m->cmd = cmds; | |
737 | conn->send_message(m); | |
738 | utime_t t; | |
739 | t += 1000*1000*500; | |
740 | Mutex::Locker l(cli_dispatcher.lock); | |
741 | while (!cli_dispatcher.got_new) | |
742 | cli_dispatcher.cond.WaitInterval(cli_dispatcher.lock, t); | |
743 | ASSERT_TRUE(cli_dispatcher.got_new); | |
744 | cli_dispatcher.got_new = false; | |
745 | } | |
746 | ||
747 | // 2. A very large "data" | |
748 | { | |
749 | bufferlist bl; | |
750 | string s("abcdefghijklmnopqrstuvwxyz"); | |
751 | for (int i = 0; i < 1024*30; i++) | |
752 | bl.append(s); | |
753 | MPing *m = new MPing(); | |
754 | m->set_data(bl); | |
755 | conn->send_message(m); | |
756 | utime_t t; | |
757 | t += 1000*1000*500; | |
758 | Mutex::Locker l(cli_dispatcher.lock); | |
759 | while (!cli_dispatcher.got_new) | |
760 | cli_dispatcher.cond.WaitInterval(cli_dispatcher.lock, t); | |
761 | ASSERT_TRUE(cli_dispatcher.got_new); | |
762 | cli_dispatcher.got_new = false; | |
763 | } | |
764 | server_msgr->shutdown(); | |
765 | client_msgr->shutdown(); | |
766 | server_msgr->wait(); | |
767 | client_msgr->wait(); | |
768 | } | |
769 | ||
770 | ||
771 | class SyntheticWorkload; | |
772 | ||
773 | struct Payload { | |
774 | enum Who : uint8_t { | |
775 | PING = 0, | |
776 | PONG = 1, | |
777 | }; | |
778 | uint8_t who; | |
779 | uint64_t seq; | |
780 | bufferlist data; | |
781 | ||
782 | Payload(Who who, uint64_t seq, const bufferlist& data) | |
783 | : who(who), seq(seq), data(data) | |
784 | {} | |
785 | Payload() = default; | |
786 | DENC(Payload, v, p) { | |
787 | DENC_START(1, 1, p); | |
788 | denc(v.who, p); | |
789 | denc(v.seq, p); | |
790 | denc(v.data, p); | |
791 | DENC_FINISH(p); | |
792 | } | |
793 | }; | |
794 | WRITE_CLASS_DENC(Payload) | |
795 | ||
796 | ostream& operator<<(ostream& out, const Payload &pl) | |
797 | { | |
798 | return out << "reply=" << pl.who << " i = " << pl.seq; | |
799 | } | |
800 | ||
// Dispatcher used by SyntheticWorkload.  For every message sent over a
// lossless connection it records (seq -> payload) in 'sent' and appends the
// seq to the per-connection 'conn_sent' list; on the PONG path it verifies
// that replies arrive in send order and bit-identical to what was sent.
// Session-establishment and reset callbacks discard the pending state for
// the affected connection, since those messages may legitimately be lost
// or replayed by the messenger.
class SyntheticDispatcher : public Dispatcher {
 public:
  Mutex lock;          // guards conn_sent, sent and the flags below
  Cond cond;           // signalled whenever state observed by waiters changes
  bool is_server;
  bool got_new;        // a PING was handled or a PONG was consumed
  bool got_remote_reset;
  bool got_connect;
  // Per-connection list of in-flight sequence ids, in send order.
  map<ConnectionRef, list<uint64_t> > conn_sent;
  // seq -> payload data still awaiting its echo.
  map<uint64_t, bufferlist> sent;
  atomic<uint64_t> index;        // next sequence number to assign
  SyntheticWorkload *workload;   // owner; notified from ms_handle_reset()

  SyntheticDispatcher(bool s, SyntheticWorkload *wl):
    Dispatcher(g_ceph_context), lock("SyntheticDispatcher::lock"), is_server(s), got_new(false),
    got_remote_reset(false), got_connect(false), index(0), workload(wl) {}
  bool ms_can_fast_dispatch_any() const override { return true; }
  bool ms_can_fast_dispatch(const Message *m) const override {
    switch (m->get_type()) {
    case CEPH_MSG_PING:
    case MSG_COMMAND:
      return true;
    default:
      return false;
    }
  }

  // A (re)connect means in-flight messages on this session may be replayed
  // or dropped; forget what we were tracking for it.
  void ms_handle_fast_connect(Connection *con) override {
    Mutex::Locker l(lock);
    list<uint64_t> c = conn_sent[con];
    for (list<uint64_t>::iterator it = c.begin();
         it != c.end(); ++it)
      sent.erase(*it);
    conn_sent.erase(con);
    got_connect = true;
    cond.Signal();
  }
  // Same cleanup on the accepting side of a new session.
  void ms_handle_fast_accept(Connection *con) override {
    Mutex::Locker l(lock);
    list<uint64_t> c = conn_sent[con];
    for (list<uint64_t>::iterator it = c.begin();
         it != c.end(); ++it)
      sent.erase(*it);
    conn_sent.erase(con);
    cond.Signal();
  }
  // All interesting types are fast-dispatchable; the slow path must never run.
  bool ms_dispatch(Message *m) override {
    ceph_abort();
  }
  bool ms_handle_reset(Connection *con) override;  // defined after SyntheticWorkload
  void ms_handle_remote_reset(Connection *con) override {
    Mutex::Locker l(lock);
    list<uint64_t> c = conn_sent[con];
    for (list<uint64_t>::iterator it = c.begin();
         it != c.end(); ++it)
      sent.erase(*it);
    conn_sent.erase(con);
    got_remote_reset = true;
  }
  bool ms_handle_refused(Connection *con) override {
    return false;
  }
  void ms_fast_dispatch(Message *m) override {
    // MSG_COMMAND is used to disorganize regular message flow
    if (m->get_type() == MSG_COMMAND) {
      m->put();
      return ;
    }

    Payload pl;
    auto p = m->get_data().begin();
    ::decode(pl, p);
    if (pl.who == Payload::PING) {
      // Server side of the exchange: echo the payload back as a PONG.
      lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl;
      reply_message(m, pl);
      m->put();
      Mutex::Locker l(lock);
      got_new = true;
      cond.Signal();
    } else {
      // Client side: a PONG came back.  If we still track this seq, it must
      // be the oldest outstanding one on this connection and its data must
      // match exactly.  (Untracked seqs were cleared by a session event.)
      Mutex::Locker l(lock);
      if (sent.count(pl.seq)) {
        lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl;
        ASSERT_EQ(conn_sent[m->get_connection()].front(), pl.seq);
        ASSERT_TRUE(pl.data.contents_equal(sent[pl.seq]));
        conn_sent[m->get_connection()].pop_front();
        sent.erase(pl.seq);
      }
      m->put();
      got_new = true;
      cond.Signal();
    }
  }

  // Tests run with auth disabled; accept every authorizer.
  bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
                            bufferlist& authorizer, bufferlist& authorizer_reply,
                            bool& isvalid, CryptoKey& session_key,
                            std::unique_ptr<AuthAuthorizerChallenge> *challenge) override {
    isvalid = true;
    return true;
  }

  // Echo a received PING back on the same connection as a PONG.
  void reply_message(const Message *m, Payload& pl) {
    pl.who = Payload::PONG;
    bufferlist bl;
    ::encode(pl, bl);
    MPing *rm = new MPing();
    rm->set_data(bl);
    m->get_connection()->send_message(rm);
    lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply m=" << m << " i=" << pl.seq << dendl;
  }

  // Send a PING carrying 'data'.  Only lossless policies get the message
  // recorded for reply verification: on a lossy connection the message may
  // be silently dropped, so nothing can be asserted about its echo.
  void send_message_wrap(ConnectionRef con, const bufferlist& data) {
    Message *m = new MPing();
    Payload pl{Payload::PING, index++, data};
    bufferlist bl;
    ::encode(pl, bl);
    m->set_data(bl);
    if (!con->get_messenger()->get_default_policy().lossy) {
      Mutex::Locker l(lock);
      sent[pl.seq] = pl.data;
      conn_sent[con].push_back(pl.seq);
    }
    lderr(g_ceph_context) << __func__ << " conn=" << con.get() << " send m=" << m << " i=" << pl.seq << dendl;
    ASSERT_EQ(0, con->send_message(m));
  }

  // Number of messages still awaiting an echo.
  uint64_t get_pending() {
    Mutex::Locker l(lock);
    return sent.size();
  }

  // Drop all tracking state for one connection (used when the workload
  // deliberately marks it down).
  void clear_pending(ConnectionRef con) {
    Mutex::Locker l(lock);

    for (list<uint64_t>::iterator it = conn_sent[con].begin();
         it != conn_sent[con].end(); ++it)
      sent.erase(*it);
    conn_sent.erase(con);
  }

  // Log every connection that still has outstanding messages.
  // NOTE(review): reads conn_sent without taking 'lock' — presumably only
  // called from quiescent debug paths; confirm before relying on it.
  void print() {
    for (auto && p : conn_sent) {
      if (!p.second.empty()) {
        lderr(g_ceph_context) << __func__ << " " << p.first << " wait " << p.second.size() << dendl;
      }
    }
  }
};
950 | ||
951 | ||
// Drives the synthetic stress tests: owns a pool of server and client
// messengers (all sharing one SyntheticDispatcher), a table of live
// connections, and a set of pre-generated random payloads.  Tests call
// generate_connection()/drop_connection()/send_message() in a random mix
// and finish with wait_for_done().
class SyntheticWorkload {
  Mutex lock;   // guards available_* maps; also protects rng use
  Cond cond;
  set<Messenger*> available_servers;
  set<Messenger*> available_clients;
  // connection -> (initiating messenger, target messenger)
  map<ConnectionRef, pair<Messenger*, Messenger*> > available_connections;
  SyntheticDispatcher dispatcher;
  gen_type rng;
  vector<bufferlist> rand_data;   // pool of random payloads to send

 public:
  static const unsigned max_in_flight = 64;    // throttle on unacked messages
  static const unsigned max_connections = 128;
  static const unsigned max_message_len = 1024 * 1024 * 4;

  // Creates 'servers' bound server messengers (127.0.0.1:16800+i) and
  // 'clients' client messengers (bound too if the client policy is standby,
  // since standby reconnect needs a stable address), then pre-builds
  // 'random_num' payloads of 32B..4MB stamped with the buffer index every
  // 4096 bytes.
  SyntheticWorkload(int servers, int clients, string type, int random_num,
                    Messenger::Policy srv_policy, Messenger::Policy cli_policy):
    lock("SyntheticWorkload::lock"), dispatcher(false, this), rng(time(NULL)) {
    Messenger *msgr;
    int base_port = 16800;
    entity_addr_t bind_addr;
    char addr[64];
    for (int i = 0; i < servers; ++i) {
      msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0),
                               "server", getpid()+i, 0);
      snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i);
      bind_addr.parse(addr);
      msgr->bind(bind_addr);
      msgr->add_dispatcher_head(&dispatcher);

      // NOTE(review): this assert runs after msgr has already been
      // dereferenced above — it can never fire usefully.
      assert(msgr);
      msgr->set_default_policy(srv_policy);
      available_servers.insert(msgr);
      msgr->start();
    }

    for (int i = 0; i < clients; ++i) {
      msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(-1),
                               "client", getpid()+i+servers, 0);
      if (cli_policy.standby) {
        snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i+servers);
        bind_addr.parse(addr);
        msgr->bind(bind_addr);
      }
      msgr->add_dispatcher_head(&dispatcher);

      // NOTE(review): same assert-after-use pattern as the server loop.
      assert(msgr);
      msgr->set_default_policy(cli_policy);
      available_clients.insert(msgr);
      msgr->start();
    }

    // Pre-generate payloads; each is zeroed then stamped with its index 'i'
    // every 4096 bytes so corrupted/crossed buffers are detectable.
    for (int i = 0; i < random_num; i++) {
      bufferlist bl;
      boost::uniform_int<> u(32, max_message_len);
      uint64_t value_len = u(rng);
      bufferptr bp(value_len);
      bp.zero();
      for (uint64_t j = 0; j < value_len-sizeof(i); ) {
        memcpy(bp.c_str()+j, &i, sizeof(i));
        j += 4096;
      }

      bl.append(bp);
      rand_data.push_back(bl);
    }
  }

  // Pick a random live connection.  Caller must hold 'lock'; the lock is
  // temporarily dropped while throttling on max_in_flight so dispatch
  // threads can make progress.
  // NOTE(review): assumes available_connections is non-empty — a
  // uniform_int over (0, -1) would be undefined; callers seed connections
  // first, confirm if reusing elsewhere.
  ConnectionRef _get_random_connection() {
    while (dispatcher.get_pending() > max_in_flight) {
      lock.Unlock();
      usleep(500);
      lock.Lock();
    }
    assert(lock.is_locked());
    boost::uniform_int<> choose(0, available_connections.size() - 1);
    int index = choose(rng);
    map<ConnectionRef, pair<Messenger*, Messenger*> >::iterator i = available_connections.begin();
    for (; index > 0; --index, ++i) ;
    return i->first;
  }

  bool can_create_connection() {
    return available_connections.size() < max_connections;
  }

  // Open (or reuse) a connection between a random client and random server.
  // For non-server policies the direction is randomized, so peers may also
  // connect server->client.
  void generate_connection() {
    Mutex::Locker l(lock);
    if (!can_create_connection())
      return ;

    Messenger *server, *client;
    {
      boost::uniform_int<> choose(0, available_servers.size() - 1);
      int index = choose(rng);
      set<Messenger*>::iterator i = available_servers.begin();
      for (; index > 0; --index, ++i) ;
      server = *i;
    }
    {
      boost::uniform_int<> choose(0, available_clients.size() - 1);
      int index = choose(rng);
      set<Messenger*>::iterator i = available_clients.begin();
      for (; index > 0; --index, ++i) ;
      client = *i;
    }

    pair<Messenger*, Messenger*> p;
    {
      // 'choose' here is only used as a coin flip for connection direction.
      boost::uniform_int<> choose(0, available_servers.size() - 1);
      if (server->get_default_policy().server) {
        p = make_pair(client, server);
      } else {
        ConnectionRef conn = client->get_connection(server->get_myinst());
        if (available_connections.count(conn) || choose(rng) % 2)
          p = make_pair(client, server);
        else
          p = make_pair(server, client);
      }
    }
    ConnectionRef conn = p.first->get_connection(p.second->get_myinst());
    available_connections[conn] = p;
  }

  // Mostly send a data-carrying PING; ~5% of the time send a high-priority
  // MCommand instead to disorganize the regular message flow.
  void send_message() {
    Mutex::Locker l(lock);
    ConnectionRef conn = _get_random_connection();
    boost::uniform_int<> true_false(0, 99);
    int val = true_false(rng);
    if (val >= 95) {
      uuid_d uuid;
      uuid.generate_random();
      MCommand *m = new MCommand(uuid);
      vector<string> cmds;
      cmds.push_back("command");
      m->cmd = cmds;
      m->set_priority(200);
      conn->send_message(m);
    } else {
      boost::uniform_int<> u(0, rand_data.size()-1);
      dispatcher.send_message_wrap(conn, rand_data[u(rng)]);
    }
  }

  // Mark a random connection down (keeping a floor of 10 live connections).
  void drop_connection() {
    Mutex::Locker l(lock);
    if (available_connections.size() < 10)
      return;
    ConnectionRef conn = _get_random_connection();
    dispatcher.clear_pending(conn);
    conn->mark_down();
    pair<Messenger*, Messenger*> &p = available_connections[conn];
    // it's a lossless policy, so we need to mark down each side
    if (!p.first->get_default_policy().server && !p.second->get_default_policy().server) {
      ASSERT_EQ(conn->get_messenger(), p.first);
      ConnectionRef peer = p.second->get_connection(p.first->get_myinst());
      peer->mark_down();
      dispatcher.clear_pending(peer);
      available_connections.erase(peer);
    }
    ASSERT_EQ(available_connections.erase(conn), 1U);
  }

  void print_internal_state(bool detail=false) {
    Mutex::Locker l(lock);
    lderr(g_ceph_context) << "available_connections: " << available_connections.size()
                          << " inflight messages: " << dispatcher.get_pending() << dendl;
    if (detail && !available_connections.empty()) {
      dispatcher.print();
    }
  }

  // Spin until every tracked message has been echoed (5-minute watchdog),
  // then shut down and delete all messengers.
  void wait_for_done() {
    int64_t tick_us = 1000 * 100; // 100ms
    int64_t timeout_us = 5 * 60 * 1000 * 1000; // 5 mins
    int i = 0;
    while (dispatcher.get_pending()) {
      usleep(tick_us);
      timeout_us -= tick_us;
      if (i++ % 50 == 0)
        print_internal_state(true);
      if (timeout_us < 0)
        assert(0 == " loop time exceed 5 mins, it looks we stuck into some problems!");
    }
    for (set<Messenger*>::iterator it = available_servers.begin();
         it != available_servers.end(); ++it) {
      (*it)->shutdown();
      (*it)->wait();
      ASSERT_EQ((*it)->get_dispatch_queue_len(), 0);
      delete (*it);
    }
    available_servers.clear();

    for (set<Messenger*>::iterator it = available_clients.begin();
         it != available_clients.end(); ++it) {
      (*it)->shutdown();
      (*it)->wait();
      ASSERT_EQ((*it)->get_dispatch_queue_len(), 0);
      delete (*it);
    }
    available_clients.clear();
  }

  // Called from SyntheticDispatcher::ms_handle_reset: a reset connection is
  // no longer usable, so forget it and its pending messages.
  void handle_reset(Connection *con) {
    Mutex::Locker l(lock);
    available_connections.erase(con);
    dispatcher.clear_pending(con);
  }
};
1161 | ||
// Out-of-line because it needs the full SyntheticWorkload definition:
// delegate session-reset cleanup to the owning workload.
bool SyntheticDispatcher::ms_handle_reset(Connection *con) {
  workload->handle_reset(con);
  return true;
}
1166 | ||
// Stateful-server / lossless-client stress: seed 100 connections, run 5000
// randomly mixed ops (connect / drop / send / idle), then wait for every
// in-flight message to be echoed back.
TEST_P(MessengerTest, SyntheticStressTest) {
  SyntheticWorkload test_msg(8, 32, GetParam(), 100,
                             Messenger::Policy::stateful_server(0),
                             Messenger::Policy::lossless_client(0));
  for (int i = 0; i < 100; ++i) {
    if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
    test_msg.generate_connection();
  }
  gen_type rng(time(NULL));
  for (int i = 0; i < 5000; ++i) {
    if (!(i % 10)) {
      lderr(g_ceph_context) << "Op " << i << ": " << dendl;
      test_msg.print_internal_state();
    }
    boost::uniform_int<> true_false(0, 99);
    int val = true_false(rng);
    if (val > 90) {
      test_msg.generate_connection();   // ~9%: open another connection
    } else if (val > 80) {
      test_msg.drop_connection();       // ~10%: tear one down
    } else if (val > 10) {
      test_msg.send_message();          // ~70%: normal traffic
    } else {
      usleep(rand() % 1000 + 500);      // ~11%: idle jitter
    }
  }
  test_msg.wait_for_done();
}
1195 | ||
// Same stress pattern but with lossless_peer_reuse on both sides, fewer
// seeded connections (10), more iterations (10000) and a higher churn rate
// (connect/drop each ~20%) to exercise session reconnect/reuse paths.
TEST_P(MessengerTest, SyntheticStressTest1) {
  SyntheticWorkload test_msg(16, 32, GetParam(), 100,
                             Messenger::Policy::lossless_peer_reuse(0),
                             Messenger::Policy::lossless_peer_reuse(0));
  for (int i = 0; i < 10; ++i) {
    if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
    test_msg.generate_connection();
  }
  gen_type rng(time(NULL));
  for (int i = 0; i < 10000; ++i) {
    if (!(i % 10)) {
      lderr(g_ceph_context) << "Op " << i << ": " << dendl;
      test_msg.print_internal_state();
    }
    boost::uniform_int<> true_false(0, 99);
    int val = true_false(rng);
    if (val > 80) {
      test_msg.generate_connection();
    } else if (val > 60) {
      test_msg.drop_connection();
    } else if (val > 10) {
      test_msg.send_message();
    } else {
      usleep(rand() % 1000 + 500);
    }
  }
  test_msg.wait_for_done();
}
1224 | ||
1225 | ||
// Fault-injection variant: random socket failures (~1/30) and internal
// delays are injected while the stateful-server/lossless-client workload
// runs; all injection knobs are restored at the end so later tests are
// unaffected.
TEST_P(MessengerTest, SyntheticInjectTest) {
  // Save the throttle so it can be restored exactly afterwards.
  uint64_t dispatch_throttle_bytes = g_ceph_context->_conf->ms_dispatch_throttle_bytes;
  g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30");
  g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
  g_ceph_context->_conf->set_val("ms_dispatch_throttle_bytes", "16777216");
  SyntheticWorkload test_msg(8, 32, GetParam(), 100,
                             Messenger::Policy::stateful_server(0),
                             Messenger::Policy::lossless_client(0));
  for (int i = 0; i < 100; ++i) {
    if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
    test_msg.generate_connection();
  }
  gen_type rng(time(NULL));
  for (int i = 0; i < 1000; ++i) {
    if (!(i % 10)) {
      lderr(g_ceph_context) << "Op " << i << ": " << dendl;
      test_msg.print_internal_state();
    }
    boost::uniform_int<> true_false(0, 99);
    int val = true_false(rng);
    if (val > 90) {
      test_msg.generate_connection();
    } else if (val > 80) {
      test_msg.drop_connection();
    } else if (val > 10) {
      test_msg.send_message();
    } else {
      usleep(rand() % 500 + 100);
    }
  }
  test_msg.wait_for_done();
  // Restore injection/throttle settings for subsequent tests.
  g_ceph_context->_conf->set_val("ms_inject_socket_failures", "0");
  g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0");
  g_ceph_context->_conf->set_val(
    "ms_dispatch_throttle_bytes", std::to_string(dispatch_throttle_bytes));
}
1262 | ||
// Fault injection with lossless_peer_reuse on both sides: socket failures
// plus internal delays while connections are churned and traffic flows.
TEST_P(MessengerTest, SyntheticInjectTest2) {
  g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30");
  g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
  SyntheticWorkload test_msg(8, 16, GetParam(), 100,
                             Messenger::Policy::lossless_peer_reuse(0),
                             Messenger::Policy::lossless_peer_reuse(0));
  for (int i = 0; i < 100; ++i) {
    if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
    test_msg.generate_connection();
  }
  gen_type rng(time(NULL));
  for (int i = 0; i < 1000; ++i) {
    if (!(i % 10)) {
      lderr(g_ceph_context) << "Op " << i << ": " << dendl;
      test_msg.print_internal_state();
    }
    boost::uniform_int<> true_false(0, 99);
    int val = true_false(rng);
    if (val > 90) {
      test_msg.generate_connection();
    } else if (val > 80) {
      test_msg.drop_connection();
    } else if (val > 10) {
      test_msg.send_message();
    } else {
      usleep(rand() % 500 + 100);
    }
  }
  test_msg.wait_for_done();
  // Restore injection settings for subsequent tests.
  g_ceph_context->_conf->set_val("ms_inject_socket_failures", "0");
  g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0");
}
1295 | ||
// Fault injection with a stateless server and lossy clients; failures are
// rarer (~1/600) since lossy connections cannot replay dropped messages.
TEST_P(MessengerTest, SyntheticInjectTest3) {
  g_ceph_context->_conf->set_val("ms_inject_socket_failures", "600");
  g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
  SyntheticWorkload test_msg(8, 16, GetParam(), 100,
                             Messenger::Policy::stateless_server(0),
                             Messenger::Policy::lossy_client(0));
  for (int i = 0; i < 100; ++i) {
    if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
    test_msg.generate_connection();
  }
  gen_type rng(time(NULL));
  for (int i = 0; i < 1000; ++i) {
    if (!(i % 10)) {
      lderr(g_ceph_context) << "Op " << i << ": " << dendl;
      test_msg.print_internal_state();
    }
    boost::uniform_int<> true_false(0, 99);
    int val = true_false(rng);
    if (val > 90) {
      test_msg.generate_connection();
    } else if (val > 80) {
      test_msg.drop_connection();
    } else if (val > 10) {
      test_msg.send_message();
    } else {
      usleep(rand() % 500 + 100);
    }
  }
  test_msg.wait_for_done();
  // Restore injection settings for subsequent tests.
  g_ceph_context->_conf->set_val("ms_inject_socket_failures", "0");
  g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0");
}
1328 | ||
1329 | ||
// Fault injection with lossless_peer on both sides plus injected delivery
// delays (up to 5s, probability 1) on client/osd messages.  Connection
// dropping is disabled here — presumably because delayed messages on a
// marked-down lossless_peer session could never be reconciled; confirm
// against the messenger delay-injection semantics before re-enabling.
TEST_P(MessengerTest, SyntheticInjectTest4) {
  g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30");
  g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
  g_ceph_context->_conf->set_val("ms_inject_delay_probability", "1");
  g_ceph_context->_conf->set_val("ms_inject_delay_type", "client osd", false);
  g_ceph_context->_conf->set_val("ms_inject_delay_max", "5");
  SyntheticWorkload test_msg(16, 32, GetParam(), 100,
                             Messenger::Policy::lossless_peer(0),
                             Messenger::Policy::lossless_peer(0));
  for (int i = 0; i < 100; ++i) {
    if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
    test_msg.generate_connection();
  }
  gen_type rng(time(NULL));
  for (int i = 0; i < 1000; ++i) {
    if (!(i % 10)) {
      lderr(g_ceph_context) << "Op " << i << ": " << dendl;
      test_msg.print_internal_state();
    }
    boost::uniform_int<> true_false(0, 99);
    int val = true_false(rng);
    if (val > 95) {
      test_msg.generate_connection();
    } else if (val > 80) {
      // test_msg.drop_connection();
    } else if (val > 10) {
      test_msg.send_message();
    } else {
      usleep(rand() % 500 + 100);
    }
  }
  test_msg.wait_for_done();
  // Restore all injection knobs for subsequent tests.
  g_ceph_context->_conf->set_val("ms_inject_socket_failures", "0");
  g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0");
  g_ceph_context->_conf->set_val("ms_inject_delay_probability", "0");
  g_ceph_context->_conf->set_val("ms_inject_delay_type", "", false);
  g_ceph_context->_conf->set_val("ms_inject_delay_max", "0");
}
1368 | ||
1369 | ||
// Dispatcher for MarkdownTest: counts every dispatched message and, once
// two connections have been seen, marks one of the *other* connections
// down on each dispatch to race mark_down() against in-flight traffic.
class MarkdownDispatcher : public Dispatcher {
  Mutex lock;                 // guards conns and last_mark
  set<ConnectionRef> conns;   // connections observed so far
  bool last_mark;             // true once we have started marking peers down
 public:
  std::atomic<uint64_t> count = { 0 };   // total messages seen by ms_dispatch
  // NOTE(review): the 's' (is-server) argument is unused.
  explicit MarkdownDispatcher(bool s): Dispatcher(g_ceph_context), lock("MarkdownDispatcher::lock"),
                                       last_mark(false) {}
  // Pings must take the slow dispatch path here so ms_dispatch can count them.
  bool ms_can_fast_dispatch_any() const override { return false; }
  bool ms_can_fast_dispatch(const Message *m) const override {
    switch (m->get_type()) {
    case CEPH_MSG_PING:
      return true;
    default:
      return false;
    }
  }

  void ms_handle_fast_connect(Connection *con) override {
    lderr(g_ceph_context) << __func__ << " " << con << dendl;
    Mutex::Locker l(lock);
    conns.insert(con);
  }
  void ms_handle_fast_accept(Connection *con) override {
    Mutex::Locker l(lock);
    conns.insert(con);
  }
  bool ms_dispatch(Message *m) override {
    lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << dendl;
    Mutex::Locker l(lock);
    count++;
    conns.insert(m->get_connection());
    // Wait until at least two connections exist before starting markdowns.
    if (conns.size() < 2 && !last_mark) {
      m->put();
      return true;
    }

    last_mark = true;
    usleep(rand() % 500);
    // Mark down one connection other than the message's own; erase-then-break
    // keeps the set iterator valid.
    for (set<ConnectionRef>::iterator it = conns.begin(); it != conns.end(); ++it) {
      if ((*it) != m->get_connection().get()) {
        (*it)->mark_down();
        conns.erase(it);
        break;
      }
    }
    if (conns.empty())
      last_mark = false;
    m->put();
    return true;
  }
  bool ms_handle_reset(Connection *con) override {
    lderr(g_ceph_context) << __func__ << " " << con << dendl;
    Mutex::Locker l(lock);
    conns.erase(con);
    usleep(rand() % 500);   // jitter to widen the race window
    return true;
  }
  void ms_handle_remote_reset(Connection *con) override {
    Mutex::Locker l(lock);
    conns.erase(con);
    lderr(g_ceph_context) << __func__ << " " << con << dendl;
  }
  bool ms_handle_refused(Connection *con) override {
    return false;
  }
  // Fast dispatch is disabled; reaching this is a test bug.
  void ms_fast_dispatch(Message *m) override {
    ceph_abort();
  }
  // Tests run with auth disabled; accept every authorizer.
  bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
                            bufferlist& authorizer, bufferlist& authorizer_reply,
                            bool& isvalid, CryptoKey& session_key,
                            std::unique_ptr<AuthAuthorizerChallenge> *challenge) override {
    isvalid = true;
    return true;
  }
};
1447 | ||
1448 | ||
1449 | // Markdown with external lock | |
// Markdown with external lock: one client pings two servers while the
// server dispatcher keeps marking connections down.  The test fails if the
// server's dispatch counter stalls for more than ~3 consecutive rounds,
// i.e. if mark_down() wedges message delivery.
TEST_P(MessengerTest, MarkdownTest) {
  Messenger *server_msgr2 = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
  MarkdownDispatcher cli_dispatcher(false), srv_dispatcher(true);
  entity_addr_t bind_addr;
  bind_addr.parse("127.0.0.1:16800");
  server_msgr->bind(bind_addr);
  server_msgr->add_dispatcher_head(&srv_dispatcher);
  server_msgr->start();
  bind_addr.parse("127.0.0.1:16801");
  server_msgr2->bind(bind_addr);
  server_msgr2->add_dispatcher_head(&srv_dispatcher);
  server_msgr2->start();

  client_msgr->add_dispatcher_head(&cli_dispatcher);
  client_msgr->start();

  int i = 1000;
  uint64_t last = 0;
  bool equal = false;          // did the counter stall this round?
  uint64_t equal_count = 0;    // consecutive stalled rounds
  while (i--) {
    ConnectionRef conn1 = client_msgr->get_connection(server_msgr->get_myinst());
    ConnectionRef conn2 = client_msgr->get_connection(server_msgr2->get_myinst());
    MPing *m = new MPing();
    ASSERT_EQ(conn1->send_message(m), 0);
    m = new MPing();
    ASSERT_EQ(conn2->send_message(m), 0);
    // Give the server up to ~1s to advance its dispatch counter.
    CHECK_AND_WAIT_TRUE(srv_dispatcher.count > last + 1);
    if (srv_dispatcher.count == last) {
      lderr(g_ceph_context) << __func__ << " last is " << last << dendl;
      equal = true;
      equal_count++;
    } else {
      equal = false;
      equal_count = 0;
    }
    last = srv_dispatcher.count;
    if (equal_count)
      usleep(1000*500);   // back off while stalled before deciding it's stuck
    ASSERT_FALSE(equal && equal_count > 3);
  }
  server_msgr->shutdown();
  client_msgr->shutdown();
  server_msgr2->shutdown();
  server_msgr->wait();
  client_msgr->wait();
  server_msgr2->wait();
  delete server_msgr2;
}
1499 | ||
// Run every MessengerTest case against both messenger implementations.
INSTANTIATE_TEST_CASE_P(
  Messenger,
  MessengerTest,
  ::testing::Values(
    "async+posix",
    "simple"
  )
);
1508 | ||
1509 | #else | |
1510 | ||
// Google Test may not support value-parameterized tests with some
// compilers. If we use conditional compilation to compile out all
// code referring to the gtest_main library, MSVC linker will not link
// that library at all and consequently complain about missing entry
// point defined in that library (fatal error LNK1561: entry point
// must be defined). This dummy test keeps gtest_main linked in.
// The empty body is intentional.
TEST(DummyTest, ValueParameterizedTestsAreNotSupportedOnThisPlatform) {}
1518 | ||
1519 | #endif | |
1520 | ||
1521 | ||
// Test entry point: initialize a client-style CephContext with auth
// disabled, enable the experimental async messenger type, make bad/old
// messages fatal so protocol bugs fail loudly, and cap reconnect backoff
// at 1s so injected failures recover quickly; then run all gtest cases.
int main(int argc, char **argv) {
  vector<const char*> args;
  argv_to_vec(argc, (const char **)argv, args);
  env_to_vec(args);

  auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0);
  g_ceph_context->_conf->set_val("auth_cluster_required", "none");
  g_ceph_context->_conf->set_val("auth_service_required", "none");
  g_ceph_context->_conf->set_val("auth_client_required", "none");
  g_ceph_context->_conf->set_val("enable_experimental_unrecoverable_data_corrupting_features", "ms-type-async");
  g_ceph_context->_conf->set_val("ms_die_on_bad_msg", "true");
  g_ceph_context->_conf->set_val("ms_die_on_old_message", "true");
  g_ceph_context->_conf->set_val("ms_max_backoff", "1");
  common_init_finish(g_ceph_context);

  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}

/*
 * Local Variables:
 * compile-command: "cd ../.. ; make -j4 ceph_test_msgr && valgrind --tool=memcheck ./ceph_test_msgr"
 * End:
 */
1540 | ||
1541 | /* | |
1542 | * Local Variables: | |
1543 | * compile-command: "cd ../.. ; make -j4 ceph_test_msgr && valgrind --tool=memcheck ./ceph_test_msgr" | |
1544 | * End: | |
1545 | */ |