]>
Commit | Line | Data |
---|---|---|
11fdf7f2 | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
7c673cae FG |
2 | // vim: ts=8 sw=2 smarttab |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com> | |
7 | * | |
8 | * Author: Haomai Wang <haomaiwang@gmail.com> | |
9 | * | |
10 | * This is free software; you can redistribute it and/or | |
11 | * modify it under the terms of the GNU Lesser General Public | |
12 | * License version 2.1, as published by the Free Software | |
13 | * Foundation. See file COPYING. | |
14 | * | |
15 | */ | |
16 | ||
17 | #include <atomic> | |
18 | #include <iostream> | |
20effc67 | 19 | #include <list> |
f6b5b4d7 | 20 | #include <memory> |
20effc67 | 21 | #include <set> |
7c673cae FG |
22 | #include <stdlib.h> |
23 | #include <time.h> | |
20effc67 TL |
24 | #include <unistd.h> |
25 | ||
26 | #include <boost/random/binomial_distribution.hpp> | |
27 | #include <boost/random/mersenne_twister.hpp> | |
28 | #include <boost/random/uniform_int.hpp> | |
29 | #include <gtest/gtest.h> | |
30 | ||
31 | #define MSG_POLICY_UNIT_TESTING | |
32 | ||
7c673cae | 33 | #include "common/ceph_argparse.h" |
20effc67 | 34 | #include "common/ceph_mutex.h" |
7c673cae | 35 | #include "global/global_init.h" |
20effc67 TL |
36 | #include "messages/MCommand.h" |
37 | #include "messages/MPing.h" | |
38 | #include "msg/Connection.h" | |
7c673cae | 39 | #include "msg/Dispatcher.h" |
7c673cae FG |
40 | #include "msg/Message.h" |
41 | #include "msg/Messenger.h" | |
20effc67 | 42 | #include "msg/msg_types.h" |
7c673cae FG |
43 | |
44 | typedef boost::mt11213b gen_type; | |
45 | ||
46 | #include "common/dout.h" | |
11fdf7f2 TL |
47 | #include "include/ceph_assert.h" |
48 | ||
49 | #include "auth/DummyAuth.h" | |
7c673cae FG |
50 | |
51 | #define dout_subsys ceph_subsys_ms | |
52 | #undef dout_prefix | |
53 | #define dout_prefix *_dout << " ceph_test_msgr " | |
54 | ||
55 | ||
// Polls `expr` up to 999 times, sleeping 1000 microseconds after every
// failed check, and stops as soon as `expr` evaluates to true.  Nothing is
// reported when the attempts run out; callers must assert the condition
// themselves afterwards.
// The loop counter is named `n` and is visible to `expr`.
// NOTE(review): the expansion already ends in ';', so the macro cannot be
// used as the body of an unbraced if/else.
#define CHECK_AND_WAIT_TRUE(expr) do {  \
  for (int n = 999; n != 0; --n) {      \
    if (expr) {                         \
      break;                            \
    }                                   \
    usleep(1000);                       \
  }                                     \
} while(0);
64 | ||
20effc67 TL |
65 | using namespace std; |
66 | ||
7c673cae FG |
67 | class MessengerTest : public ::testing::TestWithParam<const char*> { |
68 | public: | |
11fdf7f2 | 69 | DummyAuthClientServer dummy_auth; |
7c673cae FG |
70 | Messenger *server_msgr; |
71 | Messenger *client_msgr; | |
72 | ||
11fdf7f2 TL |
73 | MessengerTest() : dummy_auth(g_ceph_context), |
74 | server_msgr(NULL), client_msgr(NULL) { | |
75 | dummy_auth.auth_registry.refresh_config(); | |
76 | } | |
7c673cae FG |
77 | void SetUp() override { |
78 | lderr(g_ceph_context) << __func__ << " start set up " << GetParam() << dendl; | |
f67539c2 TL |
79 | server_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid()); |
80 | client_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid()); | |
7c673cae FG |
81 | server_msgr->set_default_policy(Messenger::Policy::stateless_server(0)); |
82 | client_msgr->set_default_policy(Messenger::Policy::lossy_client(0)); | |
11fdf7f2 TL |
83 | server_msgr->set_auth_client(&dummy_auth); |
84 | server_msgr->set_auth_server(&dummy_auth); | |
85 | client_msgr->set_auth_client(&dummy_auth); | |
86 | client_msgr->set_auth_server(&dummy_auth); | |
9f95a23c | 87 | server_msgr->set_require_authorizer(false); |
7c673cae FG |
88 | } |
89 | void TearDown() override { | |
90 | ASSERT_EQ(server_msgr->get_dispatch_queue_len(), 0); | |
91 | ASSERT_EQ(client_msgr->get_dispatch_queue_len(), 0); | |
92 | delete server_msgr; | |
93 | delete client_msgr; | |
94 | } | |
95 | ||
96 | }; | |
97 | ||
98 | ||
99 | class FakeDispatcher : public Dispatcher { | |
100 | public: | |
101 | struct Session : public RefCountedObject { | |
102 | atomic<uint64_t> count; | |
103 | ConnectionRef con; | |
104 | ||
105 | explicit Session(ConnectionRef c): RefCountedObject(g_ceph_context), count(0), con(c) { | |
106 | } | |
107 | uint64_t get_count() { return count; } | |
108 | }; | |
109 | ||
9f95a23c TL |
110 | ceph::mutex lock = ceph::make_mutex("FakeDispatcher::lock"); |
111 | ceph::condition_variable cond; | |
7c673cae FG |
112 | bool is_server; |
113 | bool got_new; | |
114 | bool got_remote_reset; | |
115 | bool got_connect; | |
116 | bool loopback; | |
11fdf7f2 | 117 | entity_addrvec_t last_accept; |
9f95a23c | 118 | ConnectionRef *last_accept_con_ptr = nullptr; |
7c673cae | 119 | |
9f95a23c | 120 | explicit FakeDispatcher(bool s): Dispatcher(g_ceph_context), |
7c673cae | 121 | is_server(s), got_new(false), got_remote_reset(false), |
11fdf7f2 | 122 | got_connect(false), loopback(false) { |
11fdf7f2 | 123 | } |
7c673cae FG |
124 | bool ms_can_fast_dispatch_any() const override { return true; } |
125 | bool ms_can_fast_dispatch(const Message *m) const override { | |
126 | switch (m->get_type()) { | |
127 | case CEPH_MSG_PING: | |
128 | return true; | |
129 | default: | |
130 | return false; | |
131 | } | |
132 | } | |
133 | ||
134 | void ms_handle_fast_connect(Connection *con) override { | |
9f95a23c | 135 | std::scoped_lock l{lock}; |
7c673cae | 136 | lderr(g_ceph_context) << __func__ << " " << con << dendl; |
11fdf7f2 | 137 | auto s = con->get_priv(); |
7c673cae | 138 | if (!s) { |
11fdf7f2 TL |
139 | auto session = new Session(con); |
140 | con->set_priv(RefCountedPtr{session, false}); | |
141 | lderr(g_ceph_context) << __func__ << " con: " << con | |
142 | << " count: " << session->count << dendl; | |
7c673cae | 143 | } |
7c673cae | 144 | got_connect = true; |
9f95a23c | 145 | cond.notify_all(); |
7c673cae FG |
146 | } |
147 | void ms_handle_fast_accept(Connection *con) override { | |
11fdf7f2 | 148 | last_accept = con->get_peer_addrs(); |
9f95a23c TL |
149 | if (last_accept_con_ptr) { |
150 | *last_accept_con_ptr = con; | |
151 | } | |
11fdf7f2 TL |
152 | if (!con->get_priv()) { |
153 | con->set_priv(RefCountedPtr{new Session(con), false}); | |
7c673cae | 154 | } |
7c673cae FG |
155 | } |
156 | bool ms_dispatch(Message *m) override { | |
11fdf7f2 TL |
157 | auto priv = m->get_connection()->get_priv(); |
158 | auto s = static_cast<Session*>(priv.get()); | |
7c673cae FG |
159 | if (!s) { |
160 | s = new Session(m->get_connection()); | |
11fdf7f2 TL |
161 | priv.reset(s, false); |
162 | m->get_connection()->set_priv(priv); | |
7c673cae | 163 | } |
7c673cae FG |
164 | s->count++; |
165 | lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl; | |
166 | if (is_server) { | |
167 | reply_message(m); | |
168 | } | |
9f95a23c | 169 | std::lock_guard l{lock}; |
7c673cae | 170 | got_new = true; |
9f95a23c | 171 | cond.notify_all(); |
7c673cae FG |
172 | m->put(); |
173 | return true; | |
174 | } | |
175 | bool ms_handle_reset(Connection *con) override { | |
9f95a23c | 176 | std::lock_guard l{lock}; |
7c673cae | 177 | lderr(g_ceph_context) << __func__ << " " << con << dendl; |
11fdf7f2 TL |
178 | auto priv = con->get_priv(); |
179 | if (auto s = static_cast<Session*>(priv.get()); s) { | |
180 | s->con.reset(); // break con <-> session ref cycle | |
181 | con->set_priv(nullptr); // break ref <-> session cycle, if any | |
7c673cae FG |
182 | } |
183 | return true; | |
184 | } | |
185 | void ms_handle_remote_reset(Connection *con) override { | |
9f95a23c | 186 | std::lock_guard l{lock}; |
7c673cae | 187 | lderr(g_ceph_context) << __func__ << " " << con << dendl; |
11fdf7f2 TL |
188 | auto priv = con->get_priv(); |
189 | if (auto s = static_cast<Session*>(priv.get()); s) { | |
190 | s->con.reset(); // break con <-> session ref cycle | |
191 | con->set_priv(nullptr); // break ref <-> session cycle, if any | |
7c673cae FG |
192 | } |
193 | got_remote_reset = true; | |
9f95a23c | 194 | cond.notify_all(); |
7c673cae FG |
195 | } |
196 | bool ms_handle_refused(Connection *con) override { | |
197 | return false; | |
198 | } | |
199 | void ms_fast_dispatch(Message *m) override { | |
11fdf7f2 TL |
200 | auto priv = m->get_connection()->get_priv(); |
201 | auto s = static_cast<Session*>(priv.get()); | |
7c673cae FG |
202 | if (!s) { |
203 | s = new Session(m->get_connection()); | |
11fdf7f2 TL |
204 | priv.reset(s, false); |
205 | m->get_connection()->set_priv(priv); | |
7c673cae | 206 | } |
7c673cae FG |
207 | s->count++; |
208 | lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl; | |
209 | if (is_server) { | |
210 | if (loopback) | |
11fdf7f2 | 211 | ceph_assert(m->get_source().is_osd()); |
7c673cae FG |
212 | else |
213 | reply_message(m); | |
214 | } else if (loopback) { | |
11fdf7f2 | 215 | ceph_assert(m->get_source().is_client()); |
7c673cae FG |
216 | } |
217 | m->put(); | |
9f95a23c | 218 | std::lock_guard l{lock}; |
7c673cae | 219 | got_new = true; |
9f95a23c | 220 | cond.notify_all(); |
7c673cae FG |
221 | } |
222 | ||
11fdf7f2 TL |
223 | int ms_handle_authentication(Connection *con) override { |
224 | return 1; | |
7c673cae FG |
225 | } |
226 | ||
227 | void reply_message(Message *m) { | |
228 | MPing *rm = new MPing(); | |
229 | m->get_connection()->send_message(rm); | |
230 | } | |
231 | }; | |
232 | ||
233 | typedef FakeDispatcher::Session Session; | |
234 | ||
11fdf7f2 TL |
235 | struct TestInterceptor : public Interceptor { |
236 | ||
237 | bool step_waiting = false; | |
238 | bool waiting = true; | |
239 | std::map<Connection *, uint32_t> current_step; | |
240 | std::map<Connection *, std::list<uint32_t>> step_history; | |
241 | std::map<uint32_t, std::optional<ACTION>> decisions; | |
242 | std::set<uint32_t> breakpoints; | |
243 | ||
244 | uint32_t count_step(Connection *conn, uint32_t step) { | |
245 | uint32_t count = 0; | |
246 | for (auto s : step_history[conn]) { | |
247 | if (s == step) { | |
248 | count++; | |
249 | } | |
250 | } | |
251 | return count; | |
252 | } | |
253 | ||
254 | void breakpoint(uint32_t step) { | |
255 | breakpoints.insert(step); | |
256 | } | |
257 | ||
258 | void remove_bp(uint32_t step) { | |
259 | breakpoints.erase(step); | |
260 | } | |
261 | ||
262 | Connection *wait(uint32_t step, Connection *conn=nullptr) { | |
263 | std::unique_lock<std::mutex> l(lock); | |
264 | while(true) { | |
265 | if (conn) { | |
266 | auto it = current_step.find(conn); | |
267 | if (it != current_step.end()) { | |
268 | if (it->second == step) { | |
269 | break; | |
270 | } | |
271 | } | |
272 | } else { | |
273 | for (auto it : current_step) { | |
274 | if (it.second == step) { | |
275 | conn = it.first; | |
276 | break; | |
277 | } | |
278 | } | |
279 | if (conn) { | |
280 | break; | |
281 | } | |
282 | } | |
283 | step_waiting = true; | |
284 | cond_var.wait(l); | |
285 | } | |
286 | step_waiting = false; | |
287 | return conn; | |
288 | } | |
289 | ||
290 | ACTION wait_for_decision(uint32_t step, std::unique_lock<std::mutex> &l) { | |
291 | if (decisions[step]) { | |
292 | return *(decisions[step]); | |
293 | } | |
294 | waiting = true; | |
9f95a23c | 295 | cond_var.wait(l, [this] { return !waiting; }); |
11fdf7f2 TL |
296 | return *(decisions[step]); |
297 | } | |
298 | ||
299 | void proceed(uint32_t step, ACTION decision) { | |
300 | std::unique_lock<std::mutex> l(lock); | |
301 | decisions[step] = decision; | |
302 | if (waiting) { | |
303 | waiting = false; | |
304 | cond_var.notify_one(); | |
305 | } | |
306 | } | |
307 | ||
308 | ACTION intercept(Connection *conn, uint32_t step) override { | |
309 | lderr(g_ceph_context) << __func__ << " conn(" << conn | |
310 | << ") intercept called on step=" << step << dendl; | |
311 | ||
312 | { | |
313 | std::unique_lock<std::mutex> l(lock); | |
314 | step_history[conn].push_back(step); | |
315 | current_step[conn] = step; | |
316 | if (step_waiting) { | |
317 | cond_var.notify_one(); | |
318 | } | |
319 | } | |
320 | ||
321 | std::unique_lock<std::mutex> l(lock); | |
322 | ACTION decision = ACTION::CONTINUE; | |
323 | if (breakpoints.find(step) != breakpoints.end()) { | |
324 | lderr(g_ceph_context) << __func__ << " conn(" << conn | |
325 | << ") pausing on step=" << step << dendl; | |
326 | decision = wait_for_decision(step, l); | |
327 | } else { | |
328 | if (decisions[step]) { | |
329 | decision = *(decisions[step]); | |
330 | } | |
331 | } | |
332 | lderr(g_ceph_context) << __func__ << " conn(" << conn | |
333 | << ") resuming step=" << step << " with decision=" | |
334 | << decision << dendl; | |
335 | decisions[step].reset(); | |
336 | return decision; | |
337 | } | |
338 | ||
339 | }; | |
340 | ||
341 | /** | |
342 | * Scenario: A connects to B, and B connects to A at the same time. | |
343 | */ | |
344 | TEST_P(MessengerTest, ConnectionRaceTest) { | |
11fdf7f2 TL |
345 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(false); |
346 | ||
347 | TestInterceptor *cli_interceptor = new TestInterceptor(); | |
348 | TestInterceptor *srv_interceptor = new TestInterceptor(); | |
349 | ||
350 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer_reuse(0)); | |
351 | server_msgr->interceptor = srv_interceptor; | |
352 | ||
353 | client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer_reuse(0)); | |
354 | client_msgr->interceptor = cli_interceptor; | |
355 | ||
356 | entity_addr_t bind_addr; | |
357 | bind_addr.parse("v2:127.0.0.1:3300"); | |
358 | server_msgr->bind(bind_addr); | |
359 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
360 | server_msgr->start(); | |
361 | ||
362 | bind_addr.parse("v2:127.0.0.1:3301"); | |
363 | client_msgr->bind(bind_addr); | |
364 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
365 | client_msgr->start(); | |
366 | ||
367 | // pause before sending client_ident message | |
f67539c2 | 368 | cli_interceptor->breakpoint(Interceptor::STEP::SEND_CLIENT_IDENTITY); |
11fdf7f2 | 369 | // pause before sending client_ident message |
f67539c2 | 370 | srv_interceptor->breakpoint(Interceptor::STEP::SEND_CLIENT_IDENTITY); |
11fdf7f2 TL |
371 | |
372 | ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(), | |
373 | server_msgr->get_myaddrs()); | |
374 | MPing *m1 = new MPing(); | |
375 | ASSERT_EQ(c2s->send_message(m1), 0); | |
376 | ||
377 | ConnectionRef s2c = server_msgr->connect_to(client_msgr->get_mytype(), | |
378 | client_msgr->get_myaddrs()); | |
379 | MPing *m2 = new MPing(); | |
380 | ASSERT_EQ(s2c->send_message(m2), 0); | |
381 | ||
f67539c2 TL |
382 | cli_interceptor->wait(Interceptor::STEP::SEND_CLIENT_IDENTITY, c2s.get()); |
383 | srv_interceptor->wait(Interceptor::STEP::SEND_CLIENT_IDENTITY, s2c.get()); | |
11fdf7f2 TL |
384 | |
385 | // at this point both connections (A->B, B->A) are paused just before sending | |
386 | // the client_ident message. | |
387 | ||
f67539c2 TL |
388 | cli_interceptor->remove_bp(Interceptor::STEP::SEND_CLIENT_IDENTITY); |
389 | srv_interceptor->remove_bp(Interceptor::STEP::SEND_CLIENT_IDENTITY); | |
11fdf7f2 | 390 | |
f67539c2 TL |
391 | cli_interceptor->proceed(Interceptor::STEP::SEND_CLIENT_IDENTITY, Interceptor::ACTION::CONTINUE); |
392 | srv_interceptor->proceed(Interceptor::STEP::SEND_CLIENT_IDENTITY, Interceptor::ACTION::CONTINUE); | |
11fdf7f2 TL |
393 | |
394 | { | |
9f95a23c TL |
395 | std::unique_lock l{cli_dispatcher.lock}; |
396 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
11fdf7f2 TL |
397 | cli_dispatcher.got_new = false; |
398 | } | |
399 | ||
400 | { | |
9f95a23c TL |
401 | std::unique_lock l{srv_dispatcher.lock}; |
402 | srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; }); | |
11fdf7f2 TL |
403 | srv_dispatcher.got_new = false; |
404 | } | |
405 | ||
406 | ASSERT_TRUE(s2c->is_connected()); | |
407 | ASSERT_EQ(1u, static_cast<Session*>(s2c->get_priv().get())->get_count()); | |
408 | ASSERT_TRUE(s2c->peer_is_client()); | |
409 | ||
410 | ASSERT_TRUE(c2s->is_connected()); | |
411 | ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count()); | |
412 | ASSERT_TRUE(c2s->peer_is_osd()); | |
413 | ||
414 | client_msgr->shutdown(); | |
415 | client_msgr->wait(); | |
416 | server_msgr->shutdown(); | |
417 | server_msgr->wait(); | |
418 | ||
419 | delete cli_interceptor; | |
420 | delete srv_interceptor; | |
421 | } | |
422 | ||
f6b5b4d7 TL |
423 | /** |
424 | * Scenario: A connects to B, and B connects to A at the same time. | |
425 | * The first (A -> B) connection gets to message flow handshake, the | |
426 | * second (B -> A) connection is stuck waiting for a banner from A. | |
427 | * After A sends client_ident to B, the first connection wins and B | |
428 | * calls reuse_connection() to replace the second connection's socket | |
429 | * while the second connection is still in BANNER_CONNECTING. | |
430 | */ | |
431 | TEST_P(MessengerTest, ConnectionRaceReuseBannerTest) { | |
432 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(false); | |
433 | ||
434 | auto cli_interceptor = std::make_unique<TestInterceptor>(); | |
435 | auto srv_interceptor = std::make_unique<TestInterceptor>(); | |
436 | ||
437 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, | |
438 | Messenger::Policy::lossless_peer_reuse(0)); | |
439 | server_msgr->interceptor = srv_interceptor.get(); | |
440 | ||
441 | client_msgr->set_policy(entity_name_t::TYPE_OSD, | |
442 | Messenger::Policy::lossless_peer_reuse(0)); | |
443 | client_msgr->interceptor = cli_interceptor.get(); | |
444 | ||
445 | entity_addr_t bind_addr; | |
446 | bind_addr.parse("v2:127.0.0.1:3300"); | |
447 | server_msgr->bind(bind_addr); | |
448 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
449 | server_msgr->start(); | |
450 | ||
451 | bind_addr.parse("v2:127.0.0.1:3301"); | |
452 | client_msgr->bind(bind_addr); | |
453 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
454 | client_msgr->start(); | |
455 | ||
456 | // pause before sending client_ident message | |
f67539c2 | 457 | srv_interceptor->breakpoint(Interceptor::STEP::SEND_CLIENT_IDENTITY); |
f6b5b4d7 TL |
458 | |
459 | ConnectionRef s2c = server_msgr->connect_to(client_msgr->get_mytype(), | |
460 | client_msgr->get_myaddrs()); | |
461 | MPing *m1 = new MPing(); | |
462 | ASSERT_EQ(s2c->send_message(m1), 0); | |
463 | ||
f67539c2 TL |
464 | srv_interceptor->wait(Interceptor::STEP::SEND_CLIENT_IDENTITY); |
465 | srv_interceptor->remove_bp(Interceptor::STEP::SEND_CLIENT_IDENTITY); | |
f6b5b4d7 TL |
466 | |
467 | // pause before sending banner | |
f67539c2 | 468 | cli_interceptor->breakpoint(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING); |
f6b5b4d7 TL |
469 | |
470 | ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(), | |
471 | server_msgr->get_myaddrs()); | |
472 | MPing *m2 = new MPing(); | |
473 | ASSERT_EQ(c2s->send_message(m2), 0); | |
474 | ||
f67539c2 TL |
475 | cli_interceptor->wait(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING); |
476 | cli_interceptor->remove_bp(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING); | |
f6b5b4d7 TL |
477 | |
478 | // second connection is in BANNER_CONNECTING, ensure it stays so | |
479 | // and send client_ident | |
f67539c2 TL |
480 | srv_interceptor->breakpoint(Interceptor::STEP::BANNER_EXCHANGE); |
481 | srv_interceptor->proceed(Interceptor::STEP::SEND_CLIENT_IDENTITY, Interceptor::ACTION::CONTINUE); | |
f6b5b4d7 TL |
482 | |
483 | // handle client_ident -- triggers reuse_connection() with exproto | |
484 | // in BANNER_CONNECTING | |
f67539c2 TL |
485 | cli_interceptor->breakpoint(Interceptor::STEP::READY); |
486 | cli_interceptor->proceed(Interceptor::STEP::BANNER_EXCHANGE_BANNER_CONNECTING, Interceptor::ACTION::CONTINUE); | |
f6b5b4d7 | 487 | |
f67539c2 TL |
488 | cli_interceptor->wait(Interceptor::STEP::READY); |
489 | cli_interceptor->remove_bp(Interceptor::STEP::READY); | |
f6b5b4d7 TL |
490 | |
491 | // first connection is in READY | |
f67539c2 TL |
492 | Connection *s2c_accepter = srv_interceptor->wait(Interceptor::STEP::BANNER_EXCHANGE); |
493 | srv_interceptor->remove_bp(Interceptor::STEP::BANNER_EXCHANGE); | |
f6b5b4d7 | 494 | |
f67539c2 TL |
495 | srv_interceptor->proceed(Interceptor::STEP::BANNER_EXCHANGE, Interceptor::ACTION::CONTINUE); |
496 | cli_interceptor->proceed(Interceptor::STEP::READY, Interceptor::ACTION::CONTINUE); | |
f6b5b4d7 TL |
497 | |
498 | { | |
499 | std::unique_lock l{cli_dispatcher.lock}; | |
500 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
501 | cli_dispatcher.got_new = false; | |
502 | } | |
503 | ||
504 | { | |
505 | std::unique_lock l{srv_dispatcher.lock}; | |
506 | srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; }); | |
507 | srv_dispatcher.got_new = false; | |
508 | } | |
509 | ||
510 | EXPECT_TRUE(s2c->is_connected()); | |
511 | EXPECT_EQ(1u, static_cast<Session*>(s2c->get_priv().get())->get_count()); | |
512 | EXPECT_TRUE(s2c->peer_is_client()); | |
513 | ||
514 | EXPECT_TRUE(c2s->is_connected()); | |
515 | EXPECT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count()); | |
516 | EXPECT_TRUE(c2s->peer_is_osd()); | |
517 | ||
518 | // closed in reuse_connection() -- EPIPE when writing banner/hello | |
519 | EXPECT_FALSE(s2c_accepter->is_connected()); | |
520 | ||
521 | // established exactly once, never faulted and reconnected | |
f67539c2 TL |
522 | EXPECT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::START_CLIENT_BANNER_EXCHANGE), 1u); |
523 | EXPECT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT), 0u); | |
524 | EXPECT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::READY), 1u); | |
f6b5b4d7 TL |
525 | |
526 | client_msgr->shutdown(); | |
527 | client_msgr->wait(); | |
528 | server_msgr->shutdown(); | |
529 | server_msgr->wait(); | |
530 | } | |
531 | ||
11fdf7f2 TL |
532 | /** |
533 | * Scenario: | |
534 | * - A connects to B | |
535 | * - A sends client_ident to B | |
536 | * - B fails before sending server_ident to A | |
537 | * - A reconnects | |
538 | */ | |
539 | TEST_P(MessengerTest, MissingServerIdenTest) { | |
11fdf7f2 TL |
540 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(false); |
541 | ||
542 | TestInterceptor *cli_interceptor = new TestInterceptor(); | |
543 | TestInterceptor *srv_interceptor = new TestInterceptor(); | |
544 | ||
545 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::stateful_server(0)); | |
546 | server_msgr->interceptor = srv_interceptor; | |
547 | ||
548 | client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossy_client(0)); | |
549 | client_msgr->interceptor = cli_interceptor; | |
550 | ||
551 | entity_addr_t bind_addr; | |
552 | bind_addr.parse("v2:127.0.0.1:3300"); | |
553 | server_msgr->bind(bind_addr); | |
554 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
555 | server_msgr->start(); | |
556 | ||
557 | bind_addr.parse("v2:127.0.0.1:3301"); | |
558 | client_msgr->bind(bind_addr); | |
559 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
560 | client_msgr->start(); | |
561 | ||
f67539c2 TL |
562 | // pause before sending server_ident message |
563 | srv_interceptor->breakpoint(Interceptor::STEP::SEND_SERVER_IDENTITY); | |
11fdf7f2 TL |
564 | |
565 | ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(), | |
566 | server_msgr->get_myaddrs()); | |
567 | MPing *m1 = new MPing(); | |
568 | ASSERT_EQ(c2s->send_message(m1), 0); | |
569 | ||
f67539c2 TL |
570 | Connection *c2s_accepter = srv_interceptor->wait(Interceptor::STEP::SEND_SERVER_IDENTITY); |
571 | srv_interceptor->remove_bp(Interceptor::STEP::SEND_SERVER_IDENTITY); | |
11fdf7f2 TL |
572 | |
573 | // We inject a message from this side of the connection to force it to be | |
574 | // in standby when we inject the failure below | |
575 | MPing *m2 = new MPing(); | |
576 | ASSERT_EQ(c2s_accepter->send_message(m2), 0); | |
577 | ||
f67539c2 | 578 | srv_interceptor->proceed(Interceptor::STEP::SEND_SERVER_IDENTITY, Interceptor::ACTION::FAIL); |
11fdf7f2 TL |
579 | |
580 | { | |
9f95a23c TL |
581 | std::unique_lock l{srv_dispatcher.lock}; |
582 | srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; }); | |
11fdf7f2 TL |
583 | srv_dispatcher.got_new = false; |
584 | } | |
585 | ||
586 | { | |
9f95a23c TL |
587 | std::unique_lock l{cli_dispatcher.lock}; |
588 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
11fdf7f2 TL |
589 | cli_dispatcher.got_new = false; |
590 | } | |
591 | ||
592 | ASSERT_TRUE(c2s->is_connected()); | |
593 | ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count()); | |
594 | ASSERT_TRUE(c2s->peer_is_osd()); | |
595 | ||
596 | ASSERT_TRUE(c2s_accepter->is_connected()); | |
597 | ASSERT_EQ(1u, static_cast<Session*>(c2s_accepter->get_priv().get())->get_count()); | |
598 | ASSERT_TRUE(c2s_accepter->peer_is_client()); | |
599 | ||
600 | client_msgr->shutdown(); | |
601 | client_msgr->wait(); | |
602 | server_msgr->shutdown(); | |
603 | server_msgr->wait(); | |
604 | ||
605 | delete cli_interceptor; | |
606 | delete srv_interceptor; | |
607 | } | |
608 | ||
609 | /** | |
610 | * Scenario: | |
611 | * - A connects to B | |
612 | * - A sends client_ident to B | |
613 | * - B fails before sending server_ident to A | |
614 | * - A goes to standby | |
615 | * - B reconnects to A | |
616 | */ | |
617 | TEST_P(MessengerTest, MissingServerIdenTest2) { | |
11fdf7f2 TL |
618 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(false); |
619 | ||
620 | TestInterceptor *cli_interceptor = new TestInterceptor(); | |
621 | TestInterceptor *srv_interceptor = new TestInterceptor(); | |
622 | ||
623 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer(0)); | |
624 | server_msgr->interceptor = srv_interceptor; | |
625 | ||
626 | client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0)); | |
627 | client_msgr->interceptor = cli_interceptor; | |
628 | ||
629 | entity_addr_t bind_addr; | |
630 | bind_addr.parse("v2:127.0.0.1:3300"); | |
631 | server_msgr->bind(bind_addr); | |
632 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
633 | server_msgr->start(); | |
634 | ||
635 | bind_addr.parse("v2:127.0.0.1:3301"); | |
636 | client_msgr->bind(bind_addr); | |
637 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
638 | client_msgr->start(); | |
639 | ||
f67539c2 TL |
640 | // pause before sending server_ident message |
641 | srv_interceptor->breakpoint(Interceptor::STEP::SEND_SERVER_IDENTITY); | |
11fdf7f2 TL |
642 | |
643 | ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(), | |
644 | server_msgr->get_myaddrs()); | |
645 | ||
f67539c2 TL |
646 | Connection *c2s_accepter = srv_interceptor->wait(Interceptor::STEP::SEND_SERVER_IDENTITY); |
647 | srv_interceptor->remove_bp(Interceptor::STEP::SEND_SERVER_IDENTITY); | |
11fdf7f2 TL |
648 | |
649 | // We inject a message from this side of the connection to force it to be | |
650 | // in standby when we inject the failure below | |
651 | MPing *m2 = new MPing(); | |
652 | ASSERT_EQ(c2s_accepter->send_message(m2), 0); | |
653 | ||
f67539c2 | 654 | srv_interceptor->proceed(Interceptor::STEP::SEND_SERVER_IDENTITY, Interceptor::ACTION::FAIL); |
11fdf7f2 TL |
655 | |
656 | { | |
9f95a23c TL |
657 | std::unique_lock l{cli_dispatcher.lock}; |
658 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
11fdf7f2 TL |
659 | cli_dispatcher.got_new = false; |
660 | } | |
661 | ||
662 | ASSERT_TRUE(c2s->is_connected()); | |
663 | ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count()); | |
664 | ASSERT_TRUE(c2s->peer_is_osd()); | |
665 | ||
666 | ASSERT_TRUE(c2s_accepter->is_connected()); | |
667 | ASSERT_EQ(0u, static_cast<Session*>(c2s_accepter->get_priv().get())->get_count()); | |
668 | ASSERT_TRUE(c2s_accepter->peer_is_client()); | |
669 | ||
670 | client_msgr->shutdown(); | |
671 | client_msgr->wait(); | |
672 | server_msgr->shutdown(); | |
673 | server_msgr->wait(); | |
674 | ||
675 | delete cli_interceptor; | |
676 | delete srv_interceptor; | |
677 | } | |
678 | ||
679 | /** | |
680 | * Scenario: | |
681 | * - A connects to B | |
682 | * - A and B exchange messages | |
683 | * - A fails | |
684 | * - B goes into standby | |
685 | * - A reconnects | |
686 | */ | |
687 | TEST_P(MessengerTest, ReconnectTest) { | |
11fdf7f2 TL |
688 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); |
689 | ||
690 | TestInterceptor *cli_interceptor = new TestInterceptor(); | |
691 | TestInterceptor *srv_interceptor = new TestInterceptor(); | |
692 | ||
693 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::stateful_server(0)); | |
694 | server_msgr->interceptor = srv_interceptor; | |
695 | ||
696 | client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0)); | |
697 | client_msgr->interceptor = cli_interceptor; | |
698 | ||
699 | entity_addr_t bind_addr; | |
700 | bind_addr.parse("v2:127.0.0.1:3300"); | |
701 | server_msgr->bind(bind_addr); | |
702 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
703 | server_msgr->start(); | |
704 | ||
705 | bind_addr.parse("v2:127.0.0.1:3301"); | |
706 | client_msgr->bind(bind_addr); | |
707 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
708 | client_msgr->start(); | |
709 | ||
710 | ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(), | |
711 | server_msgr->get_myaddrs()); | |
712 | ||
713 | MPing *m1 = new MPing(); | |
714 | ASSERT_EQ(c2s->send_message(m1), 0); | |
715 | ||
716 | { | |
9f95a23c TL |
717 | std::unique_lock l{cli_dispatcher.lock}; |
718 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
11fdf7f2 TL |
719 | cli_dispatcher.got_new = false; |
720 | } | |
721 | ||
722 | ASSERT_TRUE(c2s->is_connected()); | |
723 | ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count()); | |
724 | ASSERT_TRUE(c2s->peer_is_osd()); | |
725 | ||
f67539c2 | 726 | cli_interceptor->breakpoint(Interceptor::STEP::HANDLE_MESSAGE); |
11fdf7f2 TL |
727 | |
728 | MPing *m2 = new MPing(); | |
729 | ASSERT_EQ(c2s->send_message(m2), 0); | |
730 | ||
f67539c2 TL |
731 | cli_interceptor->wait(Interceptor::STEP::HANDLE_MESSAGE, c2s.get()); |
732 | cli_interceptor->remove_bp(Interceptor::STEP::HANDLE_MESSAGE); | |
11fdf7f2 TL |
733 | |
734 | // at this point client and server are connected together | |
735 | ||
f67539c2 | 736 | srv_interceptor->breakpoint(Interceptor::STEP::READY); |
11fdf7f2 TL |
737 | |
738 | // failing client | |
f67539c2 | 739 | cli_interceptor->proceed(Interceptor::STEP::HANDLE_MESSAGE, Interceptor::ACTION::FAIL); |
11fdf7f2 TL |
740 | |
741 | MPing *m3 = new MPing(); | |
742 | ASSERT_EQ(c2s->send_message(m3), 0); | |
743 | ||
f67539c2 | 744 | Connection *c2s_accepter = srv_interceptor->wait(Interceptor::STEP::READY); |
11fdf7f2 TL |
745 | // the srv end of theconnection is now paused at ready |
746 | // this means that the reconnect was successful | |
f67539c2 | 747 | srv_interceptor->remove_bp(Interceptor::STEP::READY); |
11fdf7f2 TL |
748 | |
749 | ASSERT_TRUE(c2s_accepter->peer_is_client()); | |
750 | // c2s_accepter sent 0 reconnect messages | |
f67539c2 | 751 | ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT), 0u); |
11fdf7f2 | 752 | // c2s_accepter sent 1 reconnect_ok messages |
f67539c2 | 753 | ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT_OK), 1u); |
11fdf7f2 | 754 | // c2s sent 1 reconnect messages |
f67539c2 | 755 | ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT), 1u); |
11fdf7f2 | 756 | // c2s sent 0 reconnect_ok messages |
f67539c2 | 757 | ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT_OK), 0u); |
11fdf7f2 TL |
758 | |
759 | srv_interceptor->proceed(15, Interceptor::ACTION::CONTINUE); | |
760 | ||
761 | { | |
9f95a23c TL |
762 | std::unique_lock l{cli_dispatcher.lock}; |
763 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
11fdf7f2 TL |
764 | cli_dispatcher.got_new = false; |
765 | } | |
766 | ||
767 | client_msgr->shutdown(); | |
768 | client_msgr->wait(); | |
769 | server_msgr->shutdown(); | |
770 | server_msgr->wait(); | |
771 | ||
772 | delete cli_interceptor; | |
773 | delete srv_interceptor; | |
774 | } | |
775 | ||
776 | /** | |
777 | * Scenario: | |
778 | * - A connects to B | |
779 | * - A and B exchange messages | |
780 | * - A fails | |
781 | * - A reconnects // B reconnects | |
782 | */ | |
783 | TEST_P(MessengerTest, ReconnectRaceTest) { | |
11fdf7f2 TL |
784 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); |
785 | ||
786 | TestInterceptor *cli_interceptor = new TestInterceptor(); | |
787 | TestInterceptor *srv_interceptor = new TestInterceptor(); | |
788 | ||
789 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer(0)); | |
790 | server_msgr->interceptor = srv_interceptor; | |
791 | ||
792 | client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0)); | |
793 | client_msgr->interceptor = cli_interceptor; | |
794 | ||
795 | entity_addr_t bind_addr; | |
796 | bind_addr.parse("v2:127.0.0.1:3300"); | |
797 | server_msgr->bind(bind_addr); | |
798 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
799 | server_msgr->start(); | |
800 | ||
801 | bind_addr.parse("v2:127.0.0.1:3301"); | |
802 | client_msgr->bind(bind_addr); | |
803 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
804 | client_msgr->start(); | |
805 | ||
806 | ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(), | |
807 | server_msgr->get_myaddrs()); | |
808 | ||
809 | MPing *m1 = new MPing(); | |
810 | ASSERT_EQ(c2s->send_message(m1), 0); | |
811 | ||
812 | { | |
9f95a23c TL |
813 | std::unique_lock l{cli_dispatcher.lock}; |
814 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
11fdf7f2 TL |
815 | cli_dispatcher.got_new = false; |
816 | } | |
817 | ||
818 | ASSERT_TRUE(c2s->is_connected()); | |
819 | ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count()); | |
820 | ASSERT_TRUE(c2s->peer_is_osd()); | |
821 | ||
f67539c2 | 822 | cli_interceptor->breakpoint(Interceptor::STEP::HANDLE_MESSAGE); |
11fdf7f2 TL |
823 | |
824 | MPing *m2 = new MPing(); | |
825 | ASSERT_EQ(c2s->send_message(m2), 0); | |
826 | ||
f67539c2 TL |
827 | cli_interceptor->wait(Interceptor::STEP::HANDLE_MESSAGE, c2s.get()); |
828 | cli_interceptor->remove_bp(Interceptor::STEP::HANDLE_MESSAGE); | |
11fdf7f2 TL |
829 | |
830 | // at this point client and server are connected together | |
831 | ||
832 | // force both client and server to race on reconnect | |
f67539c2 TL |
833 | cli_interceptor->breakpoint(Interceptor::STEP::SEND_RECONNECT); |
834 | srv_interceptor->breakpoint(Interceptor::STEP::SEND_RECONNECT); | |
11fdf7f2 TL |
835 | |
836 | // failing client | |
837 | // this will cause both client and server to reconnect at the same time | |
f67539c2 | 838 | cli_interceptor->proceed(Interceptor::STEP::HANDLE_MESSAGE, Interceptor::ACTION::FAIL); |
11fdf7f2 TL |
839 | |
840 | MPing *m3 = new MPing(); | |
841 | ASSERT_EQ(c2s->send_message(m3), 0); | |
842 | ||
f67539c2 TL |
843 | cli_interceptor->wait(Interceptor::STEP::SEND_RECONNECT, c2s.get()); |
844 | srv_interceptor->wait(Interceptor::STEP::SEND_RECONNECT); | |
11fdf7f2 | 845 | |
f67539c2 TL |
846 | cli_interceptor->remove_bp(Interceptor::STEP::SEND_RECONNECT); |
847 | srv_interceptor->remove_bp(Interceptor::STEP::SEND_RECONNECT); | |
11fdf7f2 TL |
848 | |
849 | // pause on "ready" | |
f67539c2 | 850 | srv_interceptor->breakpoint(Interceptor::STEP::READY); |
11fdf7f2 | 851 | |
f67539c2 TL |
852 | cli_interceptor->proceed(Interceptor::STEP::SEND_RECONNECT, Interceptor::ACTION::CONTINUE); |
853 | srv_interceptor->proceed(Interceptor::STEP::SEND_RECONNECT, Interceptor::ACTION::CONTINUE); | |
11fdf7f2 | 854 | |
f67539c2 | 855 | Connection *c2s_accepter = srv_interceptor->wait(Interceptor::STEP::READY); |
11fdf7f2 TL |
856 | |
857 | // the server has reconnected and is "ready" | |
f67539c2 | 858 | srv_interceptor->remove_bp(Interceptor::STEP::READY); |
11fdf7f2 TL |
859 | |
860 | ASSERT_TRUE(c2s_accepter->peer_is_client()); | |
861 | ASSERT_TRUE(c2s->peer_is_osd()); | |
862 | ||
863 | // the server should win the reconnect race | |
864 | ||
865 | // c2s_accepter sent 1 or 2 reconnect messages | |
f67539c2 TL |
866 | ASSERT_LT(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT), 3u); |
867 | ASSERT_GT(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT), 0u); | |
11fdf7f2 | 868 | // c2s_accepter sent 0 reconnect_ok messages |
f67539c2 | 869 | ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT_OK), 0u); |
11fdf7f2 | 870 | // c2s sent 1 reconnect messages |
f67539c2 | 871 | ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT), 1u); |
11fdf7f2 | 872 | // c2s sent 1 reconnect_ok messages |
f67539c2 | 873 | ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SEND_RECONNECT_OK), 1u); |
11fdf7f2 | 874 | |
f67539c2 | 875 | if (srv_interceptor->count_step(c2s_accepter, Interceptor::STEP::SEND_RECONNECT) == 2) { |
11fdf7f2 TL |
876 | // if the server send the reconnect message two times then |
877 | // the client must have sent a session retry message to the server | |
f67539c2 | 878 | ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SESSION_RETRY), 1u); |
11fdf7f2 | 879 | } else { |
f67539c2 | 880 | ASSERT_EQ(cli_interceptor->count_step(c2s.get(), Interceptor::STEP::SESSION_RETRY), 0u); |
11fdf7f2 TL |
881 | } |
882 | ||
f67539c2 | 883 | srv_interceptor->proceed(Interceptor::STEP::READY, Interceptor::ACTION::CONTINUE); |
11fdf7f2 TL |
884 | |
885 | { | |
9f95a23c TL |
886 | std::unique_lock l{cli_dispatcher.lock}; |
887 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
11fdf7f2 TL |
888 | cli_dispatcher.got_new = false; |
889 | } | |
890 | ||
891 | client_msgr->shutdown(); | |
892 | client_msgr->wait(); | |
893 | server_msgr->shutdown(); | |
894 | server_msgr->wait(); | |
895 | ||
896 | delete cli_interceptor; | |
897 | delete srv_interceptor; | |
898 | } | |
899 | ||
7c673cae FG |
900 | TEST_P(MessengerTest, SimpleTest) { |
901 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
902 | entity_addr_t bind_addr; | |
9f95a23c | 903 | bind_addr.parse("v2:127.0.0.1"); |
7c673cae FG |
904 | server_msgr->bind(bind_addr); |
905 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
906 | server_msgr->start(); | |
907 | ||
908 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
909 | client_msgr->start(); | |
910 | ||
911 | // 1. simple round trip | |
912 | MPing *m = new MPing(); | |
11fdf7f2 TL |
913 | ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), |
914 | server_msgr->get_myaddrs()); | |
7c673cae FG |
915 | { |
916 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
917 | std::unique_lock l{cli_dispatcher.lock}; |
918 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
919 | cli_dispatcher.got_new = false; |
920 | } | |
921 | ASSERT_TRUE(conn->is_connected()); | |
11fdf7f2 | 922 | ASSERT_EQ(1u, static_cast<Session*>(conn->get_priv().get())->get_count()); |
7c673cae FG |
923 | ASSERT_TRUE(conn->peer_is_osd()); |
924 | ||
925 | // 2. test rebind port | |
926 | set<int> avoid_ports; | |
11fdf7f2 TL |
927 | for (int i = 0; i < 10 ; i++) { |
928 | for (auto a : server_msgr->get_myaddrs().v) { | |
929 | avoid_ports.insert(a.get_port() + i); | |
930 | } | |
931 | } | |
7c673cae | 932 | server_msgr->rebind(avoid_ports); |
11fdf7f2 TL |
933 | for (auto a : server_msgr->get_myaddrs().v) { |
934 | ASSERT_TRUE(avoid_ports.count(a.get_port()) == 0); | |
935 | } | |
7c673cae | 936 | |
11fdf7f2 TL |
937 | conn = client_msgr->connect_to(server_msgr->get_mytype(), |
938 | server_msgr->get_myaddrs()); | |
7c673cae FG |
939 | { |
940 | m = new MPing(); | |
941 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
942 | std::unique_lock l{cli_dispatcher.lock}; |
943 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
944 | cli_dispatcher.got_new = false; |
945 | } | |
11fdf7f2 | 946 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); |
7c673cae FG |
947 | |
948 | // 3. test markdown connection | |
949 | conn->mark_down(); | |
950 | ASSERT_FALSE(conn->is_connected()); | |
951 | ||
952 | // 4. test failed connection | |
953 | server_msgr->shutdown(); | |
954 | server_msgr->wait(); | |
955 | ||
956 | m = new MPing(); | |
957 | conn->send_message(m); | |
958 | CHECK_AND_WAIT_TRUE(!conn->is_connected()); | |
959 | ASSERT_FALSE(conn->is_connected()); | |
960 | ||
961 | // 5. loopback connection | |
962 | srv_dispatcher.loopback = true; | |
963 | conn = client_msgr->get_loopback_connection(); | |
964 | { | |
965 | m = new MPing(); | |
966 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
967 | std::unique_lock l{cli_dispatcher.lock}; |
968 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
969 | cli_dispatcher.got_new = false; |
970 | } | |
971 | srv_dispatcher.loopback = false; | |
11fdf7f2 TL |
972 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); |
973 | client_msgr->shutdown(); | |
974 | client_msgr->wait(); | |
975 | server_msgr->shutdown(); | |
976 | server_msgr->wait(); | |
977 | } | |
978 | ||
979 | TEST_P(MessengerTest, SimpleMsgr2Test) { | |
980 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
981 | entity_addr_t legacy_addr; | |
982 | legacy_addr.parse("v1:127.0.0.1"); | |
983 | entity_addr_t msgr2_addr; | |
984 | msgr2_addr.parse("v2:127.0.0.1"); | |
985 | entity_addrvec_t bind_addrs; | |
986 | bind_addrs.v.push_back(legacy_addr); | |
987 | bind_addrs.v.push_back(msgr2_addr); | |
988 | server_msgr->bindv(bind_addrs); | |
989 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
990 | server_msgr->start(); | |
991 | ||
992 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
993 | client_msgr->start(); | |
994 | ||
995 | // 1. simple round trip | |
996 | MPing *m = new MPing(); | |
997 | ConnectionRef conn = client_msgr->connect_to( | |
998 | server_msgr->get_mytype(), | |
999 | server_msgr->get_myaddrs()); | |
1000 | { | |
1001 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
1002 | std::unique_lock l{cli_dispatcher.lock}; |
1003 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
11fdf7f2 TL |
1004 | cli_dispatcher.got_new = false; |
1005 | } | |
1006 | ASSERT_TRUE(conn->is_connected()); | |
1007 | ASSERT_EQ(1u, static_cast<Session*>(conn->get_priv().get())->get_count()); | |
1008 | ASSERT_TRUE(conn->peer_is_osd()); | |
1009 | ||
1010 | // 2. test rebind port | |
1011 | set<int> avoid_ports; | |
1012 | for (int i = 0; i < 10 ; i++) { | |
1013 | for (auto a : server_msgr->get_myaddrs().v) { | |
1014 | avoid_ports.insert(a.get_port() + i); | |
1015 | } | |
1016 | } | |
1017 | server_msgr->rebind(avoid_ports); | |
1018 | for (auto a : server_msgr->get_myaddrs().v) { | |
1019 | ASSERT_TRUE(avoid_ports.count(a.get_port()) == 0); | |
1020 | } | |
1021 | ||
1022 | conn = client_msgr->connect_to( | |
1023 | server_msgr->get_mytype(), | |
1024 | server_msgr->get_myaddrs()); | |
1025 | { | |
1026 | m = new MPing(); | |
1027 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
1028 | std::unique_lock l{cli_dispatcher.lock}; |
1029 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
11fdf7f2 TL |
1030 | cli_dispatcher.got_new = false; |
1031 | } | |
1032 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); | |
1033 | ||
1034 | // 3. test markdown connection | |
1035 | conn->mark_down(); | |
1036 | ASSERT_FALSE(conn->is_connected()); | |
1037 | ||
1038 | // 4. test failed connection | |
1039 | server_msgr->shutdown(); | |
1040 | server_msgr->wait(); | |
1041 | ||
1042 | m = new MPing(); | |
1043 | conn->send_message(m); | |
1044 | CHECK_AND_WAIT_TRUE(!conn->is_connected()); | |
1045 | ASSERT_FALSE(conn->is_connected()); | |
1046 | ||
1047 | // 5. loopback connection | |
1048 | srv_dispatcher.loopback = true; | |
1049 | conn = client_msgr->get_loopback_connection(); | |
1050 | { | |
1051 | m = new MPing(); | |
1052 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
1053 | std::unique_lock l{cli_dispatcher.lock}; |
1054 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
11fdf7f2 TL |
1055 | cli_dispatcher.got_new = false; |
1056 | } | |
1057 | srv_dispatcher.loopback = false; | |
1058 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); | |
7c673cae FG |
1059 | client_msgr->shutdown(); |
1060 | client_msgr->wait(); | |
1061 | server_msgr->shutdown(); | |
1062 | server_msgr->wait(); | |
1063 | } | |
1064 | ||
7c673cae FG |
1065 | TEST_P(MessengerTest, FeatureTest) { |
1066 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
1067 | entity_addr_t bind_addr; | |
9f95a23c | 1068 | bind_addr.parse("v2:127.0.0.1"); |
7c673cae FG |
1069 | uint64_t all_feature_supported, feature_required, feature_supported = 0; |
1070 | for (int i = 0; i < 10; i++) | |
1071 | feature_supported |= 1ULL << i; | |
11fdf7f2 TL |
1072 | feature_supported |= CEPH_FEATUREMASK_MSG_ADDR2; |
1073 | feature_supported |= CEPH_FEATUREMASK_SERVER_NAUTILUS; | |
7c673cae FG |
1074 | feature_required = feature_supported | 1ULL << 13; |
1075 | all_feature_supported = feature_required | 1ULL << 14; | |
1076 | ||
1077 | Messenger::Policy p = server_msgr->get_policy(entity_name_t::TYPE_CLIENT); | |
1078 | p.features_required = feature_required; | |
1079 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); | |
1080 | server_msgr->bind(bind_addr); | |
1081 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
1082 | server_msgr->start(); | |
1083 | ||
1084 | // 1. Suppose if only support less than required | |
1085 | p = client_msgr->get_policy(entity_name_t::TYPE_OSD); | |
1086 | p.features_supported = feature_supported; | |
1087 | client_msgr->set_policy(entity_name_t::TYPE_OSD, p); | |
1088 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
1089 | client_msgr->start(); | |
1090 | ||
1091 | MPing *m = new MPing(); | |
11fdf7f2 TL |
1092 | ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), |
1093 | server_msgr->get_myaddrs()); | |
7c673cae FG |
1094 | conn->send_message(m); |
1095 | CHECK_AND_WAIT_TRUE(!conn->is_connected()); | |
1096 | // should failed build a connection | |
1097 | ASSERT_FALSE(conn->is_connected()); | |
1098 | ||
1099 | client_msgr->shutdown(); | |
1100 | client_msgr->wait(); | |
1101 | ||
1102 | // 2. supported met required | |
1103 | p = client_msgr->get_policy(entity_name_t::TYPE_OSD); | |
1104 | p.features_supported = all_feature_supported; | |
1105 | client_msgr->set_policy(entity_name_t::TYPE_OSD, p); | |
1106 | client_msgr->start(); | |
1107 | ||
11fdf7f2 TL |
1108 | conn = client_msgr->connect_to(server_msgr->get_mytype(), |
1109 | server_msgr->get_myaddrs()); | |
7c673cae FG |
1110 | { |
1111 | m = new MPing(); | |
1112 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
1113 | std::unique_lock l{cli_dispatcher.lock}; |
1114 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
1115 | cli_dispatcher.got_new = false; |
1116 | } | |
11fdf7f2 | 1117 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); |
7c673cae FG |
1118 | |
1119 | server_msgr->shutdown(); | |
1120 | client_msgr->shutdown(); | |
1121 | server_msgr->wait(); | |
1122 | client_msgr->wait(); | |
1123 | } | |
1124 | ||
1125 | TEST_P(MessengerTest, TimeoutTest) { | |
81eedcae | 1126 | g_ceph_context->_conf.set_val("ms_connection_idle_timeout", "1"); |
7c673cae FG |
1127 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); |
1128 | entity_addr_t bind_addr; | |
9f95a23c | 1129 | bind_addr.parse("v2:127.0.0.1"); |
7c673cae FG |
1130 | server_msgr->bind(bind_addr); |
1131 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
1132 | server_msgr->start(); | |
1133 | ||
1134 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
1135 | client_msgr->start(); | |
1136 | ||
1137 | // 1. build the connection | |
1138 | MPing *m = new MPing(); | |
11fdf7f2 TL |
1139 | ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), |
1140 | server_msgr->get_myaddrs()); | |
7c673cae FG |
1141 | { |
1142 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
1143 | std::unique_lock l{cli_dispatcher.lock}; |
1144 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
1145 | cli_dispatcher.got_new = false; |
1146 | } | |
1147 | ASSERT_TRUE(conn->is_connected()); | |
11fdf7f2 | 1148 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); |
7c673cae FG |
1149 | ASSERT_TRUE(conn->peer_is_osd()); |
1150 | ||
1151 | // 2. wait for idle | |
1152 | usleep(2500*1000); | |
1153 | ASSERT_FALSE(conn->is_connected()); | |
1154 | ||
1155 | server_msgr->shutdown(); | |
1156 | server_msgr->wait(); | |
1157 | ||
1158 | client_msgr->shutdown(); | |
1159 | client_msgr->wait(); | |
81eedcae | 1160 | g_ceph_context->_conf.set_val("ms_connection_idle_timeout", "900"); |
7c673cae FG |
1161 | } |
1162 | ||
1163 | TEST_P(MessengerTest, StatefulTest) { | |
1164 | Message *m; | |
1165 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
1166 | entity_addr_t bind_addr; | |
9f95a23c | 1167 | bind_addr.parse("v2:127.0.0.1"); |
7c673cae FG |
1168 | Messenger::Policy p = Messenger::Policy::stateful_server(0); |
1169 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); | |
1170 | p = Messenger::Policy::lossless_client(0); | |
1171 | client_msgr->set_policy(entity_name_t::TYPE_OSD, p); | |
1172 | ||
1173 | server_msgr->bind(bind_addr); | |
1174 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
1175 | server_msgr->start(); | |
1176 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
1177 | client_msgr->start(); | |
1178 | ||
1179 | // 1. test for server standby | |
11fdf7f2 TL |
1180 | ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), |
1181 | server_msgr->get_myaddrs()); | |
7c673cae FG |
1182 | { |
1183 | m = new MPing(); | |
1184 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
1185 | std::unique_lock l{cli_dispatcher.lock}; |
1186 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
1187 | cli_dispatcher.got_new = false; |
1188 | } | |
11fdf7f2 | 1189 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); |
7c673cae FG |
1190 | conn->mark_down(); |
1191 | ASSERT_FALSE(conn->is_connected()); | |
11fdf7f2 TL |
1192 | ConnectionRef server_conn = server_msgr->connect_to( |
1193 | client_msgr->get_mytype(), srv_dispatcher.last_accept); | |
7c673cae | 1194 | // don't lose state |
11fdf7f2 | 1195 | ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count()); |
7c673cae FG |
1196 | |
1197 | srv_dispatcher.got_new = false; | |
11fdf7f2 TL |
1198 | conn = client_msgr->connect_to(server_msgr->get_mytype(), |
1199 | server_msgr->get_myaddrs()); | |
7c673cae FG |
1200 | { |
1201 | m = new MPing(); | |
1202 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
1203 | std::unique_lock l{cli_dispatcher.lock}; |
1204 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
1205 | cli_dispatcher.got_new = false; |
1206 | } | |
11fdf7f2 TL |
1207 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); |
1208 | server_conn = server_msgr->connect_to(client_msgr->get_mytype(), | |
1209 | srv_dispatcher.last_accept); | |
7c673cae | 1210 | { |
9f95a23c TL |
1211 | std::unique_lock l{srv_dispatcher.lock}; |
1212 | srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_remote_reset; }); | |
7c673cae FG |
1213 | } |
1214 | ||
1215 | // 2. test for client reconnect | |
1216 | ASSERT_FALSE(cli_dispatcher.got_remote_reset); | |
1217 | cli_dispatcher.got_connect = false; | |
1218 | cli_dispatcher.got_new = false; | |
1219 | cli_dispatcher.got_remote_reset = false; | |
1220 | server_conn->mark_down(); | |
1221 | ASSERT_FALSE(server_conn->is_connected()); | |
1222 | // ensure client detect server socket closed | |
1223 | { | |
9f95a23c TL |
1224 | std::unique_lock l{cli_dispatcher.lock}; |
1225 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_remote_reset; }); | |
7c673cae FG |
1226 | cli_dispatcher.got_remote_reset = false; |
1227 | } | |
1228 | { | |
9f95a23c TL |
1229 | std::unique_lock l{cli_dispatcher.lock}; |
1230 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_connect; }); | |
7c673cae FG |
1231 | cli_dispatcher.got_connect = false; |
1232 | } | |
1233 | CHECK_AND_WAIT_TRUE(conn->is_connected()); | |
1234 | ASSERT_TRUE(conn->is_connected()); | |
1235 | ||
1236 | { | |
1237 | m = new MPing(); | |
1238 | ASSERT_EQ(conn->send_message(m), 0); | |
1239 | ASSERT_TRUE(conn->is_connected()); | |
9f95a23c TL |
1240 | std::unique_lock l{cli_dispatcher.lock}; |
1241 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
1242 | cli_dispatcher.got_new = false; |
1243 | } | |
1244 | // resetcheck happen | |
11fdf7f2 TL |
1245 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); |
1246 | server_conn = server_msgr->connect_to(client_msgr->get_mytype(), | |
1247 | srv_dispatcher.last_accept); | |
1248 | ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count()); | |
7c673cae FG |
1249 | cli_dispatcher.got_remote_reset = false; |
1250 | ||
1251 | server_msgr->shutdown(); | |
1252 | client_msgr->shutdown(); | |
1253 | server_msgr->wait(); | |
1254 | client_msgr->wait(); | |
1255 | } | |
1256 | ||
1257 | TEST_P(MessengerTest, StatelessTest) { | |
1258 | Message *m; | |
1259 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
1260 | entity_addr_t bind_addr; | |
9f95a23c | 1261 | bind_addr.parse("v2:127.0.0.1"); |
7c673cae FG |
1262 | Messenger::Policy p = Messenger::Policy::stateless_server(0); |
1263 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); | |
1264 | p = Messenger::Policy::lossy_client(0); | |
1265 | client_msgr->set_policy(entity_name_t::TYPE_OSD, p); | |
1266 | ||
1267 | server_msgr->bind(bind_addr); | |
1268 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
1269 | server_msgr->start(); | |
1270 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
1271 | client_msgr->start(); | |
1272 | ||
1273 | // 1. test for server lose state | |
11fdf7f2 TL |
1274 | ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), |
1275 | server_msgr->get_myaddrs()); | |
7c673cae FG |
1276 | { |
1277 | m = new MPing(); | |
1278 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
1279 | std::unique_lock l{cli_dispatcher.lock}; |
1280 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
1281 | cli_dispatcher.got_new = false; |
1282 | } | |
11fdf7f2 | 1283 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); |
7c673cae FG |
1284 | conn->mark_down(); |
1285 | ASSERT_FALSE(conn->is_connected()); | |
1286 | ||
1287 | srv_dispatcher.got_new = false; | |
9f95a23c TL |
1288 | ConnectionRef server_conn; |
1289 | srv_dispatcher.last_accept_con_ptr = &server_conn; | |
11fdf7f2 TL |
1290 | conn = client_msgr->connect_to(server_msgr->get_mytype(), |
1291 | server_msgr->get_myaddrs()); | |
7c673cae FG |
1292 | { |
1293 | m = new MPing(); | |
1294 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
1295 | std::unique_lock l{cli_dispatcher.lock}; |
1296 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
1297 | cli_dispatcher.got_new = false; |
1298 | } | |
11fdf7f2 | 1299 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); |
9f95a23c TL |
1300 | ASSERT_TRUE(server_conn); |
1301 | ||
7c673cae FG |
1302 | // server lose state |
1303 | { | |
9f95a23c TL |
1304 | std::unique_lock l{srv_dispatcher.lock}; |
1305 | srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; }); | |
7c673cae | 1306 | } |
11fdf7f2 | 1307 | ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count()); |
7c673cae FG |
1308 | |
1309 | // 2. test for client lossy | |
1310 | server_conn->mark_down(); | |
1311 | ASSERT_FALSE(server_conn->is_connected()); | |
1312 | conn->send_keepalive(); | |
1313 | CHECK_AND_WAIT_TRUE(!conn->is_connected()); | |
1314 | ASSERT_FALSE(conn->is_connected()); | |
11fdf7f2 TL |
1315 | conn = client_msgr->connect_to(server_msgr->get_mytype(), |
1316 | server_msgr->get_myaddrs()); | |
7c673cae FG |
1317 | { |
1318 | m = new MPing(); | |
1319 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
1320 | std::unique_lock l{cli_dispatcher.lock}; |
1321 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
1322 | cli_dispatcher.got_new = false; |
1323 | } | |
11fdf7f2 | 1324 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); |
7c673cae FG |
1325 | |
1326 | server_msgr->shutdown(); | |
1327 | client_msgr->shutdown(); | |
1328 | server_msgr->wait(); | |
1329 | client_msgr->wait(); | |
1330 | } | |
1331 | ||
9f95a23c TL |
1332 | TEST_P(MessengerTest, AnonTest) { |
1333 | Message *m; | |
1334 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
1335 | entity_addr_t bind_addr; | |
1336 | bind_addr.parse("v2:127.0.0.1"); | |
1337 | Messenger::Policy p = Messenger::Policy::stateless_server(0); | |
1338 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); | |
1339 | p = Messenger::Policy::lossy_client(0); | |
1340 | client_msgr->set_policy(entity_name_t::TYPE_OSD, p); | |
1341 | ||
1342 | server_msgr->bind(bind_addr); | |
1343 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
1344 | server_msgr->start(); | |
1345 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
1346 | client_msgr->start(); | |
1347 | ||
1348 | ConnectionRef server_con_a, server_con_b; | |
1349 | ||
1350 | // a | |
1351 | srv_dispatcher.last_accept_con_ptr = &server_con_a; | |
1352 | ConnectionRef con_a = client_msgr->connect_to(server_msgr->get_mytype(), | |
1353 | server_msgr->get_myaddrs(), | |
1354 | true); | |
1355 | { | |
1356 | m = new MPing(); | |
1357 | ASSERT_EQ(con_a->send_message(m), 0); | |
1358 | std::unique_lock l{cli_dispatcher.lock}; | |
1359 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
1360 | cli_dispatcher.got_new = false; | |
1361 | } | |
1362 | ASSERT_EQ(1U, static_cast<Session*>(con_a->get_priv().get())->get_count()); | |
1363 | ||
1364 | // b | |
1365 | srv_dispatcher.last_accept_con_ptr = &server_con_b; | |
1366 | ConnectionRef con_b = client_msgr->connect_to(server_msgr->get_mytype(), | |
1367 | server_msgr->get_myaddrs(), | |
1368 | true); | |
1369 | { | |
1370 | m = new MPing(); | |
1371 | ASSERT_EQ(con_b->send_message(m), 0); | |
1372 | std::unique_lock l{cli_dispatcher.lock}; | |
1373 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
1374 | cli_dispatcher.got_new = false; | |
1375 | } | |
1376 | ASSERT_EQ(1U, static_cast<Session*>(con_b->get_priv().get())->get_count()); | |
1377 | ||
1378 | // these should be distinct | |
1379 | ASSERT_NE(con_a, con_b); | |
1380 | ASSERT_NE(server_con_a, server_con_b); | |
1381 | ||
1382 | // and both connected | |
1383 | { | |
1384 | m = new MPing(); | |
1385 | ASSERT_EQ(con_a->send_message(m), 0); | |
1386 | std::unique_lock l{cli_dispatcher.lock}; | |
1387 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
1388 | cli_dispatcher.got_new = false; | |
1389 | } | |
1390 | { | |
1391 | m = new MPing(); | |
1392 | ASSERT_EQ(con_b->send_message(m), 0); | |
1393 | std::unique_lock l{cli_dispatcher.lock}; | |
1394 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
1395 | cli_dispatcher.got_new = false; | |
1396 | } | |
1397 | ||
1398 | // clean up | |
1399 | con_a->mark_down(); | |
1400 | ASSERT_FALSE(con_a->is_connected()); | |
1401 | con_b->mark_down(); | |
1402 | ASSERT_FALSE(con_b->is_connected()); | |
1403 | ||
1404 | server_msgr->shutdown(); | |
1405 | client_msgr->shutdown(); | |
1406 | server_msgr->wait(); | |
1407 | client_msgr->wait(); | |
1408 | } | |
1409 | ||
7c673cae FG |
1410 | TEST_P(MessengerTest, ClientStandbyTest) { |
1411 | Message *m; | |
1412 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
1413 | entity_addr_t bind_addr; | |
9f95a23c | 1414 | bind_addr.parse("v2:127.0.0.1"); |
7c673cae FG |
1415 | Messenger::Policy p = Messenger::Policy::stateful_server(0); |
1416 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); | |
1417 | p = Messenger::Policy::lossless_peer(0); | |
1418 | client_msgr->set_policy(entity_name_t::TYPE_OSD, p); | |
1419 | ||
1420 | server_msgr->bind(bind_addr); | |
1421 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
1422 | server_msgr->start(); | |
1423 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
1424 | client_msgr->start(); | |
1425 | ||
1426 | // 1. test for client standby, resetcheck | |
11fdf7f2 TL |
1427 | ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), |
1428 | server_msgr->get_myaddrs()); | |
7c673cae FG |
1429 | { |
1430 | m = new MPing(); | |
1431 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
1432 | std::unique_lock l{cli_dispatcher.lock}; |
1433 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
1434 | cli_dispatcher.got_new = false; |
1435 | } | |
11fdf7f2 TL |
1436 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); |
1437 | ConnectionRef server_conn = server_msgr->connect_to( | |
1438 | client_msgr->get_mytype(), | |
1439 | srv_dispatcher.last_accept); | |
7c673cae FG |
1440 | ASSERT_FALSE(cli_dispatcher.got_remote_reset); |
1441 | cli_dispatcher.got_connect = false; | |
1442 | server_conn->mark_down(); | |
1443 | ASSERT_FALSE(server_conn->is_connected()); | |
1444 | // client should be standby | |
1445 | usleep(300*1000); | |
1446 | // client should be standby, so we use original connection | |
1447 | { | |
1448 | // Try send message to verify got remote reset callback | |
1449 | m = new MPing(); | |
1450 | ASSERT_EQ(conn->send_message(m), 0); | |
1451 | { | |
9f95a23c TL |
1452 | std::unique_lock l{cli_dispatcher.lock}; |
1453 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_remote_reset; }); | |
7c673cae | 1454 | cli_dispatcher.got_remote_reset = false; |
9f95a23c | 1455 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_connect; }); |
7c673cae FG |
1456 | cli_dispatcher.got_connect = false; |
1457 | } | |
1458 | CHECK_AND_WAIT_TRUE(conn->is_connected()); | |
1459 | ASSERT_TRUE(conn->is_connected()); | |
1460 | m = new MPing(); | |
1461 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
1462 | std::unique_lock l{cli_dispatcher.lock}; |
1463 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
1464 | cli_dispatcher.got_new = false; |
1465 | } | |
11fdf7f2 TL |
1466 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); |
1467 | server_conn = server_msgr->connect_to(client_msgr->get_mytype(), | |
1468 | srv_dispatcher.last_accept); | |
1469 | ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count()); | |
7c673cae FG |
1470 | |
1471 | server_msgr->shutdown(); | |
1472 | client_msgr->shutdown(); | |
1473 | server_msgr->wait(); | |
1474 | client_msgr->wait(); | |
1475 | } | |
1476 | ||
1477 | TEST_P(MessengerTest, AuthTest) { | |
11fdf7f2 TL |
1478 | g_ceph_context->_conf.set_val("auth_cluster_required", "cephx"); |
1479 | g_ceph_context->_conf.set_val("auth_service_required", "cephx"); | |
1480 | g_ceph_context->_conf.set_val("auth_client_required", "cephx"); | |
7c673cae FG |
1481 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); |
1482 | entity_addr_t bind_addr; | |
9f95a23c | 1483 | bind_addr.parse("v2:127.0.0.1"); |
7c673cae FG |
1484 | server_msgr->bind(bind_addr); |
1485 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
1486 | server_msgr->start(); | |
1487 | ||
1488 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
1489 | client_msgr->start(); | |
1490 | ||
1491 | // 1. simple auth round trip | |
1492 | MPing *m = new MPing(); | |
11fdf7f2 TL |
1493 | ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), |
1494 | server_msgr->get_myaddrs()); | |
7c673cae FG |
1495 | { |
1496 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
1497 | std::unique_lock l{cli_dispatcher.lock}; |
1498 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
1499 | cli_dispatcher.got_new = false; |
1500 | } | |
1501 | ASSERT_TRUE(conn->is_connected()); | |
11fdf7f2 | 1502 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); |
7c673cae FG |
1503 | |
1504 | // 2. mix auth | |
11fdf7f2 TL |
1505 | g_ceph_context->_conf.set_val("auth_cluster_required", "none"); |
1506 | g_ceph_context->_conf.set_val("auth_service_required", "none"); | |
1507 | g_ceph_context->_conf.set_val("auth_client_required", "none"); | |
7c673cae FG |
1508 | conn->mark_down(); |
1509 | ASSERT_FALSE(conn->is_connected()); | |
11fdf7f2 TL |
1510 | conn = client_msgr->connect_to(server_msgr->get_mytype(), |
1511 | server_msgr->get_myaddrs()); | |
7c673cae FG |
1512 | { |
1513 | MPing *m = new MPing(); | |
1514 | ASSERT_EQ(conn->send_message(m), 0); | |
9f95a23c TL |
1515 | std::unique_lock l{cli_dispatcher.lock}; |
1516 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
1517 | cli_dispatcher.got_new = false; |
1518 | } | |
1519 | ASSERT_TRUE(conn->is_connected()); | |
11fdf7f2 | 1520 | ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count()); |
7c673cae FG |
1521 | server_msgr->shutdown(); |
1522 | client_msgr->shutdown(); | |
1523 | server_msgr->wait(); | |
1524 | client_msgr->wait(); | |
1525 | } | |
1526 | ||
1527 | TEST_P(MessengerTest, MessageTest) { | |
1528 | FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); | |
1529 | entity_addr_t bind_addr; | |
9f95a23c | 1530 | bind_addr.parse("v2:127.0.0.1"); |
7c673cae FG |
1531 | Messenger::Policy p = Messenger::Policy::stateful_server(0); |
1532 | server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); | |
1533 | p = Messenger::Policy::lossless_peer(0); | |
1534 | client_msgr->set_policy(entity_name_t::TYPE_OSD, p); | |
1535 | ||
1536 | server_msgr->bind(bind_addr); | |
1537 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
1538 | server_msgr->start(); | |
1539 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
1540 | client_msgr->start(); | |
1541 | ||
1542 | ||
1543 | // 1. A very large "front"(as well as "payload") | |
1544 | // Because a external message need to invade Messenger::decode_message, | |
1545 | // here we only use existing message class(MCommand) | |
11fdf7f2 TL |
1546 | ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), |
1547 | server_msgr->get_myaddrs()); | |
7c673cae FG |
1548 | { |
1549 | uuid_d uuid; | |
1550 | uuid.generate_random(); | |
1551 | vector<string> cmds; | |
1552 | string s("abcdefghijklmnopqrstuvwxyz"); | |
1553 | for (int i = 0; i < 1024*30; i++) | |
1554 | cmds.push_back(s); | |
1555 | MCommand *m = new MCommand(uuid); | |
1556 | m->cmd = cmds; | |
1557 | conn->send_message(m); | |
9f95a23c TL |
1558 | std::unique_lock l{cli_dispatcher.lock}; |
1559 | cli_dispatcher.cond.wait_for(l, 500s, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
1560 | ASSERT_TRUE(cli_dispatcher.got_new); |
1561 | cli_dispatcher.got_new = false; | |
1562 | } | |
1563 | ||
1564 | // 2. A very large "data" | |
1565 | { | |
1566 | bufferlist bl; | |
1567 | string s("abcdefghijklmnopqrstuvwxyz"); | |
1568 | for (int i = 0; i < 1024*30; i++) | |
1569 | bl.append(s); | |
1570 | MPing *m = new MPing(); | |
1571 | m->set_data(bl); | |
1572 | conn->send_message(m); | |
1573 | utime_t t; | |
1574 | t += 1000*1000*500; | |
9f95a23c TL |
1575 | std::unique_lock l{cli_dispatcher.lock}; |
1576 | cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); | |
7c673cae FG |
1577 | ASSERT_TRUE(cli_dispatcher.got_new); |
1578 | cli_dispatcher.got_new = false; | |
1579 | } | |
1580 | server_msgr->shutdown(); | |
1581 | client_msgr->shutdown(); | |
1582 | server_msgr->wait(); | |
1583 | client_msgr->wait(); | |
1584 | } | |
1585 | ||
1586 | ||
1587 | class SyntheticWorkload; | |
1588 | ||
1589 | struct Payload { | |
1590 | enum Who : uint8_t { | |
1591 | PING = 0, | |
1592 | PONG = 1, | |
1593 | }; | |
11fdf7f2 TL |
1594 | uint8_t who = 0; |
1595 | uint64_t seq = 0; | |
7c673cae FG |
1596 | bufferlist data; |
1597 | ||
1598 | Payload(Who who, uint64_t seq, const bufferlist& data) | |
1599 | : who(who), seq(seq), data(data) | |
1600 | {} | |
1601 | Payload() = default; | |
1602 | DENC(Payload, v, p) { | |
1603 | DENC_START(1, 1, p); | |
1604 | denc(v.who, p); | |
1605 | denc(v.seq, p); | |
1606 | denc(v.data, p); | |
1607 | DENC_FINISH(p); | |
1608 | } | |
1609 | }; | |
1610 | WRITE_CLASS_DENC(Payload) | |
1611 | ||
1612 | ostream& operator<<(ostream& out, const Payload &pl) | |
1613 | { | |
1614 | return out << "reply=" << pl.who << " i = " << pl.seq; | |
1615 | } | |
1616 | ||
1617 | class SyntheticDispatcher : public Dispatcher { | |
1618 | public: | |
9f95a23c TL |
1619 | ceph::mutex lock = ceph::make_mutex("SyntheticDispatcher::lock"); |
1620 | ceph::condition_variable cond; | |
7c673cae FG |
1621 | bool is_server; |
1622 | bool got_new; | |
1623 | bool got_remote_reset; | |
1624 | bool got_connect; | |
1625 | map<ConnectionRef, list<uint64_t> > conn_sent; | |
1626 | map<uint64_t, bufferlist> sent; | |
1627 | atomic<uint64_t> index; | |
1628 | SyntheticWorkload *workload; | |
1629 | ||
1630 | SyntheticDispatcher(bool s, SyntheticWorkload *wl): | |
9f95a23c | 1631 | Dispatcher(g_ceph_context), is_server(s), got_new(false), |
11fdf7f2 | 1632 | got_remote_reset(false), got_connect(false), index(0), workload(wl) { |
11fdf7f2 | 1633 | } |
7c673cae FG |
1634 | bool ms_can_fast_dispatch_any() const override { return true; } |
1635 | bool ms_can_fast_dispatch(const Message *m) const override { | |
1636 | switch (m->get_type()) { | |
1637 | case CEPH_MSG_PING: | |
1638 | case MSG_COMMAND: | |
1639 | return true; | |
1640 | default: | |
1641 | return false; | |
1642 | } | |
1643 | } | |
1644 | ||
1645 | void ms_handle_fast_connect(Connection *con) override { | |
9f95a23c | 1646 | std::lock_guard l{lock}; |
7c673cae FG |
1647 | list<uint64_t> c = conn_sent[con]; |
1648 | for (list<uint64_t>::iterator it = c.begin(); | |
1649 | it != c.end(); ++it) | |
1650 | sent.erase(*it); | |
1651 | conn_sent.erase(con); | |
1652 | got_connect = true; | |
9f95a23c | 1653 | cond.notify_all(); |
7c673cae FG |
1654 | } |
1655 | void ms_handle_fast_accept(Connection *con) override { | |
9f95a23c | 1656 | std::lock_guard l{lock}; |
7c673cae FG |
1657 | list<uint64_t> c = conn_sent[con]; |
1658 | for (list<uint64_t>::iterator it = c.begin(); | |
1659 | it != c.end(); ++it) | |
1660 | sent.erase(*it); | |
1661 | conn_sent.erase(con); | |
9f95a23c | 1662 | cond.notify_all(); |
7c673cae FG |
1663 | } |
1664 | bool ms_dispatch(Message *m) override { | |
1665 | ceph_abort(); | |
1666 | } | |
1667 | bool ms_handle_reset(Connection *con) override; | |
1668 | void ms_handle_remote_reset(Connection *con) override { | |
9f95a23c | 1669 | std::lock_guard l{lock}; |
7c673cae FG |
1670 | list<uint64_t> c = conn_sent[con]; |
1671 | for (list<uint64_t>::iterator it = c.begin(); | |
1672 | it != c.end(); ++it) | |
1673 | sent.erase(*it); | |
1674 | conn_sent.erase(con); | |
1675 | got_remote_reset = true; | |
1676 | } | |
1677 | bool ms_handle_refused(Connection *con) override { | |
1678 | return false; | |
1679 | } | |
1680 | void ms_fast_dispatch(Message *m) override { | |
1681 | // MSG_COMMAND is used to disorganize regular message flow | |
1682 | if (m->get_type() == MSG_COMMAND) { | |
1683 | m->put(); | |
1684 | return ; | |
1685 | } | |
1686 | ||
1687 | Payload pl; | |
11fdf7f2 TL |
1688 | auto p = m->get_data().cbegin(); |
1689 | decode(pl, p); | |
7c673cae FG |
1690 | if (pl.who == Payload::PING) { |
1691 | lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl; | |
1692 | reply_message(m, pl); | |
1693 | m->put(); | |
9f95a23c | 1694 | std::lock_guard l{lock}; |
7c673cae | 1695 | got_new = true; |
9f95a23c | 1696 | cond.notify_all(); |
7c673cae | 1697 | } else { |
9f95a23c | 1698 | std::lock_guard l{lock}; |
7c673cae FG |
1699 | if (sent.count(pl.seq)) { |
1700 | lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl; | |
1701 | ASSERT_EQ(conn_sent[m->get_connection()].front(), pl.seq); | |
1702 | ASSERT_TRUE(pl.data.contents_equal(sent[pl.seq])); | |
1703 | conn_sent[m->get_connection()].pop_front(); | |
1704 | sent.erase(pl.seq); | |
1705 | } | |
1706 | m->put(); | |
1707 | got_new = true; | |
9f95a23c | 1708 | cond.notify_all(); |
7c673cae FG |
1709 | } |
1710 | } | |
1711 | ||
11fdf7f2 TL |
1712 | int ms_handle_authentication(Connection *con) override { |
1713 | return 1; | |
7c673cae FG |
1714 | } |
1715 | ||
1716 | void reply_message(const Message *m, Payload& pl) { | |
1717 | pl.who = Payload::PONG; | |
1718 | bufferlist bl; | |
11fdf7f2 | 1719 | encode(pl, bl); |
7c673cae FG |
1720 | MPing *rm = new MPing(); |
1721 | rm->set_data(bl); | |
1722 | m->get_connection()->send_message(rm); | |
1723 | lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply m=" << m << " i=" << pl.seq << dendl; | |
1724 | } | |
1725 | ||
1726 | void send_message_wrap(ConnectionRef con, const bufferlist& data) { | |
1727 | Message *m = new MPing(); | |
1728 | Payload pl{Payload::PING, index++, data}; | |
1729 | bufferlist bl; | |
11fdf7f2 | 1730 | encode(pl, bl); |
7c673cae FG |
1731 | m->set_data(bl); |
1732 | if (!con->get_messenger()->get_default_policy().lossy) { | |
9f95a23c | 1733 | std::lock_guard l{lock}; |
7c673cae FG |
1734 | sent[pl.seq] = pl.data; |
1735 | conn_sent[con].push_back(pl.seq); | |
1736 | } | |
1737 | lderr(g_ceph_context) << __func__ << " conn=" << con.get() << " send m=" << m << " i=" << pl.seq << dendl; | |
1738 | ASSERT_EQ(0, con->send_message(m)); | |
1739 | } | |
1740 | ||
1741 | uint64_t get_pending() { | |
9f95a23c | 1742 | std::lock_guard l{lock}; |
7c673cae FG |
1743 | return sent.size(); |
1744 | } | |
1745 | ||
1746 | void clear_pending(ConnectionRef con) { | |
9f95a23c | 1747 | std::lock_guard l{lock}; |
7c673cae FG |
1748 | |
1749 | for (list<uint64_t>::iterator it = conn_sent[con].begin(); | |
1750 | it != conn_sent[con].end(); ++it) | |
1751 | sent.erase(*it); | |
1752 | conn_sent.erase(con); | |
1753 | } | |
1754 | ||
1755 | void print() { | |
1756 | for (auto && p : conn_sent) { | |
1757 | if (!p.second.empty()) { | |
1758 | lderr(g_ceph_context) << __func__ << " " << p.first << " wait " << p.second.size() << dendl; | |
1759 | } | |
1760 | } | |
1761 | } | |
1762 | }; | |
1763 | ||
1764 | ||
1765 | class SyntheticWorkload { | |
9f95a23c TL |
1766 | ceph::mutex lock = ceph::make_mutex("SyntheticWorkload::lock"); |
1767 | ceph::condition_variable cond; | |
7c673cae FG |
1768 | set<Messenger*> available_servers; |
1769 | set<Messenger*> available_clients; | |
11fdf7f2 | 1770 | Messenger::Policy client_policy; |
7c673cae FG |
1771 | map<ConnectionRef, pair<Messenger*, Messenger*> > available_connections; |
1772 | SyntheticDispatcher dispatcher; | |
1773 | gen_type rng; | |
1774 | vector<bufferlist> rand_data; | |
11fdf7f2 | 1775 | DummyAuthClientServer dummy_auth; |
7c673cae FG |
1776 | |
1777 | public: | |
1778 | static const unsigned max_in_flight = 64; | |
1779 | static const unsigned max_connections = 128; | |
1780 | static const unsigned max_message_len = 1024 * 1024 * 4; | |
1781 | ||
1782 | SyntheticWorkload(int servers, int clients, string type, int random_num, | |
11fdf7f2 | 1783 | Messenger::Policy srv_policy, Messenger::Policy cli_policy) |
9f95a23c | 1784 | : client_policy(cli_policy), |
11fdf7f2 TL |
1785 | dispatcher(false, this), |
1786 | rng(time(NULL)), | |
1787 | dummy_auth(g_ceph_context) { | |
1788 | dummy_auth.auth_registry.refresh_config(); | |
7c673cae FG |
1789 | Messenger *msgr; |
1790 | int base_port = 16800; | |
1791 | entity_addr_t bind_addr; | |
1792 | char addr[64]; | |
1793 | for (int i = 0; i < servers; ++i) { | |
1794 | msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0), | |
f67539c2 | 1795 | "server", getpid()+i); |
9f95a23c | 1796 | snprintf(addr, sizeof(addr), "v2:127.0.0.1:%d", |
11fdf7f2 | 1797 | base_port+i); |
7c673cae FG |
1798 | bind_addr.parse(addr); |
1799 | msgr->bind(bind_addr); | |
1800 | msgr->add_dispatcher_head(&dispatcher); | |
11fdf7f2 TL |
1801 | msgr->set_auth_client(&dummy_auth); |
1802 | msgr->set_auth_server(&dummy_auth); | |
7c673cae | 1803 | |
11fdf7f2 | 1804 | ceph_assert(msgr); |
7c673cae FG |
1805 | msgr->set_default_policy(srv_policy); |
1806 | available_servers.insert(msgr); | |
1807 | msgr->start(); | |
1808 | } | |
1809 | ||
1810 | for (int i = 0; i < clients; ++i) { | |
1811 | msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(-1), | |
f67539c2 | 1812 | "client", getpid()+i+servers); |
7c673cae | 1813 | if (cli_policy.standby) { |
9f95a23c | 1814 | snprintf(addr, sizeof(addr), "v2:127.0.0.1:%d", |
11fdf7f2 | 1815 | base_port+i+servers); |
7c673cae FG |
1816 | bind_addr.parse(addr); |
1817 | msgr->bind(bind_addr); | |
1818 | } | |
1819 | msgr->add_dispatcher_head(&dispatcher); | |
11fdf7f2 TL |
1820 | msgr->set_auth_client(&dummy_auth); |
1821 | msgr->set_auth_server(&dummy_auth); | |
7c673cae | 1822 | |
11fdf7f2 | 1823 | ceph_assert(msgr); |
7c673cae FG |
1824 | msgr->set_default_policy(cli_policy); |
1825 | available_clients.insert(msgr); | |
1826 | msgr->start(); | |
1827 | } | |
1828 | ||
1829 | for (int i = 0; i < random_num; i++) { | |
1830 | bufferlist bl; | |
1831 | boost::uniform_int<> u(32, max_message_len); | |
1832 | uint64_t value_len = u(rng); | |
1833 | bufferptr bp(value_len); | |
1834 | bp.zero(); | |
1835 | for (uint64_t j = 0; j < value_len-sizeof(i); ) { | |
1836 | memcpy(bp.c_str()+j, &i, sizeof(i)); | |
1837 | j += 4096; | |
1838 | } | |
1839 | ||
1840 | bl.append(bp); | |
1841 | rand_data.push_back(bl); | |
1842 | } | |
1843 | } | |
1844 | ||
1845 | ConnectionRef _get_random_connection() { | |
1846 | while (dispatcher.get_pending() > max_in_flight) { | |
9f95a23c | 1847 | lock.unlock(); |
7c673cae | 1848 | usleep(500); |
9f95a23c | 1849 | lock.lock(); |
7c673cae | 1850 | } |
9f95a23c | 1851 | ceph_assert(ceph_mutex_is_locked(lock)); |
7c673cae FG |
1852 | boost::uniform_int<> choose(0, available_connections.size() - 1); |
1853 | int index = choose(rng); | |
1854 | map<ConnectionRef, pair<Messenger*, Messenger*> >::iterator i = available_connections.begin(); | |
1855 | for (; index > 0; --index, ++i) ; | |
1856 | return i->first; | |
1857 | } | |
1858 | ||
1859 | bool can_create_connection() { | |
1860 | return available_connections.size() < max_connections; | |
1861 | } | |
1862 | ||
1863 | void generate_connection() { | |
9f95a23c | 1864 | std::lock_guard l{lock}; |
7c673cae FG |
1865 | if (!can_create_connection()) |
1866 | return ; | |
1867 | ||
1868 | Messenger *server, *client; | |
1869 | { | |
1870 | boost::uniform_int<> choose(0, available_servers.size() - 1); | |
1871 | int index = choose(rng); | |
1872 | set<Messenger*>::iterator i = available_servers.begin(); | |
1873 | for (; index > 0; --index, ++i) ; | |
1874 | server = *i; | |
1875 | } | |
1876 | { | |
1877 | boost::uniform_int<> choose(0, available_clients.size() - 1); | |
1878 | int index = choose(rng); | |
1879 | set<Messenger*>::iterator i = available_clients.begin(); | |
1880 | for (; index > 0; --index, ++i) ; | |
1881 | client = *i; | |
1882 | } | |
1883 | ||
1884 | pair<Messenger*, Messenger*> p; | |
1885 | { | |
1886 | boost::uniform_int<> choose(0, available_servers.size() - 1); | |
1887 | if (server->get_default_policy().server) { | |
1888 | p = make_pair(client, server); | |
11fdf7f2 TL |
1889 | ConnectionRef conn = client->connect_to(server->get_mytype(), |
1890 | server->get_myaddrs()); | |
1891 | available_connections[conn] = p; | |
7c673cae | 1892 | } else { |
11fdf7f2 TL |
1893 | ConnectionRef conn = client->connect_to(server->get_mytype(), |
1894 | server->get_myaddrs()); | |
1895 | p = make_pair(client, server); | |
1896 | available_connections[conn] = p; | |
7c673cae FG |
1897 | } |
1898 | } | |
7c673cae FG |
1899 | } |
1900 | ||
1901 | void send_message() { | |
9f95a23c | 1902 | std::lock_guard l{lock}; |
7c673cae FG |
1903 | ConnectionRef conn = _get_random_connection(); |
1904 | boost::uniform_int<> true_false(0, 99); | |
1905 | int val = true_false(rng); | |
1906 | if (val >= 95) { | |
1907 | uuid_d uuid; | |
1908 | uuid.generate_random(); | |
1909 | MCommand *m = new MCommand(uuid); | |
1910 | vector<string> cmds; | |
1911 | cmds.push_back("command"); | |
1912 | m->cmd = cmds; | |
1913 | m->set_priority(200); | |
1914 | conn->send_message(m); | |
1915 | } else { | |
1916 | boost::uniform_int<> u(0, rand_data.size()-1); | |
1917 | dispatcher.send_message_wrap(conn, rand_data[u(rng)]); | |
1918 | } | |
1919 | } | |
1920 | ||
1921 | void drop_connection() { | |
9f95a23c | 1922 | std::lock_guard l{lock}; |
7c673cae FG |
1923 | if (available_connections.size() < 10) |
1924 | return; | |
1925 | ConnectionRef conn = _get_random_connection(); | |
1926 | dispatcher.clear_pending(conn); | |
1927 | conn->mark_down(); | |
11fdf7f2 TL |
1928 | if (!client_policy.server && |
1929 | !client_policy.lossy && | |
1930 | client_policy.standby) { | |
1931 | // it's a lossless policy, so we need to mark down each side | |
1932 | pair<Messenger*, Messenger*> &p = available_connections[conn]; | |
1933 | if (!p.first->get_default_policy().server && !p.second->get_default_policy().server) { | |
1934 | ASSERT_EQ(conn->get_messenger(), p.first); | |
1935 | ConnectionRef peer = p.second->connect_to(p.first->get_mytype(), | |
1936 | p.first->get_myaddrs()); | |
1937 | peer->mark_down(); | |
1938 | dispatcher.clear_pending(peer); | |
1939 | available_connections.erase(peer); | |
1940 | } | |
7c673cae FG |
1941 | } |
1942 | ASSERT_EQ(available_connections.erase(conn), 1U); | |
1943 | } | |
1944 | ||
1945 | void print_internal_state(bool detail=false) { | |
9f95a23c | 1946 | std::lock_guard l{lock}; |
7c673cae FG |
1947 | lderr(g_ceph_context) << "available_connections: " << available_connections.size() |
1948 | << " inflight messages: " << dispatcher.get_pending() << dendl; | |
1949 | if (detail && !available_connections.empty()) { | |
1950 | dispatcher.print(); | |
1951 | } | |
1952 | } | |
1953 | ||
1954 | void wait_for_done() { | |
1955 | int64_t tick_us = 1000 * 100; // 100ms | |
1956 | int64_t timeout_us = 5 * 60 * 1000 * 1000; // 5 mins | |
1957 | int i = 0; | |
1958 | while (dispatcher.get_pending()) { | |
1959 | usleep(tick_us); | |
1960 | timeout_us -= tick_us; | |
1961 | if (i++ % 50 == 0) | |
1962 | print_internal_state(true); | |
1963 | if (timeout_us < 0) | |
11fdf7f2 | 1964 | ceph_abort_msg(" loop time exceed 5 mins, it looks we stuck into some problems!"); |
7c673cae FG |
1965 | } |
1966 | for (set<Messenger*>::iterator it = available_servers.begin(); | |
1967 | it != available_servers.end(); ++it) { | |
1968 | (*it)->shutdown(); | |
1969 | (*it)->wait(); | |
1970 | ASSERT_EQ((*it)->get_dispatch_queue_len(), 0); | |
1971 | delete (*it); | |
1972 | } | |
1973 | available_servers.clear(); | |
1974 | ||
1975 | for (set<Messenger*>::iterator it = available_clients.begin(); | |
1976 | it != available_clients.end(); ++it) { | |
1977 | (*it)->shutdown(); | |
1978 | (*it)->wait(); | |
1979 | ASSERT_EQ((*it)->get_dispatch_queue_len(), 0); | |
1980 | delete (*it); | |
1981 | } | |
1982 | available_clients.clear(); | |
1983 | } | |
1984 | ||
1985 | void handle_reset(Connection *con) { | |
9f95a23c | 1986 | std::lock_guard l{lock}; |
7c673cae FG |
1987 | available_connections.erase(con); |
1988 | dispatcher.clear_pending(con); | |
1989 | } | |
1990 | }; | |
1991 | ||
1992 | bool SyntheticDispatcher::ms_handle_reset(Connection *con) { | |
1993 | workload->handle_reset(con); | |
1994 | return true; | |
1995 | } | |
1996 | ||
1997 | TEST_P(MessengerTest, SyntheticStressTest) { | |
1998 | SyntheticWorkload test_msg(8, 32, GetParam(), 100, | |
1999 | Messenger::Policy::stateful_server(0), | |
2000 | Messenger::Policy::lossless_client(0)); | |
2001 | for (int i = 0; i < 100; ++i) { | |
2002 | if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl; | |
2003 | test_msg.generate_connection(); | |
2004 | } | |
2005 | gen_type rng(time(NULL)); | |
2006 | for (int i = 0; i < 5000; ++i) { | |
2007 | if (!(i % 10)) { | |
2008 | lderr(g_ceph_context) << "Op " << i << ": " << dendl; | |
2009 | test_msg.print_internal_state(); | |
2010 | } | |
2011 | boost::uniform_int<> true_false(0, 99); | |
2012 | int val = true_false(rng); | |
2013 | if (val > 90) { | |
2014 | test_msg.generate_connection(); | |
2015 | } else if (val > 80) { | |
2016 | test_msg.drop_connection(); | |
2017 | } else if (val > 10) { | |
2018 | test_msg.send_message(); | |
2019 | } else { | |
2020 | usleep(rand() % 1000 + 500); | |
2021 | } | |
2022 | } | |
2023 | test_msg.wait_for_done(); | |
2024 | } | |
2025 | ||
2026 | TEST_P(MessengerTest, SyntheticStressTest1) { | |
2027 | SyntheticWorkload test_msg(16, 32, GetParam(), 100, | |
2028 | Messenger::Policy::lossless_peer_reuse(0), | |
2029 | Messenger::Policy::lossless_peer_reuse(0)); | |
2030 | for (int i = 0; i < 10; ++i) { | |
2031 | if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl; | |
2032 | test_msg.generate_connection(); | |
2033 | } | |
2034 | gen_type rng(time(NULL)); | |
2035 | for (int i = 0; i < 10000; ++i) { | |
2036 | if (!(i % 10)) { | |
2037 | lderr(g_ceph_context) << "Op " << i << ": " << dendl; | |
2038 | test_msg.print_internal_state(); | |
2039 | } | |
2040 | boost::uniform_int<> true_false(0, 99); | |
2041 | int val = true_false(rng); | |
2042 | if (val > 80) { | |
2043 | test_msg.generate_connection(); | |
2044 | } else if (val > 60) { | |
2045 | test_msg.drop_connection(); | |
2046 | } else if (val > 10) { | |
2047 | test_msg.send_message(); | |
2048 | } else { | |
2049 | usleep(rand() % 1000 + 500); | |
2050 | } | |
2051 | } | |
2052 | test_msg.wait_for_done(); | |
2053 | } | |
2054 | ||
2055 | ||
2056 | TEST_P(MessengerTest, SyntheticInjectTest) { | |
2057 | uint64_t dispatch_throttle_bytes = g_ceph_context->_conf->ms_dispatch_throttle_bytes; | |
11fdf7f2 TL |
2058 | g_ceph_context->_conf.set_val("ms_inject_socket_failures", "30"); |
2059 | g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1"); | |
2060 | g_ceph_context->_conf.set_val("ms_dispatch_throttle_bytes", "16777216"); | |
7c673cae FG |
2061 | SyntheticWorkload test_msg(8, 32, GetParam(), 100, |
2062 | Messenger::Policy::stateful_server(0), | |
2063 | Messenger::Policy::lossless_client(0)); | |
2064 | for (int i = 0; i < 100; ++i) { | |
2065 | if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl; | |
2066 | test_msg.generate_connection(); | |
2067 | } | |
2068 | gen_type rng(time(NULL)); | |
2069 | for (int i = 0; i < 1000; ++i) { | |
2070 | if (!(i % 10)) { | |
2071 | lderr(g_ceph_context) << "Op " << i << ": " << dendl; | |
2072 | test_msg.print_internal_state(); | |
2073 | } | |
2074 | boost::uniform_int<> true_false(0, 99); | |
2075 | int val = true_false(rng); | |
2076 | if (val > 90) { | |
2077 | test_msg.generate_connection(); | |
2078 | } else if (val > 80) { | |
2079 | test_msg.drop_connection(); | |
2080 | } else if (val > 10) { | |
2081 | test_msg.send_message(); | |
2082 | } else { | |
2083 | usleep(rand() % 500 + 100); | |
2084 | } | |
2085 | } | |
2086 | test_msg.wait_for_done(); | |
11fdf7f2 TL |
2087 | g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0"); |
2088 | g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0"); | |
2089 | g_ceph_context->_conf.set_val( | |
7c673cae FG |
2090 | "ms_dispatch_throttle_bytes", std::to_string(dispatch_throttle_bytes)); |
2091 | } | |
2092 | ||
2093 | TEST_P(MessengerTest, SyntheticInjectTest2) { | |
11fdf7f2 TL |
2094 | g_ceph_context->_conf.set_val("ms_inject_socket_failures", "30"); |
2095 | g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1"); | |
7c673cae FG |
2096 | SyntheticWorkload test_msg(8, 16, GetParam(), 100, |
2097 | Messenger::Policy::lossless_peer_reuse(0), | |
2098 | Messenger::Policy::lossless_peer_reuse(0)); | |
2099 | for (int i = 0; i < 100; ++i) { | |
2100 | if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl; | |
2101 | test_msg.generate_connection(); | |
2102 | } | |
2103 | gen_type rng(time(NULL)); | |
2104 | for (int i = 0; i < 1000; ++i) { | |
2105 | if (!(i % 10)) { | |
2106 | lderr(g_ceph_context) << "Op " << i << ": " << dendl; | |
2107 | test_msg.print_internal_state(); | |
2108 | } | |
2109 | boost::uniform_int<> true_false(0, 99); | |
2110 | int val = true_false(rng); | |
2111 | if (val > 90) { | |
2112 | test_msg.generate_connection(); | |
2113 | } else if (val > 80) { | |
2114 | test_msg.drop_connection(); | |
2115 | } else if (val > 10) { | |
2116 | test_msg.send_message(); | |
2117 | } else { | |
2118 | usleep(rand() % 500 + 100); | |
2119 | } | |
2120 | } | |
2121 | test_msg.wait_for_done(); | |
11fdf7f2 TL |
2122 | g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0"); |
2123 | g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0"); | |
7c673cae FG |
2124 | } |
2125 | ||
2126 | TEST_P(MessengerTest, SyntheticInjectTest3) { | |
11fdf7f2 TL |
2127 | g_ceph_context->_conf.set_val("ms_inject_socket_failures", "600"); |
2128 | g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1"); | |
7c673cae FG |
2129 | SyntheticWorkload test_msg(8, 16, GetParam(), 100, |
2130 | Messenger::Policy::stateless_server(0), | |
2131 | Messenger::Policy::lossy_client(0)); | |
2132 | for (int i = 0; i < 100; ++i) { | |
2133 | if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl; | |
2134 | test_msg.generate_connection(); | |
2135 | } | |
2136 | gen_type rng(time(NULL)); | |
2137 | for (int i = 0; i < 1000; ++i) { | |
2138 | if (!(i % 10)) { | |
2139 | lderr(g_ceph_context) << "Op " << i << ": " << dendl; | |
2140 | test_msg.print_internal_state(); | |
2141 | } | |
2142 | boost::uniform_int<> true_false(0, 99); | |
2143 | int val = true_false(rng); | |
2144 | if (val > 90) { | |
2145 | test_msg.generate_connection(); | |
2146 | } else if (val > 80) { | |
2147 | test_msg.drop_connection(); | |
2148 | } else if (val > 10) { | |
2149 | test_msg.send_message(); | |
2150 | } else { | |
2151 | usleep(rand() % 500 + 100); | |
2152 | } | |
2153 | } | |
2154 | test_msg.wait_for_done(); | |
11fdf7f2 TL |
2155 | g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0"); |
2156 | g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0"); | |
7c673cae FG |
2157 | } |
2158 | ||
2159 | ||
2160 | TEST_P(MessengerTest, SyntheticInjectTest4) { | |
11fdf7f2 TL |
2161 | g_ceph_context->_conf.set_val("ms_inject_socket_failures", "30"); |
2162 | g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1"); | |
2163 | g_ceph_context->_conf.set_val("ms_inject_delay_probability", "1"); | |
2164 | g_ceph_context->_conf.set_val("ms_inject_delay_type", "client osd"); | |
2165 | g_ceph_context->_conf.set_val("ms_inject_delay_max", "5"); | |
7c673cae FG |
2166 | SyntheticWorkload test_msg(16, 32, GetParam(), 100, |
2167 | Messenger::Policy::lossless_peer(0), | |
2168 | Messenger::Policy::lossless_peer(0)); | |
2169 | for (int i = 0; i < 100; ++i) { | |
2170 | if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl; | |
2171 | test_msg.generate_connection(); | |
2172 | } | |
2173 | gen_type rng(time(NULL)); | |
2174 | for (int i = 0; i < 1000; ++i) { | |
2175 | if (!(i % 10)) { | |
2176 | lderr(g_ceph_context) << "Op " << i << ": " << dendl; | |
2177 | test_msg.print_internal_state(); | |
2178 | } | |
2179 | boost::uniform_int<> true_false(0, 99); | |
2180 | int val = true_false(rng); | |
2181 | if (val > 95) { | |
2182 | test_msg.generate_connection(); | |
2183 | } else if (val > 80) { | |
2184 | // test_msg.drop_connection(); | |
2185 | } else if (val > 10) { | |
2186 | test_msg.send_message(); | |
2187 | } else { | |
2188 | usleep(rand() % 500 + 100); | |
2189 | } | |
2190 | } | |
2191 | test_msg.wait_for_done(); | |
11fdf7f2 TL |
2192 | g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0"); |
2193 | g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0"); | |
2194 | g_ceph_context->_conf.set_val("ms_inject_delay_probability", "0"); | |
2195 | g_ceph_context->_conf.set_val("ms_inject_delay_type", ""); | |
2196 | g_ceph_context->_conf.set_val("ms_inject_delay_max", "0"); | |
7c673cae FG |
2197 | } |
2198 | ||
2199 | ||
2200 | class MarkdownDispatcher : public Dispatcher { | |
9f95a23c | 2201 | ceph::mutex lock = ceph::make_mutex("MarkdownDispatcher::lock"); |
7c673cae FG |
2202 | set<ConnectionRef> conns; |
2203 | bool last_mark; | |
2204 | public: | |
31f18b77 | 2205 | std::atomic<uint64_t> count = { 0 }; |
9f95a23c | 2206 | explicit MarkdownDispatcher(bool s): Dispatcher(g_ceph_context), |
11fdf7f2 | 2207 | last_mark(false) { |
11fdf7f2 | 2208 | } |
7c673cae FG |
2209 | bool ms_can_fast_dispatch_any() const override { return false; } |
2210 | bool ms_can_fast_dispatch(const Message *m) const override { | |
2211 | switch (m->get_type()) { | |
2212 | case CEPH_MSG_PING: | |
2213 | return true; | |
2214 | default: | |
2215 | return false; | |
2216 | } | |
2217 | } | |
2218 | ||
2219 | void ms_handle_fast_connect(Connection *con) override { | |
2220 | lderr(g_ceph_context) << __func__ << " " << con << dendl; | |
9f95a23c | 2221 | std::lock_guard l{lock}; |
7c673cae FG |
2222 | conns.insert(con); |
2223 | } | |
2224 | void ms_handle_fast_accept(Connection *con) override { | |
9f95a23c | 2225 | std::lock_guard l{lock}; |
7c673cae FG |
2226 | conns.insert(con); |
2227 | } | |
2228 | bool ms_dispatch(Message *m) override { | |
2229 | lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << dendl; | |
9f95a23c | 2230 | std::lock_guard l{lock}; |
31f18b77 | 2231 | count++; |
7c673cae FG |
2232 | conns.insert(m->get_connection()); |
2233 | if (conns.size() < 2 && !last_mark) { | |
2234 | m->put(); | |
2235 | return true; | |
2236 | } | |
2237 | ||
2238 | last_mark = true; | |
2239 | usleep(rand() % 500); | |
2240 | for (set<ConnectionRef>::iterator it = conns.begin(); it != conns.end(); ++it) { | |
2241 | if ((*it) != m->get_connection().get()) { | |
2242 | (*it)->mark_down(); | |
2243 | conns.erase(it); | |
2244 | break; | |
2245 | } | |
2246 | } | |
2247 | if (conns.empty()) | |
2248 | last_mark = false; | |
2249 | m->put(); | |
2250 | return true; | |
2251 | } | |
2252 | bool ms_handle_reset(Connection *con) override { | |
2253 | lderr(g_ceph_context) << __func__ << " " << con << dendl; | |
9f95a23c | 2254 | std::lock_guard l{lock}; |
7c673cae FG |
2255 | conns.erase(con); |
2256 | usleep(rand() % 500); | |
2257 | return true; | |
2258 | } | |
2259 | void ms_handle_remote_reset(Connection *con) override { | |
9f95a23c | 2260 | std::lock_guard l{lock}; |
7c673cae FG |
2261 | conns.erase(con); |
2262 | lderr(g_ceph_context) << __func__ << " " << con << dendl; | |
2263 | } | |
2264 | bool ms_handle_refused(Connection *con) override { | |
2265 | return false; | |
2266 | } | |
2267 | void ms_fast_dispatch(Message *m) override { | |
2268 | ceph_abort(); | |
2269 | } | |
11fdf7f2 TL |
2270 | int ms_handle_authentication(Connection *con) override { |
2271 | return 1; | |
7c673cae FG |
2272 | } |
2273 | }; | |
2274 | ||
2275 | ||
2276 | // Markdown with external lock | |
2277 | TEST_P(MessengerTest, MarkdownTest) { | |
f67539c2 | 2278 | Messenger *server_msgr2 = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid()); |
7c673cae | 2279 | MarkdownDispatcher cli_dispatcher(false), srv_dispatcher(true); |
11fdf7f2 TL |
2280 | DummyAuthClientServer dummy_auth(g_ceph_context); |
2281 | dummy_auth.auth_registry.refresh_config(); | |
7c673cae | 2282 | entity_addr_t bind_addr; |
9f95a23c | 2283 | bind_addr.parse("v2:127.0.0.1:16800"); |
7c673cae FG |
2284 | server_msgr->bind(bind_addr); |
2285 | server_msgr->add_dispatcher_head(&srv_dispatcher); | |
11fdf7f2 TL |
2286 | server_msgr->set_auth_client(&dummy_auth); |
2287 | server_msgr->set_auth_server(&dummy_auth); | |
7c673cae | 2288 | server_msgr->start(); |
9f95a23c | 2289 | bind_addr.parse("v2:127.0.0.1:16801"); |
7c673cae FG |
2290 | server_msgr2->bind(bind_addr); |
2291 | server_msgr2->add_dispatcher_head(&srv_dispatcher); | |
11fdf7f2 TL |
2292 | server_msgr2->set_auth_client(&dummy_auth); |
2293 | server_msgr2->set_auth_server(&dummy_auth); | |
7c673cae FG |
2294 | server_msgr2->start(); |
2295 | ||
2296 | client_msgr->add_dispatcher_head(&cli_dispatcher); | |
11fdf7f2 TL |
2297 | client_msgr->set_auth_client(&dummy_auth); |
2298 | client_msgr->set_auth_server(&dummy_auth); | |
7c673cae FG |
2299 | client_msgr->start(); |
2300 | ||
2301 | int i = 1000; | |
2302 | uint64_t last = 0; | |
2303 | bool equal = false; | |
2304 | uint64_t equal_count = 0; | |
2305 | while (i--) { | |
11fdf7f2 TL |
2306 | ConnectionRef conn1 = client_msgr->connect_to(server_msgr->get_mytype(), |
2307 | server_msgr->get_myaddrs()); | |
2308 | ConnectionRef conn2 = client_msgr->connect_to(server_msgr2->get_mytype(), | |
2309 | server_msgr2->get_myaddrs()); | |
7c673cae FG |
2310 | MPing *m = new MPing(); |
2311 | ASSERT_EQ(conn1->send_message(m), 0); | |
2312 | m = new MPing(); | |
2313 | ASSERT_EQ(conn2->send_message(m), 0); | |
31f18b77 FG |
2314 | CHECK_AND_WAIT_TRUE(srv_dispatcher.count > last + 1); |
2315 | if (srv_dispatcher.count == last) { | |
7c673cae FG |
2316 | lderr(g_ceph_context) << __func__ << " last is " << last << dendl; |
2317 | equal = true; | |
2318 | equal_count++; | |
2319 | } else { | |
2320 | equal = false; | |
2321 | equal_count = 0; | |
2322 | } | |
31f18b77 | 2323 | last = srv_dispatcher.count; |
7c673cae FG |
2324 | if (equal_count) |
2325 | usleep(1000*500); | |
2326 | ASSERT_FALSE(equal && equal_count > 3); | |
2327 | } | |
2328 | server_msgr->shutdown(); | |
2329 | client_msgr->shutdown(); | |
2330 | server_msgr2->shutdown(); | |
2331 | server_msgr->wait(); | |
2332 | client_msgr->wait(); | |
2333 | server_msgr2->wait(); | |
2334 | delete server_msgr2; | |
2335 | } | |
2336 | ||
9f95a23c | 2337 | INSTANTIATE_TEST_SUITE_P( |
7c673cae FG |
2338 | Messenger, |
2339 | MessengerTest, | |
2340 | ::testing::Values( | |
9f95a23c | 2341 | "async+posix" |
7c673cae FG |
2342 | ) |
2343 | ); | |
2344 | ||
7c673cae | 2345 | int main(int argc, char **argv) { |
20effc67 | 2346 | auto args = argv_to_vec(argc, argv); |
11fdf7f2 TL |
2347 | |
2348 | auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, | |
2349 | CODE_ENVIRONMENT_UTILITY, | |
2350 | CINIT_FLAG_NO_DEFAULT_CONFIG_FILE); | |
2351 | g_ceph_context->_conf.set_val("auth_cluster_required", "none"); | |
2352 | g_ceph_context->_conf.set_val("auth_service_required", "none"); | |
2353 | g_ceph_context->_conf.set_val("auth_client_required", "none"); | |
2354 | g_ceph_context->_conf.set_val("keyring", "/dev/null"); | |
2355 | g_ceph_context->_conf.set_val("enable_experimental_unrecoverable_data_corrupting_features", "ms-type-async"); | |
2356 | g_ceph_context->_conf.set_val("ms_die_on_bad_msg", "true"); | |
2357 | g_ceph_context->_conf.set_val("ms_die_on_old_message", "true"); | |
2358 | g_ceph_context->_conf.set_val("ms_max_backoff", "1"); | |
7c673cae FG |
2359 | common_init_finish(g_ceph_context); |
2360 | ||
2361 | ::testing::InitGoogleTest(&argc, argv); | |
2362 | return RUN_ALL_TESTS(); | |
2363 | } | |
2364 | ||
2365 | /* | |
2366 | * Local Variables: | |
2367 | * compile-command: "cd ../.. ; make -j4 ceph_test_msgr && valgrind --tool=memcheck ./ceph_test_msgr" | |
2368 | * End: | |
2369 | */ |